In [1]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
import pandas as pd
import numpy as np
import pyspark.sql.functions as f

from sklearn.metrics import roc_auc_score
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

In [3]:
# Для вывода информации о сессии:
# spark.sparkContext
spark = (
    SparkSession
    .builder
    .appName("name")
    .getOrCreate()
)

In [4]:
config = {
    'data_path':'data/data_v1'
}

In [5]:
df = (
    spark
    .read
    .parquet(config['data_path'])
)
df.show()

+---------+------+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------------------+-----+-----+-----+--------------------+-----+-----+-----+-------------+-----+-----+-----+-------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----------------+------+------+------+------+------+------+------+------+------+------+------+----

In [7]:
def unpivot(df, cols, name_col: str, name_value: str):
    unpvt = (
        f.explode(
            f.array(*[f.struct(f.lit(col).alias('name_col'),f.col(col).alias('name_value'))for col in cols]
            )
        )
        .alias('unpvt')
    )

    unchanged_cols = list(filter(lambda x: x not in cols, df.columns))
    
    return (
        df
        .select(*unchanged_cols, unpvt)
        .select(*unchanged_cols,f.col('unpvt.name_col').alias(name_col),f.col('unpvt.name_value').alias(name_value)
        )
    )
    

In [8]:
features = [col for col in df.columns if col not in ['target', 'client_id', 'report_date']]

w = Window.partitionBy('report_date', 'feature_name', 'feature_value')
unpivot_df = (
    unpivot(df, features, 'feature_name', 'feature_value')
    .withColumn(
        'value_freq',
        f.count(f.when(f.col('feature_value').isNotNull(), f.col('feature_value')))
        .over(w)
    )
)

not_null_percent = (
    unpivot_df
    .groupBy('report_date', 'feature_name')
    .agg((f.count('feature_value') / f.count('*')).alias('not_null_percent'))

)

mode_frequency = (
    unpivot_df
    .groupBy('report_date', 'feature_name')
    .agg((f.max('value_freq') / f.count('feature_value')).alias('mode_frequency'))
)

factor_df = not_null_percent.join(mode_frequency, on=['report_date','feature_name'], how='inner').toPandas()
factor_df.head()

23/09/08 20:13:02 WARN DAGScheduler: Broadcasting large task binary with size 1596.5 KiB
23/09/08 20:13:03 WARN DAGScheduler: Broadcasting large task binary with size 1669.6 KiB
23/09/08 20:14:23 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/09/08 20:14:48 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
23/09/08 20:15:32 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
                                                                                

Unnamed: 0,report_date,feature_name,not_null_percent,mode_frequency
0,2021-02-01,col1271,0.0,
1,2021-02-01,col2271,0.083333,0.222222
2,2021-02-01,col245,0.0,
3,2021-02-01,col2548,0.027778,0.333333
4,2021-02-01,col336,0.0,


In [9]:
df = (
    pd.read_parquet('/kaggle/input/data-v1/data_v1')
)

gini_list = []
feature_list = []
for feature in features:
    try:
        data = df[df[feature].isnull() == False]
        gini = np.round(abs((roc_auc_score(data['target'],data[feature]))*2 - 1),3)
        gini_list.append(gini)
        feature_list.append(feature)
    except:
        
        gini_list.append(np.nan)
        feature_list.append(feature)


gini_df = pd.DataFrame(
    {
        'ind_gini':gini_list,
        'feature_name':feature_list
    }
)
gini_df.head()

Unnamed: 0,ind_gini,feature_name
0,,col1
1,,col2
2,,col3
3,,col4
4,,col5


In [13]:
obj = list(df.select_dtypes('object').columns)
features_corr = [f for f in features if f not in obj]

correlation_matrix_pd = df[features_corr].corr(method='spearman')


threshold = 0.85
groups = {}
group_number = 1

for i, col in enumerate(correlation_matrix_pd.columns):
    for j in range(i+1, len(correlation_matrix_pd.columns)):
        if abs(correlation_matrix_pd.iloc[i, j]) > threshold:
            if col not in groups:
                groups[col] = group_number
                group_number += 1
            if correlation_matrix_pd.columns[j] not in groups:
                groups[correlation_matrix_pd.columns[j]] = groups[col]
                
corr_df = pd.DataFrame(groups, index=range(1)).T.reset_index()
corr_df.columns = ['feature_name','cor_group']
corr_df.head()

Unnamed: 0,feature_name,cor_group
0,col1,1
1,col2,1
2,col3,1
3,col4,1
4,col73,1


In [15]:
corr_df['cor_group'].value_counts().head(30)

cor_group
1     744
2     538
4     229
8     125
70     33
3      29
72     24
6      19
10     16
54     14
71     12
40     10
91      8
79      8
92      8
5       8
29      7
42      5
53      4
85      4
74      3
75      3
52      3
32      3
78      3
7       3
47      3
76      3
81      3
58      2
Name: count, dtype: int64

In [16]:
factor_df = (
    factor_df
    .merge(gini_df, on='feature_name', how='left')
    .merge(corr_df, on='feature_name', how='left')
)

factor_df.to_parquet('/kaggle/working/factor_analysis/factor_analysis.parquet')

Unnamed: 0,report_date,feature_name,not_null_percent,mode_frequency,ind_gini,cor_group
0,2021-02-01,col1271,0.0,,0.132,8.0
1,2021-02-01,col2271,0.083333,0.222222,0.006,1.0
2,2021-02-01,col245,0.0,,0.022,12.0
3,2021-02-01,col2548,0.027778,0.333333,0.172,1.0
4,2021-02-01,col336,0.0,,0.144,1.0


In [18]:
factor_df.to_parquet('/kaggle/working/factor_analysis.parquet')

In [None]:
!zip -r /kaggle/working/factor_analysis.zip factor_analysis