In [104]:
!pip install pandas numpy faker




In [105]:
!pip install pyspark




In [106]:
import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()

In [107]:
# CONFIG
# ------------------------------
NUM_CUSTOMERS = 20000
NUM_CDR = 50000
NUM_COMPLAINTS = 8000
NUM_NETWORK = 10000

locations = ["Kathmandu", "Lalitpur", "Bhaktapur", "Pokhara", "Biratnagar", "Butwal", "Dharan"]
service_plans = ["Basic", "Standard", "Premium"]
network_types = ["4G", "5G", "VoLTE"]
call_types = ["incoming", "outgoing"]
complaint_types = ["Network Issue", "Billing Issue", "Slow Internet", "Dropped Calls", "Other"]

In [108]:
# 1️⃣ CUSTOMER DATASET
# ------------------------------
customers = {
    "customer_id": [fake.unique.random_int(100000, 999999) for _ in range(NUM_CUSTOMERS)],
    "location": [random.choice(locations) for _ in range(NUM_CUSTOMERS)],
    "service_plan": [random.choice(service_plans) for _ in range(NUM_CUSTOMERS)],
    "data_usage": [round(random.uniform(0.1, 10.0),2) for _ in range(NUM_CUSTOMERS)],
    "call_duration": [fake.random_int(10,600) for _ in range(NUM_CUSTOMERS)],
    "complaints": [fake.random_int(0,5) for _ in range(NUM_CUSTOMERS)],
    "churn": [fake.random_int(0,1) for _ in range(NUM_CUSTOMERS)]
}

df_customers = pd.DataFrame(customers)

In [109]:
# 2️⃣ CDR DATASET
# ------------------------------
cdr = {
    "record_id": list(range(1, NUM_CDR+1)),
    "customer_id": [random.choice(df_customers["customer_id"]) for _ in range(NUM_CDR)],
    "timestamp": [fake.date_time_between(start_date="-1y", end_date="now") for _ in range(NUM_CDR)],
    "call_type": [random.choice(call_types) for _ in range(NUM_CDR)],
    "call_duration_seconds": [fake.random_int(1, 800) for _ in range(NUM_CDR)],
    "called_number": [fake.random_int(9800000000, 9899999999) for _ in range(NUM_CDR)],
    "call_cost": [round(random.uniform(0.1, 2.0), 4) for _ in range(NUM_CDR)],
    "tower_id": [f"TWR{fake.random_int(1000,9999)}" for _ in range(NUM_CDR)],
    "network": [random.choice(network_types) for _ in range(NUM_CDR)]
}

df_cdr = pd.DataFrame(cdr)


In [110]:
# 3️⃣ COMPLAINT LOGS DATASET
# ------------------------------
complaints = {
    "complaint_id": list(range(1, NUM_COMPLAINTS+1)),
    "customer_id": [random.choice(df_customers["customer_id"]) for _ in range(NUM_COMPLAINTS)],
    "timestamp": [fake.date_time_between(start_date="-1y", end_date="now") for _ in range(NUM_COMPLAINTS)],
    "category": [random.choice(complaint_types) for _ in range(NUM_COMPLAINTS)],
    "resolved": [random.choice([0, 1]) for _ in range(NUM_COMPLAINTS)]
}

df_complaints = pd.DataFrame(complaints)

In [111]:
# 4️⃣ NETWORK USAGE DATASET
# ------------------------------
network_usage = {
    "tower_id": [f"TWR{fake.random_int(1000,9999)}" for _ in range(NUM_NETWORK)],
    "timestamp": [fake.date_time_between(start_date="-30d", end_date="now") for _ in range(NUM_NETWORK)],
    "data_load_MB": [round(random.uniform(100, 5000), 2) for _ in range(NUM_NETWORK)],
    "active_users": [fake.random_int(10, 500) for _ in range(NUM_NETWORK)],
    "network_type": [random.choice(network_types) for _ in range(NUM_NETWORK)]
}

df_network = pd.DataFrame(network_usage)

In [112]:
# SAVE TO CSV
# ------------------------------
df_customers.to_csv("customers.csv", index=False)
df_cdr.to_csv("cdr.csv", index=False)
df_complaints.to_csv("complaints.csv", index=False)
df_network.to_csv("network_usage.csv", index=False)

df_customers.head(), df_cdr.head(), df_complaints.head(), df_network.head()

(   customer_id    location service_plan  data_usage  call_duration  \
 0       952924    Lalitpur      Premium        4.04            380   
 1       443105      Dharan      Premium        5.56            427   
 2       608919  Biratnagar      Premium        4.24            420   
 3       123322    Lalitpur     Standard        5.62             99   
 4       963491    Lalitpur      Premium        5.15            288   
 
    complaints  churn  
 0           0      1  
 1           5      1  
 2           4      0  
 3           1      0  
 4           1      0  ,
    record_id  customer_id                  timestamp call_type  \
 0          1       457636 2025-11-10 11:46:35.382419  outgoing   
 1          2       816728 2024-12-19 05:21:23.908468  incoming   
 2          3       987917 2025-07-25 02:28:51.815629  incoming   
 3          4       469832 2025-07-18 06:05:18.939896  incoming   
 4          5       484687 2025-09-29 08:31:25.932601  outgoing   
 
    call_duration_secon

In [113]:
# View columns only
print("DF1 Columns:", df_customers.columns.tolist())
print("DF2 Columns:", df_cdr.columns.tolist())
print("DF3 Columns:", df_complaints.columns.tolist())
print("DF4 Columns:", df_network.columns.tolist())

# View schema (column name + datatype + non-null info)
print("\nDF1 Info:")
df_customers.info()
print("\nDF2 Info:")
df_cdr.info()
print("\nDF3 Info:")
df_complaints.info()
print("\nDF4 Info:")
df_network.info()


DF1 Columns: ['customer_id', 'location', 'service_plan', 'data_usage', 'call_duration', 'complaints', 'churn']
DF2 Columns: ['record_id', 'customer_id', 'timestamp', 'call_type', 'call_duration_seconds', 'called_number', 'call_cost', 'tower_id', 'network']
DF3 Columns: ['complaint_id', 'customer_id', 'timestamp', 'category', 'resolved']
DF4 Columns: ['tower_id', 'timestamp', 'data_load_MB', 'active_users', 'network_type']

DF1 Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   customer_id    20000 non-null  int64  
 1   location       20000 non-null  object 
 2   service_plan   20000 non-null  object 
 3   data_usage     20000 non-null  float64
 4   call_duration  20000 non-null  int64  
 5   complaints     20000 non-null  int64  
 6   churn          20000 non-null  int64  
dtypes: float64(1), int64(4), object(2)
memory usage: 1.1+ 

In [114]:
from google.colab import files

files.download("customers.csv")
files.download("cdr.csv")
files.download("complaints.csv")
files.download("network_usage.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [115]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan

In [116]:
# 1️⃣ Create Spark Session
spark = SparkSession.builder \
    .appName("TelecomDataCleaning") \
    .master("local[*]") \
    .getOrCreate()

In [117]:
# 2️⃣ Load CSVs into DataFrames
df_customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
df_cdr = spark.read.csv("cdr.csv", header=True, inferSchema=True)
df_complaints = spark.read.csv("complaints.csv", header=True, inferSchema=True)
df_network = spark.read.csv("network_usage.csv", header=True, inferSchema=True)

In [118]:
# 3️⃣ Check schema & basic stats
print("=== Customers Schema ===")
df_customers.printSchema()
df_customers.show(5)


=== Customers Schema ===
root
 |-- customer_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- service_plan: string (nullable = true)
 |-- data_usage: double (nullable = true)
 |-- call_duration: integer (nullable = true)
 |-- complaints: integer (nullable = true)
 |-- churn: integer (nullable = true)

+-----------+----------+------------+----------+-------------+----------+-----+
|customer_id|  location|service_plan|data_usage|call_duration|complaints|churn|
+-----------+----------+------------+----------+-------------+----------+-----+
|     952924|  Lalitpur|     Premium|      4.04|          380|         0|    1|
|     443105|    Dharan|     Premium|      5.56|          427|         5|    1|
|     608919|Biratnagar|     Premium|      4.24|          420|         4|    0|
|     123322|  Lalitpur|    Standard|      5.62|           99|         1|    0|
|     963491|  Lalitpur|     Premium|      5.15|          288|         1|    0|
+-----------+----------+--------

In [119]:
print("=== CDR Schema ===")
df_cdr.printSchema()
df_cdr.show(5)

=== CDR Schema ===
root
 |-- record_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- call_type: string (nullable = true)
 |-- call_duration_seconds: integer (nullable = true)
 |-- called_number: long (nullable = true)
 |-- call_cost: double (nullable = true)
 |-- tower_id: string (nullable = true)
 |-- network: string (nullable = true)

+---------+-----------+--------------------+---------+---------------------+-------------+---------+--------+-------+
|record_id|customer_id|           timestamp|call_type|call_duration_seconds|called_number|call_cost|tower_id|network|
+---------+-----------+--------------------+---------+---------------------+-------------+---------+--------+-------+
|        1|     457636|2025-11-10 11:46:...| outgoing|                  383|   9880870532|   0.8506| TWR9006|  VoLTE|
|        2|     816728|2024-12-19 05:21:...| incoming|                  333|   9883851600|   0.1289| TWR6241|  VoLT

In [120]:
print("=== Complaints Schema ===")
df_complaints.printSchema()
df_complaints.show(5)

=== Complaints Schema ===
root
 |-- complaint_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- resolved: integer (nullable = true)

+------------+-----------+--------------------+-------------+--------+
|complaint_id|customer_id|           timestamp|     category|resolved|
+------------+-----------+--------------------+-------------+--------+
|           1|     847707|2025-04-20 03:13:...|Dropped Calls|       1|
|           2|     299792|2025-04-09 03:31:...|        Other|       0|
|           3|     702698|2024-12-10 21:38:...|Slow Internet|       0|
|           4|     795529|2025-09-18 16:02:...|Dropped Calls|       1|
|           5|     306703|2025-07-08 11:33:...|Slow Internet|       0|
+------------+-----------+--------------------+-------------+--------+
only showing top 5 rows



In [121]:
print("=== Network Schema ===")
df_network.printSchema()
df_network.show(5)

=== Network Schema ===
root
 |-- tower_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- data_load_MB: double (nullable = true)
 |-- active_users: integer (nullable = true)
 |-- network_type: string (nullable = true)

+--------+--------------------+------------+------------+------------+
|tower_id|           timestamp|data_load_MB|active_users|network_type|
+--------+--------------------+------------+------------+------------+
| TWR2980|2025-11-06 00:26:...|     2998.39|         209|          5G|
| TWR8241|2025-11-18 16:48:...|     2588.02|         251|          4G|
| TWR4619|2025-11-07 07:10:...|     1212.09|         327|          5G|
| TWR1204|2025-11-05 06:38:...|     1865.88|         173|       VoLTE|
| TWR5992|2025-11-08 09:59:...|     3584.88|         181|          5G|
+--------+--------------------+------------+------------+------------+
only showing top 5 rows



In [122]:
# 4️⃣ Remove duplicates
# Customers and Complaints: duplicates are likely unwanted
df_customers = df_customers.dropDuplicates()
df_complaints = df_complaints.dropDuplicates()


In [123]:
# CDR & Network: check first
cdr_dups = df_cdr.count() - df_cdr.dropDuplicates().count()
network_dups = df_network.count() - df_network.dropDuplicates().count()
print(f"CDR duplicates: {cdr_dups}, Network duplicates: {network_dups}")

CDR duplicates: 0, Network duplicates: 0


In [124]:
df_cdr = df_cdr.dropDuplicates() if cdr_dups > 0 else df_cdr
df_network = df_network.dropDuplicates() if network_dups > 0 else df_network

In [125]:
# 5️⃣ Handle missing values
# Customers
numeric_cols_cust = ["data_usage", "call_duration", "complaints", "churn"]
string_cols_cust = ["location", "service_plan"]
for col_name in numeric_cols_cust:
    df_customers = df_customers.fillna({col_name: 0})
for col_name in string_cols_cust:
    df_customers = df_customers.fillna({col_name: "Unknown"})

In [126]:

# CDR
df_cdr = df_cdr.fillna({
    "call_type": "unknown",
    "call_duration_seconds": 0,
    "call_cost": 0.0,
    "network": "unknown"
})


In [127]:
# Complaints
df_complaints = df_complaints.fillna({
    "category": "Other",
    "resolved": 0
})

In [128]:
# Network
df_network = df_network.fillna({
    "data_load_MB": 0.0,
    "active_users": 0,
    "network_type": "Unknown"
})

In [129]:
# 6️⃣ Standardize categorical values
df_customers = df_customers.withColumn(
    "service_plan",
    when(col("service_plan").isin(["Basic", "Standard", "Premium"]), col("service_plan")).otherwise("Other")
)

In [130]:
df_cdr = df_cdr.withColumn(
    "call_type",
    when(col("call_type").isin(["incoming", "outgoing"]), col("call_type")).otherwise("other")
)


In [131]:
df_network = df_network.withColumn(
    "network_type",
    when(col("network_type").isin(["4G", "5G", "VoLTE"]), col("network_type")).otherwise("Other")
)

In [132]:
# 7️⃣ Filter invalid numeric data (negative values)
df_customers = df_customers.filter((col("data_usage") >= 0) & (col("call_duration") >= 0))
df_cdr = df_cdr.filter((col("call_duration_seconds") >= 0) & (col("call_cost") >= 0))
df_network = df_network.filter((col("data_load_MB") >= 0) & (col("active_users") >= 0))

In [133]:
# 8️⃣Feature Engineering
# Aggregate total call duration per customer
df_total_call = df_cdr.groupBy("customer_id").sum("call_duration_seconds") \
    .withColumnRenamed("sum(call_duration_seconds)", "total_call_seconds")

In [134]:
# Aggregate total complaints per customer
df_total_complaints = df_complaints.groupBy("customer_id").count() \
    .withColumnRenamed("count", "total_complaints")

In [135]:
# Join aggregates with customer profile
df_customers_cleaned = df_customers.join(df_total_call, on="customer_id", how="left") \
    .join(df_total_complaints, on="customer_id", how="left") \
    .fillna({"total_call_seconds": 0, "total_complaints": 0})

In [136]:
# Encode churn label (already 0/1, but ensure type)
df_customers_cleaned = df_customers_cleaned.withColumn("churn", col("churn").cast("integer"))

In [137]:
# 9️⃣ Save cleaned data
df_customers_cleaned.write.csv("customers_cleaned.csv", header=True, mode="overwrite")
df_cdr.write.csv("cdr_cleaned.csv", header=True, mode="overwrite")
df_complaints.write.csv("complaints_cleaned.csv", header=True, mode="overwrite")
df_network.write.csv("network_usage_cleaned.csv", header=True, mode="overwrite")

In [138]:
# 4️⃣ Feature Engineering
# ---- CDR Aggregation
df_cdr_agg = df_cdr.groupBy("customer_id").agg(
    _sum("call_duration_seconds").alias("total_call_seconds_cdr"),
    _avg("call_duration_seconds").alias("avg_call_duration_cdr"),
    _count("record_id").alias("total_calls_cdr")
)

In [139]:
# ---- Peak calls example (calls > 600 sec)
df_peak_calls = df_cdr.filter(col("call_duration_seconds") > 600) \
    .groupBy("customer_id").agg(_count("record_id").alias("peak_calls_cdr"))

In [140]:
# ---- Complaints Aggregation
df_complaints_agg = df_complaints.groupBy("customer_id").agg(
    _count("complaint_id").alias("total_complaints_cpl"),
    _sum("resolved").alias("resolved_complaints_cpl")
)
df_complaints_agg = df_complaints_agg.withColumn(
    "resolved_ratio_cpl",
    col("resolved_complaints_cpl") / col("total_complaints_cpl")
)

In [141]:
# ---- Network Aggregation per customer via tower mapping (example)
# First join CDR to network by tower_id
df_cdr_network = df_cdr.select("customer_id", "tower_id").distinct() \
    .join(df_network.groupBy("tower_id").agg(
        _avg("data_load_MB").alias("avg_data_load_MB_net"),
        _avg("active_users").alias("avg_active_users_net")
    ), on="tower_id", how="left")

df_network_agg = df_cdr_network.groupBy("customer_id").agg(
    _avg("avg_data_load_MB_net").alias("avg_data_load_MB_net"),
    _avg("avg_active_users_net").alias("avg_active_users_net")
)

In [142]:
# ---- Additional flags
df_customers = df_customers.withColumn("high_data_usage", when(col("data_usage") > 5, 1).otherwise(0)) \
                           .withColumn("high_call_duration", when(col("call_duration") > 300, 1).otherwise(0))

In [143]:
# 5️⃣ Combine all features
df_features = df_customers.join(df_cdr_agg, on="customer_id", how="left") \
                          .join(df_peak_calls, on="customer_id", how="left") \
                          .join(df_complaints_agg, on="customer_id", how="left") \
                          .join(df_network_agg, on="customer_id", how="left") \
                          .fillna(0)

In [144]:
# 6️⃣ Handle duplicate columns automatically
from collections import Counter

def rename_duplicates(df):
    cols = df.columns
    counts = Counter()
    new_cols = []
    for c in cols:
        if counts[c]:
            new_name = f"{c}_{counts[c]}"
        else:
            new_name = c
        new_cols.append(new_name)
        counts[c] += 1
    for old, new in zip(cols, new_cols):
        if old != new:
            df = df.withColumnRenamed(old, new)
    return df

df_features = rename_duplicates(df_features)

In [145]:
# 8️⃣ Save final feature table
df_features.write.csv("customer_features.csv", header=True, mode="overwrite")

In [146]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline


In [147]:
# ---------- 1️⃣ Feature Preprocessing ----------

categorical_cols = ["location", "service_plan"]
numeric_cols = [c for c in df_features.columns if c not in ('customer_id','churn') + tuple(categorical_cols)]

In [148]:
# StringIndexer for categorical
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") for c in categorical_cols]

In [149]:
# OneHotEncoder for categorical indices
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_ohe") for c in categorical_cols]


In [150]:
# Assembler
assembler = VectorAssembler(inputCols=numeric_cols + [c+"_ohe" for c in categorical_cols], outputCol="features")

In [151]:
# Random Forest classifier
rf = RandomForestClassifier(labelCol="churn", featuresCol="features", numTrees=100, maxDepth=5, seed=42)

In [152]:
# Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

In [153]:
# Train-test split
train_df, test_df = df_features.randomSplit([0.7, 0.3], seed=42)

In [154]:
# Fit pipeline
model = pipeline.fit(train_df)

In [155]:
# Predictions
predictions = model.transform(test_df)
predictions.select("customer_id", "churn", "prediction", "probability").show(5)

+-----------+-----+----------+--------------------+
|customer_id|churn|prediction|         probability|
+-----------+-----+----------+--------------------+
|     100074|    0|       1.0|[0.49671086418935...|
|     100390|    1|       0.0|[0.50628556901616...|
|     100435|    1|       0.0|[0.50493715676768...|
|     100450|    1|       1.0|[0.48633273430845...|
|     100592|    1|       1.0|[0.49688602519222...|
+-----------+-----+----------+--------------------+
only showing top 5 rows



In [156]:
# Evaluation
evaluator = BinaryClassificationEvaluator(labelCol="churn", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"✅ Customer Churn Prediction ROC-AUC: {roc_auc:.4f}")

✅ Customer Churn Prediction ROC-AUC: 0.5043


In [157]:
from pyspark.sql.functions import hour, avg, max as _max

# Add hour column for sessionization
df_network = df_network.withColumn("hour", hour("timestamp"))

# Average data load and active users per tower per hour
network_stats = df_network.groupBy("tower_id", "hour").agg(
    avg("data_load_MB").alias("avg_data_load_MB"),
    avg("active_users").alias("avg_active_users"),
    _max("data_load_MB").alias("peak_data_load_MB")
).orderBy("tower_id", "hour")

network_stats.show(10)

# Identify top congested towers (highest peak load)
top_congestion = network_stats.orderBy(col("peak_data_load_MB").desc()).limit(10)
top_congestion.show()


+--------+----+----------------+----------------+-----------------+
|tower_id|hour|avg_data_load_MB|avg_active_users|peak_data_load_MB|
+--------+----+----------------+----------------+-----------------+
| TWR1000|   3|          604.04|           304.0|           604.04|
| TWR1001|  13|         1237.78|           110.0|          1237.78|
| TWR1002|  20|           511.2|           365.0|            511.2|
| TWR1003|  12|         4786.54|           428.0|          4786.54|
| TWR1004|  18|          3922.6|            48.0|           3922.6|
| TWR1004|  21|          519.91|           327.0|           519.91|
| TWR1006|   2|         1482.63|           279.0|          1482.63|
| TWR1007|   2|         3844.87|           496.0|          3844.87|
| TWR1007|  22|         4232.32|           199.0|          4232.32|
| TWR1008|   6|         2380.32|           274.0|          2380.32|
+--------+----+----------------+----------------+-----------------+
only showing top 10 rows

+--------+----+-------

In [158]:
from pyspark.sql.functions import sum as _sum, col

# Aggregate revenue from CDR
cdr_revenue = df_cdr.groupBy("customer_id").agg(_sum("call_cost").alias("total_revenue"))

# Join revenue to customer features
df_profit = df_features.join(cdr_revenue, on="customer_id", how="left").fillna(0)

# Assign service plan cost (simulated)
plan_cost = {"Basic": 5, "Standard": 10, "Premium": 20}
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

plan_cost_udf = udf(lambda plan: plan_cost.get(plan, 0), DoubleType())
df_profit = df_profit.withColumn("plan_cost", plan_cost_udf(col("service_plan")))

# Profit = revenue - cost
df_profit = df_profit.withColumn("profit", col("total_revenue") - col("plan_cost"))

# Aggregate by service plan
profit_summary = df_profit.groupBy("service_plan").agg(
    _sum("total_revenue").alias("total_revenue"),
    _sum("plan_cost").alias("total_cost"),
    _sum("profit").alias("total_profit"),
    _avg("profit").alias("avg_profit_per_customer")
)

profit_summary.show()


+------------+------------------+----------+------------+-----------------------+
|service_plan|     total_revenue|total_cost|total_profit|avg_profit_per_customer|
+------------+------------------+----------+------------+-----------------------+
|     Premium|17595.678100000016|      NULL|        NULL|                   NULL|
|       Basic| 17808.86990000003|      NULL|        NULL|                   NULL|
|    Standard|17188.157300000013|      NULL|        NULL|                   NULL|
+------------+------------------+----------+------------+-----------------------+

