In [1]:
# Inspired by notebook from Aguiar
# Func: Extract all features, flatten json columns
# with multiprocessing module
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import gc
import sys
import math
from pandas.io.json import json_normalize
from datetime import datetime
import os
import time
from datetime import datetime
from ast import literal_eval
import multiprocessing
import glob
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


In [2]:
# Define the dataframe processing module
JSON_COLUMNS = ['device', 'geoNetwork', 'totals', 'trafficSource']
def work_on_one_reader(df,index,flag):
    df.reset_index(drop = True,inplace = True)
    for column in JSON_COLUMNS:
        column_as_df = json_normalize(df[column])
        column_as_df.columns = [f"{column}.{subcolumn}" for subcolumn in column_as_df.columns]
        df = df.drop(column, axis=1).merge(column_as_df, right_index=True, left_index=True)

        # Normalize customDimensions
    df['customDimensions']=df['customDimensions'].apply(literal_eval)
    df['customDimensions']=df['customDimensions'].str[0]
    df['customDimensions']=df['customDimensions'].apply(lambda x: {'index':np.NaN,'value':np.NaN} if pd.isnull(x) else x)

    column_as_df = json_normalize(df['customDimensions'])
    column_as_df.columns = [f"customDimensions_{subcolumn}" for subcolumn in column_as_df.columns]
    df = df.drop('customDimensions', axis=1).merge(column_as_df, right_index=True, left_index=True)

    # ===============================================
#     print("---> working on hits")
    feat = 'hits'
    df[feat]=df[feat].apply(literal_eval)
    df[feat]=df[feat].str[0]
    df[feat]=df[feat].apply(lambda x: {'index':np.NaN} if pd.isnull(x) else x)
    column_as_df = json_normalize(df[feat])
    column_as_df.columns = [f"hits_{subcolumn}" for subcolumn in column_as_df.columns]
    df = df.drop(feat, axis=1).merge(column_as_df, right_index=True, left_index=True)

    # ===============================================
#     print("---> working on hits_promotion")
    feat = 'hits_promotion'
#     df[feat]=df[feat].apply(literal_eval)
    df[feat]=df[feat].str[0]
    df[feat]=df[feat].apply(lambda x: {'index':np.NaN} if pd.isnull(x) else x)
    column_as_df = json_normalize(df[feat])
    column_as_df.columns = [f"hits_promotion_{subcolumn}" for subcolumn in column_as_df.columns]
    df = df.drop(feat, axis=1).merge(column_as_df, right_index=True, left_index=True)


    # ===============================================
#     print("---> working on hits_product")
    feat = 'hits_product'
#     df[feat]=df[feat].apply(literal_eval)
    df[feat]=df[feat].str[0]
    df[feat]=df[feat].apply(lambda x: {'index':np.NaN} if pd.isnull(x) else x)
    column_as_df = json_normalize(df[feat])
    column_as_df.columns = [f"hits_product_{subcolumn}" for subcolumn in column_as_df.columns]
    df = df.drop(feat, axis=1).merge(column_as_df, right_index=True, left_index=True)

    bracket_col = ['hits_customDimensions','hits_customMetrics','hits_customVariables','hits_experiment',
               'hits_publisher_infos','hits_product_customDimensions','hits_product_customMetrics']
    for col in bracket_col:
        df[col] = df[col].str[0]
    out_name = "./data/out.{0}.{1}.csv".format(flag,index)
    df.to_csv(out_name,index=False)
    

In [3]:
def process_input_csv(csv_path,chunksize=5000,flag=None):
    time_beg = datetime.now()
    dfs = pd.read_csv(csv_path, sep=',',
            converters={column: json.loads for column in JSON_COLUMNS}, 
            dtype={'fullVisitorId': 'str'}, # Important!!
            chunksize=chunksize)
    jobs = []
    for index,df in enumerate(dfs):
        print("--> job {} started".format(index))
        p = multiprocessing.Process(target=work_on_one_reader,args=(df,index,flag,))
        p.start()
        jobs.append(p)
    for index,job in enumerate(jobs):
        job.join()
    
    # read in all processed csv file and concat together
    path = "./data"
    all_file = glob.glob(path+"/out.{}.*.csv".format(flag))
    list_ = []
    new_df = pd.DataFrame()
    for file in all_file:
        df = pd.read_csv(file,low_memory=False)
        list_.append(df)
    new_df = pd.concat(list_)
    time_end = datetime.now()
    print("----> Total time spent is {}".format(time_end - time_beg))
    return new_df

In [4]:
train_df = process_input_csv("./data/train_v2.csv",chunksize=5000,flag="train")
train_df.to_csv('./data/my_train.csv',index=False)

--> job 0 started
--> job 1 started
--> job 2 started
--> job 3 started
--> job 4 started
--> job 5 started
--> job 6 started
--> job 7 started
--> job 8 started
--> job 9 started
--> job 10 started
--> job 11 started
--> job 12 started
--> job 13 started
--> job 14 started
--> job 15 started
--> job 16 started
--> job 17 started
--> job 18 started
--> job 19 started
--> job 20 started
--> job 21 started
--> job 22 started
--> job 23 started
--> job 24 started
--> job 25 started
--> job 26 started
--> job 27 started
--> job 28 started
--> job 29 started
--> job 30 started
--> job 31 started
--> job 32 started
--> job 33 started
--> job 34 started
--> job 35 started
--> job 36 started
--> job 37 started
--> job 38 started
--> job 39 started
--> job 40 started
--> job 41 started
--> job 42 started
--> job 43 started
--> job 44 started
--> job 45 started
--> job 46 started
--> job 47 started
--> job 48 started
--> job 49 started
--> job 50 started
--> job 51 started
--> job 52 started
-->

of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.




----> Total time spent is 0:13:01.441712
