In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WSDM User Logs Processing").getOrCreate()

# NOTE: inferSchema=True makes Spark read each CSV once up front just to infer column types,
# which is the slow part for a file as large as user_logs.csv. An explicit schema would skip it.
df_validation = spark.read.csv('./data/sample_submission_zero.csv', inferSchema=True, header=True)
df_userlogs = spark.read.csv('./data/user_logs.csv', inferSchema=True, header=True)

# Keep only the log rows whose 'msno' appears in df_validation, i.e. the logs of the users in the
# validation set. A left-semi join is exactly this filter: unlike an inner join it adds none of
# df_validation's other columns and never duplicates a log row when an 'msno' repeats in
# df_validation. Like every Spark transformation it is lazy -- nothing is computed until a later
# action (e.g. a write) forces evaluation, so there is no point trying to "free" df_userlogs here.
df_validation_userlogs = df_userlogs.join(df_validation, 'msno', how='left_semi')

In [2]:
# Now, to aggregate over the relevant user log data: for every column in AGG_COLS compute every
# function in AGG_FNS per user ('msno'), producing columns named like 'avg(num_100)'.
from pyspark.sql import functions as F

AGG_COLS = ['num_100', 'num_25', 'num_50', 'num_75', 'num_985', 'num_unq', 'total_secs']
AGG_FNS = ['avg', 'max', 'min', 'sum']

# All aggregations go into a single groupBy, so Spark shuffles the logs once instead of once per
# function followed by a join per function. The explicit alias reproduces the 'fn(col)' names
# that the dict form of .agg() generates.
agg_exprs = [getattr(F, fn)(cname).alias('{}({})'.format(fn, cname))
             for fn in AGG_FNS for cname in AGG_COLS]
df_userlogs_agg = df_validation_userlogs.groupby('msno').agg(*agg_exprs)

n_cols_before_join = len(df_validation.columns)
# Inner join: validation users that have no rows in the logs are dropped at this point.
# NOTE(review): if every validation user must be kept (e.g. to score a complete submission),
# switch to how='left' and handle the resulting nulls downstream.
df_validation = df_validation.join(df_userlogs_agg, 'msno', how='inner')

assert len(df_validation.columns) == n_cols_before_join + len(AGG_FNS) * len(AGG_COLS)
# Catches re-running this cell, which would join the aggregate columns a second time.
assert len(set(df_validation.columns)) == len(df_validation.columns), 'duplicate columns after join'

In [4]:
# Now we write the validation dataframe to the directory VALIDATION_DATA_PATH. Spark writes one CSV
# part file per partition, so compile_csv_parts_to_larger_csv (in utils.py) stitches the parts
# into a single CSV. That file is only ~300MB and fits easily into memory, so from here on we can
# use all our regular pandas tricks.
import os
import pandas as pd

from utils import compile_csv_parts_to_larger_csv

VALIDATION_DATA_PATH = './data/validation_data'
VALIDATION_CSV_PATH = './data/validation_data.csv'

# mode='overwrite': the default mode raises if the directory already exists, and it persists on
# disk across kernel restarts, so without this a second Restart & Run All would fail here.
df_validation.write.csv(VALIDATION_DATA_PATH, header=True, mode='overwrite')
assert os.path.isdir(VALIDATION_DATA_PATH)
del df_validation
compile_csv_parts_to_larger_csv(csv_parts_path=VALIDATION_DATA_PATH, to_write_path=VALIDATION_CSV_PATH)
assert os.path.isfile(VALIDATION_CSV_PATH)

# Sanity check.
# NOTE(review): the loaded frame starts with an 'Unnamed: 0' column, which looks like a pandas
# index written out by compile_csv_parts_to_larger_csv -- confirm, and consider index=False there
# or index_col=0 here.
df_validation = pd.read_csv(VALIDATION_CSV_PATH)
df_validation.head()

Now processing file name ./data/validation_data/part-00000-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00001-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00002-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00003-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00004-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00005-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00006-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00007-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00008-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./d

Now processing file name ./data/validation_data/part-00076-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00077-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00078-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00079-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00080-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00081-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00082-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00083-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00084-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./d

Now processing file name ./data/validation_data/part-00152-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00153-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00154-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00155-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00156-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00157-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00158-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00159-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./data/validation_data/part-00160-9e96cb8e-b2cb-45b3-bfae-171837523d42-c000.csv...
Now processing file name ./d

Unnamed: 0,avg(num_100),sum(total_secs),sum(num_50),min(num_75),avg(num_50),avg(num_985),min(total_secs),min(num_985),sum(num_25),min(num_unq),...,max(num_75),avg(num_unq),max(total_secs),msno,sum(num_985),max(num_25),avg(num_75),sum(num_unq),max(num_unq),max(num_50)
0,13.603248,1719500.266,557,0,1.292343,0.670534,5.34,0,3164,1,...,10,21.37355,32437.844,++4RuqBw0Ss6bQU4oMxaRlbBPoWzoEiIZaxPM04Y4+U=,289,46,0.886311,9212,133,28
1,28.584746,952643.059,168,0,1.423729,1.20339,5.878,0,2457,1,...,10,46.016949,109681.417,+/namlXq+u3izRjHCFJV4MgqcXcLidZYszVsROOq/y4=,142,101,1.186441,5430,241,9
2,31.03962,5130378.051,719,0,1.139461,0.892235,0.317,0,2797,1,...,35,30.091918,36909.298,+0/X9tkmyHyet9X80G6GTrDFHnJqvai8d1ZPhayT0os=,563,82,0.838352,18988,189,110
3,44.923195,7508563.4,648,0,0.995392,0.608295,75.001,0,2461,1,...,20,42.254992,44077.115,+09YGn842g6h2EZUXe0VWeC4bBoCbDGfUboitc0vIHw=,396,79,0.668203,27508,218,45
4,16.692939,2831811.704,1648,0,2.706076,0.83087,14.466,0,3317,1,...,92,21.883415,180091.548,+0jTOa6KGPk1vtNTwRDMZc/McUo41AeuwV3ndo54Y+Q=,506,175,1.07225,13327,187,90
