In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-4.1.1.tar.gz (455.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m455.4/455.4 MB[0m [31m909.0 kB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting py4j<0.10.9.10,>=0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.0/203.0 kB[0m [31m20.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-4.1.1-py2.py3-none-any.whl size=456008706 sha256=d44ec4a5ce90ee85f61e6bede8feabfa4d1347c0ebda7e0859cbcd34b31462e7
  Stored in directory: /root/.cache/pip/wheels/f4/ca/ea/203f

In [None]:
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [None]:
# Local Spark session used by every cell below.
# With master("local[*]") all tasks run inside the driver JVM, so the driver
# heap is the only memory available to Parquet scans and shuffles. The default
# heap was not enough for usage.parquet (the scan failed with
# "java.lang.OutOfMemoryError: Java heap space"), so it is raised explicitly.
# NOTE: spark.driver.memory is only honoured when the JVM is first launched;
# restart the runtime before re-running this cell if a session already exists.
# Tune the value to the RAM of the machine running the notebook.
spark = (
    SparkSession.builder
    .appName("Churn-Analysis")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [None]:
cd /content/drive/MyDrive/Talk_talk

/content/drive/.shortcut-targets-by-id/1Hc6JNdQGWtt7ZkVjQux9MMEg17i44ZOm/Talk_talk


In [None]:
# Folder that holds the raw extracts; change it in one place to relocate the data.
DATA_DIR = "/content/drive/MyDrive/Talk_talk"

# Parquet files carry their own schema, so no schema-inference option is needed.
usage_df = spark.read.parquet(f"{DATA_DIR}/usage.parquet")

customer_info_df = spark.read.parquet(f"{DATA_DIR}/customer_info.parquet")

# CSV extracts: the first row is the header and column types are inferred
# from the data.
cease_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{DATA_DIR}/cease.csv")
)

call_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{DATA_DIR}/calls.csv")
)

In [None]:
# Print the column names and inferred types of the cease extract.
cease_df.printSchema()

root
 |-- unique_customer_identifier: string (nullable = true)
 |-- cease_placed_date: date (nullable = true)
 |-- cease_completed_date: string (nullable = true)
 |-- reason_description: string (nullable = true)
 |-- reason_description_insight: string (nullable = true)



In [None]:
def build_ref_dates(cease_df, customer_info_df):
    """Assign every customer a reference date and a churn group.

    Churners are customers with at least one non-null ``cease_placed_date``;
    their ``ref_date`` is the earliest such date. Every other customer found in
    ``customer_info_df`` is a non-churner whose ``ref_date`` is the latest
    ``datevalue`` recorded for them.

    Args:
        cease_df: DataFrame with ``unique_customer_identifier`` and
            ``cease_placed_date``.
        customer_info_df: DataFrame with ``unique_customer_identifier`` and
            ``datevalue``.

    Returns:
        DataFrame with columns ``unique_customer_identifier``, ``ref_date`` and
        ``churn_group`` ("Churner" or "Non-churner").
    """
    churn_ref = (
        cease_df
        .filter(F.col("cease_placed_date").isNotNull())
        .groupBy("unique_customer_identifier")
        .agg(F.min("cease_placed_date").alias("ref_date"))
        .withColumn("churn_group", F.lit("Churner"))
    )

    nonchurn_ref = (
        customer_info_df
        # Cast to a date, as the other feature builders do, so that both halves
        # of the union expose ref_date with the same type regardless of how
        # datevalue is stored in the source file.
        .withColumn("datevalue", F.to_date("datevalue"))
        .groupBy("unique_customer_identifier")
        .agg(F.max("datevalue").alias("ref_date"))
        # Keep only customers that never placed a cease.
        .join(
            churn_ref.select("unique_customer_identifier"),
            on="unique_customer_identifier",
            how="left_anti",
        )
        .withColumn("churn_group", F.lit("Non-churner"))
    )

    return churn_ref.unionByName(nonchurn_ref)

In [None]:
def build_call_features(call_df, ref_dates):
    """Count each customer's calls in the 90 days up to their reference date.

    Only calls with ``0 <= datediff(ref_date, event_date) <= 90`` are counted.
    The "30d"/"60d"/"90d" counters use the same inclusive upper bound on that
    day difference.

    Args:
        call_df: DataFrame with ``unique_customer_identifier``, ``event_date``
            and ``call_type``.
        ref_dates: DataFrame with ``unique_customer_identifier`` and
            ``ref_date``.

    Returns:
        One row per distinct customer in ``ref_dates`` with the call counters,
        ``days_since_last_call`` (91 when there is no call in the window),
        ``call_type_diversity_90d`` and ``repeat_caller_flag_30d`` (1 when
        ``calls_total_30d`` is at least 3). Counters are 0 for customers with
        no calls in the window.
    """
    key = "unique_customer_identifier"
    days_before = F.col("days_before")

    def count_calls(max_days, call_type=None):
        # Conditional count: 1 for each call within max_days (and of the given
        # type, when one is requested), 0 otherwise.
        matches = days_before <= max_days
        if call_type is not None:
            matches = matches & (F.col("call_type") == F.lit(call_type))
        return F.sum(F.when(matches, 1).otherwise(0))

    # (output column, window in days, call type or None for all types)
    counter_specs = [
        ("calls_total_30d", 30, None),
        ("calls_total_60d", 60, None),
        ("calls_total_90d", 90, None),
        ("calls_loyalty_30d", 30, "Loyalty"),
        ("calls_loyalty_90d", 90, "Loyalty"),
        ("calls_tech_30d", 30, "Tech"),
        ("calls_tech_90d", 90, "Tech"),
        ("calls_csb_30d", 30, "CS&B"),
        ("calls_csb_90d", 90, "CS&B"),
        ("calls_finance_30d", 30, "Customer Finance"),
        ("calls_finance_90d", 90, "Customer Finance"),
    ]

    calls_with_lag = (
        ref_dates.select(key, "ref_date")
        .join(call_df.select(key, "event_date", "call_type"), on=key, how="left")
        .withColumn("days_before", F.datediff(F.col("ref_date"), F.col("event_date")))
    )
    recent_calls = calls_with_lag.filter((days_before >= 0) & (days_before <= 90))

    aggregates = [
        count_calls(max_days, call_type).alias(name)
        for name, max_days, call_type in counter_specs
    ]
    aggregates.append(F.min("days_before").alias("days_since_last_call"))
    aggregates.append(F.countDistinct("call_type").alias("call_type_diversity_90d"))
    per_customer = recent_calls.groupBy(key).agg(*aggregates)

    # Customers without any call in the window get zero counters.
    zero_defaults = {name: 0 for name, _, _ in counter_specs}
    zero_defaults["call_type_diversity_90d"] = 0

    all_customers = ref_dates.select(key).distinct()
    return (
        all_customers.join(per_customer, on=key, how="left")
        .fillna(zero_defaults)
        # 91 marks "no call inside the 90-day window".
        .withColumn(
            "days_since_last_call",
            F.coalesce(F.col("days_since_last_call"), F.lit(91)),
        )
        .withColumn(
            "repeat_caller_flag_30d",
            F.when(F.col("calls_total_30d") >= 3, 1).otherwise(0),
        )
    )


In [None]:
# Reference date per customer, then the call-count features relative to it.
ref_dates = build_ref_dates(cease_df, customer_info_df)
call_features_df = build_call_features(call_df, ref_dates)

In [None]:
# Preview the call-count features.
call_features_df.show()

+--------------------------+---------------+---------------+---------------+-----------------+-----------------+--------------+--------------+-------------+-------------+-----------------+-----------------+--------------------+-----------------------+----------------------+
|unique_customer_identifier|calls_total_30d|calls_total_60d|calls_total_90d|calls_loyalty_30d|calls_loyalty_90d|calls_tech_30d|calls_tech_90d|calls_csb_30d|calls_csb_90d|calls_finance_30d|calls_finance_90d|days_since_last_call|call_type_diversity_90d|repeat_caller_flag_30d|
+--------------------------+---------------+---------------+---------------+-----------------+-----------------+--------------+--------------+-------------+-------------+-----------------+-----------------+--------------------+-----------------------+----------------------+
|      000bcca91cb6de20c...|              0|              0|              0|                0|                0|             0|             0|            0|            0|     

In [None]:
def build_talk_hold_features(call_df, ref_dates):
    """Summarise talk and hold time in the 90 days up to the reference date.

    ``talk_time_seconds`` and ``hold_time_seconds`` are divided by 60 before
    aggregation, so every returned duration is expressed in minutes.

    Args:
        call_df: DataFrame with ``unique_customer_identifier``, ``event_date``,
            ``talk_time_seconds`` and ``hold_time_seconds``.
        ref_dates: DataFrame with ``unique_customer_identifier`` and
            ``ref_date``.

    Returns:
        One row per distinct customer in ``ref_dates`` with
        ``talk_time_total_30d``, ``talk_time_total_90d``,
        ``hold_time_total_30d``, ``hold_time_total_90d``,
        ``avg_talk_time_per_call_90d`` and ``hold_ratio_90d``. Totals and
        ratios are 0.0 for customers with no calls in the window.
    """
    key = "unique_customer_identifier"
    days_before = F.col("days_before")

    calls_in_minutes = call_df.select(
        key,
        F.to_date("event_date").alias("event_date"),
        (F.col("talk_time_seconds").cast("double") / 60).alias("talk_minutes"),
        (F.col("hold_time_seconds").cast("double") / 60).alias("hold_minutes"),
    )

    calls_with_lag = (
        ref_dates.select(key, "ref_date")
        .join(calls_in_minutes, on=key, how="left")
        .withColumn("days_before", F.datediff(F.col("ref_date"), F.col("event_date")))
    )
    recent_calls = calls_with_lag.filter((days_before >= 0) & (days_before <= 90))

    def total_within(max_days, value):
        # Sum `value` over the calls that fall within max_days of the ref date.
        return F.sum(F.when(days_before <= max_days, value).otherwise(0))

    per_customer = recent_calls.groupBy(key).agg(
        total_within(30, F.col("talk_minutes")).alias("talk_time_total_30d"),
        total_within(90, F.col("talk_minutes")).alias("talk_time_total_90d"),
        total_within(30, F.col("hold_minutes")).alias("hold_time_total_30d"),
        total_within(90, F.col("hold_minutes")).alias("hold_time_total_90d"),
        # Call count, needed only as the denominator of the average below.
        total_within(90, 1).alias("calls_total_90d"),
    )

    all_customers = ref_dates.select(key).distinct()
    totals = all_customers.join(per_customer, on=key, how="left").fillna({
        "talk_time_total_30d": 0.0,
        "talk_time_total_90d": 0.0,
        "hold_time_total_30d": 0.0,
        "hold_time_total_90d": 0.0,
        "calls_total_90d": 0,
    })

    # Guard both ratios against a zero denominator.
    avg_talk = F.when(
        F.col("calls_total_90d") > 0,
        F.col("talk_time_total_90d") / F.col("calls_total_90d"),
    ).otherwise(0.0)
    hold_ratio = F.when(
        F.col("talk_time_total_90d") > 0,
        F.col("hold_time_total_90d") / F.col("talk_time_total_90d"),
    ).otherwise(0.0)

    return (
        totals
        .withColumn("avg_talk_time_per_call_90d", avg_talk)
        .withColumn("hold_ratio_90d", hold_ratio)
        .drop("calls_total_90d")
    )


In [None]:
# Talk/hold-time features relative to each customer's reference date.
talk_hold_features_df = build_talk_hold_features(call_df, ref_dates)

In [None]:
# List the columns produced by the talk/hold feature builder.
talk_hold_features_df.columns

['unique_customer_identifier',
 'talk_time_total_30d',
 'talk_time_total_90d',
 'hold_time_total_30d',
 'hold_time_total_90d',
 'avg_talk_time_per_call_90d',
 'hold_ratio_90d']

In [None]:
# Preview the talk/hold-time features.
talk_hold_features_df.show()

+--------------------------+-------------------+-------------------+-------------------+-------------------+--------------------------+-------------------+
|unique_customer_identifier|talk_time_total_30d|talk_time_total_90d|hold_time_total_30d|hold_time_total_90d|avg_talk_time_per_call_90d|     hold_ratio_90d|
+--------------------------+-------------------+-------------------+-------------------+-------------------+--------------------------+-------------------+
|      000bcca91cb6de20c...|                0.0|                0.0|                0.0|                0.0|                       0.0|                0.0|
|      000c994611ccd522d...| 11.733333333333334| 11.733333333333334|  6.733333333333333|  6.733333333333333|        3.9111111111111114| 0.5738636363636364|
|      0010cb2447ba44197...|                0.0|                0.0|                0.0|                0.0|                       0.0|                0.0|
|      003b8fc1779648421...|                0.0|                

In [None]:
def build_payment_friction_features(customer_info_df):
    """Derive direct-debit cancellation features from the latest snapshot.

    For each customer the row with the most recent ``datevalue`` is kept.

    Args:
        customer_info_df: DataFrame with ``unique_customer_identifier``,
            ``datevalue``, ``contract_dd_cancels`` and ``dd_cancel_60_day``.

    Returns:
        One row per customer with ``contract_dd_cancels`` and
        ``dd_cancel_60_day`` (cast to int, nulls replaced by 0) plus the 0/1
        flags ``any_contract_dd_cancel`` and ``any_dd_cancel_60d``.
    """
    key = "unique_customer_identifier"
    newest_first = Window.partitionBy(key).orderBy(F.col("datevalue").desc())

    def int_or_zero(column_name):
        # Integer version of the column with missing values treated as 0.
        return F.coalesce(F.col(column_name).cast("int"), F.lit(0)).alias(column_name)

    def positive_flag(column_name):
        # 1 when the column is strictly positive, otherwise 0.
        return F.when(F.col(column_name) > 0, 1).otherwise(0)

    latest_snapshot = (
        customer_info_df
        .withColumn("datevalue", F.to_date("datevalue"))
        .withColumn("rn", F.row_number().over(newest_first))
        .filter(F.col("rn") == 1)
        .select(key, int_or_zero("contract_dd_cancels"), int_or_zero("dd_cancel_60_day"))
    )

    return (
        latest_snapshot
        .withColumn("any_contract_dd_cancel", positive_flag("contract_dd_cancels"))
        .withColumn("any_dd_cancel_60d", positive_flag("dd_cancel_60_day"))
    )

In [None]:
# Direct-debit cancellation features from each customer's latest snapshot.
payment_cust_features = build_payment_friction_features(customer_info_df)

In [None]:
# Preview the payment features without truncating the customer identifier.
payment_cust_features.show(truncate=False)

+----------------------------------------------------------------+-------------------+----------------+----------------------+-----------------+
|unique_customer_identifier                                      |contract_dd_cancels|dd_cancel_60_day|any_contract_dd_cancel|any_dd_cancel_60d|
+----------------------------------------------------------------+-------------------+----------------+----------------------+-----------------+
|000bcca91cb6de20cc5191c4579f92f13430fb6f15d2b2969acb5ffeefc20fbb|0                  |0               |0                     |0                |
|000c994611ccd522d116d190ee87a20d58407e97a3f253a1655af00650eec9d9|1                  |1               |1                     |1                |
|0010cb2447ba4419727218ee906da636e6f6964df9e9132784422f71ef5e3076|0                  |0               |0                     |0                |
|003b8fc177964842130249ca9a0008cd6063963543b411731a6eea870ea9fdff|1                  |1               |1                     |1   

In [None]:
def build_contract_tenure_features(customer_info_df):
    """Derive contract-status and tenure features from the latest snapshot.

    For each customer the row with the most recent ``datevalue`` is kept.

    Flags and the tenure bucket are derived from the raw (possibly null)
    values, so a missing ``ooc_days`` is not reported as "near out of
    contract" and a missing ``tenure_days`` is bucketed as "Unknown" rather
    than "0–1 month". The returned ``ooc_days`` column still has nulls
    replaced by 0.

    Args:
        customer_info_df: DataFrame with ``unique_customer_identifier``,
            ``datevalue``, ``tenure_days``, ``ooc_days`` and
            ``contract_status``.

    Returns:
        One row per customer with ``ooc_days``, ``contract_status`` ("Unknown"
        when null), ``is_out_of_contract`` (``ooc_days > 0``), ``is_near_ooc``
        (``-60 <= ooc_days <= 0``) and ``tenure_bucket``.
    """
    cust = customer_info_df.withColumn("datevalue", F.to_date("datevalue"))
    w = Window.partitionBy("unique_customer_identifier").orderBy(F.col("datevalue").desc())

    latest = (
        cust.withColumn("rn", F.row_number().over(w))
        .filter(F.col("rn") == 1)
        .select(
            "unique_customer_identifier",
            # Keep the raw values so "missing" stays distinguishable from 0.
            F.col("tenure_days").cast("int").alias("tenure_days_raw"),
            F.col("ooc_days").cast("int").alias("ooc_days_raw"),
            F.coalesce(F.col("contract_status"), F.lit("Unknown")).alias("contract_status"),
        )
    )

    tenure = F.col("tenure_days_raw")
    ooc = F.col("ooc_days_raw")

    df = (
        latest
        .withColumn("ooc_days", F.coalesce(ooc, F.lit(0)))
        # Comparisons against a null ooc value are null, so both flags fall
        # through to 0 when ooc_days is missing.
        .withColumn("is_out_of_contract", F.when(ooc > 0, 1).otherwise(0))
        .withColumn("is_near_ooc", F.when((ooc >= -60) & (ooc <= 0), 1).otherwise(0))
        .withColumn(
            "tenure_bucket",
            F.when(tenure.isNull(), "Unknown")
             .when(tenure < 30,  "0–1 month")
             .when(tenure < 90,  "1–3 months")
             .when(tenure < 180, "3–6 months")
             .when(tenure < 365, "6–12 months")
             .when(tenure < 540, "12–18 months")
             .when(tenure < 730, "18–24 months")
             .otherwise("24+ months")
        )
    )

    return df.select(
        "unique_customer_identifier",
        "ooc_days",
        "contract_status",
        "is_out_of_contract",
        "is_near_ooc",
        "tenure_bucket",
    )

In [None]:
# Contract-status and tenure features from each customer's latest snapshot.
contract_tenure_df = build_contract_tenure_features(customer_info_df)

In [None]:
# Preview the contract/tenure features.
contract_tenure_df.show()

+--------------------------+--------+-----------------+------------------+-----------+-------------+
|unique_customer_identifier|ooc_days|  contract_status|is_out_of_contract|is_near_ooc|tenure_bucket|
+--------------------------+--------+-----------------+------------------+-----------+-------------+
|      000bcca91cb6de20c...|     -49|03 Soon to be OOC|                 0|          1|   24+ months|
|      000c994611ccd522d...|      12|     05 Newly OOC|                 1|          0|   24+ months|
|      0010cb2447ba44197...|     604|           06 OOC|                 1|          0|   24+ months|
|      003b8fc1779648421...|     -77|03 Soon to be OOC|                 0|          0|   24+ months|
|      0044a141cffb1c522...|     -63|03 Soon to be OOC|                 0|          0|   24+ months|
|      004502eba6e70ee21...|      29|     05 Newly OOC|                 1|          0|   24+ months|
|      004873d5592505acd...|    -104|   02 In Contract|                 0|          0|   24

In [None]:
# Sanity check: the highest per-customer row count should be 1 (one row per customer).
contract_tenure_df.groupBy('unique_customer_identifier').count().orderBy(F.col('count').desc()).show()

+--------------------------+-----+
|unique_customer_identifier|count|
+--------------------------+-----+
|      001a521fb87a8ad5c...|    1|
|      00035e1ed9d471f0f...|    1|
|      000488adb0a8fde22...|    1|
|      000d854a04e8c2140...|    1|
|      0002ba270edc42e5d...|    1|
|      0018abc6d06bc5e3f...|    1|
|      000ed13dcfae07d02...|    1|
|      001f71eebc6457175...|    1|
|      0001ed3f9d6a79197...|    1|
|      002aaf2ed71bcf2a6...|    1|
|      0010738e21082ce6d...|    1|
|      003074f5f01b3e2bb...|    1|
|      000b1ab958e85e3b8...|    1|
|      00316e6e2565d86ba...|    1|
|      00165bb9d8473d8fb...|    1|
|      004f820c0449d611b...|    1|
|      000bcca91cb6de20c...|    1|
|      0050b150a9c437d43...|    1|
|      0022e7a353779e33f...|    1|
|      0062500e5a4181e77...|    1|
+--------------------------+-----+
only showing top 20 rows


In [None]:

def build_usage_features(usage_df, ref_dates):
    """Compute the change in download volume between two 28-day windows.

    The "last" window covers days 0-27 before the reference date and the
    "previous" window covers days 28-55. ``usage_drop_pct_28d`` is
    ``(last - previous) / previous * 100`` rounded to 2 decimals, and is null
    when the previous window has no download volume.

    Only the columns and aggregates needed for that single output are read
    and computed.

    Args:
        usage_df: DataFrame with ``unique_customer_identifier``,
            ``calendar_date`` and ``usage_download_mbs``.
        ref_dates: DataFrame with ``unique_customer_identifier`` and
            ``ref_date``.

    Returns:
        One row per distinct customer in ``ref_dates`` with columns
        ``unique_customer_identifier`` and ``usage_drop_pct_28d``.
    """
    key = "unique_customer_identifier"
    days_before = F.col("days_before")
    download = F.col("usage_download_mbs")

    # Project early so the scan only touches the columns that are used.
    usage = usage_df.select(
        key,
        F.to_date("calendar_date").alias("calendar_date"),
        F.col("usage_download_mbs").cast("double").alias("usage_download_mbs"),
    )

    # An inner join is sufficient: customers without usage rows are restored
    # by the join against `base` below.
    usage_with_lag = (
        ref_dates.select(key, "ref_date")
        .join(usage, on=key, how="inner")
        .withColumn("days_before", F.datediff(F.col("ref_date"), F.col("calendar_date")))
    )

    # Days 0-55 span exactly the two 28-day windows compared below.
    usage_recent = usage_with_lag.filter((days_before >= 0) & (days_before <= 55))

    feats = usage_recent.groupBy(key).agg(
        F.sum(F.when(days_before <= 27, download).otherwise(0.0)).alias("download_last_28d"),
        F.sum(
            F.when((days_before >= 28) & (days_before <= 55), download).otherwise(0.0)
        ).alias("download_prev_28d"),
    )

    base = ref_dates.select(key).distinct()

    feats = base.join(feats, on=key, how="left").fillna({
        "download_last_28d": 0.0,
        "download_prev_28d": 0.0,
    })

    # No otherwise(): the result is null when there is no previous-window
    # volume to compare against.
    feats = feats.withColumn(
        "usage_drop_pct_28d",
        F.when(
            F.col("download_prev_28d") > 0,
            F.round(
                (F.col("download_last_28d") - F.col("download_prev_28d"))
                / F.col("download_prev_28d") * 100,
                2,
            ),
        ),
    )

    return feats.select(key, "usage_drop_pct_28d")


In [None]:
# Usage-drop feature relative to each customer's reference date (lazy; nothing is read yet).
usage_features_df = build_usage_features(usage_df, ref_dates)

In [None]:
# Preview the usage feature. This is the first action that scans usage.parquet;
# in the recorded run it failed with java.lang.OutOfMemoryError: Java heap space.
usage_features_df.show()

Py4JJavaError: An error occurred while calling o846.showString.
: org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///content/drive/MyDrive/Talk_talk/usage.parquet.  SQLSTATE: KD001
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:911)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:141)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:773)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:153)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
	at org.apache.parquet.util.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:67)
	at org.apache.parquet.hadoop.util.wrapped.io.VectorIoBridge.readWrappedRanges(VectorIoBridge.java:244)
	at org.apache.parquet.hadoop.util.wrapped.io.VectorIoBridge.readVectoredRanges(VectorIoBridge.java:201)
	at org.apache.parquet.hadoop.util.H1SeekableInputStream.readVectored(H1SeekableInputStream.java:75)
	at org.apache.parquet.hadoop.ParquetFileReader.readVectored(ParquetFileReader.java:1357)
	at org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1274)
	at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1185)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1135)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1398)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase$ParquetRowGroupReaderImpl.readNextRowGroup(SpecificParquetRecordReaderBase.java:292)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:481)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:399)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:238)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:130)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:292)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:130)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:139)
	... 20 more
Caused by: java.lang.OutOfMemoryError: Java heap space


In [None]:
# Sanity check: the highest per-customer row count should be 1 (one row per customer).
usage_features_df.groupBy('unique_customer_identifier').count().orderBy(F.col('count').desc()).show()

In [None]:
def build_product_tech_speed_features(customer_info_df):
    """Derive product, technology and speed features from the latest snapshot.

    For each customer the row with the most recent ``datevalue`` is kept.

    Args:
        customer_info_df: DataFrame with ``unique_customer_identifier``,
            ``datevalue``, ``sales_channel``, ``technology``, ``speed`` and
            ``line_speed``.

    Returns:
        One row per customer with ``sales_channel`` and ``technology``
        ("Unknown" when null), ``speed`` and ``line_speed`` as doubles,
        ``speed_gap`` (``speed - line_speed``, null if either is null) and
        ``speed_gap_pct`` (the gap divided by ``speed``; null unless
        ``speed > 0`` and ``line_speed`` is not null).
    """
    key = "unique_customer_identifier"
    newest_first = Window.partitionBy(key).orderBy(F.col("datevalue").desc())

    def text_or_unknown(column_name):
        # Categorical column with missing values labelled "Unknown".
        return F.coalesce(F.col(column_name), F.lit("Unknown")).alias(column_name)

    def as_double(column_name):
        return F.col(column_name).cast("double").alias(column_name)

    latest_snapshot = (
        customer_info_df
        .withColumn("datevalue", F.to_date("datevalue"))
        .withColumn("rn", F.row_number().over(newest_first))
        .filter(F.col("rn") == 1)
        .select(
            key,
            text_or_unknown("sales_channel"),
            text_or_unknown("technology"),
            as_double("speed"),
            as_double("line_speed"),
        )
    )

    # Subtraction yields null as soon as either operand is null.
    gap = F.col("speed") - F.col("line_speed")
    # The when() guard keeps the division away from zero or null speeds; with
    # no otherwise() the unmatched rows are null.
    gap_pct = F.when(
        (F.col("speed") > 0) & F.col("line_speed").isNotNull(),
        gap / F.col("speed"),
    )

    return (
        latest_snapshot
        .withColumn("speed_gap", gap)
        .withColumn("speed_gap_pct", gap_pct)
    )


In [None]:
# Product, technology and speed features from each customer's latest snapshot.
prod_speed_df = build_product_tech_speed_features(customer_info_df)

In [None]:
# Preview the product/speed features.
prod_speed_df.show()

In [None]:
def build_churn_target_completed(cease_df):
    """Flag customers that have at least one completed cease.

    ``cease_completed_date`` is read as a string column in which missing
    values appear as the text ``null``; those rows are excluded. Rows where
    the column is a real SQL null are excluded too, because the comparison
    evaluates to null.

    Args:
        cease_df: DataFrame with ``unique_customer_identifier`` and
            ``cease_completed_date``.

    Returns:
        One row per churned customer with ``ref_date`` (the minimum
        ``cease_completed_date``) and ``is_churned`` fixed to 1.
        NOTE(review): the minimum is taken over strings, so it is the earliest
        date only if the values sort chronologically as text (e.g. ISO
        ``yyyy-MM-dd``) - confirm against the data.
    """
    completed_date = F.col("cease_completed_date")

    completed_ceases = cease_df.filter(completed_date != 'null')
    first_completion = (
        completed_ceases
        .groupBy("unique_customer_identifier")
        .agg(F.min(completed_date).alias("ref_date"))
    )

    return first_completion.withColumn("is_churned", F.lit(1))


In [None]:
# Churn label: customers with a completed cease.
target_df = build_churn_target_completed(cease_df)

In [None]:
# Assemble the modelling table: start from the call features (one row per
# customer in ref_dates) and left-join every other feature set onto it.
final_frame = (
    call_features_df
    .join(talk_hold_features_df, on="unique_customer_identifier", how="left")
    .join(payment_cust_features, on="unique_customer_identifier", how="left")
    .join(contract_tenure_df, on="unique_customer_identifier", how="left")
    .join(usage_features_df, on="unique_customer_identifier", how="left")
    .join(prod_speed_df, on="unique_customer_identifier", how="left")
)

In [None]:
# Preview the combined feature table.
final_frame.show()

In [None]:
# Attach the churn label; customers without a completed cease get is_churned = 0.
# NOTE(review): the feature windows are anchored on cease_placed_date (see
# build_ref_dates) while this label is based on cease_completed_date, so a
# customer with a placed but not completed cease is labelled 0 - confirm intended.
target_df_label = (
    final_frame
    .join(
        target_df.select("unique_customer_identifier", "is_churned"),
        on="unique_customer_identifier",
        how="left",
    )
    .withColumn("is_churned", F.coalesce(F.col("is_churned"), F.lit(0)))
)

In [None]:
# Class balance: number of customers per label value.
target_df_label.groupBy('is_churned').count().show()

In [None]:
# Number of columns in the labelled feature table.
len(target_df_label.columns)

In [None]:
# Per-label profile: customer count plus the mean of every numeric feature,
# rounded to 3 decimals. Means of 0/1 flags are exposed under a "pct_" alias.
avg_by_label = target_df_label.groupBy("is_churned").agg(
    F.count("*").alias("customers"),
    *[
        F.round(F.avg(feature), 3).alias(alias)
        for feature, alias in [
            # Calls volume
            ("calls_total_30d", "avg_calls_total_30d"),
            ("calls_total_60d", "avg_calls_total_60d"),
            ("calls_total_90d", "avg_calls_total_90d"),
            # Call types
            ("calls_loyalty_30d", "avg_calls_loyalty_30d"),
            ("calls_loyalty_90d", "avg_calls_loyalty_90d"),
            ("calls_tech_30d", "avg_calls_tech_30d"),
            ("calls_tech_90d", "avg_calls_tech_90d"),
            ("calls_csb_30d", "avg_calls_csb_30d"),
            ("calls_csb_90d", "avg_calls_csb_90d"),
            ("calls_finance_30d", "avg_calls_finance_30d"),
            ("calls_finance_90d", "avg_calls_finance_90d"),
            # Recency / breadth
            ("days_since_last_call", "avg_days_since_last_call"),
            ("call_type_diversity_90d", "avg_call_type_diversity_90d"),
            ("repeat_caller_flag_30d", "pct_repeat_caller_30d"),
            # Talk / hold
            ("talk_time_total_30d", "avg_talk_time_total_30d"),
            ("talk_time_total_90d", "avg_talk_time_total_90d"),
            ("hold_time_total_30d", "avg_hold_time_total_30d"),
            ("hold_time_total_90d", "avg_hold_time_total_90d"),
            ("avg_talk_time_per_call_90d", "avg_avg_talk_time_per_call_90d"),
            ("hold_ratio_90d", "avg_hold_ratio_90d"),
            # Payment friction
            ("contract_dd_cancels", "avg_contract_dd_cancels"),
            ("dd_cancel_60_day", "avg_dd_cancel_60_day"),
            ("any_contract_dd_cancel", "pct_any_contract_dd_cancel"),
            ("any_dd_cancel_60d", "pct_any_dd_cancel_60d"),
            # Contract / tenure
            ("ooc_days", "avg_ooc_days"),
            ("is_out_of_contract", "pct_out_of_contract"),
            ("is_near_ooc", "pct_near_ooc"),
            # Usage
            ("usage_drop_pct_28d", "avg_usage_drop_pct_28d"),
            # Speed quality
            ("speed", "avg_speed"),
            ("line_speed", "avg_line_speed"),
            ("speed_gap", "avg_speed_gap"),
            ("speed_gap_pct", "avg_speed_gap_pct"),
        ]
    ],
)

In [None]:
# Display the per-label feature averages.
avg_by_label.show()

In [None]:
# List the columns of the labelled feature table.
target_df_label.columns

In [None]:
# Persist the labelled feature table as Parquet, replacing any previous export.
# fillna(0) only affects numeric columns, so nulls in string columns are kept.
# NOTE(review): this also turns null usage_drop_pct_28d / speed_gap values
# into 0, which makes "missing" indistinguishable from "no change" - confirm
# that is intended for the downstream model.
# coalesce(1) collapses the output into a single partition (one part file).
# No "header" option: it is a CSV setting and has no effect on Parquet output.
(
    target_df_label
    .fillna(0)
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("/content/drive/MyDrive/Talk_talk/target_df_label_par")
)