In [1]:
#!pip install pyspark
#!pip install findspark

In [2]:
import findspark
import builtins
from pyspark.ml import Pipeline

findspark.init()
from cProfile import label

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, year, month, dayofmonth,abs,
    avg, sum, min, max, stddev, variance, percentile_approx,when
)
from pyspark.sql.functions import abs as spark_abs
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("Rainfall_Prediction_Project") \
    .getOrCreate()


In [4]:
csv_path = "../data/processed/rainfall_clean.csv"
print("CSV path:", csv_path)
# Đọc dữ liệu lượng mưa
df = (spark.read.option("header", True)
      .csv(csv_path, inferSchema=True))


CSV path: ../data/processed/rainfall_clean.csv


In [5]:
# Đếm số dòng và cột
num_rows = df.count()
num_cols = len(df.columns)
print(f"Dataset có {num_rows} dòng và {num_cols} cột")

Dataset có 10000 dòng và 10 cột


In [6]:
# Xem 10 dòng đầu
df.show(10)

+----------+--------+-----------+-------------+------------+--------------+------------+-------------+-----+------+
|      date|location|rainfall_mm|temperature_c|humidity_pct|wind_speed_kmh|pressure_hpa|rain_category|month|season|
+----------+--------+-----------+-------------+------------+--------------+------------+-------------+-----+------+
|2020-01-01|     Hue|      23.18|         20.3|        60.3|          22.4|      1004.2|     Moderate|    1|winter|
|2020-01-18|     Hue|       2.08|         20.1|        47.1|          32.5|      1018.0|        Light|    1|winter|
|2020-01-27|     Hue|     150.26|         21.2|        85.1|           9.4|      1001.5|        Heavy|    1|winter|
|2020-02-01|     Hue|     155.77|         33.7|        52.2|          23.5|       993.9|        Heavy|    2|winter|
|2020-02-12|    Hcmc|      65.56|         28.5|        61.2|           4.4|       998.7|        Heavy|    2|winter|
|2020-02-26|   Hanoi|      140.6|         32.9|        91.8|          37

In [7]:
# Kiểm tra cấu trúc dữ liệu
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- rainfall_mm: double (nullable = true)
 |-- temperature_c: double (nullable = true)
 |-- humidity_pct: double (nullable = true)
 |-- wind_speed_kmh: double (nullable = true)
 |-- pressure_hpa: double (nullable = true)
 |-- rain_category: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- season: string (nullable = true)



In [8]:
# Khảo sát nhanh dữ liệu lượng mưa
df.describe("rainfall_mm").show()

+-------+-----------------+
|summary|      rainfall_mm|
+-------+-----------------+
|  count|            10000|
|   mean|99.11817499999994|
| stddev|57.91234235945848|
|    min|             0.01|
|    max|           199.94|
+-------+-----------------+



In [9]:
# Thống kê mô tả chi tiết để đánh giá phân bố và quyết định chuẩn hóa
rain_stats = df.select(
    max('rainfall_mm').alias('max_rain'),
    min('rainfall_mm').alias('min_rain'),
    avg('rainfall_mm').alias('mean_rain'),
    stddev('rainfall_mm').alias('std_rain'),
    variance('rainfall_mm').alias('var_rain'),
    percentile_approx('rainfall_mm', 0.25).alias('Q1'),
    percentile_approx('rainfall_mm', 0.5).alias('Median'),
    percentile_approx('rainfall_mm', 0.75).alias('Q3')
)

rain_stats.show(truncate=False)


+--------+--------+-----------------+-----------------+------------------+-----+------+------+
|max_rain|min_rain|mean_rain        |std_rain         |var_rain          |Q1   |Median|Q3    |
+--------+--------+-----------------+-----------------+------------------+-----+------+------+
|199.94  |0.01    |99.11817499999994|57.91234235945848|3353.8393975591284|48.48|99.45 |149.41|
+--------+--------+-----------------+-----------------+------------------+-----+------+------+



In [10]:
# Chuyển cột date về kiểu Date
df = df.withColumn("date", F.to_date("date", "yyyy-MM-dd"))

# Tách năm, tháng, ngày để phân tích thời gian
df = df.withColumn("year", F.year("date")) \
       .withColumn("month", F.month("date")) \
       .withColumn("day", F.dayofmonth("date"))

df.select("date", "year", "month", "day").show(5)


+----------+----+-----+---+
|      date|year|month|day|
+----------+----+-----+---+
|2020-01-01|2020|    1|  1|
|2020-01-18|2020|    1| 18|
|2020-01-27|2020|    1| 27|
|2020-02-01|2020|    2|  1|
|2020-02-12|2020|    2| 12|
+----------+----+-----+---+
only showing top 5 rows


In [11]:
# Thêm cột season với 4 mùa
df = df.withColumn(
    "season",
    when(col("month").isin(3, 4, 5), "spring")
    .when(col("month").isin(6, 7, 8), "summer")
    .when(col("month").isin(9, 10, 11), "autumn")
    .otherwise("winter")
)

In [12]:
#Tiền xử lý và ép kiểu an toàn an toàn
numeric_cols = [
    "rainfall_mm",
    "temperature_c",
    "humidity_pct",
    "wind_speed_kmh",
    "pressure_hpa"
]

for c in numeric_cols:
    df = df.withColumn(
        c,
        F.when(
            F.col(c).rlike(r'^-?\d+(\.\d+)?$'),
            F.col(c).cast(DoubleType())
        ).otherwise(None)
    )

In [13]:
print(" KIỂM TRA GIÁ TRỊ THIẾU:")
print("-" * 40)

total_rows = df.count()
for column in df.columns:
    # Đếm số dòng có giá trị null
    null_count = df.filter(col(column).isNull()).count()
    if null_count > 0:
        null_percentage = (null_count / total_rows) * 100
        print(f"{column}: {null_count} giá trị thiếu ({null_percentage:.2f}%)")


 KIỂM TRA GIÁ TRỊ THIẾU:
----------------------------------------


In [14]:
# Xóa các dòng có giá trị thiếu
df_clean = df.dropna()
print(f"Đã xóa dòng có missing values. Số dòng còn lại: {df_clean.count()}")

Đã xóa dòng có missing values. Số dòng còn lại: 10000


In [15]:
print("PHÂN TÍCH LƯỢNG MƯA THEO KHU VỰC:")

# Tạo bảng tạm để sử dụng SQL
df_clean.createOrReplaceTempView("rainfall")


PHÂN TÍCH LƯỢNG MƯA THEO KHU VỰC:


In [16]:
# Phân tích theo khu vực
print("PHÂN TÍCH LƯỢNG MƯA THEO KHU VỰC")
spark.sql("""
    SELECT
        location,
        COUNT(*) as so_mau,
        ROUND(AVG(rainfall_mm), 2) as avg_rain,
        ROUND(MIN(rainfall_mm), 2) as min_rain,
        ROUND(MAX(rainfall_mm), 2) as max_rain,
        ROUND(STDDEV(rainfall_mm), 2) as std_rain
    FROM rainfall
    GROUP BY location
    ORDER BY avg_rain DESC
""").show(truncate=False)


PHÂN TÍCH LƯỢNG MƯA THEO KHU VỰC
+--------+------+--------+--------+--------+--------+
|location|so_mau|avg_rain|min_rain|max_rain|std_rain|
+--------+------+--------+--------+--------+--------+
|Hanoi   |2044  |101.27  |0.08    |199.94  |58.39   |
|Can Tho |1941  |99.24   |0.07    |199.83  |58.34   |
|Danang  |2070  |98.85   |0.06    |199.85  |57.41   |
|Hcmc    |1971  |98.47   |0.17    |199.77  |57.3    |
|Hue     |1974  |97.7    |0.01    |199.47  |58.13   |
+--------+------+--------+--------+--------+--------+



In [17]:
print("LƯỢNG MƯA TRUNG BÌNH & TỔNG THEO KHU VỰC")

spark.sql("""
    SELECT
        location,
        ROUND(AVG(rainfall_mm), 2) AS avg_rainfall,
        ROUND(SUM(rainfall_mm), 2) AS total_rainfall
    FROM rainfall
    GROUP BY location
    ORDER BY total_rainfall DESC
""").show(truncate=False)


LƯỢNG MƯA TRUNG BÌNH & TỔNG THEO KHU VỰC
+--------+------------+--------------+
|location|avg_rainfall|total_rainfall|
+--------+------------+--------------+
|Hanoi   |101.27      |206992.08     |
|Danang  |98.85       |204614.99     |
|Hcmc    |98.47       |194074.96     |
|Hue     |97.7        |192865.47     |
|Can Tho |99.24       |192634.25     |
+--------+------------+--------------+



In [18]:
# Xếp hạng khu vực mưa nhiều nhất theo năm
print("XẾP HẠNG LƯỢNG MƯA THEO NĂM")
spark.sql("""
    SELECT
        year,
        location,
        ROUND(AVG(rainfall_mm),2) AS avg_rainfall,
        RANK() OVER (
            PARTITION BY year
            ORDER BY AVG(rainfall_mm) DESC
        ) AS rainfall_rank
    FROM rainfall
    GROUP BY year, location
    ORDER BY year, rainfall_rank
""").show()


XẾP HẠNG LƯỢNG MƯA THEO NĂM
+----+--------+------------+-------------+
|year|location|avg_rainfall|rainfall_rank|
+----+--------+------------+-------------+
|2020|   Hanoi|      101.03|            1|
|2020| Can Tho|        99.6|            2|
|2020|  Danang|       99.38|            3|
|2020|    Hcmc|       98.79|            4|
|2020|     Hue|       97.11|            5|
|2021|   Hanoi|      102.86|            1|
|2021|     Hue|      101.86|            2|
|2021| Can Tho|        96.5|            3|
|2021|    Hcmc|       96.13|            4|
|2021|  Danang|       94.77|            5|
+----+--------+------------+-------------+



In [19]:
# Phân tích theo mùa
print("\nPHÂN TÍCH LƯỢNG MƯA THEO MÙA:")
spark.sql("""
    SELECT
        season,
        COUNT(*) as count,
        ROUND(AVG(rainfall_mm), 2) as avg_rain
    FROM rainfall
    GROUP BY season
""").show()


PHÂN TÍCH LƯỢNG MƯA THEO MÙA:
+------+-----+--------+
|season|count|avg_rain|
+------+-----+--------+
|winter| 3400|   98.79|
|summer| 2208|   99.44|
|spring| 2208|   98.13|
|autumn| 2184|  100.31|
+------+-----+--------+



In [20]:
print("PHÂN LOẠI MỨC ĐỘ MƯA")

spark.sql("""
    SELECT
        rain_level,
        COUNT(*) AS total_days,
        ROUND(AVG(rainfall_mm), 2) AS avg_rainfall
    FROM (
        SELECT
            rainfall_mm,
            CASE
                WHEN rainfall_mm = 0 THEN 'No Rain'
                WHEN rainfall_mm < 10 THEN 'Light Rain'
                WHEN rainfall_mm < 30 THEN 'Moderate Rain'
                ELSE 'Heavy Rain'
            END AS rain_level
        FROM rainfall
    )
    GROUP BY rain_level
    ORDER BY avg_rainfall DESC
""").show(truncate=False)


PHÂN LOẠI MỨC ĐỘ MƯA
+-------------+----------+------------+
|rain_level   |total_days|avg_rainfall|
+-------------+----------+------------+
|Heavy Rain   |8431      |114.76      |
|Moderate Rain|1033      |20.31       |
|Light Rain   |536       |4.98        |
+-------------+----------+------------+



In [21]:
print("SO SÁNH KHU VỰC VS TRUNG BÌNH TOÀN CỤC")

spark.sql("""
    WITH global_avg AS (
        SELECT AVG(rainfall_mm) AS global_avg
        FROM rainfall
    )
    SELECT
        r.location,
        ROUND(AVG(r.rainfall_mm), 2) AS location_avg,
        ROUND(g.global_avg, 2) AS global_avg,
        ROUND(AVG(r.rainfall_mm) - g.global_avg, 2) AS deviation
    FROM rainfall r
    CROSS JOIN global_avg g
    GROUP BY r.location, g.global_avg
    ORDER BY deviation DESC
""").show(truncate=False)


SO SÁNH KHU VỰC VS TRUNG BÌNH TOÀN CỤC
+--------+------------+----------+---------+
|location|location_avg|global_avg|deviation|
+--------+------------+----------+---------+
|Hanoi   |101.27      |99.12     |2.15     |
|Can Tho |99.24       |99.12     |0.13     |
|Danang  |98.85       |99.12     |-0.27    |
|Hcmc    |98.47       |99.12     |-0.65    |
|Hue     |97.7        |99.12     |-1.42    |
+--------+------------+----------+---------+



In [22]:
print("PHÁT HIỆN NGÀY MƯA BẤT THƯỜNG")

spark.sql("""
    WITH stats AS (
        SELECT
            location,
            AVG(rainfall_mm) AS avg_rain,
            STDDEV(rainfall_mm) AS std_rain
        FROM rainfall
        GROUP BY location
    )
    SELECT
        r.date,
        r.location,
        r.rainfall_mm,
        ROUND(s.avg_rain, 2) AS avg_rain,
        ROUND(s.std_rain, 2) AS std_rain
    FROM rainfall r
    JOIN stats s
        ON r.location = s.location
    WHERE r.rainfall_mm > s.avg_rain + 2 * s.std_rain
    ORDER BY r.rainfall_mm DESC
""").show(truncate=False)


PHÁT HIỆN NGÀY MƯA BẤT THƯỜNG
+----+--------+-----------+--------+--------+
|date|location|rainfall_mm|avg_rain|std_rain|
+----+--------+-----------+--------+--------+
+----+--------+-----------+--------+--------+



In [23]:
print("XU HƯỚNG LƯỢNG MƯA – TRUNG BÌNH TRƯỢT 7 NGÀY")

spark.sql("""
    SELECT
        date,
        location,
        rainfall_mm,
        ROUND(
            AVG(rainfall_mm) OVER (
                PARTITION BY location
                ORDER BY date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ), 2
        ) AS rainfall_7day_avg
    FROM rainfall
    ORDER BY location, date
""").show(truncate=False)


XU HƯỚNG LƯỢNG MƯA – TRUNG BÌNH TRƯỢT 7 NGÀY
+----------+--------+-----------+-----------------+
|date      |location|rainfall_mm|rainfall_7day_avg|
+----------+--------+-----------+-----------------+
|2020-01-01|Can Tho |113.24     |113.24           |
|2020-01-01|Can Tho |105.91     |109.57           |
|2020-01-01|Can Tho |198.74     |139.3            |
|2020-01-02|Can Tho |36.08      |113.49           |
|2020-01-02|Can Tho |184.11     |127.62           |
|2020-01-02|Can Tho |72.93      |118.5            |
|2020-01-02|Can Tho |6.32       |102.48           |
|2020-01-02|Can Tho |148.18     |107.47           |
|2020-01-03|Can Tho |129.74     |110.87           |
|2020-01-03|Can Tho |68.25      |92.23            |
|2020-01-03|Can Tho |109.43     |102.71           |
|2020-01-04|Can Tho |61.15      |85.14            |
|2020-01-04|Can Tho |146.93     |95.71            |
|2020-01-04|Can Tho |42.15      |100.83           |
|2020-01-06|Can Tho |33.33      |84.43            |
|2020-01-06|Can Tho

TRANSFORM

In [24]:
# Chuẩn bị biến số
numeric_features = ["temperature_c", "humidity_pct",
                    "wind_speed_kmh", "pressure_hpa"]
label_column = "rainfall_mm"

print(f"\n1. Biến số (numeric): {numeric_features}")
print(f"2. Target: {label_column}")


1. Biến số (numeric): ['temperature_c', 'humidity_pct', 'wind_speed_kmh', 'pressure_hpa']
2. Target: rainfall_mm


In [25]:
# Mã hóa biến phân loại 'location'
indexer = StringIndexer(inputCol="location", outputCol="location_index")
encoder = OneHotEncoder(inputCol="location_index",
                       outputCol="location_encoded")

In [26]:
# Mã hóa biến 'season'
season_indexer = StringIndexer(inputCol="season", outputCol="season_index")
season_encoder = OneHotEncoder(inputCol="season_index",
                              outputCol="season_encoded")


In [27]:
    # Tạo danh sách feature cuối cùng
final_features = numeric_features + ["location_encoded", "season_encoded"]

print("CÁC FEATURE SẼ SỬ DỤNG:")
for i, feat in enumerate(final_features, 1):
    print(f"  {i}. {feat}")

CÁC FEATURE SẼ SỬ DỤNG:
  1. temperature_c
  2. humidity_pct
  3. wind_speed_kmh
  4. pressure_hpa
  5. location_encoded
  6. season_encoded


In [28]:
# Tạo Vector Assembler
assembler = VectorAssembler(
    inputCols=final_features,
    outputCol="features"
)

In [29]:
# Tạo pipeline HOÀN CHỈNH
feature_pipeline = Pipeline(stages=[
    indexer, encoder,
    season_indexer, season_encoder,
    assembler
])

In [30]:
#Fit và transform qua pipeline
df_transformed = feature_pipeline.fit(df_clean).transform(df_clean)

In [31]:
# Chọn chỉ columns cần thiết
df_final = df_transformed.select("features", label_column)

In [32]:
print("KẾT QUẢ TRANSFORM:")
print(f"Số mẫu: {df_final.count():,} mẫu")
print(f"Cấu trúc: {len(df_final.columns)} cột")
print(f"Cột: {df_final.columns}")

print("\n5 MẪU ĐẦU TIÊN (features đã encoded):")
print("-"*60)
df_final.show(5, truncate=False)


KẾT QUẢ TRANSFORM:
Số mẫu: 10,000 mẫu
Cấu trúc: 2 cột
Cột: ['features', 'rainfall_mm']

5 MẪU ĐẦU TIÊN (features đã encoded):
------------------------------------------------------------
+--------------------------------------------------+-----------+
|features                                          |rainfall_mm|
+--------------------------------------------------+-----------+
|(11,[0,1,2,3,6,8],[20.3,60.3,22.4,1004.2,1.0,1.0])|23.18      |
|(11,[0,1,2,3,6,8],[20.1,47.1,32.5,1018.0,1.0,1.0])|2.08       |
|(11,[0,1,2,3,6,8],[21.2,85.1,9.4,1001.5,1.0,1.0]) |150.26     |
|(11,[0,1,2,3,6,8],[33.7,52.2,23.5,993.9,1.0,1.0]) |155.77     |
|(11,[0,1,2,3,7,8],[28.5,61.2,4.4,998.7,1.0,1.0])  |65.56      |
+--------------------------------------------------+-----------+
only showing top 5 rows


In [33]:
# Lấy một mẫu để xem cấu trúc
sample_row = df_final.limit(1).collect()[0]
features_vector = sample_row["features"]
rainfall_value = sample_row[label_column]

print(f"1. Rainfall (target gốc): {rainfall_value} mm")
print(f"2. Số lượng features: {len(features_vector)}")
print(f"3. Các features giữ nguyên scale gốc:")
print(f"   - Numeric features: {numeric_features}")
print(f"   - One-hot encoded: location (5 categories) + season (4 categories)")
print(f"4. Ví dụ vector features: {features_vector}")

1. Rainfall (target gốc): 23.18 mm
2. Số lượng features: 11
3. Các features giữ nguyên scale gốc:
   - Numeric features: ['temperature_c', 'humidity_pct', 'wind_speed_kmh', 'pressure_hpa']
   - One-hot encoded: location (5 categories) + season (4 categories)
4. Ví dụ vector features: (11,[0,1,2,3,6,8],[20.3,60.3,22.4,1004.2,1.0,1.0])


In [34]:
def train_rainfall_model(
    df_final,
    feature_columns,
    label_column="rainfall_mm",
    test_size=0.2,
    seed=42,
    scale=True
):
    print("BẮT ĐẦU HUẤN LUYỆN MÔ HÌNH")
    print(f"Test size: {test_size*100}%")

    # Kiểm tra dữ liệu
    print("\nKIỂM TRA DỮ LIỆU")
    if "features" not in df_final.columns:
        raise ValueError("Thiếu column 'features'")
    if label_column not in df_final.columns:
        raise ValueError(f"Thiếu column '{label_column}'")

    # Chia dữ liệu
    train_df, test_df = df_final.randomSplit([1 - test_size, test_size], seed=seed)
    print(f"  Train: {train_df.count()} mẫu")
    print(f"  Test : {test_df.count()} mẫu")

    # Khở tạo các stages
    stages = []
    if scale:
        scaler = StandardScaler(
            inputCol="features",
            outputCol="scaled_features",
            withMean=True,
            withStd=True
        )
        stages.append(scaler)
        features_col = "scaled_features"
    else:
        features_col = "features"

    # Khởi tạo thuật toán Linear Regression
    lr = LinearRegression(
        featuresCol=features_col,
        labelCol=label_column,
        solver="normal"
    )
    stages.append(lr)

    pipeline = Pipeline(stages=stages)

    # Huấn luyện mô hình
    print("\nHUẤN LUYỆN MÔ HÌNH...")
    pipeline_model = pipeline.fit(train_df)

    # lr là stage cuối cùng trong pipeline
    lr_model = pipeline_model.stages[-1]

    # Dự đoán và đánh giá
    predictions = pipeline_model.transform(test_df)

    print("\nĐÁNH GIÁ TRÊN TẬP TEST")
    evaluators = {
        "RMSE": RegressionEvaluator(labelCol=label_column, metricName="rmse"),
        "MAE": RegressionEvaluator(labelCol=label_column, metricName="mae"),
        "R2": RegressionEvaluator(labelCol=label_column, metricName="r2")
    }

    metrics = {}
    for name, evaluator in evaluators.items():
        value = evaluator.evaluate(predictions)
        metrics[name] = value
        print(f"  {name}: {value:.4f}")

    # Thống kế trên tập train
    print("\nTHỐNG KÊ TRÊN TẬP TRAIN")
    summary = lr_model.summary
    print(f"  R2 (train): {summary.r2:.4f}")
    print(f"  RMSE (train): {summary.rootMeanSquaredError:.4f}")

    importance = lr_model.coefficients.toArray().tolist()

    print("\nHOÀN THÀNH HUẤN LUYỆN")
    return {
        "pipeline_model": pipeline_model,
        "linear_model": lr_model,
        "predictions": predictions,
        "metrics": metrics,
        "feature_importance": importance
    }

In [35]:
results = train_rainfall_model(
    df_final=df_final,
    feature_columns=final_features,
    label_column="rainfall_mm",
    test_size=0.2,
    seed=42,
    scale=True   # ← có thể False để so sánh
)


BẮT ĐẦU HUẤN LUYỆN MÔ HÌNH
Test size: 20.0%

KIỂM TRA DỮ LIỆU
  Train: 8079 mẫu
  Test : 1921 mẫu

HUẤN LUYỆN MÔ HÌNH...

ĐÁNH GIÁ TRÊN TẬP TEST
  RMSE: 56.5883
  MAE: 48.7631
  R2: -0.0022

THỐNG KÊ TRÊN TẬP TRAIN
  R2 (train): 0.0009
  RMSE (train): 58.1989

HOÀN THÀNH HUẤN LUYỆN


In [36]:
results = train_rainfall_model(
    df_final=df_final,
    feature_columns=final_features,
    label_column="rainfall_mm",
    test_size=0.2,
    seed=142,
    scale=True   # ← có thể False để so sánh
)


BẮT ĐẦU HUẤN LUYỆN MÔ HÌNH
Test size: 20.0%

KIỂM TRA DỮ LIỆU
  Train: 7994 mẫu
  Test : 2006 mẫu

HUẤN LUYỆN MÔ HÌNH...

ĐÁNH GIÁ TRÊN TẬP TEST
  RMSE: 57.7957
  MAE: 50.1148
  R2: -0.0007

THỐNG KÊ TRÊN TẬP TRAIN
  R2 (train): 0.0008
  RMSE (train): 57.9162

HOÀN THÀNH HUẤN LUYỆN


In [37]:
spark.stop()