# PROJECT 5: PySpark

**AUTHOR**: CAT LUONG (luongcn) <br>
**DATE**: 03/31/2024

## Libraries Import and Spark Intialization

In [67]:
import calendar

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import max, min, month, col, mean, desc, stddev
from typing import Tuple

In [68]:
# Intialize spark session
spark=SparkSession.builder.appName("Practise").getOrCreate()
spark

## FUNCTIONS FOR EACH TASK

In [69]:
from pathlib import Path
data_path = Path("data")
files  = [f for f in data_path.rglob("*") if f.is_file() and f.parent.stem != '.ipynb_checkpoints']
print(files)

[WindowsPath('data/2010/01052099999.csv'), WindowsPath('data/2010/99407099999.csv'), WindowsPath('data/2011/01008099999.csv'), WindowsPath('data/2011/01046099999.csv'), WindowsPath('data/2012/01023099999.csv'), WindowsPath('data/2012/01044099999.csv'), WindowsPath('data/2013/01001499999.csv'), WindowsPath('data/2013/01008099999.csv'), WindowsPath('data/2014/01008099999.csv'), WindowsPath('data/2014/01023099999.csv'), WindowsPath('data/2015/01008099999.csv'), WindowsPath('data/2015/01025099999.csv'), WindowsPath('data/2016/01008099999.csv'), WindowsPath('data/2016/01023199999.csv'), WindowsPath('data/2017/01008099999.csv'), WindowsPath('data/2017/01023099999.csv'), WindowsPath('data/2018/01008099999.csv'), WindowsPath('data/2018/01025099999.csv'), WindowsPath('data/2019/01008099999.csv'), WindowsPath('data/2019/01023099999.csv'), WindowsPath('data/2020/01008099999.csv'), WindowsPath('data/2020/01023099999.csv'), WindowsPath('data/2021/01062099999.csv'), WindowsPath('data/2021/0106509999

In [70]:
def get_max_temp(file_1: Path, file_2: Path, column_name: str = 'MAX') -> Tuple[str, str]:   
    """
    Task 1: Find the hottest day (column MAX) for each year, and provide the corresponding station code,
    station name and the date (columns STATION, NAME, DATE).
    """
    year = file_1.parent.stem
    df_pyspark_1 = spark.read.option("header","true").csv(str(file_1))
    df_pyspark_2 = spark.read.option("header","true").csv(str(file_2))

    max_temp_df_1 = df_pyspark_1.select([column_name, 'STATION', 'NAME', 'DATE'])
    max_temp_df_2 = df_pyspark_2.select([column_name, 'STATION', 'NAME', 'DATE'])

    combined_max_temp_df = max_temp_df_1.unionByName(max_temp_df_2)
    combined_max_temp_df = combined_max_temp_df.filter(col(column_name) != '9999.9')

    # Initilize window specification
    w = Window.partitionBy()

    # Apply the max function using the window specification
    combined_max_temp_df = combined_max_temp_df.withColumn('max_temp', max(col(column_name).cast('float')).over(w))

    # Filter the DataFrame to get rows where TEMP equals max_temp
    filtered_df = combined_max_temp_df.filter(combined_max_temp_df[column_name] == combined_max_temp_df['max_temp'])

    # Show the filtered DataFrame
    filtered_df.select([column_name, 'STATION', 'NAME', 'DATE'])

    filtered_df = filtered_df.collect()
    
    max_val = f"MAX: {filtered_df[0][0]}, STATION: {filtered_df[0][1]}, NAME: {filtered_df[0][2]}, DATE: {filtered_df[0][3]}"

    return year, max_val

In [71]:
def get_min_temp_jan(file_1: Path, file_2: Path, column_name: str = 'MIN') -> Tuple[str, str, float]: 
    """
    Task 2: Find the coldest day (column MIN) for the month of January across all years (2010 - 2022) ,
    and provide the corresponding station code, station name and the date (columns STATION,
    NAME, DATE).
    """
    year = file_1.parent.stem
    df_pyspark_1 = spark.read.option("header","true").csv(str(file_1))
    df_pyspark_2 = spark.read.option("header","true").csv(str(file_2))

    max_temp_df_1 = df_pyspark_1.select([column_name, 'STATION', 'NAME', 'DATE'])
    max_temp_df_2 = df_pyspark_2.select([column_name, 'STATION', 'NAME', 'DATE'])

    combined_min_temp_df = max_temp_df_1.unionByName(max_temp_df_2)
    combined_min_temp_df = combined_min_temp_df.filter(col(column_name) != '9999.9')

    # Get the month of the date column
    combined_min_temp_df = combined_min_temp_df.withColumn('month', month(combined_min_temp_df['DATE']))

    # Filter the DataFrame to only include rows where the month is 3
    combined_min_temp_df = combined_min_temp_df.filter(combined_min_temp_df['month'] == 1)

    # Initilize window specification
    w = Window.partitionBy()

    # Apply the max function using the window specification
    combined_min_temp_df = combined_min_temp_df.withColumn('min_temp', min(col(column_name).cast('float')).over(w))

    # Filter the DataFrame to get rows where TEMP equals max_temp
    filtered_df = combined_min_temp_df.filter(combined_min_temp_df[column_name] == combined_min_temp_df['min_temp'])

    # Show the filtered DataFrame
    filtered_df.select([column_name, 'STATION', 'NAME', 'DATE'])

    filtered_df = filtered_df.collect()
    
    min_str = f"MIN: {filtered_df[0][0]}, STATION: {filtered_df[0][1]}, NAME: {filtered_df[0][2]}, DATE: {filtered_df[0][3]}"
    min_temp = float(filtered_df[0][0])
    
    return year, min_str, min_temp

In [72]:
def min_max_precipitation(file_1: Path, file_2: Path, column_name: str = 'PRCP') -> Tuple[str, str]:
    """
    Task 3: Maximum and Minimum precipitation (column PRCP ) for the year 2015, and provide the
    corresponding station code, station name and the date (columns STATION, NAME, DATE).
    """
    df_pyspark_1 = spark.read.option("header","true").csv(str(file_1))
    df_pyspark_2 = spark.read.option("header","true").csv(str(file_2))

    df_1 = df_pyspark_1.select(['PRCP', 'STATION', 'NAME', 'DATE'])
    df_2 = df_pyspark_2.select(['PRCP', 'STATION', 'NAME', 'DATE'])

    # Initilize window specification
    w = Window.partitionBy()

    combined_prcp_df = df_1.unionByName(df_2)
    combined_prcp_df = combined_prcp_df.filter(col(column_name) != '99.99')

    combined_prcp_df_max = combined_prcp_df.withColumn('max_prcp', max(col(column_name).cast('float')).over(w))
    max_df = combined_prcp_df_max.filter(combined_prcp_df_max[column_name] == combined_prcp_df_max['max_prcp']).collect()

    combined_prcp_df_min = combined_prcp_df.withColumn('min_prcp', min(col(column_name).cast('float')).over(w))
    min_df = combined_prcp_df_min.filter(combined_prcp_df_min[column_name] == combined_prcp_df_min['min_prcp']).collect()

    # Get max and min values of precipitation
    max_val = f"PRCP: {max_df[0][0]}, STATION: {max_df[0][1]}, NAME: {max_df[0][2]}, DATE: {max_df[0][3]}"
    min_val = f"PRCP: {min_df[0][0]}, STATION: {min_df[0][1]}, NAME: {min_df[0][2]}, DATE: {min_df[0][3]}"

    return max_val, min_val

In [73]:
def count_missing_values(file_1: Path, file_2: Path, column_name: str = 'GUST') -> float: 
    """
    Task 4:  Count percentage missing values for wind gust (column GUST) for the year 2019. 
    """
    df_pyspark_1 = spark.read.option("header","true").csv(str(file_1))
    df_pyspark_2 = spark.read.option("header","true").csv(str(file_2))

    rows_1 = df_pyspark_1.count()
    rows_2 = df_pyspark_2.count()

    missing_column_count_1 = df_pyspark_1.filter(col(column_name) == '999.9').count()
    missing_column_count_2 = df_pyspark_2.filter(col(column_name) == '999.9').count()

    return (missing_column_count_2 + missing_column_count_1) / (rows_1 + rows_2) * 100.0

In [74]:
def MMM_std_temp(file_1: Path, file_2: Path, column_name: str = 'TEMP') -> dict[str, list[str, str, str, str]]:
    """
    Task 5: Find the mean, median, mode and standard deviation of the Temperature (column TEMP) for
    each month for the year 2020.
    """
    df_pyspark_1 = spark.read.option("header","true").csv(str(file_1))
    df_pyspark_2 = spark.read.option("header","true").csv(str(file_2))

    temp_1_df = df_pyspark_1.filter(col(column_name) != '9999.9')
    temp_2_df = df_pyspark_2.filter(col(column_name) != '9999.9')
    
    
    temp_1_df = temp_1_df.withColumn(column_name, col(column_name).cast('float'))
    temp_2_df = temp_2_df.withColumn(column_name, col(column_name).cast('float'))

    combined_temp_df = temp_1_df.unionByName(temp_2_df)
    MMM_std = {}

    for m in range(1, 13, 1):
        temp_df = combined_temp_df.withColumn('month', month(combined_temp_df['DATE']))
        temp_df = temp_df.filter(temp_df['month'] == m)
        # Calculate the mean
        avg = temp_df.agg(mean(column_name)).collect()[0][0]

        # Calculate the median
        median = temp_df.approxQuantile(column_name, [0.5], 0)[0]

        # Calculate the mode
        mode = temp_df.groupBy(column_name).count().orderBy(desc("count")).first()[0]

        # Calculate the standard deviation
        std = temp_df.agg(stddev(column_name)).collect()[0][0]

        MMM_std[str(calendar.month_name[m])] = f"Mean: {avg}, Median: {median}, Mode: {mode}, Standard Deviation: {std}"

    return MMM_std


## Results

In [75]:
# Task 1 and Task 2 results
task1_results = {}
task2_results = ""
MIN_VAL = 1e9

for i in range(0, len(files), 2): 
    year, max_temp_str = get_max_temp(files[i], files[i+1])
    year, min_temp_str, min_temp = get_min_temp_jan(files[i], files[i+1])
    task1_results[year] = max_temp_str
    if min_temp < MIN_VAL:
        task2_results = f"YEAR: {year}, {min_temp_str}"
        MIN_VAL = min_temp 


In [76]:
task1_results

{'2010': 'MAX:   74.8, STATION: 99407099999, NAME: DESTRUCTION IS. WA, WA US, DATE: 2010-08-15',
 '2011': 'MAX:   87.8, STATION: 01046099999, NAME: SORKJOSEN, NO, DATE: 2011-07-09',
 '2012': 'MAX:   72.0, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2012-07-05',
 '2013': 'MAX:   80.6, STATION: 01001499999, NAME: SORSTOKKEN, NO, DATE: 2013-08-02',
 '2014': 'MAX:   89.6, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2014-07-10',
 '2015': 'MAX:   71.6, STATION: 01025099999, NAME: TROMSO, NO, DATE: 2015-07-30',
 '2016': 'MAX:   77.0, STATION: 01023199999, NAME: DRAUGEN, NO, DATE: 2016-07-21',
 '2017': 'MAX:   78.6, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2017-06-09',
 '2018': 'MAX:   84.2, STATION: 01025099999, NAME: TROMSO, NO, DATE: 2018-07-29',
 '2019': 'MAX:   78.8, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2019-07-21',
 '2020': 'MAX:   79.9, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2020-06-22',
 '2021': 'MAX:   88.3, STATION: 01065099999, NAME: KARASJOK,

In [77]:
task2_results

'YEAR: 2017, MIN:  -28.3, STATION: 01023099999, NAME: BARDUFOSS, NO, DATE: 2017-01-05'

In [78]:
# Task 3 Results
path_2015 = data_path / Path('2015')
files_2015 = [f for f in path_2015.glob('*') if f.is_file()]

task3_results = list(min_max_precipitation(files_2015[0], files_2015[1]))
task3_results


['PRCP:  2.11, STATION: 01025099999, NAME: TROMSO, NO, DATE: 2015-11-02',
 'PRCP:  0.00, STATION: 01008099999, NAME: LONGYEAR, SV, DATE: 2015-01-01']

In [79]:
# Task 4 Results
path_2019 = data_path / Path('2019')
files_2019 = [f for f in path_2019.glob('*') if f.is_file()]
task4_results = f"Missing value percentage of GUST for 2019 is {count_missing_values(files_2019[0], files_2019[1])}%"
task4_results

'Missing value percentage of GUST for 2019 is 82.87671232876713%'

In [80]:
path_2020 = data_path / Path('2020')
files_2020 = [f for f in path_2020.glob('*') if f.is_file()]

In [81]:
# Task 5 Results
task5_results = MMM_std_temp(files_2020[0], files_2020[1])
task5_results

{'January': 'Mean: 15.896774190928667, Median: 14.899999618530273, Mode: 5.699999809265137, Standard Deviation: 12.805172732458736',
 'February': 'Mean: 13.358620645671055, Median: 15.300000190734863, Mode: 2.799999952316284, Standard Deviation: 13.091808562182692',
 'March': 'Mean: 14.65322592914585, Median: 18.600000381469727, Mode: 9.199999809265137, Standard Deviation: 15.784789655679134',
 'April': 'Mean: 23.329999959468843, Median: 26.0, Mode: 34.099998474121094, Standard Deviation: 13.022097154673123',
 'May': 'Mean: 36.21935490638979, Median: 36.0, Mode: 37.0, Standard Deviation: 8.077246838395743',
 'June': 'Mean: 47.429999923706056, Median: 46.0, Mode: 36.70000076293945, Standard Deviation: 8.877190496403378',
 'July': 'Mean: 52.8870968972483, Median: 51.400001525878906, Mode: 49.29999923706055, Standard Deviation: 6.66378725061258',
 'August': 'Mean: 49.2870969464702, Median: 48.70000076293945, Mode: 44.70000076293945, Standard Deviation: 6.548594719618069',
 'September': 'M

In [82]:
# OUTPUT the results to results.txt
with open('results.txt', 'w', encoding='utf-8') as f:
    f.write("Task 1: \n")
    for year, val in task1_results.items():
        f.write(f"{year}: {val}\n")
    f.write("\n")

    f.write("Task 2: \n")
    f.write(task2_results + "\n")
    f.write("\n")

    f.write("Task 3: \n")
    for val in task3_results: 
        f.write(val + "\n")
    f.write("\n")

    f.write("Task 4: \n")
    f.write(task4_results + "\n")
    f.write("\n")

    f.write("Task 5: \n")
    for month, val in task5_results.items():
        f.write(f"{month}: {val} \n")
    f.write("\n")