In [1]:
# Cell 1: Import thư viện và lấy biến môi trường
import os
import json
import math
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg, count, udf, lit, current_timestamp, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, FloatType, IntegerType

# --- Lấy cấu hình từ Biến Môi trường ---
# Lưu ý: Các biến này được set trong docker-compose.yml cho service spark-jupyter
KAFKA_BROKERS = os.environ.get("KAFKA_BROKERS", "kafk-1:9092,kafk-2:9092,kafk-3:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "raw_traffic_data")
REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "clickhouse-server")
CLICKHOUSE_NATIVE_PORT = int(os.environ.get("CLICKHOUSE_NATIVE_PORT", 9000))
CLICKHOUSE_DB = "traffic_db"
CLICKHOUSE_TABLE = "traffic_events"

print(f"Kafka Brokers: {KAFKA_BROKERS}")
print(f"Kafka Topic: {KAFKA_TOPIC}")
print(f"Redis Host: {REDIS_HOST}:{REDIS_PORT}")
print(f"ClickHouse Host: {CLICKHOUSE_HOST}:{CLICKHOUSE_NATIVE_PORT}")

# --- Schema cho dữ liệu JSON từ Kafka ---
schema = StructType([
    StructField("vehicle_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("speed", DoubleType(), True),
    StructField("segment_id_actual", StringType(), True) # Giữ lại để tham khảo/debug
])

# --- Dữ liệu đoạn đường tĩnh (Ví dụ - Nên tải từ file hoặc nguồn khác) ---
SEGMENTS_COORDS = {
    "segment_1": (10.98, 106.75, "Ngã tư 550 Area"), # lat, lon, name
    "segment_2": (10.94, 106.81, "KCN Sóng Thần Area"),
    "segment_3": (10.96, 106.78, "Trung tâm Dĩ An Area")
}
# Có thể tạo Spark DataFrame từ đây để join nếu cần làm giàu dữ liệu
# segments_sdf = spark.createDataFrame([(k, v[0], v[1], v[2]) for k, v in SEGMENTS_COORDS.items()], ["segment_id", "lat", "lon", "name"])
# segments_sdf.show()

# --- Các hàm tiện ích (Có thể tách ra utils.py) ---
def haversine(lon1, lat1, lon2, lat2):
    # (Giữ nguyên hàm haversine từ trước)
    R = 6371 # km
    dLat = math.radians(lat2 - lat1)
    dLon = math.radians(lon2 - lon1)
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    a = math.sin(dLat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dLon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c
    return distance

def find_nearest_segment(lat, lon):
    # (Giữ nguyên hàm map matching đơn giản)
    min_dist = float('inf')
    nearest_segment = "unknown"
    if lat is None or lon is None:
        return nearest_segment

    for segment_id, (seg_lat, seg_lon, _) in SEGMENTS_COORDS.items(): # Lấy tọa độ từ dict
        dist = haversine(lon, lat, seg_lon, seg_lat)
        # Ngưỡng khoảng cách tối đa để map (ví dụ: 1km)
        if dist < min_dist and dist < 1.0:
            min_dist = dist
            nearest_segment = segment_id
    return nearest_segment

find_nearest_segment_udf = udf(find_nearest_segment, StringType())

def classify_status(avg_speed):
    # (Giữ nguyên hàm phân loại)
    if avg_speed is None:
        return "unknown"
    elif avg_speed > 45: # Điều chỉnh ngưỡng tốc độ
        return "clear"
    elif avg_speed > 20:
        return "slow"
    else:
        return "congested"

classify_status_udf = udf(classify_status, StringType())

Kafka Brokers: kafk-1:9092,kafk-2:9092,kafk-3:9092
Kafka Topic: raw_traffic_data
Redis Host: redis:6379
ClickHouse Host: clickhouse-server:9000


In [2]:
# Cell 2: Khởi tạo Spark Session (Cập nhật)
from pyspark.sql import SparkSession
import os

# --- Xác định các gói Maven cần thiết ---
# Phiên bản phải tương thích với phiên bản Spark trong image Jupyter
# Ví dụ: image jupyter/pyspark-notebook:spark-3.3.3 dùng Spark 3.3.3 (Scala 2.12)
SPARK_VERSION = "3.3.3" # Hoặc version khác tương ứng với base image bạn chọn
SCALA_VERSION = "2.12"
KAFKA_PACKAGE = f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}"
# Gói hadoop-aws cần cho S3A (tương tác MinIO, ví dụ cho checkpoint)
# Phiên bản Hadoop thường đi kèm Spark (kiểm tra nếu cần)
HADOOP_VERSION = "3.3.2" # Thường tương thích với Spark 3.3.x
AWS_PACKAGE = f"org.apache.hadoop:hadoop-aws:{HADOOP_VERSION}"
# Đôi khi hadoop-aws cần thư viện phụ thuộc khác
# AWS_SDK_PACKAGE = "com.amazonaws:aws-java-sdk-bundle:1.11.1026" # Ví dụ

# Kết hợp các package lại
spark_packages = f"{KAFKA_PACKAGE},{AWS_PACKAGE}"
# Nếu cần thư viện phụ khác: spark_packages = f"{KAFKA_PACKAGE},{AWS_PACKAGE},{AWS_SDK_PACKAGE}"

print(f"--- Configuring Spark Session ---")
print(f"Attempting to include packages: {spark_packages}")

try:
    spark = SparkSession.builder \
        .appName("JupyterTrafficProcessor") \
        .master("local[*]") \
        .config("spark.jars.packages", spark_packages) \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoints-jupyter") \
        .config("spark.hadoop.fs.s3a.endpoint", os.environ.get("MINIO_ENDPOINT", "http://minio:9000")) \
        .config("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "minioadmin")) \
        .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "minioadmin")) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

    # Set log level
    spark.sparkContext.setLogLevel("WARN")
    print(f"Spark Session created successfully. Spark version: {spark.version}")
    # In ra cấu hình packages thực tế để kiểm tra
    print(f"Actual spark.jars.packages config: {spark.conf.get('spark.jars.packages')}")
    print(f"--- Spark Session Ready ---")

except Exception as e:
    print(f"!!! Error creating Spark Session: {e}")
    # Dừng thực thi nếu không tạo được session
    raise e

--- Configuring Spark Session ---
Attempting to include packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.3,org.apache.hadoop:hadoop-aws:3.3.2
Spark Session created successfully. Spark version: 3.5.0
Actual spark.jars.packages config: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.3,org.apache.hadoop:hadoop-aws:3.3.2
--- Spark Session Ready ---


In [3]:
# Cell 3: Đọc dữ liệu từ Kafka
print(f"Reading stream from Kafka topic '{KAFKA_TOPIC}' at brokers '{KAFKA_BROKERS}'")

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON và chọn các cột cần thiết, chuyển đổi kiểu dữ liệu
value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data")) \
                   .select("data.*") \
                   .withColumn("event_time", col("timestamp").cast(TimestampType())) \
                   .withColumn("latitude", col("latitude").cast(DoubleType())) \
                   .withColumn("longitude", col("longitude").cast(DoubleType())) \
                   .withColumn("speed", col("speed").cast(FloatType())) \
                   .filter(col("latitude").isNotNull() & col("longitude").isNotNull() & col("speed").isNotNull()) # Lọc dữ liệu null cơ bản

print("Kafka stream schema:")
value_df.printSchema()

Reading stream from Kafka topic 'raw_traffic_data' at brokers 'kafk-1:9092,kafk-2:9092,kafk-3:9092'
Kafka stream schema:
root
 |-- vehicle_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- speed: float (nullable = true)
 |-- segment_id_actual: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [4]:
# Cell 4: Xử lý dữ liệu (Map Matching, Aggregation, Classification)

# 1. Map Matching
mapped_df = value_df.withColumn("segment_id", find_nearest_segment_udf(col("latitude"), col("longitude"))) \
                    .filter(col("segment_id") != "unknown") # Chỉ giữ lại các điểm map được

# (Tùy chọn) Thêm Data Quality Check (ví dụ lọc tốc độ bất thường)
# mapped_df = mapped_df.filter((col("speed") > 0) & (col("speed") < 150))

# (Tùy chọn) Enrichment - Join với dữ liệu tĩnh về segment (nếu có segments_sdf)
# enriched_df = mapped_df.join(segments_sdf, "segment_id", "left_outer")

# 2. Windowing & Aggregation
# Sử dụng df đã map (hoặc enriched nếu có)
agg_df = mapped_df \
    .withWatermark("event_time", "2 minutes") \
    .groupBy(
        window(col("event_time"), "1 minute", "30 seconds").alias("time_window"), # Cửa sổ 1 phút, trượt 30s
        col("segment_id")
    ) \
    .agg(
        avg("speed").alias("avg_speed"),
        count("*").alias("vehicle_count")
    )

# 3. Phân loại trạng thái
status_df = agg_df.withColumn("status", classify_status_udf(col("avg_speed"))) \
                  .select(
                      col("segment_id"),
                      col("time_window.start").alias("window_start"),
                      col("time_window.end").alias("window_end"),
                      col("avg_speed").cast(FloatType()), # Đảm bảo đúng kiểu dữ liệu
                      col("vehicle_count").cast(IntegerType()), # Đảm bảo đúng kiểu dữ liệu
                      col("status")
                   )


print("Aggregated stream schema:")
status_df.printSchema()

Aggregated stream schema:
root
 |-- segment_id: string (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- avg_speed: float (nullable = true)
 |-- vehicle_count: integer (nullable = false)
 |-- status: string (nullable = true)



In [5]:
# Cell 5: Ghi dữ liệu vào Redis và ClickHouse (Dùng foreachBatch)

# --- Hàm ghi vào Redis ---
def write_to_redis(df, epoch_id):
    # Sử dụng df đã collect(), không nên collect trong production với dữ liệu lớn
    # Nên dùng df.foreachPartition thay thế để xử lý song song
    # Ví dụ này dùng collect() cho đơn giản trong notebook
    if df.isEmpty():
        return

    import redis # import bên trong để tránh lỗi serialization

    print(f"--- Redis Write (Epoch: {epoch_id}) ---")
    r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
    pipe = r.pipeline()
    try:
        # Chỉ lấy bản ghi mới nhất cho mỗi segment trong batch này
        # Dùng RDD để xử lý linh hoạt hơn
        latest_updates = df.rdd \
            .map(lambda row: (row.segment_id, row)) \
            .reduceByKey(lambda row1, row2: row1 if row1.window_end > row2.window_end else row2) \
            .map(lambda item: item[1]) \
            .collect()

        for row in latest_updates:
            segment_id = row.segment_id
            state = {
                "avg_speed": round(row.avg_speed, 2) if row.avg_speed else None,
                "vehicle_count": row.vehicle_count,
                "status": row.status,
                "window_end": row.window_end.isoformat()
            }
            key = f"segment:{segment_id}"
            value = json.dumps(state)
            print(f"  Updating Redis: {key} -> {value}")
            pipe.set(key, value)
        pipe.execute()
    except Exception as e:
        print(f"  Error writing to Redis: {e}")

# --- Hàm ghi vào ClickHouse ---
def write_to_clickhouse(df, epoch_id):
    if df.isEmpty():
        return

    # Sử dụng thư viện clickhouse-connect (hoặc clickhouse-driver)
    import clickhouse_connect # import bên trong

    print(f"--- ClickHouse Write (Epoch: {epoch_id}) ---")
    try:
        # Lấy dữ liệu dưới dạng list of tuples/dicts
        # Cần đảm bảo tên cột và thứ tự khớp với bảng ClickHouse
        data_to_insert = df.select(
            "segment_id",
            "window_start",
            "window_end",
            "avg_speed",
            "vehicle_count",
            "status"
            # processing_time sẽ được ClickHouse tự thêm DEFAULT now()
        ).collect() # Collect() không tốt cho production lớn!

        if not data_to_insert:
            print("  No data to insert.")
            return

        # Chuyển đổi Row thành list/tuple nếu cần bởi client
        # clickhouse-connect có thể nhận list of lists hoặc list of dicts
        data_list = [list(row) for row in data_to_insert]
        # Hoặc data_dict = [row.asDict() for row in data_to_insert]


        # Kết nối và insert
        client = clickhouse_connect.get_client(
            host=CLICKHOUSE_HOST,
            port=8123, # Dùng port Native TCP
            database=CLICKHOUSE_DB
            # Thêm username/password nếu ClickHouse có yêu cầu
        )
        print(f"  Inserting {len(data_list)} rows into ClickHouse table '{CLICKHOUSE_TABLE}'...")
        # Tên cột cần khớp chính xác thứ tự trong data_list
        client.insert(CLICKHOUSE_TABLE, data_list,
                      column_names=['segment_id', 'window_start', 'window_end', 'avg_speed', 'vehicle_count', 'status'])
        print(f"  Successfully inserted {len(data_list)} rows.")
        client.close()

    except Exception as e:
        print(f"  Error writing to ClickHouse: {e}")
        # Có thể thêm logic retry hoặc ghi log chi tiết hơn


# --- Khởi chạy Streaming Queries ---
# Output Mode 'update' thường dùng cho foreachBatch khi không có aggregation hoàn chỉnh
# Hoặc 'complete' nếu aggregation cho phép (ví dụ: count toàn bộ)
# 'append' nếu chỉ ghi dữ liệu mới không cập nhật state cũ

# Query ghi vào Redis và ClickHouse
combined_write_query = status_df.writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/spark-checkpoints/combined_write")\
    .foreachBatch(lambda df, epoch_id: [write_to_redis(df, epoch_id), write_to_clickhouse(df, epoch_id)]) \
    .start()


# (Tùy chọn) Query ghi ra console để debug
# console_query = status_df.writeStream \
#     .outputMode("update") \
#     .format("console") \
#     .option("truncate", "false") \
#     .option("numRows", 50) \
#     .start()

print("Streaming queries started. Check Spark UI (if accessible) and output logs.")
# Trong Jupyter, query sẽ chạy ngầm. Để dừng, bạn cần dùng:
# combined_write_query.stop()
# console_query.stop()

Streaming queries started. Check Spark UI (if accessible) and output logs.
--- Redis Write (Epoch: 11) ---
  Updating Redis: segment:segment_3 -> {"avg_speed": 41.51, "vehicle_count": 23, "status": "slow", "window_end": "2025-04-20T20:19:30"}
  Updating Redis: segment:segment_2 -> {"avg_speed": 35.62, "vehicle_count": 25, "status": "slow", "window_end": "2025-04-20T20:19:30"}
  Updating Redis: segment:segment_1 -> {"avg_speed": 31.74, "vehicle_count": 26, "status": "slow", "window_end": "2025-04-20T20:19:30"}
--- ClickHouse Write (Epoch: 11) ---
  Inserting 6 rows into ClickHouse table 'traffic_events'...
  Successfully inserted 6 rows.
--- Redis Write (Epoch: 12) ---
  Updating Redis: segment:segment_3 -> {"avg_speed": 31.38, "vehicle_count": 6, "status": "slow", "window_end": "2025-04-20T20:21:30"}
  Updating Redis: segment:segment_2 -> {"avg_speed": 29.67, "vehicle_count": 5, "status": "slow", "window_end": "2025-04-20T20:21:30"}
  Updating Redis: segment:segment_1 -> {"avg_speed": 