# Download the required data (optional)

In [1]:
import urllib.request

# GSOD (global-summary-of-the-day) CSVs to fetch, given as "<year>/<file>.csv"
# paths relative to BASE_URL below; each is saved to data/<year>/<file>.csv.
files = '''
2010/01052099999.csv
2010/99407099999.csv
2011/01008099999.csv
2011/01046099999.csv
2012/01023099999.csv
2012/01044099999.csv
2013/01001499999.csv
2013/01008099999.csv
2014/01008099999.csv
2014/01023099999.csv
2015/01008099999.csv
2015/01025099999.csv
2016/01008099999.csv
2016/01023199999.csv
2017/01008099999.csv
2017/01023099999.csv
2018/01008099999.csv
2018/01025099999.csv
2019/01008099999.csv
2019/01023099999.csv
2020/01008099999.csv
2020/01023099999.csv
2021/01062099999.csv
2021/01065099999.csv
2022/01241099999.csv
2022/02095099999.csv
'''
files = files.split()
print(files)

from os import mkdir
from os.path import isdir

BASE_URL = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access"

if not isdir('data'):
    mkdir('data')
for f in files:
    folder, file = f.split('/')
    # Create the per-year folder on first use. The download below must run
    # whether or not the folder was just created; otherwise the first file
    # of each year would never be fetched on a fresh run.
    if not isdir(f'data/{folder}'):
        mkdir(f'data/{folder}')
    urllib.request.urlretrieve(f"{BASE_URL}/{f}", f'data/{folder}/{file}')

# Set up and initialize Spark

In [2]:
import contextlib
import io
import sys

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import size, year, month
from pyspark.sql.types import *

# Create (or reuse, via getOrCreate) a SparkSession on the "local" master,
# i.e. Spark running on this machine with a single worker thread.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/29 15:45:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read the data and add YEAR and MONTH columns

In [3]:
# Load every CSV under the per-year folders in data/ (header row, comma
# separated, column types inferred from the values).
spark_df = (
    spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .csv('data/*')
)
# Derive calendar fields from DATE once so the SQL below can filter/group on them.
spark_df = spark_df.withColumn('YEAR', year(spark_df['DATE'])).withColumn('MONTH', month(spark_df['DATE']))

                                                                                

# Task 1

Remove rows whose MAX value is the missing-data sentinel 9999.9.

In [4]:
# Keep only rows with a real MAX reading (9999.9 marks a missing value).
# Rows where MAX is null are dropped too, because the comparison yields null.
spark_df_Hot = spark_df.where(spark_df['MAX'] != 9999.9)
spark_df_Hot.createOrReplaceTempView('table')

23/03/29 15:45:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
# Hottest day of each year: rank observations within each YEAR by MAX
# (highest first) and keep the top-ranked row. If several days tie for the
# yearly maximum, ROW_NUMBER keeps an arbitrary one of them.
query = '''
SELECT MAX, STATION, NAME, DATE
FROM (
  SELECT MAX, STATION, NAME, DATE, YEAR,
         ROW_NUMBER() OVER (PARTITION BY YEAR ORDER BY MAX DESC) AS rn
  FROM table
) ranked
WHERE rn = 1
ORDER BY YEAR
'''

# Run the query once and capture the rendered table. Then append it to
# result.txt and echo it in the notebook. redirect_stdout restores sys.stdout
# even if show() raises. Because of 'a' mode, every re-run appends another copy.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    spark.sql(query).show()
table_text = captured.getvalue()

with open('result.txt', 'a') as f:
    f.write('Task 1\n')
    f.write(table_text)
print(table_text, end='')

                                                                                

+----+-----------+--------------------+-------------------+
| MAX|    STATION|                NAME|               DATE|
+----+-----------+--------------------+-------------------+
|74.8|99407099999|DESTRUCTION IS. W...|2010-08-15 00:00:00|
|87.8| 1046099999|       SORKJOSEN, NO|2011-07-09 00:00:00|
|72.0| 1023099999|       BARDUFOSS, NO|2012-07-05 00:00:00|
|80.6| 1001499999|      SORSTOKKEN, NO|2013-08-02 00:00:00|
|89.6| 1023099999|       BARDUFOSS, NO|2014-07-10 00:00:00|
|71.6| 1025099999|          TROMSO, NO|2015-07-30 00:00:00|
|77.0| 1023199999|         DRAUGEN, NO|2016-07-21 00:00:00|
|78.6| 1023099999|       BARDUFOSS, NO|2017-06-09 00:00:00|
|84.2| 1025099999|          TROMSO, NO|2018-07-29 00:00:00|
|78.8| 1023099999|       BARDUFOSS, NO|2019-07-21 00:00:00|
|79.9| 1023099999|       BARDUFOSS, NO|2020-06-22 00:00:00|
|88.3| 1065099999|        KARASJOK, NO|2021-07-05 00:00:00|
|85.5| 2095099999|          PAJALA, SW|2022-07-01 00:00:00|
+----+-----------+--------------------+-------------------+

# Task 2

Remove rows whose MIN value is the missing-data sentinel 9999.9.

In [6]:
# Keep only rows with a real MIN reading (9999.9 marks a missing value).
# Rows where MIN is null are dropped too, because the comparison yields null.
spark_df_Cold = spark_df.where(spark_df['MIN'] != 9999.9)
spark_df_Cold.createOrReplaceTempView('table')

In [7]:
# Coldest January day in 2010-2022: return every January row whose MIN equals
# the smallest January MIN in that range (all tied rows are returned).
query = '''
SELECT t.MIN, t.STATION, t.NAME, t.DATE
FROM table t
WHERE t.MONTH = 1 AND t.YEAR BETWEEN 2010 AND 2022 AND t.MIN = (
  SELECT MIN(MIN)
  FROM table
  WHERE MONTH = 1 AND YEAR BETWEEN 2010 AND 2022
)
'''

# Run the query once and capture the rendered table. Then append it to
# result.txt and echo it in the notebook. redirect_stdout restores sys.stdout
# even if show() raises. Because of 'a' mode, every re-run appends another copy.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    spark.sql(query).show()
table_text = captured.getvalue()

with open('result.txt', 'a') as f:
    f.write('Task 2\n')
    f.write(table_text)
print(table_text, end='')

+-----+----------+-------------+-------------------+
|  MIN|   STATION|         NAME|               DATE|
+-----+----------+-------------+-------------------+
|-28.3|1023099999|BARDUFOSS, NO|2017-01-05 00:00:00|
+-----+----------+-------------+-------------------+



# Task 3

Remove rows whose PRCP value is the missing-data sentinel 99.99.

In [8]:
# Keep only rows with a real PRCP reading (99.99 marks a missing value).
# Rows where PRCP is null are dropped too, because the comparison yields null.
spark_df_PRCP = spark_df.where(spark_df['PRCP'] != 99.99)
spark_df_PRCP.createOrReplaceTempView('table')

In [9]:
# Wettest and driest day of 2015: take the top row by PRCP in each direction
# and combine them. UNION removes duplicate rows, so only one row appears if
# both picks are identical. Among tied PRCP values the pick is arbitrary.
query = '''
(SELECT PRCP, STATION, NAME, DATE
FROM table
WHERE YEAR = 2015
ORDER BY PRCP DESC
limit 1)
UNION
(SELECT PRCP, STATION, NAME, DATE
FROM table
WHERE YEAR = 2015
ORDER BY PRCP ASC
limit 1)
'''

# UNION does not define an output order, so sort explicitly (wettest first).
# The query runs once and its rendered table is captured. The table is then
# appended to result.txt and echoed here. redirect_stdout restores sys.stdout
# even if show() raises. Because of 'a' mode, every re-run appends another copy.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    spark.sql(query).orderBy('PRCP', ascending=False).show()
table_text = captured.getvalue()

with open('result.txt', 'a') as f:
    f.write('Task 3\n')
    f.write(table_text)
print(table_text, end='')

+----+----------+------------+-------------------+
|PRCP|   STATION|        NAME|               DATE|
+----+----------+------------+-------------------+
|2.11|1025099999|  TROMSO, NO|2015-11-02 00:00:00|
| 0.0|1008099999|LONGYEAR, SV|2015-01-01 00:00:00|
+----+----------+------------+-------------------+



# Task 4

In [10]:
# Share of 2019 rows whose GUST holds the 999.9 missing-value code. This uses
# the unfiltered frame, so 'table' is re-registered from spark_df.
spark_df.createOrReplaceTempView('table')

query = '''
SELECT n_nan, n_rows, n_nan / n_rows * 100 as percentage_missing
FROM
(
SELECT COUNT(*) as n_nan
FROM table
WHERE YEAR = 2019 AND GUST = 999.9
)
,
(
SELECT COUNT(*) as n_rows
FROM table
WHERE YEAR = 2019
)
'''

# Run the query once and capture the rendered table. Then append it to
# result.txt and echo it in the notebook. redirect_stdout restores sys.stdout
# even if show() raises. Because of 'a' mode, every re-run appends another copy.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    spark.sql(query).show()
table_text = captured.getvalue()

with open('result.txt', 'a') as f:
    f.write('Task 4\n')
    f.write(table_text)
print(table_text, end='')

+-----+------+------------------+
|n_nan|n_rows|percentage_missing|
+-----+------+------------------+
|  605|   730| 82.87671232876713|
+-----+------+------------------+



# Task 5

Remove rows whose TEMP value is the missing-data sentinel 9999.9.

In [11]:
# Keep only rows with a real TEMP reading (9999.9 marks a missing value).
# Rows where TEMP is null are dropped too, because the comparison yields null.
spark_df_TEMP = spark_df.where(spark_df['TEMP'] != 9999.9)
spark_df_TEMP.createOrReplaceTempView('table')

Compute the 2020 monthly mean, median, and standard deviation of TEMP with a single query.

In [12]:
# 2020 monthly TEMP statistics, one row per MONTH: mean, median (percentile at
# 0.5) and standard deviation (Spark's STD, the sample standard deviation).
query = '''
SELECT MONTH, AVG(TEMP) as MEAN, percentile(TEMP, 0.5) as MEDIAN, STD(TEMP) as STD
FROM table
WHERE YEAR = 2020
GROUP BY MONTH
ORDER BY MONTH
'''
spark_df_summary = spark.sql(sqlQuery=query)

Compute the monthly mode manually; when several temperatures are equally frequent, the lowest one is taken.

In [13]:
# Step 1: for each MONTH of 2020, count how often each TEMP value occurs and
# keep the value(s) with the highest count. Ties all survive this step; the
# result is registered as the 'count' view.
query = '''
WITH counts AS (
  SELECT MONTH, TEMP, COUNT(TEMP) AS COUNT
  FROM table
  WHERE YEAR = 2020
  GROUP BY TEMP, MONTH
),
max_counts AS (
  SELECT MONTH, MAX(COUNT) AS M_COUNT
  FROM counts
  GROUP BY MONTH
)
SELECT a.MONTH, b.TEMP
FROM max_counts a
INNER JOIN counts b
  ON a.MONTH = b.MONTH AND a.M_COUNT = b.COUNT
ORDER BY a.MONTH
'''

spark.sql(query).createOrReplaceTempView('count')

# Step 2: break ties between equally frequent values by taking the lowest TEMP.
query = '''
SELECT MONTH, TEMP AS MODE
FROM (
  SELECT MONTH, TEMP,
         ROW_NUMBER() OVER (PARTITION BY MONTH ORDER BY TEMP) AS row_num
  FROM count
) sub
WHERE row_num = 1
'''
spark_df_mode = spark.sql(query)

Combine the two tables into one.

In [14]:
# Merge the summary statistics with the per-month mode. A join does not
# preserve the summary's ORDER BY, so sort by MONTH explicitly.
spark_df_task5 = (
    spark_df_summary
    .join(spark_df_mode, 'MONTH', 'outer')
    .select('MONTH', 'MEAN', 'MEDIAN', 'MODE', 'STD')
    .orderBy('MONTH')
)

# Run the join once and capture the rendered table. Then append it to
# result.txt and echo it in the notebook. redirect_stdout restores sys.stdout
# even if show() raises. Because of 'a' mode, every re-run appends another copy.
captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    spark_df_task5.show()
table_text = captured.getvalue()

with open('result.txt', 'a') as f:
    f.write('Task 5\n')
    f.write(table_text)
print(table_text, end='')

                                                                                

+-----+------------------+------+----+------------------+
|MONTH|              MEAN|MEDIAN|MODE|               STD|
+-----+------------------+------+----+------------------+
|    1|15.896774193548387| 15.25| 5.7|12.805172721989297|
|    2| 13.35862068965517|  15.4| 2.8| 13.09180853418292|
|    3|14.653225806451612|  18.6| 9.2|15.784789500893567|
|    4|23.329999999999995|  27.3|34.1| 13.02209725617009|
|    5| 36.21935483870967| 36.05|37.0| 8.077246704851957|
|    6| 47.42999999999999|  46.1|36.7| 8.877190347997287|
|    7| 52.88709677419356| 51.55|49.3|  6.66378723291517|
|    8|49.287096774193564| 48.85|44.7| 6.548594740281946|
|    9| 41.84499999999999| 42.55|31.8| 5.887660897797832|
|   10|31.529032258064507|  30.9|23.2| 9.609052888228815|
|   11|29.246666666666666|  29.9|28.1| 8.143448373534971|
|   12| 19.95483870967743| 20.35|10.2| 8.854464048157649|
+-----+------------------+------+----+------------------+

