In [1]:
import os
import shutil
import time

import pyspark
from pyspark.sql import SparkSession
# NOTE: `round` below is pyspark.sql.functions.round and shadows Python's builtin round().
from pyspark.sql.functions import col, current_timestamp, from_json, round, to_timestamp, try_to_timestamp
from pyspark.sql.types import *

In [2]:
# Build (or reuse) the SparkSession with the Kafka and Cassandra connectors.
# The Kafka connector must match the running Spark version exactly, so derive it from
# the installed PySpark instead of hard-coding a release (3.5.1 did not match Spark 4.0.1).
KAFKA_CONNECTOR = f"org.apache.spark:spark-sql-kafka-0-10_2.13:{pyspark.__version__}"
# NOTE(review): connector 3.4.1 targets Spark 3.4 — confirm a Cassandra connector build
# that supports the Spark version actually running here.
CASSANDRA_CONNECTOR = "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1"

# If a SparkSession already exists in this kernel, getOrCreate() returns it and
# launch-time settings (spark.jars.packages, memory sizes) are not re-applied.
spark = (
    SparkSession.builder
    .appName("TelecomTowerStreaming")
    .config("spark.jars.packages", f"{KAFKA_CONNECTOR},{CASSANDRA_CONNECTOR}")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    # Minimum number of micro-batches whose streaming metadata stays recoverable.
    .config("spark.sql.streaming.minBatchesToRetain", "2")
    # Cleans RDD checkpoint files when their RDDs are garbage-collected; it does not
    # manage Structured Streaming checkpoint directories.
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .config("spark.sql.streaming.schemaInference", "false")
    # NOTE(review): not a documented Spark configuration key; likely has no effect — verify.
    .config("spark.sql.streaming.checkpointLocation.cleanup.enabled", "true")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print("✅ Spark Session Created Successfully!")
print(f"Spark Version: {spark.version}")



✅ Spark Session Created Successfully!
Spark Version: 4.0.1


In [3]:
# Cassandra connection settings read by the DataFrame writer in write_to_cassandra.
# Defaults are "cassandra":9042 (presumably the Docker service name); override via the
# CASSANDRA_HOST / CASSANDRA_PORT environment variables (e.g. an IP or other Docker host)
# instead of editing this cell.
spark.conf.set("spark.cassandra.connection.host", os.environ.get("CASSANDRA_HOST", "cassandra"))
spark.conf.set("spark.cassandra.connection.port", os.environ.get("CASSANDRA_PORT", "9042"))
# Write tuning: max batches in flight per Spark task, and rows per batch.
spark.conf.set("spark.cassandra.output.concurrent.writes", "5")
spark.conf.set("spark.cassandra.output.batch.size.rows", "500")

In [4]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is declared nullable, so keys absent from a message parse to NULL.
tower_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("tower_id", LongType()),
        ("CID", LongType()),
        ("TAC", LongType()),
        ("radio", StringType()),
        ("MCC", IntegerType()),
        ("MNC", IntegerType()),
        ("Network", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("governorate_en", StringType()),
        ("Country", StringType()),
        ("RANGE", IntegerType()),
        ("SAM", IntegerType()),
        ("tower_status", StringType()),
        ("signal_quality", StringType()),
        ("coverage_gap", BooleanType()),
        ("priority", StringType()),
        ("signal_strength", IntegerType()),
        ("speed", DoubleType()),
        ("latency", DoubleType()),
        ("QoE", DoubleType()),
        ("avg_load", DoubleType()),
        ("total_calls", IntegerType()),
        ("drop_calls", IntegerType()),
        ("drop_rate", DoubleType()),
        ("maintenance_type", StringType()),
        ("labor_cost_egp", DoubleType()),
        ("parts_cost_egp", DoubleType()),
        ("downtime_hours", DoubleType()),
        ("vendor", StringType()),
        ("notes", StringType()),
        ("created", LongType()),
        ("updated", LongType()),
        ("created_dt", StringType()),
        ("updated_dt", StringType()),
        ("maintenance_date", StringType()),
    ]
])

In [5]:
# Checkpoint directory for the Parquet sink (the other sinks define their own).
checkpoint_dir = "/tmp/checkpoints/telecom_v1"

# Broker list; override with KAFKA_BOOTSTRAP_SERVERS instead of editing the notebook.
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")

# No fixed `kafka.group.id`: four streaming queries below read this source concurrently.
# Spark warns that queries sharing a group id can interfere, each reading only part of
# the data. Letting Spark generate a unique group id per query avoids that.
df_raw = (
    spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("subscribe", "telecom-stream")
        # Applies only when a query starts without a checkpoint; resumed queries
        # continue from their checkpointed offsets.
        .option("startingOffsets", "earliest")
        # Upper bound on records per micro-batch, split across topic partitions.
        .option("maxOffsetsPerTrigger", "5000")
        # Don't fail the query when offsets are no longer available (e.g. aged out);
        # the missing records are skipped.
        .option("failOnDataLoss", "false")
        .load()
)

print("✅ Kafka Stream Connected")



✅ Kafka Stream Connected


In [6]:
# STEP 4: Parse and Transform Data
# Kafka delivers `value` as binary: cast it to a string, parse it as JSON using
# tower_schema, then flatten the resulting struct into top-level columns.
df_parsed = (
    df_raw
    .select(from_json(col("value").cast("string"), tower_schema).alias("data"))
    .select("data.*")
)

df_parsed.printSchema()


root
 |-- tower_id: long (nullable = true)
 |-- CID: long (nullable = true)
 |-- TAC: long (nullable = true)
 |-- radio: string (nullable = true)
 |-- MCC: integer (nullable = true)
 |-- MNC: integer (nullable = true)
 |-- Network: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- governorate_en: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- RANGE: integer (nullable = true)
 |-- SAM: integer (nullable = true)
 |-- tower_status: string (nullable = true)
 |-- signal_quality: string (nullable = true)
 |-- coverage_gap: boolean (nullable = true)
 |-- priority: string (nullable = true)
 |-- signal_strength: integer (nullable = true)
 |-- speed: double (nullable = true)
 |-- latency: double (nullable = true)
 |-- QoE: double (nullable = true)
 |-- avg_load: double (nullable = true)
 |-- total_calls: integer (nullable = true)
 |-- drop_calls: integer (nullable = true)
 |-- drop_rate: double (nullable = true)
 

In [7]:
# STEP 1c – Define Checkpoints for Streams
# One directory per sink: Spark requires each streaming query to have its own
# checkpoint location. (The Parquet sink uses `checkpoint_dir` instead.)
# NOTE(review): checkpoint_dir_memory is not passed to the memory-sink query.
checkpoint_dir_console, checkpoint_dir_memory, checkpoint_dir_cassandra = (
    f"/tmp/checkpoints/{sink}" for sink in ("console", "memory", "cassandra_stream_v2")
)


In [8]:
# Rename columns: raw JSON field name -> output column name.
# Identity entries are kept so the complete output column list lives in one place.
rename_dict = dict([
    ("tower_id", "tower_id"),
    ("radio", "radio"),
    ("MCC", "mobile_country_code"),
    ("MNC", "mobile_network_code"),
    ("TAC", "tracking_area_code"),
    ("CID", "cell_id"),
    ("longitude", "longitude"),
    ("latitude", "latitude"),
    ("RANGE", "range"),
    ("SAM", "sampling_count"),
    ("created", "created"),
    ("updated", "updated"),
    ("governorate_en", "governorate_en"),
    ("Country", "country"),
    ("Network", "network"),
    ("drop_calls", "drop_calls"),
    ("total_calls", "total_calls"),
    ("drop_rate", "drop_rate"),
    ("avg_load", "avg_load"),
    ("signal_strength", "signal_strength"),
    ("speed", "speed"),
    ("latency", "latency"),
    ("QoE", "quality_of_experience"),
    ("coverage_gap", "coverage_gap"),
    ("signal_quality", "signal_quality"),
    ("tower_status", "tower_status"),
    ("priority", "priority"),
    ("maintenance_type", "maintenance_type"),
    ("created_dt", "created_dt"),
    ("updated_dt", "updated_dt"),
    ("maintenance_date", "maintenance_date"),
    ("labor_cost_egp", "labor_cost_egp"),
    ("parts_cost_egp", "parts_cost_egp"),
    ("downtime_hours", "downtime_hours"),
    ("vendor", "vendor"),
    ("notes", "notes"),
])

In [9]:
# Apply every rename in one projection. Calling withColumnRenamed once per entry in a
# loop adds one projection per call, needlessly deepening the logical plan.
# Identity entries (old == new) are skipped because they would be no-ops.
# Requires PySpark >= 3.4 (DataFrame.withColumnsRenamed).
df_renamed = df_parsed.withColumnsRenamed(
    {old: new for old, new in rename_dict.items() if old != new}
)


In [10]:
# STEP 4c: Enrichment — derived cost, risk/anomaly flags and rounded coordinates.
# Flags follow SQL three-valued logic: a flag is NULL only when no condition is true
# and at least one input metric is NULL. maint_total_cost is NULL if either cost is NULL.
df_enriched = (
    df_renamed
    .withColumn("maint_total_cost", col("labor_cost_egp") + col("parts_cost_egp"))
    .withColumn(
        "is_high_risk",
        (col("drop_rate") > 0.02)
        | (col("signal_strength") < -95)
        | (col("latency") > 150),
    )
    # try_to_timestamp yields NULL for an unparseable string. Plain to_timestamp raises
    # instead when ANSI mode is on (the default since Spark 4.0), so a single malformed
    # message would fail every streaming query built on this DataFrame.
    .withColumn("updated_dt", try_to_timestamp(col("updated_dt")))
    .withColumn("created_dt", try_to_timestamp(col("created_dt")))
    .withColumn(
        "is_anomaly",
        (col("drop_rate") > 0.1)
        | (col("latency") > 300)
        | (col("speed") < 1),
    )
    # `round` here is pyspark.sql.functions.round, not the Python builtin.
    .withColumn("lat_rounded", round(col("latitude"), 3))
    .withColumn("lon_rounded", round(col("longitude"), 3))
)

print("✅ Data Transformation Applied")

✅ Data Transformation Applied


In [11]:
# STEP 5: Console Stream — confirm that data is arriving
# Prints each micro-batch to stdout every 5 seconds.
console_query = (
    df_enriched.writeStream
    .option("checkpointLocation", checkpoint_dir_console)
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("console")
    .start()
)

In [12]:
# STEP 6: Memory Table Stream
# Exposes the stream as the in-memory table `live_towers_view` for ad-hoc SQL.
# NOTE(review): the memory sink keeps every emitted row in driver memory, so this table
# grows for as long as the query runs — intended for debugging on small volumes.
# NOTE(review): the watermark only affects stateful operators, and this query has none.
df_with_watermark = df_enriched.withWatermark("updated_dt", "5 minutes")
query_memory = (
    df_with_watermark.writeStream
    .queryName("live_towers_view")
    .format("memory")
    .trigger(processingTime='5 seconds')
    .outputMode("append")
    .start()
)

print("✅ Memory table stream started: live_towers_view")

✅ Memory table stream started: live_towers_view


In [13]:
# Write every enriched record as Parquet files under /tmp/telecom_output,
# checkpointing to `checkpoint_dir` (defined next to the Kafka source).
query_parquet = (
    df_enriched.writeStream
    .outputMode("append")
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", checkpoint_dir)
    .option("path", "/tmp/telecom_output")
    .format("parquet")
    .start()
)

print("✅ Parquet Stream Started")

✅ Parquet Stream Started


In [14]:
def write_to_cassandra(batch_df, batch_id, keyspace="telecom", table="tower_metrics"):
    """Append one streaming micro-batch to a Cassandra table.

    Used as the ``foreachBatch`` callback, so Spark calls it with
    ``(batch_df, batch_id)``; ``keyspace`` and ``table`` keep their original targets
    by default.

    Args:
        batch_df: The micro-batch DataFrame produced by the stream.
        batch_id: Id of the micro-batch (used only for logging here).
        keyspace: Cassandra keyspace to write into.
        table: Cassandra table to write into.

    Raises:
        Any exception from the Cassandra writer propagates (failing the streaming
        query) rather than being swallowed.
    """
    # count() and save() are two separate Spark actions. Without caching, each one
    # re-reads the batch from Kafka and re-runs the parsing/enrichment.
    batch_df.persist()
    try:
        batch_count = batch_df.count()
        if batch_count > 0:
            print(f"Batch {batch_id} contains {batch_count} rows — writing to Cassandra")
            (
                batch_df.write
                .format("org.apache.spark.sql.cassandra")
                .mode("append")
                .option("table", table)
                .option("keyspace", keyspace)
                .save()
            )
            print(f"✅ Batch {batch_id} written successfully")
        else:
            print(f"Batch {batch_id} is empty — skipping")
    finally:
        # Release the cached blocks even if the write fails.
        batch_df.unpersist()

# Route each 5-second micro-batch through write_to_cassandra via foreachBatch.
query_cassandra = (
    df_enriched.writeStream
    .option("checkpointLocation", checkpoint_dir_cassandra)
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .foreachBatch(write_to_cassandra)
    .start()
)

print("✅ Cassandra Stream Started")


✅ Cassandra Stream Started
Batch 0 contains 5000 rows — writing to Cassandra
✅ Batch 0 written successfully
Batch 1 contains 5000 rows — writing to Cassandra
✅ Batch 1 written successfully
Batch 2 contains 5000 rows — writing to Cassandra
✅ Batch 2 written successfully
Batch 3 contains 5000 rows — writing to Cassandra
✅ Batch 3 written successfully
Batch 4 contains 5000 rows — writing to Cassandra
✅ Batch 4 written successfully
Batch 5 contains 5000 rows — writing to Cassandra
✅ Batch 5 written successfully
Batch 6 contains 5000 rows — writing to Cassandra
✅ Batch 6 written successfully
Batch 7 contains 5000 rows — writing to Cassandra
✅ Batch 7 written successfully
Batch 8 contains 5000 rows — writing to Cassandra
✅ Batch 8 written successfully
Batch 9 contains 5000 rows — writing to Cassandra
✅ Batch 9 written successfully
Batch 10 contains 5000 rows — writing to Cassandra


In [18]:
# Stop every active streaming query before shutting Spark down.
# Queries started without queryName() report name None, so log their id instead.
for active_query in spark.streams.active:
    print(f"Stopping query: {active_query.name or active_query.id}")
    active_query.stop()


Stopping query: None
Stopping query: live_towers_view
Stopping query: None
Stopping query: None


In [19]:
# Shut down the SparkSession (stopping its underlying SparkContext) once all streams are stopped.
spark.stop()