In [1]:
import os
import json
import pytz
import logging
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

import sys
sys.path.append("/home/jovyan/work")

# 🔁 Load path from settings.py
from settings import RAW_FLIGHT_DATA_DIR, RAW_WEATHER_DATA_DIR, LOGS_DATA_DIR

In [2]:
# Import Environment from .env
# Load the MinIO connection settings into module-level names. load_dotenv()
# merges a .env file (when one is found) into the process environment first.
load_dotenv()
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")
MINIO_BUCKET = os.getenv("MINIO_BUCKET")
MINIO_USER = os.getenv("MINIO_ROOT_USER")
MINIO_PASS = os.getenv("MINIO_ROOT_PASSWORD")

# Fail fast on missing configuration: otherwise the job would run the whole
# transform and only break at write time (e.g. an output path "s3a://None/...").
_missing_env = [
    env_name
    for env_name, env_value in (
        ("MINIO_ENDPOINT", MINIO_ENDPOINT),
        ("MINIO_BUCKET", MINIO_BUCKET),
        ("MINIO_ROOT_USER", MINIO_USER),
        ("MINIO_ROOT_PASSWORD", MINIO_PASS),
    )
    if not env_value
]
if _missing_env:
    _missing_names = ", ".join(_missing_env)
    raise EnvironmentError(f"Missing required environment variables: {_missing_names}")
print('Import Environment')

Import Environment


In [3]:
# Setup date
# Target date = current time in the Asia/Bangkok zone minus a day offset
# (currently 0, i.e. today). `day` is the YYYY-MM-DD label used for paths/logs.
date = datetime.now(tz=pytz.timezone("Asia/Bangkok")) - timedelta(days=0)
day = format(date, '%Y-%m-%d')
print(f'Transform Data on date: {day}')

Transform Data on date: 2025-04-02


In [4]:
# Setup Logging
# Route this run's log records to a per-day file under LOGS_DATA_DIR.
log_dir = LOGS_DATA_DIR
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"transform_{day}.log"
print(f"🪵 Logging to: {log_file}")

# basicConfig() does nothing while the root logger already has handlers, so
# detach any that are present (e.g. from a previous run of this cell).
# Close each one too: removeHandler() alone leaves the underlying stream/file
# open, leaking a file handle on every re-run.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

logging.info("Start transform process")
logging.info(f"Target Date: {day}")
print('Complete Setup Logging')

🪵 Logging to: /home/jovyan/work/data/logs/transform_2025-04-02.log
Complete Setup Logging


In [5]:
# Helpers
def safe_get(d, keys, default=None):
    """Walk a nested structure along ``keys`` and return the value reached.

    Each step descends one level: a dict is looked up by key, and a list or
    tuple is indexed by an integer position. The walk stops and ``default`` is
    returned as soon as a step cannot be taken (missing key, index out of
    range, wrong container type, or a non-container value).

    Args:
        d: The nested dict/list structure to read from.
        keys: Iterable of dict keys and/or integer sequence indices.
        default: Value returned when the path cannot be fully resolved.

    Returns:
        The value found at the end of the path, or ``default``.
    """
    for k in keys:
        if isinstance(d, dict):
            if k not in d:
                return default
            d = d[k]
        elif (
            isinstance(d, (list, tuple))
            and isinstance(k, int)
            and not isinstance(k, bool)  # True/False are ints but not indices
        ):
            # Bounds check instead of try/except so a bad index yields default.
            if not -len(d) <= k < len(d):
                return default
            d = d[k]
        else:
            return default
    return d

def float_or_none(val):
    """Convert ``val`` to ``float``, or return ``None`` when that is impossible.

    Args:
        val: Any value; ``None`` is passed through as ``None``.

    Returns:
        ``float(val)`` on success, otherwise ``None``.
    """
    if val is None:
        return None
    try:
        return float(val)
    # Only conversion failures are treated as "no value"; a bare except would
    # also swallow KeyboardInterrupt / SystemExit.
    except (TypeError, ValueError, OverflowError):
        return None

def get_iata_by_city(city):
    """Return the IATA airport code mapped to ``city``, or ``None`` if unmapped.

    The lookup is an exact match against the city names listed below.
    """
    city_to_iata = dict([
        ("Tokyo", "NRT"),
        ("Seoul", "ICN"),
        ("Singapore", "SIN"),
        ("Kuala Lumpur", "KUL"),
        ("Taipei", "TPE"),
        ("Ho Chi Minh", "SGN"),
        ("Hong Kong", "HKG"),
    ])
    if city in city_to_iata:
        return city_to_iata[city]
    return None

print('Define Helper Function')

Define Helper Function


In [6]:
# CreateSparkSession
# Build (or reuse) the Spark session and configure the S3A filesystem with the
# MinIO endpoint and credentials loaded from the environment in the
# "Import Environment" cell, rather than reading the environment a second time.
APP_NAME = "CloudJetTransform"
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .master("spark://spark:7077")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_USER)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_PASS)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

logging.info(f"Create SparkSession name : {APP_NAME}")
print(f"Create SparkSession name : {APP_NAME}")

Create SparkSession name : CloudJetTransform


In [7]:
# Transform: load the day's raw flight and weather files, left-join weather
# onto flights by arrival airport code, and write the result to MinIO as
# parquet. Any failure is logged and then re-raised so the run is reported as
# failed; the Spark session is stopped in all cases.
print('Start Load Flight Data File')
try:
    # Load Flight Data
    flight_path = RAW_FLIGHT_DATA_DIR / day / 'data.json'
    if not flight_path.exists():
        raise FileNotFoundError(f"Missing flight file: {flight_path}")

    logging.info(f"Loading flight data from {flight_path}")
    print(f"Loading flight data from {flight_path}")

    df_flight_raw = spark.read.option("multiline", True).json(str(flight_path))
    # Flatten the nested flight records down to the columns used downstream.
    df_flight = df_flight_raw.select(
        col("flight.iata").alias("flight_iata"),
        col("departure.iata").alias("departure_iata"),
        col("arrival.iata").alias("arrival_iata"),
        col("arrival.scheduled").alias("arrival_time"),
        col("airline.name").alias("airline"),
        col("arrival.airport").alias("arr_airport"),
    )
    # count() is a Spark action; run it once and reuse the number for both
    # the log record and the console message.
    flight_count = df_flight.count()
    logging.info(f"Flight rows: {flight_count}")
    print(f"Flight rows: {flight_count}")

    # Load Weather Data
    weather_path = RAW_WEATHER_DATA_DIR / day / 'data.json'
    if not weather_path.exists():
        raise FileNotFoundError(f"Missing weather file: {weather_path}")

    logging.info(f"Loading weather data from {weather_path}")
    print(f"Loading weather data from {weather_path}")
    with open(weather_path, "r", encoding='utf-8') as f:
        weather_raw = json.load(f)
        print("Weather JSON loaded")

    print(f"Weather records: {len(weather_raw)}")

    # Flatten each weather record into one row; values that are missing or
    # not convertible become None, and a non-string condition label becomes
    # "Unknown".
    weather_rows = []
    for w in weather_raw:
        weather = safe_get(w, ["data", "weather", 0, "main"])
        if not isinstance(weather, str):
            weather = "Unknown"
        weather_rows.append({
            "city": w.get("city"),
            "iata": get_iata_by_city(w.get("city")),
            "temp": float_or_none(safe_get(w, ["data", "main", "temp"])),
            "humidity": float_or_none(safe_get(w, ["data", "main", "humidity"])),
            "weather": weather,
            "wind_speed": float_or_none(safe_get(w, ["data", "wind", "speed"]))
        })
    print(f"Weather rows parsed: {len(weather_rows)}")

    # Explicit schema: every column is nullable.
    schema = StructType([
        StructField("city", StringType(), True),
        StructField("iata", StringType(), True),
        StructField("temp", FloatType(), True),
        StructField("humidity", FloatType(), True),
        StructField("weather", StringType(), True),
        StructField("wind_speed", FloatType(), True),
    ])

    print("Creating Spark DataFrame from weather_rows")
    df_weather = spark.createDataFrame(weather_rows, schema=schema)
    weather_count = df_weather.count()
    logging.info(f"Weather rows: {weather_count}")
    print(f"Weather rows: {weather_count}")

    # Join
    # Left join keeps every flight, including those with no weather row for
    # the arrival airport.
    df_joined = df_flight.join(df_weather, df_flight.arrival_iata == df_weather.iata, how="left")
    logging.info(f"Joined rows: {df_joined.count()}")

    # Save to MinIO
    output_path = f"s3a://{MINIO_BUCKET}/{day}/flight_weather.parquet"
    df_joined.write.mode("overwrite").parquet(output_path)
    logging.info(f"Saved to MinIO: {output_path}")
    print(f"Saved to MinIO: {output_path}")

except Exception as e:
    logging.error("Error during transformation", exc_info=True)
    print('Error during transformation')
    print(repr(e))
    # Re-raise after logging: swallowing the error would make a failed run
    # look successful to whatever executes this notebook.
    raise

finally:
    spark.stop()
    logging.info("Spark session stopped")


Start Load Flight Data File
Loading flight data from /home/jovyan/work/data/raw/flight/2025-04-02/data.json
Flight rows: 647
Loading weather data from /home/jovyan/work/data/raw/weather/2025-04-02/data.json
Weather JSON loaded
Weather records: 7
Weather rows parsed: 7
Creating Spark DataFrame from weather_rows
Weather rows: 7
Saved to MinIO: s3a://cloudjet-clean/2025-04-02/flight_weather.parquet


In [8]:
!ls -l /home/jovyan/work/data/raw/flight/2025-04-02/data.json

-rwxrwxrwx 1 root root 961457 Apr  1 20:32 /home/jovyan/work/data/raw/flight/2025-04-02/data.json
