In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import expr, when, from_json, col, to_timestamp, date_format
from datetime import datetime

curr_time = datetime.now().strftime("%y-%m-%d")
# Schema cho dữ liệu 
schema = StructType([
    StructField("coord", StructType([
        StructField("lon", DoubleType()),
        StructField("lat", DoubleType())
    ])),
    StructField("weather", ArrayType(StructType([
        StructField("id", IntegerType()),
        StructField("main", StringType()),
        StructField("description", StringType()),
        StructField("icon", StringType())
    ]))),
    StructField("base", StringType()),
    StructField("main", StructType([
        StructField("temp", DoubleType()),
        StructField("feels_like", DoubleType()),
        StructField("temp_min", DoubleType()),
        StructField("temp_max", DoubleType()),
        StructField("pressure", IntegerType()),
        StructField("humidity", IntegerType()),
        StructField("sea_level", IntegerType()),
        StructField("grnd_level", IntegerType())
    ])),
    StructField("visibility", IntegerType()),
    StructField("wind", StructType([
        StructField("speed", DoubleType()),
        StructField("deg", IntegerType()),
        StructField("gust", DoubleType())
    ])),
    StructField("rain", StructType([
        StructField("1h", DoubleType())
    ]), True),
    StructField("clouds", StructType([
        StructField("all", IntegerType())
    ])),
    StructField("dt", LongType()),
    StructField("sys", StructType([
        StructField("type", IntegerType()),
        StructField("id", IntegerType()),
        StructField("country", StringType()),
        StructField("sunrise", LongType()),
        StructField("sunset", LongType())
    ])),
    StructField("timezone", IntegerType()),
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("cod", IntegerType())
])

jars = [
    "/home/jovyan/work/jars/spark-sql-kafka-0-10_2.12-3.5.0.jar",
    "/home/jovyan/work/jars/kafka-clients-3.5.0.jar",
    "/home/jovyan/work/jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar",
    "/home/jovyan/work/jars/spark-streaming-kafka-0-10_2.13-3.5.0.jar",
    "/home/jovyan/work/jars/commons-pool2-2.12.0.jar",
]

spark = SparkSession.builder \
    .appName("Transform Process") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", '4g') \
    .config("spark.executor.cores", '4') \
    .config("spark.driver.memory", '4g') \
    .config("spark.driver.cores", '4') \
    .config("spark.cores.max", "2") \
    .config("spark.jars", ",".join(jars)) \
    .config("spark.sql.session.timeZone", "Asia/Ho_Chi_Minh") \
    .config("spark.executorEnv.TZ", "Asia/Ho_Chi_Minh") \
    .config("spark.hadoop.fs.defaultFS","hdfs://namenode:9000") \
    .getOrCreate()

# Đọc dữ liệu streaming:
df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "kafka:9092") \
            .option("subscribe", "weather_data") \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .load()

parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Chuyển đổi và làm phẳng dữ liệu
processed_df = parsed_df.select(
    col("id").alias("city_id"),
    col("name").alias("city_name"),
    col("coord.lat").alias("latitude"),
    col("coord.lon").alias("longitude"),
    to_timestamp(col("dt")).alias("timestamp"),
    col("weather")[0]["main"].alias("weather_condition"),
    col("weather")[0]["description"].alias("weather_description"),
    (col("main.temp") - 273.15).alias("temperature_celsius"),
    (col("main.feels_like") - 273.15).alias("feels_like_celsius"),
    (col("main.temp_min") - 273.15).alias("temp_min_celsius"),
    (col("main.temp_max") - 273.15).alias("temp_max_celsius"),
    col("main.pressure").alias("pressure_hpa"),
    col("main.humidity").alias("humidity_percent"),
    col("wind.speed").alias("wind_speed_mps"),
    col("wind.deg").alias("wind_direction_deg"),
    col("wind.gust").alias("wind_gust_mps"),
    col("rain.1h").alias("rain_1h_mm"),
    col("clouds.all").alias("cloud_coverage_percent"),
    col("sys.country").alias("country_code"),
    to_timestamp(col("sys.sunrise")).alias("sunrise_time"),
    to_timestamp(col("sys.sunset")).alias("sunset_time"),
    (col("timezone") / 3600).alias("timezone_hours"),
    date_format(to_timestamp(col("dt")), "yyyy-MM-dd").alias("date"),
    date_format(to_timestamp(col("dt")), "HH").alias("hour"),
)

processed_df = processed_df.fillna({
    "weather_condition": "Unknown",
    "temperature_celsius": 0.0,
    "humidity_percent": 0,
    "wind_speed_mps": 0.0, 
    "wind_gust_mps": 0.0,
    "rain_1h_mm": 0.0,
    "cloud_coverage_percent": 0
})

# Lưu dữ liệu lên HDFS
# Lưu dữ liệu lên HDFS với partition theo ngày và giờ
query = processed_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://namenode:9000/data/weather") \
    .option("checkpointLocation", "hdfs://namenode:9000/checkpoints/weather") \
    .partitionBy("date", "hour") \
    .start()

query.awaitTermination()

25/04/28 10:08:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 10:08:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=59>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_comman

Py4JError: An error occurred while calling o143.awaitTermination