In [236]:
#chittitu ipynb file

In [237]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import base64

if SparkContext._active_spark_context:
    SparkContext._active_spark_context.stop()

spark = SparkSession.builder.appName("WeatherDataAnalysis").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [238]:
#2 - Load the CSV files and display the count of each dataset 

In [239]:
df = spark.read.option("header", "true").csv("*/*.csv")
file_count = len(df.inputFiles())

print(f"Total number of files: {file_count}")

Total number of files: 19


In [240]:
years = range(2015, 2025)
cincinnati_path = [f"{year}/72429793812.csv" for year in years]
florida_path = [f"{year}/99495199999.csv" for year in years if year != 2016]

data_cin = spark.read.option("header", "true").option("inferSchema", "true").csv(cincinnati_path)
data_flo = spark.read.option("header", "true").option("inferSchema", "true").csv(florida_path)

print("Cincinnati dataset count:", data_cin.count())
print("Florida dataset count:", data_flo.count())


Cincinnati dataset count: 3585
Florida dataset count: 2483


In [241]:
from pyspark.sql.functions import input_file_name, regexp_extract

cin = spark.read.option("header", "true").option("inferSchema", "true").csv("*/72429793812.csv") 
cin = cin.withColumn("Year", input_file_name())
cin = cin.withColumn("Year", regexp_extract("Year", r"(\d{4})", 1))

flo = spark.read.option("header", "true").option("inferSchema", "true").csv("*/99495199999.csv") 
flo = flo.withColumn("Year", input_file_name())
flo = flo.withColumn("Year", regexp_extract("Year", r"(\d{4})", 1))

print("Count for each dataset from Cincinnati:")
cin.groupBy("Year").count().orderBy("Year").show()
print("Count for each dataset from Florida:")
flo.groupBy("Year").count().orderBy("Year").show()

Count for each dataset from Cincinnati:
+----+-----+
|Year|count|
+----+-----+
|2015|  365|
|2016|  366|
|2017|  365|
|2018|  365|
|2019|  365|
|2020|  366|
|2021|  365|
|2022|  365|
|2023|  365|
|2024|  298|
+----+-----+

Count for each dataset from Florida:
+----+-----+
|Year|count|
+----+-----+
|2015|  355|
|2017|  283|
|2018|  363|
|2019|  345|
|2020|  365|
|2021|  104|
|2022|  259|
|2023|  276|
|2024|  133|
+----+-----+



In [242]:
#3 - Find the hottest day (column MAX) for each year 

In [243]:
from pyspark.sql.functions import year, col, date_format

for year_val in range(2015, 2025):
    directory_path = f"{year_val}"
    csv_file1 = "/72429793812.csv"  
    csv_file2 = "/99495199999.csv" 

    path_to_file1 = f"{directory_path}{csv_file1}"
    path_to_file2 = f"{directory_path}{csv_file2}"

    try:
        data_cin2 = spark.read.option("header", "true").option("inferSchema", "true").csv(path_to_file1)
        data_cin2 = data_cin2.withColumn("Year", year(col("DATE")))
        max_cin_row = data_cin2.filter((data_cin2["Year"] == year_val) & (data_cin2["MAX"] != 9999.9)) \
                                   .orderBy(col("MAX").desc()) \
                                   .first()
        
        if year_val==2016:
             print(f"Year: {year_val}")
             print("Station Code: 72429793812")
             print("Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US")
             reversed_cin_date = data_cin2.select(date_format(col("DATE"), "dd-MM-yyyy")).filter(col("DATE") == max_cin_row['DATE']).first()[0]
             print(f"Hottest Date: {reversed_cin_date}")
             print(f"Highest Temperature: {max_cin_row['MAX']}")
             print("-" * 30)
        else:
            data_flo = spark.read.option("header", "true").option("inferSchema", "true").csv(path_to_file2)
            data_flo = data_flo.withColumn("Year", year(col("DATE")))
            max_flo_row = data_flo.filter((data_flo["Year"] == year_val) & (data_flo["MAX"] != 9999.9)) \
                                       .orderBy(col("MAX").desc()) \
                                       .first()
            
            if max_cin_row < max_flo_row:
                print(f"Year: {year_val}")
                print("Station Code: 72429793812")
                print("Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US")
                reversed_cin_date = data_cin2.select(date_format(col("DATE"), "dd-MM-yyyy")).filter(col("DATE") == max_cin_row['DATE']).first()[0]
                print(f"Hottest Date: {reversed_cin_date}")
                print(f"Highest Temperature: {max_cin_row['MAX']}")
                if year_val < 2024:
                    print("-" * 30)
            else:
                print(f"Year: {year_val}")
                print("Station Code: 99495199999")
                print("Station Name: SEBASTIAN INLET STATE PARK, FL US")
                reversed_cin_date = data_cin2.select(date_format(col("DATE"), "dd-MM-yyyy")).filter(col("DATE") == max_cin_row['DATE']).first()[0]
                print(f"Hottest Date: {reversed_cin_date}")
                print(f"Highest Temperature: {max_flo_row['MAX']}")
                if year_val < 2024:
                    print("-" * 30)
            
    except Exception as e:
        print(f"Error processing files for year {year_val}: {e}")


Year: 2015
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Hottest Date: 12-06-2015
Highest Temperature: 91.9
------------------------------
Year: 2016
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Hottest Date: 24-07-2016
Highest Temperature: 93.9
------------------------------
Year: 2017
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Hottest Date: 22-07-2017
Highest Temperature: 91.9
------------------------------
Year: 2018
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Hottest Date: 04-07-2018
Highest Temperature: 96.1
------------------------------
Year: 2019
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Hottest Date: 30-09-2019
Highest Temperature: 95.0
------------------------------
Year: 2020
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIE

In [244]:
#4 - Find the coldest day (column MIN) for the month of March across all years (2015-2024) 

In [245]:
from pyspark.sql.functions import col, year, month, mean, stddev, count, when, date_format, lit
coldest_march_cin = data_cin.filter((month(col("DATE")) == 3) & (col("MIN") != 9999.9)) \
    .orderBy(col("MIN").asc()) \
    .select("STATION", "NAME", date_format("DATE", "dd-MM-yyyy").alias("Formatted_Date"), "MIN").first()

coldest_march_flo = data_flo.filter((month(col("DATE")) == 3) & (col("MIN") != 9999.9)) \
    .orderBy(col("MIN").asc()) \
    .select("STATION", "NAME", date_format("DATE", "dd-MM-yyyy").alias("Formatted_Date"), "MIN").first()
if coldest_march_cin < coldest_march_flo:
    print("Coldest day in March across all years:")
    print(f"Station Code: {coldest_march_cin['STATION']}")
    print(f"Station Name: {coldest_march_cin['NAME']}")
    print(f"Coldest Date: {coldest_march_cin['Formatted_Date']}")
    print(f"Lowest Temperature: {coldest_march_cin['MIN']}")
else:
    print("Coldest day in March across all years:")
    print(f"Station Code: {coldest_march_flo['STATION']}")
    print(f"Station Name: {coldest_march_flo['NAME']}")
    print(f"Coldest Date: {coldest_march_flo['Formatted_Date']}")
    print(f"Lowest Temperature: {coldest_march_flo['MIN']}")

Coldest day in March across all years:
Station Code: 72429793812
Station Name: CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US
Coldest Date: 06-03-2015
Lowest Temperature: 3.2


In [246]:
#5 - Find the year with the most precipitation for Cincinnati and Florida 

In [247]:
precipitation_data_cin = data_cin.groupBy(year("DATE").alias("Year")).agg(mean("PRCP").alias("Average_PRCP"))
max_precipitation_year_cin = precipitation_data_cin.orderBy(col("Average_PRCP").desc()).first()

precipitation_data_flo = data_flo.groupBy(year("DATE").alias("Year")).agg(mean("PRCP").alias("Average_PRCP"))
max_precipitation_year_flo = precipitation_data_flo.orderBy(col("Average_PRCP").desc()).first()

print(f"The most precipitation for Cincinnati was {max_precipitation_year_cin['Average_PRCP']:.2f} in the year {max_precipitation_year_cin['Year']}")
if ({max_precipitation_year_flo['Average_PRCP']}==0):
    print("The precipitation for Florida was the same for years 2015-2024 with a precipitation of 0")
else:
    print(f"The most precipitation for Florida was {max_precipitation_year_flo['Average_PRCP']:.2f} in the year {max_precipitation_year_flo['Year']}")

The most precipitation for Cincinnati was 5.49 in the year 2024
The most precipitation for Florida was 0.00 in the year 2024


In [248]:
#6 - Count the percentage of missing values for wind gust (column GUST) for Cincinnati and Florida in the year 2024

In [249]:
from pyspark.sql.functions import col

cinci_2024 = spark.read.option("header", "true").csv("2024/72429793812.csv") 
florida_2024 = spark.read.option("header", "true").csv("2024/99495199999.csv")

cinci_total = cinci_2024.count()
florida_total = florida_2024.count()

cinci_missing_gust = cinci_2024.filter(col("GUST") == 999.9).count()
florida_missing_gust = florida_2024.filter(col("GUST") == 999.9).count()

cinci_missing_percent = (cinci_missing_gust / cinci_total) * 100
florida_missing_percent = (florida_missing_gust / florida_total) * 100

print(f"Cincinnati 2024 - Missing GUST %: {cinci_missing_percent:0.2f}")
print(f"Florida 2024 - Missing GUST %: {florida_missing_percent:0.2f}")

Cincinnati 2024 - Missing GUST %: 39.60
Florida 2024 - Missing GUST %: 100.00


In [250]:
#7 - Find the mean, median, mode, and standard deviation of the temperature (column TEMP) for Cincinnati in each month for the year 2020 

In [251]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, avg, stddev, expr, approx_count_distinct

df = spark.read.option("header", "true").csv("2020/72429793812.csv")
df = df.withColumn("TEMP", col("TEMP").cast("float")).withColumn("DATE", col("DATE").cast("date"))

results = df.withColumn("Month", month("DATE")) \
    .groupBy("Month") \
    .agg(
        avg("TEMP").alias("Mean_TEMP"),
        stddev("TEMP").alias("StdDev_TEMP"),
        expr("percentile_approx(TEMP, 0.5)").alias("Median_TEMP"),  
        expr("mode(TEMP)").alias("Mode_TEMP") 
    ) \
    .orderBy("Month")

results.show(12)

+-----+------------------+------------------+-----------+---------+
|Month|         Mean_TEMP|       StdDev_TEMP|Median_TEMP|Mode_TEMP|
+-----+------------------+------------------+-----------+---------+
|    1|37.945161081129505| 8.345810838316384|       37.7|     24.7|
|    2| 36.58965525133856| 7.901597947537755|       36.0|     30.8|
|    3|  49.0741934007214|  8.77940669347644|       47.8|     47.8|
|    4| 51.77999992370606|7.3131621276074465|       51.0|     46.8|
|    5| 60.89032290058751| 9.314768319579512|       63.7|     73.9|
|    6| 72.54666570027669|4.8999458590264515|       73.7|     74.2|
|    7|  77.6000001968876| 2.337947626620972|       77.9|     78.4|
|    8| 73.34516143798828|3.4878690606063563|       73.7|     67.4|
|    9| 66.09999961853028| 7.118261579669542|       65.8|     72.7|
|   10| 55.19354851015152|6.7286914818367975|       54.0|     61.6|
|   11| 48.00333340962728| 6.825938707865554|       47.7|     47.7|
|   12| 35.99354830095845|6.6427872766495755|   

In [252]:
#8 - Find the top 10 days with the lowest Wind Chill for Cincinnati in 2017

In [253]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

df = spark.read.option("header", "true").csv("2017/72429793812.csv")

df = df.withColumn("TEMP", col("TEMP").cast("float")).withColumn("WDSP", col("WDSP").cast("float")).withColumn("DATE", col("DATE").cast("date"))

df_filtered = df.filter((col("STATION") == "72429793812") & (col("DATE").between("2017-01-01", "2017-12-31")) & (col("TEMP") < 50) & (col("WDSP") > 3))

df_wind_chill = df_filtered.withColumn(
    "Wind_Chill",
    35.74 + (0.6215 * col("TEMP")) - (35.75 * (col("WDSP")**0.16)) + (0.4275 * col("TEMP") * (col("WDSP")**0.16))
)

top_10_lowest_wind_chill = df_wind_chill.select("DATE", "STATION", "NAME", "TEMP", "WDSP", "Wind_Chill") \
    .orderBy("Wind_Chill") \
    .limit(10)

top_10_lowest_wind_chill.show()

+----------+-----------+--------------------+----+----+-------------------+
|      DATE|    STATION|                NAME|TEMP|WDSP|         Wind_Chill|
+----------+-----------+--------------------+----+----+-------------------+
|2017-01-07|72429793812|CINCINNATI MUNICI...|10.5| 7.0|-0.4140156367932173|
|2017-12-31|72429793812|CINCINNATI MUNICI...|11.0| 5.3| 2.0339764741541018|
|2017-12-27|72429793812|CINCINNATI MUNICI...|13.0| 5.8| 3.8206452986638073|
|2017-12-28|72429793812|CINCINNATI MUNICI...|13.6| 5.8|  4.533355513517824|
|2017-01-06|72429793812|CINCINNATI MUNICI...|13.6| 5.5|  4.868933492954463|
|2017-01-08|72429793812|CINCINNATI MUNICI...|15.9| 5.2|  7.929747979856229|
|2017-12-25|72429793812|CINCINNATI MUNICI...|25.8|13.5| 14.285112249501509|
|2017-12-30|72429793812|CINCINNATI MUNICI...|21.6| 5.3| 14.539211503699956|
|2017-01-05|72429793812|CINCINNATI MUNICI...|22.2| 5.8| 14.748862551376547|
|2017-12-26|72429793812|CINCINNATI MUNICI...|23.3| 6.2| 15.688977064714743|
+----------+

In [254]:
#9 - Investigate how many days had extreme weather conditions for Florida (fog, rain, snow, etc.) using the FRSHTT column

In [255]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# for rows with any extreme condition occurred in FRSHTT
extreme_weather_days = data_flo.filter(
    (col("FRSHTT").substr(1, 1) == '1') |  #for fog
    (col("FRSHTT").substr(2, 1) == '1') |  #for rain or drizzle
    (col("FRSHTT").substr(3, 1) == '1') |  #for snow
    (col("FRSHTT").substr(4, 1) == '1') |  #for hail
    (col("FRSHTT").substr(5, 1) == '1') |  #for thunder
    (col("FRSHTT").substr(6, 1) == '1')    #for tornado or funnel cloud
)

extreme_weather_count = extreme_weather_days.count()
print(f"Number of days with extreme weather conditions in Florida: {extreme_weather_count}")

Number of days with extreme weather conditions in Florida: 0


In [256]:
extreme_weather_florida = data_flo.filter((col("FRSHTT") != "000000")).count()
print("Days with extreme weather in Florida:", extreme_weather_florida)

Days with extreme weather in Florida: 0


In [257]:
#10 - Predict the maximum Temperature for Cincinnati for November and December 2024, based on the previous 2 years of weather data

In [258]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Load datasets for 2022, 2023, and 2024
data_2022 = spark.read.csv("2022/72429793812.csv", header=True, inferSchema=True)
data_2023 = spark.read.csv("2023/72429793812.csv", header=True, inferSchema=True)
data_2024 = spark.read.csv("2024/72429793812.csv", header=True, inferSchema=True)

# Combine datasets into one DataFrame
data_cin1 = data_2022.union(data_2023).union(data_2024)

# Filter out rows with missing "MAX" values (where MAX == 9999.9)
data_cin_filtered = data_cin1.filter(col("MAX") != 9999.9)

# Add Year and Month columns based on the DATE column
data_cin_filtered = data_cin_filtered.withColumn("Year", year(col("DATE"))).withColumn("Month", month(col("DATE")))

# Filter data for November and December of 2022 and 2023
nov_dec_data = data_cin_filtered.filter(col("Month").isin(11, 12)).select("MAX", "Year", "Month")

# Assemble features for linear regression (Year and Month)
assembler = VectorAssembler(inputCols=["Year", "Month"], outputCol="features")
assembled_data = assembler.transform(nov_dec_data.na.drop())

# Split data for training (only use data before 2024)
train_data = assembled_data.filter(col("Year") < 2024)

# Initialize and fit the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="MAX")
model = lr.fit(train_data)

# Create test data for November and December 2024 with different months
nov_test_data = spark.createDataFrame([(2024, 11)], ["Year", "Month"])
dec_test_data = spark.createDataFrame([(2024, 12)], ["Year", "Month"])

# Assemble features for the November and December test data
nov_assembled = assembler.transform(nov_test_data)
dec_assembled = assembler.transform(dec_test_data)

# Make predictions for November and December 2024
nov_predictions = model.transform(nov_assembled)
dec_predictions = model.transform(dec_assembled)

# Display predictions for November and December 2024
nov_predictions_with_watermark = nov_predictions.select("features", "prediction") \
    .withColumnRenamed("prediction", "November Prediction").show()

# Add watermark to December predictions
dec_predictions_with_watermark = dec_predictions.select("features", "prediction") \
    .withColumnRenamed("prediction", "December Prediction").show()

+-------------+-------------------+
|     features|November Prediction|
+-------------+-------------------+
|[2024.0,11.0]|    66.908455259736|
+-------------+-------------------+

+-------------+-------------------+
|     features|December Prediction|
+-------------+-------------------+
|[2024.0,12.0]| 57.475203227402744|
+-------------+-------------------+

