In [5]:
import json
import time
from datetime import datetime, timezone
from kafka import KafkaProducer
import random

BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "telecom.tower.network"

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",
    retries=5
)

TOWERS = [
    {"id": "TOWER_1", "region": "Cairo"},
    {"id": "TOWER_2", "region": "Giza"},
    {"id": "TOWER_3", "region": "Alex"}
]

tower_states = {}
for tower in TOWERS:
    tower_states[tower["id"]] = {
        "latency_ms": random.randint(80, 150),
        "throughput_mbps": random.randint(20, 60),
        "packet_loss": round(random.uniform(0, 0.05), 3),
        "active_users": random.randint(50, 200)
    }

print(" Unified Network Metrics Producer Started...")

while True:
    document = {"event_time": datetime.now(timezone.utc).isoformat(), "towers": []}

    for tower in TOWERS:
        state = tower_states[tower["id"]]

        state["latency_ms"] = max(10, state["latency_ms"] + random.randint(-5, 5))
        state["throughput_mbps"] = max(1, state["throughput_mbps"] + random.randint(-5, 5))
        state["packet_loss"] = max(0, round(state["packet_loss"] + random.uniform(-0.01, 0.01), 3))
        state["active_users"] = max(0, state["active_users"] + random.randint(-5, 5))

        tower_doc = {
            "tower_id": tower["id"],
            "region": tower["region"],
            "latency_ms": state["latency_ms"],
            "throughput_mbps": state["throughput_mbps"],
            "packet_loss": state["packet_loss"],
            "active_users": state["active_users"]
        }
        document["towers"].append(tower_doc)

    producer.send(
        topic=TOPIC,
        key="ALL_TOWERS",
        value=document
    )

    print("Sent Unified Network Document:", document)
    time.sleep(10)


 Unified Network Metrics Producer Started...
Sent Unified Network Document: {'event_time': '2026-02-07T22:06:12.178185+00:00', 'towers': [{'tower_id': 'TOWER_1', 'region': 'Cairo', 'latency_ms': 117, 'throughput_mbps': 34, 'packet_loss': 0.029, 'active_users': 184}, {'tower_id': 'TOWER_2', 'region': 'Giza', 'latency_ms': 148, 'throughput_mbps': 32, 'packet_loss': 0.052, 'active_users': 185}, {'tower_id': 'TOWER_3', 'region': 'Alex', 'latency_ms': 149, 'throughput_mbps': 29, 'packet_loss': 0.008, 'active_users': 75}]}
Sent Unified Network Document: {'event_time': '2026-02-07T22:06:22.298815+00:00', 'towers': [{'tower_id': 'TOWER_1', 'region': 'Cairo', 'latency_ms': 117, 'throughput_mbps': 38, 'packet_loss': 0.021, 'active_users': 181}, {'tower_id': 'TOWER_2', 'region': 'Giza', 'latency_ms': 153, 'throughput_mbps': 37, 'packet_loss': 0.057, 'active_users': 190}, {'tower_id': 'TOWER_3', 'region': 'Alex', 'latency_ms': 145, 'throughput_mbps': 25, 'packet_loss': 0.003, 'active_users': 73}]}

KeyboardInterrupt: 

create Radio&Cell metrics 
-------------------------

In [None]:
import json
import time
from datetime import datetime, timezone
from kafka import KafkaProducer
import random

BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "telecom.tower.radio"

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",
    retries=5
)

TOWERS = [
    {"id": "TOWER_1", "region": "Cairo"},
    {"id": "TOWER_2", "region": "Giza"},
    {"id": "TOWER_3", "region": "Alex"}
]

# Stateful
tower_states = {}
for tower in TOWERS:
    tower_states[tower["id"]] = {
        "signal_dbm": random.randint(-110, -90),
        "cell_load_pct": random.randint(30, 60),
        "handover_rate": round(random.uniform(0.0, 2.0), 2),
        "drop_call_rate": round(random.uniform(0.0, 1.0), 2)
    }

print("üì° Unified Radio Metrics Producer Started...")

while True:
    document = {"event_time": datetime.now(timezone.utc).isoformat(), "towers": []}

    for tower in TOWERS:
        state = tower_states[tower["id"]]

        state["signal_dbm"] = max(-120, min(-50, state["signal_dbm"] + random.randint(-2, 2)))
        state["cell_load_pct"] = max(0, min(100, state["cell_load_pct"] + random.randint(-5, 5)))
        state["handover_rate"] = max(0, round(state["handover_rate"] + random.uniform(-0.2, 0.2), 2))
        state["drop_call_rate"] = max(0, round(state["drop_call_rate"] + random.uniform(-0.1, 0.1), 2))

        tower_doc = {
            "tower_id": tower["id"],
            "region": tower["region"],
            "signal_dbm": state["signal_dbm"],
            "cell_load_pct": state["cell_load_pct"],
            "handover_rate": state["handover_rate"],
            "drop_call_rate": state["drop_call_rate"]
        }
        document["towers"].append(tower_doc)

    producer.send(
        topic=TOPIC,
        key="ALL_TOWERS",
        value=document
    )

    print("Sent Unified Radio Document:", document)
    time.sleep(10)



create Hardware&System metrics 
--------------------

In [4]:
import json
import time
from datetime import datetime, timezone
from kafka import KafkaProducer
import random

# ===============================
# Kafka Config
# ===============================
BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "telecom.tower.system"

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",
    retries=5
)

# ===============================
# Towers Initial State
# ===============================
TOWERS = [
    {"id": "TOWER_1", "region": "Cairo"},
    {"id": "TOWER_2", "region": "Giza"},
    {"id": "TOWER_3", "region": "Alex"}
]

# ŸÑŸÉŸÑ tower ŸÜÿÆÿ≤ŸÜ state
tower_states = {}
for tower in TOWERS:
    tower_states[tower["id"]] = {
        "cpu_pct": random.randint(20, 40),
        "memory_pct": random.randint(30, 50),
        "power_kw": round(random.uniform(2.5, 3.5), 2),
        "battery_level_pct": random.randint(70, 100)
    }

print("üñ•Ô∏è Unified System Metrics Producer Started...")

# ===============================
# Streaming Loop
# ===============================
while True:
    document = {"event_time": datetime.now(timezone.utc).isoformat(), "towers": []}

    for tower in TOWERS:
        state = tower_states[tower["id"]]

        # Update each metric gradually
        state["cpu_pct"] = min(100, max(0, state["cpu_pct"] + random.randint(-5, 5)))
        state["memory_pct"] = min(100, max(0, state["memory_pct"] + random.randint(-3, 3)))
        state["power_kw"] = round(max(0, state["power_kw"] + random.uniform(-0.2, 0.2)), 2)
        state["battery_level_pct"] = min(100, max(0, state["battery_level_pct"] - random.uniform(0, 1)))

        # Append tower data to document
        tower_doc = {
            "tower_id": tower["id"],
            "region": tower["region"],
            "cpu_pct": state["cpu_pct"],
            "memory_pct": state["memory_pct"],
            "power_kw": state["power_kw"],
            "battery_level_pct": round(state["battery_level_pct"], 2)
        }
        document["towers"].append(tower_doc)

    # Send unified document as one message
    producer.send(
        topic=TOPIC,
        key="ALL_TOWERS",
        value=document
    )

    print("Sent Unified Document:", document)
    time.sleep(10)


üñ•Ô∏è Unified System Metrics Producer Started...
Sent Unified Document: {'event_time': '2026-02-07T21:35:52.476181+00:00', 'towers': [{'tower_id': 'TOWER_1', 'region': 'Cairo', 'cpu_pct': 24, 'memory_pct': 35, 'power_kw': 2.91, 'battery_level_pct': 93.95}, {'tower_id': 'TOWER_2', 'region': 'Giza', 'cpu_pct': 30, 'memory_pct': 35, 'power_kw': 2.86, 'battery_level_pct': 72.24}, {'tower_id': 'TOWER_3', 'region': 'Alex', 'cpu_pct': 45, 'memory_pct': 37, 'power_kw': 2.75, 'battery_level_pct': 88.67}]}
Sent Unified Document: {'event_time': '2026-02-07T21:36:02.597659+00:00', 'towers': [{'tower_id': 'TOWER_1', 'region': 'Cairo', 'cpu_pct': 22, 'memory_pct': 33, 'power_kw': 3.07, 'battery_level_pct': 93.45}, {'tower_id': 'TOWER_2', 'region': 'Giza', 'cpu_pct': 31, 'memory_pct': 33, 'power_kw': 2.72, 'battery_level_pct': 71.58}, {'tower_id': 'TOWER_3', 'region': 'Alex', 'cpu_pct': 47, 'memory_pct': 36, 'power_kw': 2.86, 'battery_level_pct': 87.68}]}
Sent Unified Document: {'event_time': '2026

KeyboardInterrupt: 

create eniveronmental metrics
------------------

In [None]:
import json
import time
from datetime import datetime, timezone
from kafka import KafkaProducer
import random

BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC = "telecom.tower.environment"

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",
    retries=5
)

TOWERS = [
    {"id": "TOWER_1", "region": "Cairo"},
    {"id": "TOWER_2", "region": "Giza"},
    {"id": "TOWER_3", "region": "Alex"}
]

tower_states = {}
for tower in TOWERS:
    tower_states[tower["id"]] = {
        "temperature_c": random.randint(25, 35),
        "humidity_pct": random.randint(30, 60),
        "wind_speed_kmh": random.randint(5, 20)
    }

print("üå°Ô∏è Unified Environment Metrics Producer Started...")

while True:
    document = {"event_time": datetime.now(timezone.utc).isoformat(), "towers": []}

    for tower in TOWERS:
        state = tower_states[tower["id"]]

        state["temperature_c"] = round(state["temperature_c"] + random.uniform(-0.5, 0.5), 1)
        state["humidity_pct"] = round(max(0, min(100, state["humidity_pct"] + random.uniform(-2, 2))), 1)
        state["wind_speed_kmh"] = round(max(0, state["wind_speed_kmh"] + random.uniform(-1, 1)), 1)

        tower_doc = {
            "tower_id": tower["id"],
            "region": tower["region"],
            "temperature_c": state["temperature_c"],
            "humidity_pct": state["humidity_pct"],
            "wind_speed_kmh": state["wind_speed_kmh"]
        }
        document["towers"].append(tower_doc)

    producer.send(
        topic=TOPIC,
        key="ALL_TOWERS",
        value=document
    )

    print("Sent Unified Environment Document:", document)
    time.sleep(10)


üå°Ô∏è Environment Metrics Producer Started...
Sent: {'event_time': '2026-02-07T21:01:05.443607+00:00', 'tower_id': 'TOWER_3', 'region': 'Alex', 'temperature_c': 48, 'humidity_pct': 59, 'wind_speed_kmh': 7.06}
Sent: {'event_time': '2026-02-07T21:01:15.710575+00:00', 'tower_id': 'TOWER_2', 'region': 'Giza', 'temperature_c': 27, 'humidity_pct': 40, 'wind_speed_kmh': 15.56}
Sent: {'event_time': '2026-02-07T21:01:34.872674+00:00', 'tower_id': 'TOWER_2', 'region': 'Giza', 'temperature_c': 24, 'humidity_pct': 78, 'wind_speed_kmh': 22.27}
Sent: {'event_time': '2026-02-07T21:01:44.887503+00:00', 'tower_id': 'TOWER_1', 'region': 'Cairo', 'temperature_c': 30, 'humidity_pct': 80, 'wind_speed_kmh': 37.07}
Sent: {'event_time': '2026-02-07T21:01:54.905011+00:00', 'tower_id': 'TOWER_1', 'region': 'Cairo', 'temperature_c': 43, 'humidity_pct': 78, 'wind_speed_kmh': 27.73}
Sent: {'event_time': '2026-02-07T21:02:04.919533+00:00', 'tower_id': 'TOWER_1', 'region': 'Cairo', 'temperature_c': 38, 'humidity_p

KeyboardInterrupt: 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import *

# Create Spark Session with Kafka support
spark = SparkSession.builder \
    .appName("TelecomStreamingConsumer") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

#  Read Stream From Kafka
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "telecom.tower.system") \
    .option("startingOffsets", "latest") \
    .load()

# 3Ô∏è‚É£ Convert value from binary ‚Üí string
df_string = df_kafka.selectExpr("CAST(value AS STRING)")

# 4Ô∏è‚É£ Define JSON Schema
schema = StructType([
    StructField("event_time", StringType()),
    StructField("towers", ArrayType(
        StructType([
            StructField("tower_id", StringType()),
            StructField("region", StringType()),
            StructField("cpu_pct", IntegerType()),
            StructField("memory_pct", IntegerType()),
            StructField("power_kw", DoubleType()),
            StructField("battery_level_pct", DoubleType())
        ])
    ))
])

# 5Ô∏è‚É£ Parse JSON
df_json = df_string.select(
    from_json(col("value"), schema).alias("data")
)

# 6Ô∏è‚É£ Explode towers array
df_exploded = df_json.select(
    col("data.event_time"),
    explode(col("data.towers")).alias("tower")
)

df_final = df_exploded.select(
    "event_time",
    "tower.tower_id",
    "tower.region",
    "tower.cpu_pct",
    "tower.memory_pct",
    "tower.power_kw",
    "tower.battery_level_pct"
)

# 7Ô∏è‚É£ Write To Console (ÿßÿÆÿ™ÿ®ÿßÿ±)
query = df_final.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

# =========================================
# 1Ô∏è‚É£ Spark Session
# =========================================
spark = SparkSession.builder \
    .appName("TelecomTowerRawDocuments") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# =========================================
# 2Ô∏è‚É£ Generic Document Schema
# =========================================

tower_schema = MapType(StringType(), StringType())

document_schema = StructType([
    StructField("event_time", StringType()),
    StructField("towers", ArrayType(tower_schema))
])

# =========================================
# 3Ô∏è‚É£ Function to Read Any Topic
# =========================================

def read_topic(topic_name):
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", topic_name) \
        .option("startingOffsets", "latest") \
        .load()

    parsed_df = df.selectExpr("CAST(value AS STRING) as json_str") \
        .select(from_json(col("json_str"), document_schema).alias("data")) \
        .select("data.*")

    return parsed_df

# =========================================
# 4Ô∏è‚É£ Read All 4 Streams
# =========================================

system_df = read_topic("telecom.tower.system")
radio_df = read_topic("telecom.tower.radio")
env_df = read_topic("telecom.tower.environment")
network_df = read_topic("telecom.tower.network")

# =========================================
# 5Ô∏è‚É£ Output Each Stream to Console
# =========================================

query1 = system_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

query2 = radio_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

query3 = env_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

query4 = network_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

spark.streams.awaitAnyTermination()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/15 22:16:08 WARN Utils: Your hostname, refat, resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
26/02/15 22:16:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/15 22:16:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.