In [2]:
"""
Flight Delay Feature Engineering - Clean Approach
不需要join，直接从FlightDate提取特征
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, month, dayofmonth, dayofweek, quarter, weekofyear,
    when, lit, date_format, hour, minute
)

# Create (or reuse) the Spark session with 8 GB of driver memory.
spark = (
    SparkSession.builder
    .appName("Feature Engineering")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Load data: one Parquet file per year (2018 through 2022) read into a single
# DataFrame. BASE_PATH is an absolute, machine-specific directory.
BASE_PATH = "/Users/jorahmormont/PycharmProjects/BigDataFinalProject/datasets/robikscube/flight-delay-dataset-20182022/versions/4/"

yearly_parquet_paths = [
    f"{BASE_PATH}Combined_Flights_{year}.parquet" for year in range(2018, 2023)
]
flight_df = spark.read.parquet(*yearly_parquet_paths)

banner = "=" * 80
print(banner)
print("Feature Engineering - Clean Approach (No Join Required)")
print(banner)

# ============================================================================
# 1. Basic Temporal Features (extracted directly from FlightDate)
# ============================================================================

print("\n1. Extracting basic temporal features...")

# Output column name -> pyspark date function applied to FlightDate.
# NOTE(review): the display list later in this script treats 'Month' as an
# original column. Under Spark's default case-insensitive column resolution,
# adding 'month' would presumably replace it rather than add a second column;
# confirm against the dataset schema.
temporal_extractors = {
    'month': month,
    'day_of_month': dayofmonth,
    'day_of_week': dayofweek,
    'quarter': quarter,
    'week_of_year': weekofyear,
}
for feature_name, extractor in temporal_extractors.items():
    flight_df = flight_df.withColumn(feature_name, extractor('FlightDate'))

# ============================================================================
# 2. Weekend Indicator
# ============================================================================

print("2. Creating weekend indicator...")

# day_of_week comes from dayofweek(), which numbers days 1 = Sunday through
# 7 = Saturday, so values 1 and 7 mark the weekend.
falls_on_weekend = col('day_of_week').isin([1, 7])
flight_df = flight_df.withColumn('is_weekend', when(falls_on_weekend, 1).otherwise(0))

# ============================================================================
# 3. Major Holidays (computed from the date alone, no external data needed)
# ============================================================================

print("3. Creating major holiday indicators...")

flight_month = month('FlightDate')
flight_day = dayofmonth('FlightDate')
flight_weekday = dayofweek('FlightDate')  # 1 = Sunday ... 7 = Saturday

# Flag column name -> boolean condition identifying that holiday.
holiday_rules = {
    # New Year's Day (1/1)
    'is_new_year': (flight_month == 1) & (flight_day == 1),
    # Independence Day (7/4)
    'is_july_4th': (flight_month == 7) & (flight_day == 4),
    # Christmas (12/25)
    'is_christmas': (flight_month == 12) & (flight_day == 25),
    # Thanksgiving: fourth Thursday of November, i.e. the Thursday dated 22-28
    'is_thanksgiving': (
        (flight_month == 11) & (flight_weekday == 5) & flight_day.between(22, 28)
    ),
    # Memorial Day: last Monday of May, i.e. the Monday dated the 25th or later
    'is_memorial_day': (
        (flight_month == 5) & (flight_weekday == 2) & (flight_day >= 25)
    ),
    # Labor Day: first Monday of September, i.e. the Monday dated the 7th or earlier
    'is_labor_day': (
        (flight_month == 9) & (flight_weekday == 2) & (flight_day <= 7)
    ),
}
for flag_name, rule in holiday_rules.items():
    flight_df = flight_df.withColumn(flag_name, when(rule, 1).otherwise(0))

# Any major holiday: OR together the individual 0/1 flags created above.
holiday_flags = list(holiday_rules)
any_major_holiday = col(holiday_flags[0]) == 1
for flag_name in holiday_flags[1:]:
    any_major_holiday = any_major_holiday | (col(flag_name) == 1)

flight_df = flight_df.withColumn(
    'is_major_holiday',
    when(any_major_holiday, 1).otherwise(0)
)

# ============================================================================
# 4. Holiday Proximity (days around the holidays)
# ============================================================================

print("4. Creating holiday proximity features...")

flight_month = month('FlightDate')
flight_day = dayofmonth('FlightDate')

# Each window is a fixed range of calendar days; the Thanksgiving window does
# not depend on which weekday the holiday actually falls on.
early_january = (flight_month == 1) & flight_day.between(1, 2)
late_december = (flight_month == 12) & flight_day.between(30, 31)

proximity_windows = [
    ('near_new_year', early_january | late_december),
    ('near_christmas', (flight_month == 12) & flight_day.between(23, 26)),
    ('near_thanksgiving', (flight_month == 11) & flight_day.between(22, 29)),
]
for window_name, in_window in proximity_windows:
    flight_df = flight_df.withColumn(window_name, when(in_window, 1).otherwise(0))

# ============================================================================
# 5. Seasonal Features
# ============================================================================

print("5. Creating seasonal features...")

flight_month = month('FlightDate')
flight_day = dayofmonth('FlightDate')

# Holiday season (Thanksgiving through New Year): Nov 22 onward, all of
# December, and Jan 1-2.
late_november = (flight_month == 11) & (flight_day >= 22)
all_december = flight_month == 12
first_days_of_january = (flight_month == 1) & (flight_day <= 2)
in_holiday_season = late_november | all_december | first_days_of_january
flight_df = flight_df.withColumn(
    'is_holiday_season', when(in_holiday_season, 1).otherwise(0)
)

# Summer travel season (June through August)
in_summer = flight_month.between(6, 8)
flight_df = flight_df.withColumn(
    'is_summer_travel', when(in_summer, 1).otherwise(0)
)

# Spring break (mid-March to early April): Mar 15 onward through Apr 10.
second_half_of_march = (flight_month == 3) & (flight_day >= 15)
first_ten_days_of_april = (flight_month == 4) & (flight_day <= 10)
in_spring_break = second_half_of_march | first_ten_days_of_april
flight_df = flight_df.withColumn(
    'is_spring_break_period', when(in_spring_break, 1).otherwise(0)
)

# ============================================================================
# 6. Time of Day Features (only when CRSDepTime is available)
# ============================================================================

print("6. Creating time of day features...")

if 'CRSDepTime' in flight_df.columns:
    # CRSDepTime is in HHMM form (e.g. 1430 = 14:30); dividing by 100 and
    # casting to int keeps only the hour.
    departure_hour = (col('CRSDepTime') / 100).cast('int')
    flight_df = flight_df.withColumn('dep_hour', departure_hour)

    hour_col = col('dep_hour')

    # Time of day categories; any hour outside 6-21 falls through to 'night'.
    time_bucket = (
        when(hour_col.between(6, 11), 'morning')
        .when(hour_col.between(12, 17), 'afternoon')
        .when(hour_col.between(18, 21), 'evening')
        .otherwise('night')
    )
    flight_df = flight_df.withColumn('time_of_day', time_bucket)

    # Peak hours: 7-9 and 17-19 (both ranges inclusive).
    morning_rush = hour_col.between(7, 9)
    evening_rush = hour_col.between(17, 19)
    flight_df = flight_df.withColumn(
        'is_peak_hour',
        when(morning_rush | evening_rush, 1).otherwise(0)
    )

# ============================================================================
# 7. Show Results
# ============================================================================

print("\n" + "=" * 80)
print("Feature Engineering Complete!")
print("=" * 80)

# Columns this script treats as already present in the loaded data; they are
# included in the sample output for context only.
original_cols = ['FlightDate', 'Year', 'Month']

# Every feature created by the sections above. Keeping these in their own list
# (instead of slicing one combined list at a hard-coded index) ensures that the
# "New features created" report cannot silently skip any of them.
new_feature_cols = [
    'month', 'day_of_month', 'day_of_week', 'quarter', 'week_of_year',
    'is_weekend', 'is_major_holiday',
    'is_new_year', 'is_july_4th', 'is_christmas', 'is_thanksgiving',
    'is_memorial_day', 'is_labor_day',
    'near_new_year', 'near_christmas', 'near_thanksgiving',
    'is_holiday_season', 'is_summer_travel', 'is_spring_break_period'
]

# The time-of-day features exist only when CRSDepTime was present.
if 'dep_hour' in flight_df.columns:
    new_feature_cols.extend(['dep_hour', 'time_of_day', 'is_peak_hour'])

# Select new feature columns alongside the original context columns
feature_cols = original_cols + new_feature_cols

print("\nNew features created:")
for col_name in new_feature_cols:
    print(f"  ✓ {col_name}")

print("\nSample data with new features:")
flight_df.select(feature_cols).show(10)

# ============================================================================
# 8. Feature Statistics
# ============================================================================

stats_rule = "=" * 80
print("\n" + stats_rule)
print("Feature Statistics")
print(stats_rule)

# Row counts per value of each indicator column.
weekend_stats = flight_df.groupBy('is_weekend').count()
holiday_stats = flight_df.groupBy('is_major_holiday').count()
holiday_season_stats = flight_df.groupBy('is_holiday_season').count()

count_reports = (
    ("\nWeekend vs Weekday:", weekend_stats),
    ("\nMajor Holiday vs Regular Day:", holiday_stats),
    ("\nHoliday Season vs Regular Season:", holiday_season_stats),
)
for heading, stats_df in count_reports:
    print(heading)
    stats_df.show()

# ============================================================================
# 9. Delay Analysis by Features
# ============================================================================

if 'DepDel15' in flight_df.columns:
    # The aggregate functions must come from pyspark.sql.functions. The bare
    # names `sum` / `count` are not imported at the top of this script, so
    # `sum` would resolve to the Python builtin (TypeError on a string) and
    # `count` would be undefined. The module alias avoids shadowing builtins.
    from pyspark.sql import functions as F

    print("\n" + "=" * 80)
    print("Delay Rates by Features")
    print("=" * 80)

    # Only non-cancelled flights are considered for delay rates.
    operated_flights = flight_df.filter(col('Cancelled') == 0)

    # Percentage of rows in each group with DepDel15 set.
    # NOTE(review): presumably DepDel15 is a 0/1 flag for departures delayed
    # by 15+ minutes; confirm against the dataset documentation.
    delay_rate_pct = (F.sum('DepDel15') / F.count('*') * 100).alias('delay_rate_pct')

    # (heading printed before the table, indicator column to group by)
    delay_breakdowns = [
        ("\nDelay Rate - Weekend vs Weekday:", 'is_weekend'),
        ("\nDelay Rate - Major Holiday vs Regular:", 'is_major_holiday'),
        ("\nDelay Rate - Holiday Season vs Regular:", 'is_holiday_season'),
        ("\nDelay Rate - Summer vs Other Seasons:", 'is_summer_travel'),
    ]
    for heading, indicator in delay_breakdowns:
        print(heading)
        operated_flights.groupBy(indicator).agg(delay_rate_pct).show()

# ============================================================================
# 10. Save Enhanced Dataset
# ============================================================================

section_rule = "=" * 80
print("\n" + section_rule)
print("Saving Enhanced Dataset")
print(section_rule)

# Relative output location; any existing data at this path is overwritten.
output_path = "flight_data_with_features.parquet"
parquet_writer = flight_df.write.mode('overwrite')
parquet_writer.parquet(output_path)

print(f"\n✓ Enhanced dataset saved to: {output_path}")
total_records = flight_df.count()
print(f"  Total records: {total_records:,}")
column_total = len(flight_df.columns)
print(f"  Total columns: {column_total}")

print("\n" + section_rule)
print("✅ Feature Engineering Complete!")
print(section_rule)

print("""
Summary:
✓ No external data join required
✓ All features derived from FlightDate
✓ Clean and efficient approach
✓ Ready for model training

Next steps:
1. Add weather features (if needed)
2. Feature selection
3. Model training
""")

# spark.stop()

Flight Delay Data Analysis (2018-2022)

Loading data files...
Reading Parquet files from 2018 to 2022...


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/jorahmormont/PycharmProjects/BigDataFinalProject/Data Analysis/datasets/robikscube/flight-delay-dataset-20182022/versions/4/raw/Combined_Flights_2021.parquet. SQLSTATE: 42K03