In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.appName("SL_Weather_Analysis").getOrCreate()

# Load Datasets
loc_df = spark.read.csv("/content/drive/MyDrive/CW_DATASETS/locationData.csv", header=True, inferSchema=True)
weather_df = spark.read.csv("/content/drive/MyDrive/CW_DATASETS/weatherData.csv", header=True, inferSchema=True)

# Preprocessing
# Convert date string to DateType (Format in CSV is M/d/yyyy based on inspection)
weather_df = weather_df.withColumn("date", F.to_date(F.col("date"), "M/d/yyyy"))
weather_df = weather_df.withColumn("year", F.year("date")) \
                       .withColumn("month", F.month("date"))

# Join with Location Data to get City Names
df = weather_df.join(loc_df, "location_id")

# Cache df for better performance on multiple queries
df.cache()


DataFrame[location_id: int, date: date, weather_code (wmo code): int, temperature_2m_max (°C): double, temperature_2m_min (°C): double, temperature_2m_mean (°C): double, apparent_temperature_max (°C): double, apparent_temperature_min (°C): double, apparent_temperature_mean (°C): double, daylight_duration (s): double, sunshine_duration (s): double, precipitation_sum (mm): double, rain_sum (mm): double, precipitation_hours (h): int, wind_speed_10m_max (km/h): double, wind_gusts_10m_max (km/h): double, wind_direction_10m_dominant (°): int, shortwave_radiation_sum (MJ/m²): double, et0_fao_evapotranspiration (mm): double, sunrise: timestamp, sunset: timestamp, year: int, month: int, latitude: double, longitude: double, elevation: int, utc_offset_seconds: int, timezone: string, timezone_abbreviation: int, city_name: string]

In [9]:
# ---------------------------------------------------------
# Task 2-3.1: Percentage of total shortwave radiation > 15MJ/m² in a month across all districts
# ---------------------------------------------------------

# Calculate Total Radiation and High Radiation (>15) per month
q1_result = df.groupBy("month").agg(
    F.sum("shortwave_radiation_sum (MJ/m²)").alias("total_radiation"),
    F.sum(F.when(F.col("shortwave_radiation_sum (MJ/m²)") > 15,
                 F.col("shortwave_radiation_sum (MJ/m²)")).otherwise(0)).alias("high_radiation")
).withColumn("percentage", (F.col("high_radiation") / F.col("total_radiation")) * 100) \
 .orderBy("month","percentage")

print("--- Radiation Percentage per Month ---")
q1_result.select("month", "percentage").show()




--- Radiation Percentage per Month ---
+-----+-----------------+
|month|       percentage|
+-----+-----------------+
|    1|87.09519350608072|
|    2|94.29768774737937|
|    3|98.05285672729275|
|    4| 97.4256157388907|
|    5|90.85530180292807|
|    6|  92.275791422476|
|    7|93.00052419118205|
|    8|92.74227304062101|
|    9|91.08658237397407|
|   10|84.52634754134056|
|   11|72.27480647083718|
|   12| 71.7864978477085|
+-----+-----------------+



In [10]:
# ---------------------------------------------------------
# Task 2-3.2: The weekly maximum temperatures for the hottest months of an year
# ---------------------------------------------------------

# Identify hottest month for each year (based on Mean Temp)
monthly_stats = df.groupBy("year", "month").agg(F.mean("temperature_2m_max (°C)").alias("monthly_avg_temp"))

window_year = Window.partitionBy("year").orderBy(F.col("monthly_avg_temp").desc())

hottest_months = monthly_stats.withColumn("rank", F.row_number().over(window_year)) \
    .filter(F.col("rank") == 1) \
    .select(F.col("year").alias("hot_year"), F.col("month").alias("hot_month"))

# Join back to original data to filter for these months
df_hottest = df.join(hottest_months,
                     (df.year == hottest_months.hot_year) & (df.month == hottest_months.hot_month))

# Calculate Weekly Max Temperature (using weekofyear)
q2_result = df_hottest.withColumn("week", F.weekofyear("date")) \
    .groupBy("year", "month", "week") \
    .agg(F.max("temperature_2m_max (°C)").alias("weekly_max_temp")) \
    .orderBy("year", "month", "week")

print("--- Weekly Max Temps for Hottest Months ---")
q2_result.show()

--- Weekly Max Temps for Hottest Months ---
+----+-----+----+---------------+
|year|month|week|weekly_max_temp|
+----+-----+----+---------------+
|2010|    3|   9|           36.1|
|2010|    3|  10|           36.7|
|2010|    3|  11|           37.2|
|2010|    3|  12|           36.6|
|2010|    3|  13|           33.9|
|2011|    6|  22|           35.1|
|2011|    6|  23|           35.2|
|2011|    6|  24|           35.5|
|2011|    6|  25|           36.4|
|2011|    6|  26|           37.0|
|2012|    5|  18|           35.4|
|2012|    5|  19|           37.1|
|2012|    5|  20|           37.4|
|2012|    5|  21|           36.2|
|2012|    5|  22|           36.4|
|2013|    4|  14|           36.5|
|2013|    4|  15|           36.0|
|2013|    4|  16|           36.2|
|2013|    4|  17|           34.8|
|2013|    4|  18|           34.8|
+----+-----+----+---------------+
only showing top 20 rows


In [11]:


# Save Q6 Result
q1_result.coalesce(1).write.csv("percentage_of_total_shortwave_radiation.csv", header=True, mode="overwrite")

# Save Dataset
q2_result.coalesce(1).write.csv("weekly_max_temp_of_hottest_month_of_an_year.csv", header=True, mode="overwrite")