In [1]:
import sys
import math
assert sys.version_info >= (3, 5) # make sure we have Python 3.5+

from pyspark.sql import SparkSession, functions, types
cluster_seeds = ['199.60.17.188', '199.60.17.216']
spark = SparkSession.builder.appName('cassandra read tables and etl') \
    .config('spark.cassandra.connection.host', ','.join(cluster_seeds)).getOrCreate()
assert spark.version >= '2.3' # make sure we have Spark 2.3+
sc = spark.sparkContext

In [2]:
bureau = spark.read.option("inferSchema", True).\
    csv('/Users/michaelyang/Downloads/all_data/bureau.csv',header=True)
bureau_balance = spark.read.option("inferSchema", True).\
    csv('/Users/michaelyang/Downloads/all_data/bureau_balance.csv',header=True)


In [3]:
def agg_numeric(df, group_var):
    """Aggregates the numeric values in a dataframe. This can
    be used to create features for each instance of the grouping variable.
    """
    
    # Remove id variables other than grouping variable
    for col in df.columns:
        if col != group_var and 'SK_ID' in col:
            df = df.drop(col)
            
    numerical_feats= [f for f,t in df.dtypes if t != 'string']
    numeric_df = df.select(numerical_feats)

    # Group by the specified variable and calculate the statistics
    # = numeric_df.groupBy(group_var).agg(['count', 'mean', 'max', 'min', 'sum']).reset_index()
    count = numeric_df.groupBy(group_var).count()
    means = numeric_df.groupBy(group_var).avg().drop('avg(%s)' % (group_var))
    maxs = numeric_df.groupBy(group_var).max().drop('max(%s)' % (group_var))
    mins = numeric_df.groupBy(group_var).min().drop('min(%s)' % (group_var))
    sums = numeric_df.groupBy(group_var).sum().drop('sum(%s)' % (group_var))
    joined = count.join(means, count[group_var] == means[group_var]).drop(means[group_var])
    joined1 = joined.join(maxs,joined[group_var] == maxs[group_var]).drop(maxs[group_var])
    joined2 = joined1.join(mins,joined1[group_var] == mins[group_var]).drop(mins[group_var])
    joined3 = joined2.join(sums,joined1[group_var] == sums[group_var]).drop(sums[group_var])
    
    return joined3

In [4]:
def get_dummies(df,group_var):
    
    # parameter: a list of categorical features and a group key
    pivot_cols= [f for f,t in df.dtypes if t == 'string']
    keys = pivot_cols + [group_var] 
    
    before = df.select(keys)

    #Helper function to recursively join a list of dataframes
    #Can be simplified if you only need two columns
    def join_all(dfs,keys):
        if len(dfs) > 1:
            return dfs[0].join(join_all(dfs[1:],keys), on = keys, how = 'inner')
        else:
            return dfs[0]

    dfs = []
    combined = []
    for pivot_col in pivot_cols:
        pivotDF = before.groupBy(keys).pivot(pivot_col).count()
        new_names = pivotDF.columns[:len(keys)] +  ["e_{0}_{1}".format(pivot_col, c)
                                                    for c in pivotDF.columns[len(keys):]]        
        df = pivotDF.toDF(*new_names).fillna(0)    
        combined.append(df)

    encoded = join_all(combined,keys)
    
    #drop its original columns
    for col in pivot_cols:
        encoded = encoded.drop(col)
        
    return encoded

In [5]:
def count_categorical(df, group_var):
    """Computes counts and normalized counts for each observation
    of `group_var` of each unique category in every categorical variable
    """
    
    # Select the categorical columns
    encoded_df = get_dummies(df,group_var)

    # Groupby the group var and calculate the sum and mean
    categorical_encoded = encoded_df.groupBy(group_var).sum().drop('sum(%s)' % (group_var))
    
    return categorical_encoded

In [15]:
# bureau data
bureau_num = agg_numeric(bureau,'SK_ID_CURR')
bureau_cat = count_categorical(bureau,'SK_ID_CURR')
bureau_client = bureau_num.join(bureau_cat,bureau_num.SK_ID_CURR == bureau_cat.SK_ID_CURR).\
    drop(bureau_cat.SK_ID_CURR)

In [13]:
print(bureau_num.count(),len(bureau_num.columns))
print(bureau_cat.count(),len(bureau_cat.columns))

305811 50


305811 24


In [16]:
print(bureau_client.count(),len(bureau_client.columns))

305811 73


In [23]:
print(bureau.count(),len(bureau.columns))

1716428 17


In [19]:
# bureau balance
bb_num = agg_numeric(bureau_balance,'SK_ID_BUREAU')
bb_cat = count_categorical(bureau_balance,'SK_ID_BUREAU')
bb_client = bb_num.join(bb_cat,bb_num.SK_ID_BUREAU == bb_cat.SK_ID_BUREAU).\
    drop(bb_cat.SK_ID_BUREAU)
print(bb_num.count(),len(bb_num.columns))
print(bb_cat.count(),len(bb_cat.columns))
print(bb_client.count(),len(bb_client.columns))

817395 6


817395 9


817395 14


In [27]:
bureau_with_balance = bureau.join(bb_client, bureau.SK_ID_BUREAU == bb_client.SK_ID_BUREAU,
                                  how= 'left_outer')
bureau_num = agg_numeric(bureau_with_balance, 'SK_ID_CURR')
bureau_cat = count_categorical(bureau_with_balance, 'SK_ID_CURR')
bureau_client1 = bureau_num.join(bureau_cat, bureau_num.SK_ID_CURR == bureau_cat.SK_ID_CURR).drop(bureau_cat.SK_ID_CURR)
bureau_client1.show()
print(bureau_client1.count(),len(bureau_client1.columns))

+-----+----------+-------------------+-----------------------+------------------------+----------------------+---------------------------+-----------------------+-------------------+------------------------+-------------------------+---------------------------+-----------------------+----------------+------------------+------------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+-----------------------+------------------------+----------------------+---------------------------+-----------------------+-------------------+------------------------+-------------------------+---------------------------+-----------------------+----------------+----------+------------------------+------------------------+------------------------+------------------------+-------------------

305811 125


In [30]:
credit_card_balance = spark.read.option("inferSchema", True).\
    csv('/Users/michaelyang/Downloads/all_data/credit_card_balance.csv',header=True)
cc20 = spark.read.option("inferSchema", True).\
    csv('/Users/michaelyang/PycharmProjects/home-credit/20LineCSVs/credit_card_balance.csv',header=True)


In [36]:
bureau_with_balance.columns

['SK_ID_CURR',
 'SK_ID_BUREAU',
 'CREDIT_ACTIVE',
 'CREDIT_CURRENCY',
 'DAYS_CREDIT',
 'CREDIT_DAY_OVERDUE',
 'DAYS_CREDIT_ENDDATE',
 'DAYS_ENDDATE_FACT',
 'AMT_CREDIT_MAX_OVERDUE',
 'CNT_CREDIT_PROLONG',
 'AMT_CREDIT_SUM',
 'AMT_CREDIT_SUM_DEBT',
 'AMT_CREDIT_SUM_LIMIT',
 'AMT_CREDIT_SUM_OVERDUE',
 'CREDIT_TYPE',
 'DAYS_CREDIT_UPDATE',
 'AMT_ANNUITY',
 'count',
 'SK_ID_BUREAU',
 'avg(MONTHS_BALANCE)',
 'max(MONTHS_BALANCE)',
 'min(MONTHS_BALANCE)',
 'sum(MONTHS_BALANCE)',
 'sum(e_STATUS_0)',
 'sum(e_STATUS_1)',
 'sum(e_STATUS_2)',
 'sum(e_STATUS_3)',
 'sum(e_STATUS_4)',
 'sum(e_STATUS_5)',
 'sum(e_STATUS_C)',
 'sum(e_STATUS_X)']

In [31]:
print(credit_card_balance.count(),len(credit_card_balance.columns))


3840312 23


In [35]:
cc_num = agg_numeric(credit_card_balance,'SK_ID_PREV')
cc_num.show()


+-----+----------+-------------------+------------------+----------------------------+-----------------------------+-------------------------+-------------------------------+-----------------------------+----------------------------+------------------------+------------------------------+-----------------------------+------------------+-------------------------+-----------------------------+-------------------------+-------------------------------+-----------------------------+------------------------------+-------------------+-------------------+-------------------+----------------+----------------------------+-----------------------------+-------------------------+-------------------------------+-----------------------------+----------------------------+------------------------+------------------------------+-----------------------------+------------------+-------------------------+-----------------------------+-------------------------+-------------------------------+---------------