In [4]:
#Author - Ameya Deshmukh
#Date - 03/25/2024
#Assignment - BigData

#install pyspark
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ff47bfc5fbfeeac60e96c97ab6b8ecaba6a69998389df2f95594e9681cf01bfb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, avg, count, countDistinct, lit, sum, when, year, month, dayofmonth, stddev, expr, first, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

print('Spark Session Initialized')

Spark Session Initialized


In [7]:
import zipfile
import os

# Path to the zip file
zip_file_path = 'data.zip'

# Directory where the zip file will be extracted
extract_to_dir = 'data'

# Check if the extraction directory exists, create if it doesn't
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir)

# Open the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract all the contents into the directory
    zip_ref.extractall(extract_to_dir)

print(f"Extracted all files in {zip_file_path} to {extract_to_dir}")


Extracted all files in data.zip to data


In [8]:
# Assuming the data.zip has been extracted into a folder named 'data'
weather_df = spark.read.csv("data/data/*/*.csv", header=True, inferSchema=True)

# Show the DataFrame schema to understand your data
weather_df.printSchema()


root
 |-- STATION: long (nullable = true)
 |-- DATE: date (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = true)

In [9]:
# Filter out missing values and add a column for the year
filtered_weather_df = weather_df.filter(
    (weather_df["MAX"] != 9999.9) &
    (weather_df["MIN"] != 9999.9) &
    (weather_df["PRCP"] != 99.99) &
    (weather_df["TEMP"] != 9999.9)
).withColumn("YEAR", year("DATE"))

In [10]:
# Define a window specification partitioned by 'YEAR' with a descending order by the 'MAX' temperature
year_partition = Window.partitionBy("YEAR").orderBy((filtered_weather_df["MAX"]).desc())

# Assign row numbers to identify the highest temperature record per year
highest_temp_by_year = filtered_weather_df.withColumn("rank", row_number().over(year_partition)) \
    .filter(col("rank") == 1) \
    .drop("rank")

# Reordering selected columns and displaying the results in descending order by 'DATE'
highest_temp_by_year.select("STATION", "NAME", "DATE", "MAX").orderBy("DATE",ascending=False).show()


+-----------+--------------------+----------+----+
|    STATION|                NAME|      DATE| MAX|
+-----------+--------------------+----------+----+
| 2095099999|          PAJALA, SW|2022-07-01|85.5|
| 1065099999|        KARASJOK, NO|2021-07-05|88.3|
| 1023099999|       BARDUFOSS, NO|2020-06-22|79.9|
| 1023099999|       BARDUFOSS, NO|2019-07-21|78.8|
| 1025099999|          TROMSO, NO|2018-07-29|84.2|
| 1023099999|       BARDUFOSS, NO|2017-06-09|78.6|
| 1023199999|         DRAUGEN, NO|2016-07-21|77.0|
| 1025099999|          TROMSO, NO|2015-07-30|71.6|
| 1023099999|       BARDUFOSS, NO|2014-07-10|89.6|
| 1001499999|      SORSTOKKEN, NO|2013-08-02|80.6|
| 1023099999|       BARDUFOSS, NO|2012-07-05|72.0|
| 1046099999|       SORKJOSEN, NO|2011-07-09|87.8|
|99407099999|DESTRUCTION IS. W...|2010-08-15|74.8|
+-----------+--------------------+----------+----+



In [11]:
# Find the coldest day in January
coldest_day_in_jan = filtered_weather_df.withColumn("MONTH", month("DATE")) \
    .filter(col("MONTH") == 1) \
    .select("STATION", "NAME", "DATE", "MIN") \
    .orderBy("MIN").limit(1)

coldest_day_in_jan.show()

+----------+-------------+----------+-----+
|   STATION|         NAME|      DATE|  MIN|
+----------+-------------+----------+-----+
|1023099999|BARDUFOSS, NO|2017-01-05|-28.3|
+----------+-------------+----------+-----+



In [12]:
# Filtering data for the year 2015
weather_data_2015 = filtered_weather_df.filter(col("YEAR") == 2015)

# Compute highest and lowest rainfall amounts in 2015
peak_rainfall = weather_data_2015.agg(max("PRCP").alias("HIGHEST_RAIN")).collect()[0]["HIGHEST_RAIN"]
least_rainfall = weather_data_2015.agg(min("PRCP").alias("LOWEST_RAIN")).collect()[0]["LOWEST_RAIN"]

# Retrieve station information and date for the highest rainfall
peak_rainfall_details = weather_data_2015.filter(col("PRCP") == peak_rainfall).select("STATION", "NAME", "DATE", "PRCP")

# Retrieve station information and date for the lowest rainfall
least_rainfall_details = weather_data_2015.filter(col("PRCP") == least_rainfall).select("STATION", "NAME", "DATE", "PRCP").limit(1)

# Displaying the findings
print("Highest Rainfall for 2015:")
peak_rainfall_details.show()

print("Lowest Rainfall for 2015:")
least_rainfall_details.show()


Highest Rainfall for 2015:
+----------+----------+----------+----+
|   STATION|      NAME|      DATE|PRCP|
+----------+----------+----------+----+
|1025099999|TROMSO, NO|2015-11-02|2.11|
+----------+----------+----------+----+

Lowest Rainfall for 2015:
+----------+------------+----------+----+
|   STATION|        NAME|      DATE|PRCP|
+----------+------------+----------+----+
|1008099999|LONGYEAR, SV|2015-01-01| 0.0|
+----------+------------+----------+----+



In [13]:
# Evaluate the fraction of unavailable wind gust readings for 2019
gust_absent_2019 = weather_df.filter(year("DATE") == 2019) \
    .select((count(when(col("GUST") == 999.9, True)) / count("*") * 100).alias("PERCENTAGE MISSING GUST"))

# Display the percentage of missing gust data in 2019
gust_absent_2019.show()


+-----------------------+
|PERCENTAGE MISSING GUST|
+-----------------------+
|      82.87671232876713|
+-----------------------+



In [14]:
# Select weather data for 2020
climate_data_2020 = filtered_weather_df.filter(year("DATE") == 2020)

# Add a month column based on the date
climate_data_2020 = climate_data_2020.withColumn("Month", month("DATE"))

# Compute average, median, mode, and standard deviation of temperature for each month
monthly_temperature_stats_2020 = climate_data_2020.groupBy("Month") \
    .agg(avg("TEMP").alias("Average Temperature"),
         expr("percentile_approx(TEMP, 0.5)").alias("Median Temperature"),
         expr("sort_array(collect_list(TEMP))[(cast(size(collect_list(TEMP)) as int) + 1) DIV 2]").alias("Mode Temperature"),
         stddev("TEMP").alias("Temperature Stddev")) \
    .orderBy("Month")

# Display the monthly temperature statistics for 2020
monthly_temperature_stats_2020.show()


+-----+-------------------+------------------+----------------+------------------+
|Month|Average Temperature|Median Temperature|Mode Temperature|Temperature Stddev|
+-----+-------------------+------------------+----------------+------------------+
|    1| 15.210169491525424|              14.7|            14.9|12.653031460610185|
|    2| 13.577358490566038|              15.3|            15.5|13.186832615404859|
|    3| 15.023333333333335|              18.6|            19.9|15.829465837499535|
|    4| 23.329999999999995|              26.0|            28.6|13.022097256170087|
|    5|  36.21935483870968|              36.0|            36.1| 8.077246704851957|
|    6| 47.429999999999986|              46.0|            46.2| 8.877190347997288|
|    7|  52.88709677419355|              51.4|            51.7| 6.663787232915164|
|    8| 49.376666666666665|              48.7|            49.0| 6.615066692379813|
|    9|  40.92727272727273|              39.0|            43.0| 8.161138512375697|
|   

In [15]:
import pandas as pd
from tabulate import tabulate

highest_temp_by_year_df = highest_temp_by_year.select("STATION", "NAME", "DATE", "MAX").orderBy("DATE", ascending=False)

# Pandas DataFrame Conversion
highest_temp_by_year_pd = highest_temp_by_year.toPandas()
coldest_day_in_jan_pd = coldest_day_in_jan.toPandas()
peak_rainfall_details_pd = peak_rainfall_details.toPandas()
least_rainfall_details_pd = least_rainfall_details.toPandas()
gust_absent_2019_pd = gust_absent_2019.toPandas()
monthly_temperature_stats_2020_pd = monthly_temperature_stats_2020.toPandas()

# Define the file path
file_path = "/result.txt"

# Formatting the output file
with open(file_path, "w") as f:
    f.write("TASK 1:\n")
    f.write("The hottest day for each year.\n")
    f.write(tabulate(highest_temp_by_year_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\n\nTASK 2:\n")
    f.write("The coldest day for the month of January across all years.\n")
    f.write(tabulate(coldest_day_in_jan_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\n\nTASK 3:\n")
    f.write("Maximum Precipitation for 2015.\n")
    f.write(tabulate(peak_rainfall_details_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\nMinimum Precipitation for 2015.\n")
    f.write(tabulate(least_rainfall_details_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\n\nTASK 4:\n")
    f.write("Percentage of missing values for wind gust for the year 2019.\n")
    f.write(tabulate(gust_absent_2019_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\n\nTASK 5:\n")
    f.write("The mean, median, mode and standard deviation of the temperature for each month for the year 2020.\n")
    f.write(tabulate(monthly_temperature_stats_2020_pd, headers='keys', tablefmt='psql', showindex=False))