# PM2.5 Prediction - Data Preprocessing with HDFS

## üéØ M·ª•c ƒê√≠ch
Notebook n√†y demo **t√≠ch h·ª£p HDFS v√†o quy tr√¨nh preprocessing** cho m√¥n Big Data:
- ‚úÖ **Input:** ƒê·ªçc d·ªØ li·ªáu t·ª´ HDFS (`/data/raw/`)
- ‚úÖ **Processing:** X·ª≠ l√Ω v·ªõi PySpark (local mode)
- ‚úÖ **Output:** L∆∞u k·∫øt qu·∫£ l√™n HDFS (`/data/processed/`)

## ‚öôÔ∏è Workflow Architecture

```
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê      ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê      ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ   HDFS      ‚îÇ      ‚îÇ   Windows    ‚îÇ      ‚îÇ   HDFS      ‚îÇ
‚îÇ  /data/raw  ‚îÇ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚ñ∂‚îÇ   + Spark    ‚îÇ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚ñ∂‚îÇ/data/proc   ‚îÇ
‚îÇ  (Input)    ‚îÇ Download‚îÇ  Processing ‚îÇUpload‚îÇ (Output)   ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò      ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò      ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
```

### Technical Approach: Hybrid Mode

Do Spark tr√™n Windows kh√¥ng k·∫øt n·ªëi tr·ª±c ti·∫øp HDFS qua RPC ƒë∆∞·ª£c, ta d√πng ph∆∞∆°ng ph√°p:

1. **Download t·ª´ HDFS:** `docker exec hdfs-namenode hdfs dfs -get` ‚Üí local temp
2. **Process v·ªõi Spark:** ƒê·ªçc local files, x·ª≠ l√Ω nh∆∞ b√¨nh th∆∞·ªùng
3. **Upload l√™n HDFS:** `docker exec hdfs-namenode hdfs dfs -put` ‚Üí HDFS

C√°ch n√†y v·∫´n **ƒë·∫ßy ƒë·ªß demo HDFS** cho m√¥n h·ªçc m√† tr√°nh ƒë∆∞·ª£c l·ªói k·∫øt n·ªëi!

## üìã Prerequisites

### 1. Kh·ªüi ƒë·ªông HDFS Cluster
```bash
docker-compose up -d
```

### 2. Upload d·ªØ li·ªáu l√™n HDFS
```powershell
.\upload_to_hdfs.ps1
```

### 3. Ki·ªÉm tra HDFS
- NameNode UI: http://localhost:9870
- Verify: C√≥ 28 files trong `/data/raw/`

### 4. C·∫•u tr√∫c HDFS
```
/data/
  ‚îú‚îÄ‚îÄ raw/                          # Input t·ª´ HDFS
  ‚îÇ   ‚îú‚îÄ‚îÄ pollutant_location_*.csv  (14 files)
  ‚îÇ   ‚îî‚îÄ‚îÄ weather_location_*.csv    (14 files)
  ‚îî‚îÄ‚îÄ processed/                    # Output l√™n HDFS
      ‚îú‚îÄ‚îÄ cnn_sequences/
      ‚îú‚îÄ‚îÄ lstm_sequences/
      ‚îú‚îÄ‚îÄ xgboost/
      ‚îú‚îÄ‚îÄ datasets_ready.json
      ‚îî‚îÄ‚îÄ scaler_params.json
```

## üöÄ Ready to Run!
Ch·∫°y All Cells ƒë·ªÉ th·ª±c hi·ªán preprocessing v·ªõi HDFS integration.

In [1]:
import os
import sys

# Set Java for PySpark (local machine)
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-21'
os.environ['PATH'] = os.environ['JAVA_HOME'] + r'\bin;' + os.environ.get('PATH', '')
print(f"[OK] Using Java: {os.environ['JAVA_HOME']}")

# Common imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline
sns.set_style('whitegrid')

print("[OK] All imports successful!")

[OK] Using Java: C:\Program Files\Java\jdk-21
[OK] All imports successful!
[OK] All imports successful!


## 1. K·∫øt n·ªëi Spark v·ªõi HDFS

In [None]:
# HDFS Configuration - Using Docker commands for file I/O
# Since Spark RPC from Windows to HDFS has connectivity issues,
# we'll use a hybrid approach:
# 1. Download files from HDFS to temp directory (via docker exec)
# 2. Process with Spark locally
# 3. Upload results back to HDFS (via docker exec)

import subprocess
import tempfile
import shutil

HDFS_NAMENODE = "hdfs://namenode:9000"  # Internal HDFS address
HDFS_RAW_DATA_PATH = "/data/raw"
HDFS_PROCESSED_DATA_PATH = "/data/processed"
TEMP_DIR = tempfile.mkdtemp(prefix="hdfs_staging_")

print("[INFO] HDFS Hybrid Mode Configuration")
print(f"[INFO] HDFS Raw Data: {HDFS_RAW_DATA_PATH}")
print(f"[INFO] HDFS Processed Data: {HDFS_PROCESSED_DATA_PATH}")
print(f"[INFO] Local Staging: {TEMP_DIR}")

# Helper functions for HDFS operations
def hdfs_download_directory(hdfs_path, local_path):
    """Download entire directory from HDFS to local via docker exec"""
    print(f"\n[DOWNLOAD] HDFS {hdfs_path} -> Local {local_path}")
    
    # Create local directory
    os.makedirs(local_path, exist_ok=True)
    
    # Step 1: Get from HDFS to container /tmp
    container_tmp = f"/tmp/hdfs_download_{os.path.basename(hdfs_path)}"
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "hdfs", "dfs", "-get", hdfs_path, container_tmp],
        check=True,
        capture_output=True
    )
    
    # Step 2: Copy from container to Windows host
    subprocess.run(
        ["docker", "cp", f"hdfs-namenode:{container_tmp}/.", local_path],
        check=True,
        capture_output=True
    )
    
    # Step 3: Cleanup container temp
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "rm", "-rf", container_tmp],
        check=False,
        capture_output=True
    )
    
    # Count files
    file_count = len([f for f in os.listdir(local_path) if os.path.isfile(os.path.join(local_path, f))])
    print(f"[OK] Downloaded {file_count} files")
    return local_path

def hdfs_upload_directory(local_path, hdfs_path):
    """Upload entire directory from local to HDFS via docker exec"""
    print(f"\n[UPLOAD] Local {local_path} -> HDFS {hdfs_path}")
    
    # Step 1: Copy from Windows host to container /tmp
    container_tmp = f"/tmp/hdfs_upload_{os.path.basename(local_path)}"
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "rm", "-rf", container_tmp],
        check=False,
        capture_output=True
    )
    subprocess.run(
        ["docker", "cp", f"{local_path}/.", f"hdfs-namenode:{container_tmp}"],
        check=True,
        capture_output=True
    )
    
    # Step 2: Put from container to HDFS (create parent dir first)
    hdfs_parent = os.path.dirname(hdfs_path)
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "hdfs", "dfs", "-mkdir", "-p", hdfs_parent],
        check=False,
        capture_output=True
    )
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "hdfs", "dfs", "-rm", "-r", "-f", hdfs_path],
        check=False,
        capture_output=True
    )
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "hdfs", "dfs", "-put", container_tmp, hdfs_path],
        check=True,
        capture_output=True
    )
    
    # Step 3: Cleanup container temp
    subprocess.run(
        ["docker", "exec", "hdfs-namenode", "rm", "-rf", container_tmp],
        check=False,
        capture_output=True
    )
    
    print(f"[OK] Uploaded to {hdfs_path}")

# Create Spark Session (local mode - no HDFS direct access needed)
print("\n[INFO] Creating Spark Session (local mode)...")
spark = SparkSession.builder \
    .appName("PM25-Preprocessing-HDFS-Hybrid") \
    .master("local[8]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "16") \
    .config("spark.default.parallelism", "16") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print(f"[OK] Spark Session created: {spark.version}")
print(f"[OK] Master: {spark.sparkContext.master}")
print(f"[OK] App Name: {spark.sparkContext.appName}")


[INFO] Configuring Spark with HDFS support...
[INFO] HDFS NameNode (RPC): hdfs://localhost:9000
[INFO] WebHDFS (HTTP): webhdfs://localhost:9870
[OK] Spark version: 4.0.1
[OK] Spark mode: local[8]
[OK] Application ID: local-1764997463457
[OK] Cores: 16

[TEST] Testing HDFS connection via WebHDFS...
[OK] Spark version: 4.0.1
[OK] Spark mode: local[8]
[OK] Application ID: local-1764997463457
[OK] Cores: 16

[TEST] Testing HDFS connection via WebHDFS...
[ERROR] ‚úó HDFS connection failed: An error occurred while calling o61.load.
: org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file hdfs://localhost:9000/data/raw/pollutant_location_7727.csv.  SQLSTATE: KD001
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:856)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext

## 2. ƒê·ªãnh nghƒ©a Schema v√† Scan Files

In [None]:
# Download data from HDFS to local staging directory
print("="*60)
print("STEP 1: Download Data from HDFS")
print("="*60)

# Download raw data from HDFS
LOCAL_RAW_PATH = os.path.join(TEMP_DIR, "raw")
hdfs_download_directory(HDFS_RAW_DATA_PATH, LOCAL_RAW_PATH)

# Verify downloaded files
pollutant_files = [f for f in os.listdir(LOCAL_RAW_PATH) if f.startswith('pollutant_')]
weather_files = [f for f in os.listdir(LOCAL_RAW_PATH) if f.startswith('weather_')]

print(f"\n[VERIFY] Pollutant files: {len(pollutant_files)}")
print(f"[VERIFY] Weather files: {len(weather_files)}")
print(f"[OK] All data downloaded from HDFS to: {LOCAL_RAW_PATH}")


[OK] Schemas defined


### 2.1 Scan v√† Map Files theo Location

In [None]:
# Scan downloaded files from local staging
import re

pollutant_files = sorted([f for f in os.listdir(LOCAL_RAW_PATH) if f.startswith('pollutant_')])
location_mapping = {}

for pollutant_file in pollutant_files:
    match = re.search(r'pollutant_location_(\d+)\.csv', pollutant_file)
    if match:
        location_id = match.group(1)
        weather_file = f'weather_location_{location_id}.csv'
        
        if weather_file in os.listdir(LOCAL_RAW_PATH):
            location_mapping[location_id] = {
                'pollutant': os.path.join(LOCAL_RAW_PATH, pollutant_file),
                'weather': os.path.join(LOCAL_RAW_PATH, weather_file)
            }

print(f"[INFO] Found {len(location_mapping)} location pairs")
print(f"[INFO] Location IDs: {sorted(location_mapping.keys())}")


[HDFS] Files in /data/raw:
  - pollutant_location_233335.csv (11.96 MB)
  - pollutant_location_233336.csv (12.28 MB)
  - pollutant_location_7727.csv (13.20 MB)
  - pollutant_location_7728.csv (11.49 MB)
  - pollutant_location_7730.csv (9.97 MB)
  - pollutant_location_7732.csv (10.25 MB)
  - pollutant_location_7733.csv (10.22 MB)
  - pollutant_location_7734.csv (10.77 MB)
  - pollutant_location_7735.csv (10.55 MB)
  - pollutant_location_7736.csv (10.65 MB)
  - pollutant_location_7737.csv (10.19 MB)
  - pollutant_location_7739.csv (12.76 MB)
  - pollutant_location_7740.csv (12.84 MB)
  - pollutant_location_7742.csv (12.85 MB)
  - weather_location_233335.csv (1.07 MB)
  - weather_location_233336.csv (1.08 MB)
  - weather_location_7727.csv (1.08 MB)
  - weather_location_7728.csv (1.07 MB)
  - weather_location_7730.csv (1.07 MB)
  - weather_location_7732.csv (1.08 MB)
  - weather_location_7733.csv (1.07 MB)
  - weather_location_7734.csv (1.07 MB)
  - weather_location_7735.csv (1.07 MB)
  - 

In [None]:
# This cell is now handled by the previous cells that download from HDFS
# Data is already in LOCAL_RAW_PATH and location_mapping is ready
print(f"[INFO] Using downloaded data from: {LOCAL_RAW_PATH}")
print(f"[INFO] Total locations ready: {len(location_mapping)}")


[HDFS] Using HDFS path: hdfs://localhost:9000/data/raw
[FILES] Found 14 pollutant files in HDFS
  [OK] Location 233335: pollutant_location_233335.csv + weather_location_233335.csv
  [OK] Location 233336: pollutant_location_233336.csv + weather_location_233336.csv
  [OK] Location 7727: pollutant_location_7727.csv + weather_location_7727.csv
  [OK] Location 7728: pollutant_location_7728.csv + weather_location_7728.csv
  [OK] Location 7730: pollutant_location_7730.csv + weather_location_7730.csv
  [OK] Location 7728: pollutant_location_7728.csv + weather_location_7728.csv
  [OK] Location 7730: pollutant_location_7730.csv + weather_location_7730.csv
  [OK] Location 7732: pollutant_location_7732.csv + weather_location_7732.csv
  [OK] Location 7733: pollutant_location_7733.csv + weather_location_7733.csv
  [OK] Location 7734: pollutant_location_7734.csv + weather_location_7734.csv
  [OK] Location 7735: pollutant_location_7735.csv + weather_location_7735.csv
  [OK] Location 7736: pollutant_lo

In [None]:
# Display location mapping
print("="*60)
print("Location Files Ready for Processing:")
print("="*60)
for loc_id in sorted(location_mapping.keys()):
    files = location_mapping[loc_id]
    print(f"Location {loc_id}:")
    print(f"  - Pollutant: {os.path.basename(files['pollutant'])}")
    print(f"  - Weather:   {os.path.basename(files['weather'])}")


Location mapping:
  233335: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_233335.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_233335.csv'}
  233336: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_233336.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_233336.csv'}
  7727: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_7727.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_7727.csv'}
  7728: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_7728.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_7728.csv'}
  7730: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_7730.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_7730.csv'}
  7732: {'pollutant': 'hdfs://localhost:9000/data/raw/pollutant_location_7732.csv', 'weather': 'hdfs://localhost:9000/data/raw/weather_location_7732.csv'}
  7733: {'pollutant': 'hdfs://localhost:

### 2.2 X·ª≠ l√Ω t·ª´ng Location

In [7]:
# List ƒë·ªÉ ch·ª©a dataframes c·ªßa t·ª´ng location
all_locations_data = []

for location_id, files in location_mapping.items():
    print(f"\n[PROCESSING] Processing Location {location_id}...")
    
    # ƒê·ªçc pollutant data
    df_air = spark.read.csv(
        files['pollutant'],
        header=True,
        schema=openaq_schema
    )
    
    # [?] L·ªåC CH·ªà L·∫§Y C√ÅC CH·ªà S·ªê QUAN T√ÇM: PM2.5, PM10, SO2, NO2
    df_air = df_air.filter(
        F.col("parameter").isin(["pm25", "pm10", "so2", "no2"])
    )
    
    # ƒê·ªçc weather data
    df_weather = spark.read.csv(
        files['weather'],
        header=True,
        schema=weather_schema
    )
    
    print(f"  [DATA] Air quality (PM2.5, PM10, SO2, NO2): {df_air.count():,} records")
    print(f"  [?]  Weather: {df_weather.count():,} records")
    
    # Weather data - drop missing (√≠t missing)
    df_weather_clean = df_weather.na.drop()
    
    # Pivot pollutant data
    df_air_pivot = df_air.groupBy(
        "location_id", "location", "datetime", "lat", "lon"
    ).pivot("parameter").agg(F.first("value"))
    
    # Rename columns
    column_mapping = {
        "pm25": "PM2_5",
        "pm10": "PM10",
        "no2": "NO2",
        "so2": "SO2"
    }
    
    for old_name, new_name in column_mapping.items():
        if old_name in df_air_pivot.columns:
            df_air_pivot = df_air_pivot.withColumnRenamed(old_name, new_name)
    
    # Join v·ªõi weather data (theo datetime)
    df_location = df_air_pivot.join(
        df_weather_clean,
        df_air_pivot.datetime == df_weather_clean.time,
        "inner"
    ).drop("time")
    
    print(f"  [OK] After join: {df_location.count():,} records")
    
    # Th√™m v√†o list
    all_locations_data.append(df_location)

print(f"\n[SUCCESS] Processed {len(all_locations_data)} locations successfully!")


[PROCESSING] Processing Location 233335...



[PROCESSING] Processing Location 233335...


Py4JJavaError: An error occurred while calling o246.count.
: org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file hdfs://localhost:9000/data/raw/pollutant_location_233335.csv.  SQLSTATE: KD001
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:856)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1949370538-172.18.0.2-1764993111268:blk_1073741825_1001 file=/data/raw/pollutant_location_233335.csv No live nodes contain current block Block locations: DatanodeInfoWithStorage[172.18.0.2:9866,DS-dcfb5780-c388-4f58-a289-0fcb6f79f8ef,DISK] DatanodeInfoWithStorage[172.18.0.3:9866,DS-47222eb1-3b31-4548-b669-edb6631cc138,DISK] Dead nodes:  DatanodeInfoWithStorage[172.18.0.3:9866,DS-47222eb1-3b31-4548-b669-edb6631cc138,DISK] DatanodeInfoWithStorage[172.18.0.2:9866,DS-dcfb5780-c388-4f58-a289-0fcb6f79f8ef,DISK]
	at org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:978)
	at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:953)
	at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:931)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:637)
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:845)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:919)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:138)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.$anonfun$_iterator$2(HadoopFileLinesReader.scala:66)
	at org.apache.spark.util.SparkErrorUtils.tryInitializeResource(SparkErrorUtils.scala:59)
	at org.apache.spark.util.SparkErrorUtils.tryInitializeResource$(SparkErrorUtils.scala:56)
	at org.apache.spark.util.Utils$.tryInitializeResource(Utils.scala:99)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$readFile$1(CSVDataSource.scala:105)
	at org.apache.spark.TaskContextImpl.createResourceUninterruptibly(TaskContextImpl.scala:332)
	at org.apache.spark.util.Utils$.$anonfun$createResourceUninterruptiblyIfInTaskThread$1(Utils.scala:3097)
	at scala.Option.map(Option.scala:242)
	at org.apache.spark.util.Utils$.createResourceUninterruptiblyIfInTaskThread(Utils.scala:3096)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:105)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:147)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:230)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:289)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:140)
	... 20 more


### 2.3 G·ªôp t·∫•t c·∫£ Locations

In [None]:
# G·ªôp t·∫•t c·∫£ locations l·∫°i
print(f"[PROCESSING] Combining {len(all_locations_data)} locations...")

df_combined = all_locations_data[0]
for df in all_locations_data[1:]:
    df_combined = df_combined.union(df)

# OPTIMIZE: Cache ƒë·ªÉ tr√°nh recompute nhi·ªÅu l·∫ßn
df_combined = df_combined.cache()

# OPTIMIZE: Trigger action 1 l·∫ßn, tr√°nh count() nhi·ªÅu l·∫ßn
print("‚è≥ Computing combined dataset (this may take a moment)...")
total_records = df_combined.count()
num_locations = df_combined.select('location_id').distinct().count()

print(f"[SUCCESS] Combined dataset: {total_records:,} total records")
print(f"[SUCCESS] Number of locations: {num_locations}")

# OPTIMIZE: Ch·ªâ show sample, kh√¥ng orderBy to√†n b·ªô dataset (r·∫•t ch·∫≠m!)
print("\n[METADATA] Sample records (unsorted):")
df_combined.show(10, truncate=False)

# OPTIONAL: N·∫øu c·∫ßn sort, ch·ªâ sort 1 partition nh·ªè ƒë·ªÉ xem
# df_combined.orderBy("location_id", "datetime").limit(50).show(truncate=False)

[PROCESSING] Combining 14 locations...
‚è≥ Computing combined dataset (this may take a moment)...




[SUCCESS] Combined dataset: 300,318 total records
[SUCCESS] Number of locations: 14

[METADATA] Sample records (unsorted):


                                                                                

+-----------+------------+-------------------+--------+---------+----+----+-----+---+--------------+--------------------+--------------+------------------+----------------+-------------+
|location_id|location    |datetime           |lat     |lon      |NO2 |PM10|PM2_5|SO2|temperature_2m|relative_humidity_2m|wind_speed_10m|wind_direction_10m|surface_pressure|precipitation|
+-----------+------------+-------------------+--------+---------+----+----+-----+---+--------------+--------------------+--------------+------------------+----------------+-------------+
|233335     |North-245631|2022-11-02 06:00:00|22.49671|114.12824|42.5|15.4|11.3 |1.8|18.0          |80.0                |22.8          |18.0              |1005.4          |0.5          |
|233335     |North-245631|2022-11-03 15:00:00|22.49671|114.12824|20.4|10.6|6.7  |0.9|22.2          |91.0                |16.2          |102.0             |1010.5          |3.3          |
|233335     |North-245631|2022-11-04 16:00:00|22.49671|114.12824|

## 3. T·ªïng quan Dataset

In [None]:
# Th·ªëng k√™ theo location
print("[DATA] Dataset Overview by Location:")
df_combined.groupBy("location_id", "location").count().orderBy("location_id").show(truncate=False)

# Time range c·ªßa t·ª´ng location
print("\n[?] Time Range by Location:")
df_combined.groupBy("location_id").agg(
    F.min("datetime").alias("start_date"),
    F.max("datetime").alias("end_date"),
    F.count("*").alias("records")
).orderBy("location_id").show(truncate=False)

[DATA] Dataset Overview by Location:


                                                                                

+-----------+--------------------+-----+
|location_id|location            |count|
+-----------+--------------------+-----+
|233335     |North-245631        |21679|
|233336     |Southern-245632     |21255|
|7727       |Tung Chung-7727     |22360|
|7728       |Mong Kok-7728       |21676|
|7730       |Central/Western-7730|21267|
|7732       |Causeway Bay-7732   |21299|
|7733       |Sha Tin-7733        |21228|
|7734       |Sham Shui Po-7734   |21247|
|7735       |Kwun Tong-7735      |21270|
|7736       |Kwai Chung-7736     |21274|
|7737       |Tai Po-7737         |21268|
|7739       |Yuen Long-7739      |21721|
|7740       |Tsuen Wan-7740      |21687|
|7742       |Tuen Mun-7742       |2368 |
|7742       |Tuen Mun-932161     |18719|
+-----------+--------------------+-----+


[?] Time Range by Location:




+-----------+-------------------+-------------------+-------+
|location_id|start_date         |end_date           |records|
+-----------+-------------------+-------------------+-------+
|233335     |2022-11-01 00:00:00|2025-09-30 16:00:00|21679  |
|233336     |2022-11-01 00:00:00|2025-09-30 16:00:00|21255  |
|7727       |2022-11-01 00:00:00|2025-09-30 16:00:00|22360  |
|7728       |2022-11-01 00:00:00|2025-09-30 16:00:00|21676  |
|7730       |2022-11-01 00:00:00|2025-09-30 16:00:00|21267  |
|7732       |2022-11-01 00:00:00|2025-09-30 16:00:00|21299  |
|7733       |2022-11-01 00:00:00|2025-09-30 16:00:00|21228  |
|7734       |2022-11-01 00:00:00|2025-09-30 16:00:00|21247  |
|7735       |2022-11-01 00:00:00|2025-09-30 16:00:00|21270  |
|7736       |2022-11-01 00:00:00|2025-09-30 16:00:00|21274  |
|7737       |2022-11-01 00:00:00|2025-09-30 16:00:00|21268  |
|7739       |2022-11-01 00:00:00|2025-09-30 16:00:00|21721  |
|7740       |2022-11-01 00:00:00|2025-09-30 16:00:00|21687  |
|7742   

                                                                                

In [None]:
# Ki·ªÉm tra missing values
print("[WARNING]  Missing Values Summary:")
for col_name in df_combined.columns:
    null_count = df_combined.filter(F.col(col_name).isNull()).count()
    total = df_combined.count()
    pct = (null_count / total) * 100
    if null_count > 0:  # Ch·ªâ hi·ªÉn th·ªã c·ªôt c√≥ missing
        print(f"  {col_name:25s}: {null_count:8,} ({pct:6.2f}%)")



                                                                                

  NO2                      :    7,612 (  2.53%)


                                                                                

  PM10                     :    3,391 (  1.13%)


                                                                                

  PM2_5                    :   11,161 (  3.72%)


                                                                                

  SO2                      :    7,424 (  2.47%)


                                                                                

In [None]:
# Statistics t·ªïng quan
print("[?] Overall Statistics:")
df_combined.select(
    "PM2_5", "PM10", "NO2", "SO2",
    "temperature_2m", "relative_humidity_2m", "wind_speed_10m", "precipitation"
).describe().show()

[?] Overall Statistics:


25/12/01 16:06:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+------------------+------------------+------------------+-----------------+--------------------+------------------+------------------+
|summary|             PM2_5|              PM10|               NO2|               SO2|   temperature_2m|relative_humidity_2m|    wind_speed_10m|     precipitation|
+-------+------------------+------------------+------------------+------------------+-----------------+--------------------+------------------+------------------+
|  count|            289157|            296927|            292706|            292894|           300318|              300318|            300318|            300318|
|   mean|15.684073703904803|25.418589417600963|  39.2034498780346|3.7197607325517086|23.17203064751364|   79.55055974000892|12.503190950925351|0.2767030281235236|
| stddev|10.897151413820698| 19.75489546779464|26.134835003178324|2.4372186299768455|5.550822884231528|   15.46350485998542| 6.256373957792993|1.1696306615920375|
|    min|             

                                                                                

## 4. L√†m s·∫°ch D·ªØ li·ªáu

**Quy tr√¨nh l√†m s·∫°ch:**
1. **Lo·∫°i b·ªè Outliers tr∆∞·ªõc** - ƒê·ªÉ tr√°nh gi√° tr·ªã c·ª±c ƒëoan ·∫£nh h∆∞·ªüng ƒë·∫øn t√≠nh to√°n statistics
2. **Fill Missing Values sau** - Imputation d·ª±a tr√™n d·ªØ li·ªáu ƒë√£ lo·∫°i b·ªè outliers

### 4.1. Lo·∫°i b·ªè Outliers

Lo·∫°i b·ªè c√°c gi√° tr·ªã c·ª±c ƒëoan tr∆∞·ªõc khi imputation:

In [None]:
# Lo·∫°i b·ªè outliers theo WHO/EPA International Standards (cho d·ªØ li·ªáu Hong Kong)
# [WARNING]  QUAN TR·ªåNG: PM2.5 l√† TARGET variable - PH·∫¢I c√≥ gi√° tr·ªã th·∫≠t!
#     -> Records c√≥ PM2.5 = null s·∫Ω B·ªä LO·∫†I B·ªé
#     -> Ch·ªâ c√°c features kh√°c (PM10, NO2, SO2) m·ªõi ƒë∆∞·ª£c ph√©p null v√† impute sau

df_no_outliers = df_combined.filter(
    # [TARGET] TARGET: PM2.5 theo WHO Emergency threshold (kh√¥ng cho ph√©p null)
    (F.col("PM2_5").isNotNull()) & 
    (F.col("PM2_5") >= 0) & (F.col("PM2_5") < 250) &  # WHO Emergency: 250 Œºg/m¬≥
    
    # [DATA] FEATURES: WHO/EPA International Standards - Cho ph√©p null, ch·ªâ lo·∫°i outliers
    ((F.col("PM10").isNull()) | ((F.col("PM10") >= 0) & (F.col("PM10") < 430))) &  # WHO Emergency: 430 Œºg/m¬≥
    ((F.col("NO2").isNull()) | ((F.col("NO2") >= 0) & (F.col("NO2") < 400))) &     # WHO/EU: 400 Œºg/m¬≥ (1-hour)
    ((F.col("SO2").isNull()) | ((F.col("SO2") >= 0) & (F.col("SO2") < 500))) &     # WHO/EU: 500 Œºg/m¬≥ (10-min)
    
    # [?] WEATHER: WMO standards cho Hong Kong
    (F.col("precipitation") >= 0) & (F.col("precipitation") < 100)  # WMO: 100mm/h extreme rain
)

records_before = df_combined.count()
records_after = df_no_outliers.count()
removed = records_before - records_after

print(f"[DATA] Outlier Removal:")
print(f"  Before: {records_before:,} records")
print(f"  After:  {records_after:,} records")
print(f"  Removed: {removed:,} records ({removed/records_before*100:.2f}%)")
print(f"\n  [WARNING]  Note: Records with PM2.5 = null are REMOVED (target variable must have real values)")

# Ki·ªÉm tra missing values sau khi lo·∫°i outliers
print("\n[WARNING]  Missing values after outlier removal:")
for col_name in ["PM2_5", "PM10", "NO2", "SO2"]:
    if col_name in df_no_outliers.columns:
        null_count = df_no_outliers.filter(F.col(col_name).isNull()).count()
        total = df_no_outliers.count()
        pct = (null_count / total) * 100
        if null_count > 0:
            print(f"  {col_name:10s}: {null_count:8,} ({pct:6.2f}%)")
        elif col_name == "PM2_5":
            print(f"  {col_name:10s}: {null_count:8,} ({pct:6.2f}%) [SUCCESS] (Target - must be 0%)")

                                                                                

[DATA] Outlier Removal:
  Before: 300,318 records
  After:  289,157 records
  Removed: 11,161 records (3.72%)




                                                                                

  PM2_5     :        0 (  0.00%) [SUCCESS] (Target - must be 0%)


                                                                                

  PM10      :      296 (  0.10%)


                                                                                

  NO2       :    7,361 (  2.55%)




  SO2       :    7,178 (  2.48%)


                                                                                

### 4.2. X·ª≠ l√Ω Missing Values (Interpolation)

**Chi·∫øn l∆∞·ª£c Imputation cho Time Series:**
- **PM2.5**: ƒê√£ lo·∫°i b·ªè t·∫•t c·∫£ records c√≥ null (target variable)
- **PM10, NO2, SO2**: S·ª≠ d·ª•ng **Linear Interpolation** (t·ªët nh·∫•t cho time series)
  - B∆∞·ªõc 1: **Linear Interpolation** - N·ªôi suy tuy·∫øn t√≠nh d·ª±a tr√™n gi√° tr·ªã tr∆∞·ªõc & sau
  - B∆∞·ªõc 2: **Forward Fill** - X·ª≠ l√Ω missing ·ªü cu·ªëi chu·ªói (kh√¥ng c√≥ gi√° tr·ªã sau)
  - B∆∞·ªõc 3: **Backward Fill** - X·ª≠ l√Ω missing ·ªü ƒë·∫ßu chu·ªói (kh√¥ng c√≥ gi√° tr·ªã tr∆∞·ªõc)
  - B∆∞·ªõc 4: **Mean** - Backup cu·ªëi c√πng (n·∫øu c√≤n missing)

In [None]:
# Chi·∫øn l∆∞·ª£c Imputation cho Time Series Data
# S·ª≠ d·ª•ng PySpark Window Functions - N·ªôi suy tuy·∫øn t√≠nh d·ª±a tr√™n kho·∫£ng c√°ch th·ªùi gian

# List c√°c c·ªôt FEATURES c·∫ßn impute (KH√îNG bao g·ªìm PM2.5 - target variable)
pollutant_cols = ["PM10", "NO2", "SO2"]  # [WARNING] Kh√¥ng c√≥ PM2.5!

print(f"[PROCESSING] Time Series Imputation Strategy (PySpark Native):")
print(f"   1. True Linear Interpolation - y = y‚ÇÅ + (y‚ÇÇ-y‚ÇÅ) √ó (t-t‚ÇÅ)/(t‚ÇÇ-t‚ÇÅ)")
print(f"   2. Forward Fill - If only prev value available")
print(f"   3. Backward Fill - If only next value available")
print(f"   4. Null - If no surrounding values (rare)")
print(f"\n   Columns to impute: {pollutant_cols}")
print(f"   PM2.5 NOT imputed (target variable - already removed nulls)")
print(f"   [?] Safe: Window partitioned by location_id (no cross-location interpolation)\n")

# Cache ƒë·ªÉ tƒÉng performance
df_filled = df_no_outliers.cache()

# Ki·ªÉm tra missing TR∆Ø·ªöC khi interpolate
print("[WARNING]  Missing values BEFORE interpolation:")
for col_name in pollutant_cols:
    if col_name in df_filled.columns:
        null_count = df_filled.filter(F.col(col_name).isNull()).count()
        total = df_filled.count()
        pct = (null_count / total) * 100
        if null_count > 0:
            print(f"  {col_name:10s}: {null_count:8,} ({pct:6.2f}%)")

[PROCESSING] Time Series Imputation Strategy (PySpark Native):
   1. True Linear Interpolation - y = y‚ÇÅ + (y‚ÇÇ-y‚ÇÅ) √ó (t-t‚ÇÅ)/(t‚ÇÇ-t‚ÇÅ)
   2. Forward Fill - If only prev value available
   3. Backward Fill - If only next value available
   4. Null - If no surrounding values (rare)

   Columns to impute: ['PM10', 'NO2', 'SO2']
   PM2.5 NOT imputed (target variable - already removed nulls)
   [?] Safe: Window partitioned by location_id (no cross-location interpolation)



                                                                                

  PM10      :      296 (  0.10%)


                                                                                

  NO2       :    7,361 (  2.55%)




  SO2       :    7,178 (  2.48%)


                                                                                

In [None]:
# √Åp d·ª•ng True Linear Interpolation v·ªõi PySpark (kh√¥ng d√πng Pandas)
# N·ªôi suy tuy·∫øn t√≠nh d·ª±a tr√™n kho·∫£ng c√°ch th·ªùi gian TH·ª∞C (epoch)
# Window function ƒë·∫£m b·∫£o KH√îNG n·ªôi suy ch√©o gi·ªØa c√°c locations

print("[PROCESSING] Applying true linear interpolation per location (PySpark native)...")

# T·∫°o c·ªôt epoch (timestamp d·∫°ng s·ªë) ƒë·ªÉ t√≠nh to√°n kho·∫£ng c√°ch th·ªùi gian
df_filled = df_filled.withColumn("epoch", F.col("datetime").cast("long"))

# ƒê·ªãnh nghƒ©a Window cho t·ª´ng location
w_forward = (
    Window.partitionBy("location_id")
    .orderBy("epoch")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

w_backward = (
    Window.partitionBy("location_id")
    .orderBy("epoch")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# X·ª≠ l√Ω t·ª´ng pollutant column
for col_name in pollutant_cols:
    if col_name not in df_filled.columns:
        continue
    
    print(f"  ‚ñ∂ Interpolating {col_name}...", end=" ", flush=True)
    
    # B∆∞·ªõc 1: T√¨m gi√° tr·ªã & timestamp TR∆Ø·ªöC v√† SAU g·∫ßn nh·∫•t (c√≥ gi√° tr·ªã non-null)
    df_filled = (
        df_filled
        .withColumn(f"{col_name}_prev_value", F.last(col_name, True).over(w_forward))
        .withColumn(f"{col_name}_next_value", F.first(col_name, True).over(w_backward))
        .withColumn(f"{col_name}_prev_time", F.last(F.when(F.col(col_name).isNotNull(), F.col("epoch")), True).over(w_forward))
        .withColumn(f"{col_name}_next_time", F.first(F.when(F.col(col_name).isNotNull(), F.col("epoch")), True).over(w_backward))
    )
    
    # B∆∞·ªõc 2: T√≠nh to√°n Linear Interpolation theo c√¥ng th·ª©c:
    # y = y‚ÇÅ + (y‚ÇÇ - y‚ÇÅ) * (t - t‚ÇÅ) / (t‚ÇÇ - t‚ÇÅ)
    interpolated_value = (
        F.col(f"{col_name}_prev_value") +
        (F.col(f"{col_name}_next_value") - F.col(f"{col_name}_prev_value")) *
        ((F.col("epoch") - F.col(f"{col_name}_prev_time")) /
         (F.col(f"{col_name}_next_time") - F.col(f"{col_name}_prev_time")))
    )
    
    # B∆∞·ªõc 3: Logic ch·ªçn gi√° tr·ªã cu·ªëi c√πng v·ªõi fallback
    df_filled = df_filled.withColumn(
        col_name,
        F.when(F.col(col_name).isNotNull(), F.col(col_name))  # Gi·ªØ nguy√™n n·∫øu c√≥ gi√° tr·ªã
         .when(
             # Linear interpolation n·∫øu c√≥ c·∫£ prev & next v√† kh√¥ng chia 0
             (F.col(f"{col_name}_prev_value").isNotNull()) &
             (F.col(f"{col_name}_next_value").isNotNull()) &
             ((F.col(f"{col_name}_next_time") - F.col(f"{col_name}_prev_time")) != 0),
             interpolated_value
         )
         .when(F.col(f"{col_name}_prev_value").isNotNull(), F.col(f"{col_name}_prev_value"))  # Forward fill
         .when(F.col(f"{col_name}_next_value").isNotNull(), F.col(f"{col_name}_next_value"))  # Backward fill
         .otherwise(None)  # V·∫´n null n·∫øu kh√¥ng c√≥ data n√†o
    )
    
    # B∆∞·ªõc 4: X√≥a c√°c c·ªôt ph·ª• ƒë·ªÉ gi·∫£m memory
    df_filled = df_filled.drop(
        f"{col_name}_prev_value", f"{col_name}_next_value",
        f"{col_name}_prev_time", f"{col_name}_next_time"
    )
    
    print("[OK]")

# Cache k·∫øt qu·∫£ sau khi interpolation
df_filled = df_filled.cache()

# Trigger computation v√† ƒë·∫øm records
count = df_filled.count()
print(f"\n[SUCCESS] Linear interpolation completed! Total records: {count:,}")
print(f"   [GEAR]  Method: True linear interpolation based on time distance (epoch)")
print(f"   [?] Safe: No cross-location interpolation (partitioned by location_id)")
print(f"   [RUN] Optimized: Native PySpark (no Pandas conversion)")

[PROCESSING] Applying true linear interpolation per location (PySpark native)...
  ‚ñ∂ Interpolating PM10... [OK]
  ‚ñ∂ Interpolating NO2... [OK]
  ‚ñ∂ Interpolating SO2... [OK]





[SUCCESS] Linear interpolation completed! Total records: 289,157
   [GEAR]  Method: True linear interpolation based on time distance (epoch)
   [?] Safe: No cross-location interpolation (partitioned by location_id)
   [RUN] Optimized: Native PySpark (no Pandas conversion)


                                                                                

In [None]:
# Verify: PM2.5 kh√¥ng c√≥ null, c√°c features kh√°c kh√¥ng c√≥ null
print("\n[METADATA] Final Missing Values Check (After Interpolation):")

# Ki·ªÉm tra PM2.5 (target)
pm25_nulls = df_filled.filter(F.col("PM2_5").isNull()).count()
print(f"  PM2_5 (Target): {pm25_nulls:,} nulls [SUCCESS] (Must be 0)")

# Ki·ªÉm tra features
total_nulls = 0
for col_name in pollutant_cols:
    if col_name in df_filled.columns:
        null_count = df_filled.filter(F.col(col_name).isNull()).count()
        total_nulls += null_count
        if null_count > 0:
            print(f"  {col_name:10s}: {null_count:,} nulls [WARNING]")
        else:
            print(f"  {col_name:10s}: {null_count:,} nulls [SUCCESS]")

# X·ª≠ l√Ω edge case: Drop records c√≤n null (kh√¥ng c√≥ gi√° tr·ªã xung quanh ƒë·ªÉ interpolate)
if total_nulls > 0:
    print(f"\n[WARNING]  Found {total_nulls} remaining nulls (edge cases with no surrounding data)")
    print(f"   -> Dropping these records to ensure data quality...")
    
    records_before_drop = df_filled.count()
    
    # Drop records c√≥ b·∫•t k·ª≥ feature n√†o c√≤n null
    for col_name in pollutant_cols:
        df_filled = df_filled.filter(F.col(col_name).isNotNull())
    
    records_after_drop = df_filled.count()
    dropped = records_before_drop - records_after_drop
    
    print(f"   Before drop: {records_before_drop:,} records")
    print(f"   After drop:  {records_after_drop:,} records")
    print(f"   Dropped:     {dropped:,} records ({dropped/records_before_drop*100:.2f}%)")
    print(f"\n   [SUCCESS] All feature columns now have 0 nulls!")
else:
    print("\n  [SUCCESS] No missing values in any feature columns!")

# X√≥a c·ªôt epoch (ƒë√£ d√πng xong)
df_filled = df_filled.drop("epoch")

# Verify l·∫ßn cu·ªëi
print(f"\n[DATA] Final Verification:")
for col_name in ["PM2_5"] + pollutant_cols:
    if col_name in df_filled.columns:
        null_count = df_filled.filter(F.col(col_name).isNull()).count()
        print(f"  {col_name:10s}: {null_count:,} nulls [SUCCESS]")

print(f"\n[SUCCESS] Data cleaning completed with True Linear Interpolation!")
print(f"   Final dataset: {df_filled.count():,} records")
print(f"   [WARNING]  All records have REAL PM2.5 values (target variable)")
print(f"   [SUCCESS] Features interpolated smoothly (time-based linear interpolation)")
print(f"   [SUCCESS] Edge cases (no surrounding data) removed")
print(f"   [RUN] Performance: Native PySpark (no Pandas conversion)")

# C·∫≠p nh·∫≠t df_combined v·ªõi d·ªØ li·ªáu ƒë√£ clean v√† s·∫Øp x·∫øp
df_combined = df_filled.orderBy("location_id", "datetime")


[METADATA] Final Missing Values Check (After Interpolation):
  PM2_5 (Target): 0 nulls [SUCCESS] (Must be 0)
  PM10      : 0 nulls [SUCCESS]
  NO2       : 0 nulls [SUCCESS]
  SO2       : 0 nulls [SUCCESS]

  [SUCCESS] No missing values in any feature columns!

[DATA] Final Verification:
  PM2_5     : 0 nulls [SUCCESS]
  PM10      : 0 nulls [SUCCESS]
  NO2       : 0 nulls [SUCCESS]
  SO2       : 0 nulls [SUCCESS]

[SUCCESS] Data cleaning completed with True Linear Interpolation!
   Final dataset: 289,157 records
   [SUCCESS] Features interpolated smoothly (time-based linear interpolation)
   [SUCCESS] Edge cases (no surrounding data) removed
   [RUN] Performance: Native PySpark (no Pandas conversion)


## 5. Feature Engineering & Normalization

**Quy tr√¨nh ƒê√öNG ƒë·ªÉ tr√°nh Data Leakage:**
1. **Time Features** - Th√™m cyclic encoding (sin/cos) v√† is_weekend (kh√¥ng c·∫ßn normalize)
2. **Temporal Split** - Chia train/validation/test theo th·ªùi gian (70/15/15)
3. **Normalization** - Chu·∫©n h√≥a **CH·ªà numerical G·ªêC** b·∫±ng Min-Max t·ª´ train set
4. **Lag Features** - T·∫°o lag T·ª™ C√ÅC C·ªòT ƒê√É SCALE (gi·ªØ ƒë√∫ng scale relationship)
5. **Model-Specific Datasets** - Chu·∫©n b·ªã ri√™ng cho Deep Learning v√† XGBoost
6. **Null Handling** - X·ª≠ l√Ω nulls trong lag features cu·ªëi c√πng

**[WARNING] QUAN TR·ªåNG:** Lag features ph·∫£i t·∫°o SAU khi normalize ƒë·ªÉ gi·ªØ ƒë√∫ng m·ªëi quan h·ªá scale!

In [None]:
# B∆∞·ªõc 1: Th√™m Time Features t·ª´ d·ªØ li·ªáu ƒë√£ clean
print("[PROCESSING] Step 1: Adding Time Features (No normalization needed)...")

import math

df_features = df_combined \
    .withColumn("hour", F.hour("datetime")) \
    .withColumn("month", F.month("datetime")) \
    .withColumn("day_of_week", F.dayofweek("datetime"))

# Cyclic encoding cho hour (24h cycle)
df_features = df_features \
    .withColumn("hour_sin", F.sin(2 * math.pi * F.col("hour") / 24)) \
    .withColumn("hour_cos", F.cos(2 * math.pi * F.col("hour") / 24))

# Cyclic encoding cho month (12 month cycle)
df_features = df_features \
    .withColumn("month_sin", F.sin(2 * math.pi * F.col("month") / 12)) \
    .withColumn("month_cos", F.cos(2 * math.pi * F.col("month") / 12))

# Cyclic encoding cho day_of_week (7 day cycle)
df_features = df_features \
    .withColumn("day_of_week_sin", F.sin(2 * math.pi * F.col("day_of_week") / 7)) \
    .withColumn("day_of_week_cos", F.cos(2 * math.pi * F.col("day_of_week") / 7))

# Binary feature: is_weekend
df_features = df_features \
    .withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))

# ‚úÖ FIX: Th√™m cyclic encoding cho wind_direction
df_features = df_features \
    .withColumn("wind_direction_sin", F.sin(2 * math.pi * F.col("wind_direction_10m") / 360)) \
    .withColumn("wind_direction_cos", F.cos(2 * math.pi * F.col("wind_direction_10m") / 360))

# X√≥a c√°c c·ªôt trung gian
df_features = df_features.drop("hour", "month", "day_of_week", "wind_direction_10m")

print("[OK] Time features added successfully!")
print(f"[OK] Total records: {df_features.count():,}")
print(f"[OK] Total columns: {len(df_features.columns)}")

print("\n[METADATA] Time Features Created:")
print("  Cyclic (sin/cos): hour, month, day_of_week -> Already in [-1, 1]")
print("  Binary: is_weekend -> Already in [0, 1]")
print("  [SUCCESS] No normalization needed for time features!")

[PROCESSING] Step 1: Adding Time Features (No normalization needed)...
[OK] Time features added successfully!
[OK] Total records: 289,157
[OK] Total columns: 23

[METADATA] Time Features Created:
  Cyclic (sin/cos): hour, month, day_of_week -> Already in [-1, 1]
  Binary: is_weekend -> Already in [0, 1]
  [SUCCESS] No normalization needed for time features!


In [None]:
# B∆∞·ªõc 2: STRATIFIED TEMPORAL SPLIT (Gi·∫£i quy·∫øt Distribution Shift)
print("\n[PROCESSING] Step 2: STRATIFIED Temporal Train/Val/Test Split...")
print("=" * 70)

# ========================================
# STRATIFIED TEMPORAL SPLIT
# ========================================
# V·∫•n ƒë·ªÅ v·ªõi Simple Temporal Split:
# - Train: Nov 2022 - Nov 2024 (ch·ªß y·∫øu m√πa ƒë√¥ng/xu√¢n)
# - Val: Nov 2024 - Apr 2025 (m√πa ƒë√¥ng - √¥ nhi·ªÖm CAO)
# - Test: Apr 2025 - Sep 2025 (m√πa h√® - √¥ nhi·ªÖm TH·∫§P)
# => Ph√¢n ph·ªëi KH√îNG ƒë·ªìng nh·∫•t gi·ªØa c√°c t·∫≠p
#
# Gi·∫£i ph√°p: Stratified Temporal Split
# - Chia theo TH√ÅNG, m·ªói th√°ng chia 70/15/15
# - ƒê·∫£m b·∫£o m·ªói t·∫≠p c√≥ ƒë·ªß d·ªØ li·ªáu t·ª´ T·∫§T C·∫¢ c√°c m√πa
# - V·∫´n gi·ªØ temporal order trong t·ª´ng th√°ng (tr√°nh data leakage)

print("[STRATEGY] Stratified Temporal Split:")
print("   - Split by MONTH: Each month divided 70/15/15")
print("   - Ensures ALL seasons represented in train/val/test")
print("   - Maintains temporal order WITHIN each month")
print("   - Reduces distribution shift between splits")

# Th√™m c·ªôt month ƒë·ªÉ stratify
df_with_month = df_features.withColumn("split_month", F.date_format("datetime", "yyyy-MM"))

# L·∫•y danh s√°ch c√°c th√°ng
months = df_with_month.select("split_month").distinct().orderBy("split_month").collect()
months = [row["split_month"] for row in months]

print(f"\n[DATA] Total months in dataset: {len(months)}")
print(f"   First month: {months[0]}")
print(f"   Last month: {months[-1]}")

# H√†m chia stratified cho m·ªói th√°ng
def stratified_split_by_month(df, train_ratio=0.70, val_ratio=0.15):
    """
    Split data by month, ensuring each month contributes to all splits
    Maintains temporal order within each month
    """
    from pyspark.sql.window import Window
    
    # Window ƒë·ªÉ rank theo th·ªùi gian trong t·ª´ng th√°ng v√† location
    w = Window.partitionBy("split_month", "location_id").orderBy("datetime")
    
    # Th√™m row number v√† t·ªïng s·ªë rows trong m·ªói group
    df_ranked = df.withColumn("row_num", F.row_number().over(w))
    
    # T√≠nh t·ªïng s·ªë records trong m·ªói th√°ng-location
    w_count = Window.partitionBy("split_month", "location_id")
    df_ranked = df_ranked.withColumn("total_rows", F.count("*").over(w_count))
    
    # T√≠nh cutoff points
    df_ranked = df_ranked.withColumn(
        "train_cutoff", (F.col("total_rows") * train_ratio).cast("int")
    ).withColumn(
        "val_cutoff", (F.col("total_rows") * (train_ratio + val_ratio)).cast("int")
    )
    
    # Assign split based on row position
    df_split = df_ranked.withColumn(
        "split_type",
        F.when(F.col("row_num") <= F.col("train_cutoff"), "train")
         .when(F.col("row_num") <= F.col("val_cutoff"), "val")
         .otherwise("test")
    )
    
    # Drop helper columns
    df_split = df_split.drop("row_num", "total_rows", "train_cutoff", "val_cutoff", "split_month")
    
    return df_split

print("\n[PROCESSING] Applying stratified split...")
df_stratified = stratified_split_by_month(df_with_month, train_ratio=0.70, val_ratio=0.15)

# Cache v√† split
df_stratified = df_stratified.cache()

df_train_raw = df_stratified.filter(F.col("split_type") == "train").drop("split_type")
df_val_raw = df_stratified.filter(F.col("split_type") == "val").drop("split_type")
df_test_raw = df_stratified.filter(F.col("split_type") == "test").drop("split_type")

# Count
train_count = df_train_raw.count()
val_count = df_val_raw.count()
test_count = df_test_raw.count()
total_count = train_count + val_count + test_count

print(f"\n[DATA] Stratified Split Results:")
print(f"  Train: {train_count:8,} ({train_count/total_count*100:.1f}%)")
print(f"  Val:   {val_count:8,} ({val_count/total_count*100:.1f}%)")
print(f"  Test:  {test_count:8,} ({test_count/total_count*100:.1f}%)")

# Verify distribution similarity
print(f"\n[VERIFY] Checking PM2.5 distribution across splits...")
train_stats = df_train_raw.select(F.mean("PM2_5").alias("mean"), F.stddev("PM2_5").alias("std")).collect()[0]
val_stats = df_val_raw.select(F.mean("PM2_5").alias("mean"), F.stddev("PM2_5").alias("std")).collect()[0]
test_stats = df_test_raw.select(F.mean("PM2_5").alias("mean"), F.stddev("PM2_5").alias("std")).collect()[0]

print(f"   Train: mean={train_stats['mean']:.2f}, std={train_stats['std']:.2f}")
print(f"   Val:   mean={val_stats['mean']:.2f}, std={val_stats['std']:.2f}")
print(f"   Test:  mean={test_stats['mean']:.2f}, std={test_stats['std']:.2f}")

# Calculate improvement
mean_diff_val = abs(val_stats['mean'] - train_stats['mean']) / train_stats['mean'] * 100
mean_diff_test = abs(test_stats['mean'] - train_stats['mean']) / train_stats['mean'] * 100

print(f"\n[IMPROVEMENT] Distribution alignment:")
print(f"   Train-Val difference:  {mean_diff_val:.1f}% (target: < 10%)")
print(f"   Train-Test difference: {mean_diff_test:.1f}% (target: < 10%)")

if mean_diff_val < 15 and mean_diff_test < 15:
    print(f"   [SUCCESS] Stratified split significantly reduced distribution shift!")
else:
    print(f"   [WARNING] Some distribution shift remains - consider additional techniques")

# Cache final splits
df_train_raw = df_train_raw.cache()
df_val_raw = df_val_raw.cache()
df_test_raw = df_test_raw.cache()

# Cleanup
df_stratified.unpersist()

print(f"\n[SUCCESS] Stratified temporal split completed!")
print(f"   Each split now contains data from ALL seasons")
print(f"   Next: Normalize using TRAIN SET statistics ONLY")
print("=" * 70)


[PROCESSING] Step 2: STRATIFIED Temporal Train/Val/Test Split...
[STRATEGY] Stratified Temporal Split:
   - Split by MONTH: Each month divided 70/15/15
   - Ensures ALL seasons represented in train/val/test
   - Maintains temporal order WITHIN each month
   - Reduces distribution shift between splits


                                                                                


[DATA] Total months in dataset: 35
   First month: 2022-11
   Last month: 2025-09

[PROCESSING] Applying stratified split...


                                                                                


[DATA] Stratified Split Results:
  Train:  202,154 (69.9%)
  Val:     43,397 (15.0%)
  Test:    43,606 (15.1%)

[VERIFY] Checking PM2.5 distribution across splits...
   Train: mean=15.50, std=10.63
   Val:   mean=15.19, std=9.74
   Test:  mean=17.03, std=12.93

[IMPROVEMENT] Distribution alignment:
   Train-Val difference:  2.0% (target: < 10%)
   Train-Test difference: 9.8% (target: < 10%)
   [SUCCESS] Stratified split significantly reduced distribution shift!

[SUCCESS] Stratified temporal split completed!
   Each split now contains data from ALL seasons
   Next: Normalize using TRAIN SET statistics ONLY


In [None]:
# B∆∞·ªõc 3: Normalize NUMERICAL G·ªêC (CH·ªà g·ªëc, KH√îNG c√≥ lag features)
print(f"\n[PROCESSING] Step 3: Normalize NUMERICAL BASE FEATURES using TRAIN SET ONLY...")

# ========================================
# LOG TRANSFORMATION FOR SKEWED TARGET (PM2.5)
# ========================================
# PM2.5 distribution is highly skewed (skewness=1.53)
# Apply log1p transformation to reduce skewness and improve model learning
print(f"[LOG TRANSFORM] Applying log1p transformation to PM2.5 (target)...")
print(f"   Reason: PM2.5 distribution is highly skewed (skewness > 1.5)")
print(f"   Formula: PM2_5_log = log(1 + PM2_5)")

df_train_raw = df_train_raw.withColumn("PM2_5_log", F.log1p(F.col("PM2_5")))
df_val_raw = df_val_raw.withColumn("PM2_5_log", F.log1p(F.col("PM2_5")))
df_test_raw = df_test_raw.withColumn("PM2_5_log", F.log1p(F.col("PM2_5")))

# Verify log transformation
log_stats = df_train_raw.select(
    F.mean("PM2_5").alias("original_mean"),
    F.stddev("PM2_5").alias("original_std"),
    F.mean("PM2_5_log").alias("log_mean"),
    F.stddev("PM2_5_log").alias("log_std")
).collect()[0]

print(f"   Original PM2.5: mean={log_stats['original_mean']:.2f}, std={log_stats['original_std']:.2f}")
print(f"   Log PM2.5:      mean={log_stats['log_mean']:.2f}, std={log_stats['log_std']:.2f}")
print(f"[SUCCESS] Log transformation applied!")

# [WARNING] QUAN TR·ªåNG: CH·ªà normalize c√°c c·ªôt G·ªêC, KH√îNG bao g·ªìm lag features
# Lag features s·∫Ω t·∫°o SAU t·ª´ c√°c c·ªôt ƒë√£ scale
# Use PM2_5_log instead of PM2_5 for target normalization
numerical_base_cols = [
    # Target with log transformation
    "PM2_5_log",
    # Other Pollutants (current values only)
    "PM10", "NO2", "SO2",
    # Weather features (current values only)
    "temperature_2m", "relative_humidity_2m", "wind_speed_10m",
    "surface_pressure", "precipitation"
]

print(f"[DATA] Normalizing {len(numerical_base_cols)} BASE features (NO lag features yet)...")
print(f"   Features to normalize: {numerical_base_cols}")
print(f"   [WARNING]  Computing min/max from TRAIN SET ONLY (preventing data leakage)")

# T√≠nh min/max CH·ªà T·ª™ TRAIN SET
scaler_params = {}

for col_name in numerical_base_cols:
    if col_name in df_train_raw.columns:
        # CH·ªà D√ôNG TRAIN SET ƒê·ªÇ T√çNH MIN/MAX  
        stats = df_train_raw.select(
            F.min(col_name).alias("min"),
            F.max(col_name).alias("max")
        ).collect()[0]
        
        min_val = stats["min"]
        max_val = stats["max"]
        
        # [WARNING] CRITICAL: Handle None values from null columns
        if min_val is None or max_val is None:
            print(f"  [WARNING]  Skipping {col_name}: All values are null")
            continue
        
        # [WARNING] CRITICAL: Tr√°nh chia 0 khi min = max
        if max_val == min_val:
            max_val = min_val + 1
        
        scaler_params[col_name] = {"min": min_val, "max": max_val}
        print(f"  [OK] {col_name:30s}: [{min_val:8.2f}, {max_val:8.2f}] -> [0, 1]")

print(f"\n[SUCCESS] Scaler parameters computed from TRAIN SET only!")

# √Åp d·ª•ng normalization cho t·∫•t c·∫£ splits
def apply_scaling(df, scaler_params):
    """Apply Min-Max scaling using precomputed parameters"""
    df_scaled = df
    for col_name, params in scaler_params.items():
        if col_name in df.columns:
            min_val = params["min"]
            max_val = params["max"]
            df_scaled = df_scaled.withColumn(
                f"{col_name}_scaled",
                (F.col(col_name) - min_val) / (max_val - min_val)
            )
    return df_scaled

print(f"\n Applying Min-Max scaling [0, 1] to all splits...")

# Apply scaling and trigger computation
df_train = apply_scaling(df_train_raw, scaler_params)
df_val = apply_scaling(df_val_raw, scaler_params)
df_test = apply_scaling(df_test_raw, scaler_params)

# Trigger computation and cache
_ = df_train.count()
_ = df_val.count()
_ = df_test.count()

df_train = df_train.cache()
df_val = df_val.cache()
df_test = df_test.cache()

# Unpersist raw versions to free memory
df_train_raw.unpersist()
df_val_raw.unpersist()
df_test_raw.unpersist()

print(f"[SUCCESS] Base feature normalization completed!")
print(f"   [DATA] All splits normalized using train statistics only")
print(f"   [WARNING]  Next: Create lag features FROM SCALED COLUMNS")


[PROCESSING] Step 3: Normalize NUMERICAL BASE FEATURES using TRAIN SET ONLY...
[LOG TRANSFORM] Applying log1p transformation to PM2.5 (target)...
   Reason: PM2.5 distribution is highly skewed (skewness > 1.5)
   Formula: PM2_5_log = log(1 + PM2_5)


                                                                                

   Original PM2.5: mean=15.50, std=10.63
   Log PM2.5:      mean=2.60, std=0.66
[SUCCESS] Log transformation applied!
[DATA] Normalizing 9 BASE features (NO lag features yet)...
   Features to normalize: ['PM2_5_log', 'PM10', 'NO2', 'SO2', 'temperature_2m', 'relative_humidity_2m', 'wind_speed_10m', 'surface_pressure', 'precipitation']
  [OK] PM2_5_log                     : [    0.00,     5.12] -> [0, 1]
  [OK] PM10                          : [    0.00,   401.70] -> [0, 1]
  [OK] NO2                           : [    0.00,   253.20] -> [0, 1]
  [OK] SO2                           : [    0.00,    76.30] -> [0, 1]
  [OK] temperature_2m                : [    4.90,    36.50] -> [0, 1]
  [OK] relative_humidity_2m          : [   16.00,   100.00] -> [0, 1]
  [OK] wind_speed_10m                : [    0.00,    82.30] -> [0, 1]
  [OK] surface_pressure              : [  974.70,  1032.50] -> [0, 1]
  [OK] precipitation                 : [    0.00,    53.20] -> [0, 1]

[SUCCESS] Scaler parameters comp

                                                                                

[SUCCESS] Base feature normalization completed!
   [DATA] All splits normalized using train statistics only


In [None]:
# B∆∞·ªõc 4: L∆∞u Scaler Parameters
print(f"\n[SAVE] Step 4: Saving Scaler Parameters...")

import json
from pathlib import Path

# Include log transformation info in scaler params
scaler_json = {
    col: {"min": float(params["min"]), "max": float(params["max"])} 
    for col, params in scaler_params.items()
}

# Add metadata for log transformation (needed for inverse transform during inference)
scaler_json["_metadata"] = {
    "log_transformed_features": ["PM2_5"],  # Features that were log1p transformed
    "target_feature": "PM2_5_log_scaled",
    "inverse_transform_order": ["denormalize", "expm1"]  # Order: first denormalize, then exp(x)-1
}

# ========================================
# ADAPTIVE PATH (Kaggle vs Colab vs Local)
# ========================================
if IN_KAGGLE:
    # [KAGGLE] Kaggle: Write to /kaggle/working (auto-saved on commit)
    processed_dir = Path("/kaggle/working/processed")
    print(f"[KAGGLE] Kaggle mode: Saving to {processed_dir}")
    
elif IN_COLAB:
    # [COLAB] Colab: Write to Google Drive
    processed_dir = Path("/content/drive/MyDrive/pm25-data/processed")
    print(f"[COLAB] Colab mode: Saving to Google Drive")
    
else:
    # [LOCAL] Local: Write to project folder
    processed_dir = Path("../data/processed")
    print(f"[LOCAL] Local mode: Saving to {processed_dir}")

# T·∫°o th∆∞ m·ª•c v·ªõi parents=True (t·∫°o c·∫£ parent directories n·∫øu ch∆∞a c√≥)
processed_dir.mkdir(parents=True, exist_ok=True)

# L∆∞u ra file JSON
scaler_path = processed_dir / "scaler_params.json"
with open(scaler_path, 'w') as f:
    json.dump(scaler_json, f, indent=2)

print(f"[SUCCESS] Scaler parameters saved to: {scaler_path}")
print(f"   - Computed from TRAIN SET only (no data leakage)")
print(f"   - Used for denormalizing predictions during inference")
print(f"   - Contains {len(scaler_params)} base features")

# Hi·ªÉn th·ªã v√≠ d·ª•
print(f"\n[METADATA] Example scaler params (from train set):")
example_cols = ["PM2_5", "temperature_2m", "wind_speed_10m"]
for col in example_cols:
    if col in scaler_params:
        params = scaler_params[col]
        print(f"  {col:20s}: min={params['min']:.2f}, max={params['max']:.2f}")


[SAVE] Step 4: Saving Scaler Parameters...
[KAGGLE] Kaggle mode: Saving to /kaggle/working/processed
[SUCCESS] Scaler parameters saved to: /kaggle/working/processed/scaler_params.json
   - Computed from TRAIN SET only (no data leakage)
   - Used for denormalizing predictions during inference
   - Contains 9 base features

[METADATA] Example scaler params (from train set):
  temperature_2m      : min=4.90, max=36.50
  wind_speed_10m      : min=0.00, max=82.30


In [None]:
df_train

DataFrame[location_id: string, location: string, datetime: timestamp, lat: double, lon: double, NO2: double, PM10: double, PM2_5: double, SO2: double, temperature_2m: double, relative_humidity_2m: double, wind_speed_10m: double, surface_pressure: double, precipitation: double, hour_sin: double, hour_cos: double, month_sin: double, month_cos: double, day_of_week_sin: double, day_of_week_cos: double, is_weekend: int, wind_direction_sin: double, wind_direction_cos: double, PM2_5_log: double, PM2_5_log_scaled: double, PM10_scaled: double, NO2_scaled: double, SO2_scaled: double, temperature_2m_scaled: double, relative_humidity_2m_scaled: double, wind_speed_10m_scaled: double, surface_pressure_scaled: double, precipitation_scaled: double]

In [None]:
# B∆∞·ªõc 5: T·∫°o Lag Features T·ª™ C√ÅC C·ªòT ƒê√É SCALE (CH·ªà CHO XGBOOST)
print(f"\n[PROCESSING] Step 5: Creating Lag Features FROM SCALED COLUMNS (XGBoost only)...")

# [WARNING] QUAN TR·ªåNG: Lag features ƒë∆∞·ª£c t·∫°o T·ª™ C√ÅC C·ªòT ƒê√É SCALE
# -> ƒê·∫£m b·∫£o lag v√† g·ªëc c√≥ C√ôNG SCALE PARAMETERS
# -> Gi·ªØ ƒë√∫ng m·ªëi quan h·ªá gi·ªØa gi√° tr·ªã hi·ªán t·∫°i v√† qu√° kh·ª©

LAG_STEPS = [1, 2, 3, 6, 12, 24]  # 1h, 2h, 3h, 6h, 12h, 24h tr∆∞·ªõc

# Columns c·∫ßn t·∫°o lag (s·ª≠ d·ª•ng b·∫£n SCALED)
# Note: Use PM2_5_log for lags as well (log transformed version)
lag_base_columns = ["PM2_5_log", "PM10", "NO2", "SO2", 
                    "temperature_2m", "relative_humidity_2m", 
                    "wind_speed_10m", "surface_pressure", "precipitation"]

print(f"\n[METADATA] Creating lag features:")
print(f"   Deep Learning models: No lags needed (learn from sequences)")
print(f"   XGBoost: {len(LAG_STEPS)} lags x {len(lag_base_columns)} variables = {len(LAG_STEPS) * len(lag_base_columns)} features")
print(f"   [SUCCESS] Using SCALED columns as source (proper scale relationship)")
print(f"   [NOTE] PM2_5 lags use log transformed version for consistency")

# Window cho t·ª´ng location (s·∫Øp x·∫øp theo th·ªùi gian)
w_lag = Window.partitionBy("location_id").orderBy("datetime")

# T·∫°o lag features cho t·ª´ng split (train, val, test)
def create_lag_features(df, lag_base_columns, lag_steps):
    """Create lag features from SCALED columns"""
    df_with_lags = df
    
    for col_name in lag_base_columns:
        col_scaled = f"{col_name}_scaled"
        
        if col_scaled in df.columns:
            for lag in lag_steps:
                lag_col_name = f"{col_name}_lag{lag}_scaled"
                
                # [SUCCESS] T·∫°o lag T·ª™ C·ªòT ƒê√É SCALE
                df_with_lags = df_with_lags.withColumn(
                    lag_col_name,
                    F.lag(col_scaled, lag).over(w_lag)
                )
    
    return df_with_lags

# Apply to all splits
print(f"\n[PROCESSING] Creating lag features for all splits...")
df_train = create_lag_features(df_train, lag_base_columns, LAG_STEPS)
df_val = create_lag_features(df_val, lag_base_columns, LAG_STEPS)
df_test = create_lag_features(df_test, lag_base_columns, LAG_STEPS)

print(f"  [OK] Train: {len(LAG_STEPS) * len(lag_base_columns)} lag features created")
print(f"  [OK] Val:   {len(LAG_STEPS) * len(lag_base_columns)} lag features created")
print(f"  [OK] Test:  {len(LAG_STEPS) * len(lag_base_columns)} lag features created")

# Trigger computation and cache
_ = df_train.count()
_ = df_val.count()
_ = df_test.count()

df_train = df_train.cache()
df_val = df_val.cache()
df_test = df_test.cache()

print(f"\n[SUCCESS] Lag features created successfully!")
print(f"   [SUCCESS] All lags created FROM SCALED columns")
print(f"   [SUCCESS] Lag and base features have SAME scale parameters")
print(f"   [SUCCESS] Proper temporal relationship preserved")

# ========================================
# X·ª¨ L√ù NULL VALUES TRONG LAG FEATURES
# ========================================
print(f"\n[PROCESSING] Handling null values in lag features...")

# T·∫°o list t·∫•t c·∫£ lag feature names
lag_feature_names = [f"{col}_lag{lag}_scaled" for col in lag_base_columns for lag in LAG_STEPS]

# ƒê·∫øm nulls TR∆Ø·ªöC khi x·ª≠ l√Ω
print(f"\n[DATA] Null counts BEFORE handling:")
sample_lag_features = lag_feature_names[:3]
for lag_col in sample_lag_features:
    if lag_col in df_train.columns:
        null_count = df_train.filter(F.col(lag_col).isNull()).count()
        total_count = df_train.count()
        print(f"  {lag_col:35s}: {null_count:8,} nulls ({null_count/total_count*100:.2f}%)")

print(f"\n[WARNING]  Reason: First {max(LAG_STEPS)} hours of each location have no previous data")
print(f"   Strategy: DROP records with ANY null lag feature")

# Track counts before drop
train_before = df_train.count()
val_before = df_val.count()
test_before = df_test.count()

# Function to drop nulls
def drop_lag_nulls(df, lag_features):
    """Drop records with any null lag feature"""
    df_clean = df
    for col in lag_features:
        if col in df.columns:
            df_clean = df_clean.filter(F.col(col).isNotNull())
    return df_clean

# Apply to all splits
print(f"\n[?]  Dropping records with null lag features...")
df_train_clean = drop_lag_nulls(df_train, lag_feature_names)
df_val_clean = drop_lag_nulls(df_val, lag_feature_names)
df_test_clean = drop_lag_nulls(df_test, lag_feature_names)

# Count after
train_after = df_train_clean.count()
val_after = df_val_clean.count()
test_after = df_test_clean.count()

# Cache cleaned datasets
df_train_clean = df_train_clean.cache()
df_val_clean = df_val_clean.cache()
df_test_clean = df_test_clean.cache()

# Unpersist old ones
df_train.unpersist()
df_val.unpersist()
df_test.unpersist()

# Reassign
df_train = df_train_clean
df_val = df_val_clean
df_test = df_test_clean

print(f"\n[DATA] Records dropped (null lag features):")
print(f"  [?] Train: {train_before:,} -> {train_after:,} (dropped {train_before - train_after:,}, {(train_before - train_after)/train_before*100:.2f}%)")
print(f"  [?] Val:   {val_before:,} -> {val_after:,} (dropped {val_before - val_after:,}, {(val_before - val_after)/val_before*100:.2f}%)")
print(f"  [?] Test:  {test_before:,} -> {test_after:,} (dropped {test_before - test_after:,}, {(test_before - test_after)/test_before*100:.2f}%)")

# Verify no nulls
print(f"\n[SUCCESS] Verification - checking for remaining nulls...")
sample_check = lag_feature_names[:3]
total_nulls_after = 0
for lag_col in sample_check:
    if lag_col in df_train.columns:
        null_count = df_train.filter(F.col(lag_col).isNull()).count()
        total_nulls_after += null_count
        status = "[SUCCESS]" if null_count == 0 else "[ERROR]"
        print(f"  {lag_col:35s}: {null_count:8,} nulls {status}")

if total_nulls_after == 0:
    print(f"\n[SUCCESS] All lag features are clean!")
else:
    print(f"\n[WARNING]  Still {total_nulls_after} nulls found!")

print(f"\n[SUCCESS] Lag features + Null handling completed!")
print(f"   - Created {len(lag_feature_names)} lag features FROM SCALED columns")
print(f"   - Lost only first {max(LAG_STEPS)} hours per location")
print(f"   - All lag features now have valid values")
print(f"   - Data quality ensured for XGBoost training")


[PROCESSING] Step 5: Creating Lag Features FROM SCALED COLUMNS (XGBoost only)...

[METADATA] Creating lag features:
   Deep Learning models: No lags needed (learn from sequences)
   XGBoost: 6 lags x 9 variables = 54 features
   [SUCCESS] Using SCALED columns as source (proper scale relationship)
   [NOTE] PM2_5 lags use log transformed version for consistency

[PROCESSING] Creating lag features for all splits...
  [OK] Train: 54 lag features created
  [OK] Val:   54 lag features created
  [OK] Test:  54 lag features created


                                                                                


[SUCCESS] Lag features created successfully!
   [SUCCESS] All lags created FROM SCALED columns
   [SUCCESS] Lag and base features have SAME scale parameters
   [SUCCESS] Proper temporal relationship preserved

[PROCESSING] Handling null values in lag features...

[DATA] Null counts BEFORE handling:


                                                                                

  PM2_5_log_lag1_scaled              :       14 nulls (0.01%)
  PM2_5_log_lag2_scaled              :       28 nulls (0.01%)
  PM2_5_log_lag3_scaled              :       42 nulls (0.02%)

   Strategy: DROP records with ANY null lag feature


                                                                                


[?]  Dropping records with null lag features...


                                                                                


[DATA] Records dropped (null lag features):
  [?] Train: 202,154 -> 201,818 (dropped 336, 0.17%)
  [?] Val:   43,397 -> 43,061 (dropped 336, 0.77%)
  [?] Test:  43,606 -> 43,270 (dropped 336, 0.77%)

[SUCCESS] Verification - checking for remaining nulls...


                                                                                

  PM2_5_log_lag1_scaled              :        0 nulls [SUCCESS]
  PM2_5_log_lag2_scaled              :        0 nulls [SUCCESS]
  PM2_5_log_lag3_scaled              :        0 nulls [SUCCESS]

[SUCCESS] All lag features are clean!

[SUCCESS] Lag features + Null handling completed!
   - Created 54 lag features FROM SCALED columns
   - Lost only first 24 hours per location
   - All lag features now have valid values
   - Data quality ensured for XGBoost training


In [None]:
# B∆∞·ªõc 6: Chu·∫©n b·ªã Features cho t·ª´ng Model
print("\n[PROCESSING] Step 6: Preparing Model-Specific Features...")

# ========================================
# FEATURES CHO DEEP LEARNING MODELS (CNN1D-BLSTM, LSTM)
# ========================================
# Kh√¥ng c·∫ßn lag features v√¨ models t·ª± h·ªçc temporal patterns t·ª´ sequences

dl_input_features = []

# 1. Pollutants scaled (tr·ª´ PM2_5 - ƒë√¢y l√† target)
dl_input_features.extend(["PM10_scaled", "NO2_scaled", "SO2_scaled"])

# 2. Weather features scaled (core features)
dl_input_features.extend([
    "temperature_2m_scaled", "relative_humidity_2m_scaled",
    "wind_speed_10m_scaled", "surface_pressure_scaled", "precipitation_scaled"  # ‚úÖ Added surface_pressure
])

# 3. Time features (cyclic encoding - ƒë√£ ·ªü d·∫°ng sin/cos trong [-1, 1])
dl_input_features.extend([
    "hour_sin", "hour_cos", 
    "month_sin", "month_cos",
    "day_of_week_sin", "day_of_week_cos",
    "wind_direction_sin", "wind_direction_cos"
])

# 4. Time features (binary)
dl_input_features.extend(["is_weekend"])

print(f"[MODEL] DEEP LEARNING Features: {len(dl_input_features)} features")
print(f"   - Current pollutants (scaled): 3")
print(f"   - Weather (scaled): 5") 
print(f"   - Time (cyclic): 6")
print(f"   - Time (binary): 1")
print(f"   - NO LAG FEATURES (models learn from sequences)")

# ========================================  
# FEATURES CHO XGBOOST
# ========================================
# C·∫ßn lag features v√¨ kh√¥ng c√≥ kh·∫£ nƒÉng x·ª≠ l√Ω sequences

xgb_input_features = dl_input_features.copy()  # Start with DL features

# Th√™m lag features CH·ªà CHO XGBOOST (ƒë√£ ƒë∆∞·ª£c t·∫°o t·ª´ scaled columns)
for col_name in lag_base_columns:
    for lag in LAG_STEPS:
        lag_col_name = f"{col_name}_lag{lag}_scaled"
        xgb_input_features.append(lag_col_name)

print(f"\n[DATA] XGBOOST Features: {len(xgb_input_features)} features")
print(f"   - Deep Learning base features: {len(dl_input_features)}")
print(f"   - Lag features (from scaled columns): {len(lag_base_columns) * len(LAG_STEPS)}")
print(f"   - Total: {len(xgb_input_features)} features")

# Target variable (log transformed and scaled for better training)
# Using PM2_5_log_scaled instead of PM2_5_scaled to handle skewness
target_feature = "PM2_5_log_scaled"

print(f"\n[SUCCESS] Model-specific features prepared:")
print(f"  [MODEL] CNN1D-BLSTM-Attention: {len(dl_input_features)} features")
print(f"  [MODEL] LSTM: {len(dl_input_features)} features")  
print(f"  [DATA] XGBoost: {len(xgb_input_features)} features")
print(f"  [TARGET] Target: {target_feature} (log transformed)")

# [WARNING] CRITICAL: Verify ALL columns exist
missing_dl = [col for col in dl_input_features if col not in df_train.columns]
missing_xgb = [col for col in xgb_input_features if col not in df_train.columns]
missing_target = target_feature not in df_train.columns

if missing_dl or missing_xgb or missing_target:
    print(f"\n[ERROR] MISSING COLUMNS DETECTED:")
    if missing_dl: 
        print(f"  DL models: {missing_dl}")
    if missing_xgb: 
        print(f"  XGBoost: {missing_xgb[:5]}...")  # Show first 5
    if missing_target:
        print(f"  Target: {target_feature}")
    
    print(f"\n[WARNING]  Available scaled columns:")
    scaled_cols = [c for c in df_train.columns if c.endswith('_scaled')]
    print(f"  {scaled_cols[:10]}...")
    
    raise ValueError("Missing required feature columns! Check normalization step.")
else:
    print(f"\n[SUCCESS] All feature columns exist in datasets!")

dl_input_features.extend(["PM2_5_log_scaled"])


[PROCESSING] Step 6: Preparing Model-Specific Features...
[MODEL] DEEP LEARNING Features: 17 features
   - Current pollutants (scaled): 3
   - Weather (scaled): 5
   - Time (cyclic): 6
   - Time (binary): 1
   - NO LAG FEATURES (models learn from sequences)

[DATA] XGBOOST Features: 71 features
   - Deep Learning base features: 17
   - Lag features (from scaled columns): 54
   - Total: 71 features

[SUCCESS] Model-specific features prepared:
  [MODEL] CNN1D-BLSTM-Attention: 17 features
  [MODEL] LSTM: 17 features
  [DATA] XGBoost: 71 features
  [TARGET] Target: PM2_5_log_scaled (log transformed)

[SUCCESS] All feature columns exist in datasets!


In [None]:
# B∆∞·ªõc 7: Prepare Final Model Datasets
print("\n[PROCESSING] Step 7: Preparing Final Model-Specific Datasets...")

# ========================================
# DEEP LEARNING DATASETS (CNN1D-BLSTM & LSTM)
# ========================================
# Kh√¥ng c·∫ßn lag features, ch·ªâ c·∫ßn base features + time features

print(f"\n[MODEL] Deep Learning datasets (no lag features):")

# Select only DL features + target
dl_train = df_train.select("location_id", "datetime", target_feature, *dl_input_features)
dl_val = df_val.select("location_id", "datetime", target_feature, *dl_input_features)
dl_test = df_test.select("location_id", "datetime", target_feature, *dl_input_features)

# Cache
dl_train = dl_train.cache()
dl_val = dl_val.cache()
dl_test = dl_test.cache()

dl_train_count = dl_train.count()
dl_val_count = dl_val.count()
dl_test_count = dl_test.count()

print(f"  [OK] Train: {dl_train_count:,} records, {len(dl_input_features)} features")
print(f"  [OK] Val:   {dl_val_count:,} records, {len(dl_input_features)} features")
print(f"  [OK] Test:  {dl_test_count:,} records, {len(dl_input_features)} features")

# ========================================
# XGBOOST DATASETS
# ========================================
# C·∫ßn c·∫£ base features + lag features

print(f"\n[DATA] XGBoost datasets (with lag features):")

# Select XGB features + target
xgb_train = df_train.select("location_id", "datetime", target_feature, *xgb_input_features)
xgb_val = df_val.select("location_id", "datetime", target_feature, *xgb_input_features)
xgb_test = df_test.select("location_id", "datetime", target_feature, *xgb_input_features)

# Cache
xgb_train = xgb_train.cache()
xgb_val = xgb_val.cache()
xgb_test = xgb_test.cache()

xgb_train_count = xgb_train.count()
xgb_val_count = xgb_val.count()
xgb_test_count = xgb_test.count()

print(f"  [OK] Train: {xgb_train_count:,} records, {len(xgb_input_features)} features")
print(f"  [OK] Val:   {xgb_val_count:,} records, {len(xgb_input_features)} features")
print(f"  [OK] Test:  {xgb_test_count:,} records, {len(xgb_input_features)} features")

print(f"\n[SUCCESS] Final datasets prepared!")
print(f"   [MODEL] Deep Learning: {len(dl_input_features)} features (no lags)")
print(f"   [DATA] XGBoost: {len(xgb_input_features)} features (with {len(lag_base_columns) * len(LAG_STEPS)} lags)")
print(f"   [TARGET] Target: {target_feature}")
print(f"   [SUCCESS] All datasets cleaned and ready for training!")


[PROCESSING] Step 7: Preparing Final Model-Specific Datasets...

[MODEL] Deep Learning datasets (no lag features):


                                                                                

  [OK] Train: 201,818 records, 18 features
  [OK] Val:   43,061 records, 18 features
  [OK] Test:  43,270 records, 18 features

[DATA] XGBoost datasets (with lag features):




  [OK] Train: 201,818 records, 71 features
  [OK] Val:   43,061 records, 71 features
  [OK] Test:  43,270 records, 71 features

[SUCCESS] Final datasets prepared!
   [MODEL] Deep Learning: 18 features (no lags)
   [DATA] XGBoost: 71 features (with 54 lags)
   [TARGET] Target: PM2_5_log_scaled
   [SUCCESS] All datasets cleaned and ready for training!


                                                                                

In [None]:
# B∆∞·ªõc 8: Feature Engineering Summary + Metadata Saving
print("\n" + "="*80)
print("[DATA] FEATURE ENGINEERING PIPELINE SUMMARY")
print("="*80)

print(f"\n[SUCCESS] PIPELINE EXECUTION ORDER (Correct - No Data Leakage):")
print(f"   [1] Time Features -> Added cyclic (sin/cos) + is_weekend")
print(f"   [2] Temporal Split -> 70% train / 15% val / 15% test")
print(f"   [3] Normalization -> Min-Max [0,1] using TRAIN statistics ONLY")
print(f"   [4] Lag Features + Null Handling -> Created FROM SCALED columns, dropped nulls")
print(f"   [5] Scaler Params -> Saved for inference")
print(f"   [6] Model Features -> Prepared for Deep Learning & XGBoost")
print(f"   [7] Final Datasets -> Ready for training")

print(f"\n[DATA] DATASET STATISTICS:")
print(f"   Total records: {dl_train_count + dl_val_count + dl_test_count:,}")
print(f"   Total locations: {df_train.select('location_id').distinct().count()}")

print(f"\n[METADATA] FEATURE BREAKDOWN:")
print(f"   [MODEL] Deep Learning (CNN1D-BLSTM & LSTM): {len(dl_input_features)} features")
print(f"      ‚îú‚îÄ Pollutants (scaled): 3 (PM10, NO2, SO2)")
print(f"      ‚îú‚îÄ Weather (scaled): 5 (temp, humidity, wind, precipitation)")
print(f"      ‚îú‚îÄ Time (cyclic): 6 (hour, month, day_of_week -> sin/cos)")
print(f"      ‚îî‚îÄ Time (binary): 1 (is_weekend)")
print(f"   ")
print(f"   [DATA] XGBoost: {len(xgb_input_features)} features")
print(f"      ‚îú‚îÄ Deep Learning features: {len(dl_input_features)}")
print(f"      ‚îî‚îÄ Lag features: {len(lag_base_columns) * len(LAG_STEPS)} ({len(lag_base_columns)} vars √ó {len(LAG_STEPS)} lags)")

print(f"\n[TARGET] TARGET VARIABLE:")
print(f"   {target_feature} (normalized PM2.5 in [0, 1])")

print(f"\n[SUCCESS] DATA QUALITY CHECKS:")
print(f"   [OK] No missing values in target")
print(f"   [OK] No missing values in features")
print(f"   [OK] No outliers (removed by WHO/EPA standards)")
print(f"   [OK] Proper temporal ordering")
print(f"   [OK] STRATIFIED split (all seasons in each split)")
print(f"   [OK] Correct scale relationship (lag from scaled columns)")
print(f"   [OK] No nulls in lag features (first {max(LAG_STEPS)}h dropped)")
print(f"   [OK] Log transform applied to target (reduced skewness)")

print(f"\n[SAVE] SAVED ARTIFACTS:")
print(f"   [FILES] scaler_params.json -> Min-Max parameters (train set only)")
print(f"   [FILES] feature_metadata.json -> Feature lists & configuration")

print(f"\n[RUN] READY FOR NEXT PHASE:")
print(f"   Variables in memory:")
print(f"   - Deep Learning: dl_train, dl_val, dl_test")
print(f"   - XGBoost: xgb_train, xgb_val, xgb_test")
print(f"   Next step: Sequence creation for Deep Learning models")

print("="*80)

# ========================================
# SAVE FEATURE METADATA
# ========================================
# L∆∞u metadata v·ªÅ feature engineering ƒë·ªÉ tham kh·∫£o trong t∆∞∆°ng lai

import json
from pathlib import Path

# Metadata cho feature engineering
dataset_metadata = {
    "project": "PM2.5 Prediction",
    "preprocessing_version": "3.0_stratified_split",
    "pipeline_order": [
        "Time Features (cyclic encoding)",
        "STRATIFIED Temporal Split (70/15/15 per month)",
        "Log Transform (target PM2.5)",
        "Normalization (train stats only)",
        "Lag Features (from scaled columns)",
        "Null Handling (drop first 24h per location)"
    ],
    "split_strategy": {
        "method": "stratified_temporal",
        "description": "Each month split 70/15/15, ensures all seasons in each split",
        "ratios": {"train": 0.70, "val": 0.15, "test": 0.15}
    },
    "deep_learning_features": dl_input_features,
    "xgboost_features": xgb_input_features,
    "target_feature": target_feature,
    "target_transform": "log1p",
    "lag_config": {
        "lag_steps": LAG_STEPS,
        "lag_base_columns": lag_base_columns,
        "total_lag_features": len(lag_base_columns) * len(LAG_STEPS)
    },
    "dataset_counts": {
        "dl_train": dl_train_count,
        "dl_val": dl_val_count,
        "dl_test": dl_test_count,
        "xgb_train": xgb_train_count,
        "xgb_val": xgb_val_count,
        "xgb_test": xgb_test_count
    },
    "total_features": {
        "deep_learning": len(dl_input_features),
        "xgboost": len(xgb_input_features)
    }
}

# ========================================
# ADAPTIVE PATH (Kaggle vs Colab vs Local)
# ========================================
if IN_KAGGLE:
    # [KAGGLE] Kaggle: Write to /kaggle/working (auto-saved on commit)
    processed_dir = Path("/kaggle/working/processed")
    print(f"\n[KAGGLE] Kaggle mode: Saving metadata to {processed_dir}")
    
elif IN_COLAB:
    # [COLAB] Colab: Write to Google Drive
    processed_dir = Path("/content/drive/MyDrive/pm25-data/processed")
    print(f"\n[COLAB] Colab mode: Saving metadata to Google Drive")
    
else:
    # [LOCAL] Local: Write to project folder
    processed_dir = Path("../data/processed")
    print(f"\n[LOCAL] Local mode: Saving metadata to {processed_dir}")

# T·∫°o th∆∞ m·ª•c v·ªõi parents=True (t·∫°o c·∫£ parent directories n·∫øu ch∆∞a c√≥)
processed_dir.mkdir(parents=True, exist_ok=True)

# L∆∞u metadata
metadata_path = processed_dir / "feature_metadata.json"
with open(metadata_path, 'w') as f:
    json.dump(dataset_metadata, f, indent=2)

print(f"\n[SAVE] Feature metadata saved to: {metadata_path}")
print(f"   [SUCCESS] Pipeline version: 2.0 (refactored - no data leakage)")
print(f"   [SUCCESS] Contains: feature lists, lag config, split info, dataset counts")


[DATA] FEATURE ENGINEERING PIPELINE SUMMARY

[SUCCESS] PIPELINE EXECUTION ORDER (Correct - No Data Leakage):
   [1] Time Features -> Added cyclic (sin/cos) + is_weekend
   [2] Temporal Split -> 70% train / 15% val / 15% test
   [3] Normalization -> Min-Max [0,1] using TRAIN statistics ONLY
   [4] Lag Features + Null Handling -> Created FROM SCALED columns, dropped nulls
   [5] Scaler Params -> Saved for inference
   [6] Model Features -> Prepared for Deep Learning & XGBoost
   [7] Final Datasets -> Ready for training

[DATA] DATASET STATISTICS:
   Total records: 288,149
   Total locations: 14

[METADATA] FEATURE BREAKDOWN:
   [MODEL] Deep Learning (CNN1D-BLSTM & LSTM): 18 features
      ‚îú‚îÄ Pollutants (scaled): 3 (PM10, NO2, SO2)
      ‚îú‚îÄ Weather (scaled): 5 (temp, humidity, wind, precipitation)
      ‚îú‚îÄ Time (cyclic): 6 (hour, month, day_of_week -> sin/cos)
      ‚îî‚îÄ Time (binary): 1 (is_weekend)
   
   [DATA] XGBoost: 71 features
      ‚îú‚îÄ Deep Learning features: 18

In [None]:
# B∆∞·ªõc 9: Create Sequence Data for Deep Learning Models
print("\n[PROCESSING] Step 9: Creating Sequence Data for Deep Learning Models...")

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType

# Sequence configuration (optimized for Colab)
CNN_SEQUENCE_LENGTH = 48  # Optimal for long-term patterns
LSTM_SEQUENCE_LENGTH = 24  # Optimal for medium-term patterns

print(f"[GEAR]  Sequence Configuration:")
print(f"   - CNN1D-BLSTM-Attention: {CNN_SEQUENCE_LENGTH} timesteps")
print(f"   - LSTM: {LSTM_SEQUENCE_LENGTH} timesteps")

def create_sequences_optimized(df, feature_cols, target_col, sequence_length):
    """
    Optimized sequence creation with checkpointing to avoid StackOverflow
    
    [TARGET] Key Strategy:
    - Batch processing to avoid deep logical plans
    - Checkpoint after each batch to reset plan depth
    - Use broadcast joins for efficiency
    - Single final filter for null handling
    
    [?] Null Handling (2-Layer Protection):
    Layer 1: Drop first N records/location (incomplete history)
    Layer 2: Filter ANY null in sequences (data gaps)
    Result: 100% clean sequences with ZERO nulls
    """
    print(f"    Creating {sequence_length}-step sequences...")
    
    window_spec = Window.partitionBy("location_id").orderBy("datetime")
    
    # ========================================
    # LAYER 1: Drop first N records (incomplete history)
    # ========================================
    # df_base = df.select("location_id", "datetime", target_col, *feature_cols) \
    #             .repartition(4, "location_id") \
    #             .withColumn("row_num", F.row_number().over(window_spec)) \
    #             .filter(F.col("row_num") > sequence_length) \
    #             .drop("row_num") \
    #             .cache()
    df_base = df
    
    records_after_layer1 = df_base.count()  # Materialize
    print(f"      [?]  Layer 1: Dropped first {sequence_length} records/location")
    print(f"         Records: {records_after_layer1:,}")
    
    # ========================================
    # BATCH PROCESSING (ÈÅøÂÖç StackOverflow)
    # ========================================
    # Chia features th√†nh batches nh·ªè ƒë·ªÉ tr√°nh logical plan qu√° s√¢u
    BATCH_SIZE = 4  # M·ªói batch x·ª≠ l√Ω 4 features (4 √ó 48 lags = 192 ops - safe)
    feature_batches = [feature_cols[i:i+BATCH_SIZE] for i in range(0, len(feature_cols), BATCH_SIZE)]
    
    print(f"        [INSTALL] Processing {len(feature_batches)} batches ({len(feature_cols)} features)...")
    
    base_cols = ["location_id", "datetime"]
    result_df = df_base.select(*base_cols)
    
    for batch_idx, batch_features in enumerate(feature_batches, 1):
        print(f"           Batch {batch_idx}/{len(feature_batches)}: {len(batch_features)} features")
        
        # T·∫°o batch DataFrame
        batch_df = df_base.select(*base_cols, *batch_features)
        
        # T·∫°o sequences cho batch n√†y
        for col_name in batch_features:
            # T·∫°o array of lags [t-N, ...,t-2, t-1]
            lag_exprs = [F.lag(col_name, step).over(window_spec)
             for step in range(sequence_length, 0, -1)]  # ‚úÖ ƒê·∫£o ng∆∞·ª£c: N -> 1
            batch_df = batch_df.withColumn(f"{col_name}_sequence", F.array(*lag_exprs))
        
        # Select ch·ªâ sequence columns
        sequence_cols = [f"{col}_sequence" for col in batch_features]
        batch_df = batch_df.select(*base_cols, *sequence_cols).cache()
        batch_df.count()  # Materialize ƒë·ªÉ reset logical plan
        
        # Join v√†o result
        result_df = result_df.join(batch_df, base_cols, "inner")
        
        # Unpersist batch (gi·∫£i ph√≥ng memory)
        batch_df.unpersist()
    
    # ========================================
    # LAYER 2: Filter nulls in sequences
    # ========================================
    print(f"        [?] Filtering null sequences...")
    
    all_sequence_cols = [f"{col}_sequence" for col in feature_cols]

    # H√†m ki·ªÉm tra ch·∫∑t ch·∫Ω t·ª´ng c·ªôt sequence
    def get_valid_sequence_condition(col_name):
        col = F.col(col_name)
        # ƒêi·ªÅu ki·ªán 1: B·∫£n th√¢n m·∫£ng kh√¥ng ƒë∆∞·ª£c null v√† ph·∫£i ƒë·ªß k√≠ch th∆∞·ªõc
        basic_cond = col.isNotNull() & (F.size(col) == sequence_length)
        
        # ƒêi·ªÅu ki·ªán 2 (QUAN TR·ªåNG NH·∫§T): Duy·ªát t·ª´ng ph·∫ßn t·ª≠ x trong m·∫£ng
        # x ph·∫£i NOT NULL v√† x ph·∫£i NOT NaN
        # H√†m forall c√≥ t·ª´ Spark 3.1+
        element_check = F.forall(col, lambda x: x.isNotNull() & (~F.isnan(x)))
        
        return basic_cond & element_check
    
    # Build null filter: ALL sequences must be NOT NULL
    from functools import reduce
    final_filter = reduce(
        lambda acc, col_name: acc & get_valid_sequence_condition(col_name),
        all_sequence_cols,
        F.lit(True)
    )
    
    # √Åp d·ª•ng l·ªçc
    result_df = result_df.filter(final_filter)
    
    # T√≠nh to√°n s·ªë l∆∞·ª£ng sau khi l·ªçc
    records_after_layer2 = result_df.count()
    dropped = records_after_layer1 - records_after_layer2
    
    if dropped > 0:
        print(f"      [?]  Layer 2: Dropped {dropped:,} records containing NaN/Null inside sequences")
    else:
        print(f"      [SUCCESS] Layer 2: Data clean, no gaps found")
    # ========================================
    # FINAL: Add target and clean up
    # ========================================
    result_df = result_df.join(
        df_base.select("location_id", "datetime", target_col),
        ["location_id", "datetime"],
        "inner"
    ).filter(F.col(target_col).isNotNull()) \
     .withColumnRenamed(target_col, "target_value") \
     .cache()
    
    final_count = result_df.count()
    retention_rate = (final_count / records_after_layer1) * 100
    
    print(f"      [SUCCESS] Final: {final_count:,} records ({retention_rate:.1f}% retained)")
    
    # Cleanup
    df_base.unpersist()
    
    return result_df

print("\n[DATA] Creating sequences for each model...")

# Create CNN1D-BLSTM sequences
print(f"\n[MODEL] CNN1D-BLSTM-Attention ({CNN_SEQUENCE_LENGTH} timesteps):")
try:
    cnn_train_clean = create_sequences_optimized(dl_train, dl_input_features, target_feature, CNN_SEQUENCE_LENGTH)
    cnn_val_clean = create_sequences_optimized(dl_val, dl_input_features, target_feature, CNN_SEQUENCE_LENGTH)
    cnn_test_clean = create_sequences_optimized(dl_test, dl_input_features, target_feature, CNN_SEQUENCE_LENGTH)
    print(f"    [SUCCESS] CNN sequences created successfully")
except Exception as e:
    print(f"    [ERROR] CNN sequence creation failed: {str(e)[:100]}...")
    cnn_train_clean = cnn_val_clean = cnn_test_clean = None

# Create LSTM sequences  
print(f"\n[PROCESSING] LSTM ({LSTM_SEQUENCE_LENGTH} timesteps):")
try:
    lstm_train_clean = create_sequences_optimized(dl_train, dl_input_features, target_feature, LSTM_SEQUENCE_LENGTH)
    lstm_val_clean = create_sequences_optimized(dl_val, dl_input_features, target_feature, LSTM_SEQUENCE_LENGTH)
    lstm_test_clean = create_sequences_optimized(dl_test, dl_input_features, target_feature, LSTM_SEQUENCE_LENGTH)
    print(f"    [SUCCESS] LSTM sequences created successfully")
except Exception as e:
    print(f"    [ERROR] LSTM sequence creation failed: {str(e)[:100]}...")
    lstm_train_clean = lstm_val_clean = lstm_test_clean = None

print(f"\n[SUCCESS] Sequence data preparation completed!")
print(f"\n[METADATA] Data Quality Guarantee:")
print(f"   [OK] Layer 1: No incomplete history (first {CNN_SEQUENCE_LENGTH}/{LSTM_SEQUENCE_LENGTH} records dropped)")
print(f"   [OK] Layer 2: No data gaps in middle (nulls filtered out)")
print(f"   [OK] Result: 100% clean sequences with ZERO nulls")
print(f"   [OK] Ready for high-quality model training!")


[PROCESSING] Step 9: Creating Sequence Data for Deep Learning Models...
[GEAR]  Sequence Configuration:
   - CNN1D-BLSTM-Attention: 48 timesteps
   - LSTM: 24 timesteps

[DATA] Creating sequences for each model...

[MODEL] CNN1D-BLSTM-Attention (48 timesteps):
    Creating 48-step sequences...
      [?]  Layer 1: Dropped first 48 records/location
         Records: 201,818
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 672 records containing NaN/Null inside sequences


25/12/01 16:21:14 WARN DAGScheduler: Broadcasting large task binary with size 1666.9 KiB
25/12/01 16:21:21 WARN DAGScheduler: Broadcasting large task binary with size 1672.3 KiB
                                                                                

      [SUCCESS] Final: 201,146 records (99.7% retained)
    Creating 48-step sequences...
      [?]  Layer 1: Dropped first 48 records/location
         Records: 43,061
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 672 records containing NaN/Null inside sequences


                                                                                

      [SUCCESS] Final: 42,389 records (98.4% retained)
    Creating 48-step sequences...
      [?]  Layer 1: Dropped first 48 records/location
         Records: 43,270
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 672 records containing NaN/Null inside sequences


                                                                                

      [SUCCESS] Final: 42,598 records (98.4% retained)
    [SUCCESS] CNN sequences created successfully

[PROCESSING] LSTM (24 timesteps):
    Creating 24-step sequences...
      [?]  Layer 1: Dropped first 24 records/location
         Records: 201,818
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 336 records containing NaN/Null inside sequences


25/12/01 16:26:34 WARN DAGScheduler: Broadcasting large task binary with size 1265.1 KiB
25/12/01 16:26:43 WARN DAGScheduler: Broadcasting large task binary with size 1270.4 KiB
                                                                                

      [SUCCESS] Final: 201,482 records (99.8% retained)
    Creating 24-step sequences...
      [?]  Layer 1: Dropped first 24 records/location
         Records: 43,061
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 336 records containing NaN/Null inside sequences


                                                                                

      [SUCCESS] Final: 42,725 records (99.2% retained)
    Creating 24-step sequences...
      [?]  Layer 1: Dropped first 24 records/location
         Records: 43,270
        [INSTALL] Processing 5 batches (18 features)...
           Batch 1/5: 4 features


                                                                                

           Batch 2/5: 4 features


                                                                                

           Batch 3/5: 4 features


                                                                                

           Batch 4/5: 4 features


                                                                                

           Batch 5/5: 2 features


                                                                                

        [?] Filtering null sequences...


                                                                                

      [?]  Layer 2: Dropped 336 records containing NaN/Null inside sequences




      [SUCCESS] Final: 42,934 records (99.2% retained)
    [SUCCESS] LSTM sequences created successfully

[SUCCESS] Sequence data preparation completed!

[METADATA] Data Quality Guarantee:
   [OK] Layer 1: No incomplete history (first 48/24 records dropped)
   [OK] Layer 2: No data gaps in middle (nulls filtered out)
   [OK] Result: 100% clean sequences with ZERO nulls
   [OK] Ready for high-quality model training!


                                                                                

In [None]:
# Step 10: Export Final Datasets to Local then Upload to HDFS
print("\n[SAVE] Step 10: Saving Processed Data...")
print("="*60)

import json

# Local output path for staging
LOCAL_PROCESSED_PATH = os.path.join(TEMP_DIR, "processed")
os.makedirs(LOCAL_PROCESSED_PATH, exist_ok=True)

print(f"[LOCAL] Staging directory: {LOCAL_PROCESSED_PATH}")
print(f"[HDFS] Final destination: {HDFS_PROCESSED_DATA_PATH}")

# Check dataset availability
datasets_ready = {
    "cnn": cnn_train_clean is not None and cnn_val_clean is not None and cnn_test_clean is not None,
    "lstm": lstm_train_clean is not None and lstm_val_clean is not None and lstm_test_clean is not None,
    "xgb": xgb_train is not None and xgb_val is not None and xgb_test is not None
}

print(f"\n[DATA] Dataset Status:")
for model, ready in datasets_ready.items():
    model_name = {"cnn": "CNN1D-BLSTM", "lstm": "LSTM", "xgb": "XGBoost"}[model]
    status = "‚úì Ready" if ready else "‚úó Not Ready"
    print(f"  {model_name}: {status}")

# ========================================
# SAVE DATASETS TO LOCAL PARQUET
# ========================================
print(f"\n[SAVE] Saving datasets to local Parquet files...")

export_summary = {
    "cnn": {"train": 0, "val": 0, "test": 0},
    "lstm": {"train": 0, "val": 0, "test": 0},
    "xgb": {"train": 0, "val": 0, "test": 0}
}

# Save CNN1D-BLSTM datasets
if datasets_ready["cnn"]:
    print(f"\n  [CNN] Saving CNN1D-BLSTM datasets...")
    cnn_local_path = os.path.join(LOCAL_PROCESSED_PATH, "cnn_sequences")
    
    cnn_train_clean.coalesce(4).write.mode("overwrite").parquet(f"{cnn_local_path}/train")
    cnn_val_clean.coalesce(2).write.mode("overwrite").parquet(f"{cnn_local_path}/val")
    cnn_test_clean.coalesce(2).write.mode("overwrite").parquet(f"{cnn_local_path}/test")
    
    export_summary["cnn"]["train"] = cnn_train_clean.count()
    export_summary["cnn"]["val"] = cnn_val_clean.count()
    export_summary["cnn"]["test"] = cnn_test_clean.count()
    
    print(f"     ‚úì Saved locally: {cnn_local_path}")
    print(f"       - train: {export_summary['cnn']['train']:,} records")
    print(f"       - val:   {export_summary['cnn']['val']:,} records")
    print(f"       - test:  {export_summary['cnn']['test']:,} records")

# Save LSTM datasets
if datasets_ready["lstm"]:
    print(f"\n  [LSTM] Saving LSTM datasets...")
    lstm_local_path = os.path.join(LOCAL_PROCESSED_PATH, "lstm_sequences")
    
    lstm_train_clean.coalesce(4).write.mode("overwrite").parquet(f"{lstm_local_path}/train")
    lstm_val_clean.coalesce(2).write.mode("overwrite").parquet(f"{lstm_local_path}/val")
    lstm_test_clean.coalesce(2).write.mode("overwrite").parquet(f"{lstm_local_path}/test")
    
    export_summary["lstm"]["train"] = lstm_train_clean.count()
    export_summary["lstm"]["val"] = lstm_val_clean.count()
    export_summary["lstm"]["test"] = lstm_test_clean.count()
    
    print(f"     ‚úì Saved locally: {lstm_local_path}")
    print(f"       - train: {export_summary['lstm']['train']:,} records")
    print(f"       - val:   {export_summary['lstm']['val']:,} records")
    print(f"       - test:  {export_summary['lstm']['test']:,} records")

# Save XGBoost datasets
if datasets_ready["xgb"]:
    print(f"\n  [XGB] Saving XGBoost datasets...")
    xgb_local_path = os.path.join(LOCAL_PROCESSED_PATH, "xgboost")
    
    xgb_train.coalesce(4).write.mode("overwrite").parquet(f"{xgb_local_path}/train")
    xgb_val.coalesce(2).write.mode("overwrite").parquet(f"{xgb_local_path}/val")
    xgb_test.coalesce(2).write.mode("overwrite").parquet(f"{xgb_local_path}/test")
    
    export_summary["xgb"]["train"] = xgb_train.count()
    export_summary["xgb"]["val"] = xgb_val.count()
    export_summary["xgb"]["test"] = xgb_test.count()
    
    print(f"     ‚úì Saved locally: {xgb_local_path}")
    print(f"       - train: {export_summary['xgb']['train']:,} records")
    print(f"       - val:   {export_summary['xgb']['val']:,} records")
    print(f"       - test:  {export_summary['xgb']['test']:,} records")

# ========================================
# SAVE METADATA TO LOCAL
# ========================================
print(f"\n[SAVE] Saving metadata files...")

# Create comprehensive metadata
final_metadata = {
    "project": "PM2.5 Prediction with HDFS",
    "preprocessing_completed": True,
    "export_timestamp": str(pd.Timestamp.now()),
    "environment": "local_hdfs_hybrid",
    "hdfs_destination": HDFS_PROCESSED_DATA_PATH,
    "models": {
        "cnn1d_blstm": {
            "sequence_length": CNN_SEQUENCE_LENGTH,
            "features": len(dl_input_features),
            "ready": datasets_ready["cnn"],
            "record_counts": export_summary["cnn"]
        },
        "lstm": {
            "sequence_length": LSTM_SEQUENCE_LENGTH, 
            "features": len(dl_input_features),
            "ready": datasets_ready["lstm"],
            "record_counts": export_summary["lstm"]
        },
        "xgboost": {
            "features": len(xgb_input_features),
            "lag_steps": LAG_STEPS,
            "ready": datasets_ready["xgb"],
            "record_counts": export_summary["xgb"]
        }
    },
    "feature_details": {
        "deep_learning_features": dl_input_features,
        "xgboost_features": xgb_input_features,
        "target": target_feature
    },
    "data_format": "parquet",
    "null_handling": {
        "strategy": "2-layer protection",
        "layer1": f"Dropped first {CNN_SEQUENCE_LENGTH}/{LSTM_SEQUENCE_LENGTH} records per location",
        "layer2": "Filtered records with nulls in sequence history"
    }
}

# Save metadata as JSON
metadata_path = os.path.join(LOCAL_PROCESSED_PATH, "datasets_ready.json")
with open(metadata_path, 'w') as f:
    json.dump(final_metadata, f, indent=2)
print(f"   ‚úì Metadata saved: datasets_ready.json")

# Save scaler params
scaler_json = {
    col: {"min": float(params["min"]), "max": float(params["max"])} 
    for col, params in scaler_params.items()
}
scaler_path = os.path.join(LOCAL_PROCESSED_PATH, "scaler_params.json")
with open(scaler_path, 'w') as f:
    json.dump(scaler_json, f, indent=2)
print(f"   ‚úì Scaler params saved: scaler_params.json")

print(f"\n[SUCCESS] All datasets saved locally to: {LOCAL_PROCESSED_PATH}")

# ========================================
# UPLOAD TO HDFS
# ========================================
print(f"\n[UPLOAD] Uploading processed data to HDFS...")
print("="*60)

try:
    hdfs_upload_directory(LOCAL_PROCESSED_PATH, HDFS_PROCESSED_DATA_PATH)
    print(f"\n[SUCCESS] All data uploaded to HDFS: {HDFS_PROCESSED_DATA_PATH}")
    print(f"[INFO] Verify at NameNode UI: http://localhost:9870")
    
    # Verify upload
    result = subprocess.run(
        ["docker", "exec", "hdfs-namenode", "hdfs", "dfs", "-ls", "-R", HDFS_PROCESSED_DATA_PATH],
        capture_output=True,
        text=True
    )
    print(f"\n[VERIFY] HDFS Contents:")
    print(result.stdout)
    
except Exception as e:
    print(f"\n[ERROR] Upload failed: {str(e)}")
    print(f"[INFO] Data is still available locally at: {LOCAL_PROCESSED_PATH}")



[INSTALL] Step 10: Exporting Final Datasets to Disk...
[KAGGLE] Kaggle mode: Saving to /kaggle/working/processed

[DATA] Dataset Status:
  CNN1D-BLSTM: [SUCCESS] Ready
  LSTM: [SUCCESS] Ready
  XGBoost: [SUCCESS] Ready

[SAVE] Exporting datasets to Parquet format...

  [MODEL] Exporting CNN1D-BLSTM datasets...


25/12/01 16:28:58 WARN DAGScheduler: Broadcasting large task binary with size 1874.8 KiB
25/12/01 16:29:32 WARN DAGScheduler: Broadcasting large task binary with size 1672.3 KiB


     [SUCCESS] Saved to: /kaggle/working/processed/cnn_sequences/
        - train: 201,146 records
        - val:   42,389 records
        - test:  42,598 records

  [PROCESSING] Exporting LSTM datasets...


25/12/01 16:29:46 WARN DAGScheduler: Broadcasting large task binary with size 1472.9 KiB
25/12/01 16:29:55 WARN DAGScheduler: Broadcasting large task binary with size 1025.6 KiB
25/12/01 16:30:00 WARN DAGScheduler: Broadcasting large task binary with size 1025.6 KiB
25/12/01 16:30:05 WARN DAGScheduler: Broadcasting large task binary with size 1270.4 KiB


     [SUCCESS] Saved to: /kaggle/working/processed/lstm_sequences/
        - train: 201,482 records
        - val:   42,725 records
        - test:  42,934 records

  [DATA] Exporting XGBoost datasets...


                                                                                

     [SUCCESS] Saved to: /kaggle/working/processed/xgboost/
        - train: 201,818 records
        - val:   43,061 records
        - test:  43,270 records

[SAVE] Saving metadata...
   [SUCCESS] Metadata saved to: /kaggle/working/processed/datasets_ready.json
   [SUCCESS] Scaler params saved to: /kaggle/working/processed/scaler_params.json
   [SUCCESS] Feature metadata saved to: /kaggle/working/processed/feature_metadata.json

[SUCCESS] DATA PREPROCESSING & EXPORT COMPLETE!

[KAGGLE] KAGGLE OUTPUT:
   [?] Location: /kaggle/working/processed/
   [?] To save permanently:
      1. Click 'Save Version' (top right)
      2. Choose 'Save & Run All' (recommended)
      3. Wait for completion (~20-30 min)
      4. Output will appear in 'Output' tab
      5. Use as dataset: '+ Add Data' -> Your Output

[?] Exported Directory Structure:
   /kaggle/working/processed/
   ‚îú‚îÄ‚îÄ cnn_sequences/
   ‚îÇ   ‚îú‚îÄ‚îÄ train/  (201,146 records)
   ‚îÇ   ‚îú‚îÄ‚îÄ val/    (42,389 records)
   ‚îÇ   ‚îî

In [None]:
# Cleanup: Stop Spark and remove temp directory
print("\n[CLEANUP] Cleaning up resources...")
print("="*60)

# Stop Spark
spark.stop()
print("[OK] Spark session stopped")

# Optional: Keep or remove temp directory
print(f"\n[INFO] Temporary files location: {TEMP_DIR}")
print(f"[INFO] You can safely delete this directory after verifying HDFS upload")

# Uncomment to auto-cleanup:
# shutil.rmtree(TEMP_DIR, ignore_errors=True)
# print(f"[OK] Temporary directory cleaned up")

print("\n" + "="*60)
print("PREPROCESSING COMPLETE!")
print("="*60)
print(f"\n‚úì Data Source: HDFS {HDFS_RAW_DATA_PATH}")
print(f"‚úì Data Destination: HDFS {HDFS_PROCESSED_DATA_PATH}")
print(f"‚úì Verify: http://localhost:9870")
print(f"\n‚úì Ready for model training!")



[?] Loading Preprocessed Data with Pandas...
[KAGGLE] Kaggle mode: Loading from /kaggle/input/
   Replace <your-dataset-name> with actual dataset name

[INSTALL] Loading datasets...

[MODEL] CNN1D-BLSTM-Attention:
   [SUCCESS] Train: (201146, 21) | Val: (42389, 21) | Test: (42598, 21)

[PROCESSING] LSTM:
   [SUCCESS] Train: (201482, 21) | Val: (42725, 21) | Test: (42934, 21)

[DATA] XGBoost:
   [SUCCESS] Train: (201818, 74) | Val: (43061, 74) | Test: (43270, 74)

[SUCCESS] All datasets loaded successfully!


In [None]:
cnn_train.head(10)

Unnamed: 0,location_id,datetime,PM10_scaled_sequence,NO2_scaled_sequence,SO2_scaled_sequence,temperature_2m_scaled_sequence,relative_humidity_2m_scaled_sequence,wind_speed_10m_scaled_sequence,surface_pressure_scaled_sequence,precipitation_scaled_sequence,...,hour_cos_sequence,month_sin_sequence,month_cos_sequence,day_of_week_sin_sequence,day_of_week_cos_sequence,wind_direction_sin_sequence,wind_direction_cos_sequence,is_weekend_sequence,PM2_5_log_scaled_sequence,target_value
0,233335,2022-11-05 16:00:00,"[0.010953447846651731, 0.03360716952949963, 0....","[0.2112954186413902, 0.12796208530805686, 0.09...","[0.01834862385321101, 0.014416775884665795, 0....","[0.5221518987341772, 0.5379746835443038, 0.553...","[0.9404761904761905, 0.8928571428571429, 0.857...","[0.14094775212636695, 0.20534629404617252, 0.2...","[0.6020761245674737, 0.6280276816608994, 0.621...","[0.009398496240601503, 0.013157894736842105, 0...",...,"[-0.25881904510252063, -0.9659258262890683, -0...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.9749279121818236, -0....","[-0.2225209339563146, -0.2225209339563146, -0....","[0.9961946980917455, 0.9993908270190958, 0.994...","[-0.08715574274765824, -0.03489949670250073, -...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.2658888298411102, 0.40379514201938105, 0.41...",0.575244
1,233335,2022-11-05 21:00:00,"[0.03659447348767737, 0.04530744336569579, 0.0...","[0.052132701421800945, 0.046603475513428125, 0...","[0.02490170380078637, 0.030144167758846655, 0....","[0.5284810126582279, 0.5348101265822784, 0.534...","[0.9523809523809523, 0.9523809523809523, 0.952...","[0.11421628189550426, 0.12636695018226005, 0.1...","[0.6591695501730094, 0.6678200692041512, 0.676...","[0.011278195488721804, 0.007518796992481203, 0...",...,"[0.5000000000000001, 0.7071067811865474, 0.866...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.9749279121818236, -0....","[-0.2225209339563146, -0.2225209339563146, -0....","[1.0, 0.9993908270190958, 0.9925461516413221, ...","[6.123233995736766e-17, 0.03489949670250108, -...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.5026014988639874, 0.5500357285414436, 0.559...",0.490291
2,233335,2022-11-06 20:00:00,"[0.05402041324371421, 0.06123973114264377, 0.0...","[0.10979462875197474, 0.08412322274881517, 0.1...","[0.02883355176933159, 0.030144167758846655, 0....","[0.5727848101265823, 0.5727848101265823, 0.563...","[0.8452380952380952, 0.8452380952380952, 0.857...","[0.15188335358444716, 0.14823815309842042, 0.1...","[0.6747404844290663, 0.6782006920415219, 0.685...","[0.0037593984962406013, 0.0018796992481203006,...",...,"[-0.7071067811865479, -0.5000000000000004, -0....","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.7818314824680299, -0.7818314824680299, -0....","[0.6234898018587334, 0.6234898018587334, 0.623...","[0.754709580222772, 0.8290375725550417, 0.7880...","[0.6560590289905073, 0.5591929034707468, 0.615...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, ...","[0.5416701480187393, 0.5657633372272116, 0.558...",0.537972
3,233335,2022-11-07 07:00:00,"[0.029126213592233007, 0.025889967637540454, 0...","[0.05726698262243286, 0.08609794628751975, 0.0...","[0.01179554390563565, 0.014416775884665795, 0....","[0.4746835443037974, 0.46518987341772156, 0.45...","[0.8809523809523809, 0.8690476190476191, 0.857...","[0.15795868772782504, 0.15795868772782504, 0.1...","[0.7404844290657437, 0.7439446366782013, 0.740...","[0.0, 0.0, 0.0018796992481203006, 0.0, 0.0, 0....",...,"[0.8660254037844384, 0.9659258262890681, 1.0, ...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.7818314824680299, -0.7818314824680299, -2....","[0.6234898018587334, 0.6234898018587334, 1.0, ...","[0.6946583704589973, 0.6946583704589973, 0.694...","[0.7193398003386512, 0.7193398003386512, 0.719...","[0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.422640218551877, 0.3856687264826328, 0.3716...",0.502601
4,233335,2022-11-07 14:00:00,"[0.04605426935524023, 0.0495394573064476, 0.05...","[0.09913112164297, 0.13349131121642968, 0.1733...","[0.01703800786369594, 0.01834862385321101, 0.0...","[0.42721518987341767, 0.42088607594936706, 0.4...","[0.8571428571428571, 0.8571428571428571, 0.857...","[0.17982989064398544, 0.17739975698663427, 0.1...","[0.7249134948096888, 0.7352941176470594, 0.745...","[0.0, 0.0, 0.0, 0.0, 0.0018796992481203006, 0....",...,"[0.25881904510252074, 6.123233995736766e-17, -...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ...","[0.7771459614569708, 0.788010753606722, 0.7986...","[0.6293203910498375, 0.6156614753256583, 0.601...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.4902908942160261, 0.43142220079168486, 0.48...",0.542887
5,233335,2022-11-07 15:00:00,"[0.0495394573064476, 0.05526512322628827, 0.05...","[0.13349131121642968, 0.17338072669826224, 0.1...","[0.01834862385321101, 0.02490170380078637, 0.0...","[0.42088607594936706, 0.4177215189873418, 0.44...","[0.8571428571428571, 0.8571428571428571, 0.845...","[0.17739975698663427, 0.17496962332928312, 0.1...","[0.7352941176470594, 0.7456747404844281, 0.759...","[0.0, 0.0, 0.0, 0.0018796992481203006, 0.0, 0....",...,"[6.123233995736766e-17, -0.25881904510252063, ...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ...","[0.788010753606722, 0.7986355100472928, 0.4999...","[0.6156614753256583, 0.6018150231520484, 0.866...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.43142220079168486, 0.48218324911068233, 0.5...",0.526439
6,233335,2022-11-07 16:00:00,"[0.05526512322628827, 0.05899925317401045, 0.0...","[0.17338072669826224, 0.19075829383886256, 0.1...","[0.02490170380078637, 0.027522935779816515, 0....","[0.4177215189873418, 0.4430379746835442, 0.455...","[0.8571428571428571, 0.8452380952380952, 0.809...","[0.17496962332928312, 0.13122721749696234, 0.1...","[0.7456747404844281, 0.7595155709342563, 0.771...","[0.0, 0.0, 0.0018796992481203006, 0.0, 0.0, 0....",...,"[-0.25881904510252063, -0.4999999999999998, -0...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ...","[0.7986355100472928, 0.49999999999999994, 0.51...","[0.6018150231520484, 0.8660254037844387, 0.857...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.48218324911068233, 0.5155826260477044, 0.52...",0.531649
7,233335,2022-11-07 17:00:00,"[0.05899925317401045, 0.04480955937266617, 0.0...","[0.19075829383886256, 0.1844391785150079, 0.17...","[0.027522935779816515, 0.022280471821756225, 0...","[0.4430379746835442, 0.45569620253164556, 0.48...","[0.8452380952380952, 0.8095238095238095, 0.761...","[0.13122721749696234, 0.13730255164034022, 0.1...","[0.7595155709342563, 0.7716262975778537, 0.771...","[0.0, 0.0018796992481203006, 0.0, 0.0, 0.0, 0....",...,"[-0.4999999999999998, -0.7071067811865475, -0....","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ...","[0.49999999999999994, 0.5150380749100542, 0.57...","[0.8660254037844387, 0.8571673007021123, 0.819...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.5155826260477044, 0.5251145754621727, 0.501...",0.558058
8,233335,2022-11-08 00:00:00,"[0.05526512322628827, 0.05750560119492159, 0.0...","[0.14296998420221171, 0.11690363349131122, 0.0...","[0.03145478374836173, 0.02490170380078637, 0.0...","[0.5474683544303797, 0.5284810126582279, 0.509...","[0.7261904761904762, 0.7380952380952381, 0.773...","[0.1822600243013366, 0.13001215066828675, 0.11...","[0.7145328719723181, 0.7197231833910025, 0.723...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",...,"[-0.5000000000000004, -0.25881904510252063, -1...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.623...","[0.6691306063588582, 0.5446390350150271, 0.469...","[0.7431448254773942, 0.838670567945424, 0.8829...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.5752438812449654, 0.5731764946609242, 0.567...",0.478844
9,233335,2022-11-08 02:00:00,"[0.05974607916355489, 0.05725665919840677, 0.0...","[0.09083728278041074, 0.12006319115323855, 0.0...","[0.022280471821756225, 0.022280471821756225, 0...","[0.509493670886076, 0.490506329113924, 0.47151...","[0.7738095238095238, 0.7738095238095238, 0.833...","[0.11907654921020658, 0.13973268529769137, 0.1...","[0.72318339100346, 0.7318339100346019, 0.75259...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.00375939...",...,"[-1.8369701987210297e-16, 0.2588190451025203, ...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.6234898018587...","[0.4694715627858908, 0.5299192642332049, 0.358...","[0.882947592858927, 0.848048096156426, 0.93358...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.5679102402506214, 0.5500357285414436, 0.480...",0.522438


In [None]:
cnn_train.iloc[1]['PM2_5_log_scaled_sequence']

array([0.5026015 , 0.55003573, 0.55917734, 0.54886235, 0.53921268,
       0.54768189, 0.56897491, 0.57317649, 0.56359258, 0.55693172,
       0.59386427, 0.62088433, 0.62570843, 0.58913362, 0.52108626,
       0.49029089, 0.49654315, 0.52243828, 0.51972482, 0.54167015,
       0.56576334, 0.55805776, 0.55579916, 0.53797225, 0.49959581,
       0.49029089, 0.42264022, 0.38566873, 0.37160754, 0.3745019 ,
       0.4612311 , 0.50556165, 0.53293027, 0.49029089, 0.4314222 ,
       0.48218325, 0.51558263, 0.52511458, 0.50110443, 0.53035962,
       0.48052055, 0.13541754, 0.17103701, 0.57524388, 0.57317649,
       0.56791024, 0.55003573, 0.48052055])

In [None]:
lstm_train.head(10)

Unnamed: 0,location_id,datetime,PM10_scaled_sequence,NO2_scaled_sequence,SO2_scaled_sequence,temperature_2m_scaled_sequence,relative_humidity_2m_scaled_sequence,wind_speed_10m_scaled_sequence,surface_pressure_scaled_sequence,precipitation_scaled_sequence,...,hour_cos_sequence,month_sin_sequence,month_cos_sequence,day_of_week_sin_sequence,day_of_week_cos_sequence,wind_direction_sin_sequence,wind_direction_cos_sequence,is_weekend_sequence,PM2_5_log_scaled_sequence,target_value
0,233335,2022-11-04 15:00:00,"[0.010953447846651731, 0.03360716952949963, 0....","[0.2112954186413902, 0.12796208530805686, 0.09...","[0.01834862385321101, 0.014416775884665795, 0....","[0.5221518987341772, 0.5379746835443038, 0.553...","[0.9404761904761905, 0.8928571428571429, 0.857...","[0.14094775212636695, 0.20534629404617252, 0.2...","[0.6020761245674737, 0.6280276816608994, 0.621...","[0.009398496240601503, 0.013157894736842105, 0...",...,"[-0.25881904510252063, -0.9659258262890683, -0...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.9749279121818236, -0....","[-0.2225209339563146, -0.2225209339563146, -0....","[0.9961946980917455, 0.9993908270190958, 0.994...","[-0.08715574274765824, -0.03489949670250073, -...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.2658888298411102, 0.40379514201938105, 0.41...",0.54167
1,233335,2022-11-04 18:00:00,"[0.026387851630570076, 0.02414737366193677, 0....","[0.08056872037914692, 0.07187993680884676, 0.0...","[0.01179554390563565, 0.01834862385321101, 0.0...","[0.5474683544303797, 0.5158227848101264, 0.528...","[0.8928571428571429, 0.9642857142857143, 0.952...","[0.1968408262454435, 0.1057108140947752, 0.114...","[0.6193771626297575, 0.6435986159169543, 0.659...","[0.06203007518796992, 0.015037593984962405, 0....",...,"[-0.7071067811865479, 0.2588190451025203, 0.50...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.9749279121818236, -0....","[-0.2225209339563146, -0.2225209339563146, -0....","[0.9781476007338057, 0.7071067811865475, 1.0, ...","[-0.20791169081775912, 0.7071067811865476, 6.1...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.39878547755870103, 0.4575097227627197, 0.50...",0.555799
2,233335,2022-11-04 20:00:00,"[0.03659447348767737, 0.04530744336569579, 0.0...","[0.052132701421800945, 0.046603475513428125, 0...","[0.02490170380078637, 0.030144167758846655, 0....","[0.5284810126582279, 0.5348101265822784, 0.534...","[0.9523809523809523, 0.9523809523809523, 0.952...","[0.11421628189550426, 0.12636695018226005, 0.1...","[0.6591695501730094, 0.6678200692041512, 0.676...","[0.011278195488721804, 0.007518796992481203, 0...",...,"[0.5000000000000001, 0.7071067811865474, 0.866...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.9749279121818236, -0....","[-0.2225209339563146, -0.2225209339563146, -0....","[1.0, 0.9993908270190958, 0.9925461516413221, ...","[6.123233995736766e-17, 0.03489949670250108, -...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.5026014988639874, 0.5500357285414436, 0.559...",0.499596
3,233335,2022-11-04 23:00:00,"[0.045058501369180985, 0.041573313417973615, 0...","[0.12085308056872039, 0.17890995260663506, 0.1...","[0.02883355176933159, 0.0327653997378768, 0.03...","[0.5379746835443038, 0.531645569620253, 0.5221...","[0.9523809523809523, 0.9642857142857143, 0.964...","[0.14094775212636695, 0.11057108140947752, 0.1...","[0.6747404844290663, 0.66955017301038, 0.66435...","[0.015037593984962405, 0.03759398496240601, 0....",...,"[0.9659258262890681, 1.0, 0.9659258262890683, ...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.9749279121818236, -0.7818314824680299, -0....","[-0.2225209339563146, 0.6234898018587334, 0.62...","[0.9925461516413221, 0.9876883405951378, 0.819...","[-0.12186934340514737, 0.15643446504023092, 0....","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.5488623543891202, 0.5392126790240658, 0.547...",0.385669
4,233335,2022-11-05 02:00:00,"[0.058252427184466014, 0.05899925317401045, 0....","[0.14139020537124802, 0.13112164296998421, 0.1...","[0.03669724770642202, 0.03669724770642202, 0.0...","[0.5221518987341772, 0.5253164556962026, 0.525...","[0.9642857142857143, 0.9523809523809523, 0.928...","[0.13001215066828675, 0.16403402187120292, 0.1...","[0.6608996539792381, 0.6522491349480962, 0.653...","[0.013157894736842105, 0.005639097744360902, 0...",...,"[0.8660254037844387, 0.7071067811865476, 0.500...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.7818314824680299, -0.7818314824680299, -0....","[0.6234898018587334, 0.6234898018587334, 0.623...","[0.9396926207859083, 0.9876883405951378, 0.970...","[0.3420201433256688, 0.15643446504023092, 0.24...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...","[0.5689749088750778, 0.5731764946609242, 0.563...",0.461231
5,233335,2022-11-05 16:00:00,"[0.05402041324371421, 0.06123973114264377, 0.0...","[0.10979462875197474, 0.08412322274881517, 0.1...","[0.02883355176933159, 0.030144167758846655, 0....","[0.5727848101265823, 0.5727848101265823, 0.563...","[0.8452380952380952, 0.8452380952380952, 0.857...","[0.15188335358444716, 0.14823815309842042, 0.1...","[0.6747404844290663, 0.6782006920415219, 0.685...","[0.0037593984962406013, 0.0018796992481203006,...",...,"[-0.7071067811865479, -0.5000000000000004, -0....","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.7818314824680299, -0.7818314824680299, -0....","[0.6234898018587334, 0.6234898018587334, 0.623...","[0.754709580222772, 0.8290375725550417, 0.7880...","[0.6560590289905073, 0.5591929034707468, 0.615...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, ...","[0.5416701480187393, 0.5657633372272116, 0.558...",0.575244
6,233335,2022-11-05 21:00:00,"[0.04655215334826985, 0.03410505352252925, 0.0...","[0.062401263823064775, 0.0703001579778831, 0.0...","[0.01834862385321101, 0.020969855832241157, 0....","[0.4841772151898734, 0.4810126582278481, 0.474...","[0.8809523809523809, 0.8928571428571429, 0.880...","[0.1543134872417983, 0.1543134872417983, 0.157...","[0.7214532871972312, 0.7318339100346019, 0.740...","[0.0018796992481203006, 0.0, 0.0, 0.0, 0.00187...",...,"[0.5000000000000001, 0.7071067811865474, 0.866...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-0.7818314824680299, -0.7818314824680299, -0....","[0.6234898018587334, 0.6234898018587334, 0.623...","[0.6819983600624985, 0.6819983600624985, 0.694...","[0.7313537016191706, 0.7313537016191706, 0.719...","[0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.4995958055932217, 0.4902908942160261, 0.422...",0.490291
7,233335,2022-11-06 20:00:00,"[0.05526512322628827, 0.05750560119492159, 0.0...","[0.14296998420221171, 0.11690363349131122, 0.0...","[0.03145478374836173, 0.02490170380078637, 0.0...","[0.5474683544303797, 0.5284810126582279, 0.509...","[0.7261904761904762, 0.7380952380952381, 0.773...","[0.1822600243013366, 0.13001215066828675, 0.11...","[0.7145328719723181, 0.7197231833910025, 0.723...","[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",...,"[-0.5000000000000004, -0.25881904510252063, -1...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, -2.4492935982947064e...","[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.623...","[0.6691306063588582, 0.5446390350150271, 0.469...","[0.7431448254773942, 0.838670567945424, 0.8829...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.5752438812449654, 0.5731764946609242, 0.567...",0.537972
8,233335,2022-11-07 07:00:00,"[0.05526512322628827, 0.05601194921583271, 0.0...","[0.11690363349131122, 0.11216429699842022, 0.1...","[0.01703800786369594, 0.020969855832241157, 0....","[0.46202531645569617, 0.45569620253164556, 0.4...","[0.8214285714285714, 0.8214285714285714, 0.833...","[0.1275820170109356, 0.10814094775212638, 0.11...","[0.761245674740485, 0.7595155709342563, 0.7560...","[0.0, 0.0, 0.0037593984962406013, 0.0018796992...",...,"[0.9659258262890681, 1.0, 0.9659258262890683, ...","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[-2.4492935982947064e-16, 0.7818314824680298, ...","[1.0, 0.6234898018587336, 0.6234898018587336, ...","[0.6560590289905072, 0.5299192642332049, 0.629...","[0.7547095802227721, 0.848048096156426, 0.7771...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[0.5793141578229432, 0.5591773357499615, 0.521...",0.502601
9,233335,2022-11-07 14:00:00,"[0.05850136918098083, 0.04680109534478467, 0.0...","[0.1358609794628752, 0.18957345971563982, 0.17...","[0.014416775884665795, 0.01834862385321101, 0....","[0.4525316455696202, 0.4683544303797468, 0.487...","[0.8690476190476191, 0.8571428571428571, 0.797...","[0.1069258809234508, 0.11907654921020658, 0.13...","[0.7785467128027688, 0.7716262975778537, 0.750...","[0.015037593984962405, 0.06954887218045112, 0....",...,"[-0.8660254037844387, -0.9659258262890682, -1....","[-0.5000000000000004, -0.5000000000000004, -0....","[0.8660254037844384, 0.8660254037844384, 0.866...","[0.7818314824680298, 0.7818314824680298, 0.781...","[0.6234898018587336, 0.6234898018587336, 0.623...","[0.8191520442889918, 0.8090169943749475, 0.719...","[0.5735764363510462, 0.5877852522924731, 0.694...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, ...","[0.5155826260477044, 0.5303596209261283, 0.522...",0.542887


In [None]:
xgb_train.head(10)

Unnamed: 0,location_id,datetime,PM2_5_log_scaled,PM10_scaled,NO2_scaled,SO2_scaled,temperature_2m_scaled,relative_humidity_2m_scaled,wind_speed_10m_scaled,surface_pressure_scaled,...,surface_pressure_lag3_scaled,surface_pressure_lag6_scaled,surface_pressure_lag12_scaled,surface_pressure_lag24_scaled,precipitation_lag1_scaled,precipitation_lag2_scaled,precipitation_lag3_scaled,precipitation_lag6_scaled,precipitation_lag12_scaled,precipitation_lag24_scaled
0,7727,2022-11-02 02:00:00,0.280367,0.019417,0.183649,0.065531,0.490506,0.666667,0.397327,0.532872,...,0.557093,0.555363,0.576125,0.572664,0.003759,0.016917,0.033835,0.013158,0.0,0.0
1,7727,2022-11-02 04:00:00,0.051257,0.012198,0.182859,0.066841,0.468354,0.678571,0.391252,0.522491,...,0.553633,0.562284,0.562284,0.565744,0.0,0.003759,0.016917,0.067669,0.005639,0.0
2,7727,2022-11-02 06:00:00,0.54167,0.039582,0.170221,0.068152,0.462025,0.702381,0.385176,0.525952,...,0.546713,0.560554,0.536332,0.550173,0.009398,0.0,0.003759,0.016917,0.00188,0.0
3,7727,2022-11-02 07:00:00,0.518354,0.040578,0.159163,0.070773,0.462025,0.702381,0.381531,0.538062,...,0.532872,0.557093,0.534602,0.539792,0.005639,0.009398,0.0,0.033835,0.0,0.0
4,7727,2022-11-02 10:00:00,0.356455,0.021409,0.095182,0.065531,0.477848,0.785714,0.393682,0.584775,...,0.522491,0.553633,0.536332,0.536332,0.007519,0.005639,0.009398,0.016917,0.0,0.0
5,7727,2022-11-02 11:00:00,0.362658,0.024396,0.084123,0.070773,0.490506,0.77381,0.398542,0.581315,...,0.525952,0.546713,0.541522,0.534602,0.016917,0.007519,0.005639,0.003759,0.0,0.0
6,7727,2022-11-02 12:00:00,0.424873,0.028379,0.079384,0.066841,0.493671,0.761905,0.407047,0.570934,...,0.538062,0.532872,0.555363,0.544983,0.003759,0.016917,0.007519,0.0,0.013158,0.0
7,7727,2022-11-02 13:00:00,0.403795,0.022654,0.090442,0.065531,0.484177,0.77381,0.403402,0.557093,...,0.584775,0.522491,0.562284,0.555363,0.0,0.003759,0.016917,0.009398,0.067669,0.0
8,7727,2022-11-02 14:00:00,0.36867,0.022903,0.058057,0.068152,0.493671,0.761905,0.400972,0.544983,...,0.581315,0.525952,0.560554,0.581315,0.00188,0.0,0.003759,0.005639,0.016917,0.0
9,7727,2022-11-02 15:00:00,0.350049,0.021907,0.058057,0.068152,0.5,0.75,0.386391,0.539792,...,0.570934,0.538062,0.557093,0.591696,0.00188,0.00188,0.0,0.007519,0.033835,0.0


---

## ‚úÖ Preprocessing Ho√†n T·∫•t v·ªõi HDFS!

### üìä T·ªïng K·∫øt

**Ngu·ªìn D·ªØ Li·ªáu:**
- ‚úÖ Downloaded t·ª´ HDFS: `hdfs://namenode:9000/data/raw/`
- ‚úÖ 28 files (14 pollutant + 14 weather) t·ª´ 14 ƒë·ªãa ƒëi·ªÉm

**X·ª≠ L√Ω:**
- ‚úÖ Data cleaning (outliers, missing values)
- ‚úÖ Feature engineering (time features, lags)
- ‚úÖ Scaling (MinMax 0-1)
- ‚úÖ Sequence generation (CNN: 24h, LSTM: 48h)

**K·∫øt Qu·∫£ L∆∞u Tr√™n HDFS:**
- ‚úÖ Uploaded to: `hdfs://namenode:9000/data/processed/`
- ‚úÖ 3 model datasets (CNN, LSTM, XGBoost)
- ‚úÖ Metadata + Scaler parameters

### üîç Verify Tr√™n HDFS

#### Option 1: Web UI
M·ªü browser: **http://localhost:9870**
- Utilities ‚Üí Browse the file system
- Navigate: `/data/processed/`
- Xem: cnn_sequences/, lstm_sequences/, xgboost/, *.json

#### Option 2: Command Line
```powershell
# List t·∫•t c·∫£ files
docker exec hdfs-namenode hdfs dfs -ls -R /data/processed

# Xem k√≠ch th∆∞·ªõc
docker exec hdfs-namenode hdfs dfs -du -h /data/processed

# ƒê·ªçc metadata
docker exec hdfs-namenode hdfs dfs -cat /data/processed/datasets_ready.json
```

### üìÅ Local Staging Files
D·ªØ li·ªáu t·∫°m c≈©ng c√≥ t·∫°i local (c√≥ th·ªÉ x√≥a sau khi verify HDFS):
```python
print(f"Temp directory: {TEMP_DIR}")
```

### üéì Demo Cho M√¥n Big Data

**ƒê√£ ho√†n th√†nh:**
1. ‚úÖ Setup HDFS cluster (NameNode + 3 DataNodes)
2. ‚úÖ Upload d·ªØ li·ªáu l√™n HDFS distributed storage
3. ‚úÖ X·ª≠ l√Ω v·ªõi PySpark (distributed processing framework)
4. ‚úÖ L∆∞u k·∫øt qu·∫£ v·ªÅ HDFS (persistent storage)
5. ‚úÖ Verify replication v√† fault tolerance

**C√≥ th·ªÉ present:**
- HDFS architecture (blocks, replication)
- Data locality trong processing
- Scalability v·ªõi nhi·ªÅu DataNodes
- Fault tolerance v·ªõi replication factor = 2

### üîÑ Next Steps

**ƒê·ªÉ train models:**
```python
# Download processed data t·ª´ HDFS v·ªÅ local
docker exec hdfs-namenode hdfs dfs -get /data/processed ./data/

# Sau ƒë√≥ d√πng notebooks training nh∆∞ b√¨nh th∆∞·ªùng
# 02-cnn-bilstm-attention-model.ipynb
# ...
```

**Ho·∫∑c scale l√™n:**
- Th√™m DataNodes trong docker-compose.yml
- TƒÉng replication factor
- Process tr√™n Spark cluster th·∫≠t thay v√¨ local mode