In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
# Build (or reuse) the SparkSession used by every cell below.
# Settings applied: 80g of driver memory, the LEGACY time-parser policy,
# and whole-stage code generation switched off.
spark = (
    SparkSession.builder
    .appName("abc")
    .config("spark.driver.memory", "80g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.sql.codegen.wholeStage", "false")
    .getOrCreate()
)

In [None]:
# Load the four training inputs (history, inquiry, tradeline, target).
# Parquet files carry their own schema and have no header row, so the
# CSV-reader options `inferSchema` / `header` are not passed here.
# NOTE(review): the directory is an absolute path on one machine — consider
# moving it to a config cell or environment variable.
TRAINING_DATA_DIR = "/home/data/transfer/kissht/data/training_data"

df_hist_old = spark.read.parquet(f"{TRAINING_DATA_DIR}/history_data.parquet")
df_inq = spark.read.parquet(f"{TRAINING_DATA_DIR}/inquiry_data.parquet")
df_trade = spark.read.parquet(f"{TRAINING_DATA_DIR}/tradeline_data.parquet")
df_target = spark.read.parquet(f"{TRAINING_DATA_DIR}/target_data.parquet")

# TARGET

In [None]:
# Keep only the rows whose MONTH is 1, 2 or 3 and add the inverted payment
# flag PAID_T30_transformed = 1 - PAID_T30.
# NOTE(review): presumably 1 then means "not paid within 30 days" — confirm
# against the definition of PAID_T30.
target = (
    df_target
    .select("FB_TRANSACTION_ID", "MONTH", "SCHEDULED_PAYMENT_DATE", "PAID_T30")
    .where(F.col("MONTH").isin(1, 2, 3))
    .withColumn("PAID_T30_transformed", 1 - F.col("PAID_T30"))
)

In [None]:
# Collapse to one row per transaction: TARGET is the maximum of the inverted
# flag over the retained months, cast to int (1 if any retained row has
# PAID_T30_transformed == 1).
target_flag = F.max("PAID_T30_transformed").cast("int").alias("TARGET")
target_df = target.groupBy("FB_TRANSACTION_ID").agg(target_flag)
target_df.show(2)

# Event Rate

In [5]:
# Event rate: percentage of transactions whose TARGET equals 1.
total_instances = target_df.count()
# TARGET is cast to int upstream, so compare against the int literal.
positive_instances = target_df.filter(F.col("TARGET") == 1).count()
# Guard the division: an empty target frame yields NaN instead of raising
# ZeroDivisionError.
event_rate = (
    (positive_instances / total_instances) * 100
    if total_instances
    else float("nan")
)
print("Event Rate:", event_rate)

Event Rate: 5.963267619441508


# FEATURES

### Filters

In [6]:
# Preview the first two raw tradeline rows to inspect the available columns.
df_trade.show(2)

24/01/16 12:26:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+-----------------------+-------------------+--------+------------------+-----------------+---------------------+-----------------------------+----------+--------------+-----------------+-----------+-------------+----------+---------------+--------------+------------------+----------------------------+-------------------------------+-------------------------+--------------------------+------------------------+-------------------+------------+----------+---------------------+----------------+---------------+----------------+
|   FB_TRANSACTION_ID|BUREAU_REFERENCE_NUMBER|         CREATED_AT|      ID|LOAN_SERIAL_NUMBER|ACCOUNT_TYPE_CODE|REPORTING_MEMBER_NAME|HIGHEST_CREDIT_OR_LOAN_AMOUNT| OPEN_DATE|REPORTING_DATE|LAST_PAYMENT_DATE|CLOSED_DATE|INTEREST_RATE|EMI_AMOUNT|CURRENT_BALANCE|AMOUNT_OVERDUE|WRITTEN_OFF_AMOUNT|WRITTEN_OFF_AMOUNT_PRINCIPAL|IS_SUIT_FILED_OR_WILFUL_DEFAULT|IS_WRITTEN_OFF_OR_SETTLED|PAYMENT_HISTORY_START_DATE|PAYMENT_HISTORY_END_DATE|ACCOUNT_HOLDER_

In [8]:
# Null out every tradeline date that falls before 1950-01-01; values on or
# after that floor are kept unchanged. The same rule is applied to each of
# the seven date columns listed below.
date_floor = "1950-01-01"
trade_date_columns = [
    "CREATED_AT",
    "OPEN_DATE",
    "REPORTING_DATE",
    "LAST_PAYMENT_DATE",
    "CLOSED_DATE",
    "PAYMENT_HISTORY_START_DATE",
    "PAYMENT_HISTORY_END_DATE",
]

df_trade_1 = df_trade
for date_column in trade_date_columns:
    cleaned_value = F.when(
        F.col(date_column) >= date_floor, F.col(date_column)
    ).otherwise(None)
    df_trade_1 = df_trade_1.withColumn(date_column, cleaned_value)

In [9]:
# Row-level consistency filter followed by a de-duplication per loan.
created_at_col = F.col("CREATED_AT")
open_date_col = F.col("OPEN_DATE")
closed_date_col = F.col("CLOSED_DATE")
history_start_col = F.col("PAYMENT_HISTORY_START_DATE")
history_end_col = F.col("PAYMENT_HISTORY_END_DATE")

# A row is kept when CREATED_AT is later than the open date and both
# payment-history dates, the closed date (if present) is later than the open
# date, and the open / created / history-start dates are all non-null.
condition = (
    (created_at_col > open_date_col)
    & (created_at_col > history_start_col)
    & (created_at_col > history_end_col)
    & (closed_date_col.isNull() | (closed_date_col > open_date_col))
    & (
        open_date_col.isNotNull()
        & created_at_col.isNotNull()
        & history_start_col.isNotNull()
    )
)

df_trade_1 = df_trade_1.filter(condition)

# Within each (FB_TRANSACTION_ID, ID) pair keep the single row that sorts
# first by REPORTING_DATE, then CURRENT_BALANCE, then CLOSED_DATE, all
# descending.
window1 = Window.partitionBy("FB_TRANSACTION_ID", "ID").orderBy(
    F.col("REPORTING_DATE").desc(),
    F.col("CURRENT_BALANCE").desc(),
    closed_date_col.desc(),
)
df_trade_filtered = (
    df_trade_1
    .withColumn("row_number", F.row_number().over(window1))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

In [11]:
# Row count after the date clean-up and consistency filter (before the
# per-loan de-duplication, which is stored in df_trade_filtered).
df_trade_1.count()

                                                                                

2368450

In [12]:
# Row count of the raw tradeline data, for comparison with the filtered count.
df_trade.count()

2388304

### Joining Tradeline with History

In [13]:
# Shape the payment history for joining with the tradelines:
#   * BUREAU_LOAN_ID is exposed as ID (the tradeline join key),
#   * CREATED_AT is exposed as CREATED_AT_H so it does not clash with the
#     tradeline's own CREATED_AT,
#   * DATE and CREATED_AT_H are truncated to the first day of their month,
#   * month_diff is the whole number of months from DATE to CREATED_AT_H.
df_hist = (
    df_hist_old
    .select(
        "FB_TRANSACTION_ID",
        F.col("BUREAU_LOAN_ID").alias("ID"),
        F.trunc(F.col("DATE"), "month").alias("DATE"),
        "DPD",
        "ASSET_CODE",
        F.trunc(F.col("CREATED_AT"), "month").alias("CREATED_AT_H"),
    )
    .withColumn(
        "month_diff",
        F.months_between(F.col("CREATED_AT_H"), F.col("DATE")).cast("int"),
    )
)

In [14]:
# SQL expression that buckets DPD into DPD_NEW. Branches are evaluated top to
# bottom, so a row takes the first bucket it matches (360, 180, 150, 90, 60,
# 30, 1, 0); some buckets are also triggered by ASSET_CODE. Rows matching no
# branch keep their original DPD value.
dpd_bucket_sql = """
    case when DPD >= 360 then 360
    when (DPD >= 180 and DPD < 360) or ASSET_CODE in ('LSS' , 'DBT') then 180
    when (DPD >= 150 and DPD < 180) then 150
    when (DPD >= 90 and DPD < 150) or ASSET_CODE = 'SUB' then 90
    when (DPD >= 60 and DPD < 90) or ASSET_CODE = 'SMA' then 60
    when (DPD >= 30 and DPD < 60) then 30
    when (DPD >= 1 and DPD < 30) then 1
    when (DPD = 0) or ASSET_CODE = 'STD' then 0
    else DPD end as DPD_NEW
    """

# Projection list for selectExpr: every existing column plus DPD_NEW.
hist_exp = ["*", dpd_bucket_sql]

In [15]:
# Append the bucketed DPD_NEW column defined by hist_exp.
# This cell reassigns df_hist from itself, so any existing DPD_NEW is dropped
# first (a no-op on the first run). Re-running the cell therefore cannot
# produce a second, ambiguous DPD_NEW column.
df_hist = df_hist.drop("DPD_NEW").selectExpr(hist_exp)

In [19]:
# Preview the prepared history rows, including month_diff and DPD_NEW.
df_hist.show()

+--------------------+--------+----------+---+----------+------------+----------+-------+
|   FB_TRANSACTION_ID|      ID|      DATE|DPD|ASSET_CODE|CREATED_AT_H|month_diff|DPD_NEW|
+--------------------+--------+----------+---+----------+------------+----------+-------+
|FAST6167832445294185|49355733|2022-01-01|  0|          |  2022-10-01|         9|      0|
|FAST6167832445294185|49355726|2022-06-01|  0|          |  2022-10-01|         4|      0|
|FAST6167832445294185|49355717|2022-07-01|  0|          |  2022-10-01|         3|      0|
|FAST6167832445294185|49355733|2022-03-01|  0|          |  2022-10-01|         7|      0|
|FAST6167832445294185|49355733|2022-04-01|  0|          |  2022-10-01|         6|      0|
|FAST6167832445294185|49355718|2022-07-01|  0|          |  2022-10-01|         3|      0|
|FAST6167832445294185|49355732|2022-01-01|  0|          |  2022-10-01|         9|      0|
|FAST6167832445294185|49355738|2021-10-01|  0|          |  2022-10-01|        12|      0|
|FAST61678

In [16]:
# Attach the monthly payment history to each de-duplicated tradeline.
# The inner join on (FB_TRANSACTION_ID, ID) keeps only loans present on both
# sides; the history-side CREATED_AT_H is dropped afterwards.
tradeline_hist = (
    df_trade_filtered
    .join(df_hist, on=["FB_TRANSACTION_ID", "ID"], how="inner")
    .drop("CREATED_AT_H")
)

In [17]:
# Row count of the tradeline-history join.
tradeline_hist.count()

                                                                                

14721227

In [18]:
# Row count of the prepared history frame, to compare with the joined count.
df_hist.count()

                                                                                

14749126

In [40]:
# NOTE(review): `filtered_tradeline` is not assigned in any cell above (the
# de-duplicated tradeline frame is named `df_trade_filtered`), so this raises
# NameError on a fresh kernel. Confirm which frame was intended.
filtered_tradeline.count()

                                                                                

2368450

In [41]:
# Re-check the row count of the tradeline-history join.
tradeline_hist.count()

                                                                                

14749126

In [42]:
# NOTE(review): `filtered_tradeline` is not assigned in any cell above; this
# preview only works with leftover kernel state. Confirm the intended frame.
filtered_tradeline.show(5)

[Stage 125:>                                                        (0 + 1) / 1]

+--------------------+--------+-----------------------+-------------------+------------------+-----------------+---------------------+-----------------------------+----------+--------------+-----------------+-----------+-------------+----------+---------------+--------------+------------------+----------------------------+-------------------------------+-------------------------+--------------------------+------------------------+-------------------+------------+----------+---------------------+----------------+---------------+----------------+----------+---+----------+----------+-------+
|   FB_TRANSACTION_ID|      ID|BUREAU_REFERENCE_NUMBER|         CREATED_AT|LOAN_SERIAL_NUMBER|ACCOUNT_TYPE_CODE|REPORTING_MEMBER_NAME|HIGHEST_CREDIT_OR_LOAN_AMOUNT| OPEN_DATE|REPORTING_DATE|LAST_PAYMENT_DATE|CLOSED_DATE|INTEREST_RATE|EMI_AMOUNT|CURRENT_BALANCE|AMOUNT_OVERDUE|WRITTEN_OFF_AMOUNT|WRITTEN_OFF_AMOUNT_PRINCIPAL|IS_SUIT_FILED_OR_WILFUL_DEFAULT|IS_WRITTEN_OFF_OR_SETTLED|PAYMENT_HISTORY_START_

                                                                                

### Truncate

In [39]:
# Add a month-truncated "<name>_TRUNC" copy of each tradeline date column.
month_trunc_sources = [
    "CREATED_AT",
    "OPEN_DATE",
    "REPORTING_DATE",
    "LAST_PAYMENT_DATE",
    "CLOSED_DATE",
    "PAYMENT_HISTORY_START_DATE",
    "PAYMENT_HISTORY_END_DATE",
]

tradeline_hist_truncated = tradeline_hist
for source_column in month_trunc_sources:
    tradeline_hist_truncated = tradeline_hist_truncated.withColumn(
        source_column + "_TRUNC", F.trunc(F.col(source_column), "month")
    )

# ACCOUNT_TYPE_CODE is cast to int, DATE gets its own truncated copy, and
# vintage_bureau is the whole number of months from the truncated open date
# to the truncated CREATED_AT.
tradeline_hist_truncated = (
    tradeline_hist_truncated
    .withColumn("ACCOUNT_TYPE_CODE", F.col("ACCOUNT_TYPE_CODE").cast("int"))
    .withColumn("DATE_TRUNC", F.trunc(F.col("DATE"), "month"))
    .withColumn(
        "vintage_bureau",
        F.months_between(
            F.col("CREATED_AT_TRUNC"), F.col("OPEN_DATE_TRUNC")
        ).cast("int"),
    )
)

In [40]:
# Spot-check vintage_bureau against the two truncated dates it is derived from.
tradeline_hist_truncated.select("FB_TRANSACTION_ID", "CREATED_AT_TRUNC", "OPEN_DATE_TRUNC", "vintage_bureau").show()

                                                                                

+--------------------+----------------+---------------+--------------+
|   FB_TRANSACTION_ID|CREATED_AT_TRUNC|OPEN_DATE_TRUNC|vintage_bureau|
+--------------------+----------------+---------------+--------------+
|FAST6167832445294185|      2022-10-01|     2022-01-01|             9|
|FAST6167832445294185|      2022-10-01|     2022-03-01|             7|
|FAST6167832445294185|      2022-10-01|     2022-07-01|             3|
|FAST6167832445294185|      2022-10-01|     2022-01-01|             9|
|FAST6167832445294185|      2022-10-01|     2022-01-01|             9|
|FAST6167832445294185|      2022-10-01|     2022-07-01|             3|
|FAST6167832445294185|      2022-10-01|     2022-01-01|             9|
|FAST6167832445294185|      2022-10-01|     2021-10-01|            12|
|FAST6167832445294185|      2022-10-01|     2021-12-01|            10|
|FAST6167832445294185|      2022-10-01|     2022-03-01|             7|
|FAST6167832445294185|      2022-10-01|     2021-10-01|            12|
|FAST6

In [48]:
# window2: all rows of one transaction, unordered (used for max / min).
# window3: the same partition ordered by OPEN_DATE ascending (used for lag).
window2 = Window.partitionBy("FB_TRANSACTION_ID")
window3 = window2.orderBy(F.col("OPEN_DATE").asc())

In [66]:
# Per-transaction window features plus the month anchors used later.
#   * max_/min_vintage_bureau: largest / smallest vintage_bureau within the
#     transaction (window2).
#   * lagged_open_date: OPEN_DATE of the previous row when the transaction's
#     rows are ordered by OPEN_DATE (window3).
#   * days_diff: datediff(lagged_open_date, OPEN_DATE).
#     NOTE(review): datediff(end, start) returns end - start, so with the
#     earlier date passed first this is zero or negative — confirm the sign
#     is intended before using it as a "days between openings" feature.
#   * M1, M2, M6, M7, M8: CREATED_AT_TRUNC shifted back by 1, 2, 6, 7 and 8
#     months. The M*_live_flag expressions in exp_tradeline reference these
#     columns, so they must exist before tradeline.selectExpr(exp_tradeline)
#     runs.
tradeline = (
    tradeline_hist_truncated
    .withColumn("max_vintage_bureau", F.max("vintage_bureau").over(window2))
    .withColumn("min_vintage_bureau", F.min("vintage_bureau").over(window2))
    .withColumn("lagged_open_date", F.lag("OPEN_DATE").over(window3))
    .withColumn("days_diff", F.datediff(F.col("lagged_open_date"), F.col("OPEN_DATE")))
    .withColumn("M1", F.add_months(F.col("CREATED_AT_TRUNC"), -1))
    .withColumn("M2", F.add_months(F.col("CREATED_AT_TRUNC"), -2))
    .withColumn("M6", F.add_months(F.col("CREATED_AT_TRUNC"), -6))
    .withColumn("M7", F.add_months(F.col("CREATED_AT_TRUNC"), -7))
    .withColumn("M8", F.add_months(F.col("CREATED_AT_TRUNC"), -8))
)

In [67]:
# Inspect the window features for one sample transaction.
tradeline.filter(F.col("FB_TRANSACTION_ID") == "FAST1167168558886728").select("FB_TRANSACTION_ID", "CREATED_AT_TRUNC", "OPEN_DATE_TRUNC", "ACCOUNT_TYPE_CODE", "vintage_bureau", "max_vintage_bureau", "min_vintage_bureau", "OPEN_DATE", "lagged_open_date", "days_diff").show(20)



+--------------------+----------------+---------------+-----------------+--------------+------------------+------------------+----------+----------------+---------+
|   FB_TRANSACTION_ID|CREATED_AT_TRUNC|OPEN_DATE_TRUNC|ACCOUNT_TYPE_CODE|vintage_bureau|max_vintage_bureau|min_vintage_bureau| OPEN_DATE|lagged_open_date|days_diff|
+--------------------+----------------+---------------+-----------------+--------------+------------------+------------------+----------+----------------+---------+
|FAST1167168558886728|      2022-12-01|     2014-08-01|               10|           100|               100|                 1|2014-08-13|            NULL|     NULL|
|FAST1167168558886728|      2022-12-01|     2014-08-01|               10|           100|               100|                 1|2014-08-13|      2014-08-13|        0|
|FAST1167168558886728|      2022-12-01|     2014-08-01|               10|           100|               100|                 1|2014-08-13|      2014-08-13|        0|
|FAST11671

                                                                                

In [65]:
# Same sample transaction, restricted to the rows whose vintage_bureau equals
# the transaction's minimum (the most recently opened loans).
tradeline.filter((F.col("FB_TRANSACTION_ID") == "FAST1167168558886728") & (F.col("vintage_bureau") == F.col("min_vintage_bureau"))).select("FB_TRANSACTION_ID", "ACCOUNT_TYPE_CODE", "CREATED_AT_TRUNC", "OPEN_DATE_TRUNC", "vintage_bureau", "max_vintage_bureau", "min_vintage_bureau", "OPEN_DATE", "lagged_open_date").show(20)



+--------------------+-----------------+----------------+---------------+--------------+------------------+------------------+----------+----------------+
|   FB_TRANSACTION_ID|ACCOUNT_TYPE_CODE|CREATED_AT_TRUNC|OPEN_DATE_TRUNC|vintage_bureau|max_vintage_bureau|min_vintage_bureau| OPEN_DATE|lagged_open_date|
+--------------------+-----------------+----------------+---------------+--------------+------------------+------------------+----------+----------------+
|FAST1167168558886728|                5|      2022-12-01|     2022-11-01|             1|               100|                 1|2022-11-23|      2022-10-13|
|FAST1167168558886728|                5|      2022-12-01|     2022-11-01|             1|               100|                 1|2022-11-23|      2022-11-23|
|FAST1167168558886728|               10|      2022-12-01|     2022-11-01|             1|               100|                 1|2022-11-25|      2022-11-23|
+--------------------+-----------------+----------------+-------------

                                                                                

In [13]:
# Projection list for tradeline.selectExpr(...): every existing column ("*")
# plus the row-level flags and amounts that are aggregated per transaction
# afterwards.
#
# Element order matters: many expressions reference aliases defined earlier
# in this same list (open_flag_*, product_type, risk_type, installment_flag,
# live_flag, M*_live_flag, *_dpd_flag, ...).
# NOTE(review): this relies on Spark resolving aliases defined earlier in the
# same select list (lateral column aliases) — confirm the Spark version in
# use supports it.
#
# Besides the tradeline columns, the expressions read M1, M2, M6, M7, M8,
# days_diff, dpd_new, max_vintage_bureau, min_vintage_bureau and CREDIT_LIMIT,
# all of which must already exist on the input frame.
exp_tradeline = ["*",
       # First / latest product: account type of the loan(s) with the largest /
       # smallest vintage_bureau in the transaction; null on all other rows.
       "case when vintage_bureau = max_vintage_bureau then account_type_code end as first_product",
       "case when vintage_bureau = min_vintage_bureau then account_type_code end as latest_product",
       # open_flag_N = 1 when 0 <= vintage_bureau < N months.
       "case when (vintage_bureau >= 0 and vintage_bureau < 1) then 1 else 0 end as open_flag_1",
       "case when (vintage_bureau >= 0 and vintage_bureau < 3) then 1 else 0 end as open_flag_3",
       "case when (vintage_bureau >= 0 and vintage_bureau < 6) then 1 else 0 end as open_flag_6",
       "case when (vintage_bureau >= 0 and vintage_bureau < 12) then 1 else 0 end as open_flag_12",
       "case when (vintage_bureau >= 0 and vintage_bureau < 36) then 1 else 0 end as open_flag_36",
       # Map account_type_code to a product label; unmapped codes give null.
       """
          case when account_type_code in (1, 34, 32) then 'AL'
               when account_type_code in  (51, 53, 54, 52, 50, 61, 59, 55, 57, 56, 23, 24, 40, 39) then 'BL'
               when account_type_code in (35, 10, 16, 36, 15, 31) then 'CC'
               when account_type_code = 6 then 'CD'
               when account_type_code = 33 then 'CE'
               when account_type_code = 17 then 'CV'
               when account_type_code = 8 then 'EL'
               when account_type_code = 7 then 'GL'
               when account_type_code in (2, 42, 44) then 'HL'
               when account_type_code = 3 then 'LAP'
               when account_type_code = 4 then 'LAS'
               when account_type_code in (12, 38) then 'OD'
               when account_type_code in (14, 0) then 'Other'
               when account_type_code in (37, 9, 43, 41, 45, 5) then 'PL'
               when account_type_code = 13 then 'TW'
               else null end as product_type
               """,
       # Map account_type_code to 'Secured' / 'Unsecured'; unmapped codes give null.
       """case when account_type_code in (1, 51, 53, 54, 52, 50, 59, 55, 56, 57, 17, 33, 23, 7, 2, 15, 4, 40, 42, 43, 41, 44, 3, 31, 34, 13, 32) then 'Secured'
               when account_type_code in (61, 6, 35, 10, 8, 16, 24, 36, 37, 9, 39, 14, 0, 12, 45, 5, 38) then 'Unsecured'
               else null end as risk_type
               """,
       "case when product_type = 'HL' then 1 else 0 end as HL_flag",
       "case when product_type = 'BL' then 1 else 0 end as BL_flag",
       "case when product_type = 'PL' then 1 else 0 end as PL_flag",
       # Installment vs revolving split by account type (code 0 appears twice in
       # the installment list; the duplicate has no effect).
       "case when account_type_code in (1,51,53,54,52,50,61,59,17,33,6,8,16,23,24,7,2,15,4,37,9,40,42,43,41,39,0,45,5,0,3,34,13,32) then 1 else 0 end as installment_flag",
       "case when account_type_code in (55,56,57,35,10,36,14,12,38,31) then 1 else 0 end as revolving_flag",
       
       # Live flags: a loan is treated as not live at a reference date when it was
       # closed before that date, or when it is an installment loan with zero
       # balance whose last payment precedes that date. The reference dates are
       # created_at and the month anchors M1, M2, M6, M7, M8.
       """
          case when closed_date is not null and closed_date < created_at then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < created_at then 0
               else 1 end as live_flag
               """,       
       """
          case when closed_date is not null and closed_date < M1 then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < M1 then 0
               else 1 end as M1_live_flag
               """,       
       """
          case when closed_date is not null and closed_date < M2 then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < M2 then 0
               else 1 end as M2_live_flag
               """,       
       """
          case when closed_date is not null and closed_date < M6 then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < M6 then 0
               else 1 end as M6_live_flag
               """,       
       """
          case when closed_date is not null and closed_date < M7 then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < M7 then 0
               else 1 end as M7_live_flag
               """,       
       """
          case when closed_date is not null and closed_date < M8 then 0
               when installment_flag = 1 and current_balance = 0 and last_payment_date < M8 then 0
               else 1 end as M8_live_flag
               """,
       # Product / risk-type flags combined with the live and recently-opened flags.
       "case when product_type = 'PL' and  live_flag = 1 then 1 else 0 end as PL_live_flag",
       "case when product_type = 'PL' and open_flag_1 = 1 then 1 else 0 end as PL_open_flag_1",
       "case when product_type = 'PL' and open_flag_3 = 1 then 1 else 0 end as PL_open_flag_3",
       "case when product_type = 'PL' and open_flag_6 = 1 then 1 else 0 end as PL_open_flag_6",
       "case when product_type = 'PL' and open_flag_12 = 1 then 1 else 0 end as PL_open_flag_12",
       "case when product_type = 'CC' then 1 else 0 end as CC_flag",
       "case when product_type = 'CC' and open_flag_1 = 1 then 1 else 0 end as CC_open_flag_1",
       "case when product_type = 'CC' and open_flag_3 = 1 then 1 else 0 end as CC_open_flag_3",
       "case when product_type = 'CC' and open_flag_6 = 1 then 1 else 0 end as CC_open_flag_6",
       "case when product_type = 'CC' and open_flag_12 = 1 then 1 else 0 end as CC_open_flag_12",
       "case when risk_type = 'Secured' then 1 else 0 end as Secured_flag",
       "case when risk_type = 'Unsecured' then 1 else 0 end as Unsecured_flag",
       "case when risk_type = 'Unsecured' and open_flag_1 = 1 then 1 else 0 end as Unsecured_open_flag_1",
       "case when risk_type = 'Unsecured' and open_flag_3 = 1 then 1 else 0 end as Unsecured_open_flag_3",
       "case when risk_type = 'Unsecured' and open_flag_6 = 1 then 1 else 0 end as Unsecured_open_flag_6",
       "case when risk_type = 'Unsecured' and open_flag_12 = 1 then 1 else 0 end as Unsecured_open_flag_12",
                 
       # Live in any of months 0-2 / months 6-8, and the same restricted to
       # unsecured or installment loans.
       "case when live_flag = 1 or M1_live_flag = 1 or M2_live_flag = 1 then 1 else 0 end as M0_2_live_flag",       
       "case when M6_live_flag = 1 or M7_live_flag = 1 or M8_live_flag = 1 then 1 else 0 end as M6_8_live_flag",
       "case when M0_2_live_flag = 1 and Unsecured_flag = 1 then 1 else 0 end as M0_2_unsecured_live_flag",
       "case when M6_8_live_flag = 1 and Unsecured_flag = 1 then 1 else 0 end as M6_8_unsecured_live_flag",
       "case when M0_2_live_flag = 1 and installment_flag = 1 then 1 else 0 end as M0_2_installment_live_flag",
                 
       # Per-product current balance and sanctioned amount (0 for other products).
       "case when product_type = 'PL' then current_balance else 0 end as PL_balance",
       "case when product_type = 'BL' then current_balance else 0 end as BL_balance",
       "case when product_type = 'CD' then current_balance else 0 end as CD_balance",
       "case when product_type = 'AL' then current_balance else 0 end as AL_balance",
       "case when product_type = 'HL' then current_balance else 0 end as HL_balance",
       "case when product_type = 'CC' then current_balance else 0 end as CC_balance",
       "case when product_type = 'PL' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as PL_sanctioned_amount",
       "case when product_type = 'BL' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as BL_sanctioned_amount",
       "case when product_type = 'CD' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as CD_sanctioned_amount",
       "case when product_type = 'AL' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as AL_sanctioned_amount",
       "case when product_type = 'HL' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as HL_sanctioned_amount",
       "case when product_type = 'CC' then HIGHEST_CREDIT_OR_LOAN_AMOUNT else 0 end as CC_sanctioned_amount",
       # Written-off / suit-filed indicators.
       # NOTE(review): the last36 variant uses vintage_bureau <= 36 whereas
       # open_flag_36 uses < 36 — confirm the boundary is intended.
       "case when IS_SUIT_FILED_OR_WILFUL_DEFAULT = 1 or IS_WRITTEN_OFF_OR_SETTLED = 1 then 1 else 0 end as writtenoff_suit_flag",
       "case when writtenoff_suit_flag = 1 and vintage_bureau <= 36 and vintage_bureau >= 0 then 1 else 0 end as writtenoff_suit_last36_flag",
       # days_diff restricted to recently opened loans (0 otherwise), overall and
       # for unsecured / PL loans.
       "case when open_flag_6 = 1 then days_diff else 0 end as days_open_last6",
       "case when open_flag_12 = 1 then days_diff else 0 end as days_open_last12",
       "case when open_flag_36 = 1 then days_diff else 0 end as days_open_last36",
       "case when open_flag_6 = 1 and risk_type = 'Unsecured' then days_diff else 0 end as days_unsecured_open_last6",
       "case when open_flag_12 = 1 and risk_type = 'Unsecured' then days_diff else 0 end as days_unsecured_open_last12",
       "case when open_flag_36 = 1 and risk_type = 'Unsecured' then days_diff else 0 end as days_unsecured_open_last36",
       "case when open_flag_6 = 1 and product_type = 'PL' then days_diff else 0 end as days_PL_open_last6",
       "case when open_flag_12 = 1 and product_type = 'PL' then days_diff else 0 end as days_PL_open_last12",
       "case when open_flag_36 = 1 and product_type = 'PL' then days_diff else 0 end as days_PL_open_last36",
       # Vintage of loans opened within 36 months, per product (0 otherwise).
       "case when product_type = 'PL' and open_flag_36 = 1 then vintage_bureau else 0 end as PL_vintage_last36",
       "case when product_type = 'BL' and open_flag_36 = 1 then vintage_bureau else 0 end as BL_vintage_last36",
       "case when product_type = 'CC' and open_flag_36 = 1 then vintage_bureau else 0 end as CC_vintage_last36",
       "case when product_type = 'CD' and open_flag_36 = 1 then vintage_bureau else 0 end as CD_vintage_last36",
       # Credit-card utilisation: balance divided by limit for CC rows, else 0.
       "case when product_type = 'CC' then CURRENT_BALANCE/CREDIT_LIMIT else 0 end as CC_util",
       # <threshold>_<window>_dpd_flag = 1 when dpd_new >= threshold and the loan
       # has open_flag_<window> = 1.
       # NOTE(review): the window is taken from the loan's opening vintage
       # (open_flag_*), not from the history row's month_diff — confirm this is
       # the intended look-back definition.
       "case when dpd_new >= 0 and open_flag_3 = 1 then 1 else 0 end as 0_3_dpd_flag",
       "case when dpd_new >= 0 and open_flag_12 = 1 then 1 else 0 end as 0_12_dpd_flag",
       "case when dpd_new >= 0 and open_flag_36 = 1 then 1 else 0 end as 0_36_dpd_flag",
       "case when dpd_new >= 1 and open_flag_3 = 1 then 1 else 0 end as 1_3_dpd_flag",
       "case when dpd_new >= 1 and open_flag_12 = 1 then 1 else 0 end as 1_12_dpd_flag",
       "case when dpd_new >= 1 and open_flag_36 = 1 then 1 else 0 end as 1_36_dpd_flag",
       "case when dpd_new >= 30 and open_flag_3 = 1 then 1 else 0 end as 30_3_dpd_flag",
       "case when dpd_new >= 30 and open_flag_12 = 1 then 1 else 0 end as 30_12_dpd_flag",
       "case when dpd_new >= 30 and open_flag_36 = 1 then 1 else 0 end as 30_36_dpd_flag",
       "case when dpd_new >= 60 and open_flag_3 = 1 then 1 else 0 end as 60_3_dpd_flag",
       "case when dpd_new >= 60 and open_flag_12 = 1 then 1 else 0 end as 60_12_dpd_flag",
       "case when dpd_new >= 60 and open_flag_36 = 1 then 1 else 0 end as 60_36_dpd_flag",
       "case when dpd_new >= 90 and open_flag_3 = 1 then 1 else 0 end as 90_3_dpd_flag",
       "case when dpd_new >= 90 and open_flag_12 = 1 then 1 else 0 end as 90_12_dpd_flag",
       "case when dpd_new >= 90 and open_flag_36 = 1 then 1 else 0 end as 90_36_dpd_flag",
       "case when dpd_new >= 180 and open_flag_3 = 1 then 1 else 0 end as 180_3_dpd_flag",
       "case when dpd_new >= 180 and open_flag_12 = 1 then 1 else 0 end as 180_12_dpd_flag",
       "case when dpd_new >= 180 and open_flag_36 = 1 then 1 else 0 end as 180_36_dpd_flag",
       # The 30 / 90 / 180 DPD flags restricted to PL, CC and unsecured loans.
       "case when 30_3_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 30_3_dpd_PL_flag",
       "case when 30_12_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 30_12_dpd_PL_flag",
       "case when 30_36_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 30_36_dpd_PL_flag",
       "case when 90_3_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 90_3_dpd_PL_flag",
       "case when 90_12_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 90_12_dpd_PL_flag",
       "case when 90_36_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 90_36_dpd_PL_flag",
       "case when 180_3_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 180_3_dpd_PL_flag",
       "case when 180_12_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 180_12_dpd_PL_flag",
       "case when 180_36_dpd_flag = 1 and product_type = 'PL' then 1 else 0 end as 180_36_dpd_PL_flag",
       "case when 30_3_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 30_3_dpd_CC_flag",
       "case when 30_12_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 30_12_dpd_CC_flag",
       "case when 30_36_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 30_36_dpd_CC_flag",
       "case when 90_3_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 90_3_dpd_CC_flag",
       "case when 90_12_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 90_12_dpd_CC_flag",
       "case when 90_36_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 90_36_dpd_CC_flag",
       "case when 180_3_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 180_3_dpd_CC_flag",
       "case when 180_12_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 180_12_dpd_CC_flag",
       "case when 180_36_dpd_flag = 1 and product_type = 'CC' then 1 else 0 end as 180_36_dpd_CC_flag",
       "case when 30_3_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 30_3_dpd_unsecured_flag",
       "case when 30_12_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 30_12_dpd_unsecured_flag",
       "case when 30_36_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 30_36_dpd_unsecured_flag",
       "case when 90_3_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 90_3_dpd_unsecured_flag",
       "case when 90_12_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 90_12_dpd_unsecured_flag",
       "case when 90_36_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 90_36_dpd_unsecured_flag",
       "case when 180_3_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 180_3_dpd_unsecured_flag",
       "case when 180_12_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 180_12_dpd_unsecured_flag",
       "case when 180_36_dpd_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as 180_36_dpd_unsecured_flag",
       # dpd_new itself, restricted by opening window (0 otherwise), overall and
       # for PL / unsecured loans.
       "case when open_flag_3 = 1 then dpd_new else 0 end as 3_dpd",
       "case when open_flag_12 = 1 then dpd_new else 0 end as 12_dpd",
       "case when open_flag_36 = 1 then dpd_new else 0 end as 36_dpd",
       "case when open_flag_3 = 1 and product_type = 'PL' then dpd_new else 0 end as 3_dpd_PL",
       "case when open_flag_12 = 1 and product_type = 'PL' then dpd_new else 0 end as 12_dpd_PL",
       "case when open_flag_36 = 1 and product_type = 'PL' then dpd_new else 0 end as 36_dpd_PL",
       "case when open_flag_3 = 1 and risk_type = 'Unsecured' then dpd_new else 0 end as 3_dpd_unsecured",
       "case when open_flag_12 = 1 and risk_type = 'Unsecured' then dpd_new else 0 end as 12_dpd_unsecured",
       "case when open_flag_36 = 1 and risk_type = 'Unsecured' then dpd_new else 0 end as 36_dpd_unsecured"             
      ]

In [14]:
# Materialise every derived flag / amount column listed in exp_tradeline.
# NOTE(review): this reassigns `tradeline` from itself, so running the cell a
# second time would apply the projection to the already-expanded frame and
# duplicate the derived column names — re-run the cell that builds
# `tradeline` first.
tradeline = tradeline.selectExpr(exp_tradeline)

In [15]:
df_tradeline = tradeline.groupBy("FB_TRANSACTION_ID").agg(
    F.max("vintage_bureau").alias("max_vintage_bureau"),
    F.first("first_product").alias("first_product"),
    F.first("latest_product").alias("latest_product"),
    F.count("FB_TRANSACTION_ID").alias("total_nbr_trades"),
    F.sum("open_flag_1").alias("total_trades_in_last1"),
    F.sum("open_flag_3").alias("total_trades_in_last3"),
    F.sum("open_flag_6").alias("total_trades_in_last6"),
    F.sum("open_flag_12").alias("total_trades_in_last12"),
    F.sum("HL_flag").alias("total_HL_trades"),
    F.sum("BL_flag").alias("total_BL_trades"),
    F.sum("PL_flag").alias("total_PL_trades"),
    F.sum("PL_live_flag").alias("total_live_PL_trades"),
    F.sum("PL_open_flag_1").alias("total_PL_trades_in_last1"),
    F.sum("PL_open_flag_3").alias("total_PL_trades_in_last3"),
    F.sum("PL_open_flag_6").alias("total_PL_trades_in_last6"),
    F.sum("PL_open_flag_12").alias("total_PL_trades_in_last12"),
    F.sum("CC_flag").alias("total_CC_trades"),
    F.sum("CC_open_flag_1").alias("total_CC_trades_in_last1"),
    F.sum("CC_open_flag_3").alias("total_CC_trades_in_last3"),
    F.sum("CC_open_flag_6").alias("total_CC_trades_in_last6"),
    F.sum("CC_open_flag_12").alias("total_CC_trades_in_last12"),
    F.sum("Secured_flag").alias("total_Secured_trades"),
    F.sum("Unsecured_flag").alias("total_Unsecured_trades"),
    F.sum("Unsecured_open_flag_1").alias("total_Unsecured_trades_in_last1"),
    F.sum("Unsecured_open_flag_3").alias("total_Unsecured_trades_in_last3"),
    F.sum("Unsecured_open_flag_6").alias("total_Unsecured_trades_in_last6"),
    F.sum("Unsecured_open_flag_12").alias("total_Unsecured_trades_in_last12"),
    F.sum("installment_flag").alias("total_installment_trades"),
    F.sum("M0_2_live_flag").alias("total_live_trades_M0_2"),
    F.sum("M0_2_unsecured_live_flag").alias("total_unsecured_live_trades_M0_2"),
    F.sum("M0_2_installment_live_flag").alias("total_installment_live_trades_M0_2"),
    F.abs(F.sum("M0_2_live_flag") - F.sum("M6_8_live_flag")).alias("diff_live_trades_M02_M68"),
    F.abs(F.sum("M0_2_unsecured_live_flag") - F.sum("M6_8_unsecured_live_flag")).alias("diff_unsecured_live_trades_M02_M68"),
    F.sum("CURRENT_BALANCE").alias("total_balance"),
    F.sum("PL_balance").alias("total_PL_balance"),
    F.sum("BL_balance").alias("total_BL_balance"),
    F.sum("CD_balance").alias("total_CD_balance"),
    F.sum("AL_balance").alias("total_AL_balance"),
    F.sum("HL_balance").alias("total_HL_balance"),
    F.sum("CC_balance").alias("total_CC_balance"),
    F.sum("HIGHEST_CREDIT_OR_LOAN_AMOUNT").alias("total_sanctioned_amount"),
    F.sum("PL_sanctioned_amount").alias("total_sanctioned_PL_amount"),
    F.sum("BL_sanctioned_amount").alias("total_sanctioned_BL_amount"),
    F.sum("CD_sanctioned_amount").alias("total_sanctioned_CD_amount"),
    F.sum("AL_sanctioned_amount").alias("total_sanctioned_AL_amount"),
    F.sum("HL_sanctioned_amount").alias("total_sanctioned_HL_amount"),
    F.sum("CC_sanctioned_amount").alias("total_sanctioned_CC_amount"),
    F.sum("AMOUNT_OVERDUE").alias("total_past_due_amount"),
    F.sum("writtenoff_suit_flag").alias("total_writtenoff_suit_accounts"),
    F.sum("writtenoff_suit_last36_flag").alias("total_writtenoff_suit_accounts_last36"),
    F.avg("days_open_last6").alias("avg_days_open_loans_last6"),
    F.avg("days_open_last12").alias("avg_days_open_loans_last12"),
    F.avg("days_open_last36").alias("avg_days_open_loans_last36"),
    F.median("days_open_last6").alias("median_days_open_loans_last6"),
    F.median("days_open_last12").alias("median_days_open_loans_last12"),
    F.median("days_open_last36").alias("median_days_open_loans_last36"),
    F.avg("days_unsecured_open_last6").alias("avg_days_unsecured_open_loans_last6"),
    F.avg("days_unsecured_open_last12").alias("avg_days_unsecured_open_loans_last12"),
    F.avg("days_unsecured_open_last36").alias("avg_days_unsecured_open_loans_last36"),
    F.median("days_unsecured_open_last6").alias("median_days_unsecured_open_loans_last6"),
    F.median("days_unsecured_open_last12").alias("median_days_unsecured_open_loans_last12"),
    F.median("days_unsecured_open_last36").alias("median_days_unsecured_open_loans_last36"),
    F.avg("days_PL_open_last6").alias("avg_days_PL_open_loans_last6"),
    F.avg("days_PL_open_last12").alias("avg_days_PL_open_loans_last12"),
    F.avg("days_PL_open_last36").alias("avg_days_PL_open_loans_last36"),
    F.median("days_PL_open_last6").alias("median_days_PL_open_loans_last6"),
    F.median("days_PL_open_last12").alias("median_days_PL_open_loans_last12"),
    F.median("days_PL_open_last36").alias("median_days_PL_open_loans_last36"),
    F.avg("PL_vintage_last36").alias("avg_PL_vintage_last36"),
    F.avg("BL_vintage_last36").alias("avg_BL_vintage_last36"),
    F.avg("CC_vintage_last36").alias("avg_CC_vintage_last36"),
    F.avg("CD_vintage_last36").alias("avg_CD_vintage_last36"),
    F.abs(F.sum("CURRENT_BALANCE")/F.sum("CREDIT_LIMIT")).alias("avg_CC_micro"),
    F.abs(F.sum("CC_util")/F.sum("CC_flag")).alias("avg_CC_macro"),
    F.sum("0_3_dpd_flag").alias("total_0_3_dpd"),
    F.sum("0_12_dpd_flag").alias("total_0_12_dpd"),
    F.sum("0_36_dpd_flag").alias("total_0_36_dpd"),
    F.sum("1_3_dpd_flag").alias("total_1_3_dpd"),
    F.sum("1_12_dpd_flag").alias("total_1_12_dpd"),
    F.sum("1_36_dpd_flag").alias("total_1_36_dpd"),
    F.sum("30_3_dpd_flag").alias("total_30_3_dpd"),
    F.sum("30_12_dpd_flag").alias("total_30_12_dpd"),
    F.sum("30_36_dpd_flag").alias("total_30_36_dpd"),
    F.sum("60_3_dpd_flag").alias("total_60_3_dpd"),
    F.sum("60_12_dpd_flag").alias("total_60_12_dpd"),
    F.sum("60_36_dpd_flag").alias("total_60_36_dpd"),
    F.sum("90_3_dpd_flag").alias("total_90_3_dpd"),
    F.sum("90_12_dpd_flag").alias("total_90_12_dpd"),
    F.sum("90_36_dpd_flag").alias("total_90_36_dpd"),
    F.sum("180_3_dpd_flag").alias("total_180_3_dpd"),
    F.sum("180_12_dpd_flag").alias("total_180_12_dpd"),
    F.sum("180_36_dpd_flag").alias("total_180_36_dpd"),
    F.sum("30_3_dpd_PL_flag").alias("total_30_3_dpd_PL"),
    F.sum("30_12_dpd_PL_flag").alias("total_30_12_dpd_PL"),
    F.sum("30_36_dpd_PL_flag").alias("total_30_36_dpd_PL"),
    F.sum("90_3_dpd_PL_flag").alias("total_90_3_dpd_PL"),
    F.sum("90_12_dpd_PL_flag").alias("total_90_12_dpd_PL"),
    F.sum("90_36_dpd_PL_flag").alias("total_90_36_dpd_PL"),
    F.sum("180_3_dpd_PL_flag").alias("total_180_3_dpd_PL"),
    F.sum("180_12_dpd_PL_flag").alias("total_180_12_dpd_PL"),
    F.sum("180_36_dpd_PL_flag").alias("total_180_36_dpd_PL"),
    F.sum("30_3_dpd_CC_flag").alias("total_30_3_dpd_CC"),
    F.sum("30_12_dpd_CC_flag").alias("total_30_12_dpd_CC"),
    F.sum("30_36_dpd_CC_flag").alias("total_30_36_dpd_CC"),
    F.sum("90_3_dpd_CC_flag").alias("total_90_3_dpd_CC"),
    F.sum("90_12_dpd_CC_flag").alias("total_90_12_dpd_CC"),
    F.sum("90_36_dpd_CC_flag").alias("total_90_36_dpd_CC"),
    F.sum("180_3_dpd_CC_flag").alias("total_180_3_dpd_CC"),
    F.sum("180_12_dpd_CC_flag").alias("total_180_12_dpd_CC"),
    F.sum("180_36_dpd_CC_flag").alias("total_180_36_dpd_CC"),
    F.sum("30_3_dpd_unsecured_flag").alias("total_30_3_dpd_unsecured"),
    F.sum("30_12_dpd_unsecured_flag").alias("total_30_12_dpd_unsecured"),
    F.sum("30_36_dpd_unsecured_flag").alias("total_30_36_dpd_unsecured"),
    F.sum("90_3_dpd_unsecured_flag").alias("total_90_3_dpd_unsecured"),
    F.sum("90_12_dpd_unsecured_flag").alias("total_90_12_dpd_unsecured"),
    F.sum("90_36_dpd_unsecured_flag").alias("total_90_36_dpd_unsecured"),
    F.sum("180_3_dpd_unsecured_flag").alias("total_180_3_dpd_unsecured"),
    F.sum("180_12_dpd_unsecured_flag").alias("total_180_12_dpd_unsecured"),
    F.sum("180_36_dpd_unsecured_flag").alias("total_180_36_dpd_unsecured"),
    F.max("3_dpd").alias("max_3_dpd"),
    F.max("12_dpd").alias("max_12_dpd"),
    F.max("36_dpd").alias("max_36_dpd"),
    F.max("3_dpd_PL").alias("max_3_dpd_PL"),
    F.max("12_dpd_PL").alias("max_12_dpd_PL"),
    F.max("36_dpd_PL").alias("max_36_dpd_PL"),
    F.max("3_dpd_unsecured").alias("max_3_dpd_unsecured"),
    F.max("12_dpd_unsecured").alias("max_12_dpd_unsecured"),
    F.max("36_dpd_unsecured").alias("max_36_dpd_unsecured")
    
)
df_tradeline.show(truncate=False, n=1)

[Stage 41:>                                                         (0 + 1) / 1]

+--------------------+------------------+-------------+--------------+----------------+---------------------+---------------------+---------------------+----------------------+---------------+---------------+---------------+--------------------+------------------------+------------------------+------------------------+-------------------------+---------------+------------------------+------------------------+------------------------+-------------------------+--------------------+----------------------+-------------------------------+-------------------------------+-------------------------------+--------------------------------+------------------------+----------------------+--------------------------------+----------------------------------+------------------------+----------------------------------+-------------+----------------+----------------+----------------+----------------+----------------+----------------+-----------------------+--------------------------+---------------------

                                                                                

In [30]:
# Preview the first/latest product codes per transaction (default 20 rows).
df_tradeline.select(['FB_TRANSACTION_ID', 'first_product', 'latest_product']).show()

[Stage 102:>                                                        (0 + 1) / 1]

+--------------------+-------------+--------------+
|   FB_TRANSACTION_ID|first_product|latest_product|
+--------------------+-------------+--------------+
|FAST1167168558886728|           10|            10|
|FAST1167184128342981|           31|            31|
|FAST1167376146175796|            6|             6|
|FAST1167488592897943|            5|             5|
|FAST1167527658135697|            7|             7|
|FAST1167547366918513|            5|             5|
|FAST1167567882284581|            6|             6|
|FAST1167575478888941|            5|             5|
|FAST1167598348139538|            6|             6|
|FAST1167599381961472|            5|             5|
|FAST1167623961447289|            5|             5|
|FAST1167678589554798|            5|             5|
|FAST1167681766742791|            6|             6|
|FAST1167788445759386|            5|             5|
|FAST1167815891624216|            6|             6|
|FAST1167829774884827|            5|             5|
|FAST1167834

                                                                                

## Inquiry Features

In [16]:
# w1: within one (transaction, enquiry date, account type) group, rank enquiries by
# amount so that only the largest one is kept.
w1 = Window.partitionBy("FB_TRANSACTION_ID", "DATE_OF_ENQUIRY", "ACCOUNT_TYPE_CODE").orderBy(F.desc('ENQUIRY_AMOUNT'))
# w2: chronological order of enquiries within a transaction. ACCOUNT_TYPE_CODE is a
# tie-breaker for same-day enquiries; after de-duplication the pair
# (DATE_OF_ENQUIRY, ACCOUNT_TYPE_CODE) is unique per transaction, so the lag below
# is deterministic from run to run.
w2 = Window.partitionBy("FB_TRANSACTION_ID").orderBy(F.asc("DATE_OF_ENQUIRY"), F.asc("ACCOUNT_TYPE_CODE"))

# De-duplicate FIRST, then look up the previous enquiry date. Computing the lag
# before the `row_no = 1` filter lets rows that are about to be discarded act as the
# "previous enquiry", which produces spurious 0-day gaps for the surviving rows.
df_inq1 = (
    df_inq
    .withColumn('row_no', F.row_number().over(w1))
    .filter("row_no = 1")
    .withColumn('lagged_enq_dates', F.lag('DATE_OF_ENQUIRY').over(w2))
    # Keep the original column order: source columns, lagged_enq_dates, row_no.
    .select(*df_inq.columns, 'lagged_enq_dates', 'row_no')
)


In [17]:
# Quick look at the de-duplicated enquiries and their lagged dates.
df_inq1.show(n=2)



+--------------------+-----------------------+-------------------+---------------+-----------------+--------------+-------------------+----------------+------+
|   FB_TRANSACTION_ID|BUREAU_REFERENCE_NUMBER|ENQUIRY_MEMBER_NAME|DATE_OF_ENQUIRY|ACCOUNT_TYPE_CODE|ENQUIRY_AMOUNT|         CREATED_AT|lagged_enq_dates|row_no|
+--------------------+-----------------------+-------------------+---------------+-----------------+--------------+-------------------+----------------+------+
|FAST1167168558886728|   BURE1176451823136D53|      NOT DISCLOSED|     2014-07-29|               10|       10000.0|2022-12-18 12:01:44|            NULL|     1|
|FAST1167168558886728|   BURE1176451823136D53|      NOT DISCLOSED|     2015-04-27|               10|       50000.0|2022-12-18 12:01:44|      2014-07-29|     1|
+--------------------+-----------------------+-------------------+---------------+-----------------+--------------+-------------------+----------------+------+
only showing top 2 rows



                                                                                

In [18]:
# Truncate both dates to the first day of their month so that the month
# difference computed from them is based on calendar months.
df_inq_trunc = (
    df_inq1
    .withColumn("DATE_OF_ENQUIRY_TRUNC", F.trunc("DATE_OF_ENQUIRY", "month"))
    .withColumn("CREATED_AT_TRUNC", F.trunc("CREATED_AT", "month"))
)

In [19]:
# Per-enquiry measures:
#   month_diff - months between the month-truncated CREATED_AT and enquiry date
#   days_diff  - days elapsed since the previous enquiry date (NULL when there is none)
# ACCOUNT_TYPE_CODE is cast to int for the numeric comparisons done later.
inquiry = (
    df_inq_trunc
    .withColumn("month_diff", F.months_between("CREATED_AT_TRUNC", "DATE_OF_ENQUIRY_TRUNC"))
    .withColumn("days_diff", F.datediff("DATE_OF_ENQUIRY", "lagged_enq_dates"))
    .withColumn('ACCOUNT_TYPE_CODE', F.col('ACCOUNT_TYPE_CODE').cast('int'))
)
inquiry.show(n=2)



+--------------------+-----------------------+-------------------+---------------+-----------------+--------------+-------------------+----------------+------+---------------------+----------------+----------+---------+
|   FB_TRANSACTION_ID|BUREAU_REFERENCE_NUMBER|ENQUIRY_MEMBER_NAME|DATE_OF_ENQUIRY|ACCOUNT_TYPE_CODE|ENQUIRY_AMOUNT|         CREATED_AT|lagged_enq_dates|row_no|DATE_OF_ENQUIRY_TRUNC|CREATED_AT_TRUNC|month_diff|days_diff|
+--------------------+-----------------------+-------------------+---------------+-----------------+--------------+-------------------+----------------+------+---------------------+----------------+----------+---------+
|FAST1167168558886728|   BURE1176451823136D53|      NOT DISCLOSED|     2014-07-29|               10|       10000.0|2022-12-18 12:01:44|            NULL|     1|           2014-07-01|      2022-12-01|     101.0|     NULL|
|FAST1167168558886728|   BURE1176451823136D53|      NOT DISCLOSED|     2015-04-27|               10|       50000.0|2022-

                                                                                

In [20]:
def _build_inquiry_exprs():
    """Build the SQL expressions that turn each enquiry row into model flags.

    The returned list is meant for ``DataFrame.selectExpr``. Later expressions
    refer to aliases defined by earlier ones in the same list (e.g. ``M1_flag``,
    ``risk_type``), so the order of the groups below matters for readability and
    the output column order is kept stable.

    Groups, in order:
      * ``*``                              - all existing columns
      * ``M{m}_flag``                      - enquiry falls in the last m months
      * ``risk_type`` / ``Unsecured_M{m}_flag``
      * ``product_type`` / ``PL_M{m}_flag``
      * ``[PL_|CD_|Unsecured_]{k}_{m}_flag`` - amount >= k thousand within m months
      * ``M{m}[_unsecured|_PL]_days``      - days since the previous enquiry,
        only for enquiries inside the window

    Returns:
        list[str]: expressions in the order described above.
    """
    windows = (1, 3, 12, 36)
    day_windows = (3, 12, 36)

    risk_type_expr = """case when account_type_code in (1, 51, 53, 54, 52, 50, 59, 55, 56, 57, 17, 33, 23, 7, 2, 15, 4, 40, 42, 43, 41, 44, 3, 31, 34, 13, 32) then 'Secured'
                   when account_type_code in (61, 6, 35, 10, 8, 16, 24, 36, 37, 9, 39, 14, 0, 12, 45, 5, 38) then 'Unsecured'
                   else null end as risk_type
                   """

    product_type_expr = """case when account_type_code in (1, 34, 32) then 'AL'
                   when account_type_code in  (51, 53, 54, 52, 50, 61, 59, 55, 57, 56, 23, 24, 40, 39) then 'BL'
                   when account_type_code in (35, 10, 16, 36, 15, 31) then 'CC'
                   when account_type_code = 6 then 'CD'
                   when account_type_code = 33 then 'CE'
                   when account_type_code = 17 then 'CV'
                   when account_type_code = 8 then 'EL'
                   when account_type_code = 7 then 'GL'
                   when account_type_code in (2, 42, 44) then 'HL'
                   when account_type_code = 3 then 'LAP'
                   when account_type_code = 4 then 'LAS'
                   when account_type_code in (12, 38) then 'OD'
                   when account_type_code in (14, 0) then 'Other'
                   when account_type_code in (37, 9, 43, 41, 45, 5) then 'PL'
                   when account_type_code = 13 then 'TW'
                   else null end as product_type
                   """

    exprs = ["*"]

    # Recency flags: month_diff in [0, m).
    exprs += [
        f"case when month_diff >= 0 and month_diff < {m} then 1 else 0 end as M{m}_flag"
        for m in windows
    ]

    exprs.append(risk_type_expr)
    exprs += [
        f"case when M{m}_flag = 1 and risk_type = 'Unsecured' then 1 else 0 end as Unsecured_M{m}_flag"
        for m in windows
    ]

    exprs.append(product_type_expr)
    exprs += [
        f"case when M{m}_flag = 1 and product_type = 'PL' then 1 else 0 end as PL_M{m}_flag"
        for m in windows
    ]

    # Amount-threshold flags. Thresholds are expressed in thousands in the alias
    # (10 -> 10000). CD only uses the two lower thresholds.
    amount_segments = (
        ("", "", (10, 50, 100, 500)),
        ("PL_", " and product_type = 'PL'", (10, 50, 100, 500)),
        ("CD_", " and product_type = 'CD'", (10, 50)),
        ("Unsecured_", " and risk_type = 'Unsecured'", (10, 50, 100, 500)),
    )
    for prefix, segment_cond, thresholds in amount_segments:
        for k in thresholds:
            for m in windows:
                exprs.append(
                    f"case when enquiry_amount >= {k * 1000}{segment_cond} and M{m}_flag = 1 "
                    f"then 1 else 0 end as {prefix}{k}_{m}_flag"
                )

    # Days since the previous enquiry, restricted to a window/segment.
    # Rows outside the window yield NULL (not 0): these columns are averaged later,
    # and avg() skips NULLs, whereas a 0 would be counted as a real 0-day gap and
    # drag the average down for every out-of-window enquiry.
    day_segments = (
        ("M{m}_flag", "M{m}_days"),
        ("Unsecured_M{m}_flag", "M{m}_unsecured_days"),
        ("PL_M{m}_flag", "M{m}_PL_days"),
    )
    for flag_tpl, alias_tpl in day_segments:
        for m in day_windows:
            exprs.append(
                f"case when {flag_tpl.format(m=m)} = 1 then days_diff else null end "
                f"as {alias_tpl.format(m=m)}"
            )

    return exprs


exp_inq = _build_inquiry_exprs()

In [21]:
# Materialise every flag / derived column defined in exp_inq on the enquiry rows.
inquiry = inquiry.selectExpr(*exp_inq)

In [22]:
def _inquiry_aggregations():
    """Return the per-transaction aggregation columns for the enquiry features.

    Order of the returned columns:
      1. oldest_inq_month                      - max of month_diff
      2. total[_unsecured|_PL]_inq_last{m}     - sums of the recency flags
      3. toal[_PL|_CD|_unsecured]_inq_{k}_{m}  - sums of the amount-threshold flags
      4. avg_M{m}[_unsecured|_PL]_days         - averages of the *_days columns
    """
    windows = (1, 3, 12, 36)
    aggs = [F.max("month_diff").alias("oldest_inq_month")]

    # Enquiry counts per look-back window: overall, unsecured, personal loan.
    recency_groups = (
        ("", "total_inq"),
        ("Unsecured_", "total_unsecured_inq"),
        ("PL_", "total_PL_inq"),
    )
    for flag_prefix, alias_prefix in recency_groups:
        for m in windows:
            aggs.append(F.sum(f"{flag_prefix}M{m}_flag").alias(f"{alias_prefix}_last{m}"))

    # Counts of enquiries at or above an amount threshold (in thousands).
    # NOTE: the "toal_" prefix (sic) is reproduced exactly so that the output
    # column names stay the same for anything consuming this table.
    amount_groups = (
        ("", "toal_inq", (10, 50, 100, 500)),
        ("PL_", "toal_PL_inq", (10, 50, 100, 500)),
        ("CD_", "toal_CD_inq", (10, 50)),
        ("Unsecured_", "toal_unsecured_inq", (10, 50, 100, 500)),
    )
    for flag_prefix, alias_prefix, thresholds in amount_groups:
        for k in thresholds:
            for m in windows:
                aggs.append(F.sum(f"{flag_prefix}{k}_{m}_flag").alias(f"{alias_prefix}_{k}_{m}"))

    # Average of the days-since-previous-enquiry columns.
    for day_col_tpl in ("M{}_days", "M{}_unsecured_days", "M{}_PL_days"):
        for m in (3, 12, 36):
            day_col = day_col_tpl.format(m)
            aggs.append(F.avg(day_col).alias(f"avg_{day_col}"))

    return aggs


inquiry_df = inquiry.groupBy("FB_TRANSACTION_ID").agg(*_inquiry_aggregations())

### Merging df_tradeline and inquiry_df

In [23]:
# Left join: every tradeline row is kept; enquiry features are NULL where a
# transaction has no rows in inquiry_df.
all_features = df_tradeline.join(inquiry_df, "FB_TRANSACTION_ID", "left")

In [24]:
# Attach the label. Left join, so TARGET is NULL for transactions absent from target_df.
final_df = all_features.join(target_df, "FB_TRANSACTION_ID", "left")

In [25]:
# Sanity check: display a single row of the merged feature/target table.
final_df.show(n=1)

[Stage 61:>                                                         (0 + 1) / 1]

+--------------------+------------------+-------------+--------------+----------------+---------------------+---------------------+---------------------+----------------------+---------------+---------------+---------------+--------------------+------------------------+------------------------+------------------------+-------------------------+---------------+------------------------+------------------------+------------------------+-------------------------+--------------------+----------------------+-------------------------------+-------------------------------+-------------------------------+--------------------------------+------------------------+----------------------+--------------------------------+----------------------------------+------------------------+----------------------------------+-------------+----------------+----------------+----------------+----------------+----------------+----------------+-----------------------+--------------------------+---------------------

                                                                                

In [26]:
# Export of the final feature table is currently disabled. Two alternatives are
# kept below; both target the same path, so enable only one of them.
# Option 1 - write with the Spark DataFrame writer:
# final_df.write.csv("final_features.csv", header=True, mode="overwrite")
# Option 2 - convert to a pandas DataFrame and write it with pandas:
# final_df_pandas = final_df.toPandas()
# final_df_pandas.to_csv('final_features.csv', header = True)

                                                                                