In [1]:
import json
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [145]:
def read_nested_json(df):
    """Flatten one level of nesting in a Spark DataFrame.

    Each top-level column is handled according to its data type:

    * struct -- every field is promoted to a top-level column named after the
      field alone (the parent column name is not kept as a prefix);
    * array  -- the column is exploded in place and kept under its own name;
    * other  -- the column is carried over unchanged.

    Only top-level columns are inspected: a struct that sits inside another
    struct is promoted as-is and is not flattened further.

    Args:
        df: Spark DataFrame to flatten.

    Returns:
        A DataFrame holding the promoted, exploded and untouched columns.
    """
    selected = []

    for name in df.schema.names:
        dtype = df.schema[name].dataType

        if isinstance(dtype, StructType):
            # Promote each struct field: "<parent>.<field>" becomes "<field>".
            selected.extend(
                col(name + "." + field.name).alias(field.name)
                for field in dtype.fields
            )
        else:
            if isinstance(dtype, ArrayType):
                # Replace the array column by its exploded elements.
                df = df.withColumn(name, explode(name).alias(name))
            selected.append(name)

    return df.select(selected)

def correct_time_string(time_string):
    """Insert a colon into a trailing numeric UTC offset ("+0000" -> "+00:00").

    The value is rewritten only when it is a string that ends with a sign
    followed by exactly four digits. Anything else is returned unchanged, so
    the function is safe to apply twice and never corrupts values it does not
    recognise (empty strings, strings without an offset, already-corrected
    offsets).

    Args:
        time_string: Timestamp text such as "20-04-2017 10:00:00 +0000".
            Non-string values (for example None) are accepted.

    Returns:
        The text with the offset written as "+HH:MM", or the input unchanged
        when it is not a string or does not end in a "+HHMM"/"-HHMM" offset.
    """
    # Explicit type guard instead of a bare `except:` that hid every error.
    if not isinstance(time_string, str):
        return time_string

    offset = time_string[-5:]
    has_compact_offset = (
        len(offset) == 5
        and offset[0] in "+-"
        and all(ch in "0123456789" for ch in offset[1:])
    )
    if not has_compact_offset:
        return time_string

    return time_string[:-2] + ":" + time_string[-2:]


assert correct_time_string("20-04-2017 10:00:00 +0000") == "20-04-2017 10:00:00 +00:00"

def convert_to_timestamp(df, col_list):
    """Convert string columns to Spark timestamps.

    For every column named in ``col_list`` the value is first passed through
    ``correct_time_string`` (wrapped as a UDF) and then parsed with the
    pattern "dd-MM-yyyy HH:mm:ss z". The converted column replaces the
    original one under the same name.

    Args:
        df: Spark DataFrame containing the columns to convert.
        col_list: Iterable of column names to convert.

    Returns:
        The DataFrame with the listed columns replaced by timestamp columns.
    """
    fix_offset = udf(correct_time_string)

    for name in col_list:
        normalized = fix_offset(col(name))
        df = df.withColumn(
            name,
            to_timestamp(normalized, format="dd-MM-yyyy HH:mm:ss z"),
        )

    return df

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('ticket_processing')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/27 13:21:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Location of the raw JSON export used by this notebook.
json_path = "example.json"
# JSON text is UTF-8; name the encoding explicitly so the read does not depend
# on the platform's default locale encoding.
with open(json_path, encoding="utf-8") as json_file:
    json_dict = json.load(json_file)


In [5]:
# Read the raw JSON file with Spark. multiLine is switched on so a document
# that spans several lines is parsed as one record.
df_json = spark.read.option("multiLine", True).json(json_path)
df_json.printSchema()

root
 |-- activities_data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- activity: struct (nullable = true)
 |    |    |    |-- agent_id: long (nullable = true)
 |    |    |    |-- category: string (nullable = true)
 |    |    |    |-- contacted_customer: boolean (nullable = true)
 |    |    |    |-- group: string (nullable = true)
 |    |    |    |-- issue_type: string (nullable = true)
 |    |    |    |-- note: struct (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- type: long (nullable = true)
 |    |    |    |-- priority: long (nullable = true)
 |    |    |    |-- product: string (nullable = true)
 |    |    |    |-- requester: long (nullable = true)
 |    |    |    |-- shipment_date: string (nullable = true)
 |    |    |    |-- shipping_address: string (nullable = true)
 |    |    |    |-- source: long (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |-- performe

In [152]:
# Explode the activities array, promote the fields of each exploded record to
# top-level columns, then flatten the remaining nested structs one level.
activity_df = df_json.select(explode('activities_data').alias('activities_data'))
col_list = ['activities_data.' + field.name
            for field in activity_df.schema['activities_data'].dataType.fields]
activity_df = read_nested_json(activity_df.select(col_list))

# Parse performed_at into a timestamp and shipment_date into a date.
activity_df = convert_to_timestamp(activity_df, ['performed_at'])
activity_df = activity_df.withColumn(
    'shipment_date',
    to_date('shipment_date', format="dd MMM, yyyy"),
)

#======================== check outputs ========================
# Every performed_at value is expected to be non-null after the conversion.
assert activity_df.filter(col('performed_at').isNull()).count() == 0
activity_df.printSchema()
activity_df.select('ticket_id', 'performed_at', 'shipment_date').show()


root
 |-- agent_id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- contacted_customer: boolean (nullable = true)
 |-- group: string (nullable = true)
 |-- issue_type: string (nullable = true)
 |-- note: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- type: long (nullable = true)
 |-- priority: long (nullable = true)
 |-- product: string (nullable = true)
 |-- requester: long (nullable = true)
 |-- shipment_date: date (nullable = true)
 |-- shipping_address: string (nullable = true)
 |-- source: long (nullable = true)
 |-- status: string (nullable = true)
 |-- performed_at: timestamp (nullable = true)
 |-- performer_id: long (nullable = true)
 |-- performer_type: string (nullable = true)
 |-- ticket_id: long (nullable = true)

+---------+-------------------+-------------+
|ticket_id|       performed_at|shipment_date|
+---------+-------------------+-------------+
|      600|2017-04-21 19:03:38|         null|
|      704|2017-04-21 19:08:24|   2

In [190]:
# Promote every field of the top-level metadata struct to its own column.
col_list = [".".join(['metadata', field.name])
            for field in df_json.schema['metadata'].dataType.fields]
metadata_df = df_json.select(col_list)

# Run the two window-boundary columns through convert_to_timestamp.
to_convert_list = ['start_at', 'end_at']
metadata_df = convert_to_timestamp(metadata_df, to_convert_list)
metadata_df.printSchema()
# Display the frame built in this cell. The previous `test_df.show()` referred
# to a name that is never defined in the notebook and fails on a fresh kernel.
metadata_df.show()

root
 |-- activities_count: long (nullable = true)
 |-- end_at: timestamp (nullable = true)
 |-- start_at: timestamp (nullable = true)

+----------------+-------------------+-------------------+
|activities_count|             end_at|           start_at|
+----------------+-------------------+-------------------+
|               2|2017-04-21 19:29:59|2017-04-20 19:30:00|
+----------------+-------------------+-------------------+



In [205]:
# Difference between end_at and start_at, both cast to long, divided by 60
# and rounded to 0 decimals; shown as an extra time_delta_mins column.
(
    metadata_df
    .withColumn(
        'time_delta_mins',
        round((col('end_at').cast('long') - col('start_at').cast('long')) / 60, 0),
    )
    .show()
)

+----------------+-------------------+-------------------+---------------+
|activities_count|             end_at|           start_at|time_delta_mins|
+----------------+-------------------+-------------------+---------------+
|               2|2017-04-21 19:29:59|2017-04-20 19:30:00|         1440.0|
+----------------+-------------------+-------------------+---------------+



In [178]:
# Hand-written sample timestamps (no UTC offset) for a plain-Python check.
test_dict = dict(
    start_at="20-04-2017 10:00:00",
    end_at="21-04-2017 09:59:59",
    performed_at="21-04-2017 09:38:24",
)

In [179]:
from datetime import datetime as dt


In [182]:
# Parse each sample timestamp string into a datetime object.
test_dict_dt = {
    key: dt.strptime(value, "%d-%m-%Y %H:%M:%S")
    for key, value in test_dict.items()
}
# Elapsed time from performed_at to end_at, displayed in minutes.
open_time = test_dict_dt['end_at'] - test_dict_dt['performed_at']
open_time.total_seconds() / 60

21.583333333333332

In [206]:
# Build a pandas view of the activity records: expand each record's nested
# `activity` dict into columns and place them beside the remaining fields.
pd_df = pd.DataFrame(json_dict['activities_data'])
df_raw = pd.concat(
    [
        pd_df.drop(columns=['activity']),
        pd_df['activity'].apply(pd.Series),
    ],
    axis=1,
)
df_raw

Unnamed: 0,performed_at,ticket_id,performer_type,performer_id,note,shipping_address,shipment_date,category,contacted_customer,issue_type,source,status,priority,group,agent_id,requester,product
0,21-04-2017 09:33:38 +0000,600,user,149018,"{'id': 4025864, 'type': 4}",,,,,,,,,,,,
1,21-04-2017 09:38:24 +0000,704,user,149018,,,"21 Apr, 2017",Phone,True,Incident,3.0,Open,4.0,refund,149018.0,145423.0,mobile
