In [None]:
import pandas as pd
import pickle
import numpy as np

In [None]:
# Load the pickled output of `pasta aggregate` (responses option).
# pickle.load can execute arbitrary code: only load pickle files you produced yourself or trust.
with open('patch_responses.pickle', 'rb') as pickle_file:
    data = pickle.load(pickle_file)
    

### Create dataframe with data from PaStA 
The `pasta aggregate` script aggregates and extracts various information from the mbox result of PaStA.
Its `responses` option extracts the data for mbox clusters that contain patches, together with all associated
emails and commits, and dumps it as a pickled dictionary. This data can then
be used as input to various analyses of code review.

In [None]:
# Build the working frame from the unpickled PaStA data (dictionary keys become columns).
response_df = pd.DataFrame(data=data)

#### Some preprocessing and exploration
We explore some numbers on the input data, like patch count, commit counts, data types etc.

In [None]:
response_df.patch_id.nunique()

In [None]:
response_df.dtypes

In [None]:
# Replace null/NaN patch_ids
response_df.fillna({'patch_id':'_'}, inplace=True)

In [None]:
response_df['upstream'] = response_df['upstream'].map(list)

In [None]:
response_df.index.name = "idx"

In [None]:
response_df.set_index(['cluster_id', 'patch_id'], append=True, inplace=True)

### Denormalize responses
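Each entry of the responses column is a collection of dicts, one per response email. Each dict has attributes such as the response message id (`resp_msg_id`), `parent` (the message id of its parent in the thread) and the raw message itself (a bytestring).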

In [None]:
df_melt_responses = pd.melt(response_df.responses.apply(pd.Series).reset_index(), 
            id_vars=['idx', 'cluster_id', 'patch_id'],
            value_name='responses').sort_index()

In [None]:
df_melt_responses.drop('variable', axis=1, inplace=True)

In [None]:
df_melt_responses.shape

In [None]:
# This library is a wrapper around json_normalize. Due to NaNs in the columns (no responses for some patches). 
# Ideally one could also use json_normalize, but due to NaNs it would't be straightforward.
# In principle we could directly use the flat_table on the list of dicts instead of the melt step above, 
# but that somehow did not work
import flat_table

In [None]:
df_with_responses = flat_table.normalize(df_melt_responses, expand_dicts=True, expand_lists=False)

In [None]:
df_with_responses.drop('index', axis=1, inplace=True)

In [None]:
df_with_responses.to_csv("df_with_responses.csv", index=False)

### Denormalize upstream

In [None]:
df_melt_upstream = pd.melt(response_df.upstream.apply(pd.Series).reset_index(),
             id_vars=['idx', 'cluster_id', 'patch_id'],
             value_name='upstream').sort_index()

In [None]:
df_melt_upstream.drop('variable', axis=1, inplace=True)

In [None]:
df_melt_upstream.to_csv("df_with_upstream.csv", index=False)

### Merge with Dask

In [None]:
import dask.dataframe as dd
import dask.multiprocessing
from dask.diagnostics import ProgressBar

In [None]:
dd1 = dd.read_csv("df_with_responses.csv", blocksize=1e9, dtype={"cluster_id ": "int32", "patch_id ": "category", \
                                                                 "responses.resp_msg_id": "category", \
                                                                 "responses.parent": "category" })

In [None]:
dd2 = dd.read_csv("df_with_upstream.csv", blocksize=1e9, dtype={"cluster_id ": "int32", "patch_id ": "category", \
                                                               "upstream": "category" })

In [None]:
df_dask_final = dd.merge(dd1, dd2, left_index=True, right_index=True, how='left') \
.drop(['patch_id_y', 'cluster_id_y', 'idx_y'], axis=1) \
.reset_index(drop=True) \
.rename(columns={"idx_x": "idx", "cluster_id_x": "cluster_id", "patch_id_x": "patch_id"})

In [None]:
# To compute the dataframe (otherwise the computation is lazy)
df_dask_final.compute()

In [None]:
# This can be executed directly, instead of compute above to save the frame as a single file
df_dask_final.to_csv("df_dask_final.csv", single_file = True)

In [None]:
# This is necessary if reading the final dataframe from disk. Reading with Dask gives 
# the advantage of using the resources better (blocksize parameter), dtypes are tuned to reduce memory usage.
final = dd.read_csv("df_dask_final.csv", blocksize=50e7, dtype={"cluster_id ": "int32", "patch_id ": "category", \
                                                                 "responses.resp_msg_id": "category", \
                                                                 "responses.parent": "category", \
                                                                 "upstream": "category"}).drop('Unnamed: 0', axis=1)

In [None]:
# Convert to pandas dataframe
final = final.compute(num_workers=100) 

In [None]:
# Apparently, duplicates can only be eliminated after converting to pandas. I suspect, while Dask is merging 
# several distributed dataframes, all duplicates cannot be detected. They are only found when the results 
# are collected as a whole
final.drop_duplicates(inplace=True)

In [None]:
# Size considerable reduced than df_dask_final
final.shape

In [None]:
# Save the pandas dataframe - intermediate denormalized data for the kernel patches with email response 
# and commit data
final.to_csv("df_pd_final.csv", index=False)