In [0]:
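# NOTE: exploratory draft, superseded by the consolidated pipeline cell at the end of this notebook; kept commented out for reference only.
# from pyspark.sql import functions as F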

# csv_path = "/Volumes/workspace/mais_hacks/dataset/*.csv"
# df = spark.read.csv(csv_path, header=True, inferSchema=True)

# # Columns to keep (use the exact names in your CSV)
# cols_to_keep = ["Date/Time (LST)", "Temp (�C)"]
# df = df.select([F.col(c) for c in cols_to_keep])

# # Convert Date/Time to proper timestamp
# df = df.withColumn(
#     "timestamp",
#     F.to_timestamp(F.col("Date/Time (LST)"), "yyyy-MM-dd HH:mm")  # adjust format if needed
# )

# # Rename temperature column to a clean name
# df = df.withColumnRenamed("Temp (�C)", "temp")

# # Keep only timestamps and temp
# df_weather = df.select("timestamp", "temp")

# # Save as table
# df_weather.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("mais_hacks.mais_hacks_cleaned2")

# df_weather.show(5)


In [0]:

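# NOTE: exploratory draft, superseded by the consolidated pipeline cell at the end of this notebook; kept commented out for reference only.
# # Read arrivals CSV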
# csv_path2 = "/Volumes/workspace/mais_hacks/scnd/*.csv"
# df_arrival = spark.read.csv(csv_path2, header=True, inferSchema=True)

# # Inspect columns to find timestamp column
# df_arrival.printSchema()
# df_arrival.show(5)

In [0]:
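# NOTE: exploratory draft, superseded by the consolidated pipeline cell at the end of this notebook; kept commented out for reference only.
# # Truncate timestamps to the hour for alignment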
# df_arrival_hourly = df_arrival.withColumn("hour_ts", F.date_trunc("hour", F.col("timestamp")))

# df_weather_hourly = df_weather.withColumn("hour_ts", F.date_trunc("hour", F.col("timestamp")))

# # Join on the hourly timestamp
# combined_df = df_arrival_hourly.join(
#     df_weather_hourly,
#     on="hour_ts",
#     how="left"
# ).drop("timestamp") \
#  .withColumnRenamed("hour_ts", "timestamp")

# combined_df.show(5)


In [0]:
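# NOTE: exploratory draft, superseded by the consolidated pipeline cell at the end of this notebook; kept commented out for reference only.
# from pyspark.sql import functions as F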

# # Check column names to be sure
# combined_df.printSchema()

# # Use the correct timestamp column name ('timestamp')
# combined_df = combined_df.withColumn("year", F.year(F.col("timestamp")))

# # Training data: 2022-2023
# train_df = combined_df.filter((F.col("year") >= 2022) & (F.col("year") <= 2023)).drop("year")

# # Testing data: 2024
# test_df = combined_df.filter(F.col("year") == 2024).drop("year")

# print("Training data count:", train_df.count())
# print("Testing data count:", test_df.count())

# train_df.show(5)
# test_df.show(5)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

# ----------------------------
# Step 1: Read CSVs
# ----------------------------
# Glob paths on the workspace volume; every CSV matching each pattern is read
# into a single DataFrame.
csv_weather = "/Volumes/workspace/mais_hacks/dataset/*.csv"
csv_arrivals = "/Volumes/workspace/mais_hacks/scnd/*.csv"

# Configure one reader (first row = header, column types inferred) and reuse it
# for both sources so they are parsed with identical options.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df_weather = csv_reader.csv(csv_weather)
df_arrivals = csv_reader.csv(csv_arrivals)

# ----------------------------
# Step 2: Clean weather data
# ----------------------------
# Keep only the observation time and the air temperature.
#
# The temperature header was previously hard-coded with a U+FFFD replacement
# character where a degree sign is expected. That only matches when the CSV
# happens to be decoded that way, and selecting it fails outright otherwise.
# Locate the column by its "Temp (" prefix instead, so either decoding works.
# A header that merely contains "Temp" further in (e.g. "Dew Point Temp (...)")
# would not match the prefix.
temp_columns = [c for c in df_weather.columns if c.startswith("Temp (")]
if len(temp_columns) != 1:
    raise ValueError(
        f"Expected exactly one weather column starting with 'Temp (', found {temp_columns}"
    )

df_weather = df_weather.select(
    F.col("Date/Time (LST)").alias("timestamp"),
    F.col(temp_columns[0]).alias("temp"),
)

# Parse the "Date/Time (LST)" text with the pattern yyyy-MM-dd HH:mm;
# adjust the pattern if the source files use a different layout.
df_weather = df_weather.withColumn(
    "timestamp",
    F.to_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm"),
)

# ----------------------------
# Step 3: Clean arrivals data
# ----------------------------
# Header of the time column in the arrivals CSV; change this if your file uses
# a different name. It is normalised to "timestamp" to match the weather data.
ARRIVALS_TIME_COL = "timestamps"
df_arrivals = df_arrivals.withColumnRenamed(ARRIVALS_TIME_COL, "timestamp")

# withColumnRenamed is a silent no-op when the source column is absent, so check
# explicitly and fail here (listing the available columns) instead of later in
# the hourly alignment. The comparison is case-insensitive to mirror Spark's
# default column resolution (spark.sql.caseSensitive=false).
n_timestamp_cols = sum(c.lower() == "timestamp" for c in df_arrivals.columns)
if n_timestamp_cols != 1:
    raise ValueError(
        f"Arrivals data must have exactly one 'timestamp' column after renaming "
        f"'{ARRIVALS_TIME_COL}', found {n_timestamp_cols}; columns: {df_arrivals.columns}"
    )

# ----------------------------
# Step 4: Align to hourly timestamps
# ----------------------------
# Give both sources a common join key "hour_ts": their "timestamp" floored to
# the start of its hour. The expression is resolved by name against each frame.
hour_bucket = F.date_trunc("hour", F.col("timestamp"))
df_arrivals_hourly = df_arrivals.withColumn("hour_ts", hour_bucket)
df_weather_hourly = df_weather.withColumn("hour_ts", hour_bucket)

# ----------------------------
# Step 5: Join weather and arrivals
# ----------------------------
# Collapse weather to exactly one row per hour before joining. A left join whose
# right side is not unique on hour_ts silently multiplies arrival rows. That
# happens if the weather glob picks up overlapping files or sub-hourly readings.
# Averaging is a no-op when each hour already has a single reading.
weather_per_hour = (
    df_weather_hourly
    .groupBy("hour_ts")
    .agg(F.avg("temp").alias("temp"))
)

# The left join keeps every arrival row; hours with no reading get temp = null.
# The arrival-level "timestamp" is dropped and replaced by its hour bucket, so
# downstream "timestamp" means the start of the hour, not the original event time.
combined_df = (
    df_arrivals_hourly
    .join(weather_per_hour, on="hour_ts", how="left")
    .drop("timestamp")
    .withColumnRenamed("hour_ts", "timestamp")
)

# ----------------------------
# Step 6: Add day-of-week
# ----------------------------
# day_of_week_int: Spark's dayofweek numbering (1 = Sunday ... 7 = Saturday).
# day_of_week:     full weekday name from the "EEEE" pattern, e.g. "Monday".
hour_start = F.col("timestamp")
combined_df = (
    combined_df
    .withColumn("day_of_week_int", F.dayofweek(hour_start))
    .withColumn("day_of_week", F.date_format(hour_start, "EEEE"))
)

# ----------------------------
# Step 7: Add holidays
# ----------------------------
# Fixed-date holidays only (Jan 1, Jul 1, Dec 25) for each year in the data.
# TODO: moving/observed holidays are not included; extend if they matter.
holidays = [
    f"{yr}-{month_day}"
    for yr in (2022, 2023, 2024)
    for month_day in ("01-01", "07-01", "12-25")
]

# One-column DataFrame of holiday dates, parsed from the ISO strings above.
holidays_df = (
    spark.createDataFrame([(d,) for d in holidays], ["holiday_date"])
    .withColumn("holiday_date", F.to_date("holiday_date", "yyyy-MM-dd"))
)

# Lookup of (date -> 1) keyed by the same name as the helper date column, so the
# join can be done by column name. Dates not in the lookup end up with 0.
holiday_flags = holidays_df.select(
    F.col("holiday_date").alias("date_only"),
    F.lit(1).alias("is_holiday"),
)

combined_df = (
    combined_df
    .withColumn("date_only", F.to_date("timestamp"))
    .join(holiday_flags, on="date_only", how="left")
    .fillna({"is_holiday": 0})
    .drop("date_only")
)

# ----------------------------
# Step 8: Split train/test by year
# ----------------------------
# Chronological split: 2022-2023 (inclusive) for training, 2024 for testing.
# Rows from any other year appear in neither set. Note that combined_df keeps
# the helper "year" column; only the split frames drop it.
combined_df = combined_df.withColumn("year", F.year(F.col("timestamp")))
year_col = F.col("year")

train_df = combined_df.where(year_col.between(2022, 2023)).drop("year")
test_df = combined_df.where(year_col == 2024).drop("year")

for label, split_df in (("Training data count:", train_df), ("Testing data count:", test_df)):
    print(label, split_df.count())

train_df.show(5)
test_df.show(5)

# ----------------------------
# Step 9: Optional save as table
# ----------------------------
# combined_df.write.mode("overwrite").saveAsTable("mais_hacks.mais_hacks_combined")
