In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

In [1]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("WeatherDataAnalysis") \
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/03 21:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/03 21:50:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [21]:
import os
from pyspark.sql.functions import year, to_date

# Define the base path for the data directory
base_path = os.path.expanduser('~/Documents/Project 4/WeatherData') 
stations = ['72429793812', '99495199999']

# Create an empty list to store results
data_counts = []

# Iterate through year directories in the base path
for year_dir in os.listdir(base_path):
    if year_dir.isdigit() and int(year_dir) in range(2015, 2025):  # Check if the directory name is a valid year
        year_path = os.path.join(base_path, year_dir)
        
        # Process CSV files in the year directory
        for station in stations:
            file_path = os.path.join(year_path, f"{station}.csv")
            if os.path.exists(file_path):
                try:
                    # Read the CSV file with inferSchema
                    df = spark.read.csv(file_path, header=True, inferSchema=True)
                    
                    # Get count and year from the first date in the dataset
                    count = df.count()
                    yr = df.select(year(to_date("DATE")).alias("year")).first()[0]  # Extract year from date column

                    # Add to results
                    data_counts.append((yr, station, count))
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")
                    data_counts.append((year_dir, station, 0))  # Append zero count on error
            else:
                data_counts.append((year_dir, station, 0))  # Append zero count if file does not exist

# Print the results
for record in data_counts:
    year, station, count = record
    print(f"Year: {year}, Station: {station}, Record Count: {count}")


Year: 2022, Station: 72429793812, Record Count: 365
Year: 2022, Station: 99495199999, Record Count: 259
Year: 2024, Station: 72429793812, Record Count: 296
Year: 2024, Station: 99495199999, Record Count: 133
Year: 2023, Station: 72429793812, Record Count: 365
Year: 2023, Station: 99495199999, Record Count: 276
Year: 2015, Station: 72429793812, Record Count: 365
Year: 2015, Station: 99495199999, Record Count: 355
Year: 2017, Station: 72429793812, Record Count: 365
Year: 2017, Station: 99495199999, Record Count: 283
Year: 2019, Station: 72429793812, Record Count: 365
Year: 2019, Station: 99495199999, Record Count: 345
Year: 2021, Station: 72429793812, Record Count: 365
Year: 2021, Station: 99495199999, Record Count: 104
Year: 2020, Station: 72429793812, Record Count: 366
Year: 2020, Station: 99495199999, Record Count: 365
Year: 2018, Station: 72429793812, Record Count: 365
Year: 2018, Station: 99495199999, Record Count: 363
Year: 2016, Station: 72429793812, Record Count: 366
Year: 2016, 

In [23]:
from functools import reduce
from pyspark.sql.functions import year, col, row_number

dfs = []

for yr in years:
    for station in stations:
        file_path = f"{base_path}/{yr}/{station}.csv"
        if os.path.exists(file_path):
            df = spark.read.csv(file_path, header=True)
            dfs.append(df)

from pyspark.sql import DataFrame

all_data = reduce(DataFrame.unionAll, dfs)

all_data = all_data.withColumn('YEAR', year(col('DATE'))) \
                   .withColumn('MAX', col('MAX').cast('float')) \
                   .withColumn('MIN', col('MIN').cast('float')) \
                   .filter((col('MAX') != 9999.9) & (col('MAX').isNotNull()))

from pyspark.sql.window import Window

windowSpec = Window.partitionBy('YEAR').orderBy(col('MAX').desc())

hottest_days = all_data.withColumn('rank', row_number().over(windowSpec)) \
                       .filter(col('rank') == 1) \
                       .select('YEAR', 'STATION', 'NAME', 'DATE', 'MAX')

hottest_days.show(10, False)


+----+-----------+------------------------------------------------+----------+------+
|YEAR|STATION    |NAME                                            |DATE      |MAX   |
+----+-----------+------------------------------------------------+----------+------+
|2015|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2015-06-12|91.9  |
|2016|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2016-07-24|93.9  |
|2017|99495199999|SEBASTIAN INLET STATE PARK, FL US               |2017-02-22|9999.9|
|2018|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2018-07-04|96.1  |
|2019|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2019-09-30|95.0  |
|2020|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2020-07-05|93.9  |
|2021|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2021-08-12|95.0  |
|2022|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2022-12-23|9999.9|
|2023|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN 

In [35]:
from pyspark.sql.functions import month, col

# Filter data for March and clean the MIN column
march_weather_data = all_data.filter(month(col('DATE')) == 3) \
                             .withColumn('Minimum_Temperature', col('MIN').cast('float')) \
                             .filter((col('Minimum_Temperature') != 9999.9) & (col('Minimum_Temperature').isNotNull()))

# Find the coldest day in March
coldest_record = march_weather_data.orderBy(col('Minimum_Temperature').asc()).limit(1) \
                                    .select('STATION', 'NAME', 'DATE', 'Minimum_Temperature')

# Display the result
coldest_record.show(1, False)


+-----------+------------------------------------------------+----------+-------------------+
|STATION    |NAME                                            |DATE      |Minimum_Temperature|
+-----------+------------------------------------------------+----------+-------------------+
|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2015-03-06|3.2                |
+-----------+------------------------------------------------+----------+-------------------+



In [51]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

valid_precipitation = all_data.withColumn('PRCP', col('PRCP').cast('float')) \
                              .filter((col('PRCP').isNotNull()) & 
                                      (col('PRCP') != 99.99) & 
                                      (col('PRCP') != 999.9))

precipitation_data = valid_precipitation.groupBy('STATION', 'NAME', 'YEAR') \
                                        .agg(F.sum('PRCP').alias('Total_PRCP'))

windowSpec = Window.partitionBy('STATION').orderBy(col('Total_PRCP').desc())

max_precipitation = precipitation_data.withColumn('rank', row_number().over(windowSpec)) \
                                      .filter(col('rank') == 1) \
                                      .select('STATION', 'NAME', 'YEAR', 'Total_PRCP')

max_precipitation.show(2, False)


+-----------+------------------------------------------------+----+------------------+
|STATION    |NAME                                            |YEAR|Total_PRCP        |
+-----------+------------------------------------------------+----+------------------+
|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2024|1636.1299657933414|
|99495199999|SEBASTIAN INLET STATE PARK, FL US               |2015|0.0               |
+-----------+------------------------------------------------+----+------------------+



In [7]:

# Function to calculate missing percentage for a given DataFrame
def calculate_missing_percentage(data, station_name):
    total_count = data.count()
    missing_count = data.filter(col("GUST").isNull() | (col("GUST") == missing_value)).count()
    missing_percentage = (missing_count / total_count) * 100 if total_count > 0 else 0
    print(f"Percentage of missing values for {station_name} (GUST): {missing_percentage:.2f}%")

# Calculate and print missing percentage for Cincinnati
if cincinnati_data_2024:
    calculate_missing_percentage(cincinnati_data_2024, "Cincinnati")

# Calculate and print missing percentage for Florida
if florida_data_2024:
    calculate_missing_percentage(florida_data_2024, "Florida")

# Additional analysis for all_data if required (assuming 'all_data' is defined)
data_2024 = all_data.filter(col('YEAR') == 2024).withColumn('GUST', col('GUST').cast('float'))

# Calculate total counts per station
total_counts = data_2024.groupBy('STATION').agg(count('*').alias('Total_Count'))

# Calculate missing counts where GUST is null or equals 999.9
missing_counts = data_2024.filter((col('GUST').isNull()) | (col('GUST') == missing_value)) \
                              .groupBy('STATION') \
                              .agg(count('*').alias('Missing_Count'))

# Join and calculate missing percentage for all data
joined_counts = total_counts.join(missing_counts, on='STATION', how='left') \
                            .fillna(0, subset=['Missing_Count'])

percentage_missing = joined_counts.withColumn('Missing_Percentage', 
                                              (col('Missing_Count') / col('Total_Count')) * 100) \
                                  .select('STATION', 'Missing_Percentage')

# Display the results for all_data
percentage_missing.show()


Percentage of missing values for Cincinnati (GUST): 39.53%
Percentage of missing values for Florida (GUST): 100.00%


In [59]:
cincinnati_2020 = all_data.filter((col('STATION') == '72429793812') & (col('YEAR') == 2020)) \
                          .withColumn('TEMP', col('TEMP').cast('float')) \
                          .withColumn('MONTH', month(col('DATE'))) \
                          .filter((col('TEMP') != 9999.9) & (col('TEMP').isNotNull()))


In [67]:
from pyspark.sql.functions import mean, stddev, expr

stats = cincinnati_2020.groupBy('MONTH') \
                       .agg(mean('TEMP').alias('Mean_TEMP'),
                            expr('percentile_approx(TEMP, 0.5)').alias('Median_TEMP'),
                            stddev('TEMP').alias('StdDev_TEMP')) \
                       .orderBy('MONTH')




In [69]:
from pyspark.sql.functions import count

mode_temp = cincinnati_2020.groupBy('MONTH', 'TEMP') \
                           .agg(count('TEMP').alias('Count')) \
                           .withColumn('rn', row_number().over(Window.partitionBy('MONTH').orderBy(col('Count').desc()))) \
                           .filter(col('rn') == 1) \
                           .select('MONTH', col('TEMP').alias('Mode_TEMP'))

final_stats = stats.join(mode_temp, on='MONTH')

final_stats.show(12, False)

+-----+------------------+-----------+------------------+---------+
|MONTH|Mean_TEMP         |Median_TEMP|StdDev_TEMP       |Mode_TEMP|
+-----+------------------+-----------+------------------+---------+
|1    |37.945161081129505|37.7       |8.345810838316384 |24.7     |
|2    |36.58965525133856 |36.0       |7.901597947537755 |25.9     |
|3    |49.0741934007214  |47.8       |8.77940669347644  |39.6     |
|4    |51.77999992370606 |51.0       |7.3131621276074465|49.4     |
|5    |60.89032290058751 |63.7       |9.314768319579512 |73.9     |
|6    |72.54666570027669 |73.7       |4.8999458590264515|70.7     |
|7    |77.6000001968876  |77.9       |2.337947626620972 |78.4     |
|8    |73.34516143798828 |73.7       |3.4878690606063563|78.3     |
|9    |66.09999961853028 |65.8       |7.118261579669542 |74.5     |
|10   |55.19354851015152 |54.0       |6.7286914818367975|52.2     |
|11   |48.00333340962728 |47.7       |6.825938707865554 |47.7     |
|12   |35.99354830095845 |35.2       |6.64278727

In [71]:
from pyspark.sql.functions import col, pow

# Filter the data for Cincinnati in 2017
cincinnati_2017 = all_data.filter((col('STATION') == '72429793812') & (col('YEAR') == 2017))

# Further filter the data based on the specified conditions
filtered_data = cincinnati_2017.filter((col('TEMP').cast('float') < 50) & 
                                        (col('WDSP').cast('float') > 3) &
                                        (col('TEMP') != 9999.9) & (col('WDSP') != 999.9))

# Calculate Wind Chill
wind_chill_data = filtered_data.withColumn('TEMP', col('TEMP').cast('float')) \
                               .withColumn('WDSP', col('WDSP').cast('float')) \
                               .withColumn('Wind_Chill', 35.74 + 0.6215 * col('TEMP') - 35.75 * pow(col('WDSP'), 0.16) + 0.4275 * col('TEMP') * pow(col('WDSP'), 0.16))

# Get the 10 records with the lowest Wind Chill, including TEMP and WDSP
lowest_wind_chill = wind_chill_data.orderBy(col('Wind_Chill').asc()) \
                                    .select('DATE', 'TEMP', 'WDSP', 'Wind_Chill') \
                                    .limit(10)

# Display the results
lowest_wind_chill.show(10, False)


+----------+----+----+-------------------+
|DATE      |TEMP|WDSP|Wind_Chill         |
+----------+----+----+-------------------+
|2017-01-07|10.5|7.0 |-0.4140156367932173|
|2017-12-31|11.0|5.3 |2.0339764741541018 |
|2017-12-27|13.0|5.8 |3.8206452986638073 |
|2017-12-28|13.6|5.8 |4.533355513517824  |
|2017-01-06|13.6|5.5 |4.868933492954463  |
|2017-01-08|15.9|5.2 |7.929747979856229  |
|2017-12-25|25.8|13.5|14.285112249501509 |
|2017-12-30|21.6|5.3 |14.539211503699956 |
|2017-01-05|22.2|5.8 |14.748862551376547 |
|2017-12-26|23.3|6.2 |15.688977064714743 |
+----------+----+----+-------------------+



In [73]:
from pyspark.sql.functions import lpad, col

# Filter data for Florida station
florida_station_data = all_data.filter(col('STATION') == '99495199999')

# Identify extreme weather conditions
weather_conditions = florida_station_data.filter(col('FRSHTT').isNotNull()) \
                                         .withColumn('FRSHTT', lpad(col('FRSHTT').cast('string'), 6, '0')) \
                                         .withColumn('IsExtreme', 
                                                     (col('FRSHTT').substr(1, 1) == '1') |  # Fog
                                                     (col('FRSHTT').substr(2, 1) == '1') |  # Rain
                                                     (col('FRSHTT').substr(3, 1) == '1') |  # Snow
                                                     (col('FRSHTT').substr(4, 1) == '1') |  # Hail
                                                     (col('FRSHTT').substr(5, 1) == '1') |  # Thunder
                                                     (col('FRSHTT').substr(6, 1) == '1'))   # Tornado

# Count the number of days with extreme weather
extreme_weather_days = weather_conditions.filter(col('IsExtreme')).count()

# Output the result
print(f"Total days with extreme weather conditions in Florida: {extreme_weather_days}")


Total days with extreme weather conditions in Florida: 0


In [75]:
import pandas as pd

# Generate prediction dates for November and December 2024
november_days = [(2024, 11, day) for day in range(1, 31)]
december_days = [(2024, 12, day) for day in range(1, 32)]
all_prediction_dates = november_days + december_days

# Create a DataFrame with the prediction dates
prediction_dataframe = pd.DataFrame(all_prediction_dates, columns=["YEAR", "MONTH", "DAY"])

# Specify the path to save the CSV file
csv_file_path = "prediction_dates.csv"  # Modify this path as needed
prediction_dataframe.to_csv(csv_file_path, index=False)

# Load the prediction dates from the CSV into a Spark DataFrame
prediction_spark_data = spark.read.option("header", "true").csv(csv_file_path)
prediction_spark_data.show()


+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2024|   11|  1|
|2024|   11|  2|
|2024|   11|  3|
|2024|   11|  4|
|2024|   11|  5|
|2024|   11|  6|
|2024|   11|  7|
|2024|   11|  8|
|2024|   11|  9|
|2024|   11| 10|
|2024|   11| 11|
|2024|   11| 12|
|2024|   11| 13|
|2024|   11| 14|
|2024|   11| 15|
|2024|   11| 16|
|2024|   11| 17|
|2024|   11| 18|
|2024|   11| 19|
|2024|   11| 20|
+----+-----+---+
only showing top 20 rows



In [101]:
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, to_timestamp, col
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("WeatherDataAnalysis").getOrCreate()

# Define the base path for the data directory
base_path = os.path.expanduser('~/Documents/Project 4/WeatherData') 
stations = ['72429793812']

# Initialize variable to hold Cincinnati data
cincinnati_combined_data = None

# Iterate through year directories in the base path
for year_dir in os.listdir(base_path):
    if year_dir.isdigit() and int(year_dir) in range(2015, 2025):  # Check if the directory name is a valid year
        year_path = os.path.join(base_path, year_dir)
        
        # Process CSV files in the year directory
        for station in stations:
            file_path = os.path.join(year_path, f"{station}.csv")
            if os.path.exists(file_path):
                try:
                    # Read the CSV file with header and infer schema
                    yearly_data = spark.read.csv(file_path, header=True, inferSchema=True)

                    # Process the data to extract YEAR, MONTH, DAY, and cast MAX to float
                    yearly_data = yearly_data.withColumn("YEAR", year(to_timestamp(col("DATE")))) \
                                             .withColumn("MONTH", month(to_timestamp(col("DATE")))) \
                                             .withColumn("DAY", dayofmonth(to_timestamp(col("DATE")))) \
                                             .withColumn("MAX", col("MAX").cast("float")) \
                                             .filter(col("MONTH").isin(11, 12)) \
                                             .filter((col("MAX") < 9999.9) & (col("MAX").isNotNull()))

                    # Combine the yearly data into a single DataFrame
                    cincinnati_combined_data = yearly_data if cincinnati_combined_data is None else cincinnati_combined_data.union(yearly_data)
                except Exception as error:
                    print(f"Error loading {file_path}: {error}")
            else:
                print(f"File does not exist: {file_path}")

# Create a feature vector from YEAR, MONTH, and DAY
vector_assembler = VectorAssembler(inputCols=["YEAR", "MONTH", "DAY"], outputCol="features")
cincinnati_combined_data = vector_assembler.transform(cincinnati_combined_data).select("features", "MAX")

# Set up a Random Forest Regressor model in a pipeline
random_forest_model = RandomForestRegressor(featuresCol="features", labelCol="MAX")
model_pipeline = Pipeline(stages=[random_forest_model])

# Fit the model to the combined data
trained_model = model_pipeline.fit(cincinnati_combined_data)

# Load prediction data from the CSV file (make sure this path is correct)
prediction_data = spark.read.option("header", "true").csv(csv_file_path)

# Cast YEAR, MONTH, and DAY to integer types in the prediction DataFrame
prediction_data = prediction_data.withColumn("YEAR", col("YEAR").cast("int")) \
                                  .withColumn("MONTH", col("MONTH").cast("int")) \
                                  .withColumn("DAY", col("DAY").cast("int"))

# Transform the prediction data to create feature vectors
prediction_data = vector_assembler.transform(prediction_data)

# Make predictions using the trained model
predicted_results = trained_model.transform(prediction_data)

# Aggregate the maximum predictions by MONTH
max_monthly_predictions = predicted_results.groupBy("MONTH").agg(F.max("prediction").alias("Max_Prediction"))

# Show the results
max_monthly_predictions.show()


+-----+-----------------+
|MONTH|   Max_Prediction|
+-----+-----------------+
|   12|54.13378106595995|
|   11|66.55222876541171|
+-----+-----------------+

