# ─────────────────────────────────────────────
# 📘 PySpark Notebook for Data Preprocessing
# ─────────────────────────────────────────────

In [1]:
import os
# IPython shell escape: run `java -version` in a subshell to show which Java
# runtime (if any) is on PATH.
!java -version
# Show the JAVA_HOME seen by this Python process; prints None when it is unset.
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))

openjdk version "11.0.26" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26+4-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.26+4-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)
JAVA_HOME: /usr/lib/jvm/java-11-openjdk-amd64


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 🔹 Spark session shared by every cell below
spark = (
    SparkSession.builder
    .appName("TelecomDataProcessing")
    .getOrCreate()
)

# 🔹 Input / output directories (relative to the current working directory)
RAW_PATH = "../data/raw/"
PROCESSED_PATH = "../data/processed/"

In [7]:
import os
import shutil

def save_dataframe(df, path, format="parquet", single_file=True, overwrite=True):
    """
    Save a Spark DataFrame to the local filesystem as CSV or Parquet.

    For ``format="csv"`` with ``single_file=True`` the Spark output directory
    is collapsed into one file named ``<path>.csv`` and the directory is
    removed. In every other case the Spark output directory at ``path`` is
    left in place.

    The function uses ``os`` / ``shutil`` on ``path``, so it only works when
    ``path`` is on the local filesystem.

    Args:
        df (DataFrame): Spark DataFrame to save.
        path (str): Destination path without extension
            (e.g., 'data/processed/users').
        format (str): 'csv' or 'parquet'. (The name shadows the builtin, but
            is kept because callers pass it by keyword.)
        single_file (bool): If True, coalesce to one partition; for CSV the
            single part file is additionally moved to ``<path>.csv``.
        overwrite (bool): If True, replace any existing output.

    Returns:
        str: ``<path>.csv`` for single-file CSV output, otherwise ``path``.

    Raises:
        ValueError: If ``format`` is neither 'csv' nor 'parquet'. Raised
            before anything on disk is touched.
        FileExistsError: If ``overwrite`` is False and the single-file CSV
            target ``<path>.csv`` already exists.
    """
    # Validate first: the clean-up below is destructive, and a typo in
    # `format` must never delete previously written output.
    if format not in ("csv", "parquet"):
        raise ValueError("Unsupported format. Use 'csv' or 'parquet'.")

    as_single_csv = format == "csv" and single_file
    final_path = path + ".csv" if as_single_csv else path

    # Honour overwrite=False for the final single CSV file as well; Spark only
    # guards the intermediate output directory, which is deleted after each run.
    if as_single_csv and not overwrite and os.path.exists(final_path):
        raise FileExistsError(
            f"{final_path} already exists and overwrite=False."
        )

    # Clear stale output. `path` is normally a Spark output directory, but it
    # may also be a plain file or a symlink, which rmtree cannot remove.
    if overwrite and os.path.exists(path):
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.remove(path)

    # One partition -> one part file.
    frame = df.coalesce(1) if single_file else df

    writer = frame.write
    if overwrite:
        writer = writer.mode("overwrite")

    if format == "csv":
        # The header option is only meaningful for CSV output.
        writer.option("header", True).csv(path)
    else:
        writer.parquet(path)

    if not as_single_csv:
        return path

    # Find the part file Spark wrote inside the output directory.
    part_file = next(
        (
            fname
            for fname in sorted(os.listdir(path))
            if fname.startswith("part-") and fname.endswith(".csv")
        ),
        None,
    )
    if part_file is None:
        # No plain .csv part file found: leave the directory untouched and
        # report it.
        return path

    shutil.move(os.path.join(path, part_file), final_path)
    shutil.rmtree(path)  # drop the now-redundant directory (_SUCCESS, .crc files)
    return final_path

# ─────────────────────────────────────────────
# 1️⃣ Process: Traffic Volume (For Forecasting)
# ─────────────────────────────────────────────

In [8]:
# Read the raw CDR file; Spark infers the column types
cdr = spark.read.csv(
    RAW_PATH + "synthetic_cdr.csv",
    header=True,
    inferSchema=True,
)

# Truncate each timestamp to its hour, then summarise per (tower, hour):
# row count and total data usage
traffic = (
    cdr.withColumn("hour", F.date_trunc("hour", F.col("timestamp")))
    .groupBy("tower_id", "hour")
    .agg(
        F.count("*").alias("total_calls"),
        F.sum("data_usage_mb").alias("total_data_used_mb"),
    )
)

# Write a single CSV; the helper appends ".csv" to the path itself
output_path = save_dataframe(
    traffic,
    PROCESSED_PATH + "traffic_volume",
    format="csv",
    single_file=True,
)

print(f"✅ traffic_volume.csv saved to {output_path}")

✅ traffic_volume.csv saved to ../data/processed/traffic_volume.csv


# ─────────────────────────────────────────────
# 2️⃣ Process: Graph Edges (For Graph ML)
# ─────────────────────────────────────────────

In [9]:
# One edge per (user, tower) pair, weighted by the number of CDR rows
# in which the pair appears
edges = (
    cdr.groupBy("user_id", "tower_id")
    .agg(F.count("*").alias("interaction_weight"))
)

# Write a single CSV via the shared helper
output_path = save_dataframe(
    edges,
    PROCESSED_PATH + "telecom_graph_edges",
    format="csv",
    single_file=True,
)

print(f"✅ telecom_graph_edges.csv saved to {output_path}")

print("✅ telecom_graph_edges.csv saved.")

✅ telecom_graph_edges.csv saved to ../data/processed/telecom_graph_edges.csv
✅ telecom_graph_edges.csv saved.


# ─────────────────────────────────────────────
# 3️⃣ Process: Network Stats (For Anomaly/VAE)
# ─────────────────────────────────────────────

In [10]:
# Read the raw anomaly logs; Spark infers the column types
logs = spark.read.csv(
    RAW_PATH + "anomaly_logs.csv",
    header=True,
    inferSchema=True,
)

# Keep only the three metric columns plus the anomaly column
features = logs.select(
    "latency_ms",
    "packet_loss",
    "cpu_load",
    "anomaly",
)

# Write a single CSV via the shared helper
output_path = save_dataframe(
    features,
    PROCESSED_PATH + "network_behaviors",
    format="csv",
    single_file=True,
)

print(f"✅ network_behaviors.csv saved to {output_path}")

print("✅ network_behaviors.csv saved.")

✅ network_behaviors.csv saved to ../data/processed/network_behaviors.csv
✅ network_behaviors.csv saved.


# ─────────────────────────────────────────────
# 4️⃣ Process: User Churn Features (For Churn Modeling)
# ─────────────────────────────────────────────

In [11]:
# NOTE(review): this input is read from PROCESSED_PATH, whereas the other
# inputs in this notebook come from RAW_PATH — confirm that is intended.
plans = spark.read.csv(
    PROCESSED_PATH + "user_plan_history.csv",
    header=True,
    inferSchema=True,
)

# One row per user: max of churn_flag plus the averages of the usage and
# satisfaction columns
churn_ds = (
    plans.groupBy("user_id")
    .agg(
        F.max("churn_flag").alias("churned"),
        F.avg("minutes_used").alias("avg_minutes"),
        F.avg("data_used_gb").alias("avg_data_gb"),
        F.avg("satisfaction_score").alias("avg_satisfaction"),
    )
)

# Write a single CSV via the shared helper
output_path = save_dataframe(
    churn_ds,
    PROCESSED_PATH + "user_churn_dataset",
    format="csv",
    single_file=True,
)

print(f"✅ user_churn_dataset.csv saved to {output_path}")

print("✅ user_churn_dataset.csv saved.")

✅ user_churn_dataset.csv saved to ../data/processed/user_churn_dataset.csv
✅ user_churn_dataset.csv saved.


# ─────────────────────────────────────────────
# 5️⃣ Process: Copy Raw Files to Processed
# ─────────────────────────────────────────────

In [None]:
# Files that need no transformation, as (source, destination) pairs.
# Built from RAW_PATH / PROCESSED_PATH so this cell reads and writes the same
# directories as every other cell in the notebook, instead of hard-coded
# "data/..." paths that resolve differently from the "../data/..." constants.
files_to_copy = [
    (os.path.join(RAW_PATH, name), os.path.join(PROCESSED_PATH, name))
    for name in ("user_profiles.csv", "tower_locations.csv")
]

# Create processed folder if it doesn't exist
os.makedirs(PROCESSED_PATH, exist_ok=True)

# Copy files (shutil.copyfile overwrites an existing destination)
for src, dst in files_to_copy:
    shutil.copyfile(src, dst)
    print(f"✅ Copied: {src} → {dst}")