In [2]:
%additional_python_modules duckdb==1.4.2

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Additional python modules to be included:
duckdb==1.4.2


In [1]:
%idle_timeout 10
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Glue bootstrap: reuse (or create) the SparkContext, wrap it in a GlueContext and take
# its SparkSession, which the parquet export cells later use via `spark`.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is created but not used (no init/commit) in the cells shown.
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 10 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 10
Session ID: 38b98433-9c0d-4de9-9c66-cfeb613b7965
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--additional-python-modules duckdb==1.4.2
Waiting for session 38b98433-9c0d-4de9-9c66-cfeb613b7965 to get into ready status...
Session 38b98433-9c0d-4de9-9c66-cfeb613b7965 has been created.



In [2]:
# Smoke test: confirms the duckdb wheel requested through %additional_python_modules is importable.
import duckdb
print("Installed package")

Installed package


In [3]:
import pandas as pd
import numpy as np




In [4]:
def load_raw(base_path="s3://crisk-nico-prod/raw"):
    """Read the raw gzip-compressed CSV extracts into pandas DataFrames.

    Parameters
    ----------
    base_path : str, default "s3://crisk-nico-prod/raw"
        Root prefix that holds the ``applications/`` and ``bureau/`` folders.
        Pass another bucket prefix or a local directory to run the notebook
        against a copy of the data.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(applications_train, applications_test, bureau)``.
    """
    root = base_path.rstrip("/")

    def _read(relative_path):
        # pandas delegates s3:// URIs to fsspec/s3fs; local paths are read directly.
        return pd.read_csv(f"{root}/{relative_path}", compression='gzip')

    applications_train = _read("applications/application_train.csv.gz")
    applications_test = _read("applications/application_test.csv.gz")
    bureau = _read("bureau/bureau_raw.csv.gz")
    return applications_train, applications_test, bureau

def drop_redundancy(df, threshold=0.9, exclude=('SK_ID_CURR', 'TARGET'), to_drop=None):
    """Drop numeric columns that nearly duplicate an earlier numeric column (in place).

    The absolute Pearson correlation between every pair of numeric columns is
    computed. A column is dropped when its correlation with any column that
    precedes it exceeds ``threshold``, so the first column of each correlated
    group is kept.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.
    threshold : float, default 0.9
        Absolute-correlation cut-off above which a later column counts as redundant.
    exclude : iterable of str or str or None, default ('SK_ID_CURR', 'TARGET')
        Columns left out of the scan. The id and the label must never be dropped,
        and a feature must not be dropped just because it is strongly correlated
        with the label. Names missing from ``df`` are ignored. Pass ``None`` to
        scan every numeric column.
    to_drop : list of str, optional
        Precomputed columns to drop, e.g. the list returned for the training set.
        When given, no correlation is computed; only the listed columns present in
        ``df`` are dropped. Use this so train and test keep the same schema.

    Returns
    -------
    list of str
        The columns that were dropped.
    """
    if to_drop is None:
        if exclude is None:
            excluded = []
        elif isinstance(exclude, str):
            excluded = [exclude]
        else:
            excluded = list(exclude)

        numeric = df.select_dtypes(include=[np.number])
        numeric = numeric.drop(columns=[col for col in excluded if col in numeric.columns])
        corr_matrix = numeric.corr().abs()
        # Strict upper triangle: each pair is inspected once, and a column is only
        # compared with the columns that come before it.
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1))
        to_drop = [column for column in upper.columns if (upper[column] > threshold).any()]
    else:
        to_drop = [column for column in to_drop if column in df.columns]

    df.drop(columns=to_drop, inplace=True)
    print(f"Dropped {len(to_drop)} correlated features")
    return to_drop

# def days_to_years(df, cols):
#     for col in cols:
#         new_col = col.replace("DAYS", "YEARS") 
#         df[new_col] = (df[col] / 365).abs()    
#         df.drop(columns=[col], inplace=True)  
#         print(f"Dropped feature: {col}, New feature: {new_col}")

# def capping_children(df):
#     df["CNT_CHILDREN_CAPPED"] = df["CNT_CHILDREN"].clip(upper=4)
#     df.drop(columns=['CNT_CHILDREN'], inplace=True)

def drop_missing(df, threshold, to_drop=None):
    """Drop columns whose fraction of missing values exceeds ``threshold`` (in place).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.
    threshold : float
        Maximum allowed missing ratio (0-1). Columns strictly above it are dropped.
        Ignored when ``to_drop`` is given.
    to_drop : list of str, optional
        Precomputed columns to drop, e.g. the list returned for the training set.
        Only the listed columns present in ``df`` are dropped, which keeps train
        and test on the same schema.

    Returns
    -------
    list of str
        The columns that were dropped.
    """
    if to_drop is None:
        missing_ratio = df.isna().mean()
        to_drop = missing_ratio[missing_ratio > threshold].index.tolist()
    else:
        to_drop = [column for column in to_drop if column in df.columns]

    df.drop(columns=to_drop, inplace=True)
    print(f"Dropped {len(to_drop)} features")
    return to_drop

def correlation_target(df, dtype):
    """Print the mean of ``TARGET`` split by missing / non-missing, per column.

    Only columns that contain at least one NaN and match ``dtype`` (as understood
    by ``DataFrame.select_dtypes(include=...)``) are reported. ``df`` must have
    a ``TARGET`` column. Nothing is returned; results are printed.
    """
    has_nan_mask = df.isna().any()
    candidate_cols = df.loc[:, has_nan_mask].select_dtypes(include=dtype).columns
    for name in candidate_cols:
        is_missing = df[name].isna()
        target_rate = df.groupby(is_missing)['TARGET'].mean()
        print(target_rate)

def create_missing_flag(df, cols):
    """Add a nullable-Int8 ``<col>_IS_MISSING`` indicator (1 = NaN) per listed column.

    ``df`` is modified in place. Names in ``cols`` that are not columns of ``df``
    are skipped without error.
    """
    present = (name for name in cols if name in df.columns)
    for name in present:
        indicator = df[name].isna().astype('Int8')
        df[f'{name}_IS_MISSING'] = indicator

def fill_missing_numerical(df, cols, strategy, fill_values=None):
    """Impute NaNs in numeric ``cols`` of ``df`` in place.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.
    cols : list of str
        Columns to impute. With ``strategy='median'``, every column without an
        override in ``fill_values`` must exist in ``df`` (KeyError otherwise).
        With ``'zero'``, absent columns are ignored.
    strategy : {'median', 'zero'}
        ``'median'`` uses each column's median computed on ``df`` itself.
        ``'zero'`` uses the constant 0 (for count-like features).
    fill_values : dict, optional
        Per-column fill values that take precedence over ``strategy``. Typically
        this is the dict returned by the call on the training set, so the test set
        is imputed with training statistics instead of its own.

    Returns
    -------
    dict
        Mapping of column to the value used to fill it.

    Raises
    ------
    ValueError
        If ``strategy`` is not ``'median'`` or ``'zero'``.
    """
    if strategy not in ('median', 'zero'):
        raise ValueError("Strategy must be 'median' or 'zero'")

    overrides = fill_values or {}
    values = {}
    for col in cols:
        if col in overrides:
            values[col] = overrides[col]
        elif strategy == 'median':
            values[col] = df[col].median()
        else:
            values[col] = 0

    if values:
        # fillna with a dict skips keys that are not columns of df.
        df.fillna(values, inplace=True)
    return values
        

def fill_missing_categorical(df, unknown_cols):
    """Replace missing values with the literal category 'Unknown' (in place).

    Names in ``unknown_cols`` that are not columns of ``df`` are ignored.
    """
    for name in filter(lambda c: c in df.columns, unknown_cols):
        df[name] = df[name].fillna('Unknown')

def one_hot_encoding(df, cat_cols):
    """One-hot encode ``cat_cols`` and return a new frame; ``df`` is not modified.

    The original version called sklearn's ``OneHotEncoder``, which is never
    imported in this notebook, so it raised NameError. This pandas version keeps
    the semantics of ``OneHotEncoder(drop='first', sparse_output=False)`` fitted
    on ``df``:

    - Levels of each column are sorted.
    - NaN, when present, becomes its own ``<col>_nan`` level, placed last.
    - The first level is dropped to avoid perfect collinearity.
    - Indicators are float columns named ``<col>_<level>``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame.
    cat_cols : list of str
        Categorical columns to encode.

    Returns
    -------
    pandas.DataFrame
        The non-encoded columns of ``df``, followed by the indicator columns, on
        the same index.
    """
    cat_cols = list(cat_cols)
    dummies = [
        pd.get_dummies(
            df[col],
            prefix=col,
            prefix_sep='_',
            drop_first=True,
            # Only add a NaN level when the column actually has NaNs (like sklearn).
            dummy_na=bool(df[col].isna().any()),
            dtype=float,
        )
        for col in cat_cols
    ]
    encoded_df = pd.concat([df.drop(columns=cat_cols), *dummies], axis=1)
    assert len(encoded_df) == len(df), "Row count changed during OHE!"

    return encoded_df

def label_encoding(df, cats_col):
    """Return a copy of ``df`` with each column in ``cats_col`` replaced by integer codes.

    The original version used sklearn's ``LabelEncoder``, which is never imported
    in this notebook, so it raised NameError. ``pd.factorize(..., sort=True)`` on
    the string-cast values gives the same codes: values are cast with
    ``astype(str)``, the distinct strings are sorted lexicographically, and each
    value is coded by its position in that sorted list. Note that the sort is on
    strings, so '10' sorts before '9'.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame (not modified).
    cats_col : list of str
        Columns to encode.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the encoded columns.
    """
    encoded_df = df.copy()
    for col in cats_col:
        codes, _ = pd.factorize(df[col].astype(str), sort=True)
        encoded_df[col] = codes
    return encoded_df




In [5]:
# Raw pandas frames. The cleaning helpers called in later cells mutate these in place.
applications_train, applications_test, bureau = load_raw()




In [6]:
# Decide which correlated columns are redundant on TRAIN only, then drop exactly that
# set from TEST. Deciding separately on each frame can give the two curated tables
# different schemas, and it lets test data influence feature selection.
train_cols_before_redundancy = set(applications_train.columns)
drop_redundancy(applications_train)
redundant_cols = [
    col for col in applications_test.columns
    if col in train_cols_before_redundancy and col not in applications_train.columns
]
applications_test.drop(columns=redundant_cols, inplace=True)
print(f"Dropped {len(redundant_cols)} correlated features from test (train decision)")

Dropped 35 correlated features
Dropped 35 correlated features


In [7]:
# days_cols = ["DAYS_BIRTH", "DAYS_EMPLOYED"]

# days_to_years(applications_train, days_cols)

In [7]:
# Select the >40%-missing columns on TRAIN only and drop the same set from TEST, so both
# tables keep one schema and test missingness never drives feature selection.
train_cols_before_missing = set(applications_train.columns)
drop_missing(applications_train, threshold=0.4)
high_missing_cols = [
    col for col in applications_test.columns
    if col in train_cols_before_missing and col not in applications_train.columns
]
applications_test.drop(columns=high_missing_cols, inplace=True)
print(f"Dropped {len(high_missing_cols)} features from test (train decision)")

Dropped 18 features
Dropped 18 features


In [8]:
# For each numeric train column that still has NaNs, print the TARGET rate for
# missing vs. non-missing rows. Used to decide flags and imputation below.
correlation_target(applications_train, 'number')

AMT_ANNUITY
False    0.080732
True     0.000000
Name: TARGET, dtype: float64
CNT_FAM_MEMBERS
False    0.080729
True     0.000000
Name: TARGET, dtype: float64
EXT_SOURCE_2
False    0.080733
True     0.078788
Name: TARGET, dtype: float64
EXT_SOURCE_3
False    0.077665
True     0.093119
Name: TARGET, dtype: float64
OBS_30_CNT_SOCIAL_CIRCLE
False    0.08088
True     0.03526
Name: TARGET, dtype: float64
DEF_30_CNT_SOCIAL_CIRCLE
False    0.08088
True     0.03526
Name: TARGET, dtype: float64
DEF_60_CNT_SOCIAL_CIRCLE
False    0.08088
True     0.03526
Name: TARGET, dtype: float64
DAYS_LAST_PHONE_CHANGE
False    0.080729
True     0.000000
Name: TARGET, dtype: float64
AMT_REQ_CREDIT_BUREAU_HOUR
False    0.077194
True     0.103374
Name: TARGET, dtype: float64
AMT_REQ_CREDIT_BUREAU_DAY
False    0.077194
True     0.103374
Name: TARGET, dtype: float64
AMT_REQ_CREDIT_BUREAU_WEEK
False    0.077194
True     0.103374
Name: TARGET, dtype: float64
AMT_REQ_CREDIT_BUREAU_MON
False    0.077194
True     0.1033

In [9]:
# Missing-value indicators, presumably for the columns whose missingness looked
# informative in the TARGET-rate printouts above. They are built BEFORE the imputation
# cells below, so they capture the original NaN pattern.
flag_cols = [
    'EXT_SOURCE_3', 'OBS_30_CNT_SOCIAL_CIRCLE', 'DEF_30_CNT_SOCIAL_CIRCLE',
    'DEF_60_CNT_SOCIAL_CIRCLE', 'AMT_REQ_CREDIT_BUREAU_HOUR', 'AMT_REQ_CREDIT_BUREAU_DAY',
    'AMT_REQ_CREDIT_BUREAU_WEEK', 'AMT_REQ_CREDIT_BUREAU_MON',
    'AMT_REQ_CREDIT_BUREAU_QRT', 'AMT_REQ_CREDIT_BUREAU_YEAR', 'NAME_TYPE_SUITE', 'OCCUPATION_TYPE'
]
create_missing_flag(applications_train, cols = flag_cols)
create_missing_flag(applications_test, cols = flag_cols)




In [10]:
median_cols = [
    'AMT_ANNUITY', 'CNT_FAM_MEMBERS', 'EXT_SOURCE_2',
    'EXT_SOURCE_3', 'DAYS_LAST_PHONE_CHANGE'
]
# Medians are estimated on TRAIN only and reused for TEST, so both sets get the same
# fill values. Computing them on test would use test statistics and shift the fill
# value between the two tables.
train_medians = applications_train[median_cols].median().to_dict()
applications_train.fillna(train_medians, inplace=True)
applications_test.fillna(train_medians, inplace=True)




In [11]:
# Count-like columns: NaN -> 0. A constant needs no statistic, so imputing train and
# test independently is consistent here.
zero_cols = [
    'OBS_30_CNT_SOCIAL_CIRCLE', 'DEF_30_CNT_SOCIAL_CIRCLE',
    'DEF_60_CNT_SOCIAL_CIRCLE',
    'AMT_REQ_CREDIT_BUREAU_HOUR', 'AMT_REQ_CREDIT_BUREAU_DAY',
    'AMT_REQ_CREDIT_BUREAU_WEEK', 'AMT_REQ_CREDIT_BUREAU_MON',
    'AMT_REQ_CREDIT_BUREAU_QRT', 'AMT_REQ_CREDIT_BUREAU_YEAR'
]
fill_missing_numerical(applications_train, cols = zero_cols, strategy='zero')
fill_missing_numerical(applications_test, cols = zero_cols, strategy='zero')




In [12]:
# Same missing-vs-TARGET report, restricted to object (string) columns of train.
correlation_target(applications_train, 'object')

NAME_TYPE_SUITE
False    0.080841
True     0.054180
Name: TARGET, dtype: float64
OCCUPATION_TYPE
False    0.087851
True     0.065131
Name: TARGET, dtype: float64


In [13]:
# NaN becomes an explicit 'Unknown' category. The *_IS_MISSING flags for these two
# columns were already created in the flag cell above.
unknown_cols = ['NAME_TYPE_SUITE', 'OCCUPATION_TYPE']
fill_missing_categorical(applications_train,unknown_cols)
fill_missing_categorical(applications_test,unknown_cols)




In [14]:
# Prune near-duplicate numeric columns from the bureau table before aggregation.
drop_redundancy(bureau)

Dropped 0 correlated features


In [8]:
# days_cols = ['DAYS_CREDIT', 'DAYS_ENDDATE_FACT', 'DAYS_CREDIT_UPDATE']
# days_to_years(bureau, days_cols)

In [15]:
# Drop bureau columns with more than 40% missing values.
drop_missing(bureau, threshold = 0.4)

Dropped 2 features


In [16]:
# Median-impute the bureau end-date offsets (this reassigns `median_cols`, shadowing
# the application-level list defined earlier).
# NOTE(review): `bureau` is only consumed by the DuckDB aggregation below, and SQL
# MIN/MAX/AVG already skip NULLs. If DAYS_ENDDATE_FACT is NULL because a credit has
# not closed yet (confirm against the data dictionary), median-filling it gives open
# credits a fabricated closing date and biases DAYS_ENDDATE_FACT_AVG.
median_cols = [
    'DAYS_CREDIT_ENDDATE', 'DAYS_ENDDATE_FACT'
]
fill_missing_numerical(bureau, cols = median_cols, strategy='median')




In [17]:
# Missing debt / limit amounts are treated as 0. Unlike leaving them NULL, the zeros
# count in the denominator of the *_AVG aggregates computed below.
zero_cols = [
    'AMT_CREDIT_SUM_DEBT', 'AMT_CREDIT_SUM_LIMIT'
]
fill_missing_numerical(bureau, cols = zero_cols, strategy='zero')




In [18]:
# In-memory DuckDB connection (no database file given). The pandas `bureau` frame is
# exposed to SQL under the name 'bureau', which the aggregation query reads.
con = duckdb.connect()
con.register('bureau', bureau)

<_duckdb.DuckDBPyConnection object at 0x7fce0cb0b8f0>


In [19]:
# One row per SK_ID_CURR summarising all of that applicant's bureau credits.
# SUM/MIN/MAX/AVG/COUNT(DISTINCT col) ignore NULLs; COUNT(*) counts every credit row.
# The CTE is a plain pass-through (SELECT * FROM base).
bureau_agg = con.sql("WITH base AS( " \
    # KEY/ID
    "SELECT SK_ID_CURR, " \
    # CREDIT_ACTIVE: number of credits in each status
    "SUM(CASE WHEN CREDIT_ACTIVE = 'Active' THEN 1 ELSE 0 END)   AS ACTIVE_COUNT, " \
    "SUM(CASE WHEN CREDIT_ACTIVE = 'Closed' THEN 1 ELSE 0 END)   AS CLOSED_COUNT, " \
    "SUM(CASE WHEN CREDIT_ACTIVE = 'Sold' THEN 1 ELSE 0 END)     AS SOLD_COUNT, " \
    "SUM(CASE WHEN CREDIT_ACTIVE = 'Bad debt' THEN 1 ELSE 0 END) AS BAD_DEBT_COUNT, " \
    # CREDIT_CURRENCY: number of distinct currencies
    "COUNT(DISTINCT CREDIT_CURRENCY) AS CREDIT_CURRENCY_NUNIQUE," \
    # DAYS_CREDIT: max, min, avg
    "MAX(DAYS_CREDIT) AS DAYS_CREDIT_MAX," \
    "MIN(DAYS_CREDIT) AS DAYS_CREDIT_MIN," \
    "AVG(DAYS_CREDIT) AS DAYS_CREDIT_AVG," \
    # CREDIT_DAY_OVERDUE: sum
    "SUM(CREDIT_DAY_OVERDUE) AS CREDIT_DAY_OVERDUE_SUM," \
    # DAYS_CREDIT_ENDDATE: max, min, avg
    "MAX(DAYS_CREDIT_ENDDATE) AS DAYS_CREDIT_ENDDATE_MAX," \
    "MIN(DAYS_CREDIT_ENDDATE) AS DAYS_CREDIT_ENDDATE_MIN," \
    "AVG(DAYS_CREDIT_ENDDATE) AS DAYS_CREDIT_ENDDATE_AVG," \
    # CNT_CREDIT_PROLONG: sum, max
    "SUM(CNT_CREDIT_PROLONG) AS CNT_CREDIT_PROLONG_SUM," \
    "MAX(CNT_CREDIT_PROLONG) AS CNT_CREDIT_PROLONG_MAX," \
    # AMT_CREDIT_SUM: max, sum, avg
    "MAX(AMT_CREDIT_SUM) AS AMT_CREDIT_SUM_MAX," \
    "SUM(AMT_CREDIT_SUM) AS AMT_CREDIT_SUM_SUM," \
    "AVG(AMT_CREDIT_SUM) AS AMT_CREDIT_SUM_AVG," \
    # AMT_CREDIT_SUM_DEBT: max, sum, avg
    "MAX(AMT_CREDIT_SUM_DEBT) AS AMT_CREDIT_SUM_DEBT_MAX," \
    "SUM(AMT_CREDIT_SUM_DEBT) AS AMT_CREDIT_SUM_DEBT_SUM," \
    "AVG(AMT_CREDIT_SUM_DEBT) AS AMT_CREDIT_SUM_DEBT_AVG," \
    # AMT_CREDIT_SUM_LIMIT: max, sum, avg
    "MAX(AMT_CREDIT_SUM_LIMIT) AS AMT_CREDIT_SUM_LIMIT_MAX," \
    "SUM(AMT_CREDIT_SUM_LIMIT) AS AMT_CREDIT_SUM_LIMIT_SUM," \
    "AVG(AMT_CREDIT_SUM_LIMIT) AS AMT_CREDIT_SUM_LIMIT_AVG," \
    # AMT_CREDIT_SUM_OVERDUE: max, sum, avg
    "MAX(AMT_CREDIT_SUM_OVERDUE) AS AMT_CREDIT_SUM_OVERDUE_MAX," \
    "SUM(AMT_CREDIT_SUM_OVERDUE) AS AMT_CREDIT_SUM_OVERDUE_SUM," \
    "AVG(AMT_CREDIT_SUM_OVERDUE) AS AMT_CREDIT_SUM_OVERDUE_AVG," \
    # CREDIT_TYPE: number of distinct credit types
    "COUNT(DISTINCT CREDIT_TYPE) AS CREDIT_TYPE_NUNIQUE," \
    # DAYS_CREDIT_UPDATE: max, avg
    "MAX(DAYS_CREDIT_UPDATE) AS DAYS_CREDIT_UPDATE_MAX," \
    "AVG(DAYS_CREDIT_UPDATE) AS DAYS_CREDIT_UPDATE_AVG," \
    # DAYS_ENDDATE_FACT: avg
    "AVG(DAYS_ENDDATE_FACT) AS DAYS_ENDDATE_FACT_AVG," \
    # Portfolio size: total bureau credits per applicant (always >= 1 here)
    "COUNT(*) AS TOTAL_LOANS " \
    "FROM bureau GROUP BY SK_ID_CURR)" \
"SELECT * FROM base").df()




In [20]:
# Left-join the per-applicant bureau aggregates onto each application table.
# validate='many_to_one' asserts SK_ID_CURR is unique in bureau_agg, so the join can
# never duplicate application rows.
# Only the bureau aggregate columns are zero-filled. Applicants without any bureau
# record get 0 everywhere (TOTAL_LOANS == 0 identifies them). Application columns are
# left as they are, so NaNs not covered by the imputation cells stay visible instead
# of silently becoming 0 (or a 0 inside a string column).
bureau_feature_cols = [col for col in bureau_agg.columns if col != 'SK_ID_CURR']

apps_train_merged_bureau_agg = applications_train.merge(bureau_agg, on='SK_ID_CURR', how='left', validate='many_to_one')
apps_train_merged_bureau_agg[bureau_feature_cols] = apps_train_merged_bureau_agg[bureau_feature_cols].fillna(0)

apps_test_merged_bureau_agg = applications_test.merge(bureau_agg, on='SK_ID_CURR', how='left', validate='many_to_one')
apps_test_merged_bureau_agg[bureau_feature_cols] = apps_test_merged_bureau_agg[bureau_feature_cols].fillna(0)





In [9]:
# apps_train_merged_bureau_agg["bucket_id"] = apps_train_merged_bureau_agg["SK_ID_CURR"] % 8

# CURATED = "s3://crisk-nico-prod/curated/apps_merged_bureau_agg"

# for bucket, df_part in apps_merged_bureau_agg.groupby("bucket_id"):
#     output_path = f"{CURATED}/bucket_id={bucket}/data.parquet"
#     df_part.to_parquet(output_path, index=False)


In [21]:
from pyspark.sql import SparkSession

# getOrCreate hands back the session already started by the Glue bootstrap cell.
spark = SparkSession.builder.getOrCreate()

CURATED = "s3://crisk-nico-prod/curated/apps_merged_bureau_agg_1"

# Partition key: SK_ID_CURR modulo 8 spreads applicants across 8 bucket_id folders.
# Note this also adds the bucket_id column to the pandas frame itself.
apps_train_merged_bureau_agg["bucket_id"] = apps_train_merged_bureau_agg["SK_ID_CURR"].mod(8)
spark_df = spark.createDataFrame(apps_train_merged_bureau_agg)

(
    spark_df.write
    .mode("overwrite")
    .partitionBy("bucket_id")
    .parquet(CURATED)
)




In [22]:
CURATED = "s3://crisk-nico-prod/curated/apps_merged_bureau_agg_test"

# Same partitioning as the train export: SK_ID_CURR modulo 8 -> bucket_id.
apps_test_merged_bureau_agg["bucket_id"] = apps_test_merged_bureau_agg["SK_ID_CURR"].mod(8)
spark_df = spark.createDataFrame(apps_test_merged_bureau_agg)

(
    spark_df.write
    .mode("overwrite")
    .partitionBy("bucket_id")
    .parquet(CURATED)
)


