# Setup

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, year as spark_year, month as spark_month, dayofmonth, lit, concat_ws, lpad, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType # TimestampType is available in case target_silver_schema needs it

# Build (or reuse) the Spark session with the Delta Lake SQL extension and catalog configured.
spark = (
    SparkSession.builder
    .appName("Databricks_BronzeSilver_Pipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# REPLACE WITH THE CORRECT NGROK ENDPOINT
kafka_bootstrap_servers = "0.tcp.ap.ngrok.io:10802"
# Example: kafka_bootstrap_servers = "0.tcp.ap.ngrok.io:10802"
# Warn when the value is still the template placeholder.
if kafka_bootstrap_servers == "YOUR_NGROK_HOSTNAME:YOUR_NGROK_PORT":
    print("!!! CẢNH BÁO: Vui lòng cập nhật `kafka_bootstrap_servers` với endpoint ngrok của bạn !!!")

kafka_topic = "test"

# Canonical schema of the messages sent by the producer; it is also the target schema for Silver.
# Every field is nullable.
weather_data_schema_from_producer = (
    StructType()
    .add("time", StringType(), True)
    .add("month", IntegerType(), True)
    .add("year", IntegerType(), True)
    .add("temperature", DoubleType(), True)
    .add("feelslike", DoubleType(), True)
    .add("wind", DoubleType(), True)
    .add("direction", StringType(), True)
    .add("gust", DoubleType(), True)
    .add("cloud", IntegerType(), True)
    .add("humidity", IntegerType(), True)
    .add("precipitation", DoubleType(), True)
    .add("pressure", DoubleType(), True)
    .add("weather", StringType(), True)
    .add("label", StringType(), True)
)

# Root directory of the pipeline; use a fresh directory to avoid mix-ups when re-running.
BASE_DBFS_PATH = "dbfs:/user/thanhtai/delta_pipeline"
BRONZE_API_DBFS_PATH = f"{BASE_DBFS_PATH}/bronze/from_api"
BRONZE_CSV_DBFS_PATH = f"{BASE_DBFS_PATH}/bronze/from_csv"
SILVER_MERGED_DBFS_PATH = f"{BASE_DBFS_PATH}/silver/merged_weather"
GOLD_FEATURES_DBFS_PATH = f"{BASE_DBFS_PATH}/gold/weather_features"
GOLD_PREDICTIONS_DBFS_PATH = f"{BASE_DBFS_PATH}/gold/weather_predictions"
MODEL_DBFS_PATH = f"{BASE_DBFS_PATH}/model/weather_rf_model"
CHECKPOINT_API_DBFS_PATH = f"{BASE_DBFS_PATH}/_checkpoints/bronze_from_api_dbfs"

paths_to_create_dbfs = [
    BASE_DBFS_PATH,
    BRONZE_API_DBFS_PATH,
    BRONZE_CSV_DBFS_PATH,
    SILVER_MERGED_DBFS_PATH,
    GOLD_FEATURES_DBFS_PATH,
    GOLD_PREDICTIONS_DBFS_PATH,
    MODEL_DBFS_PATH,
    CHECKPOINT_API_DBFS_PATH,
]

# Best-effort directory creation: a failure on one path is reported and the loop moves on.
for path in paths_to_create_dbfs:
    try:
        dbutils.fs.mkdirs(path)
        print(f"Created DBFS path: {path}")
    except Exception as e:
        print(f"Could not create DBFS path {path}: {e}")

print("Setup complete. Các đường dẫn DBFS đã được chuẩn bị.")

Created DBFS path: dbfs:/user/thanhtai/delta_pipeline
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/bronze/from_api
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/bronze/from_csv
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/silver/merged_weather
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/gold/weather_features
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/gold/weather_predictions
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/model/weather_rf_model
Created DBFS path: dbfs:/user/thanhtai/delta_pipeline/_checkpoints/bronze_from_api_dbfs
Setup complete. Các đường dẫn DBFS đã được chuẩn bị.


# Kafka to Bronze API (Streaming)

In [0]:
# Name under which the Bronze API streaming query is registered, so that a previous
# run of this cell can be found reliably among the active queries.
API_BRONZE_QUERY_NAME = "bronze_from_api_dbfs"

# Subscribe to the Kafka topic, starting from the latest offsets on a fresh checkpoint.
raw_kafka_df_dbfs = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("kafka.request.timeout.ms", "120000")
    .option("kafka.session.timeout.ms", "60000")
    .load()
)

# Decode the Kafka value as a JSON string and flatten it into the producer schema's columns.
parsed_api_df_dbfs = (
    raw_kafka_df_dbfs
    .selectExpr("CAST(value AS STRING) as json_string")
    .withColumn("data", from_json(col("json_string"), weather_data_schema_from_producer))
    .select("data.*")
)

# Stop a previous run of this query before starting a new one on the same checkpoint.
# The query is matched by name: `lastProgress` is None until the first progress report,
# and the source description refers to the Kafka subscription, not the checkpoint path,
# so neither can be used to recognise the query.
# The checkpoint is deliberately kept so the restarted query resumes from the offsets it
# already committed; deleting it together with startingOffsets=latest would skip every
# record that arrived while the query was down.
for active_query in spark.streams.active:
    if active_query.name == API_BRONZE_QUERY_NAME:
        print(f"Stopping existing stream: {active_query.id}")
        try:
            active_query.stop()
            active_query.awaitTermination()  # wait until the stream has fully stopped
        except Exception as e:
            print(f"Error stopping stream: {e}")

# Append the parsed records to the Bronze Delta table every 30 seconds.
api_bronze_query_dbfs = (
    parsed_api_df_dbfs.writeStream
    .queryName(API_BRONZE_QUERY_NAME)
    .trigger(processingTime="30 seconds")
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_API_DBFS_PATH)
    .option("mergeSchema", "true")
    .start(BRONZE_API_DBFS_PATH)
)

print(f"Streaming query {api_bronze_query_dbfs.id} to {BRONZE_API_DBFS_PATH} started.")
# The query keeps running in the background. To stop it: api_bronze_query_dbfs.stop()
# Its state can be inspected with: api_bronze_query_dbfs.status
# Or list the active streams with: spark.streams.active

Streaming query bf73165b-f80c-4e30-8a81-fa0bfd477ca6 to dbfs:/user/thanhtai/delta_pipeline/bronze/from_api started.


# CSV to Bronze/from_csv (Batch)

In [0]:
# Make sure this file has been uploaded to DBFS first,
# e.g. dbfs:/FileStore/thanhtai_sample_data/weather_data_from_source.csv
csv_input_dbfs_path = "dbfs:/FileStore/thanhtai_sample_data/weather_data_from_source.csv"

try:
    # Probe the path before any Spark work (presumably raises when the file is absent;
    # the handler below looks for the corresponding error text).
    dbutils.fs.ls(csv_input_dbfs_path)
    print(f"File CSV được tìm thấy tại: {csv_input_dbfs_path}")

    # Read the CSV with a header row and let Spark infer the column types.
    csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
    df_csv_dbfs_raw = csv_reader.load(csv_input_dbfs_path)

    print("Schema của dữ liệu CSV sau khi Spark suy luận:")
    df_csv_dbfs_raw.printSchema()
    display(df_csv_dbfs_raw.limit(5))

    # Replace the Bronze CSV table with the freshly read data.
    bronze_csv_writer = (
        df_csv_dbfs_raw.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
    )
    bronze_csv_writer.save(BRONZE_CSV_DBFS_PATH)

    print(f"Ghi dữ liệu CSV (với schema suy luận) vào Bronze (DBFS) thành công: {BRONZE_CSV_DBFS_PATH}")
    display(spark.read.format("delta").load(BRONZE_CSV_DBFS_PATH).limit(5))

except Exception as e:
    # Distinguish "file not uploaded yet" from every other failure by the error text.
    error_text = str(e)
    missing_file_markers = ("java.io.FileNotFoundException", "Path does not exist")
    if any(marker in error_text for marker in missing_file_markers):
        print(f"LỖI: File CSV không tìm thấy tại '{csv_input_dbfs_path}'. Vui lòng upload file lên DBFS trước.")
    else:
        print(f"Lỗi khi xử lý CSV to Bronze: {e}")

File CSV được tìm thấy tại: dbfs:/FileStore/thanhtai_sample_data/weather_data_from_source.csv
Schema của dữ liệu CSV sau khi Spark suy luận:
root
 |-- time: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- direction: string (nullable = true)
 |-- gust: double (nullable = true)
 |-- cloud: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- weather: string (nullable = true)



time,month,temperature,feelslike,wind,direction,gust,cloud,humidity,precipitation,pressure,weather
2025-05-25T00:00:00.000+0000,1,24.0,28.0,8.0,ENE,12.0,4.0,86.0,0.0,1012.0,Clear
2025-05-25T03:00:00.000+0000,1,23.0,27.0,8.0,NE,10.0,4.0,88.0,0.0,1011.0,Clear
2025-05-25T06:00:00.000+0000,1,23.0,26.0,8.0,NNE,11.0,7.0,85.0,0.0,1012.0,Sunny
2025-05-25T09:00:00.000+0000,1,28.0,33.0,11.0,NNE,13.0,6.0,64.0,0.0,1012.0,Sunny
2025-05-25T12:00:00.000+0000,1,31.0,35.0,10.0,ENE,12.0,62.0,53.0,0.0,1010.0,Partly cloudy


Ghi dữ liệu CSV (với schema suy luận) vào Bronze (DBFS) thành công: dbfs:/user/thanhtai/delta_pipeline/bronze/from_csv


time,month,temperature,feelslike,wind,direction,gust,cloud,humidity,precipitation,pressure,weather
2025-05-25T00:00:00.000+0000,1,24.0,28.0,8.0,ENE,12.0,4.0,86.0,0.0,1012.0,Clear
2025-05-25T03:00:00.000+0000,1,23.0,27.0,8.0,NE,10.0,4.0,88.0,0.0,1011.0,Clear
2025-05-25T06:00:00.000+0000,1,23.0,26.0,8.0,NNE,11.0,7.0,85.0,0.0,1012.0,Sunny
2025-05-25T09:00:00.000+0000,1,28.0,33.0,11.0,NNE,13.0,6.0,64.0,0.0,1012.0,Sunny
2025-05-25T12:00:00.000+0000,1,31.0,35.0,10.0,ENE,12.0,62.0,53.0,0.0,1010.0,Partly cloudy


# Bronze to Silver Merge (Batch) 

In [0]:
# Load the API-sourced Bronze table.
print(f"Đọc dữ liệu từ Bronze API Table (DBFS): {BRONZE_API_DBFS_PATH}")
df_api_raw_dbfs = spark.read.load(BRONZE_API_DBFS_PATH, format="delta")

# Load the CSV-sourced Bronze table (its schema was inferred when the CSV was ingested).
print(f"Đọc dữ liệu từ Bronze CSV Table (DBFS, schema được suy luận): {BRONZE_CSV_DBFS_PATH}")
df_csv_raw_dbfs = spark.read.load(BRONZE_CSV_DBFS_PATH, format="delta")

# Show both raw schemas side by side before they are unified.
print("\nSchema gốc của df_api_raw_dbfs (từ Kafka producer):")
df_api_raw_dbfs.printSchema()
print("Schema gốc của df_csv_raw_dbfs (từ CSV, suy luận):")
df_csv_raw_dbfs.printSchema()

# --- Unify the schemas ---
# The producer schema defined in the setup cell is the target schema for Silver.
target_silver_schema = weather_data_schema_from_producer
target_silver_columns = target_silver_schema.fieldNames()

# Helper that aligns a DataFrame to a target schema.
def align_df_to_schema_dbfs(df, schema_target, df_name="DataFrame"):
    """Project ``df`` onto the columns of ``schema_target``, in the target's order.

    For every field of ``schema_target``:

    * a source column whose name matches case-insensitively is selected and
      renamed to the target's spelling;
    * if its type differs from the target type, it is cast to the target type
      and a notice is printed;
    * if no source column matches, a NULL column of the target type is added
      and a notice is printed.

    Source columns that are not part of ``schema_target`` are dropped.

    The cast only builds a lazy column expression, so it cannot fail while this
    function runs; values that cannot be converted are handled by Spark when the
    DataFrame is executed (NOTE(review): NULL or an error depending on the
    session's ANSI setting -- confirm for the target runtime).

    Args:
        df: Source Spark DataFrame.
        schema_target: ``StructType`` describing the wanted columns and types.
        df_name: Label used in the printed notices.

    Returns:
        A new DataFrame with exactly the target schema's column names.
    """
    # Case-insensitive lookup: lower-cased name -> actual column name in `df`.
    source_name_by_lower = {c.lower(): c for c in df.columns}
    select_expressions = []

    for field in schema_target.fields:
        target_col_name = field.name
        target_col_type = field.dataType
        source_col_name_found = source_name_by_lower.get(target_col_name.lower())

        if source_col_name_found is None:
            print(f"Thông báo ({df_name}): Cột '{target_col_name}' không tìm thấy. Thêm cột này với giá trị NULL.")
            select_expressions.append(lit(None).cast(target_col_type).alias(target_col_name))
            continue

        current_col_type = df.schema[source_col_name_found].dataType
        aligned_col = col(source_col_name_found)
        if current_col_type != target_col_type:
            print(f"Thông báo ({df_name}): Ép kiểu cột '{source_col_name_found}' từ {current_col_type} sang {target_col_type} (đổi tên thành '{target_col_name}').")
            aligned_col = aligned_col.cast(target_col_type)
        select_expressions.append(aligned_col.alias(target_col_name))

    return df.select(select_expressions)

# Align the API-sourced Bronze data to the Silver schema.
print(f"\n--- Căn chỉnh df_api_raw_dbfs ---")
df_api_aligned_dbfs = align_df_to_schema_dbfs(df_api_raw_dbfs, target_silver_schema, "df_api_raw_dbfs")

# Align the CSV-sourced Bronze data to the Silver schema.
print(f"\n--- Căn chỉnh df_csv_raw_dbfs ---")
df_csv_aligned_dbfs = align_df_to_schema_dbfs(df_csv_raw_dbfs, target_silver_schema, "df_csv_raw_dbfs")

print("\nSchema của df_api_aligned_dbfs (sau khi chuẩn hóa):")
df_api_aligned_dbfs.printSchema()
print("Schema của df_csv_aligned_dbfs (sau khi chuẩn hóa):")
df_csv_aligned_dbfs.printSchema()

# Merge both sources; after alignment they must have exactly the same columns.
df_merged_bronze_dbfs = df_api_aligned_dbfs.unionByName(df_csv_aligned_dbfs, allowMissingColumns=False)

# Build event_timestamp and the partition columns.
# `time` is not always a bare "HH:mm" string: a source whose `time` was typed as a
# timestamp is cast to a full "yyyy-MM-dd HH:mm:ss" string during alignment, which can
# never match the "yyyy-MM-dd HH:mm" pattern once it is appended to the date string.
# Only the first hour:minute group is therefore used, zero-padded to five characters.
# The `time` column itself is left untouched.
df_silver_transformed_dbfs = df_merged_bronze_dbfs \
    .withColumn("date_str_for_timestamp",
                concat_ws("-",
                          col("year").cast("string"),
                          lpad(col("month").cast("string"), 2, "0"),
                          lit("01"))) \
    .withColumn("time_hhmm_for_timestamp",
                lpad(regexp_extract(col("time"), r"([0-9]{1,2}:[0-9]{2})", 1), 5, "0")) \
    .withColumn("datetime_str_for_timestamp",
                concat_ws(" ", col("date_str_for_timestamp"), col("time_hhmm_for_timestamp"))) \
    .withColumn("event_timestamp", to_timestamp(col("datetime_str_for_timestamp"), "yyyy-MM-dd HH:mm")) \
    .withColumn("year_partition", spark_year(col("event_timestamp"))) \
    .withColumn("month_partition", spark_month(col("event_timestamp"))) \
    .withColumn("day_partition", dayofmonth(col("event_timestamp"))) \
    .drop("date_str_for_timestamp", "time_hhmm_for_timestamp", "datetime_str_for_timestamp") \
    .dropDuplicates(["year", "month", "time", "weather", "temperature", "direction"])

print("\nSchema của Silver DataFrame cuối cùng (DBFS):")
df_silver_transformed_dbfs.printSchema()

# Replace the Silver table, partitioned by the date parts of event_timestamp.
df_silver_transformed_dbfs.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year_partition", "month_partition", "day_partition") \
    .save(SILVER_MERGED_DBFS_PATH)

print(f"\nGhi dữ liệu vào Silver (DBFS) thành công: {SILVER_MERGED_DBFS_PATH}")

# Rows without year, month or a usable time cannot get an event_timestamp and end up
# with NULL partition values; report them instead of letting them pass silently.
df_silver_written_dbfs = spark.read.format("delta").load(SILVER_MERGED_DBFS_PATH)
rows_without_event_timestamp = df_silver_written_dbfs.filter(col("event_timestamp").isNull()).count()
if rows_without_event_timestamp:
    print(f"CẢNH BÁO: {rows_without_event_timestamp} dòng trong Silver không có event_timestamp (thiếu hoặc không phân tích được year/month/time).")

display(df_silver_written_dbfs.limit(10))

Đọc dữ liệu từ Bronze API Table (DBFS): dbfs:/user/thanhtai/delta_pipeline/bronze/from_api
Đọc dữ liệu từ Bronze CSV Table (DBFS, schema được suy luận): dbfs:/user/thanhtai/delta_pipeline/bronze/from_csv

Schema gốc của df_api_raw_dbfs (từ Kafka producer):
root
 |-- time: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- direction: string (nullable = true)
 |-- gust: double (nullable = true)
 |-- cloud: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- weather: string (nullable = true)
 |-- label: string (nullable = true)

Schema gốc của df_csv_raw_dbfs (từ CSV, suy luận):
root
 |-- time: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- feelslik

time,month,year,temperature,feelslike,wind,direction,gust,cloud,humidity,precipitation,pressure,weather,label,event_timestamp,year_partition,month_partition,day_partition
2025-05-25 03:00:00,12,,20.0,22.0,10.0,N,12.0,3,70,0.0,1014.0,Clear,,,,,
2025-05-25 03:00:00,12,,20.0,21.0,10.0,N,16.0,74,67,0.0,1014.0,Cloudy,,,,,
2025-05-25 06:00:00,12,,20.0,22.0,13.0,NNE,15.0,2,71,0.0,1015.0,Sunny,,,,,
2025-05-25 00:00:00,12,,21.0,21.0,10.0,N,16.0,46,67,0.0,1014.0,Partly cloudy,,,,,
2025-05-25 06:00:00,12,,21.0,21.0,10.0,N,13.0,37,67,0.0,1015.0,Partly cloudy,,,,,
2025-05-25 06:00:00,1,,21.0,22.0,9.0,N,11.0,8,72,0.0,1013.0,Sunny,,,,,
2025-05-25 03:00:00,2,,21.0,22.0,11.0,NE,15.0,7,65,0.0,1016.0,Clear,,,,,
2025-05-25 06:00:00,2,,21.0,23.0,8.0,NE,10.0,10,70,0.0,1018.0,Sunny,,,,,
2025-05-25 00:00:00,12,,21.0,23.0,10.0,NNE,12.0,0,67,0.0,1014.0,Clear,,,,,
2025-05-25 03:00:00,1,,21.0,22.0,9.0,NNE,15.0,22,74,0.0,1014.0,Clear,,,,,
