In [4]:
"""
Zone 5, step 6: CTU imputations

After imputing rows for missing CTUs, create table with row-wise listing:		
Column name		Table 1
Imputation approach (min, max, median, cumsum, zero, FFill, etc.)		Table 1
Proportion of accounts that have more than:		Table 1
99% missing		
75% missing		
50% missing		
25% missing		
		
Differences in descriptive statistics between steps 6 and 3		Table 2
Delta min		
Delta max		
Delta mean		
Delta std		
Delta median	

Input files: 
imputed_train_ and  preprocessing_
		
        
TO DO:
Sort by delta max desc
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def start_spark_session():
    """
    Starting spark session
    """

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    return spark

def calc_column_func(df, column, func):

    """
    for a column, calculate a statistical value
    """

    return df.agg({column : func}).collect()[0][0]


def get_descriptive_statistics_for_columns(df):

    """
    Get the columns names and for every column create a tupel col, maximum , minumium to make it sutable to create a datafrma out out tuples
    (event1, 3, 1) 
    """

    columns = preprocessing_df.schema.names
    columns_with_stats = []  # append tuples to a list, later to create a spark df
    for col in columns: # for each column calculate stat values
        maximum = calc_column_func(df, col, 'max')
        minimum = calc_column_func(df, col, 'min')
        mean = calc_column_func(df, col, 'avg')
        columns_with_stats.append((col,maximum, minimum, mean))
    return columns_with_stats 


def drop_garbage_cols(df):
    """
    Drop some of the unnesessary columns
    """
    columns_to_drop = ['level_0', 'index', 'Unnamed: 0', '_c0']
    df_to_drop = df.select('*')
    df_to_drop = df_to_drop.drop(*columns_to_drop)
    
    return df_to_drop


def get_delta_columns_df(joined_df):
    """
    Substract simmilar summary columns (like min, max, mean .. ) for preprocessing df and imputed df 
    """
    joined_df_min = joined_df.withColumn("delta_min", col("min_pre") - col("min"))
    joined_df_min_max = joined_df_min.withColumn("delta_max", col("max_pre") - col("max"))
    joined_df_min_max_mean = joined_df_min_max.withColumn("delta_mean", col("mean_pre") - col("mean"))
    
    return joined_df_min_max_mean
    
 
"""
**** MAIN *****
"""


spark = start_spark_session()
preprocessing_file_name = "../data/preprocessing_2020_06_30_1.csv"
imputed_file_name = "../data/imputed_train_2020_06_30_1.csv"
#example_file_name = "../example1.csv"

preprocessing_df = spark.read.format("csv").option("header", "true").load(preprocessing_file_name)
preprocessing_columns_with_stats = get_descriptive_statistics_for_columns(preprocessing_df)
preprocessing_cols_stats_df = spark.createDataFrame( preprocessing_columns_with_stats, ['column','max','min','mean'] )


imputed_df = spark.read.format("csv").option("header", "true").load(imputed_file_name)
imputed_columns_with_stats = get_descriptive_statistics_for_columns(imputed_df)
imputed_cols_stats_df = spark.createDataFrame( imputed_columns_with_stats, ['column','max','min','mean'] )


preprocessing_cols_stats_df_re = preprocessing_cols_stats_df.\
select(*(col(x).alias(x + '_pre') for x in preprocessing_cols_stats_df.columns))
joined_df = preprocessing_cols_stats_df_re.join(imputed_cols_stats_df, preprocessing_cols_stats_df_re.column_pre == imputed_cols_stats_df.column)

delta_columns_df = get_delta_columns_df(joined_df)
delta_columns_df.select('column','delta_min', 'delta_max', 'delta_mean').show(n=45, truncate= False)


KeyboardInterrupt: 

In [31]:
preprocessing_file_name = "../data/preprocessing_2020_06_30_1.csv"
imputed_file_name = "../data/imputed_train_2020_06_30_1.csv"
genereal_preprocessing_file_name = "../data/general_preprocessing_2020_06_30_1.csv"

In [33]:
spark = start_spark_session()
preprocessing_df = spark.read.format("csv").option("header", "true").load(preprocessing_file_name)
imputed_df = spark.read.format("csv").option("header", "true").load(imputed_file_name)
general_preprocessing_df = spark.read.format("csv").option("header", "true").\
        load(genereal_preprocessing_file_name)


In [61]:
general_preprocessing_df.where("party_id = 7").toPandas()

Unnamed: 0.1,_c0,level_0,index,Unnamed: 0,party_id,event_date,cai_ins_grs_vmc,cai_ins_grs_mrc,cai_ins_grs_erc,cai_ins_grs_evmc,...,event_positive_interactions,event_negative_interactions,td_last_cai_ins_grs_vmc,td_last_cai_ins_grs_mrc,td_last_cai_ins_grs_erc,td_last_cai_ins_grs_evmc,td_last_cai_ins_grs_vuc,td_last_cai_ins_grs_evnt_1,td_last_cai_ins_grs_evnt_2,td_last_cai_ins_grs_evnt_3
0,541,541,541,541,7,2019-01-05,17.0,181.0,123.0,0.0,...,321.0,0.0,0,0,0,-1,-1,0,-1,-1
1,542,542,542,542,7,2019-01-30,9.0,11.0,59.0,0.0,...,79.0,0.0,0,0,0,-1,0,25,-1,-1
2,543,543,543,543,7,2019-02-01,19.0,19.0,15.0,15.597355484771953,...,68.59735548477195,0.0,0,0,0,0,2,27,-1,-1
3,544,544,544,544,7,2019-02-17,14.0,68.0,38.0,0.0,...,120.0,0.0,0,0,0,16,18,43,-1,-1
4,545,545,545,545,7,2019-02-19,17.0,38.0,73.0,0.0,...,128.0,0.0,0,0,0,18,20,45,-1,-1
5,546,546,546,546,7,2019-02-22,18.0,34.0,44.0,0.0,...,96.0,0.0,0,0,0,21,23,48,-1,-1
6,547,547,547,547,7,2019-02-27,0.0,48.0,23.0,0.0,...,71.0,0.0,5,0,0,26,28,53,-1,-1
7,548,548,548,548,7,2019-02-28,0.0,0.0,7.0,0.0,...,7.0,0.0,6,1,0,27,29,54,-1,-1
8,549,549,549,549,7,2019-03-15,1.0,74.0,64.0,0.0,...,139.0,0.0,0,0,0,42,44,69,-1,0
9,550,550,550,550,7,2019-03-18,16.0,48.0,14.0,0.0,...,78.0,0.0,0,0,0,45,47,0,-1,3


In [62]:
preprocessing_df.where("ctu >= 0").where("party_id = 7").toPandas()

Unnamed: 0.1,_c0,level_0,index,Unnamed: 0,party_id,event_date,cai_ins_grs_vmc,cai_ins_grs_mrc,cai_ins_grs_erc,cai_ins_grs_evmc,...,event_negative_interactions,td_last_cai_ins_grs_vmc,td_last_cai_ins_grs_mrc,td_last_cai_ins_grs_erc,td_last_cai_ins_grs_evmc,td_last_cai_ins_grs_vuc,td_last_cai_ins_grs_evnt_1,td_last_cai_ins_grs_evnt_2,td_last_cai_ins_grs_evnt_3,CTU
0,541,541,541,541,7,2019-01-05,17.0,181.0,123.0,0.0,...,0.0,0,0,0,-1,-1,0,-1,-1,15.0
1,542,542,542,542,7,2019-01-30,9.0,11.0,59.0,0.0,...,0.0,0,0,0,-1,0,25,-1,-1,15.0
2,543,543,543,543,7,2019-02-01,19.0,19.0,15.0,15.597355484771953,...,0.0,0,0,0,0,2,27,-1,-1,14.0
3,544,544,544,544,7,2019-02-17,14.0,68.0,38.0,0.0,...,0.0,0,0,0,16,18,43,-1,-1,14.0
4,545,545,545,545,7,2019-02-19,17.0,38.0,73.0,0.0,...,0.0,0,0,0,18,20,45,-1,-1,14.0
5,546,546,546,546,7,2019-02-22,18.0,34.0,44.0,0.0,...,0.0,0,0,0,21,23,48,-1,-1,14.0
6,547,547,547,547,7,2019-02-27,0.0,48.0,23.0,0.0,...,0.0,5,0,0,26,28,53,-1,-1,14.0
7,548,548,548,548,7,2019-02-28,0.0,0.0,7.0,0.0,...,0.0,6,1,0,27,29,54,-1,-1,14.0
8,549,549,549,549,7,2019-03-15,1.0,74.0,64.0,0.0,...,0.0,0,0,0,42,44,69,-1,0,13.0
9,550,550,550,550,7,2019-03-18,16.0,48.0,14.0,0.0,...,0.0,0,0,0,45,47,0,-1,3,13.0


In [60]:
general_preprocessing_pds

Unnamed: 0,party_id,count
0,296,81
1,467,87
2,675,84
3,691,93
4,829,98
5,125,89
6,451,106
7,800,80
8,853,98
9,944,102


In [56]:
preprocessing_df_pds

Unnamed: 0,party_id,count
0,296,68
1,467,76
2,675,76
3,691,86
4,829,86
...,...,...
994,990,88
995,119,76
996,898,95
997,438,78


In [59]:
import pandas as pd
pd.set_option('max_rows', None)
general_preprocessing_pds['count'] - preprocessing_df_pds['count']

0      13
1      11
2       8
3       7
4      12
5       6
6      13
7       6
8       8
9      12
10      9
11     11
12      8
13      5
14     12
15      8
16     12
17     14
18      9
19     12
20     11
21     11
22     12
23     10
24     16
25      9
26      9
27     10
28     13
29     11
30      7
31      8
32      7
33      7
34      9
35     12
36     15
37     11
38     11
39      6
40      9
41     10
42     11
43      9
44      7
45     15
46      9
47      7
48      8
49     14
50     13
51     13
52     10
53      9
54     10
55     10
56      8
57      9
58      9
59     13
60     14
61      6
62     10
63     11
64     11
65     10
66      9
67     14
68      8
69      6
70     11
71     12
72     11
73      9
74      6
75     10
76      9
77     15
78      7
79     15
80     14
81      9
82      6
83      8
84      8
85     14
86      4
87     10
88      9
89     11
90      8
91      6
92     11
93     14
94     12
95     13
96     18
97      8
98      9
99      7


In [41]:
preprocessing_df.where("ctu >= 0").groupBy('party_id').count().show()

+--------+-----+
|party_id|count|
+--------+-----+
|     296|   68|
|     467|   76|
|     675|   76|
|     691|   86|
|     829|   86|
|     125|   83|
|     451|   93|
|     800|   74|
|     853|   90|
|     944|   90|
|     666|   77|
|     870|   85|
|     919|   74|
|     926|   65|
|       7|   76|
|      51|   94|
|     124|   77|
|     447|   83|
|     591|   81|
|     307|   86|
+--------+-----+
only showing top 20 rows



In [42]:
imputed_df.where("ctu >=0").groupBy('party_id').count().show()

+--------+-----+
|party_id|count|
+--------+-----+
|     296|   68|
|     467|   76|
|     675|   77|
|     691|   86|
|     829|   86|
|     125|   83|
|     451|   93|
|     800|   74|
|     853|   90|
|     944|   90|
|     666|   77|
|     870|   85|
|     919|   74|
|     926|   65|
|       7|   76|
|      51|   94|
|     124|   77|
|     447|   83|
|     591|   81|
|     307|   86|
+--------+-----+
only showing top 20 rows



In [28]:
preprocessing_df.where("party_id == 7").select("event_date","cai_ins_grs_vmc",
                                               "cai_ins_grs_mrc","cai_ins_grs_erc", "ctu").show(n=90)

+----------+---------------+---------------+---------------+----+
|event_date|cai_ins_grs_vmc|cai_ins_grs_mrc|cai_ins_grs_erc| ctu|
+----------+---------------+---------------+---------------+----+
|2019-01-05|           17.0|          181.0|          123.0|15.0|
|2019-01-30|            9.0|           11.0|           59.0|15.0|
|2019-02-01|           19.0|           19.0|           15.0|14.0|
|2019-02-17|           14.0|           68.0|           38.0|14.0|
|2019-02-19|           17.0|           38.0|           73.0|14.0|
|2019-02-22|           18.0|           34.0|           44.0|14.0|
|2019-02-27|            0.0|           48.0|           23.0|14.0|
|2019-02-28|            0.0|            0.0|            7.0|14.0|
|2019-03-15|            1.0|           74.0|           64.0|13.0|
|2019-03-18|           16.0|           48.0|           14.0|13.0|
|2019-04-04|           16.0|           84.0|           63.0|12.0|
|2019-04-05|           11.0|           59.0|           69.0|12.0|
|2019-04-1

In [29]:
imputed_df.where("party_id == 7").select("event_date","cai_ins_grs_vmc",
                                               "cai_ins_grs_mrc","cai_ins_grs_erc", "ctu").show(n=90)

+----------+---------------+---------------+---------------+----+
|event_date|cai_ins_grs_vmc|cai_ins_grs_mrc|cai_ins_grs_erc| ctu|
+----------+---------------+---------------+---------------+----+
|2019-01-05|           17.0|          181.0|          123.0|15.0|
|2019-01-30|            9.0|           11.0|           59.0|15.0|
|2019-02-01|           19.0|           19.0|           15.0|14.0|
|2019-02-17|           14.0|           68.0|           38.0|14.0|
|2019-02-19|           17.0|           38.0|           73.0|14.0|
|2019-02-22|           18.0|           34.0|           44.0|14.0|
|2019-02-27|            0.0|           48.0|           23.0|14.0|
|2019-02-28|            0.0|            0.0|            7.0|14.0|
|2019-03-15|            1.0|           74.0|           64.0|13.0|
|2019-03-18|           16.0|           48.0|           14.0|13.0|
|2019-04-04|           16.0|           84.0|           63.0|12.0|
|2019-04-05|           11.0|           59.0|           69.0|12.0|
|2019-04-1

In [40]:
general_preprocessing_df.where("party_id == 7").select("event_date","cai_ins_grs_vmc","cai_ins_grs_mrc","cai_ins_grs_erc").count()
#.show(n=90)

88