In [1]:
from utils import (
     create_spark_session, load_config, custom_read_parquet,
)

from plot_utils import (
    count_nulls, custom_to_timestamp,
    plot_column_distribution, 
    plot_aggregated_by_time, 
    categorize,
    plot_analysis_of_loan_recovery,
    plot_tops,
    categorize_time_difference
)

from pyspark.sql.functions import sum, col

In [2]:
spark = create_spark_session()
config = load_config()
month_index = 1
month = config["months"][month_index]
next_month = config["months"][month_index+1]

24/08/31 17:16:23 WARN Utils: Your hostname, sajjad-Legion-5-15ACH6 resolves to a loopback address: 127.0.1.1; using 10.218.52.77 instead (on interface eno1)
24/08/31 17:16:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/31 17:16:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# assign

In [3]:
assign = custom_read_parquet(spark=spark, config=config, key="loan_assign", month=month)
assign = custom_to_timestamp(df=assign, col_name="date_timestamp")
assign.show()

+--------------------+--------+-------------------+--------------------+---------------+-----------+-------------------+
|              bib_id|date_key|            fake_id|            nid_hash|        loan_id|loan_amount|     date_timestamp|
+--------------------+--------+-------------------+--------------------+---------------+-----------+-------------------+
|C6117AC7815F23E83...|20240615|f-9801-000113957498|32A8F47494EA49EAD...|406153931232002|    10000.0|2024-06-15 10:20:21|
|C6117AC7815F23E83...|20240522|f-9801-000113957498|32A8F47494EA49EAD...|405223859051377|    10000.0|2024-05-22 06:50:04|
|C6117AC7815F23E83...|20240601|f-9801-000113957498|32A8F47494EA49EAD...|406013890917789|    10000.0|2024-06-01 22:41:10|
|C6117AC7815F23E83...|20240523|f-9801-000113957498|32A8F47494EA49EAD...|405233864172865|    20000.0|2024-05-23 21:25:20|
|C6117AC7815F23E83...|20240614|f-9801-000113957498|32A8F47494EA49EAD...|406143929540794|    10000.0|2024-06-14 18:46:05|
|C6117AC7815F23E83...|20240614|f

In [4]:
assign.printSchema()

root
 |-- bib_id: string (nullable = true)
 |-- date_key: string (nullable = true)
 |-- fake_id: string (nullable = true)
 |-- nid_hash: string (nullable = true)
 |-- loan_id: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)



In [5]:
for col_name in assign.columns:
    print(f"number of null value in column {col_name} : {count_nulls(df = assign, col_name=col_name)}")

number of null value in column bib_id : 0
number of null value in column date_key : 0
number of null value in column fake_id : 420
number of null value in column nid_hash : 0
number of null value in column loan_id : 0
number of null value in column loan_amount : 0
number of null value in column date_timestamp : 0


                                                                                

In [6]:
assign = categorize(
df=assign, 
col_name="loan_amount",
bins=[float('-inf'), 5000, 10000, 20000, 50000, 100000, float('inf')],
labels=["VeryLow", "5T-10T", "10T-20T", "20T-50T", "50T-100T", "Extreme"],
new_column_name="loan_category"
)

plot_column_distribution(col_df=assign.select("loan_category"), plot_type='count', month=month, x="loan_category")


Figure saved to: output/CountPlot_of_loan_category__month_39.png


In [7]:
aggregated_nid = assign.groupBy("nid_hash").agg(sum(col("loan_amount")).alias("SumAggregated_loan_amount_by_nid"))
plot_column_distribution(col_df=aggregated_nid.select("SumAggregated_loan_amount_by_nid"), plot_type='hist', month=month, x="SumAggregated_loan_amount_by_nid")


                                                                                

Figure saved to: output/HistPlot_of_SumAggregated_loan_amount_by_nid__month_39.png


In [8]:
plot_aggregated_by_time(df=assign, timestamp_column="date_timestamp", agg_col="loan_amount", month=month)

                                                                                

Figure saved to: output/LinePlot_of_SumAggregated_loan_amount_by_Hour__month_39.png
Figure saved to: output/LinePlot_of_SumAggregated_loan_amount_by_DayOfWeek__month_39.png


                                                                                

Figure saved to: output/LinePlot_of_SumAggregated_loan_amount_by_DayOfMonth__month_39.png


# recovery

In [9]:
recovery = custom_read_parquet(spark=spark, config=config, key="loan_recovery", month=month)
recovery = custom_to_timestamp(df=recovery, col_name="date_timestamp")
recovery.show()

+--------------------+--------+--------------------+--------------------+---------------+-----------+-------------+-------------------+
|              bib_id|date_key|             fake_id|            nid_hash|        loan_id|loan_amount|hsdp_recovery|     date_timestamp|
+--------------------+--------+--------------------+--------------------+---------------+-----------+-------------+-------------------+
|B086DA1A31C0AC9D5...|20240604|f-9801-2260002271...|66D390276DA362AF8...|406043897194116|    10000.0|      10000.0|2024-06-04 23:40:07|
|B086DA1A31C0AC9D5...|20240604|f-9801-2260002271...|66D390276DA362AF8...|406033896244618|    10000.0|      10000.0|2024-06-04 23:40:07|
|B086DA1A31C0AC9D5...|20240604|f-9801-2260002271...|66D390276DA362AF8...|406013888123422|    10000.0|      10000.0|2024-06-04 23:40:06|
|B086DA1A31C0AC9D5...|20240604|f-9801-2260002271...|66D390276DA362AF8...|406013890052938|    10000.0|      10000.0|2024-06-04 23:40:07|
|B0F4B9A70CC7B095F...|20240610| f-9801-000147317

In [10]:
recovery.limit(50).toPandas()

Unnamed: 0,bib_id,date_key,fake_id,nid_hash,loan_id,loan_amount,hsdp_recovery,date_timestamp
0,B086DA1A31C0AC9D54578761D27A3D36,20240604,f-9801-22600022713285,66D390276DA362AF812951A3C4D3B2DC,406043897194116,10000.0,10000.0,2024-06-04 23:40:07
1,B086DA1A31C0AC9D54578761D27A3D36,20240604,f-9801-22600022713285,66D390276DA362AF812951A3C4D3B2DC,406033896244618,10000.0,10000.0,2024-06-04 23:40:07
2,B086DA1A31C0AC9D54578761D27A3D36,20240604,f-9801-22600022713285,66D390276DA362AF812951A3C4D3B2DC,406013888123422,10000.0,10000.0,2024-06-04 23:40:06
3,B086DA1A31C0AC9D54578761D27A3D36,20240604,f-9801-22600022713285,66D390276DA362AF812951A3C4D3B2DC,406013890052938,10000.0,10000.0,2024-06-04 23:40:07
4,B0F4B9A70CC7B095F4A6F068A358F68A,20240610,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,406073907763789,10000.0,10000.0,2024-06-10 17:51:06
5,B0F4B9A70CC7B095F4A6F068A358F68A,20240607,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,406053900741301,10000.0,10000.0,2024-06-07 18:08:21
6,B0F4B9A70CC7B095F4A6F068A358F68A,20240607,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,406053901724555,5000.0,5000.0,2024-06-07 18:08:22
7,B0F4B9A70CC7B095F4A6F068A358F68A,20240531,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,405263872431379,10000.0,10000.0,2024-05-31 11:11:34
8,B0F4B9A70CC7B095F4A6F068A358F68A,20240531,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,405293881882071,5000.0,5000.0,2024-05-31 11:11:34
9,B0F4B9A70CC7B095F4A6F068A358F68A,20240610,f-9801-000147317998,482114E19477CD2B4E4A265E744280F6,406093914301573,5000.0,5000.0,2024-06-10 17:51:07


In [11]:
assign.filter(col("bib_id")=="B10F11A70B36157D2D5694229471758B").show()

+--------------------+--------+-------------------+--------------------+---------------+-----------+-------------------+-------------+
|              bib_id|date_key|            fake_id|            nid_hash|        loan_id|loan_amount|     date_timestamp|loan_category|
+--------------------+--------+-------------------+--------------------+---------------+-----------+-------------------+-------------+
|B10F11A70B36157D2...|20240603|f-9801-000095627919|AC2DF520FB6C83118...|406033895929325|    50000.0|2024-06-03 17:41:07|      20T-50T|
|B10F11A70B36157D2...|20240610|f-9801-000095627919|AC2DF520FB6C83118...|406103916362183|    20000.0|2024-06-10 13:55:30|      10T-20T|
|B10F11A70B36157D2...|20240601|f-9801-000095627919|AC2DF520FB6C83118...|406013888037223|   100000.0|2024-06-01 00:24:54|     50T-100T|
|B10F11A70B36157D2...|20240601|f-9801-000095627919|AC2DF520FB6C83118...|406013888033875|   100000.0|2024-06-01 00:22:45|     50T-100T|
|B10F11A70B36157D2...|20240619|f-9801-000095627919|AC2D

In [12]:
recovery.printSchema()

root
 |-- bib_id: string (nullable = true)
 |-- date_key: string (nullable = true)
 |-- fake_id: string (nullable = true)
 |-- nid_hash: string (nullable = true)
 |-- loan_id: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- hsdp_recovery: double (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)



In [13]:
for col_name in recovery.columns:
    print(f"number of null value in column {col_name} : {count_nulls(df = recovery, col_name=col_name)}")

number of null value in column bib_id : 0
number of null value in column date_key : 0
number of null value in column fake_id : 446
number of null value in column nid_hash : 0
number of null value in column loan_id : 0
number of null value in column loan_amount : 0
number of null value in column hsdp_recovery : 0
number of null value in column date_timestamp : 0


In [14]:
plot_analysis_of_loan_recovery(assign=assign, recovery=recovery)

                                                                                

Figure saved to: output/Comparison of Recovered and Assigned Loan Amounts --- 82.41% of loan_amounts are recovered.png


# package

In [15]:
package = custom_read_parquet(spark=spark, config=config, key="package", month=month)
package = custom_to_timestamp(df=package, col_name="activation_date")
package = custom_to_timestamp(df=package, col_name="deactivation_date") 
package.show()

+--------------------+--------+--------------------+--------------------+-------------+------------+--------------------+-------------------+-------------------+
|              bib_id|date_key|             fake_id|            nid_hash|offering_code|offer_amount|       offering_name|    activation_date|  deactivation_date|
+--------------------+--------+--------------------+--------------------+-------------+------------+--------------------+-------------------+-------------------+
|2CD06C4CFC6A1369A...|20240528| f-9801-000094528288|FC9BAEC665B9F8226...|    PO2034OEB|    242000.0|Weekly 5 GB Internet|2024-05-28 18:54:57|2024-06-04 23:59:59|
|2CD06C4CFC6A1369A...|20240612| f-9801-000094528288|FC9BAEC665B9F8226...|    PO2034OEB|    241200.0|Weekly 5 GB Internet|2024-06-12 10:50:35|2024-06-19 23:59:59|
|2CE32EAE4456697BC...|20240522| f-9801-000147217121|93B81E93712730BF0...|    PO2083HBH|    215000.0|Weekly 4 GB Internet|2024-05-22 07:53:23|2024-05-29 23:59:59|
|2CE32EAE4456697BC...|202406

In [16]:
package.printSchema()

root
 |-- bib_id: string (nullable = true)
 |-- date_key: string (nullable = true)
 |-- fake_id: string (nullable = true)
 |-- nid_hash: string (nullable = true)
 |-- offering_code: string (nullable = true)
 |-- offer_amount: double (nullable = true)
 |-- offering_name: string (nullable = true)
 |-- activation_date: timestamp (nullable = true)
 |-- deactivation_date: timestamp (nullable = true)



In [17]:
for col_name in package.columns:
    print(f"number of null value in column {col_name} : {count_nulls(df = package, col_name=col_name)}")

number of null value in column bib_id : 0
number of null value in column date_key : 0
number of null value in column fake_id : 21400
number of null value in column nid_hash : 0
number of null value in column offering_code : 0
number of null value in column offer_amount : 0
number of null value in column offering_name : 0
number of null value in column activation_date : 0
number of null value in column deactivation_date : 0


                                                                                

In [18]:
diff = categorize_time_difference(df=package)
diff.show()

+--------------------+--------+--------------------+--------------------+-------------+------------+--------------------+-------------------+-------------------+--------------------+-----------------+
|              bib_id|date_key|             fake_id|            nid_hash|offering_code|offer_amount|       offering_name|    activation_date|  deactivation_date|time_difference_days|duration_category|
+--------------------+--------+--------------------+--------------------+-------------+------------+--------------------+-------------------+-------------------+--------------------+-----------------+
|2CD06C4CFC6A1369A...|20240528| f-9801-000094528288|FC9BAEC665B9F8226...|    PO2034OEB|    242000.0|Weekly 5 GB Internet|2024-05-28 18:54:57|2024-06-04 23:59:59|                   7|           Weekly|
|2CD06C4CFC6A1369A...|20240612| f-9801-000094528288|FC9BAEC665B9F8226...|    PO2034OEB|    241200.0|Weekly 5 GB Internet|2024-06-12 10:50:35|2024-06-19 23:59:59|                   7|           Wee

In [22]:
plot_tops(diff, col_name="duration_category", index=10, month=month)



Figure saved to: output/top_10_popular_duration_categorys__month_39.png


                                                                                

In [31]:
diff.select("duration_category").toPandas().value_counts()

                                                                                

duration_category
Monthly              565094
Weekly               263665
Same Day             212343
Daily                157200
2 Months             149657
Half-Monthly         103761
4 Months              34888
6 Months               4314
1 Year                  131
More than 1 Year          1
Name: count, dtype: int64

In [None]:
plot_tops(df=package, col_name="offering_name", index=25, month=month)

# recharge

In [24]:
recharge = custom_read_parquet(spark=spark, config=config, key="recharge", month=month)
recharge = custom_to_timestamp(df=recharge, col_name="recharge_dt")
recharge.show()

+--------------------+--------+--------------------+--------------------+------------------+-------------------+--------------+--------------------------+-------------------------+
|              bib_id|date_key|             fake_id|            nid_hash|recharge_value_amt|        recharge_dt|origin_host_nm|account_balance_before_amt|account_balance_after_amt|
+--------------------+--------+--------------------+--------------------+------------------+-------------------+--------------+--------------------------+-------------------------+
|C307AEEF58A203368...|20240531|                NULL|B36F2F6D4BBFE1EE6...|          198182.0|2024-05-31 11:27:15|    MFSEREFILL|                    4111.0|                 202293.0|
|C32291ECD31E122CD...|20240613|f-9801-2560001997...|1ED6BE42AA0475B49...|           10000.0|2024-06-13 11:07:12|    MFSEREFILL|                    3437.0|                  13437.0|
|C32291ECD31E122CD...|20240618|f-9801-2560001997...|1ED6BE42AA0475B49...|           50000.0|202

In [25]:
recharge.printSchema()

root
 |-- bib_id: string (nullable = true)
 |-- date_key: string (nullable = true)
 |-- fake_id: string (nullable = true)
 |-- nid_hash: string (nullable = true)
 |-- recharge_value_amt: double (nullable = true)
 |-- recharge_dt: timestamp (nullable = true)
 |-- origin_host_nm: string (nullable = true)
 |-- account_balance_before_amt: double (nullable = true)
 |-- account_balance_after_amt: double (nullable = true)



In [26]:
for col_name in recharge.columns:
    print(f"number of null value in column {col_name} : {count_nulls(df = recharge, col_name=col_name)}")

number of null value in column bib_id : 0
number of null value in column date_key : 0
number of null value in column fake_id : 4045
number of null value in column nid_hash : 0
number of null value in column recharge_value_amt : 0


                                                                                

number of null value in column recharge_dt : 0
number of null value in column origin_host_nm : 0
number of null value in column account_balance_before_amt : 0
number of null value in column account_balance_after_amt : 50908


In [27]:
recharge = categorize(
    df=recharge, 
    col_name="recharge_value_amt", 
    bins=[float('-inf'), 10000, 20000, 50000, 100000, 200000, 500000, float('inf')],
    labels=["1T", "2T", "5T", "10T", "20T", "50T", "More"],
    new_column_name="recharge_category"
)

plot_column_distribution(col_df=recharge.select("recharge_category"), plot_type='count', month=month, x="recharge_category")

                                                                                

Figure saved to: output/CountPlot_of_recharge_category__month_39.png


In [28]:
plot_aggregated_by_time(df=recharge, timestamp_column="recharge_dt", agg_col="recharge_value_amt", month=month )

                                                                                

Figure saved to: output/LinePlot_of_SumAggregated_recharge_value_amt_by_Hour__month_39.png


                                                                                

Figure saved to: output/LinePlot_of_SumAggregated_recharge_value_amt_by_DayOfWeek__month_39.png




Figure saved to: output/LinePlot_of_SumAggregated_recharge_value_amt_by_DayOfMonth__month_39.png


                                                                                

# user

In [29]:
user = custom_read_parquet(spark=spark, config=config, key="user", month=month)
user.show()

+--------------------+--------------------+--------------------+---------------+--------+-------------------+---------------+--------------+---------------+---------------+-----+
|              bib_id|             fake_id|            nid_hash|contract_type_v|gender_v|registration_date_d|date_of_birth_d|ability_status|account_balance|base_station_cd|sitei|
+--------------------+--------------------+--------------------+---------------+--------+-------------------+---------------+--------------+---------------+---------------+-----+
|7ACB75A42872DB823...| f-9801-000164257907|2A82FFC7FB755237D...|              P|       M|           20240201|       20030324|        Active|  108993.449999|      LT5541XA1|T5541|
|7ACC6C44D78F20363...|f-9801-0020000933...|680DA2322C87A4E60...|              P|       M|           20070623|       19860701|        Active|       55.18333|        T3432XB| NULL|
|7AD3AA1BF5A3AC0C5...|f-9801-1340002540...|49F789BD1A29A408D...|              N|       M|           20180

In [30]:
user.select("gender_v").toPandas().value_counts()

gender_v
M           602268
F           370249
N            25989
Name: count, dtype: int64

# labeling

In [32]:
from labeling_utils import determine_churn_label

def get_churn_label():    
    current_package_df = custom_read_parquet(spark=spark, config=config, key="package", month=month)
    current_package_df = custom_to_timestamp(df=current_package_df, col_name="activation_date")

    next_package_df = custom_read_parquet(spark=spark, config=config, key="package", month=next_month)
    next_package_df = custom_to_timestamp(df=next_package_df, col_name="activation_date")

    current_recharge_df = custom_read_parquet(spark=spark, config=config, key="recharge", month=month)
    current_recharge_df = custom_to_timestamp(df=current_recharge_df, col_name="recharge_dt")

    next_recharge_df = custom_read_parquet(spark=spark, config=config, key="recharge", month=next_month)
    next_recharge_df = custom_to_timestamp(df=next_recharge_df, col_name="recharge_dt")

    return  determine_churn_label(current_package_df, current_recharge_df, next_package_df, next_recharge_df)

churn_label = get_churn_label()

In [33]:
churn_label.show()



+--------------------+-----+
|              bib_id|label|
+--------------------+-----+
|0000CFC78AA4CD390...|    0|
|00012757FE6A9C3B7...|    0|
|000171D81E79BD61E...|    1|
|0002C72A7BC77E249...|    0|
|00051CBABA49DBDD9...|    0|
|0005E18504583D452...|    1|
|00071CB1E187FDEFC...|    0|
|000C0B809B8E98820...|    0|
|000D047FFEFE03025...|    0|
|000D23FAA9CE043AC...|    0|
|001719B17F83C6D25...|    0|
|001915E85645E5E2C...|    0|
|001BC3B1603A4A5D8...|    1|
|001DC7DE19C75219A...|    1|
|001FAA900DA4B4C2C...|    0|
|00204D19AFA17B862...|    0|
|00205F24B17A4F2C0...|    0|
|002205F6BFCBF0168...|    0|
|0024C50C194EEC95F...|    0|
|0026A4FFD577699DC...|    0|
+--------------------+-----+
only showing top 20 rows



                                                                                