In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import time

In [None]:
import os

from pyspark import SparkConf

# Configuration for a single-machine Spark session that reads from Google Cloud Storage.
# Both paths can be overridden with environment variables, so the notebook runs on other
# machines without editing code or committing a key-file location. When the variables are
# unset, the original hard-coded paths are used unchanged.
#   GOOGLE_APPLICATION_CREDENTIALS -> service-account JSON key file used for GCS auth
#   GCS_CONNECTOR_JAR              -> path to the GCS Hadoop connector jar
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    '/home/phuc_0703/learnDE/data-engineering-zoomcamp/strong-ward-437213-j6-c3ae16d10e5f.json',
)
gcs_connector_jar = os.environ.get(
    "GCS_CONNECTOR_JAR",
    "/home/phuc_0703/learnDE/data-engineering-zoomcamp/05-batch/libs/gcs-connector-hadoop3-2.2.5.jar",
)

conf = (
    SparkConf()
    .setMaster('local[*]')  # run locally on all available cores
    .setAppName('test')
    .set("spark.jars", gcs_connector_jar)
    # "spark.hadoop.*" entries are forwarded by Spark into the Hadoop configuration.
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)


In [None]:
# Start (or reuse) the SparkContext. getOrCreate keeps this cell re-runnable, because
# constructing a second SparkContext in the same Python process raises an error.
# Caveat: if a context is already running, it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

# Live Hadoop configuration of the context. `_jsc` is a private PySpark attribute.
hadoop_conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector filesystem classes for the "gs" scheme and point them at the
# service-account key. Keys are set in the same order as before.
gcs_hadoop_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}
for setting_key, setting_value in gcs_hadoop_settings.items():
    hadoop_conf.set(setting_key, setting_value)

In [None]:
# Wrap the already-configured SparkContext in a SparkSession for the DataFrame API.
# The session is built from the context's own SparkConf, so both share one configuration.
spark_builder = SparkSession.builder.config(conf=sc.getConf())
spark = spark_builder.getOrCreate()

In [None]:
from pyspark.sql.types import LongType, StructType, StructField, FloatType, IntegerType, DoubleType
from pyspark.sql.functions import col, from_unixtime
from pyspark.sql import functions as F

# Explicit (all-nullable) schema for the 2024 "other_data" parquet files,
# listed as (column name, Spark type) in file order.
weather_columns = [
    ("time", LongType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("number", LongType()),
    ("step", LongType()),
    ("surface", DoubleType()),
    ("valid_time", LongType()),
    ("u10", FloatType()),
    ("v10", FloatType()),
    ("d2m", FloatType()),
    ("t2m", FloatType()),
    ("msl", FloatType()),
    ("sst", FloatType()),
    ("sp", FloatType()),
    ("tcc", FloatType()),
    ("tciw", FloatType()),
    ("tclw", FloatType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in weather_columns])

# basePath tells partition discovery where the dataset root is when reading the
# year=2024/* glob. Presumably further key=value folders sit below it — confirm.
df_weather = (
    spark.read
    .option("basePath", "gs://weather_bigdata_20241/other_data/year=2024/")
    .schema(schema)
    .parquet("gs://weather_bigdata_20241/other_data/year=2024/*")
)

In [None]:
# Disabled alternative load path (kept for reference): reads the dataset under
# gs://weather_bigdata_20241/weather_joined/ with a schema that omits "valid_time"
# and adds "tp". Presumably an alternative to the separate weather/rain reads — confirm.
# from pyspark.sql.types import LongType, StructType, StructField, FloatType, IntegerType, DoubleType
# from pyspark.sql.functions import col, from_unixtime
# from pyspark.sql import functions as F
# schema = StructType([
#     StructField("time", LongType(), True),
#     StructField("latitude", DoubleType(), True),
#     StructField("longitude", DoubleType(), True),
#     StructField("number", LongType(), True),
#     StructField("step", LongType(), True),
#     StructField("surface", DoubleType(), True),
#     # StructField("valid_time", LongType(), True),
#     StructField("u10", FloatType(), True),
#     StructField("v10", FloatType(), True),
#     StructField("d2m", FloatType(), True),
#     StructField("t2m", FloatType(), True),
#     StructField("msl", FloatType(), True),
#     StructField("sst", FloatType(), True),
#     StructField("sp", FloatType(), True),
#     StructField("tcc", FloatType(), True),
#     StructField("tciw", FloatType(), True),
#     StructField("tclw", FloatType(), True),
#     StructField("tp", FloatType(), True),
# ])


# df_weather = spark.read.option("basePath", "gs://weather_bigdata_20241/weather_joined/") \
#                        .schema(schema) \
#                        .parquet("gs://weather_bigdata_20241/weather_joined/*")

In [None]:
# Explicit (all-nullable) schema for the 2024 rain parquet files, as (column name, Spark type).
# "tp" is the only measurement column here. The keys match the weather data.
rain_columns = [
    ("time", LongType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("number", LongType()),
    ("step", LongType()),
    ("surface", DoubleType()),
    ("valid_time", LongType()),
    ("tp", FloatType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in rain_columns])


df_rain = (
    spark.read
    .option("basePath", "gs://weather_bigdata_20241/rain/year=2024")
    .schema(schema)
    .parquet("gs://weather_bigdata_20241/rain/year=2024/*")
)

In [None]:
# Row count of the raw rain dataset (count() is an action and runs a Spark job).
df_rain.count()

In [None]:
# Keep only rain records that carry a precipitation value ("tp" is not null).
df_rain_filtered = df_rain.where(df_rain["tp"].isNotNull())

# Count the records left after filtering (disabled; count() runs a Spark job).
# print(f"Number of records with non-null 'tp': {df_rain_filtered.count()}")


In [None]:
# Row count of the weather dataset (count() is an action and runs a Spark job).
df_weather.count()

In [None]:
from pyspark.sql.functions import col

# Inner join: pair each rain record with the weather record at the same grid point whose
# "time" equals the rain record's "valid_time". Only matching pairs are kept.
join_condition = (
    (df_rain_filtered.latitude == df_weather.latitude)
    & (df_rain_filtered.longitude == df_weather.longitude)
    & (df_rain_filtered.valid_time == df_weather.time)
)
df_joined = df_rain_filtered.join(df_weather, on=join_condition, how="inner")

# Keep the rain side's keys (valid_time renamed to "time"), the weather variables, and tp.
weather_value_columns = [
    "step", "surface", "t2m", "d2m", "u10", "v10",
    "msl", "sst", "sp", "tcc", "tciw", "tclw",
]
df_joined = df_joined.select(
    df_rain_filtered.latitude.alias("latitude"),
    df_rain_filtered.longitude.alias("longitude"),
    df_rain_filtered.valid_time.alias("time"),
    *[df_weather[column_name] for column_name in weather_value_columns],
    df_rain_filtered.tp,
)

# # Count the joined records
# print(df_joined.count())


In [None]:
df_joined.show(n=10)  # preview the first 10 joined rows

In [None]:
df_weather.show(n=10)  # preview the first 10 weather rows

In [None]:
df_rain.show(n=10)  # preview the first 10 rain rows

In [None]:
# Convert the epoch "time" column to a timestamp. It is divided by 1e9, so it is assumed to
# be in nanoseconds — confirm. from_unixtime formats whole seconds in the session time zone,
# which is then cast back to a timestamp.
# Also shift d2m/t2m by -273.15 (Kelvin -> Celsius, assuming the inputs are Kelvin).
# NOTE: not idempotent — re-running this cell converts an already-converted frame.
df_joined = (
    df_joined
    .withColumn("time", from_unixtime(col("time") / 1_000_000_000).cast("timestamp"))
    .withColumn("d2m", F.col("d2m") - 273.15)
    .withColumn("t2m", F.col("t2m") - 273.15)
)
# df_joined.show(10)

In [None]:
# Inspect column types after the time/temperature conversions.
df_joined.printSchema()

In [None]:
from pyspark.sql.functions import year, month, dayofmonth

# Derive calendar partition keys from the "time" timestamp.
for partition_name, extract in (("year", year), ("month", month), ("day", dayofmonth)):
    df_joined = df_joined.withColumn(partition_name, extract(col("time")))

output_path = "gs://weather_bigdata_20241/weather_all"

# Write parquet partitioned as year=/month=/day=, replacing whatever is already at output_path.
df_joined.write.parquet(output_path, mode="overwrite", partitionBy=["year", "month", "day"])


In [None]:
from pyspark.sql import functions as F

# Load the consolidated weather dataset. Presumably a re-partitioned copy of the
# "weather_all" output written above — confirm.
# NOTE(review): "timestampAsString" does not appear to be a documented Parquet read option,
# so Spark likely ignores it — confirm. The explicit cast below is what guarantees "time"
# is a timestamp.
# NOTE(review): the `/*` glob is read without a basePath, so the top-level partition
# directory may not surface as a column.
weather_main_path = "gs://weather_bigdata_20241/weather_all_fixed_partitions/*"
weather_main_reader = spark.read.option("timestampAsString", "true")
df_weather_main = weather_main_reader.parquet(weather_main_path)
df_weather_main = df_weather_main.withColumn("time", df_weather_main["time"].cast("timestamp"))

In [None]:
# Row count of the consolidated dataset (count() is an action and runs a Spark job).
df_weather_main.count()

In [None]:
# Inspect the loaded columns and types (confirms "time" is now a timestamp).
df_weather_main.printSchema()

In [None]:
# Elapsed wall-clock seconds since `start`.
# NOTE(review): no visible cell assigns `start`. Run `start = time.time()` before the section
# being timed, otherwise this raises NameError.
# The result is stored as `elapsed_seconds`, not `time`, so the `time` module imported at the
# top of the notebook is not rebound to a float (which broke this cell on a second run).
elapsed_seconds = time.time() - start
elapsed_seconds

In [None]:
# Horizontal wind speed from its two components: sqrt(u10^2 + v10^2).
u_wind, v_wind = F.col("u10"), F.col("v10")
df_weather_main = df_weather_main.withColumn("wind_speed", F.sqrt(F.pow(u_wind, 2) + F.pow(v_wind, 2)))
df_weather_main.show(n=10)

In [None]:
# Actual vapour pressure from the 2 m dewpoint (Magnus-type formula):
#   e = 6.11 * 10^(7.5 * Td / (Td + 237.3))
# The result is in hPa if d2m is in degrees Celsius. Assumes d2m was converted to Celsius
# before this dataset was written — confirm.
dewpoint = F.col("d2m")
df_weather_main = df_weather_main.withColumn(
    "vapor_pressure",
    6.11 * 10 ** (7.5 * dewpoint / (dewpoint + 237.3)),
)
df_weather_main.show(n=10)

In [None]:
# Saturation vapour pressure at the 2 m air temperature, using the same formula as
# vapor_pressure but evaluated at t2m:
#   es = 6.11 * 10^(7.5 * T / (T + 237.3))
# Assumes t2m is in degrees Celsius — confirm.
air_temp = F.col("t2m")
df_weather_main = df_weather_main.withColumn(
    "saturation_vapor_pressure",
    6.11 * 10 ** (7.5 * air_temp / (air_temp + 237.3)),
)
df_weather_main.show(n=10)

In [None]:
# Relative humidity in percent: ratio of actual to saturation vapour pressure, times 100.
humidity_ratio = F.col("vapor_pressure") / F.col("saturation_vapor_pressure")
df_weather_main = df_weather_main.withColumn("relative_humidity", humidity_ratio * 100)
df_weather_main.show(n=10)

In [None]:
# Apparent temperature: AT = T + 0.33 * e - 0.70 * ws - 4.00.
# This matches the form of Steadman's formula as used by the Australian BoM — confirm the
# units expected here (°C, hPa, m/s).
temperature_term = F.col("t2m") + 0.33 * F.col("vapor_pressure")
apparent = temperature_term - 0.70 * F.col("wind_speed") - 4.00
df_weather_main = df_weather_main.withColumn("apparent_temperature", apparent)
df_weather_main.show(n=10)
df_weather_main.show(10)

In [None]:
# Dry-air density from the ideal gas law: rho = sp / (R * T_kelvin).
# t2m is shifted back by +273.15 to Kelvin. Presumably sp is surface pressure in Pa — confirm.
R = 287.05  # matches the specific gas constant of dry air, J/(kg*K)
temperature_kelvin = F.col("t2m") + 273.15
df_weather_main = df_weather_main.withColumn("air_density", F.col("sp") / (R * temperature_kelvin))
df_weather_main.show(n=10)

In [None]:
import math

# Wind direction in degrees: 180 + atan2(u10, v10) converted to degrees.
# Assuming u10 is the eastward and v10 the northward component, this is the direction the
# wind blows FROM, clockwise from north.
# In exact arithmetic the raw angle already lies in [0, 360]. The negative branch only
# wraps values that floating-point rounding could push just below zero.
raw_direction = 180 + (180 / math.pi) * F.atan2(F.col("u10"), F.col("v10"))
df_weather_main = df_weather_main.withColumn(
    "wind_direction",
    F.when(raw_direction < 0, 360 + raw_direction).otherwise(raw_direction),
)

In [None]:
# Restrict the per-point data to the bounding box latitude 8..24, longitude 102..112
# (bounds inclusive; presumably the Vietnam study area — confirm).
# Filtering is idempotent, so re-running this cell leaves df_weather_main unchanged.
df_weather_main = df_weather_main.filter(
    F.col("latitude").between(8, 24) & F.col("longitude").between(102, 112)
)

# Daily total precipitation over that region. The aggregate gets its own name so that
# df_weather_main keeps the per-point columns (time, wind_speed, ...) that the next cell selects.
# The timestamp column is "time": the joined output stores the rain data's valid_time under
# that name. Assumes weather_all_fixed_partitions keeps that layout — confirm.
df_daily_precipitation = (
    df_weather_main
    .groupBy(F.to_date("time").alias("date"))
    .agg(F.sum("tp").alias("total_precipitation"))
    .orderBy("date")
)
df_daily_precipitation.show(10)

In [None]:
# Keep the location/time keys plus the derived meteorological quantities.
result_columns = [
    "time", "latitude", "longitude",
    "wind_speed", "vapor_pressure", "relative_humidity",
    "apparent_temperature", "air_density", "wind_direction",
]
df_result = df_weather_main.select(*result_columns)

In [None]:
df_result.show(n=10)  # preview the first 10 derived-feature rows

In [None]:
# Add calendar partition keys derived from the "time" timestamp.
calendar_parts = {
    "year": year(col("time")),
    "month": month(col("time")),
    "day": dayofmonth(col("time")),
}
for part_name, part_expr in calendar_parts.items():
    df_result = df_result.withColumn(part_name, part_expr)

output_path = "gs://weather_bigdata_20241/result"

# Write the derived features as parquet partitioned year=/month=/day=,
# overwriting anything already at output_path.
(
    df_result.write
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet(output_path)
)

In [None]:
# Shut down Spark: stop the SparkContext first, then the SparkSession built on top of it.
for spark_handle in (sc, spark):
    spark_handle.stop()