In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when, exp, radians, cos, sin, mean, avg, max as spark_max, min as spark_min, 
    sum as spark_sum, lag
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [54]:
# Create (or reuse) the Spark session for this notebook. A single call is
# enough: getOrCreate() returns the already-running session if one exists.
spark = SparkSession.builder.appName("ForestFireFeatureEngineering").getOrCreate()

# Path to the data directory (do not specify a file name, just the directory)
data_dir = "../new_data/new_top_ten_wildfire_weather_data"

# Load all CSV files in the directory; the first line of each file is the
# header, and column types are inferred from the data.
df = spark.read.csv(data_dir, header=True, inferSchema=True)

# Show the first few rows to verify
df.show(5)

+----------+--------------+----------------+-----------------+------------------+------------------+-------------------+------------------------+------------------------+-------------------------+-----------------+-----------------+-----------------+---------+------------+-------------------+------------------+------------------+---------------------------+-----------------------+--------------------------+--------+---------+----------+----+-----+---------+----------+----------+-------+----------+---+--------+----+--------+
|      date|          city|weather_latitude|weather_longitude|temperature_2m_max|temperature_2m_min|temperature_2m_mean|apparent_temperature_max|apparent_temperature_min|apparent_temperature_mean|daylight_duration|sunshine_duration|precipitation_sum| rain_sum|snowfall_sum|precipitation_hours|wind_speed_10m_max|wind_gusts_10m_max|wind_direction_10m_dominant|shortwave_radiation_sum|et0_fao_evapotranspiration|latitude|longitude|brightness|scan|track|satellite|instrumen

In [55]:
# Daily temperature spread: max minus min.
df = df.withColumn(
    "new_temperature_range", col("temperature_2m_max") - col("temperature_2m_min")
)

# Humidity proxy in percent: 100 * exp(a*Tmin/(Tmin+b)) / exp(a*Tmax/(Tmax+b))
# with a = 17.625 and b = 243.04.
# NOTE(review): this presumably uses the daily minimum temperature as a
# stand-in for dew point in a Magnus-style formula — confirm that is intended.
df = df.withColumn(
    "new_relative_humidity",
    100 * (
        exp((17.625 * col("temperature_2m_min")) / (col("temperature_2m_min") + 243.04)) /
        exp((17.625 * col("temperature_2m_max")) / (col("temperature_2m_max") + 243.04))
    )
)

# Rolling sum over the current row and the six rows before it, per city,
# ordered by date.
# NOTE(review): the frame counts ROWS, not days. If a city has more than one
# row for the same date (the preview further down shows repeated identical
# weather rows), this is not a 7-day total and ties on "date" make the frame
# order-dependent — consider aggregating to one row per (city, date) first.
window_spec = Window.partitionBy("city").orderBy("date").rowsBetween(-6, 0)

df = df.withColumn(
    "new_cumulative_precipitation",
    spark_sum("precipitation_sum").over(window_spec)
)

# Ratio of shortwave radiation to reference evapotranspiration.
df = df.withColumn(
    "new_dryness_index",
    col("shortwave_radiation_sum") / col("et0_fao_evapotranspiration")
)

# Product of the three fire-detection columns.
df = df.withColumn(
    "new_fire_intensity",
    col("brightness") * col("confidence") * col("frp")
)

# Fraction of the day that is daylight. The column names match Open-Meteo's
# daily variables, where `daylight_duration` is reported in SECONDS, so the
# divisor must be the number of seconds in a day; dividing by 24 would only be
# correct for a value in hours and yields numbers in the thousands here.
# NOTE(review): assumes the CSVs keep Open-Meteo's native unit — confirm.
SECONDS_PER_DAY = 24 * 60 * 60
df = df.withColumn(
    "new_daylight_fraction", col("daylight_duration") / lit(float(SECONDS_PER_DAY))
)


In [56]:
# Shared column expressions used by several of the features below.
max_wind_speed = col("wind_speed_10m_max")
wind_direction_rad = radians(col("wind_direction_10m_dominant"))

# Multiplier applied to shortwave radiation in the soil-moisture proxy.
evapotranspiration_coefficient = 1

df = (
    df
    # Wind vector components: speed scaled by cos / sin of the dominant
    # direction (converted from degrees to radians).
    .withColumn("new_wind_x", max_wind_speed * cos(wind_direction_rad))
    .withColumn("new_wind_y", max_wind_speed * sin(wind_direction_rad))
    # Product of brightness, daily max temperature and daily max wind speed.
    .withColumn(
        "new_fire_risk",
        col("brightness") * col("temperature_2m_max") * max_wind_speed,
    )
    # Precipitation minus (radiation * coefficient).
    .withColumn(
        "new_soil_moisture",
        col("precipitation_sum")
        - (col("shortwave_radiation_sum") * lit(evapotranspiration_coefficient)),
    )
    # Precipitation divided by shortwave radiation.
    .withColumn(
        "new_precip_radiation_ratio",
        col("precipitation_sum") / col("shortwave_radiation_sum"),
    )
)

# Disabled experiment, kept for reference only (not executed):
#   city_avg_precipitation = avg("precipitation_sum") per city, left-joined
#   onto df on "city", then
#   new_drought_index =
#       (city_avg_precipitation - new_cumulative_precipitation) / city_avg_precipitation
#   A fixed placeholder `historical_avg_precipitation = 10.0` was also sketched,
#   marked "Replace with the actual value".


In [58]:
# Preview up to 200 rows of a subset of the engineered features, restricted
# to rows whose `in_modis` flag equals True.
preview_columns = [
    "new_temperature_range",
    "new_relative_humidity",
    "new_cumulative_precipitation",
    "new_wind_x",
    "new_wind_y",
    "new_soil_moisture",
]

# Filter while `in_modis` is still part of the frame, then project.
modis_rows = df.filter(df.in_modis == True)
modis_rows.select(*preview_columns).show(200)




+---------------------+---------------------+----------------------------+-------------------+-------------------+-------------------+
|new_temperature_range|new_relative_humidity|new_cumulative_precipitation|         new_wind_x|         new_wind_y|  new_soil_moisture|
+---------------------+---------------------+----------------------------+-------------------+-------------------+-------------------+
|            4.6000001|    70.95041730356431|                         0.0|  -8.19948637606025| -6.502337996444922|              -2.36|
|            4.6000001|    70.95041730356431|                         0.0|  -8.19948637606025| -6.502337996444922|              -2.36|
|            4.6000001|    70.95041730356431|                         0.0|  -8.19948637606025| -6.502337996444922|              -2.36|
|            4.6000001|    70.95041730356431|                         0.0|  -8.19948637606025| -6.502337996444922|              -2.36|
|            4.6000001|    70.95041730356431|          

                                                                                

In [60]:
# Destination directory for the engineered dataset.
output_path = "../new_data/processed_data"

# Write the frame as CSV with a header row, replacing any existing output
# at the destination.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)


                                                                                