In [15]:
import csv
import random
from datetime import datetime, timedelta
import argparse

def generate_dataset(output_path, num_rows):
    headers = [
        "customer_id",
        "gender",
        "SeniorCitizen",
        "Partner",
        "Dependents",
        "tenure",
        "PhoneService",
        "MultipleLines",
        "InternetService",
        "OnlineSecurity",
        "OnlineBackup",
        "event_date"
    ]

    genders = ["Male", "Female"]
    yes_no = ["Yes", "No"]
    internet_services = ["DSL", "Fiber optic", "No"]
    multi_lines = ["Yes", "No", "No phone service"]
    online_opts = ["Yes", "No", "No internet service"]

    start_date = datetime(2025, 12, 1)
    num_days = 10

    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(headers)

        for i in range(1, num_rows + 1):
            cid = f"C{i:07d}"  # unique ID

            gender = random.choice(genders)
            senior = random.choice([0, 1])
            partner = random.choice(yes_no)
            dependents = random.choice(yes_no)
            tenure = random.randint(0, 72)

            phone = random.choice(yes_no)
            if phone == "No":
                multiline = "No phone service"
            else:
                multiline = random.choice(["Yes", "No"])

            internet = random.choice(internet_services)
            if internet == "No":
                online_sec = "No internet service"
                online_bkp = "No internet service"
            else:
                online_sec = random.choice(["Yes", "No"])
                online_bkp = random.choice(["Yes", "No"])

            event_date = (start_date + timedelta(days=random.randint(0, num_days))).strftime("%Y-%m-%d")

            row = [
                cid, gender, senior, partner, dependents, tenure,
                phone, multiline, internet, online_sec, online_bkp, event_date
            ]
            writer.writerow(row)

    print(f"✅ Generated {num_rows} rows at {output_path}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate Telco-style CSV dataset for Spark batch inference")
    parser.add_argument("--output", required=True, help="Output CSV file path")
    parser.add_argument("--rows", type=int, default=1000, help="Number of rows to generate")

    # Provide default arguments for execution within a Colab notebook
    # In a real command-line execution, these would be passed via `python script.py --output data.csv --rows 500`
    args = parser.parse_args(["--output", "telco_dataset.csv", "--rows", "50000"]) # Example values
    generate_dataset(args.output, args.rows)


✅ Generated 50000 rows at telco_dataset.csv


In [2]:
import pandas as pd

df=pd.read_csv("telco_dataset.csv").head()

In [17]:
df.customer_id.count()

np.int64(5)

In [11]:
len(df)

5

In [14]:
df.shape[1]

12

In [18]:
import csv

with open("telco_dataset.csv", "r") as f:
    row_count = sum(1 for _ in f) - 1  # subtract header

print("Total rows:", row_count)


Total rows: 50000


In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q pyspark


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TelcoTrainColab") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark


In [6]:
df = spark.read.option("header", True).option("inferSchema", True).csv("telco_dataset.csv")
df.printSchema()
df.show(5)
print("Rows:", df.count())


root
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- event_date: date (nullable = true)

+-----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+----------+
|customer_id|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|event_date|
+-----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+----------+
|   C00

In [8]:
from pyspark.sql.functions import rand

df = df.withColumn("label", (rand() > 0.5).cast("int"))


In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

categorical_cols = [
    "gender", "Partner", "Dependents", "PhoneService",
    "MultipleLines", "InternetService", "OnlineSecurity", "OnlineBackup"
]
numeric_cols = ["SeniorCitizen", "tenure"]

indexers = [
    StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep")
    for c in categorical_cols
]

encoders = [
    OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_ohe")
    for c in categorical_cols
]

assembler = VectorAssembler(
    inputCols=[c + "_ohe" for c in categorical_cols] + numeric_cols,
    outputCol="features"
)

lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])


In [10]:
model = pipeline.fit(df)


In [12]:
model_path = "/content/model/pipeline_model"
model.write().overwrite().save(model_path)

print("✅ Model saved at", model_path)


✅ Model saved at /content/model/pipeline_model


In [13]:
model

PipelineModel_3fd927ed657e

In [14]:
preds = model.transform(df.limit(5))
preds.select("customer_id", "prediction", "probability").show()


+-----------+----------+--------------------+
|customer_id|prediction|         probability|
+-----------+----------+--------------------+
|   C0000001|       0.0|[0.50200161276578...|
|   C0000002|       0.0|[0.51375958747949...|
|   C0000003|       1.0|[0.49887203888610...|
|   C0000004|       1.0|[0.49637088771665...|
|   C0000005|       0.0|[0.50865896179288...|
+-----------+----------+--------------------+



In [15]:
df.groupBy("label").count().show()


+-----+-----+
|label|count|
+-----+-----+
|    1|24924|
|    0|25076|
+-----+-----+



# Use Balanced Random **Labels** **bold text**

In [16]:
from pyspark.sql.functions import rand

df = df.withColumn("label", (rand() > 0.5).cast("int"))


In [13]:
df

DataFrame[customer_id: string, gender: string, SeniorCitizen: int, Partner: string, Dependents: string, tenure: int, PhoneService: string, MultipleLines: string, InternetService: string, OnlineSecurity: string, OnlineBackup: string, event_date: date, label: int]

# **Batch inference & Parquet output**

In [17]:
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col

model = PipelineModel.load(model_path)

predictions = model.transform(df)

result = predictions.select(
    col("customer_id"),
    col("event_date"),
    col("prediction"),
    col("probability")
)

output_path = "/content/output_predictions"

result.write.mode("overwrite") \
    .partitionBy("event_date") \
    .parquet(output_path)

print("✅ Batch inference done. Output at:", output_path)


✅ Batch inference done. Output at: /content/output_predictions


In [18]:
spark.read.parquet(output_path).show(5)


+-----------+----------+--------------------+----------+
|customer_id|prediction|         probability|event_date|
+-----------+----------+--------------------+----------+
|   C0000004|       1.0|[0.49637088771665...|2025-12-11|
|   C0000015|       1.0|[0.49684819803810...|2025-12-11|
|   C0000028|       1.0|[0.49887567753226...|2025-12-11|
|   C0000039|       0.0|[0.50271165365204...|2025-12-11|
|   C0000065|       1.0|[0.49574045654020...|2025-12-11|
+-----------+----------+--------------------+----------+
only showing top 5 rows
