In [1]:
import warnings
warnings.filterwarnings("ignore")

### Spark

In [20]:
# NOTE(review): `init_spark` does not look like part of the upstream PySpark package — presumably this
# environment ships a wrapped/patched `pyspark`; confirm before running the notebook elsewhere.
from pyspark import init_spark
# Aliases used throughout the notebook: `sf` for column functions, `sw` for the Window class.
import pyspark.sql.functions as sf
from pyspark.sql.window import Window as sw
from pyspark.sql.types import StringType, IntegerType, ArrayType
# `spark` is the handle the cells below use for `spark.table(...)`; the only option passed is the app name.
spark = init_spark({"appName": 'main_features'})

### Imports

In [21]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.metrics import roc_auc_score

from tqdm import tqdm
import seaborn as sns
# Plot appearance: "bmh" style sheet first, then the font family on top of it.
plt.style.use("bmh")
plt.rc('font', family='DejaVu Sans')

# Datasets

In [4]:
# Source tables for the train and test parts; every feature table below is derived from these two frames.
train, test = map(spark.table, ('alfa.andrey_auto_train', 'alfa.andrey_auto_test'))

### Restore real `amnt` values

In [5]:
# Multiplier applied to the stored `amnt` before exponentiation.
# NOTE(review): presumably the stored value is log(1 + amount) / 17.8209 — confirm against the data description.
AMNT_LOG_SCALE = 17.8209


def _restore_amnt(df):
    """Overwrite `amnt` with round(exp(amnt * AMNT_LOG_SCALE) - 1) and add two magnitude columns.

    Added columns:
      * `log10_amnt` – log10 of the restored amount;
      * `n_order`    – `log10_amnt` cast to integer.

    Returns a new DataFrame; the input frame is not modified.
    """
    return (
        df
        .withColumn('amnt', sf.round(sf.exp(sf.col('amnt') * sf.lit(AMNT_LOG_SCALE)) - sf.lit(1)))
        .withColumn('log10_amnt', sf.log10('amnt'))
        .withColumn('n_order', sf.col('log10_amnt').cast(IntegerType()))
    )


# NOTE: `train` / `test` are rebound to the transformed frames, so this cell is NOT idempotent —
# running it twice without reloading the tables applies the exp() transform twice.
train = _restore_amnt(train)
test = _restore_amnt(test)

# Preprocessing

###### Number order

In [6]:
def _number_order_features(df):
    """Per-`app_id` mean, max and approximate median of `log10_amnt` and of its integer part `n_order`."""
    return df.groupBy(['app_id']).agg(
        sf.mean('log10_amnt').alias('order_number_mean'),
        sf.mean('n_order').alias('order_number_int_mean'),
        sf.max('log10_amnt').alias('order_number_max'),
        sf.max('n_order').alias('order_number_int_max'),
        sf.expr('percentile_approx(log10_amnt, 0.5)').alias('order_number_median'),
        sf.expr('percentile_approx(n_order, 0.5)').alias('order_number_int_median'),
    )


train_number_order = _number_order_features(train)
test_number_order = _number_order_features(test)

###### Features with hour_diff

In [7]:
# Fraction of each applicant's rows whose `hour_diff` falls into a given bucket.
# Every bucket is a boolean condition cast to 0/1, so its mean is the share of matching rows
# (rows where `hour_diff` is null yield null and are ignored by the mean).
_hd = sf.col('hour_diff')
_hour_diff_buckets = [
    ('hour_diff_fraq_0', _hd == sf.lit(0)),
    ('hour_diff_fraq_1_2', (_hd >= sf.lit(1)) & (_hd <= sf.lit(2))),
    ('hour_diff_fraq_1_10', (_hd >= sf.lit(1)) & (_hd <= sf.lit(10))),
    ('hour_diff_fraq_3_10', (_hd >= sf.lit(3)) & (_hd <= sf.lit(10))),
    ('hour_diff_fraq_11_100', (_hd >= sf.lit(11)) & (_hd <= sf.lit(100))),
    ('hour_diff_fraq_100_plus', _hd > sf.lit(100)),
]

train_hours_diff_features = train.groupBy(['app_id']).agg(
    *[sf.mean(in_bucket.cast(IntegerType())).alias(feature_name)
      for feature_name, in_bucket in _hour_diff_buckets]
)

In [8]:
# Same `hour_diff` bucket shares as for train, computed on the test frame:
# first build all 0/1 bucket indicators in one projection, then average them per applicant.
test_hours_diff_features = (
    test
    .select(
        'app_id',
        (sf.col('hour_diff') == sf.lit(0)).cast(IntegerType()).alias('is_0'),
        sf.col('hour_diff').between(1, 2).cast(IntegerType()).alias('is_1_2'),
        sf.col('hour_diff').between(3, 10).cast(IntegerType()).alias('is_3_10'),
        sf.col('hour_diff').between(1, 10).cast(IntegerType()).alias('is_1_10'),
        sf.col('hour_diff').between(11, 100).cast(IntegerType()).alias('is_11_100'),
        (sf.col('hour_diff') > sf.lit(100)).cast(IntegerType()).alias('is_100_plus'),
    )
    .groupBy(['app_id'])
    .agg(
        sf.mean('is_0').alias('hour_diff_fraq_0'),
        sf.mean('is_1_2').alias('hour_diff_fraq_1_2'),
        sf.mean('is_1_10').alias('hour_diff_fraq_1_10'),
        sf.mean('is_3_10').alias('hour_diff_fraq_3_10'),
        sf.mean('is_11_100').alias('hour_diff_fraq_11_100'),
        sf.mean('is_100_plus').alias('hour_diff_fraq_100_plus'),
    )
)

###### Income_flag

In [11]:
# Normalise `income_flag` to an integer 0/1 indicator (1 where the stored value equals 1).
train, test = (
    df.withColumn('income_flag', (sf.col('income_flag') == sf.lit(1)).cast(IntegerType()))
    for df in (train, test)
)

In [12]:
# Per-applicant features split by `income_flag`. In the column names below flag 0 is labelled
# "debet" and flag 1 "credit".
train_income_flag_table = train.select(['app_id']).dropDuplicates()

# --- Largest single amount per flag and the smoothed ratio of the two ------------------------------
# The pivot values are listed explicitly: both the '0' and the '1' column then always exist (the
# renames, fillna and ratio below reference them unconditionally), rows with a null flag cannot
# produce a stray 'null' column, and Spark does not need an extra job to discover distinct values.
c_df = (
    train.groupBy(['app_id', 'income_flag']).agg(sf.max('amnt').alias('max_amnt'))
    .groupBy(['app_id']).pivot('income_flag', [0, 1]).agg(sf.first('max_amnt'))
    .withColumnRenamed('0', 'max_debet_income')
    .withColumnRenamed('1', 'max_credit_income')
    .fillna(0, subset=['max_debet_income', 'max_credit_income'])
)
# 0.001 is added to both sides so the ratio stays defined when one side is 0.
c_df = c_df.withColumn('frac_max_debet_credit_income',
                       (sf.col('max_debet_income') + sf.lit(0.001)) /
                       (sf.col('max_credit_income') + sf.lit(0.001)))
train_income_flag_table = train_income_flag_table.join(c_df, on=['app_id'], how='left')

# --- Total amount per flag and the smoothed ratio of the two ---------------------------------------
c_df = (
    train.groupBy(['app_id', 'income_flag']).agg(sf.sum('amnt').alias('sum_amnt'))
    .groupBy(['app_id']).pivot('income_flag', [0, 1]).agg(sf.first('sum_amnt'))
    .withColumnRenamed('0', 'sum_debet_income')
    .withColumnRenamed('1', 'sum_credit_income')
    .fillna(0, subset=['sum_debet_income', 'sum_credit_income'])
)
c_df = c_df.withColumn('frac_sum_debet_credit_income',
                       (sf.col('sum_debet_income') + sf.lit(0.001)) /
                       (sf.col('sum_credit_income') + sf.lit(0.001)))
train_income_flag_table = train_income_flag_table.join(c_df, on=['app_id'], how='left')

# --- Daily activity statistics per flag -------------------------------------------------------------
# One row per (applicant, flag, day): daily total and daily number of rows.
c_df = train.groupBy(['app_id', 'income_flag', 'days_before']).agg(sf.sum('amnt').alias('sum_amnt'),
                                                                   sf.count('amnt').alias('cnt_amnt'))
# Span of `days_before` per applicant, taken over BOTH flags (the window is partitioned by app_id only).
c_df = c_df.withColumn('min_dys_before', sf.min('days_before').over(sw.partitionBy('app_id')))
c_df = c_df.withColumn('max_dys_before', sf.max('days_before').over(sw.partitionBy('app_id')))
c_df = c_df.withColumn('known_days', sf.col('max_dys_before') - sf.col('min_dys_before'))
c_df = c_df.groupBy(['app_id', 'income_flag']).agg(
    sf.count('sum_amnt').alias('cnt_unique_days_amnt'),
    sf.sum('cnt_amnt').alias('cnt_all_trans'),
    # `known_days` is constant within an applicant, so first() is deterministic here.
    sf.first('known_days').alias('cnt_all_days'),

    sf.mean('sum_amnt').alias('mean_day_amnt'),
    sf.max('sum_amnt').alias('max_day_amnt'),
    sf.stddev('sum_amnt').alias('std_day_amnt'),

    sf.mean('cnt_amnt').alias('mean_day_cnt_amnt'),
    sf.stddev('cnt_amnt').alias('std_day_cnt_amnt'),
)

# NOTE(review): `cnt_all_days` is 0 when all rows of an applicant share one `days_before` value; the
# two divisions by it then presumably give null under Spark's default (non-ANSI) settings — confirm.
c_df = (
    c_df
    .withColumn('fraq_trans_days', sf.col('cnt_unique_days_amnt') / sf.col('cnt_all_days'))
    .withColumn('trans_per_month', sf.col('cnt_all_trans') / sf.col('cnt_all_days') * sf.lit(30))
    .withColumn('trans_per_day_known', sf.col('cnt_unique_days_amnt') / sf.col('cnt_all_trans'))
    .withColumn('zp_days', sf.col('max_day_amnt') / sf.col('trans_per_day_known'))
    .withColumn('zp_month', sf.col('trans_per_month') * sf.col('mean_day_amnt'))
)

# Flag 0 -> columns suffixed with "_debet".
select_0_cols = ['mean_day_cnt_amnt', 'mean_day_amnt', 'std_day_amnt', 'fraq_trans_days',
                 'trans_per_month', 'zp_days', 'zp_month']
c_df_0 = c_df.filter(sf.col('income_flag') == 0).select(
    ['app_id'] + [sf.col(name).alias(name + '_debet') for name in select_0_cols])
train_income_flag_table = train_income_flag_table.join(c_df_0, on=['app_id'], how='left')

# Flag 1 -> columns suffixed with "_credit".
select_1_cols = ['mean_day_cnt_amnt', 'std_day_cnt_amnt', 'mean_day_amnt', 'std_day_amnt', 'fraq_trans_days',
                 'trans_per_month', 'trans_per_day_known']
c_df_1 = c_df.filter(sf.col('income_flag') == 1).select(
    ['app_id'] + [sf.col(name).alias(name + '_credit') for name in select_1_cols])
train_income_flag_table = train_income_flag_table.join(c_df_1, on=['app_id'], how='left')

In [13]:
# Per-applicant features split by `income_flag`, computed on the test frame. In the column names
# below flag 0 is labelled "debet" and flag 1 "credit".
test_income_flag_table = test.select(['app_id']).dropDuplicates()

# --- Largest single amount per flag and the smoothed ratio of the two ------------------------------
# The pivot values are listed explicitly: both the '0' and the '1' column then always exist (the
# renames, fillna and ratio below reference them unconditionally), rows with a null flag cannot
# produce a stray 'null' column, and Spark does not need an extra job to discover distinct values.
c_df = (
    test.groupBy(['app_id', 'income_flag']).agg(sf.max('amnt').alias('max_amnt'))
    .groupBy(['app_id']).pivot('income_flag', [0, 1]).agg(sf.first('max_amnt'))
    .withColumnRenamed('0', 'max_debet_income')
    .withColumnRenamed('1', 'max_credit_income')
    .fillna(0, subset=['max_debet_income', 'max_credit_income'])
)
# 0.001 is added to both sides so the ratio stays defined when one side is 0.
c_df = c_df.withColumn('frac_max_debet_credit_income',
                       (sf.col('max_debet_income') + sf.lit(0.001)) /
                       (sf.col('max_credit_income') + sf.lit(0.001)))
test_income_flag_table = test_income_flag_table.join(c_df, on=['app_id'], how='left')

# --- Total amount per flag and the smoothed ratio of the two ---------------------------------------
c_df = (
    test.groupBy(['app_id', 'income_flag']).agg(sf.sum('amnt').alias('sum_amnt'))
    .groupBy(['app_id']).pivot('income_flag', [0, 1]).agg(sf.first('sum_amnt'))
    .withColumnRenamed('0', 'sum_debet_income')
    .withColumnRenamed('1', 'sum_credit_income')
    .fillna(0, subset=['sum_debet_income', 'sum_credit_income'])
)
c_df = c_df.withColumn('frac_sum_debet_credit_income',
                       (sf.col('sum_debet_income') + sf.lit(0.001)) /
                       (sf.col('sum_credit_income') + sf.lit(0.001)))
test_income_flag_table = test_income_flag_table.join(c_df, on=['app_id'], how='left')

# --- Daily activity statistics per flag -------------------------------------------------------------
# One row per (applicant, flag, day): daily total and daily number of rows.
c_df = test.groupBy(['app_id', 'income_flag', 'days_before']).agg(sf.sum('amnt').alias('sum_amnt'),
                                                                  sf.count('amnt').alias('cnt_amnt'))
# Span of `days_before` per applicant, taken over BOTH flags (the window is partitioned by app_id only).
c_df = c_df.withColumn('min_dys_before', sf.min('days_before').over(sw.partitionBy('app_id')))
c_df = c_df.withColumn('max_dys_before', sf.max('days_before').over(sw.partitionBy('app_id')))
c_df = c_df.withColumn('known_days', sf.col('max_dys_before') - sf.col('min_dys_before'))
c_df = c_df.groupBy(['app_id', 'income_flag']).agg(
    sf.count('sum_amnt').alias('cnt_unique_days_amnt'),
    sf.sum('cnt_amnt').alias('cnt_all_trans'),
    # `known_days` is constant within an applicant, so first() is deterministic here.
    sf.first('known_days').alias('cnt_all_days'),

    sf.mean('sum_amnt').alias('mean_day_amnt'),
    sf.max('sum_amnt').alias('max_day_amnt'),
    sf.stddev('sum_amnt').alias('std_day_amnt'),

    sf.mean('cnt_amnt').alias('mean_day_cnt_amnt'),
    sf.stddev('cnt_amnt').alias('std_day_cnt_amnt'),
)

# NOTE(review): `cnt_all_days` is 0 when all rows of an applicant share one `days_before` value; the
# two divisions by it then presumably give null under Spark's default (non-ANSI) settings — confirm.
c_df = (
    c_df
    .withColumn('fraq_trans_days', sf.col('cnt_unique_days_amnt') / sf.col('cnt_all_days'))
    .withColumn('trans_per_month', sf.col('cnt_all_trans') / sf.col('cnt_all_days') * sf.lit(30))
    .withColumn('trans_per_day_known', sf.col('cnt_unique_days_amnt') / sf.col('cnt_all_trans'))
    .withColumn('zp_days', sf.col('max_day_amnt') / sf.col('trans_per_day_known'))
    .withColumn('zp_month', sf.col('trans_per_month') * sf.col('mean_day_amnt'))
)

# Flag 0 -> columns suffixed with "_debet".
select_0_cols = ['mean_day_cnt_amnt', 'mean_day_amnt', 'std_day_amnt', 'fraq_trans_days',
                 'trans_per_month', 'zp_days', 'zp_month']
c_df_0 = c_df.filter(sf.col('income_flag') == 0).select(
    ['app_id'] + [sf.col(name).alias(name + '_debet') for name in select_0_cols])
test_income_flag_table = test_income_flag_table.join(c_df_0, on=['app_id'], how='left')

# Flag 1 -> columns suffixed with "_credit".
select_1_cols = ['mean_day_cnt_amnt', 'std_day_cnt_amnt', 'mean_day_amnt', 'std_day_amnt', 'fraq_trans_days',
                 'trans_per_month', 'trans_per_day_known']
c_df_1 = c_df.filter(sf.col('income_flag') == 1).select(
    ['app_id'] + [sf.col(name).alias(name + '_credit') for name in select_1_cols])
test_income_flag_table = test_income_flag_table.join(c_df_1, on=['app_id'], how='left')

###### Ecommerce flag

In [14]:
# Normalise `ecommerce_flag` to an integer 0/1 indicator (1 where the stored value equals 1).
train, test = (
    df.withColumn('ecommerce_flag', (sf.col('ecommerce_flag') == sf.lit(1)).cast(IntegerType()))
    for df in (train, test)
)

###### Currency

In [15]:
# `rub_trans` is 1 for rows whose `currency` code equals 1, otherwise 0 (null when `currency` is null).
train, test = (
    df.withColumn('rub_trans', (sf.col('currency') == sf.lit(1)).cast(IntegerType()))
    for df in (train, test)
)

###### Countries

In [16]:
# Hand-assigned group label for each `country` code (keys are the codes rendered as strings).
country_mapper = {'13': 'bad', '8': 'bad', '16': 'bad', '19': 'bad',
                  '1': 'med', '4': 'med', '3': 'med', '2': 'med', '5': 'med', '11': 'med',
                  '6': 'good', '7': 'good', '9': 'good', '10': 'good', '12': 'good', '14': 'good', '15': 'good',
                  '17': 'good', '18': 'good', '20': 'good', '21': 'good', '22': 'good', '23': 'good', '24': 'good'}
# `.get` instead of `[...]`: a null or unmapped code now maps to null rather than raising KeyError
# inside the Python worker, which would abort the whole Spark job.
iplookup_udf_country = sf.udf(lambda x: country_mapper.get(x), StringType())
train = train.withColumn('country_group', iplookup_udf_country(sf.col('country').cast(StringType())))
test = test.withColumn('country_group', iplookup_udf_country(sf.col('country').cast(StringType())))

###### Mcc_category

In [18]:
# Hand-assigned group label for each `mcc_category` code (keys are the codes rendered as strings).
mcc_category_mapper = {'28': 'bad', '23': 'bad', '8': 'bad',
                       '3': 'med', '12': 'med', '16': 'med', '1': 'med', '2': 'med', '6': 'med', '4': 'med',
                       '11': 'med', '5': 'med', '24': 'med', '17': 'med', '14': 'med', '27': 'med', '18': 'med',
                       '10': 'good', '7': 'good', '15': 'good', '9': 'good', '13': 'good', '19': 'good',
                       '22': 'good', '21': 'good', '20': 'good', '26': 'good', '25': 'good'}
# `.get` instead of `[...]`: a null or unmapped code now maps to null rather than raising KeyError
# inside the Python worker, which would abort the whole Spark job.
iplookup_udf_mcc_category = sf.udf(lambda x: mcc_category_mapper.get(x), StringType())
train = train.withColumn('mcc_category_group', iplookup_udf_mcc_category(sf.col('mcc_category').cast(StringType())))
test = test.withColumn('mcc_category_group', iplookup_udf_mcc_category(sf.col('mcc_category').cast(StringType())))

###### Hour

In [19]:
# `is_night_trans` is 1 when `hour` is one of 23, 0, 1, 2, 3 and 0 otherwise.
train, test = (
    df.withColumn('is_night_trans', sf.col('hour').isin([0, 1, 2, 3, 23]).cast(IntegerType()))
    for df in (train, test)
)

###### Weekday

In [20]:
# Two alternative "weekend" indicators built from `day_of_week`: days {6, 7} and days {1, 2}.
# NOTE(review): both variants are kept, presumably because the numbering convention of
# `day_of_week` is not known — confirm.
for _flag_name, _days in (('is_weekend_trans_67', [6, 7]), ('is_weekend_trans_12', [1, 2])):
    train = train.withColumn(_flag_name, sf.col('day_of_week').isin(_days).cast(IntegerType()))
    test = test.withColumn(_flag_name, sf.col('day_of_week').isin(_days).cast(IntegerType()))

###### Age

In [21]:
def _age_frac_8_10(df):
    """Per `app_id`: amount in week 8 divided by the amount in weeks 8 and 10 together.

    The result column `age_frac_8_10_with_null` is null when either week has no rows for the
    applicant, because the pivot leaves that week's sum null.
    """
    return (
        df.filter(sf.col('weekofyear').isin({8, 10}))
        .groupBy(['app_id', 'weekofyear']).agg(sf.sum('amnt').alias('sum_amnt'))
        # Explicit pivot values: columns "8" and "10" always exist, even if one of the weeks is
        # absent from the data, and Spark skips the extra job that collects distinct values.
        .groupBy('app_id').pivot('weekofyear', [8, 10]).agg(sf.first('sum_amnt'))
        .withColumn('age_frac_8_10_with_null', sf.col("8") / (sf.col("8") + sf.col("10")))
        .select(['app_id', 'age_frac_8_10_with_null'])
    )


train_age_table = _age_frac_8_10(train)
test_age_table = _age_frac_8_10(test)

# Features with incomes

In [22]:
# Additional per-applicant features; every sub-table is left-joined onto the list of train app_ids.
train_income_features_add_table = train.select(['app_id']).dropDuplicates()

# Flag-0 rows: share of amounts >= 5000, and count of amounts > 15000 divided by the max `days_before`.
train_income = (
    train.filter(sf.col('income_flag') == 0)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('more_5k', (sf.col('amnt') >= 5000).cast(IntegerType()))
    .withColumn('more_15k', (sf.col('amnt') > 15000).cast(IntegerType()))
    .groupBy(['app_id'])
    .agg(sf.mean('more_5k').alias('add_income_perc_more_5k'),
         sf.sum('more_15k').alias('add_income_sum_more_15k'),
         sf.max('days_before').alias('add_income_max_days_before'))
    .withColumn('add_income_per_month_income_more_15k',
                sf.col('add_income_sum_more_15k') / sf.col('add_income_max_days_before'))
    .select(['app_id', 'add_income_perc_more_5k', 'add_income_per_month_income_more_15k'])
)
train_income_features_add_table = train_income_features_add_table.join(train_income, how='left', on=['app_id'])

# Flag-0 total over rows with days_before <= 31.
# NOTE(review): the fillna runs BEFORE the left join, so applicants without such rows still get
# null (not 0) in `last_month_income_sum` after the join — confirm that this is intended.
train_last_month = (
    train.filter(sf.col('income_flag') == 0)
    .filter(sf.col('days_before') <= 31)
    .groupBy(['app_id'])
    .agg(sf.sum('amnt').alias('last_month_income_sum'))
    .fillna(0, subset=['last_month_income_sum'])
)
train_income_features_add_table = train_income_features_add_table.join(train_last_month, how='left', on=['app_id'])

# Flag-0 totals per 30-day bucket of `days_before`, then statistics over the buckets.
train_month_before_c = (
    train.filter(sf.col('income_flag') == 0)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.sum('amnt').alias('amnt'))
    .groupBy(['app_id'])
    .agg(sf.mean('amnt').alias('add_income_month_income_debet_mean'),
         sf.count('amnt').alias('add_income_month_income_debet_cnt'),
         sf.max('amnt').alias('add_income_month_income_debet_max'),
         sf.stddev('amnt').alias('add_income_month_income_debet_std'),
         sf.min('month_before').alias('add_income_month_before_debet_min'))
)
train_income_features_add_table = train_income_features_add_table.join(train_month_before_c,
                                                                       how='left', on=['app_id'])

# The same bucket statistics for flag-1 rows.
train_month_before_c = (
    train.filter(sf.col('income_flag') == 1)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.sum('amnt').alias('amnt'))
    .groupBy(['app_id'])
    .agg(sf.mean('amnt').alias('add_income_month_income_credit_mean'),
         sf.count('amnt').alias('add_income_month_income_credit_cnt'),
         sf.max('amnt').alias('add_income_month_income_credit_max'),
         sf.stddev('amnt').alias('add_income_month_income_credit_std'),
         sf.min('month_before').alias('add_income_month_before_credit_min'))
)
train_income_features_add_table = train_income_features_add_table.join(train_month_before_c,
                                                                       how='left', on=['app_id'])

# Flag-0 rows again: per 30-day bucket, count rows and rows >= 5000 and sum the >= 5000 amounts
# (`zp`); keep only buckets that contain at least one such amount, then aggregate over buckets.
train_month_before_c = (
    train.filter(sf.col('income_flag') == 0)
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .withColumn('more_5k', (sf.col('amnt') >= 5000).cast(IntegerType()))
    .withColumn('sum_more_5k', sf.col('more_5k') * sf.col('amnt'))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.count('amnt').alias('cnt_income'),
         sf.sum('more_5k').alias('cnt_more_5k'),
         sf.mean('more_5k').alias('perc_more_5k'),
         sf.sum('sum_more_5k').alias('zp'))
    .filter(sf.col('cnt_more_5k') > 0)
    .groupBy(['app_id'])
    .agg(sf.min('month_before').alias('add_2222_month_before_min'),
         sf.mean('cnt_income').alias('add_2222_cnt_income_mean'),
         sf.stddev('cnt_income').alias('add_2222_cnt_income_std'),
         sf.mean('cnt_more_5k').alias('add_2222_cnt_more_5k_mean'),
         sf.stddev('cnt_more_5k').alias('add_2222_cnt_more_5k_std'),
         sf.mean('perc_more_5k').alias('add_2222_perc_more_5k_mean'),
         sf.stddev('perc_more_5k').alias('add_2222_perc_more_5k_std'),
         sf.mean('zp').alias('add_2222_zp_mean'),
         sf.stddev('zp').alias('add_2222_zp_std'))
)
train_income_features_add_table = train_income_features_add_table.join(train_month_before_c,
                                                                       how='left', on=['app_id'])

In [23]:
# Additional per-applicant features; every sub-table is left-joined onto the list of test app_ids.
test_income_features_add_table = test.select(['app_id']).dropDuplicates()

# Flag-0 rows: share of amounts >= 5000, and count of amounts > 15000 divided by the max `days_before`.
test_income = (
    test.filter(sf.col('income_flag') == 0)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('more_5k', (sf.col('amnt') >= 5000).cast(IntegerType()))
    .withColumn('more_15k', (sf.col('amnt') > 15000).cast(IntegerType()))
    .groupBy(['app_id'])
    .agg(sf.mean('more_5k').alias('add_income_perc_more_5k'),
         sf.sum('more_15k').alias('add_income_sum_more_15k'),
         sf.max('days_before').alias('add_income_max_days_before'))
    .withColumn('add_income_per_month_income_more_15k',
                sf.col('add_income_sum_more_15k') / sf.col('add_income_max_days_before'))
    .select(['app_id', 'add_income_perc_more_5k', 'add_income_per_month_income_more_15k'])
)
test_income_features_add_table = test_income_features_add_table.join(test_income, how='left', on=['app_id'])

# Flag-0 total over rows with days_before <= 31.
# NOTE(review): the fillna runs BEFORE the left join, so applicants without such rows still get
# null (not 0) in `last_month_income_sum` after the join — confirm that this is intended.
test_last_month = (
    test.filter(sf.col('income_flag') == 0)
    .filter(sf.col('days_before') <= 31)
    .groupBy(['app_id'])
    .agg(sf.sum('amnt').alias('last_month_income_sum'))
    .fillna(0, subset=['last_month_income_sum'])
)
test_income_features_add_table = test_income_features_add_table.join(test_last_month, how='left', on=['app_id'])

# Flag-0 totals per 30-day bucket of `days_before`, then statistics over the buckets.
test_month_before_c = (
    test.filter(sf.col('income_flag') == 0)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.sum('amnt').alias('amnt'))
    .groupBy(['app_id'])
    .agg(sf.mean('amnt').alias('add_income_month_income_debet_mean'),
         sf.count('amnt').alias('add_income_month_income_debet_cnt'),
         sf.max('amnt').alias('add_income_month_income_debet_max'),
         sf.stddev('amnt').alias('add_income_month_income_debet_std'),
         sf.min('month_before').alias('add_income_month_before_debet_min'))
)
test_income_features_add_table = test_income_features_add_table.join(test_month_before_c, how='left', on=['app_id'])

# The same bucket statistics for flag-1 rows.
test_month_before_c = (
    test.filter(sf.col('income_flag') == 1)
    .select(['app_id', 'amnt', 'days_before'])
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.sum('amnt').alias('amnt'))
    .groupBy(['app_id'])
    .agg(sf.mean('amnt').alias('add_income_month_income_credit_mean'),
         sf.count('amnt').alias('add_income_month_income_credit_cnt'),
         sf.max('amnt').alias('add_income_month_income_credit_max'),
         sf.stddev('amnt').alias('add_income_month_income_credit_std'),
         sf.min('month_before').alias('add_income_month_before_credit_min'))
)
test_income_features_add_table = test_income_features_add_table.join(test_month_before_c, how='left', on=['app_id'])

# Flag-0 rows again: per 30-day bucket, count rows and rows >= 5000 and sum the >= 5000 amounts
# (`zp`); keep only buckets that contain at least one such amount, then aggregate over buckets.
test_month_before_c = (
    test.filter(sf.col('income_flag') == 0)
    .withColumn('month_before', sf.floor(sf.col('days_before') / sf.lit(30)))
    .withColumn('more_5k', (sf.col('amnt') >= 5000).cast(IntegerType()))
    .withColumn('sum_more_5k', sf.col('more_5k') * sf.col('amnt'))
    .groupBy(['app_id', 'month_before'])
    .agg(sf.count('amnt').alias('cnt_income'),
         sf.sum('more_5k').alias('cnt_more_5k'),
         sf.mean('more_5k').alias('perc_more_5k'),
         sf.sum('sum_more_5k').alias('zp'))
    .filter(sf.col('cnt_more_5k') > 0)
    .groupBy(['app_id'])
    .agg(sf.min('month_before').alias('add_2222_month_before_min'),
         sf.mean('cnt_income').alias('add_2222_cnt_income_mean'),
         sf.stddev('cnt_income').alias('add_2222_cnt_income_std'),
         sf.mean('cnt_more_5k').alias('add_2222_cnt_more_5k_mean'),
         sf.stddev('cnt_more_5k').alias('add_2222_cnt_more_5k_std'),
         sf.mean('perc_more_5k').alias('add_2222_perc_more_5k_mean'),
         sf.stddev('perc_more_5k').alias('add_2222_perc_more_5k_std'),
         sf.mean('zp').alias('add_2222_zp_mean'),
         sf.stddev('zp').alias('add_2222_zp_std'))
)
test_income_features_add_table = test_income_features_add_table.join(test_month_before_c, how='left', on=['app_id'])

# Common features

### Cat cols balances

In [24]:
# Train labels: the `flag` column is exposed as `target`, keyed by `app_id`.
target_for_balances = (
    spark.table('alfa.andrey_auto_target_train')
    .withColumnRenamed('flag', 'target')
    .select(['app_id', 'target'])
)

In [25]:
# Target-rate ("balance") encodings of categorical columns, averaged per applicant.
train_balances_table = train.select(['app_id']).dropDuplicates()
test_balances_table = test.select(['app_id']).dropDuplicates()

# NOTE(review): each encoding is fitted on all train rows and then applied back to those same rows
# (and to test) — in-sample target encoding; verify that downstream validation accounts for this.
for c_col in ['currency', 'card_type', 'hour']:
    # balance = share of train rows with this category value that belong to target == 1 applicants.
    current_cat_balance = (
        train.select(['app_id', c_col])
        .join(target_for_balances, on='app_id', how='inner')
        .groupBy([c_col, 'target']).count()
        # Explicit pivot values: the '0' and '1' columns always exist (the renames and fillna below
        # rely on them), a null target cannot add a stray column, and Spark skips the extra job
        # that collects distinct pivot values.
        .groupBy(c_col).pivot('target', [0, 1]).agg(sf.first('count'))
        .withColumnRenamed('0', 'good')
        .withColumnRenamed('1', 'bad')
        .fillna(0, subset=['good', 'bad'])
        .withColumn('sum_trans', sf.col('bad') + sf.col('good'))
        .withColumn('balance', sf.col('bad') / sf.col('sum_trans'))
        .select([c_col, 'balance'])
    )

    if c_col in ['card_type', 'hour']:
        # Plain (row-count weighted) mean of the category balance per applicant.
        f_table = (
            train.select(['app_id', 'amnt', c_col])
            .join(current_cat_balance, on=[c_col], how='inner')
            .groupBy(['app_id'])
            .agg(sf.mean('balance').alias('mean_balance_' + c_col + '_cnt'))
        )
        train_balances_table = train_balances_table.join(f_table, on=['app_id'], how='left')
        f_table = (
            test.select(['app_id', 'amnt', c_col])
            .join(current_cat_balance, on=[c_col], how='inner')
            .groupBy(['app_id'])
            .agg(sf.mean('balance').alias('mean_balance_' + c_col + '_cnt'))
        )
        test_balances_table = test_balances_table.join(f_table, on=['app_id'], how='left')

    if c_col in ['currency']:
        # Amount-weighted mean of the category balance per applicant; rows with amnt <= 0 are
        # dropped first, so the per-applicant weights sum to 1.
        f_table = (
            train.select(['app_id', 'amnt', c_col]).filter(sf.col('amnt') > 0)
            .join(current_cat_balance, on=[c_col], how='inner')
            .withColumn('sum_amnt', sf.sum('amnt').over(sw.partitionBy('app_id')))
            .withColumn('fraq_amnt', sf.col('amnt') / sf.col('sum_amnt'))
            .withColumn('w_balance', sf.col('fraq_amnt') * sf.col('balance'))
            .groupBy(['app_id'])
            .agg(sf.sum('w_balance').alias('w_mean_balance_' + c_col + '_cnt'))
        )
        train_balances_table = train_balances_table.join(f_table, on=['app_id'], how='left')
        f_table = (
            test.select(['app_id', 'amnt', c_col]).filter(sf.col('amnt') > 0)
            .join(current_cat_balance, on=[c_col], how='inner')
            .withColumn('sum_amnt', sf.sum('amnt').over(sw.partitionBy('app_id')))
            .withColumn('fraq_amnt', sf.col('amnt') / sf.col('sum_amnt'))
            .withColumn('w_balance', sf.col('fraq_amnt') * sf.col('balance'))
            .groupBy(['app_id'])
            .agg(sf.sum('w_balance').alias('w_mean_balance_' + c_col + '_cnt'))
        )
        test_balances_table = test_balances_table.join(f_table, on=['app_id'], how='left')

def _amnt_weighted_balance(transactions, cat_balance, cat_col):
    """Return one row per app_id with the amount-weighted mean category balance.

    Only transactions with amnt > 0 are used. Each transaction is weighted by
    its share of the application's total positive amount, and the weights are
    applied to the 'balance' value looked up for the transaction's category.
    Categories absent from `cat_balance` are dropped by the inner join.
    """
    per_app = sw().partitionBy('app_id')
    weighted = (transactions.select(['app_id', 'amnt', cat_col])
                .filter(sf.col('amnt') > 0)
                .join(cat_balance, on=[cat_col], how='inner')
                .withColumn('sum_amnt', sf.sum('amnt').over(per_app))
                .withColumn('fraq_amnt', sf.col('amnt') / sf.col('sum_amnt'))
                .withColumn('w_balance', sf.col('fraq_amnt') * sf.col('balance')))
    return weighted.groupBy(['app_id']).agg(
        sf.sum('w_balance').alias('w_mean_balance_' + cat_col + '_amnt'))


for c_col in ['operation_kind', 'operation_type', 'mcc', 'mcc_category']:
    # Per category value: share of the positive-amount turnover (train only)
    # that belongs to applications with target == 1. The intermediate column
    # is called 'count' but holds a sum of amounts.
    current_cat_balance = (
        train.select(['app_id', 'amnt', c_col])
        .filter(sf.col('amnt') > 0)
        .join(target_for_balances, on='app_id', how='inner')
        .groupBy([c_col, 'target']).agg(sf.sum('amnt').alias('count'))
        .groupBy(c_col).pivot('target').agg(sf.first('count'))
        .withColumnRenamed('0', 'good')
        .withColumnRenamed('1', 'bad')
        .fillna(0, subset=['good', 'bad'])
        .withColumn('sum_trans', sf.col('bad') + sf.col('good'))
        .withColumn('balance', sf.col('bad') / sf.col('sum_trans'))
        .select([c_col, 'balance'])
    )

    # The same train-derived balance table is applied to both splits.
    f_table = _amnt_weighted_balance(train, current_cat_balance, c_col)
    train_balances_table = train_balances_table.join(f_table, on=['app_id'], how='left')
    f_table = _amnt_weighted_balance(test, current_cat_balance, c_col)
    test_balances_table = test_balances_table.join(f_table, on=['app_id'], how='left')

### Bad, good features

In [26]:
# Share of each application's positive-amount turnover that falls into the
# 'bad' and 'good' values of every grouping column.
train_good_bad_table = train.select(['app_id']).dropDuplicates()
for c_col in ['country_group', 'mcc_category_group']:
    c_df = train.filter(sf.col('amnt') > 0)
    c_df = c_df.groupBy(['app_id', c_col]).agg(sf.sum('amnt').alias('sum_amnt'))
    # The denominator covers every value of c_col, not only 'bad' and 'good'.
    c_df = c_df.withColumn('sum_all_' + c_col, sf.sum('sum_amnt').over(sw().partitionBy('app_id')))
    c_df = c_df.withColumn('fraq', sf.col('sum_amnt') / sf.col('sum_all_' + c_col))
    # Only the 'bad' and 'good' columns are kept, so pivot on exactly those
    # values: Spark skips the extra job that collects distinct values, and both
    # columns exist even when one group never occurs in the data.
    c_df = c_df.groupBy(['app_id']).pivot(c_col, ['bad', 'good']).agg(sf.first('fraq'))
    c_df = c_df.fillna(0, subset=['bad', 'good'])
    c_df = c_df.select('app_id',
                       sf.col('bad').alias('fraq_sum_' + c_col + '_bad'),
                       sf.col('good').alias('fraq_sum_' + c_col + '_good'))
    train_good_bad_table = train_good_bad_table.join(c_df, ['app_id'], 'left')

In [27]:
# Share of each application's positive-amount turnover that falls into the
# 'bad' and 'good' values of every grouping column (test split).
test_good_bad_table = test.select(['app_id']).dropDuplicates()
for c_col in ['country_group', 'mcc_category_group']:
    c_df = test.filter(sf.col('amnt') > 0)
    c_df = c_df.groupBy(['app_id', c_col]).agg(sf.sum('amnt').alias('sum_amnt'))
    # The denominator covers every value of c_col, not only 'bad' and 'good'.
    c_df = c_df.withColumn('sum_all_' + c_col, sf.sum('sum_amnt').over(sw().partitionBy('app_id')))
    c_df = c_df.withColumn('fraq', sf.col('sum_amnt') / sf.col('sum_all_' + c_col))
    # Only the 'bad' and 'good' columns are kept, so pivot on exactly those
    # values: Spark skips the extra job that collects distinct values, and both
    # columns exist even when one group never occurs in the data.
    c_df = c_df.groupBy(['app_id']).pivot(c_col, ['bad', 'good']).agg(sf.first('fraq'))
    c_df = c_df.fillna(0, subset=['bad', 'good'])
    c_df = c_df.select('app_id',
                       sf.col('bad').alias('fraq_sum_' + c_col + '_bad'),
                       sf.col('good').alias('fraq_sum_' + c_col + '_good'))
    test_good_bad_table = test_good_bad_table.join(c_df, ['app_id'], 'left')

### Add Ex of columns

In [28]:
# For every flag column: the fraction of an application's positive-amount
# turnover carried by transactions whose flag pivots to the column named '1'.
train_Ex_table = train.select(['app_id']).dropDuplicates()
for c_col in ['is_night_trans', 'rub_trans', 'income_flag', 'is_weekend_trans_67', 'is_weekend_trans_12']:
    share_name = 'fraq_sum_' + c_col
    per_app = sw().partitionBy('app_id')
    c_df = (
        train.filter(sf.col('amnt') > 0)
        .groupBy(['app_id', c_col]).agg(sf.sum('amnt').alias('sum_amnt'))
        .withColumn('sum_all', sf.sum('sum_amnt').over(per_app))
        .withColumn(share_name, sf.col('sum_amnt') / sf.col('sum_all'))
        .groupBy('app_id').pivot(c_col).agg(sf.first(share_name))
        # NOTE(review): this relies on the pivot producing a column named '1';
        # it raises if no row in the data carries that flag value.
        .fillna(0, subset=['1'])
        .withColumnRenamed('1', share_name)
        .select(['app_id', share_name])
    )
    train_Ex_table = train_Ex_table.join(c_df, ['app_id'], 'left')

In [29]:
# For every flag column: the fraction of an application's positive-amount
# turnover carried by transactions whose flag pivots to the column named '1'
# (test split).
test_Ex_table = test.select(['app_id']).dropDuplicates()
for c_col in ['is_night_trans', 'rub_trans', 'income_flag', 'is_weekend_trans_67', 'is_weekend_trans_12']:
    share_name = 'fraq_sum_' + c_col
    per_app = sw().partitionBy('app_id')
    c_df = (
        test.filter(sf.col('amnt') > 0)
        .groupBy(['app_id', c_col]).agg(sf.sum('amnt').alias('sum_amnt'))
        .withColumn('sum_all', sf.sum('sum_amnt').over(per_app))
        .withColumn(share_name, sf.col('sum_amnt') / sf.col('sum_all'))
        .groupBy('app_id').pivot(c_col).agg(sf.first(share_name))
        # NOTE(review): this relies on the pivot producing a column named '1';
        # it raises if no row in the data carries that flag value.
        .fillna(0, subset=['1'])
        .withColumnRenamed('1', share_name)
        .select(['app_id', share_name])
    )
    test_Ex_table = test_Ex_table.join(c_df, ['app_id'], 'left')

### Add mode for columns and statistics

In [30]:
# Per application and categorical column: the most frequent value (mode), its
# share of the application's transactions, the standard deviation of the
# per-value shares and the number of distinct values.
train_mode_table = train.select(['app_id']).dropDuplicates()
for_mmode_cols = ['currency', 'card_type', 'operation_type', 'operation_type_group',
                  'mcc', 'city', 'mcc_category', 'day_of_week', 'hour', 'n_order']
for c_col in for_mmode_cols:
    per_app = sw.partitionBy('app_id')
    c_df = train.groupBy(['app_id', c_col]).count()
    c_df = c_df.withColumn('cnt', sf.sum('count').over(per_app))
    c_df = c_df.withColumn('cnt_fraction_' + c_col, sf.col('count') / sf.col('cnt'))
    c_df = c_df.withColumn('std_fraction_' + c_col,
                           sf.stddev('cnt_fraction_' + c_col).over(per_app))
    # One row per (app_id, value) at this point, so counting rows gives the
    # number of distinct values seen for the application.
    c_df = c_df.withColumn('cnt_unique_' + c_col, sf.count('cnt_fraction_' + c_col).over(per_app))
    # Break ties on the category value itself; ordering by the fraction alone
    # lets Spark pick an arbitrary row among equally frequent values, which
    # makes the mode differ between runs.
    mode_order = per_app.orderBy(sf.desc('cnt_fraction_' + c_col), sf.asc(c_col))
    c_df = c_df.withColumn('row_n', sf.row_number().over(mode_order)).where('row_n=1').drop('row_n')
    c_df = c_df.withColumnRenamed(c_col, c_col + '_mode')
    c_df = c_df.select(['app_id', c_col + '_mode', 'std_fraction_' + c_col,
                        'cnt_unique_' + c_col, 'cnt_fraction_' + c_col])
    train_mode_table = train_mode_table.join(c_df, ['app_id'], 'left')

In [31]:
# Per application and categorical column: the most frequent value (mode), its
# share of the application's transactions, the standard deviation of the
# per-value shares and the number of distinct values (test split).
test_mode_table = test.select(['app_id']).dropDuplicates()
for_mmode_cols = ['currency', 'card_type', 'operation_type', 'operation_type_group',
                  'mcc', 'city', 'mcc_category', 'day_of_week', 'hour', 'n_order']
for c_col in for_mmode_cols:
    per_app = sw.partitionBy('app_id')
    c_df = test.groupBy(['app_id', c_col]).count()
    c_df = c_df.withColumn('cnt', sf.sum('count').over(per_app))
    c_df = c_df.withColumn('cnt_fraction_' + c_col, sf.col('count') / sf.col('cnt'))
    c_df = c_df.withColumn('std_fraction_' + c_col,
                           sf.stddev('cnt_fraction_' + c_col).over(per_app))
    # One row per (app_id, value) at this point, so counting rows gives the
    # number of distinct values seen for the application.
    c_df = c_df.withColumn('cnt_unique_' + c_col, sf.count('cnt_fraction_' + c_col).over(per_app))
    # Break ties on the category value itself; ordering by the fraction alone
    # lets Spark pick an arbitrary row among equally frequent values, which
    # makes the mode differ between runs.
    mode_order = per_app.orderBy(sf.desc('cnt_fraction_' + c_col), sf.asc(c_col))
    c_df = c_df.withColumn('row_n', sf.row_number().over(mode_order)).where('row_n=1').drop('row_n')
    c_df = c_df.withColumnRenamed(c_col, c_col + '_mode')
    c_df = c_df.select(['app_id', c_col + '_mode', 'std_fraction_' + c_col,
                        'cnt_unique_' + c_col, 'cnt_fraction_' + c_col])
    test_mode_table = test_mode_table.join(c_df, ['app_id'], 'left')

### Add cols counts

In [32]:
# Per application: the fraction of transactions that fall into every value of
# each categorical column, one output column per (column, value) pair.
train_cats_table = train.select(['app_id']).dropDuplicates()
for_cat_cols = ['currency', 'operation_kind', 'day_of_week', 'operation_type', 'mcc_category',
                'mcc', 'card_type', 'hour', 'operation_type_group']#, 'city']
for c_col in for_cat_cols:
    c_df = train.groupBy(['app_id', c_col]).count()
    c_df = c_df.withColumn('sum_' + c_col, sf.sum('count').over(sw().partitionBy('app_id')))
    c_df = c_df.withColumn('fraq', sf.col('count') / sf.col('sum_' + c_col))
    # NOTE(review): the pivot values are discovered from this split only, so
    # the train and test feature tables can end up with different columns.
    c_df = c_df.groupBy(['app_id']).pivot(c_col).agg(sf.first('fraq'))
    # Fill and rename every pivoted column in one step each; doing it column
    # by column adds two projections per category value to the query plan.
    value_cols = [name for name in c_df.columns if name != 'app_id']
    c_df = c_df.fillna(0, subset=value_cols)
    # toDF renames positionally, so unusual pivot values (e.g. with dots) are
    # never parsed as column expressions.
    c_df = c_df.toDF(*[name if name == 'app_id' else 'fraq_' + c_col + '_' + name
                       for name in c_df.columns])
    train_cats_table = train_cats_table.join(c_df, ['app_id'], 'left')

In [33]:
# Per application: the fraction of transactions that fall into every value of
# each categorical column, one output column per (column, value) pair
# (test split).
test_cats_table = test.select(['app_id']).dropDuplicates()
for_cat_cols = ['currency', 'operation_kind', 'day_of_week', 'operation_type', 'mcc_category',
                'mcc', 'card_type', 'hour', 'operation_type_group']#, 'city']
for c_col in for_cat_cols:
    c_df = test.groupBy(['app_id', c_col]).count()
    c_df = c_df.withColumn('sum_' + c_col, sf.sum('count').over(sw().partitionBy('app_id')))
    c_df = c_df.withColumn('fraq', sf.col('count') / sf.col('sum_' + c_col))
    # NOTE(review): the pivot values are discovered from this split only, so
    # the train and test feature tables can end up with different columns.
    c_df = c_df.groupBy(['app_id']).pivot(c_col).agg(sf.first('fraq'))
    # Fill and rename every pivoted column in one step each; doing it column
    # by column adds two projections per category value to the query plan.
    value_cols = [name for name in c_df.columns if name != 'app_id']
    c_df = c_df.fillna(0, subset=value_cols)
    # toDF renames positionally, so unusual pivot values (e.g. with dots) are
    # never parsed as column expressions.
    c_df = c_df.toDF(*[name if name == 'app_id' else 'fraq_' + c_col + '_' + name
                       for name in c_df.columns])
    test_cats_table = test_cats_table.join(c_df, ['app_id'], 'left')

### Aggregate statistics over all transactions (zero amounts included)

In [34]:
# Per-application statistics over all transactions (no amount filter): gaps in
# days between consecutive transactions, hour_diff statistics, flag means and
# the age of the first transaction.
agg_non_0_cols = ['app_id', 'amnt', 'days_before', 'hour_diff', 'is_night_trans',
                  'ecommerce_flag', 'income_flag']
train_agg_with_zero = train.select(agg_non_0_cols)

# Capture the largest days_before per application BEFORE the filter below.
# That filter removes the row with the largest days_before, so aggregating
# max('days_before') afterwards reported the second-oldest transaction.
train_agg_with_zero = train_agg_with_zero.withColumn(
    'first_days_before', sf.max('days_before').over(sw.partitionBy(['app_id'])))

# days_before of the preceding row when rows are ordered from largest to
# smallest days_before; null for the first row of every application.
train_agg_with_zero = train_agg_with_zero.withColumn("days_before_pv", sf.lag(train_agg_with_zero.days_before)\
                             .over(sw.partitionBy(['app_id']).orderBy(sf.desc('days_before'))))
# NOTE(review): this drops one row per application (and applications with a
# single transaction entirely), so the hour_diff and flag statistics below are
# computed without that row. Kept as-is to leave those features unchanged.
train_agg_with_zero = train_agg_with_zero.filter(sf.col('days_before_pv').isNotNull())
train_agg_with_zero = train_agg_with_zero.withColumn("diff_days",
                            train_agg_with_zero.days_before_pv - train_agg_with_zero.days_before)

train_agg_with_zero = train_agg_with_zero.groupBy(['app_id']).agg(
                                    sf.max('diff_days').alias('max_days_without_trans'),
                                    sf.mean('diff_days').alias('mean_days_diff'),
                                    sf.stddev('diff_days').alias('std_diff_days'),

                                    sf.max('hour_diff').alias('max_hours_without_trans'),
                                    sf.expr('percentile_approx(hour_diff, 0.5)').alias('median_hour_diff'),
                                    sf.stddev('hour_diff').alias('std_hour_diff'),

                                    sf.mean('is_night_trans').alias('mean_is_night_trans'),
                                    sf.mean('ecommerce_flag').alias('mean_ecommerce_flag'),
                                    sf.mean('income_flag').alias('mean_income_flag'),
                                    sf.max('first_days_before').alias('days_from_first_transaction'))

In [35]:
# Per-application statistics over all transactions (no amount filter): gaps in
# days between consecutive transactions, hour_diff statistics, flag means and
# the age of the first transaction (test split).
agg_non_0_cols = ['app_id', 'amnt', 'days_before', 'hour_diff', 'is_night_trans',
                  'ecommerce_flag', 'income_flag']
test_agg_with_zero = test.select(agg_non_0_cols)

# Capture the largest days_before per application BEFORE the filter below.
# That filter removes the row with the largest days_before, so aggregating
# max('days_before') afterwards reported the second-oldest transaction.
test_agg_with_zero = test_agg_with_zero.withColumn(
    'first_days_before', sf.max('days_before').over(sw.partitionBy(['app_id'])))

# days_before of the preceding row when rows are ordered from largest to
# smallest days_before; null for the first row of every application.
test_agg_with_zero = test_agg_with_zero.withColumn("days_before_pv", sf.lag(test_agg_with_zero.days_before)\
                             .over(sw.partitionBy(['app_id']).orderBy(sf.desc('days_before'))))
# NOTE(review): this drops one row per application (and applications with a
# single transaction entirely), so the hour_diff and flag statistics below are
# computed without that row. Kept as-is to leave those features unchanged.
test_agg_with_zero = test_agg_with_zero.filter(sf.col('days_before_pv').isNotNull())
test_agg_with_zero = test_agg_with_zero.withColumn("diff_days",
                            test_agg_with_zero.days_before_pv - test_agg_with_zero.days_before)

test_agg_with_zero = test_agg_with_zero.groupBy(['app_id']).agg(
                                    sf.max('diff_days').alias('max_days_without_trans'),
                                    sf.mean('diff_days').alias('mean_days_diff'),
                                    sf.stddev('diff_days').alias('std_diff_days'),

                                    sf.max('hour_diff').alias('max_hours_without_trans'),
                                    sf.expr('percentile_approx(hour_diff, 0.5)').alias('median_hour_diff'),
                                    sf.stddev('hour_diff').alias('std_hour_diff'),

                                    sf.mean('is_night_trans').alias('mean_is_night_trans'),
                                    sf.mean('ecommerce_flag').alias('mean_ecommerce_flag'),
                                    sf.mean('income_flag').alias('mean_income_flag'),
                                    sf.max('first_days_before').alias('days_from_first_transaction'))

### Aggregate statistics over positive-amount transactions

In [36]:
# Amount statistics per application, restricted to transactions with amnt > 0.
agg_non_0_cols = ['app_id', 'amnt', 'day_of_week', 'days_before', 'ecommerce_flag', 'hour', 'hour_diff',
                  'income_flag', 'transaction_number', 'weekofyear']
positive_amnt = sf.col('amnt') > 0
amnt_stats = [
    sf.mean('amnt').alias('mean_amnt'),
    sf.stddev('amnt').alias('std_amnt'),
    sf.max('amnt').alias('max_amnt'),
    # Approximate median; percentile_approx is only reachable through expr here.
    sf.expr('percentile_approx(amnt, 0.5)').alias('median_amnt'),
]
train_agg_non_zero = (train.select(agg_non_0_cols)
                      .filter(positive_amnt)
                      .groupBy(['app_id'])
                      .agg(*amnt_stats))

In [37]:
# Amount statistics per application, restricted to transactions with amnt > 0
# (test split).
agg_non_0_cols = ['app_id', 'amnt', 'day_of_week', 'days_before', 'ecommerce_flag', 'hour', 'hour_diff',
                  'income_flag', 'transaction_number', 'weekofyear']
positive_amnt = sf.col('amnt') > 0
amnt_stats = [
    sf.mean('amnt').alias('mean_amnt'),
    sf.stddev('amnt').alias('std_amnt'),
    sf.max('amnt').alias('max_amnt'),
    # Approximate median; percentile_approx is only reachable through expr here.
    sf.expr('percentile_approx(amnt, 0.5)').alias('median_amnt'),
]
test_agg_non_zero = (test.select(agg_non_0_cols)
                     .filter(positive_amnt)
                     .groupBy(['app_id'])
                     .agg(*amnt_stats))

### Add last day transaction features

In [38]:
# Pick one transaction per application and keep its mcc and days_before as
# 'last_day_*' features. Rows are narrowed in three steps: smallest
# days_before, then smallest hour within that day, then largest
# transaction_number among what is left.
per_app = sw().partitionBy('app_id')

train_last_trans = (
    train
    .withColumn('min_days_before', sf.min('days_before').over(per_app))
    .filter(sf.col('days_before') == sf.col('min_days_before'))
)

# NOTE(review): the filter keeps the EARLIEST hour of that day (min), which
# looks odd for a "last transaction" feature - confirm the intent.
train_last_trans = (
    train_last_trans
    .withColumn('cnt_last_day', sf.count('amnt').over(per_app))
    .withColumn('min_hour', sf.min('hour').over(per_app))
    .filter(sf.col('hour') == sf.col('min_hour'))
)

# NOTE(review): cnt_last_day, cnt_last_hour and max_last_day_hours_diff are
# computed but not part of the final select below.
train_last_trans = (
    train_last_trans
    .withColumn('cnt_last_hour', sf.count('amnt').over(per_app))
    .withColumn('max_last_day_hours_diff', sf.max('hour_diff').over(per_app))
    .withColumn('last_transaction_number', sf.max('transaction_number').over(per_app))
    .filter(sf.col('transaction_number') == sf.col('last_transaction_number'))
)

select_last_trans_cols = ['app_id', 'mcc', 'days_before']
train_last_trans = train_last_trans.select(select_last_trans_cols)
# withColumnRenamed is a no-op for names that are absent, so only mcc and
# days_before are actually renamed.
for source_name in ('mcc', 'days_before', 'weekofyear', 'min_days_before'):
    train_last_trans = train_last_trans.withColumnRenamed(source_name, 'last_day_' + source_name)

In [39]:
# Pick one transaction per application and keep its mcc and days_before as
# 'last_day_*' features (test split). Rows are narrowed in three steps:
# smallest days_before, then smallest hour within that day, then largest
# transaction_number among what is left.
per_app = sw().partitionBy('app_id')

test_last_trans = (
    test
    .withColumn('min_days_before', sf.min('days_before').over(per_app))
    .filter(sf.col('days_before') == sf.col('min_days_before'))
)

# NOTE(review): the filter keeps the EARLIEST hour of that day (min), which
# looks odd for a "last transaction" feature - confirm the intent.
test_last_trans = (
    test_last_trans
    .withColumn('cnt_last_day', sf.count('amnt').over(per_app))
    .withColumn('min_hour', sf.min('hour').over(per_app))
    .filter(sf.col('hour') == sf.col('min_hour'))
)

# NOTE(review): cnt_last_day, cnt_last_hour and max_last_day_hours_diff are
# computed but not part of the final select below.
test_last_trans = (
    test_last_trans
    .withColumn('cnt_last_hour', sf.count('amnt').over(per_app))
    .withColumn('max_last_day_hours_diff', sf.max('hour_diff').over(per_app))
    .withColumn('last_transaction_number', sf.max('transaction_number').over(per_app))
    .filter(sf.col('transaction_number') == sf.col('last_transaction_number'))
)

select_last_trans_cols = ['app_id', 'mcc', 'days_before']
test_last_trans = test_last_trans.select(select_last_trans_cols)

# withColumnRenamed is a no-op for names that are absent, so only mcc and
# days_before are actually renamed.
for source_name in ('mcc', 'days_before', 'weekofyear', 'min_days_before'):
    test_last_trans = test_last_trans.withColumnRenamed(source_name, 'last_day_' + source_name)

_____

_____

_____

# Merge features and Add target

In [40]:
# Start from the train target table (its 'flag' column becomes 'target') and
# left-join every per-application feature table on app_id.
target_train = (spark.table('alfa.andrey_auto_target_train')
                .withColumnRenamed('flag', 'target'))

train_feature_tables = [
    train_income_features_add_table,
    train_number_order,
    train_hours_diff_features,
    train_balances_table,
    train_income_flag_table,
    train_age_table,
    train_good_bad_table,
    train_Ex_table,
    train_mode_table,
    train_cats_table,
    train_agg_with_zero,
    train_agg_non_zero,
    train_last_trans,
]
# Join order matches the list order, which fixes the resulting column order.
for feature_table in train_feature_tables:
    target_train = target_train.join(feature_table, on=['app_id'], how='left')

In [41]:
# Start from the test target table (target is set to the placeholder -1) and
# left-join every per-application feature table on app_id.
target_test = (spark.table('alfa.andrey_auto_target_test')
               .withColumn('target', sf.lit(-1)))

test_feature_tables = [
    test_income_features_add_table,
    test_number_order,
    test_hours_diff_features,
    test_balances_table,
    test_income_flag_table,
    test_age_table,
    test_good_bad_table,
    test_Ex_table,
    test_mode_table,
    test_cats_table,
    test_agg_with_zero,
    test_agg_non_zero,
    test_last_trans,
]
# Join order matches the list order, which fixes the resulting column order.
for feature_table in test_feature_tables:
    target_test = target_test.join(feature_table, on=['app_id'], how='left')

# Features after merge

In [42]:
# Number of columns in the merged train and test tables.
# NOTE(review): the recorded counts differ between the splits - presumably
# because some columns exist in only one of them; confirm before modelling.
len(target_train.columns), len(target_test.columns)

(505, 507)

In [42]:
# Same column-count check as the cell above, with a different recorded output.
# NOTE(review): presumably left over from a later re-run - confirm which
# output matches the saved tables.
len(target_train.columns), len(target_test.columns)

(514, 516)

# Save tables

In [43]:
%%time
target_train.write.format('orc').mode('overwrite').saveAsTable('alfa.andrey_auto_payments_targets_train')
spark.table('alfa.andrey_auto_payments_targets_train').toPandas().to_pickle('./train.pkl')

CPU times: user 7min 37s, sys: 48.7 s, total: 8min 25s
Wall time: 22min 45s


In [44]:
%%time
target_test.write.format('orc').mode('overwrite').saveAsTable('alfa.andrey_auto_payments_targets_test')
spark.table('alfa.andrey_auto_payments_targets_test').toPandas().to_pickle('./test.pkl')

CPU times: user 3min 43s, sys: 18.3 s, total: 4min 2s
Wall time: 14min 8s


______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______

______