In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)

0,1
spark.dynamicAllocation.enabled,false
spark.app.startTime,1713255230520
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.driver.memory,1g
spark.driver.port,35769
spark.driver.host,mathmadslinux2p.canterbury.ac.nz
spark.ui.port,4852
spark.app.name,uwi14 (jupyter)
spark.executor.memory,1g


<h3>Processing<h3>

<h3>Q1<h3> Define the separate data sources as daily, stations, states, countries, and inventory
respectively. All of the data is stored in HDFS under hdfs:///data/ghcnd and is read only. Do
not copy any of the data to your home directory.
Use the hdfs command to explore hdfs:///data/ghcnd without actually loading any data into
memory.
(a) How is the data structured? Draw a directory tree to represent this in a sensible way.
(b) How many years are contained in daily, and how does the size of the data change?
(c) What is the total size of all of the data? How much of that is daily?

In [71]:
#Q1 - 1
#Data Structure
!hdfs dfs -ls /data/ghcnd/

Found 5 items
drwxr-xr-x   - jsw93 supergroup          0 2024-03-19 00:11 /data/ghcnd/daily
-rwxr-xr-x   8 jsw93 supergroup       3659 2024-03-11 00:45 /data/ghcnd/ghcnd-countries.txt
-rwxr-xr-x   8 jsw93 supergroup   34380032 2024-03-11 00:39 /data/ghcnd/ghcnd-inventory.txt
-rwxr-xr-x   8 jsw93 supergroup       1086 2024-03-11 00:45 /data/ghcnd/ghcnd-states.txt
-rwxr-xr-x   8 jsw93 supergroup   10834968 2024-03-11 00:39 /data/ghcnd/ghcnd-stations.txt


In [4]:
#Q1 - 2
#Number of years in daily
!hdfs dfs -ls /data/ghcnd/daily/

Found 263 items
-rw-r--r--   8 jsw93 supergroup     517706 2024-03-18 23:56 /data/ghcnd/daily/1750.csv.gz
-rw-r--r--   8 jsw93 supergroup       3358 2024-03-18 23:57 /data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup       3327 2024-03-18 23:54 /data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup       3335 2024-03-18 23:54 /data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup       3344 2024-03-18 23:49 /data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup       3356 2024-03-18 23:56 /data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup       3325 2024-03-18 23:53 /data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup       3418 2024-03-18 23:54 /data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup       3357 2024-03-18 23:56 /data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup       3373 2024-03-18 23:56 /data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8 jsw93 supergroup       3419 2024-03-18 23:55 /data/ghcnd/d

In [5]:
#Q1 - 2
#Size of the data changes in Daily
!hdfs dfs -du -h /data/ghcnd/daily/


505.6 K  3.9 M     /data/ghcnd/daily/1750.csv.gz
3.3 K    26.2 K    /data/ghcnd/daily/1763.csv.gz
3.2 K    26.0 K    /data/ghcnd/daily/1764.csv.gz
3.3 K    26.1 K    /data/ghcnd/daily/1765.csv.gz
3.3 K    26.1 K    /data/ghcnd/daily/1766.csv.gz
3.3 K    26.2 K    /data/ghcnd/daily/1767.csv.gz
3.2 K    26.0 K    /data/ghcnd/daily/1768.csv.gz
3.3 K    26.7 K    /data/ghcnd/daily/1769.csv.gz
3.3 K    26.2 K    /data/ghcnd/daily/1770.csv.gz
3.3 K    26.4 K    /data/ghcnd/daily/1771.csv.gz
3.3 K    26.7 K    /data/ghcnd/daily/1772.csv.gz
3.3 K    26.3 K    /data/ghcnd/daily/1773.csv.gz
3.3 K    26.5 K    /data/ghcnd/daily/1774.csv.gz
6.2 K    49.7 K    /data/ghcnd/daily/1775.csv.gz
6.3 K    50.2 K    /data/ghcnd/daily/1776.csv.gz
6.3 K    50.2 K    /data/ghcnd/daily/1777.csv.gz
6.1 K    48.8 K    /data/ghcnd/daily/1778.csv.gz
6 K      48 K      /data/ghcnd/daily/1779.csv.gz
6.1 K    48.8 K    /data/ghcnd/daily/1780.csv.gz
7.6 K    60.9 K    /data/ghcnd/daily/1781.csv.gz


In [6]:
#Q1 - 3
#Total Size of the data
!hdfs dfs -du -h /data/ghcnd/

12.4 G  98.9 G   /data/ghcnd/daily
3.6 K   28.6 K   /data/ghcnd/ghcnd-countries.txt
32.8 M  262.3 M  /data/ghcnd/ghcnd-inventory.txt
1.1 K   8.5 K    /data/ghcnd/ghcnd-states.txt
10.3 M  82.7 M   /data/ghcnd/ghcnd-stations.txt


<h3>Q2<h3>
<h3>Open a notebook on the master node using start pyspark notebook and run start spark
with 2 executors, 1 core per executor, 1 GB of executor memory, and 1 GB of master memory.
You will now explore each data source briefly to ensure that the descriptions are accurate and
that the data is as expected.<h3>

<h3>(a) Define a schema for daily based on the description in this assignment or in the GHCN
Daily README. This schema should use the data types defined in pyspark.sql.<h3>

<h3>(b) Load 1000 rows of the hdfs:///data/ghcnd/daily/2023.csv.gz file into Spark by using
the limit command immediately after the read command.
What data types did you end up using for the schema and why?<h3>

In [7]:
#Q2

# Write your imports and code here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *

In [8]:
# Daily

schema_daily = StructType([
    StructField("ID", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("ELEMENT", StringType(), True),
    StructField("VALUE", FloatType(), True),
    StructField("M_FLAG", StringType(), True),
    StructField("Q_FLAG", StringType(), True),
    StructField("S_FLAG", StringType(), True),
    StructField("OBS_TIME", StringType(), True),
])
daily_data = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2023.csv.gz")
    .limit(1000)
)
daily_data.show(100)

+-----------+--------+-------+-----+------+------+------+--------+
|         ID|    DATE|ELEMENT|VALUE|M_FLAG|Q_FLAG|S_FLAG|OBS_TIME|
+-----------+--------+-------+-----+------+------+------+--------+
|AE000041196|20230101|   TMAX|252.0|  null|  null|     S|    null|
|AE000041196|20230101|   TMIN|149.0|  null|  null|     S|    null|
|AE000041196|20230101|   PRCP|  0.0|     D|  null|     S|    null|
|AE000041196|20230101|   TAVG|207.0|     H|  null|     S|    null|
|AEM00041194|20230101|   TMAX|255.0|  null|  null|     S|    null|
|AEM00041194|20230101|   TMIN|186.0|  null|  null|     S|    null|
|AEM00041194|20230101|   PRCP|  0.0|  null|  null|     S|    null|
|AEM00041194|20230101|   TAVG|223.0|     H|  null|     S|    null|
|AEM00041217|20230101|   TMAX|248.0|  null|  null|     S|    null|
|AEM00041217|20230101|   TMIN|184.0|  null|  null|     S|    null|
|AEM00041217|20230101|   TAVG|215.0|     H|  null|     S|    null|
|AEM00041218|20230101|   TMAX|254.0|  null|  null|     S|    n

In [9]:
show_as_html(daily_data, 100)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AE000041196,20230101,TMAX,252.0,,,S,
1,AE000041196,20230101,TMIN,149.0,,,S,
2,AE000041196,20230101,PRCP,0.0,D,,S,
3,AE000041196,20230101,TAVG,207.0,H,,S,
4,AEM00041194,20230101,TMAX,255.0,,,S,
5,AEM00041194,20230101,TMIN,186.0,,,S,
6,AEM00041194,20230101,PRCP,0.0,,,S,
7,AEM00041194,20230101,TAVG,223.0,H,,S,
8,AEM00041217,20230101,TMAX,248.0,,,S,
9,AEM00041217,20230101,TMIN,184.0,,,S,


<h3>(c) Load each of stations, states, countries, and inventory into Spark as well. You
will need to find a way to parse the fixed width text formatting, as this format is not included
in the standard spark.read library. You could try using spark.read.text and
pyspark.sql.functions.substring or finding an existing open source library.
How many rows are in each metadata table? How many stations do not have a WMO ID?<h3>

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring

In [4]:
#Stations_Data

stations_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-stations.txt")

In [5]:
stations_df.show()

+--------------------+
|               value|
+--------------------+
|ACW00011604  17.1...|
|ACW00011647  17.1...|
|AE000041196  25.3...|
|AEM00041194  25.2...|
|AEM00041217  24.4...|
|AEM00041218  24.2...|
|AF000040930  35.3...|
|AFM00040938  34.2...|
|AFM00040948  34.5...|
|AFM00040990  31.5...|
|AG000060390  36.7...|
|AG000060590  30.5...|
|AG000060611  28.0...|
|AG000060680  22.8...|
|AGE00135039  35.7...|
|AGE00147704  36.9...|
|AGE00147705  36.7...|
|AGE00147706  36.8...|
|AGE00147707  36.8...|
|AGE00147708  36.7...|
+--------------------+
only showing top 20 rows



In [6]:
stations_parsed_df = stations_df.select(
    substring("value", 1, 11).alias("ID"),
    substring("value", 13, 8).alias("LATITUDE"),
    substring("value", 22, 9).alias("LONGITUDE"),
    substring("value", 32, 6).alias("ELEVATION"),
    substring("value", 39, 2).alias("STATE"),
    substring("value", 42, 29).alias("NAME"),
    substring("value", 73, 3).alias("GSN_FLAG"),
    substring("value", 77, 3).alias("HCN_CRN_FLAG"),
    substring("value", 81, 5).alias("WMO_ID")
)

In [8]:
stations_parsed_df.show(100)

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|ACW00011604| 17.1167| -61.7833|     10.1|     |ST JOHNS COOLIDGE...|        |            |      |
|ACW00011647| 17.1333| -61.7833|     19.2|     |ST JOHNS         ...|        |            |      |
|AE000041196| 25.3330|  55.5170|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AEM00041194| 25.2550|  55.3640|     10.4|     |DUBAI INTL       ...|        |            | 41194|
|AEM00041217| 24.4330|  54.6510|     26.8|     |ABU DHABI INTL   ...|        |            | 41217|
|AEM00041218| 24.2620|  55.6090|    264.9|     |AL AIN INTL      ...|        |            | 41218|
|AF000040930| 35.3170|  69.0170|   3366.0|     |NORTH-SALANG     ...|     GSN|            | 40930|
|AFM000409

In [15]:
show_as_html(stations_parsed_df,10)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0
5,AEM00041218,24.262,55.609,264.9,,AL AIN INTL,,,41218.0
6,AF000040930,35.317,69.017,3366.0,,NORTH-SALANG,GSN,,40930.0
7,AFM00040938,34.21,62.228,977.2,,HERAT,,,40938.0
8,AFM00040948,34.566,69.212,1791.3,,KABUL INTL,,,40948.0
9,AFM00040990,31.5,65.85,1010.0,,KANDAHAR AIRPORT,,,40990.0


In [9]:
from pyspark.sql.functions import col, isnull, trim

# Count rows in the DataFrame
stations_count = stations_parsed_df.count()

# Count stations with a blank or null WMO_ID
stations_no_wmo_id_count = stations_parsed_df.filter(
    (trim(col("WMO_ID")) == "") | isnull(col("WMO_ID"))
).count()

print(f"Number of stations: {stations_count}")
print(f"Stations without WMO ID: {stations_no_wmo_id_count}")

Number of stations: 125983
Stations without WMO ID: 118023


In [72]:
#States Data
states_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-states.txt")
states_parsed_df = states_df.select(
    substring('value', 1, 2).alias('State_Code'),
    substring('value', 4, 50).alias('State_Name')
)
show_as_html(states_parsed_df,10)
states_count = states_parsed_df.count()
print(f"Number of states: {states_count}")

Unnamed: 0,State_Code,State_Name
0,AB,ALBERTA
1,AK,ALASKA
2,AL,ALABAMA
3,AR,ARKANSAS
4,AS,AMERICAN SAMOA
5,AZ,ARIZONA
6,BC,BRITISH COLUMBIA
7,CA,CALIFORNIA
8,CO,COLORADO
9,CT,CONNECTICUT


Number of states: 74


In [73]:
output_path_states = "hdfs:///user/uwi14/outputs/ghcnd/states_df.csv"
states_parsed_df.write.mode("overwrite").option("header", "true").option("compression", "gzip").csv(output_path_states)

In [79]:
#Country Data
countries_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-countries.txt")
countries_parsed_df =countries_df.select(
    substring('value', 1, 2).alias('Country_Code'),
    substring('value', 4, 50).alias('Country_Name')
)
show_as_html(countries_parsed_df,10)
countries_count = countries_parsed_df.count()
print(f"Number of countries: {countries_count}")

Unnamed: 0,Country_Code,Country_Name
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan
5,AL,Albania
6,AM,Armenia
7,AO,Angola
8,AQ,American Samoa [United States]
9,AR,Argentina


Number of countries: 219


In [80]:
output_path_countries = "hdfs:///user/uwi14/outputs/ghcnd/countries_df.csv"
countries_parsed_df.write.mode("overwrite").option("header", "true").option("compression", "gzip").csv(output_path_countries)

In [19]:
#Inventory
inventory_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-inventory.txt")
inventory_parsed_df =inventory_df.select(
    substring(col("value"), 1, 11).alias('ID'),
    substring(col("value"), 13, 7).alias('LATITUDE'),
    substring(col("value"), 22, 8).alias('LONGITUDE'),
    substring('value', 32, 4).alias('ELEMENT'),
    substring('value', 37, 4).alias('FIRST YEAR'),
    substring('value', 42, 4).alias('LAST YEAR')
)
show_as_html(inventory_parsed_df,100)
inventory_count = inventory_parsed_df.count()
print(f"Number of inventories: {inventory_count}")

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRST YEAR,LAST YEAR
0,ACW00011604,17.116,-61.783,TMAX,1949,1949
1,ACW00011604,17.116,-61.783,TMIN,1949,1949
2,ACW00011604,17.116,-61.783,PRCP,1949,1949
3,ACW00011604,17.116,-61.783,SNOW,1949,1949
4,ACW00011604,17.116,-61.783,SNWD,1949,1949
5,ACW00011604,17.116,-61.783,PGTM,1949,1949
6,ACW00011604,17.116,-61.783,WDFG,1949,1949
7,ACW00011604,17.116,-61.783,WSFG,1949,1949
8,ACW00011604,17.116,-61.783,WT03,1949,1949
9,ACW00011604,17.116,-61.783,WT08,1949,1949


Number of inventories: 747382


<h3> Q3 <h3>
<h3>Next you will combine the daily climate summaries with the metadata tables, joining on station,
state, and country. Note that joining the daily climate summaries and metadata into a single
table is not efficient for a database of this size, but joining the metadata into a single table is very
convenient for filtering and sorting based on attributes at a station level.
You will need to start saving some intermediate outputs along the way. Create an output directory
in your home directory e.g. hdfs:///user/abc123/outputs/ghcnd/. Note that we only have
400GB of distributed storage available so think carefully before you write output to HDFS.<h3>

<h4>(a) Extract the two character country code from each station code in stations and store the
output as a new column using the withColumn method.<h4>

In [21]:
!hdfs dfs -du -h /user/uwi14/outputs/ghcnd/

4.6 M  18.4 M  /user/uwi14/outputs/ghcnd/Final_stations_df.parquet
2.3 M  9.2 M   /user/uwi14/outputs/ghcnd/joined_countries_df.csv
2.3 M  9.3 M   /user/uwi14/outputs/ghcnd/joined_countries_state_df.csv


In [22]:
#Extract the two character country code from each station code in stations

stations_parsed_df = stations_parsed_df.withColumn("CODE", substring(col("ID"), 1, 2))

show_as_html(stations_parsed_df,5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,CODE
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,AC
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,AC
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,AE
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0,AE
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0,AE


<h4>(b) LEFT JOIN stations with countries using your output from part (a).<h4>

In [31]:
# Perform the LEFT JOIN
joined_countries_df = stations_parsed_df.join(
    countries_parsed_df.withColumnRenamed("Country_Code","CODE"),
    on="CODE",
    how = "left"
)
show_as_html(joined_countries_df,5)

Unnamed: 0,CODE,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,Country_Name
0,AC,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,Antigua and Barbuda
1,AC,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,Antigua and Barbuda
2,AE,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,United Arab Emirates
3,AE,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0,United Arab Emirates
4,AE,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0,United Arab Emirates


In [32]:
output_path = "hdfs:///user/uwi14/outputs/ghcnd/joined_countries_df.csv"
joined_countries_df.write.mode("overwrite").option("header", "true").option("compression", "gzip").csv(output_path)

<h4>(c) LEFT JOIN stations and states, allowing for the fact that state codes are only provided
for stations in the US.<h4>

In [38]:
# Perform the LEFT JOIN
joined_country_state_df = joined_countries_df.join(
    states_parsed_df.withColumnRenamed("State_Code","STATE"),
    on="STATE",
    how="left"
)
show_as_html(joined_country_state_df,15)

Unnamed: 0,STATE,CODE,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,Country_Name,State_Name
0,,AC,ACW00011604,17.1167,-61.7833,10.1,ST JOHNS COOLIDGE FLD,,,,Antigua and Barbuda,
1,,AC,ACW00011647,17.1333,-61.7833,19.2,ST JOHNS,,,,Antigua and Barbuda,
2,,AE,AE000041196,25.333,55.517,34.0,SHARJAH INTER. AIRP,GSN,,41196.0,United Arab Emirates,
3,,AE,AEM00041194,25.255,55.364,10.4,DUBAI INTL,,,41194.0,United Arab Emirates,
4,,AE,AEM00041217,24.433,54.651,26.8,ABU DHABI INTL,,,41217.0,United Arab Emirates,
5,,AE,AEM00041218,24.262,55.609,264.9,AL AIN INTL,,,41218.0,United Arab Emirates,
6,,AF,AF000040930,35.317,69.017,3366.0,NORTH-SALANG,GSN,,40930.0,Afghanistan,
7,,AF,AFM00040938,34.21,62.228,977.2,HERAT,,,40938.0,Afghanistan,
8,,AF,AFM00040948,34.566,69.212,1791.3,KABUL INTL,,,40948.0,Afghanistan,
9,,AF,AFM00040990,31.5,65.85,1010.0,KANDAHAR AIRPORT,,,40990.0,Afghanistan,


In [39]:
output_path1 = "hdfs:///user/uwi14/outputs/ghcnd/joined_countries_state_df.csv"
joined_country_state_df.write.mode("overwrite").option("header", "true").option("compression", "gzip").csv(output_path1)

In [41]:
!hdfs dfs -du -h /user/uwi14/outputs/ghcnd/

4.6 M  18.4 M  /user/uwi14/outputs/ghcnd/Final_stations_df.parquet
2.3 M  9.2 M   /user/uwi14/outputs/ghcnd/joined_countries_df.csv
2.3 M  9.2 M   /user/uwi14/outputs/ghcnd/joined_countries_state_df.csv


<h4>(d) Based on inventory, what was the first and last year that each station was active and
collected any element at all?<h4>

In [42]:
active_years_df = inventory_parsed_df.groupBy('ID').agg(
    F.min('FIRST YEAR').alias('FIRST_ACTIVE_YEAR'),
    F.max('LAST YEAR').alias('LAST_ACTIVE_YEAR')
)
show_as_html(active_years_df,5)

Unnamed: 0,ID,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR
0,AEM00041217,1983,2024
1,AGE00147708,1879,2024
2,AGE00147710,1909,2009
3,AGE00147714,1896,1938
4,AGE00147719,1888,2024


<h4>How many different elements has each station collected overall?<h4>

In [46]:
element_count_df = inventory_parsed_df.groupBy('ID').agg(
    F.countDistinct('ELEMENT').alias('DIFFERENT_ELEMENTS_COUNT')
)
show_as_html(element_count_df,5)

Unnamed: 0,ID,DIFFERENT_ELEMENTS_COUNT
0,USC00030828,15
1,USC00031442,20
2,USC00031582,13
3,USC00031900,12
4,USC00031956,2


In [47]:
Modified_Inventory_df = active_years_df.join(
                        element_count_df,
                        on="ID",how="left")
show_as_html(Modified_Inventory_df,5)

Unnamed: 0,ID,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT
0,AEM00041217,1983,2024,4
1,AGE00147708,1879,2024,5
2,AGE00147710,1909,2009,4
3,AGE00147714,1896,1938,3
4,AGE00147719,1888,2024,4


<h4>Further, count separately the number of core elements and the number of ”other” elements
that each station has collected overall.<h4>

In [50]:
core_elements = ['PRCP', 'SNOW', 'SNWD', 'TMAX', 'TMIN']

# Core elements count
core_elements_df = inventory_parsed_df.filter(inventory_parsed_df['ELEMENT'].isin(core_elements)).groupBy('ID').agg(
    F.countDistinct('ELEMENT').alias('CORE_ELEMENTS_COUNT')
)

# Other elements count
other_elements_df = inventory_parsed_df.filter(~inventory_parsed_df['ELEMENT'].isin(core_elements)).groupBy('ID').agg(
    F.countDistinct('ELEMENT').alias('OTHER_ELEMENTS_COUNT')
)

core_other_elements_df = core_elements_df.join(
                            other_elements_df,
                            on="ID", how="left")

show_as_html(core_elements_df,5)
show_as_html(other_elements_df,5)
show_as_html(core_other_elements_df,5)

Unnamed: 0,ID,CORE_ELEMENTS_COUNT
0,USC00031442,5
1,USC00031582,3
2,USC00031956,2
3,USC00031962,5
4,USC00031982,5


Unnamed: 0,ID,OTHER_ELEMENTS_COUNT
0,USC00030664,10
1,USC00030828,10
2,USC00030842,11
3,USC00031419,7
4,USC00031442,15


Unnamed: 0,ID,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT
0,AEM00041217,3,1.0
1,AGE00147708,4,1.0
2,AGE00147710,3,1.0
3,AGE00147714,3,
4,AGE00147719,3,1.0


In [52]:
Modified_Inventory_df= Modified_Inventory_df.join(core_other_elements_df,
                                                 on="ID",
                                                 how="left")
show_as_html(Modified_Inventory_df,5)

Unnamed: 0,ID,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT
0,AEM00041217,1983,2024,4,3,1.0
1,AGE00147708,1879,2024,5,4,1.0
2,AGE00147710,1909,2009,4,3,1.0
3,AGE00147714,1896,1938,3,3,
4,AGE00147719,1888,2024,4,3,1.0


<h4>How many stations collect all five core elements? How many collect only precipitation and
no other elements?<h4>

In [51]:
all_core_elements_df = core_elements_df.filter(core_elements_df['CORE_ELEMENTS_COUNT'] == 5)
all_core_elements_count = all_core_elements_df.count()
print(f"Stations collecting all five core elements: {all_core_elements_count}")

Stations collecting all five core elements: 20467


In [53]:
only_precipitation_df = inventory_parsed_df.filter(
    (inventory_parsed_df['ELEMENT'] == 'PRCP') & (~inventory_parsed_df['ID'].isin(other_elements_df.select('ID').rdd.flatMap(lambda x: x).collect()))
)
only_precipitation_count = only_precipitation_df.select('ID').distinct().count()
print(f"Stations collecting only precipitation and no other elements: {only_precipitation_count}")

Stations collecting only precipitation and no other elements: 35662


In [45]:
#C5

In [54]:
collected_elements_df = inventory_parsed_df.groupBy('ID').agg(
    F.collect_set('ELEMENT').alias('COLLECTED_ELEMENTS')
)
show_as_html(collected_elements_df,5)

Unnamed: 0,ID,COLLECTED_ELEMENTS
0,AEM00041217,"[TMAX, TMIN, PRCP, TAVG]"
1,AGE00147708,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
2,AGE00147710,"[TMAX, TMIN, PRCP, TAVG]"
3,AGE00147714,"[TMAX, TMIN, PRCP]"
4,AGE00147719,"[TMAX, TMIN, PRCP, TAVG]"


In [55]:
Modified_Inventory_df= Modified_Inventory_df.join(collected_elements_df,
                                                 on="ID",
                                                 how="left")
show_as_html(Modified_Inventory_df,5)

Unnamed: 0,ID,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT,COLLECTED_ELEMENTS
0,AEM00041217,1983,2024,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
1,AGE00147708,1879,2024,5,4,1.0,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
2,AGE00147710,1909,2009,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
3,AGE00147714,1896,1938,3,3,,"[TMAX, TMIN, PRCP]"
4,AGE00147719,1888,2024,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"


In [57]:
output_path3 = "hdfs:///user/uwi14/outputs/ghcnd/Modified_Inventory_df.parquet"
Modified_Inventory_df.write.mode("overwrite").option("compression", "snappy").parquet(output_path3)

<h4>(e) LEFT JOIN stations and your output from part (d).
This enriched stations table will be useful. Save it to your output directory. Think carefully
about the file format that you use (e.g. csv, csv.gz, parquet) with respect to consistency
and efficiency. From now on assume that stations refers to this enriched table with all
the new columns included.<h4>

In [59]:
# Perform the LEFT JOIN
Final_stations_df = joined_country_state_df.join(Modified_Inventory_df,
                                                on="ID",
                                                how = "left")
show_as_html(Final_stations_df,5)

Unnamed: 0,ID,STATE,CODE,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,Country_Name,State_Name,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT,COLLECTED_ELEMENTS
0,AEM00041217,,AE,24.433,54.651,26.8,ABU DHABI INTL,,,41217.0,United Arab Emirates,,1983,2024,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
1,AGE00147708,,AG,36.72,4.05,222.0,TIZI OUZOU,,,60395.0,Algeria,,1879,2024,5,4,1.0,"[TMAX, TMIN, PRCP, SNWD, TAVG]"
2,AGE00147710,,AG,36.75,5.1,9.0,BEJAIA-BOUGIE (PORT),,,60401.0,Algeria,,1909,2009,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
3,AGE00147714,,AG,35.77,0.8,78.0,ORAN-CAP FALCON,,,,Algeria,,1896,1938,3,3,,"[TMAX, TMIN, PRCP]"
4,AGE00147719,,AG,33.7997,2.89,767.0,LAGHOUAT,,,60545.0,Algeria,,1888,2024,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"


In [60]:
output_path2 = "hdfs:///user/uwi14/outputs/ghcnd/Final_stations_df.parquet"
Final_stations_df.write.mode("overwrite").option("compression", "snappy").parquet(output_path2)

In [61]:
pandas_df3 = Final_stations_df.toPandas()
pandas_df3.to_csv("Final_stations_df.csv", index=False)

In [62]:
!hdfs dfs -du -h /user/uwi14/outputs/ghcnd/

5.0 M  20.1 M  /user/uwi14/outputs/ghcnd/Final_stations_df.parquet
1.5 M  6.0 M   /user/uwi14/outputs/ghcnd/Modified_Inventory_df.parquet
2.3 M  9.2 M   /user/uwi14/outputs/ghcnd/joined_countries_df.csv
2.3 M  9.2 M   /user/uwi14/outputs/ghcnd/joined_countries_state_df.csv


In [63]:
#!hdfs dfs -rm -r /user/uwi14/outputs/ghcnd/joined_countries_state_df.csv

<h4>(f) LEFT JOIN your 1000 rows subset of daily and your output from part (e). Are there any
stations in your subset of daily that are not in stations at all?<h4>

In [65]:
Daily_stations_df = daily_data.join(
    Final_stations_df, daily_data.ID == Final_stations_df.ID, "left_outer")
#Daily_stations_df=Daily_stations_df.drop(Final_stations_df['ID'])
show_as_html(Daily_stations_df,5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME,ID.1,STATE,...,HCN_CRN_FLAG,WMO_ID,Country_Name,State_Name,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT,COLLECTED_ELEMENTS
0,AE000041196,20230101,TMAX,252.0,,,S,,AE000041196,,...,,41196,United Arab Emirates,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
1,AE000041196,20230101,TMIN,149.0,,,S,,AE000041196,,...,,41196,United Arab Emirates,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
2,AE000041196,20230101,PRCP,0.0,D,,S,,AE000041196,,...,,41196,United Arab Emirates,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
3,AE000041196,20230101,TAVG,207.0,H,,S,,AE000041196,,...,,41196,United Arab Emirates,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
4,AEM00041194,20230101,TMAX,255.0,,,S,,AEM00041194,,...,,41194,United Arab Emirates,,1983,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"


In [66]:
stations_not_in_stations_df = Daily_stations_df.filter(Final_stations_df["ID"].isNull())
show_as_html(stations_not_in_stations_df,10)
stations_only_in_daily_df_left_count = stations_not_in_stations_df.count()

print(f"Number of stations not in stations using left-join: {stations_only_in_daily_df_left_count}")

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME,ID.1,STATE,...,HCN_CRN_FLAG,WMO_ID,Country_Name,State_Name,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,DIFFERENT_ELEMENTS_COUNT,CORE_ELEMENTS_COUNT,OTHER_ELEMENTS_COUNT,COLLECTED_ELEMENTS
0,AGM00060640,20230101,TMAX,211.0,,,S,,,,...,,,,,,,,,,
1,AGM00060640,20230101,TMIN,58.0,,,S,,,,...,,,,,,,,,,
2,AGM00060640,20230101,PRCP,0.0,,,S,,,,...,,,,,,,,,,
3,AGM00060640,20230101,TAVG,131.0,H,,S,,,,...,,,,,,,,,,
4,AGM00060670,20230101,TAVG,102.0,H,,S,,,,...,,,,,,,,,,
5,AGM00060656,20230101,TMIN,86.0,,,S,,,,...,,,,,,,,,,
6,AGM00060656,20230101,TAVG,150.0,H,,S,,,,...,,,,,,,,,,


Number of stations not in stations using left-join: 7


<h4>How expensive do you think it would be to LEFT JOIN all of daily and stations? Could
you determine if there are any stations in daily that are not in stations without using
LEFT JOIN?<h4>

In [None]:
#We can use anti-join to determine the stations in daily that are not in stations

In [67]:
stations_only_in_daily_df  = daily_data.join(
    Final_stations_df, daily_data.ID == Final_stations_df.ID, "left_anti")
show_as_html(stations_only_in_daily_df,10)
stations_only_in_daily_df_anti_count = stations_only_in_daily_df.count()

print(f"Number of stations in daily not in stations using anti-join: {stations_only_in_daily_df_anti_count}")

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AGM00060640,20230101,TMAX,211.0,,,S,
1,AGM00060640,20230101,TMIN,58.0,,,S,
2,AGM00060640,20230101,PRCP,0.0,,,S,
3,AGM00060640,20230101,TAVG,131.0,H,,S,
4,AGM00060670,20230101,TAVG,102.0,H,,S,
5,AGM00060656,20230101,TMIN,86.0,,,S,
6,AGM00060656,20230101,TAVG,150.0,H,,S,


Number of stations in daily not in stations using anti-join: 7


In [17]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()