#Functions for silver layer transformations

In [0]:
from pyspark.sql.functions import col, countDistinct, isnull

In [0]:
#Creates all event tables from a given dataframe and returns them as a dictionary with key {event_name} and value {dataframe containing those events}
def create_event_tables(df_in):
    """Split a dataframe of mixed events into one dataframe per event type.

    Args:
        df_in: Spark dataframe with a ``type`` column holding each row's
            event type name and a ``payload`` struct column (flattened by
            ``create_event_table``).

    Returns:
        dict mapping every distinct, non-null event type name found in
        ``df_in`` to the dataframe ``create_event_table`` builds for it.
    """
    # Remove actor/repo/org columns once, before splitting by event type.
    events_df = remove_non_event_cols(df_in)

    # Collect the distinct event type names directly from the dataframe;
    # a detour through the RDD API is not needed to read a single column.
    type_rows = df_in.select('type').distinct().collect()

    # Rows with a null type are skipped: there is no event name to key them
    # by, and filtering on the text 'None' would presumably match no rows.
    event_names = [row[0] for row in type_rows if row[0] is not None]

    return {
        event_name: create_event_table(events_df, event_name)
        for event_name in event_names
    }

In [0]:
# Drops columns unrelated to event tables
def remove_non_event_cols(df_in):
    """Return ``df_in`` without the actor/org/repo descriptive columns.

    Args:
        df_in: Spark dataframe of events.

    Returns:
        A dataframe with the columns listed below removed. Names that are
        not present in ``df_in`` are ignored by ``DataFrame.drop``.
    """
    # NOTE(review): 'actor_avatar' used to be listed twice; the redundant
    # entry is removed. Possibly a different column name was intended for
    # one of them -- confirm against the source schema.
    drop_column_names = (
        'actor_gravatar_id',
        'actor_login',
        'actor_url',
        'org_avatar_url',
        'org_gravatar_id',
        'org_login',
        'org_url',
        'repo_name',
        'repo_url',
        'actor_avatar',
        'actor_display_login',
    )
    return df_in.drop(*drop_column_names)

In [0]:
#Drop columns whose values are all null in a sample of 20 rows (such a column is assumed to be entirely null).
def drop_null_cols(df_in, sample_size=20):
    """Drop columns whose value is null in every sampled row.

    Only up to ``sample_size`` rows (taken with ``limit``) are inspected, so
    a column that is null in all of them is *assumed* to be null throughout
    the dataframe. This trades accuracy for speed.

    Args:
        df_in: Spark dataframe to prune.
        sample_size: maximum number of rows to inspect. Defaults to 20.

    Returns:
        ``df_in`` without the columns that were null in every sampled row.
        A dataframe with no rows is returned unchanged.
    """
    # Bring the small sample to the driver once instead of launching a
    # separate count job for every column.
    sample_rows = df_in.limit(sample_size).collect()
    if not sample_rows:
        # No rows to inspect, so there is no evidence for dropping anything.
        return df_in

    # Judge each column against the rows actually sampled, so dataframes
    # holding fewer than sample_size rows are still pruned.
    drop_names = [
        name
        for position, name in enumerate(df_in.columns)
        if all(row[position] is None for row in sample_rows)
    ]
    return df_in.drop(*drop_names)


In [0]:
#Flattens the specified column and drops the original column
def flatten_out(df_in, column_name):
    """Expand a nested column into top-level columns and drop the original.

    Args:
        df_in: Spark dataframe containing ``column_name``.
        column_name: name of the nested column to expand.

    Returns:
        A dataframe with every existing column plus the sub-columns of
        ``column_name``, and without ``column_name`` itself.
    """
    # "<column>.*" selects all sub-columns of the nested column.
    sub_columns = column_name + ".*"
    expanded = df_in.select('*', sub_columns)
    return expanded.drop(column_name)

In [0]:
# filters rows of dataframe by event type string 
def filter_event_type(df_in, event_type=str):
    """Keep only the rows whose ``type`` column equals ``event_type``.

    Args:
        df_in: Spark dataframe with a ``type`` column.
        event_type: event type name to keep; it is formatted to text before
            the comparison.

    Returns:
        The rows of ``df_in`` whose ``type`` matches.
    """
    # NOTE(review): the default value is the builtin ``str`` type object,
    # which looks like it was meant as a type annotation. Calling without
    # event_type compares against the text "<class 'str'>" -- confirm no
    # caller relies on the default before changing it.
    wanted = f'{event_type}'
    matches_type = df_in.type == wanted
    return df_in.where(matches_type)

In [0]:
# Creates a single event table
def create_event_table(df_in, event_name):
    """Build the table for a single event type.

    Args:
        df_in: Spark dataframe of events with ``type`` and ``payload``
            columns.
        event_name: event type to extract (e.g. ForkEvent, PushEvent).

    Returns:
        The rows of ``event_name`` with ``payload`` expanded into top-level
        columns and the columns judged to be all null removed.
    """
    # Read inside-out:
    #   1. keep only the events of the requested type,
    #   2. replace the payload column by its sub-columns,
    #   3. drop the columns that look entirely null.
    return drop_null_cols(
        flatten_out(
            filter_event_type(df_in, event_name),
            "payload",
        )
    )