In [2]:
# ============================================================
# NOTEBOOK 03: ADVANCED DATA ENGINEERING & TELECOM ANALYTICS
# Project: CDR Telecom Big Data Engineering Final Year Internship
# Enhanced Version with Sophisticated Analytics
# ============================================================

# ------------------------------------------------------------
# Cell 1 – Setup, Imports, and Advanced Config
# ------------------------------------------------------------
import sys, os
from datetime import datetime, timedelta
from pyspark.sql import functions as F, types as T, Window
from pyspark.ml.feature import Bucketizer
import numpy as np

# Cluster bootstrap
sys.path.append('/home/jovyan/work/work/scripts')
from spark_init import init_spark
spark = init_spark("CDR Advanced Data Engineering & Analytics")

# Advanced Configuration
DATABASE_NAME = "algerie_telecom_cdr"
MAIN_TABLE = "cdr_anonymized"

# Core tables
CLEANED_TABLE = "cdr_cleaned"
ENRICHED_TABLE = "cdr_enriched"

# Time-based aggregation tables
TRAFFIC_DAILY = "traffic_daily"
TRAFFIC_HOURLY = "traffic_hourly"
TRAFFIC_15MIN = "traffic_15min"
TRAFFIC_WEEKLY = "traffic_weekly"
SPECIAL_DAYS_TABLE = "traffic_specialdays"

# Subscriber analytics tables
SUBSCRIBER_METRICS = "subscriber_metrics"
SUBSCRIBER_BEHAVIOR = "subscriber_behavior_patterns"
SUBSCRIBER_LIFETIME = "subscriber_lifetime_value"
CHURN_RISK = "subscriber_churn_risk"

# Network performance tables
NETWORK_PERF = "network_performance_agg"
CELL_DAILY_KPIS = "cell_daily_kpis"
CELL_HOURLY_KPIS = "cell_hourly_kpis"
CELL_HOURLY_SPIKES = "cell_hourly_spikes"
CELL_HEALTH_SCORE = "cell_health_scores"

# Advanced analytics tables
ROLLING_KPIS = "traffic_rolling_kpis"
SEASONAL_PATTERNS = "seasonal_patterns"
PEAK_ANALYSIS = "peak_traffic_analysis"
REVENUE_ANALYTICS = "revenue_analytics"
SERVICE_QUALITY = "service_quality_metrics"

# Regional/Special tables
YENNAYER_ANALYSIS = "yennayer_special_analysis"
REGIONAL_METRICS = "regional_metrics"
WILAYA_PERFORMANCE = "wilaya_performance"

spark.sql(f"USE {DATABASE_NAME}")

print(f"🔧 Advanced Processing Configuration:")
print(f"   Database: {DATABASE_NAME}")
print(f"   Source Table: {MAIN_TABLE}")
print(f"   Total Tables to Create: 24")
print(f"   Spark Version: {spark.version}")
print(f"   Processing Started: {datetime.now()}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/21 16:07:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ SparkSession initialized (App: CDR Advanced Data Engineering & Analytics, Spark: 3.5.1)
✅ Hive Warehouse: hdfs://namenode:9000/user/hive/warehouse
✅ Hive Metastore URI: thrift://hive-metastore:9083


25/06/21 16:07:37 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist


🔧 Advanced Processing Configuration:
   Database: algerie_telecom_cdr
   Source Table: cdr_anonymized
   Total Tables to Create: 24
   Spark Version: 3.5.1
   Processing Started: 2025-06-21 16:07:37.407052


In [6]:
import pyspark.sql.functions as F

# Replace "cdr_anonymized" with your table if different
df = spark.table("cdr_anonymized")

# 1. Check the min and max dates in the START_DATE or CDR_DAY column
df.select(
    F.min("CDR_DAY").alias("Earliest_Day"),
    F.max("CDR_DAY").alias("Latest_Day"),
    F.min("START_DATE").alias("Earliest_Start"),
    F.max("START_DATE").alias("Latest_Start")
).show(truncate=False)

+------------+----------+--------------+--------------+
|Earliest_Day|Latest_Day|Earliest_Start|Latest_Start  |
+------------+----------+--------------+--------------+
|2024-12-31  |2025-01-01|20241231211909|20250101133522|
+------------+----------+--------------+--------------+



25/06/21 16:38:31 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
25/06/21 16:38:31 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:981)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

In [3]:
# ------------------------------------------------------------
# Cell 2 – Data Quality Profiling & Coverage Analysis
# ------------------------------------------------------------
print("\n📊 Advanced Data Quality Profiling...")

cdr_df = spark.table(MAIN_TABLE).cache()
total_records = cdr_df.count()

# Data quality scores
quality_metrics = cdr_df.select(
    F.count("*").alias("total_records"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_subscribers"),
    F.sum(F.when(F.col("ACTUAL_USAGE").isNull(), 1).otherwise(0)).alias("null_usage"),
    F.sum(F.when(F.col("DEBIT_AMOUNT").isNull(), 1).otherwise(0)).alias("null_revenue"),
    F.sum(F.when(F.col("CallingCellID").isNull(), 1).otherwise(0)).alias("null_cells"),
    F.min("START_DATE").alias("earliest_record"),
    F.max("START_DATE").alias("latest_record")
).collect()[0]

print(f"📈 Data Quality Summary:")
print(f"   Total Records: {quality_metrics['total_records']:,}")
print(f"   Unique Subscribers: {quality_metrics['unique_subscribers']:,}")
print(f"   Data Completeness: {(1 - quality_metrics['null_usage']/total_records)*100:.2f}%")
print(f"   Date Range: {quality_metrics['earliest_record']} to {quality_metrics['latest_record']}")

# Data coverage analysis
coverage = cdr_df.groupBy("CDR_DAY").agg(
    F.count("*").alias("records"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_subs"),
    F.min("START_DATE").alias("min_start"),
    F.max("START_DATE").alias("max_start"),
    F.countDistinct("CallingCellID").alias("active_cells")
).orderBy("CDR_DAY")

coverage_stats = coverage.agg(
    F.avg("records").alias("avg_daily_records"),
    F.stddev("records").alias("std_daily_records"),
    F.min("records").alias("min_daily_records"),
    F.max("records").alias("max_daily_records")
).collect()[0]

print(f"\n📅 Daily Coverage Statistics:")
print(f"   Average Daily Records: {coverage_stats['avg_daily_records']:,.0f}")
print(f"   Standard Deviation: {coverage_stats['std_daily_records']:,.0f}")
print(f"   Min/Max: {coverage_stats['min_daily_records']:,} / {coverage_stats['max_daily_records']:,}")


📊 Advanced Data Quality Profiling...


25/06/21 16:07:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

📈 Data Quality Summary:
   Total Records: 89,911
   Unique Subscribers: 40,843
   Data Completeness: 100.00%
   Date Range: 20241231211909 to 20250101133522

📅 Daily Coverage Statistics:
   Average Daily Records: 44,956
   Standard Deviation: 53,421
   Min/Max: 7,181 / 82,730


In [4]:
# ------------------------------------------------------------
# Cell 3 – Calendar Features & Algerian Holidays
# ------------------------------------------------------------
print("\n🗓️ Building Advanced Calendar Features...")

# Algerian holidays within our data period (2024-12-31 to 2025-01-15)
algerian_holidays = {
    "2024-12-31": ("New Year's Eve", "International"),
    "2025-01-01": ("New Year's Day", "International"),
    "2025-01-12": ("Yennayer", "Cultural"),
    # Note: We only have data until 2025-01-15
}

# Create holiday lookup DataFrame
holiday_data = [(date, name, type_) for date, (name, type_) in algerian_holidays.items()]
holidays_df = spark.createDataFrame(holiday_data, ["date", "holiday_name", "holiday_type"])

# Enrich CDR data with calendar features
cdr_df = cdr_df.withColumn("day_of_week", F.dayofweek("CDR_DAY"))
cdr_df = cdr_df.withColumn("day_name", 
    F.when(F.col("day_of_week") == 1, "Sunday")
     .when(F.col("day_of_week") == 2, "Monday")
     .when(F.col("day_of_week") == 3, "Tuesday")
     .when(F.col("day_of_week") == 4, "Wednesday")
     .when(F.col("day_of_week") == 5, "Thursday")
     .when(F.col("day_of_week") == 6, "Friday")
     .otherwise("Saturday")
)

# Algerian weekend (Friday=6, Saturday=7)
cdr_df = cdr_df.withColumn("is_weekend", F.col("day_of_week").isin([6,7]).cast("int"))

# Add month, quarter, week of year
cdr_df = cdr_df.withColumn("month", F.month("CDR_DAY"))
cdr_df = cdr_df.withColumn("quarter", F.quarter("CDR_DAY"))
cdr_df = cdr_df.withColumn("week_of_year", F.weekofyear("CDR_DAY"))

# Join with holidays
cdr_df = cdr_df.join(
    holidays_df, 
    cdr_df.CDR_DAY.cast("string") == holidays_df.date, 
    "left"
).withColumn("is_holiday", F.when(F.col("holiday_name").isNotNull(), 1).otherwise(0))


🗓️ Building Advanced Calendar Features...


In [5]:
# ------------------------------------------------------------
# Cell 4 – Advanced Data Cleaning & Validation
# ------------------------------------------------------------
print("\n🧹 Advanced Data Cleaning & Validation...")

# Numeric columns with validation
numeric_validations = {
    'ACTUAL_USAGE': (0, 86400),  # Max 24 hours in seconds
    'RATE_USAGE': (0, 86400),
    'DEBIT_AMOUNT': (0, 100000),  # Reasonable max amount
    'UN_DEBIT_AMOUNT': (0, 100000),
    'TOTAL_TAX': (0, 10000),
    'ChargingTime': (0, 3600)  # Max 1 hour charging time
}

for col, (min_val, max_val) in numeric_validations.items():
    if col in cdr_df.columns:
        cdr_df = cdr_df.withColumn(f"{col}_clean", 
            F.when(F.col(col).between(min_val, max_val), F.col(col).cast("double"))
             .otherwise(None)
        )
        # Track outliers
        cdr_df = cdr_df.withColumn(f"{col}_outlier",
            F.when(F.col(col) > max_val, 1)
             .when(F.col(col) < min_val, 1)
             .otherwise(0)
        )

# Date columns with validation
date_columns = ['START_DATE', 'END_DATE', 'CREATE_DATE', 'CUST_LOCAL_START_DATE', 'CUST_LOCAL_END_DATE']
for col in date_columns:
    if col in cdr_df.columns and col != "CDR_DAY":
        cdr_df = cdr_df.withColumn(f"{col}_clean", 
            F.to_timestamp(F.col(col), "yyyyMMddHHmmss")
        )

# Data quality score per record
quality_cols = [f"{col}_outlier" for col in numeric_validations.keys() if f"{col}_outlier" in cdr_df.columns]
cdr_df = cdr_df.withColumn("data_quality_score",
    F.lit(100) - (F.array_sum(F.array(*[F.col(c) for c in quality_cols])) * 10)
)

# Processing metadata
cdr_df = cdr_df.withColumn("processing_timestamp", F.current_timestamp())
cdr_df = cdr_df.withColumn("processing_date", F.current_date())

cdr_df.write.mode("overwrite").saveAsTable(CLEANED_TABLE)
print(f"✅ Advanced cleaned table saved: {CLEANED_TABLE}")


🧹 Advanced Data Cleaning & Validation...


AttributeError: module 'pyspark.sql.functions' has no attribute 'array_sum'

In [None]:
# ------------------------------------------------------------
# Cell 5 – Feature Engineering & Derived Metrics
# ------------------------------------------------------------
print("\n🔧 Advanced Feature Engineering...")

enriched_df = cdr_df

# Call duration categorization with more granularity
enriched_df = enriched_df.withColumn("call_duration_category",
    F.when(F.col("ACTUAL_USAGE_clean") == 0, "Failed")
     .when(F.col("ACTUAL_USAGE_clean") <= 10, "Very Short")
     .when(F.col("ACTUAL_USAGE_clean") <= 30, "Short")
     .when(F.col("ACTUAL_USAGE_clean") <= 120, "Normal")
     .when(F.col("ACTUAL_USAGE_clean") <= 300, "Medium")
     .when(F.col("ACTUAL_USAGE_clean") <= 1800, "Long")
     .otherwise("Very Long")
)

# Temporal features
enriched_df = enriched_df \
    .withColumn("call_hour", F.hour(F.col("START_DATE_clean"))) \
    .withColumn("call_minute", F.minute(F.col("START_DATE_clean"))) \
    .withColumn("call_15min_bucket", F.floor(F.col("call_hour") * 4 + F.col("call_minute") / 15)) \
    .withColumn("call_dayofweek", F.dayofweek(F.col("START_DATE_clean"))) \
    .withColumn("call_month", F.month(F.col("START_DATE_clean"))) \
    .withColumn("is_weekend_call", F.when(F.dayofweek(F.col("START_DATE_clean")).isin([6,7]), 1).otherwise(0))

# Advanced time period categorization
enriched_df = enriched_df.withColumn("time_period",
    F.when(F.hour(F.col("START_DATE_clean")).between(0, 5), "Late Night")
     .when(F.hour(F.col("START_DATE_clean")).between(6, 8), "Early Morning")
     .when(F.hour(F.col("START_DATE_clean")).between(9, 11), "Morning")
     .when(F.hour(F.col("START_DATE_clean")).between(12, 13), "Noon")
     .when(F.hour(F.col("START_DATE_clean")).between(14, 17), "Afternoon")
     .when(F.hour(F.col("START_DATE_clean")).between(18, 20), "Evening")
     .when(F.hour(F.col("START_DATE_clean")).between(21, 23), "Night")
     .otherwise("Late Night")
)

# Service type mapping with more detail
enriched_df = enriched_df.withColumn("service_type_group",
    F.when(F.col("SERVICE_CATEGORY") == "1", "Voice")
     .when(F.col("SERVICE_CATEGORY") == "2", "SMS")
     .when(F.col("SERVICE_CATEGORY") == "3", "Data")
     .when(F.col("SERVICE_CATEGORY") == "4", "MMS")
     .when(F.col("SERVICE_CATEGORY") == "5", "VAS")
     .otherwise("Other")
)

# Revenue categorization with percentiles
revenue_percentiles = enriched_df.filter(F.col("DEBIT_AMOUNT_clean") > 0).select(
    F.expr("percentile_approx(DEBIT_AMOUNT_clean, 0.25)").alias("p25"),
    F.expr("percentile_approx(DEBIT_AMOUNT_clean, 0.50)").alias("p50"),
    F.expr("percentile_approx(DEBIT_AMOUNT_clean, 0.75)").alias("p75"),
    F.expr("percentile_approx(DEBIT_AMOUNT_clean, 0.95)").alias("p95")
).collect()[0]

enriched_df = enriched_df.withColumn("revenue_category",
    F.when(F.col("DEBIT_AMOUNT_clean") == 0, "Free")
     .when(F.col("DEBIT_AMOUNT_clean") <= revenue_percentiles["p25"], "Low")
     .when(F.col("DEBIT_AMOUNT_clean") <= revenue_percentiles["p50"], "Medium-Low")
     .when(F.col("DEBIT_AMOUNT_clean") <= revenue_percentiles["p75"], "Medium-High")
     .when(F.col("DEBIT_AMOUNT_clean") <= revenue_percentiles["p95"], "High")
     .otherwise("Premium")
)

# Call success and quality metrics
enriched_df = enriched_df.withColumn("call_success", 
    F.when(F.col("ACTUAL_USAGE_clean") > 0, 1).otherwise(0)
)
enriched_df = enriched_df.withColumn("call_completion_rate",
    F.when(F.col("ACTUAL_USAGE_clean").isNotNull() & F.col("RATE_USAGE_clean").isNotNull(),
        F.col("ACTUAL_USAGE_clean") / F.col("RATE_USAGE_clean")
    ).otherwise(0)
)

# Network quality indicators
enriched_df = enriched_df.withColumn("network_quality_indicator",
    F.when(F.col("call_success") == 0, "Poor")
     .when(F.col("call_completion_rate") < 0.5, "Fair")
     .when(F.col("call_completion_rate") < 0.8, "Good")
     .otherwise("Excellent")
)

# Roaming and location features
enriched_df = enriched_df.withColumn("is_roaming",
    F.when(F.col("RoamState") == "1", 1).otherwise(0)
)

# Processing metadata
enriched_df = enriched_df.withColumn("processing_batch_id", 
    F.lit(datetime.now().strftime("%Y%m%d_%H%M%S"))
)
enriched_df = enriched_df.withColumn("record_id", F.monotonically_increasing_id())

enriched_df.write.mode("overwrite").saveAsTable(ENRICHED_TABLE)
print(f"✅ Advanced enriched table saved: {ENRICHED_TABLE}")


In [None]:
# ------------------------------------------------------------
# Cell 6 – Multi-Granularity Time Aggregations
# ------------------------------------------------------------
print("\n📊 Building Multi-Granularity Aggregations...")

# --- 15-Minute Aggregations ---
traffic_15min = enriched_df.groupBy("CDR_DAY", "call_15min_bucket").agg(
    F.count("*").alias("calls_15min"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users_15min"),
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    F.sum("DEBIT_AMOUNT_clean").alias("revenue_15min"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration_15min"),
    F.max("ACTUAL_USAGE_clean").alias("max_duration_15min"),
    F.stddev("ACTUAL_USAGE_clean").alias("stddev_duration_15min")
).withColumn("failure_rate_15min", 
    F.round(F.col("failed_calls") / F.col("calls_15min") * 100, 2)
)
traffic_15min.write.mode("overwrite").saveAsTable(TRAFFIC_15MIN)
print(f"✅ 15-minute traffic table created: {TRAFFIC_15MIN}")

# --- Hourly Aggregations (Enhanced) ---
traffic_hourly = enriched_df.groupBy("CDR_DAY", "call_hour").agg(
    F.count("*").alias("hourly_calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users_hourly"),
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    F.sum("DEBIT_AMOUNT_clean").alias("hourly_revenue"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration"),
    F.stddev("ACTUAL_USAGE_clean").alias("stddev_duration"),
    F.expr("percentile_approx(ACTUAL_USAGE_clean, 0.5)").alias("median_duration"),
    F.expr("percentile_approx(ACTUAL_USAGE_clean, 0.95)").alias("p95_duration"),
    F.max("is_holiday").alias("is_holiday"),
    F.max("holiday_name").alias("holiday_name"),
    F.avg("data_quality_score").alias("avg_quality_score")
).withColumn("calls_per_user", 
    F.round(F.col("hourly_calls") / F.col("unique_users_hourly"), 2)
).withColumn("success_rate", 
    F.round(F.col("successful_calls") / F.col("hourly_calls") * 100, 2)
).orderBy("CDR_DAY", "call_hour")

traffic_hourly.write.mode("overwrite").saveAsTable(TRAFFIC_HOURLY)
print(f"✅ Enhanced hourly traffic table created: {TRAFFIC_HOURLY}")

# --- Daily Aggregations (Comprehensive) ---
traffic_daily = enriched_df.groupBy("CDR_DAY").agg(
    # Volume metrics
    F.count("*").alias("total_records"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_subscribers"),
    F.countDistinct("CallingCellID").alias("active_cells"),
    
    # Success metrics
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    
    # Revenue metrics
    F.sum("DEBIT_AMOUNT_clean").alias("total_revenue"),
    F.avg("DEBIT_AMOUNT_clean").alias("avg_revenue_per_call"),
    F.sum(F.when(F.col("DEBIT_AMOUNT_clean") > 0, 1).otherwise(0)).alias("paid_calls"),
    
    # Duration metrics
    F.avg("ACTUAL_USAGE_clean").alias("avg_call_duration"),
    F.sum("ACTUAL_USAGE_clean").alias("total_call_minutes"),
    F.stddev("ACTUAL_USAGE_clean").alias("stddev_duration"),
    F.expr("percentile_approx(ACTUAL_USAGE_clean, 0.5)").alias("median_duration"),
    
    # Quality metrics
    F.avg("data_quality_score").alias("avg_quality_score"),
    F.avg("call_completion_rate").alias("avg_completion_rate"),
    
    # Calendar features
    F.max("is_weekend").alias("is_weekend"),
    F.max("is_holiday").alias("is_holiday"),
    F.max("holiday_name").alias("holiday_name"),
    F.max("day_name").alias("day_name")
).withColumn("success_rate", 
    F.round(F.col("successful_calls") / F.col("total_records") * 100, 2)
).withColumn("arpu", 
    F.round(F.col("total_revenue") / F.col("unique_subscribers"), 2)
).withColumn("avg_calls_per_user", 
    F.round(F.col("total_records") / F.col("unique_subscribers"), 2)
).orderBy("CDR_DAY")

traffic_daily.write.mode("overwrite").saveAsTable(TRAFFIC_DAILY)
print(f"✅ Comprehensive daily traffic table created: {TRAFFIC_DAILY}")

# --- Weekly Aggregations ---
traffic_weekly = enriched_df.groupBy("week_of_year").agg(
    F.count("*").alias("weekly_calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users_weekly"),
    F.sum("DEBIT_AMOUNT_clean").alias("weekly_revenue"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration_weekly"),
    F.countDistinct("CDR_DAY").alias("active_days"),
    F.countDistinct("CallingCellID").alias("active_cells_weekly")
).orderBy("week_of_year")

traffic_weekly.write.mode("overwrite").saveAsTable(TRAFFIC_WEEKLY)
print(f"✅ Weekly traffic table created: {TRAFFIC_WEEKLY}")


In [None]:
# ------------------------------------------------------------
# Cell 7 – Special Days & Regional Analysis (Yennayer Focus)
# ------------------------------------------------------------
print("\n🎊 Special Days & Regional Analysis...")

# Yennayer (Amazigh New Year) Analysis
yennayer_df = enriched_df.filter(
    (F.col("CDR_DAY").cast("string") == "2025-01-12") | 
    (F.col("CDR_DAY").cast("string").between("2025-01-11", "2025-01-13"))  # Include surrounding days
).withColumn("yennayer_period",
    F.when(F.col("CDR_DAY").cast("string") == "2025-01-11", "Eve")
     .when(F.col("CDR_DAY").cast("string") == "2025-01-12", "Yennayer Day")
     .when(F.col("CDR_DAY").cast("string") == "2025-01-13", "Day After")
)

# Yennayer cell-level analysis
yennayer_cells = yennayer_df.groupBy("CallingCellID", "yennayer_period").agg(
    F.count("*").alias("calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users"),
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    F.sum("DEBIT_AMOUNT_clean").alias("revenue"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration"),
    F.sum(F.when(F.col("time_period").isin(["Evening", "Night"]), 1).otherwise(0)).alias("evening_calls")
).withColumn("evening_call_ratio", 
    F.round(F.col("evening_calls") / F.col("calls") * 100, 2)
)

# Identify cells with highest Yennayer activity
yennayer_spike_cells = yennayer_cells.filter(
    F.col("yennayer_period") == "Yennayer Day"
).orderBy(F.desc("calls"))

yennayer_spike_cells.write.mode("overwrite").saveAsTable(YENNAYER_ANALYSIS)
print(f"✅ Yennayer analysis table created: {YENNAYER_ANALYSIS}")

# All special days analysis
special_days_traffic = enriched_df.filter(F.col("is_holiday") == 1).groupBy(
    "CDR_DAY", "holiday_name", "holiday_type"
).agg(
    F.count("*").alias("holiday_calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users"),
    F.sum("DEBIT_AMOUNT_clean").alias("holiday_revenue"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration"),
    F.sum(F.when(F.col("call_hour").between(18, 23), 1).otherwise(0)).alias("evening_calls"),
    F.sum(F.when(F.col("service_type_group") == "Voice", 1).otherwise(0)).alias("voice_calls"),
    F.sum(F.when(F.col("service_type_group") == "SMS", 1).otherwise(0)).alias("sms_messages")
).orderBy("CDR_DAY")

special_days_traffic.write.mode("overwrite").saveAsTable(SPECIAL_DAYS_TABLE)
print(f"✅ Special days analysis table created: {SPECIAL_DAYS_TABLE}")

In [None]:
# ------------------------------------------------------------
# Cell 8 – Advanced Subscriber Analytics
# ------------------------------------------------------------
print("\n👥 Advanced Subscriber Analytics...")

# Subscriber behavior patterns
subscriber_window = Window.partitionBy("PRI_IDENTITY_HASH").orderBy("START_DATE_clean")

subscriber_behavior = enriched_df.withColumn(
    "prev_call_time", F.lag("START_DATE_clean").over(subscriber_window)
).withColumn(
    "time_since_last_call", 
    F.unix_timestamp("START_DATE_clean") - F.unix_timestamp("prev_call_time")
).groupBy("PRI_IDENTITY_HASH").agg(
    # Volume metrics
    F.count("*").alias("total_calls"),
    F.countDistinct("CDR_DAY").alias("active_days"),
    F.countDistinct("week_of_year").alias("active_weeks"),
    
    # Success metrics
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    
    # Revenue metrics
    F.sum("DEBIT_AMOUNT_clean").alias("total_revenue"),
    F.avg("DEBIT_AMOUNT_clean").alias("avg_revenue_per_call"),
    F.stddev("DEBIT_AMOUNT_clean").alias("revenue_volatility"),
    
    # Usage patterns
    F.avg("ACTUAL_USAGE_clean").alias("avg_call_duration"),
    F.stddev("ACTUAL_USAGE_clean").alias("duration_volatility"),
    F.max("ACTUAL_USAGE_clean").alias("longest_call"),
    
    # Time patterns
    F.avg("time_since_last_call").alias("avg_time_between_calls"),
    F.min("START_DATE_clean").alias("first_activity"),
    F.max("START_DATE_clean").alias("last_activity"),
    
    # Preferred time periods
    F.sum(F.when(F.col("time_period") == "Morning", 1).otherwise(0)).alias("morning_calls"),
    F.sum(F.when(F.col("time_period") == "Evening", 1).otherwise(0)).alias("evening_calls"),
    F.sum(F.when(F.col("is_weekend_call") == 1, 1).otherwise(0)).alias("weekend_calls"),
    
    # Service preferences
    F.sum(F.when(F.col("service_type_group") == "Voice", 1).otherwise(0)).alias("voice_calls"),
    F.sum(F.when(F.col("service_type_group") == "SMS", 1).otherwise(0)).alias("sms_count"),
    F.sum(F.when(F.col("service_type_group") == "Data", 1).otherwise(0)).alias("data_sessions"),
    
    # Network quality experience
    F.avg("call_completion_rate").alias("avg_completion_rate"),
    F.avg("data_quality_score").alias("avg_quality_score")
)

# Calculate derived metrics
subscriber_behavior = subscriber_behavior.withColumn(
    "success_rate", F.round(F.col("successful_calls") / F.col("total_calls") * 100, 2)
).withColumn(
    "daily_arpu", F.round(F.col("total_revenue") / F.col("active_days"), 2)
).withColumn(
    "weekend_preference", F.round(F.col("weekend_calls") / F.col("total_calls") * 100, 2)
).withColumn(
    "evening_preference", F.round(F.col("evening_calls") / F.col("total_calls") * 100, 2)
).withColumn(
    "days_since_last_activity", 
    F.datediff(F.current_date(), F.col("last_activity"))
).withColumn(
    "customer_lifetime_days", 
    F.datediff(F.col("last_activity"), F.col("first_activity"))
)

# Subscriber segmentation
subscriber_behavior = subscriber_behavior.withColumn(
    "usage_segment",
    F.when(F.col("total_calls") >= 100, "Power User")
     .when(F.col("total_calls") >= 50, "Heavy User")
     .when(F.col("total_calls") >= 20, "Regular User")
     .when(F.col("total_calls") >= 5, "Light User")
     .otherwise("Minimal User")
).withColumn(
    "revenue_segment",
    F.when(F.col("total_revenue") >= 10000, "Premium")
     .when(F.col("total_revenue") >= 5000, "High Value")
     .when(F.col("total_revenue") >= 1000, "Medium Value")
     .when(F.col("total_revenue") > 0, "Low Value")
     .otherwise("No Revenue")
).withColumn(
    "activity_segment",
    F.when(F.col("active_days") >= 25, "Daily Active")
     .when(F.col("active_days") >= 15, "Very Active")
     .when(F.col("active_days") >= 7, "Active")
     .when(F.col("active_days") >= 3, "Occasional")
     .otherwise("Rare")
)

subscriber_behavior.write.mode("overwrite").saveAsTable(SUBSCRIBER_BEHAVIOR)
print(f"✅ Subscriber behavior patterns table created: {SUBSCRIBER_BEHAVIOR}")

# Churn risk analysis
churn_risk = subscriber_behavior.withColumn(
    "churn_risk_score",
    F.when(F.col("days_since_last_activity") > 30, 100)
     .when(F.col("days_since_last_activity") > 14, 75)
     .when(F.col("days_since_last_activity") > 7, 50)
     .when(F.col("days_since_last_activity") > 3, 25)
     .otherwise(0)
).withColumn(
    "churn_risk_category",
    F.when(F.col("churn_risk_score") >= 75, "High Risk")
     .when(F.col("churn_risk_score") >= 50, "Medium Risk")
     .when(F.col("churn_risk_score") >= 25, "Low Risk")
     .otherwise("Active")
).select(
    "PRI_IDENTITY_HASH", "churn_risk_score", "churn_risk_category",
    "days_since_last_activity", "total_calls", "total_revenue",
    "customer_lifetime_days", "usage_segment", "revenue_segment"
)

churn_risk.write.mode("overwrite").saveAsTable(CHURN_RISK)
print(f"✅ Churn risk analysis table created: {CHURN_RISK}")

# Customer Lifetime Value (CLV) calculation
clv_df = subscriber_behavior.withColumn(
    "monthly_revenue", F.col("total_revenue") / (F.col("customer_lifetime_days") / 30)
).withColumn(
    "predicted_lifetime_months", 
    F.when(F.col("churn_risk_score") >= 75, 1)
     .when(F.col("churn_risk_score") >= 50, 3)
     .when(F.col("churn_risk_score") >= 25, 6)
     .otherwise(12)
).withColumn(
    "estimated_clv", F.col("monthly_revenue") * F.col("predicted_lifetime_months")
).select(
    "PRI_IDENTITY_HASH", "total_revenue", "monthly_revenue",
    "predicted_lifetime_months", "estimated_clv", "usage_segment", "revenue_segment"
)

clv_df.write.mode("overwrite").saveAsTable(SUBSCRIBER_LIFETIME)
print(f"✅ Customer lifetime value table created: {SUBSCRIBER_LIFETIME}")

In [None]:
# ------------------------------------------------------------
# Cell 9 – Network Performance & Cell Analytics
# ------------------------------------------------------------
print("\n📡 Advanced Network Performance Analytics...")

# Cell-level daily KPIs with advanced metrics
cell_daily_kpis = enriched_df.groupBy("CallingCellID", "CDR_DAY").agg(
    # Volume metrics
    F.count("*").alias("total_calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users"),
    F.countDistinct("call_hour").alias("active_hours"),
    
    # Success metrics
    F.sum(F.when(F.col("call_success") == 1, 1).otherwise(0)).alias("successful_calls"),
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    
    # Revenue metrics
    F.sum("DEBIT_AMOUNT_clean").alias("total_revenue"),
    F.avg("DEBIT_AMOUNT_clean").alias("avg_revenue_per_call"),
    
    # Duration metrics
    F.avg("ACTUAL_USAGE_clean").alias("avg_call_duration"),
    F.sum("ACTUAL_USAGE_clean").alias("total_minutes"),
    F.stddev("ACTUAL_USAGE_clean").alias("duration_variance"),
    
    # Quality metrics
    F.avg("call_completion_rate").alias("avg_completion_rate"),
    F.avg("data_quality_score").alias("avg_quality_score"),
    
    # Time distribution
    F.sum(F.when(F.col("time_period").isin(["Morning", "Early Morning"]), 1).otherwise(0)).alias("morning_calls"),
    F.sum(F.when(F.col("time_period").isin(["Evening", "Night", "Late Night"]), 1).otherwise(0)).alias("evening_calls"),
    F.sum(F.when(F.col("is_weekend_call") == 1, 1).otherwise(0)).alias("weekend_calls")
).withColumn(
    "failure_rate", F.round(F.col("failed_calls") / F.col("total_calls") * 100, 2)
).withColumn(
    "calls_per_user", F.round(F.col("total_calls") / F.col("unique_users"), 2)
).withColumn(
    "revenue_per_user", F.round(F.col("total_revenue") / F.col("unique_users"), 2)
).withColumn(
    "cell_load_score", 
    F.when(F.col("total_calls") > 1000, "Very High")
     .when(F.col("total_calls") > 500, "High")
     .when(F.col("total_calls") > 100, "Medium")
     .otherwise("Low")
)

cell_daily_kpis.write.mode("overwrite").saveAsTable(CELL_DAILY_KPIS)
print(f"✅ Cell-level daily KPIs table created: {CELL_DAILY_KPIS}")

# Cell health scoring
cell_health = cell_daily_kpis.groupBy("CallingCellID").agg(
    F.avg("failure_rate").alias("avg_failure_rate"),
    F.stddev("failure_rate").alias("failure_rate_volatility"),
    F.avg("total_calls").alias("avg_daily_calls"),
    F.avg("unique_users").alias("avg_daily_users"),
    F.sum("total_revenue").alias("total_cell_revenue"),
    F.count("*").alias("active_days")
).withColumn(
    "reliability_score", 
    F.round(100 - F.col("avg_failure_rate"), 2)
).withColumn(
    "stability_score",
    F.round(100 - F.least(F.col("failure_rate_volatility") * 10, F.lit(100)), 2)
).withColumn(
    "cell_health_score",
    F.round((F.col("reliability_score") + F.col("stability_score")) / 2, 2)
).withColumn(
    "cell_category",
    F.when(F.col("cell_health_score") >= 90, "Excellent")
     .when(F.col("cell_health_score") >= 80, "Good")
     .when(F.col("cell_health_score") >= 70, "Fair")
     .when(F.col("cell_health_score") >= 60, "Poor")
     .otherwise("Critical")
)

cell_health.write.mode("overwrite").saveAsTable(CELL_HEALTH_SCORE)
print(f"✅ Cell health scores table created: {CELL_HEALTH_SCORE}")

# Hourly cell patterns for capacity planning
cell_hourly = enriched_df.groupBy("CallingCellID", "CDR_DAY", "call_hour").agg(
    F.count("*").alias("hourly_calls"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users"),
    F.sum(F.when(F.col("call_success") == 0, 1).otherwise(0)).alias("failed_calls"),
    F.avg("ACTUAL_USAGE_clean").alias("avg_duration"),
    F.sum("DEBIT_AMOUNT_clean").alias("hourly_revenue")
).withColumn(
    "hour_failure_rate", 
    F.round(F.col("failed_calls") / F.col("hourly_calls") * 100, 2)
)

# Detect hourly spikes using statistical methods
window_cell_stats = Window.partitionBy("CallingCellID", "CDR_DAY")
cell_hourly_stats = cell_hourly.withColumn(
    "daily_avg_calls", F.avg("hourly_calls").over(window_cell_stats)
).withColumn(
    "daily_stddev_calls", F.stddev("hourly_calls").over(window_cell_stats)
).withColumn(
    "z_score", 
    F.when(F.col("daily_stddev_calls") > 0,
        (F.col("hourly_calls") - F.col("daily_avg_calls")) / F.col("daily_stddev_calls")
    ).otherwise(0)
).withColumn(
    "spike_indicator",
    F.when(F.col("z_score") > 3, "Extreme Spike")
     .when(F.col("z_score") > 2, "Major Spike")
     .when(F.col("z_score") > 1, "Minor Spike")
     .when(F.col("z_score") < -2, "Major Drop")
     .when(F.col("z_score") < -1, "Minor Drop")
     .otherwise("Normal")
)

cell_hourly_stats.write.mode("overwrite").saveAsTable(CELL_HOURLY_SPIKES)
print(f"✅ Cell hourly patterns with spike detection table created: {CELL_HOURLY_SPIKES}")


In [None]:
# ------------------------------------------------------------
# Cell 10 – Advanced Time Series Analytics
# ------------------------------------------------------------
print("\n📈 Advanced Time Series & Trend Analytics...")

# Multiple rolling windows for trend analysis
window_3d = Window.orderBy("CDR_DAY").rowsBetween(-2, 0)
window_7d = Window.orderBy("CDR_DAY").rowsBetween(-6, 0)
window_14d = Window.orderBy("CDR_DAY").rowsBetween(-13, 0)
window_30d = Window.orderBy("CDR_DAY").rowsBetween(-29, 0)

traffic_daily_df = spark.table(TRAFFIC_DAILY)

rolling_df = traffic_daily_df.withColumn(
    "calls_3d_avg", F.avg("total_records").over(window_3d)
).withColumn(
    "calls_7d_avg", F.avg("total_records").over(window_7d)
).withColumn(
    "calls_14d_avg", F.avg("total_records").over(window_14d)
).withColumn(
    "calls_30d_avg", F.avg("total_records").over(window_30d)
).withColumn(
    "revenue_7d_avg", F.avg("total_revenue").over(window_7d)
).withColumn(
    "revenue_30d_avg", F.avg("total_revenue").over(window_30d)
).withColumn(
    "calls_wow_change", 
    F.round((F.col("total_records") - F.lag("total_records", 7).over(Window.orderBy("CDR_DAY"))) / 
    F.lag("total_records", 7).over(Window.orderBy("CDR_DAY")) * 100, 2)
).withColumn(
    "calls_mom_change",
    F.round((F.col("total_records") - F.lag("total_records", 30).over(Window.orderBy("CDR_DAY"))) / 
    F.lag("total_records", 30).over(Window.orderBy("CDR_DAY")) * 100, 2)
).withColumn(
    "trend_strength",
    F.abs(F.col("calls_7d_avg") - F.col("calls_30d_avg")) / F.col("calls_30d_avg") * 100
).withColumn(
    "trend_direction",
    F.when(F.col("calls_7d_avg") > F.col("calls_30d_avg") * 1.05, "Strong Upward")
     .when(F.col("calls_7d_avg") > F.col("calls_30d_avg") * 1.02, "Upward")
     .when(F.col("calls_7d_avg") < F.col("calls_30d_avg") * 0.95, "Strong Downward")
     .when(F.col("calls_7d_avg") < F.col("calls_30d_avg") * 0.98, "Downward")
     .otherwise("Stable")
).withColumn(
    "volatility_score",
    F.stddev("total_records").over(window_7d) / F.avg("total_records").over(window_7d) * 100
)

rolling_df.write.mode("overwrite").saveAsTable(ROLLING_KPIS)
print(f"✅ Advanced rolling KPIs table created: {ROLLING_KPIS}")

# Seasonal pattern detection
seasonal_patterns = enriched_df.groupBy("call_hour", "day_name").agg(
    F.avg("hourly_calls").alias("avg_calls"),
    F.stddev("hourly_calls").alias("stddev_calls"),
    F.expr("percentile_approx(hourly_calls, 0.5)").alias("median_calls"),
    F.expr("percentile_approx(hourly_calls, 0.95)").alias("p95_calls")
).withColumn(
    "pattern_reliability",
    F.when(F.col("stddev_calls") / F.col("avg_calls") < 0.3, "High")
     .when(F.col("stddev_calls") / F.col("avg_calls") < 0.5, "Medium")
     .otherwise("Low")
)

seasonal_patterns.write.mode("overwrite").saveAsTable(SEASONAL_PATTERNS)
print(f"✅ Seasonal patterns table created: {SEASONAL_PATTERNS}")

In [None]:
# ------------------------------------------------------------
# Cell 11 – Revenue Analytics & Optimization
# ------------------------------------------------------------
print("\n💰 Advanced Revenue Analytics...")

revenue_analytics = enriched_df.groupBy("CDR_DAY", "service_type_group").agg(
    F.count("*").alias("service_calls"),
    F.sum("DEBIT_AMOUNT_clean").alias("service_revenue"),
    F.avg("DEBIT_AMOUNT_clean").alias("avg_service_charge"),
    F.countDistinct("PRI_IDENTITY_HASH").alias("unique_users"),
    F.sum(F.when(F.col("DEBIT_AMOUNT_clean") == 0, 1).otherwise(0)).alias("free_calls"),
    F.sum(F.when(F.col("DEBIT_AMOUNT_clean") > 0, 1).otherwise(0)).alias("paid_calls")
).withColumn(
    "monetization_rate", 
    F.round(F.col("paid_calls") / F.col("service_calls") * 100, 2)
).withColumn(
    "arpu_service", 
    F.round(F.col("service_revenue") / F.col("unique_users"), 2)
).pivot("service_type_group").sum("service_revenue")

revenue_analytics.write.mode("overwrite").saveAsTable(REVENUE_ANALYTICS)
print(f"✅ Revenue analytics table created: {REVENUE_ANALYTICS}")


In [None]:
# ------------------------------------------------------------
# Cell 12 – Service Quality Metrics
# ------------------------------------------------------------
print("\n🎯 Service Quality Metrics...")

service_quality = enriched_df.groupBy("CDR_DAY", "time_period").agg(
    F.count("*").alias("total_calls"),
    F.avg("call_completion_rate").alias("avg_completion_rate"),
    F.avg("data_quality_score").alias("avg_quality_score"),
    F.sum(F.when(F.col("network_quality_indicator") == "Poor", 1).otherwise(0)).alias("poor_quality_calls"),
    F.sum(F.when(F.col("network_quality_indicator") == "Excellent", 1).otherwise(0)).alias("excellent_quality_calls"),
    F.avg(F.when(F.col("call_success") == 1, F.col("ACTUAL_USAGE_clean")).otherwise(None)).alias("avg_successful_duration"),
    F.stddev("ACTUAL_USAGE_clean").alias("duration_consistency")
).withColumn(
    "quality_index", 
    F.round((F.col("avg_completion_rate") * 0.4 + 
             F.col("avg_quality_score") / 100 * 0.3 +
             (1 - F.col("poor_quality_calls") / F.col("total_calls")) * 0.3) * 100, 2)
).withColumn(
    "service_grade",
    F.when(F.col("quality_index") >= 90, "A")
     .when(F.col("quality_index") >= 80, "B")
     .when(F.col("quality_index") >= 70, "C")
     .when(F.col("quality_index") >= 60, "D")
     .otherwise("F")
)

service_quality.write.mode("overwrite").saveAsTable(SERVICE_QUALITY)
print(f"✅ Service quality metrics table created: {SERVICE_QUALITY}")


In [None]:
# ------------------------------------------------------------
# Cell 13 – Peak Traffic Analysis
# ------------------------------------------------------------
print("\n⚡ Peak Traffic Analysis...")

# Identify peak hours and patterns
peak_analysis = traffic_hourly.withColumn(
    "is_peak_hour",
    F.when(F.col("hourly_calls") > F.col("avg_daily_calls") * 1.5, 1).otherwise(0)
).filter(F.col("is_peak_hour") == 1).groupBy("call_hour").agg(
    F.count("*").alias("peak_occurrences"),
    F.avg("hourly_calls").alias("avg_peak_calls"),
    F.max("hourly_calls").alias("max_peak_calls"),
    F.avg("success_rate").alias("avg_peak_success_rate"),
    F.avg("hourly_revenue").alias("avg_peak_revenue")
).withColumn(
    "peak_frequency", 
    F.round(F.col("peak_occurrences") / 30 * 100, 2)  # Percentage of days this hour is peak
).orderBy(F.desc("peak_frequency"))

peak_analysis.write.mode("overwrite").saveAsTable(PEAK_ANALYSIS)
print(f"✅ Peak traffic analysis table created: {PEAK_ANALYSIS}")

In [None]:
# ------------------------------------------------------------
# Cell 14 – Create Advanced Views for BI & Visualization
# ------------------------------------------------------------
print("\n🔍 Creating Advanced BI Views...")

# Executive dashboard view
spark.sql(f"""
CREATE OR REPLACE VIEW v_executive_dashboard AS
SELECT 
    CDR_DAY,
    total_records,
    unique_subscribers,
    ROUND(total_revenue, 2) as daily_revenue,
    ROUND(arpu, 2) as daily_arpu,
    success_rate,
    is_weekend,
    is_holiday,
    holiday_name,
    ROUND(calls_7d_avg, 0) as weekly_avg_calls,
    trend_direction
FROM {TRAFFIC_DAILY} d
JOIN {ROLLING_KPIS} r ON d.CDR_DAY = r.CDR_DAY
ORDER BY CDR_DAY DESC
""")

# Network health monitoring view
spark.sql(f"""
CREATE OR REPLACE VIEW v_network_health AS
SELECT 
    c.CallingCellID,
    c.cell_health_score,
    c.cell_category,
    c.avg_failure_rate,
    c.reliability_score,
    c.stability_score,
    c.avg_daily_calls,
    c.total_cell_revenue
FROM {CELL_HEALTH_SCORE} c
ORDER BY cell_health_score ASC
""")

# Subscriber insights view
spark.sql(f"""
CREATE OR REPLACE VIEW v_subscriber_insights AS
SELECT 
    usage_segment,
    revenue_segment,
    COUNT(*) as subscriber_count,
    ROUND(AVG(total_calls), 2) as avg_calls,
    ROUND(AVG(total_revenue), 2) as avg_revenue,
    ROUND(AVG(daily_arpu), 2) as avg_daily_arpu,
    ROUND(AVG(success_rate), 2) as avg_success_rate
FROM {SUBSCRIBER_BEHAVIOR}
GROUP BY usage_segment, revenue_segment
ORDER BY subscriber_count DESC
""")

# Peak hours view
spark.sql(f"""
CREATE OR REPLACE VIEW v_peak_hours_analysis AS
SELECT 
    call_hour,
    peak_frequency as peak_frequency_pct,
    ROUND(avg_peak_calls, 0) as avg_calls_during_peak,
    ROUND(max_peak_calls, 0) as max_calls_recorded,
    ROUND(avg_peak_success_rate, 2) as success_rate_during_peak,
    ROUND(avg_peak_revenue, 2) as avg_revenue_during_peak
FROM {PEAK_ANALYSIS}
ORDER BY call_hour
""")

# Yennayer special analysis view
spark.sql(f"""
CREATE OR REPLACE VIEW v_yennayer_impact AS
SELECT 
    CallingCellID,
    yennayer_period,
    calls,
    unique_users,
    ROUND(revenue, 2) as revenue,
    ROUND(avg_duration, 2) as avg_call_duration,
    evening_call_ratio
FROM {YENNAYER_ANALYSIS}
WHERE calls > 100
ORDER BY calls DESC
""")

# Service quality view
spark.sql(f"""
CREATE OR REPLACE VIEW v_service_quality AS
SELECT 
    CDR_DAY,
    time_period,
    quality_index,
    service_grade,
    total_calls,
    ROUND(avg_completion_rate * 100, 2) as completion_rate_pct,
    poor_quality_calls,
    excellent_quality_calls
FROM {SERVICE_QUALITY}
ORDER BY CDR_DAY DESC, quality_index DESC
""")

print("✅ Created advanced views:")
print("   - v_executive_dashboard")
print("   - v_network_health")
print("   - v_subscriber_insights")
print("   - v_peak_hours_analysis")
print("   - v_yennayer_impact")
print("   - v_service_quality")

In [None]:
# ------------------------------------------------------------
# Cell 15 – Summary & Verification
# ------------------------------------------------------------
print("\n" + "="*80)
print("📊 ADVANCED DATA ENGINEERING PROCESSING COMPLETE")
print("="*80)

# Show tables created
print("\n📋 Tables Created:")
all_tables = spark.sql("SHOW TABLES").filter(F.col("isTemporary") == False).collect()
for table in all_tables:
    if table.tableName not in ['cdr_anonymized', 'cdr_daily_summary', 'cdr_network_metrics']:
        row_count = spark.table(table.tableName).count()
        print(f"   - {table.tableName}: {row_count:,} rows")

# Sample executive dashboard
print("\n📈 Executive Dashboard Sample:")
spark.sql("SELECT * FROM v_executive_dashboard LIMIT 5").show(truncate=False)

# Network health summary
print("\n🏥 Network Health Summary:")
spark.sql("""
SELECT cell_category, COUNT(*) as cell_count, ROUND(AVG(cell_health_score), 2) as avg_score
FROM v_network_health
GROUP BY cell_category
ORDER BY avg_score DESC
""").show()

# Subscriber segmentation summary
print("\n👥 Subscriber Segmentation:")
spark.sql("SELECT * FROM v_subscriber_insights LIMIT 10").show(truncate=False)

print("\n✅ All data engineering transformations completed successfully!")
print("📊 Total processing time:", datetime.now())
print("\n🚀 Next Steps:")
print("   1. Run Notebook 04 for Advanced Anomaly Detection")
print("   2. Run Notebook 05 for BI Visualizations")
print("   3. Configure Superset/PowerBI dashboards with created views")

# Keep session active for next notebook
print("\n💡 Spark session kept active for next notebook")

#### Generated CDR Part 

In [None]:
# =====================================================
# NOTEBOOK 03: ADVANCED DATA ENGINEERING & TRANSFORMATIONS
# Algerie Telecom Big Data Project - Feature Engineering
# =====================================================

import sys
sys.path.append('/home/jovyan/work/scripts')
from spark_init import init_spark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from datetime import datetime, timedelta
import numpy as np

# Initialize Spark
spark = init_spark("Data Engineering - Advanced Transformations")

print("=" * 80)
print("🔧 ADVANCED DATA ENGINEERING & FEATURE EXTRACTION")
print("📊 Algerie Telecom CDR Analytics Pipeline")
print("=" * 80)

# Use the database
spark.sql("USE algerie_telecom_gen")

# =====================================================
# 1. CUSTOMER BEHAVIOR PROFILING
# =====================================================
print("\n👤 BUILDING CUSTOMER BEHAVIOR PROFILES...")

# Create comprehensive customer profile
customer_profile = spark.sql("""
    WITH customer_metrics AS (
        SELECT 
            subscriber_id,
            customer_segment,
            age_group,
            gender,
            operator,
            payment_type,
            
            -- Service usage patterns
            COUNT(DISTINCT CASE WHEN service_type = 'VOICE' THEN cdr_id END) as voice_calls_count,
            COUNT(DISTINCT CASE WHEN service_type = 'DATA' THEN cdr_id END) as data_sessions_count,
            COUNT(DISTINCT CASE WHEN service_type = 'SMS' THEN cdr_id END) as sms_count,
            
            -- Duration metrics
            SUM(CASE WHEN service_type = 'VOICE' THEN duration ELSE 0 END) as total_voice_duration,
            AVG(CASE WHEN service_type = 'VOICE' THEN duration ELSE 0 END) as avg_voice_duration,
            
            -- Data usage metrics
            SUM(data_volume_mb) as total_data_mb,
            AVG(CASE WHEN service_type = 'DATA' THEN data_volume_mb ELSE NULL END) as avg_data_per_session,
            
            -- Financial metrics
            SUM(charging_amount) as total_spending,
            AVG(charging_amount) as avg_transaction_value,
            SUM(promotional_discount) / COUNT(*) as avg_discount_rate,
            
            -- Quality metrics
            AVG(quality_score) as avg_quality_score,
            SUM(CASE WHEN dropped_call_flag THEN 1 ELSE 0 END) as dropped_calls,
            
            -- Temporal patterns
            COUNT(DISTINCT DATE(start_time)) as active_days,
            COUNT(DISTINCT CASE WHEN is_weekend THEN DATE(start_time) END) as weekend_days,
            COUNT(DISTINCT CASE WHEN time_of_day_category = 'NIGHT' THEN DATE(start_time) END) as night_usage_days,
            
            -- Special features usage
            COUNT(DISTINCT CASE WHEN special_offer_applied != 'None' THEN DATE(start_time) END) as offer_usage_days,
            COUNT(DISTINCT CASE WHEN roaming_flag THEN cdr_id END) as roaming_events,
            
            -- Anomaly indicators
            MAX(fraud_indicator) as has_fraud_flag,
            SUM(CASE WHEN unusual_pattern_flag THEN 1 ELSE 0 END) as unusual_patterns_count,
            
            -- App preferences (for data users)
            COUNT(DISTINCT application_used) as unique_apps_used,
            MIN(start_time) as first_activity,
            MAX(start_time) as last_activity
            
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)  -- Q1 2025
        GROUP BY subscriber_id, customer_segment, age_group, gender, operator, payment_type
    )
    SELECT 
        *,
        -- Derived metrics
        DATEDIFF(last_activity, first_activity) + 1 as customer_lifetime_days,
        total_spending / NULLIF(active_days, 0) as daily_avg_spending,
        voice_calls_count / NULLIF(active_days, 0) as calls_per_day,
        total_data_mb / NULLIF(active_days, 0) as data_mb_per_day,
        
        -- Customer value score
        CASE 
            WHEN total_spending > 5000 AND active_days > 60 THEN 'Premium'
            WHEN total_spending > 2000 AND active_days > 30 THEN 'High'
            WHEN total_spending > 500 THEN 'Medium'
            ELSE 'Low'
        END as value_category,
        
        -- Usage pattern classification
        CASE 
            WHEN voice_calls_count > data_sessions_count * 2 THEN 'Voice Heavy'
            WHEN data_sessions_count > voice_calls_count * 2 THEN 'Data Heavy'
            ELSE 'Balanced'
        END as usage_pattern
        
    FROM customer_metrics
""")

# Cache for performance
customer_profile.cache()
customer_profile.createOrReplaceTempView("customer_profiles")

print(f"✅ Created customer profiles for {customer_profile.count():,} subscribers")

# Save as table
customer_profile.write.mode("overwrite").saveAsTable("customer_behavior_profiles")

# Show sample profile
print("\n📊 Sample Customer Profiles:")
customer_profile.select(
    "subscriber_id", "customer_segment", "operator", "value_category", 
    "usage_pattern", "total_spending", "active_days"
).show(5)

# =====================================================
# 2. NETWORK PERFORMANCE ENGINEERING
# =====================================================
print("\n📡 ENGINEERING NETWORK PERFORMANCE FEATURES...")

network_performance = spark.sql("""
    WITH network_metrics AS (
        SELECT 
            location_area,
            operator,
            network_type,
            cell_id,
            HOUR(start_time) as hour_of_day,
            
            -- Performance metrics
            COUNT(*) as total_transactions,
            AVG(quality_score) as avg_quality_score,
            STDDEV(quality_score) as quality_stddev,
            AVG(signal_strength) as avg_signal_strength,
            
            -- Failure metrics
            SUM(CASE WHEN dropped_call_flag THEN 1 ELSE 0 END) as dropped_calls,
            SUM(CASE WHEN call_result = 'FAILED' THEN 1 ELSE 0 END) as failed_transactions,
            SUM(CASE WHEN network_congestion_level = 'HIGH' THEN 1 ELSE 0 END) as high_congestion_events,
            
            -- Load metrics
            COUNT(DISTINCT subscriber_id) as unique_users,
            SUM(data_volume_mb) as total_data_load_mb,
            SUM(duration) / 3600.0 as total_voice_hours
            
        FROM cdr_partitioned
        WHERE year = 2025 AND month = 1
        GROUP BY location_area, operator, network_type, cell_id, HOUR(start_time)
    )
    SELECT 
        *,
        -- Calculated KPIs
        (dropped_calls + failed_transactions) / NULLIF(total_transactions, 0) * 100 as failure_rate,
        high_congestion_events / NULLIF(total_transactions, 0) * 100 as congestion_rate,
        total_data_load_mb / NULLIF(unique_users, 0) as data_per_user,
        
        -- Performance score (composite metric)
        (avg_quality_score * 0.4 + 
         (100 - (dropped_calls + failed_transactions) / NULLIF(total_transactions, 0) * 100) / 100 * 0.3 +
         avg_signal_strength / 100.0 * 0.3) as network_performance_score,
         
        -- Congestion classification
        CASE 
            WHEN high_congestion_events / NULLIF(total_transactions, 0) > 0.2 THEN 'Critical'
            WHEN high_congestion_events / NULLIF(total_transactions, 0) > 0.1 THEN 'High'
            WHEN high_congestion_events / NULLIF(total_transactions, 0) > 0.05 THEN 'Medium'
            ELSE 'Low'
        END as congestion_severity
        
    FROM network_metrics
""")

network_performance.write.mode("overwrite").saveAsTable("network_performance_features")
print("✅ Network performance features engineered")

# =====================================================
# 3. REVENUE OPTIMIZATION FEATURES
# =====================================================
print("\n💰 CREATING REVENUE OPTIMIZATION FEATURES...")

revenue_features = spark.sql("""
    WITH revenue_base AS (
        SELECT 
            subscriber_id,
            customer_segment,
            operator,
            service_type,
            special_offer_applied,
            
            -- Revenue metrics
            SUM(charging_amount) as total_revenue,
            AVG(charging_amount) as avg_revenue_per_transaction,
            SUM(tax_amount) as total_tax,
            
            -- Discount analysis
            AVG(promotional_discount) as avg_discount,
            MAX(promotional_discount) as max_discount,
            COUNT(CASE WHEN promotional_discount > 0 THEN 1 END) as discounted_transactions,
            
            -- Service-specific metrics
            SUM(CASE WHEN service_type = 'DATA' THEN data_volume_mb ELSE 0 END) as total_data_mb,
            SUM(CASE WHEN service_type = 'DATA' THEN charging_amount ELSE 0 END) as data_revenue,
            SUM(CASE WHEN service_type = 'VOICE' THEN duration ELSE 0 END) as total_voice_seconds,
            SUM(CASE WHEN service_type = 'VOICE' THEN charging_amount ELSE 0 END) as voice_revenue,
            
            COUNT(*) as transaction_count
            
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
        GROUP BY subscriber_id, customer_segment, operator, service_type, special_offer_applied
    )
    SELECT 
        subscriber_id,
        customer_segment,
        operator,
        
        -- Aggregate across services
        SUM(total_revenue) as total_customer_revenue,
        SUM(total_tax) as total_customer_tax,
        AVG(avg_revenue_per_transaction) as avg_transaction_value,
        
        -- Service mix
        SUM(CASE WHEN service_type = 'VOICE' THEN total_revenue ELSE 0 END) / NULLIF(SUM(total_revenue), 0) as voice_revenue_ratio,
        SUM(CASE WHEN service_type = 'DATA' THEN total_revenue ELSE 0 END) / NULLIF(SUM(total_revenue), 0) as data_revenue_ratio,
        SUM(CASE WHEN service_type = 'SMS' THEN total_revenue ELSE 0 END) / NULLIF(SUM(total_revenue), 0) as sms_revenue_ratio,
        
        -- Discount effectiveness
        SUM(discounted_transactions) / SUM(transaction_count) as discount_usage_rate,
        AVG(avg_discount) as overall_avg_discount,
        
        -- ARPU components
        SUM(data_revenue) / NULLIF(SUM(total_data_mb), 0) as revenue_per_mb,
        SUM(voice_revenue) / NULLIF(SUM(total_voice_seconds), 0) * 60 as revenue_per_minute,
        
        -- Offer adoption
        COUNT(DISTINCT CASE WHEN special_offer_applied != 'None' THEN special_offer_applied END) as unique_offers_used
        
    FROM revenue_base
    GROUP BY subscriber_id, customer_segment, operator
""")

revenue_features.write.mode("overwrite").saveAsTable("revenue_optimization_features")
print("✅ Revenue optimization features created")

# =====================================================
# 4. TIME SERIES FEATURE ENGINEERING
# =====================================================
print("\n📈 ENGINEERING TIME SERIES FEATURES...")

# Daily aggregations with lag features
time_series_features = spark.sql("""
    WITH daily_metrics AS (
        SELECT 
            DATE(start_time) as activity_date,
            operator,
            service_type,
            COUNT(*) as daily_transactions,
            COUNT(DISTINCT subscriber_id) as daily_active_users,
            SUM(charging_amount) as daily_revenue,
            AVG(quality_score) as daily_avg_quality,
            SUM(CASE WHEN fraud_indicator THEN 1 ELSE 0 END) as daily_fraud_cases,
            SUM(data_volume_mb) as daily_data_volume,
            SUM(duration) / 3600.0 as daily_voice_hours
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
        GROUP BY DATE(start_time), operator, service_type
    ),
    with_lag_features AS (
        SELECT 
            *,
            -- Lag features for trend analysis
            LAG(daily_transactions, 1) OVER (PARTITION BY operator, service_type ORDER BY activity_date) as prev_day_transactions,
            LAG(daily_transactions, 7) OVER (PARTITION BY operator, service_type ORDER BY activity_date) as prev_week_transactions,
            LAG(daily_revenue, 1) OVER (PARTITION BY operator, service_type ORDER BY activity_date) as prev_day_revenue,
            LAG(daily_revenue, 7) OVER (PARTITION BY operator, service_type ORDER BY activity_date) as prev_week_revenue,
            
            -- Moving averages
            AVG(daily_transactions) OVER (
                PARTITION BY operator, service_type 
                ORDER BY activity_date 
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as ma7_transactions,
            
            AVG(daily_revenue) OVER (
                PARTITION BY operator, service_type 
                ORDER BY activity_date 
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) as ma7_revenue
            
        FROM daily_metrics
    )
    SELECT 
        *,
        -- Growth rates
        (daily_transactions - prev_day_transactions) / NULLIF(prev_day_transactions, 0) * 100 as daily_growth_rate,
        (daily_transactions - prev_week_transactions) / NULLIF(prev_week_transactions, 0) * 100 as weekly_growth_rate,
        
        -- Trend indicators
        CASE 
            WHEN daily_transactions > ma7_transactions * 1.2 THEN 'Spike'
            WHEN daily_transactions < ma7_transactions * 0.8 THEN 'Drop'
            ELSE 'Normal'
        END as traffic_trend,
        
        -- Revenue per user
        daily_revenue / NULLIF(daily_active_users, 0) as daily_arpu
        
    FROM with_lag_features
""")

time_series_features.write.mode("overwrite").saveAsTable("time_series_features")
print("✅ Time series features engineered")

# =====================================================
# 5. CUSTOMER CHURN RISK INDICATORS
# =====================================================
print("\n🚨 BUILDING CHURN RISK INDICATORS...")

churn_indicators = spark.sql("""
    WITH activity_timeline AS (
        SELECT 
            subscriber_id,
            DATE(start_time) as activity_date,
            COUNT(*) as daily_activities
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
        GROUP BY subscriber_id, DATE(start_time)
    ),
    activity_gaps AS (
        SELECT 
            subscriber_id,
            activity_date,
            LAG(activity_date, 1) OVER (PARTITION BY subscriber_id ORDER BY activity_date) as prev_activity_date,
            DATEDIFF(activity_date, LAG(activity_date, 1) OVER (PARTITION BY subscriber_id ORDER BY activity_date)) as days_since_last_activity
        FROM activity_timeline
    ),
    churn_metrics AS (
        SELECT 
            ag.subscriber_id,
            COUNT(*) as total_active_days,
            MAX(days_since_last_activity) as max_inactivity_gap,
            AVG(days_since_last_activity) as avg_days_between_activities,
            STDDEV(days_since_last_activity) as activity_regularity,
            MAX(activity_date) as last_activity_date,
            DATEDIFF('2025-03-31', MAX(activity_date)) as days_since_last_seen
        FROM activity_gaps ag
        GROUP BY ag.subscriber_id
    )
    SELECT 
        cm.*,
        cp.customer_segment,
        cp.total_spending,
        cp.avg_transaction_value,
        cp.active_days,
        
        -- Churn risk score components
        CASE 
            WHEN days_since_last_seen > 30 THEN 1.0
            WHEN days_since_last_seen > 14 THEN 0.7
            WHEN days_since_last_seen > 7 THEN 0.4
            ELSE 0.1
        END as recency_risk,
        
        CASE 
            WHEN total_active_days < 10 THEN 0.8
            WHEN total_active_days < 30 THEN 0.5
            ELSE 0.2
        END as frequency_risk,
        
        CASE 
            WHEN total_spending < 100 THEN 0.8
            WHEN total_spending < 500 THEN 0.5
            ELSE 0.2
        END as monetary_risk,
        
        -- Overall churn risk
        (CASE 
            WHEN days_since_last_seen > 30 THEN 1.0
            WHEN days_since_last_seen > 14 THEN 0.7
            WHEN days_since_last_seen > 7 THEN 0.4
            ELSE 0.1
        END * 0.5 +
        CASE 
            WHEN total_active_days < 10 THEN 0.8
            WHEN total_active_days < 30 THEN 0.5
            ELSE 0.2
        END * 0.3 +
        CASE 
            WHEN total_spending < 100 THEN 0.8
            WHEN total_spending < 500 THEN 0.5
            ELSE 0.2
        END * 0.2) as churn_risk_score
        
    FROM churn_metrics cm
    JOIN customer_behavior_profiles cp ON cm.subscriber_id = cp.subscriber_id
""")

churn_indicators.write.mode("overwrite").saveAsTable("customer_churn_indicators")
print("✅ Churn risk indicators calculated")

# =====================================================
# 6. FRAUD PATTERN DETECTION FEATURES
# =====================================================
print("\n🔍 ENGINEERING FRAUD DETECTION FEATURES...")

fraud_features = spark.sql("""
    WITH subscriber_patterns AS (
        SELECT 
            subscriber_id,
            -- Normal behavior baseline
            AVG(CASE WHEN fraud_indicator = false THEN charging_amount END) as normal_avg_charge,
            STDDEV(CASE WHEN fraud_indicator = false THEN charging_amount END) as normal_charge_stddev,
            AVG(CASE WHEN fraud_indicator = false AND service_type = 'VOICE' THEN duration END) as normal_avg_duration,
            
            -- Anomaly counts
            SUM(CASE WHEN fraud_indicator THEN 1 ELSE 0 END) as fraud_flag_count,
            SUM(CASE WHEN unusual_pattern_flag THEN 1 ELSE 0 END) as unusual_pattern_count,
            
            -- High-risk behaviors
            COUNT(CASE WHEN roaming_flag AND charging_amount > 500 THEN 1 END) as high_value_roaming,
            COUNT(CASE WHEN HOUR(start_time) BETWEEN 2 AND 5 THEN 1 END) as night_activities,
            COUNT(CASE WHEN service_subtype = 'INTERNATIONAL_CALL' THEN 1 END) as international_calls,
            
            -- Velocity features
            COUNT(DISTINCT DATE(start_time)) as active_days,
            COUNT(*) as total_transactions,
            MAX(charging_amount) as max_single_charge
            
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
        GROUP BY subscriber_id
    ),
    hourly_velocity AS (
        SELECT 
            subscriber_id,
            DATE(start_time) as activity_date,
            HOUR(start_time) as activity_hour,
            COUNT(*) as hourly_transactions,
            SUM(charging_amount) as hourly_spending
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
        GROUP BY subscriber_id, DATE(start_time), HOUR(start_time)
    ),
    velocity_features AS (
        SELECT 
            subscriber_id,
            MAX(hourly_transactions) as max_hourly_transactions,
            MAX(hourly_spending) as max_hourly_spending,
            AVG(hourly_transactions) as avg_hourly_transactions
        FROM hourly_velocity
        GROUP BY subscriber_id
    )
    SELECT 
        sp.*,
        vf.max_hourly_transactions,
        vf.max_hourly_spending,
        vf.avg_hourly_transactions,
        
        -- Risk indicators
        CASE 
            WHEN max_single_charge > normal_avg_charge + 3 * normal_charge_stddev THEN 1
            ELSE 0
        END as charge_anomaly_flag,
        
        CASE 
            WHEN max_hourly_transactions > 50 THEN 1
            ELSE 0
        END as velocity_anomaly_flag,
        
        -- Fraud risk score
        (fraud_flag_count * 0.4 +
         unusual_pattern_count * 0.2 +
         CASE WHEN max_single_charge > normal_avg_charge + 3 * normal_charge_stddev THEN 10 ELSE 0 END * 0.2 +
         CASE WHEN max_hourly_transactions > 50 THEN 10 ELSE 0 END * 0.1 +
         CASE WHEN night_activities > total_transactions * 0.3 THEN 10 ELSE 0 END * 0.1
        ) as fraud_risk_score
        
    FROM subscriber_patterns sp
    JOIN velocity_features vf ON sp.subscriber_id = vf.subscriber_id
""")

fraud_features.write.mode("overwrite").saveAsTable("fraud_detection_features")
print("✅ Fraud detection features created")

# =====================================================
# 7. CAMPAIGN EFFECTIVENESS FEATURES
# =====================================================
print("\n📢 ANALYZING CAMPAIGN EFFECTIVENESS...")

campaign_features = spark.sql("""
    WITH offer_adoption AS (
        SELECT 
            special_offer_applied,
            customer_segment,
            age_group,
            operator,
            COUNT(DISTINCT subscriber_id) as unique_adopters,
            COUNT(*) as total_usage_count,
            SUM(charging_amount) as revenue_with_offer,
            AVG(promotional_discount) as avg_discount_given,
            SUM(charging_amount * promotional_discount / 100) as total_discount_amount
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
          AND special_offer_applied != 'None'
        GROUP BY special_offer_applied, customer_segment, age_group, operator
    ),
    baseline_revenue AS (
        SELECT 
            customer_segment,
            age_group,
            operator,
            AVG(charging_amount) as baseline_avg_charge
        FROM cdr_partitioned
        WHERE year = 2025 AND month IN (1, 2, 3)
          AND special_offer_applied = 'None'
        GROUP BY customer_segment, age_group, operator
    )
    SELECT 
        oa.*,
        br.baseline_avg_charge,
        
        -- Campaign metrics
        unique_adopters / total_usage_count as adoption_rate,
        revenue_with_offer / NULLIF(unique_adopters, 0) as revenue_per_adopter,
        total_discount_amount / NULLIF(revenue_with_offer, 0) * 100 as discount_cost_ratio,
        
        -- Effectiveness score
        CASE 
            WHEN revenue_with_offer / NULLIF(unique_adopters, 0) > br.baseline_avg_charge * 1.2 THEN 'Highly Effective'
            WHEN revenue_with_offer / NULLIF(unique_adopters, 0) > br.baseline_avg_charge THEN 'Effective'
            ELSE 'Needs Improvement'
        END as campaign_effectiveness
        
    FROM offer_adoption oa
    LEFT JOIN baseline_revenue br 
        ON oa.customer_segment = br.customer_segment 
        AND oa.age_group = br.age_group
        AND oa.operator = br.operator
""")

campaign_features.write.mode("overwrite").saveAsTable("campaign_effectiveness_features")
print("✅ Campaign effectiveness features analyzed")

# =====================================================
# 8. NETWORK CAPACITY PLANNING FEATURES
# =====================================================
print("\n📊 CREATING NETWORK CAPACITY PLANNING FEATURES...")

capacity_planning = spark.sql("""
    WITH hourly_load AS (
        SELECT 
            location_area,
            cell_id,
            operator,
            network_type,
            DATE(start_time) as load_date,
            HOUR(start_time) as load_hour,
            
            COUNT(*) as concurrent_sessions,
            COUNT(DISTINCT subscriber_id) as concurrent_users,
            SUM(CASE WHEN service_type = 'VOICE' THEN 1 ELSE 0 END) as voice_sessions,
            SUM(CASE WHEN service_type = 'DATA' THEN 1 ELSE 0 END) as data_sessions,
            SUM(data_volume_mb) as hourly_data_mb,
            AVG(quality_score) as avg_quality,
            SUM(CASE WHEN dropped_call_flag THEN 1 ELSE 0 END) as dropped_sessions
            
        FROM cdr_partitioned
        WHERE year = 2025 AND month = 1
        GROUP BY location_area, cell_id, operator, network_type, DATE(start_time), HOUR(start_time)
    ),
    cell_capacity_metrics AS (
        SELECT 
            location_area,
            cell_id,
            operator,
            network_type,
            
            -- Peak load metrics
            MAX(concurrent_sessions) as peak_concurrent_sessions,
            MAX(concurrent_users) as peak_concurrent_users,
            MAX(hourly_data_mb) as peak_hourly_data_mb,
            
            -- Average load
            AVG(concurrent_sessions) as avg_concurrent_sessions,
            AVG(hourly_data_mb) as avg_hourly_data_mb,
            
            -- Quality under load
            MIN(avg_quality) as min_quality_score,
            AVG(CASE WHEN concurrent_sessions > AVG(concurrent_sessions) * 1.5 THEN avg_quality END) as quality_under_peak_load,
            
            -- Capacity utilization (assuming theoretical max)
            MAX(concurrent_sessions) / 1000.0 * 100 as estimated_capacity_utilization,
            
            -- Failure rate under load
            SUM(dropped_sessions) / SUM(concurrent_sessions) * 100 as overall_drop_rate
            
        FROM hourly_load
        GROUP BY location_area, cell_id, operator, network_type
    )
    SELECT 
        *,
        -- Capacity planning recommendations
        CASE 
            WHEN estimated_capacity_utilization > 80 THEN 'Urgent Upgrade Required'
            WHEN estimated_capacity_utilization > 60 THEN 'Plan Upgrade'
            WHEN estimated_capacity_utilization > 40 THEN 'Monitor'
            ELSE 'Adequate'
        END as capacity_recommendation,
        
        -- Load balancing score
        (100 - estimated_capacity_utilization) * 0.4 +
        min_quality_score * 100 * 0.3 +
        (100 - overall_drop_rate) * 0.3 as cell_health_score
        
    FROM cell_capacity_metrics
""")

capacity_planning.write.mode("overwrite").saveAsTable("network_capacity_planning")
print("✅ Network capacity planning features created")

# =====================================================
# 9. FEATURE STORE SUMMARY
# =====================================================
print("\n📚 CREATING UNIFIED FEATURE STORE...")

# Create a master feature table joining key features
spark.sql("""
    CREATE OR REPLACE TABLE feature_store AS
    SELECT 
        cbp.subscriber_id,
        cbp.customer_segment,
        cbp.age_group,
        cbp.gender,
        cbp.operator,
        cbp.payment_type,
        cbp.value_category,
        cbp.usage_pattern,
        cbp.total_spending,
        cbp.active_days,
        cbp.daily_avg_spending,
        
        -- Revenue features
        rof.total_customer_revenue,
        rof.voice_revenue_ratio,
        rof.data_revenue_ratio,
        rof.discount_usage_rate,
        
        -- Churn indicators
        cci.days_since_last_seen,
        cci.churn_risk_score,
        
        -- Fraud indicators
        fdf.fraud_risk_score,
        fdf.fraud_flag_count,
        fdf.unusual_pattern_count,
        
        -- Current timestamp for versioning
        CURRENT_TIMESTAMP() as feature_timestamp
        
    FROM customer_behavior_profiles cbp
    LEFT JOIN revenue_optimization_features rof ON cbp.subscriber_id = rof.subscriber_id
    LEFT JOIN customer_churn_indicators cci ON cbp.subscriber_id = cci.subscriber_id
    LEFT JOIN fraud_detection_features fdf ON cbp.subscriber_id = fdf.subscriber_id
""")

print("✅ Unified feature store created")

# =====================================================
# 10. FEATURE STATISTICS & QUALITY CHECK
# =====================================================
print("\n📊 FEATURE STATISTICS AND QUALITY REPORT...")

# Feature completeness check
feature_quality = spark.sql("""
    SELECT 
        COUNT(*) as total_records,
        COUNT(subscriber_id) / COUNT(*) * 100 as subscriber_id_completeness,
        COUNT(total_spending) / COUNT(*) * 100 as spending_completeness,
        COUNT(churn_risk_score) / COUNT(*) * 100 as churn_score_completeness,
        COUNT(fraud_risk_score) / COUNT(*) * 100 as fraud_score_completeness,
        
        AVG(total_spending) as avg_customer_spending,
        STDDEV(total_spending) as stddev_customer_spending,
        
        AVG(churn_risk_score) as avg_churn_risk,
        AVG(fraud_risk_score) as avg_fraud_risk,
        
        COUNT(DISTINCT customer_segment) as unique_segments,
        COUNT(DISTINCT operator) as unique_operators
        
    FROM feature_store
""").collect()[0]

print("\n📈 Feature Store Quality Report:")
print(f"   Total customer records: {feature_quality['total_records']:,}")
print(f"   Average customer spending: {feature_quality['avg_customer_spending']:.2f} DZD")
print(f"   Average churn risk: {feature_quality['avg_churn_risk']:.2%}")
print(f"   Average fraud risk: {feature_quality['avg_fraud_risk']:.2f}")

# Show feature distributions
print("\n📊 Customer Segment Distribution:")
spark.sql("""
    SELECT 
        customer_segment,
        COUNT(*) as customer_count,
        ROUND(AVG(total_spending), 2) as avg_spending,
        ROUND(AVG(churn_risk_score), 3) as avg_churn_risk
    FROM feature_store
    GROUP BY customer_segment
    ORDER BY customer_count DESC
""").show()

print("\n📊 Value Category Distribution:")
spark.sql("""
    SELECT 
        value_category,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM feature_store
    GROUP BY value_category
    ORDER BY 
        CASE value_category 
            WHEN 'Premium' THEN 1 
            WHEN 'High' THEN 2 
            WHEN 'Medium' THEN 3 
            ELSE 4 
        END
""").show()

# =====================================================
# 11. EXPORT KEY METRICS FOR VISUALIZATION
# =====================================================
print("\n📤 EXPORTING KEY METRICS FOR VISUALIZATION...")

# Daily trends for visualization
daily_trends = spark.sql("""
    SELECT 
        DATE(start_time) as date,
        operator,
        COUNT(*) as transactions,
        COUNT(DISTINCT subscriber_id) as active_users,
        SUM(charging_amount) as revenue,
        AVG(quality_score) as avg_quality
    FROM cdr_partitioned
    WHERE year = 2025 AND month IN (1, 2)
    GROUP BY DATE(start_time), operator
    ORDER BY date, operator
""").toPandas()

# Save for later visualization
daily_trends.to_csv("/tmp/daily_trends.csv", index=False)
print("✅ Daily trends exported")

# =====================================================
# SUMMARY
# =====================================================
print("\n" + "="*80)
print("✅ DATA ENGINEERING PIPELINE COMPLETE!")
print("="*80)

print("\n📊 Feature Engineering Summary:")
print(f"   • Customer Behavior Profiles: {customer_profile.count():,} profiles")
print(f"   • Network Performance Features: Analyzed across {network_performance.select('location_area').distinct().count()} locations")
print(f"   • Revenue Optimization Features: Created for {revenue_features.count():,} customers")
print(f"   • Time Series Features: {time_series_features.count():,} daily records")
print(f"   • Churn Risk Indicators: Calculated for {churn_indicators.count():,} subscribers")
print(f"   • Fraud Detection Features: {fraud_features.count():,} subscriber patterns analyzed")
print(f"   • Campaign Effectiveness: Analyzed {campaign_features.select('special_offer_applied').distinct().count()} campaigns")
print(f"   • Network Capacity Planning: {capacity_planning.count():,} cell tower metrics")

print("\n🎯 Next Steps:")
print("   → Run Notebook 04 for Advanced Analytics & Anomaly Detection")
print("   → Run Notebook 05 for Business Intelligence Dashboards")

spark.stop()
print("\n🔚 Spark session closed.")