### Spark notebook ###

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

**Steps**

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Example notebook ###

The code below provides a template for how you would use a notebook to start spark, run some code, and then stop spark.

**Steps**

- Run `start_spark()` to start a spark session in the notebook (only change the default resources when advised to do so for an exercise or assignment)
- Write and run code interactively, creating additional cells as needed.
- Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the [Spark UI](http://mathmadslinux2p.canterbury.ac.nz:8080/).

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=4, worker_memory=16, master_memory=2)

0,1
spark.sql.shuffle.partitions,64
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.driver.port,32829
spark.master,spark://masternode2:7077
spark.cores.max,16
spark.executor.id,driver
spark.sql.warehouse.dir,file:/users/home/dca129/Assignment1/spark-warehouse
spark.driver.host,mathmadslinux2p.canterbury.ac.nz
spark.executor.cores,4


In [27]:
# Write your imports here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import to_date,substring,length
import math

In [28]:
# create schema for daily

daily_schema = StructType([
    StructField("ID",StringType(),True),
    #StructField("Date",DateType(),True),
    StructField("Date",StringType(),True),
    StructField("Element",StringType(),True),
    StructField("Value",FloatType(),True),
    StructField("Measurement Flag",StringType(),True),
    StructField("Quality Flag",StringType(),True),
    StructField("Source Flag",StringType(),True),
    StructField("Observation Time",StringType(),True),
    
])

In [29]:
#Q1.a
# Load MetaData
# Loading 'Stations', 'States', 'Countries', 'Inventory' datasets 

stations_file_path = "hdfs:///data/ghcnd/ghcnd-stations.txt"

states_file_path = "hdfs:///data/ghcnd/ghcnd-states.txt"

countries_file_path = "hdfs:///data/ghcnd/ghcnd-countries.txt"

inventory_file_path = "hdfs:///data/ghcnd/ghcnd-inventory.txt"

In [30]:
# Load ghcnd-stations.txt into spark
raw_stations = spark.read.text(stations_file_path)

# Show the content of the DataFrame
#raw_stations.show(truncate=False)

#parsing using substring pyspark.sql function
df_stations = raw_stations.select(
    substring("value", 1, 11).alias("ID"),
    substring("value", 13, 8).cast(DoubleType()).alias("LATITUDE"),
    substring("value", 22, 9).cast(DoubleType()).alias("LONGITUDE"),
    substring("value", 32, 6).cast(DoubleType()).alias("ELEVATION"),
    substring("value", 39, 2).alias("STATE"),
    substring("value", 42, 30).alias("NAME"),
    substring("value", 73, 3).alias("GSN_FLAG"),
    substring("value", 77, 3).alias("HCN_CRN_FLAG"),
    substring("value", 81, 5).alias("WMO_ID"),
)

show_as_html(df_stations)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0
5,AEM00041218,24.262,55.609,264.9,,AL AIN INTL,,,41218.0
6,AF000040930,35.317,69.017,3366.0,,NORTH-SALANG,GSN,,40930.0
7,AFM00040938,34.21,62.228,977.2,,HERAT,,,40938.0
8,AFM00040948,34.566,69.212,1791.3,,KABUL INTL,,,40948.0
9,AFM00040990,31.5,65.85,1010.0,,KANDAHAR AIRPORT,,,40990.0


In [31]:
# Load ghcnd-states.txt into spark
raw_states = spark.read.text(states_file_path)

# Show the content of the DataFrame
raw_states.show(truncate=False)

#parsing using substring pyspark.sql function
df_states = raw_states.select(
    substring("value", 1, 2).alias("CODE"),
    substring("value", 4, 46).alias("NAME"),
)

show_as_html(df_states)

+--------------------------------------------------+
|value                                             |
+--------------------------------------------------+
|AB ALBERTA                                        |
|AK ALASKA                                         |
|AL ALABAMA                                        |
|AR ARKANSAS                                       |
|AS AMERICAN SAMOA                                 |
|AZ ARIZONA                                        |
|BC BRITISH COLUMBIA                               |
|CA CALIFORNIA                                     |
|CO COLORADO                                       |
|CT CONNECTICUT                                    |
|DC DISTRICT OF COLUMBIA                           |
|DE DELAWARE                                       |
|FL FLORIDA                                        |
|FM MICRONESIA                                     |
|GA GEORGIA                                        |
|GU GUAM                                      

Unnamed: 0,CODE,NAME
0,AB,ALBERTA
1,AK,ALASKA
2,AL,ALABAMA
3,AR,ARKANSAS
4,AS,AMERICAN SAMOA
5,AZ,ARIZONA
6,BC,BRITISH COLUMBIA
7,CA,CALIFORNIA
8,CO,COLORADO
9,CT,CONNECTICUT


In [32]:
# Load ghcnd-countries.txt into spark
raw_countries = spark.read.text(countries_file_path)

# Show the content of the DataFrame
raw_countries.show(truncate=False)

#parsing using substring pyspark.sql function
df_countries = raw_countries.select(
    substring("value", 1, 2).alias("CODE"),
    substring("value", 4, 60).alias("NAME"),
)

show_as_html(df_countries)

+----------------------------------+
|value                             |
+----------------------------------+
|AC Antigua and Barbuda            |
|AE United Arab Emirates           |
|AF Afghanistan                    |
|AG Algeria                        |
|AJ Azerbaijan                     |
|AL Albania                        |
|AM Armenia                        |
|AO Angola                         |
|AQ American Samoa [United States] |
|AR Argentina                      |
|AS Australia                      |
|AU Austria                        |
|AY Antarctica                     |
|BA Bahrain                        |
|BB Barbados                       |
|BC Botswana                       |
|BD Bermuda [United Kingdom]       |
|BE Belgium                        |
|BF Bahamas, The                   |
|BG Bangladesh                     |
+----------------------------------+
only showing top 20 rows



Unnamed: 0,CODE,NAME
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan
5,AL,Albania
6,AM,Armenia
7,AO,Angola
8,AQ,American Samoa [United States]
9,AR,Argentina


In [33]:
# Load ghcnd-inventory.txt into spark
raw_inventory = spark.read.text(inventory_file_path)
raw_inventory.show(truncate=False)

#parsing using substring pyspark.sql function
df_inventory = raw_inventory.select(
    substring("value", 1, 11).alias("ID"),
    substring("value", 13, 8).cast(DoubleType()).alias("LATITUDE"),
    substring("value", 22, 9).cast(DoubleType()).alias("LONGITUDE"),
    substring("value", 32, 4).alias("ELEMENT"),
    substring("value", 37, 4).cast(IntegerType()).alias("FIRSTYEAR"),
    substring("value", 42, 4).cast(IntegerType()).alias("LASTYEAR"),
)

df_inventory.cache()

show_as_html(df_inventory)

+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|ACW00011604  17.1167  -61.7833 TMAX 1949 1949|
|ACW00011604  17.1167  -61.7833 TMIN 1949 1949|
|ACW00011604  17.1167  -61.7833 PRCP 1949 1949|
|ACW00011604  17.1167  -61.7833 SNOW 1949 1949|
|ACW00011604  17.1167  -61.7833 SNWD 1949 1949|
|ACW00011604  17.1167  -61.7833 PGTM 1949 1949|
|ACW00011604  17.1167  -61.7833 WDFG 1949 1949|
|ACW00011604  17.1167  -61.7833 WSFG 1949 1949|
|ACW00011604  17.1167  -61.7833 WT03 1949 1949|
|ACW00011604  17.1167  -61.7833 WT08 1949 1949|
|ACW00011604  17.1167  -61.7833 WT16 1949 1949|
|ACW00011647  17.1333  -61.7833 TMAX 1961 1961|
|ACW00011647  17.1333  -61.7833 TMIN 1961 1961|
|ACW00011647  17.1333  -61.7833 PRCP 1957 1970|
|ACW00011647  17.1333  -61.7833 SNOW 1957 1970|
|ACW00011647  17.1333  -61.7833 SNWD 1957 1970|
|ACW00011647  17.1333  -61.7833 WT03 1961 1961|
|ACW00011647  17.1333  -61.7833 WT16 196

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRSTYEAR,LASTYEAR
0,ACW00011604,17.1167,-61.7833,TMAX,1949,1949
1,ACW00011604,17.1167,-61.7833,TMIN,1949,1949
2,ACW00011604,17.1167,-61.7833,PRCP,1949,1949
3,ACW00011604,17.1167,-61.7833,SNOW,1949,1949
4,ACW00011604,17.1167,-61.7833,SNWD,1949,1949
5,ACW00011604,17.1167,-61.7833,PGTM,1949,1949
6,ACW00011604,17.1167,-61.7833,WDFG,1949,1949
7,ACW00011604,17.1167,-61.7833,WSFG,1949,1949
8,ACW00011604,17.1167,-61.7833,WT03,1949,1949
9,ACW00011604,17.1167,-61.7833,WT08,1949,1949


<a id="Analysis"></a>
## Analysis

In [34]:
#Q1.a
# Count the total number of unique stations
# Total stations and active stations

total_stations = df_stations.select("ID").distinct().count()
print(f"Total number of stations: {total_stations}")

Total number of stations: 127994


In [35]:
# Active stations *2024
df_active_2024 = df_inventory.filter(F.col("LASTYEAR") == 2024)

# total distinct stations that were active *2024
active_stations_2024 = df_active_2024.select("ID").distinct().count()

# Print the number of stations active *2024
print(f"Number of stations active in 2024: {active_stations_2024}")

Number of stations active in 2024: 36516


In [36]:
# Displaying active stations details by broadcast join between inventory and stations table
df_joined = df_inventory.join(F.broadcast(df_stations), on="ID", how="inner")

# Filter for stations that were active *2024 LASTYEAR
df_active_2024 = df_joined.filter(F.col("LASTYEAR") == 2024)

# Get the distinct station IDs to count the unique stations
active_station_count = df_active_2024.select("ID").distinct().count()
df_active_2024.show()

print(f"Number of stations active in 2024: {active_station_count}")

+-----------+--------+---------+-------+---------+--------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+-------+---------+--------+--------+---------+---------+-----+--------------------+--------+------------+------+
|AE000041196|  25.333|   55.517|   TMAX|     1944|    2024|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AE000041196|  25.333|   55.517|   TMIN|     1944|    2024|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AE000041196|  25.333|   55.517|   PRCP|     1944|    2024|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AE000041196|  25.333|   55.517|   TAVG|     1944|    2024|  25.333|   55.517|     34.0|     |SHARJAH INTER. AI...|     GSN|      

In [37]:
#Stations in GCOS surface network 
#Checking different categories of GCOS surface network
df_stations.select("HCN_CRN_FLAG").distinct().show()
df_stations.count() == df_stations.distinct().count()

+------------+
|HCN_CRN_FLAG|
+------------+
|            |
|         CRN|
|         HCN|
+------------+



True

In [38]:
# Checking HCN and CRN count 

# Stations in HCN
hcn_count = df_stations.filter(F.col("HCN_CRN_FLAG") == "HCN").count()

# Stations in CRN
crn_count = df_stations.filter(F.col("HCN_CRN_FLAG") == "CRN").count()

# Filter stations that are NOT in HCN or CRN
not_hcn_crn_count = df_stations.filter((F.col("HCN_CRN_FLAG") != "HCN") & (F.col("HCN_CRN_FLAG") != "CRN")).count()

# Filter stations that are in both HCN or CRN
both_hcn_crn_count = df_stations.filter((F.col("HCN_CRN_FLAG") == "HCN") & (F.col("HCN_CRN_FLAG") == "CRN")).count()

print(f"Number of stations in the US Historical Climatology Network (HCN): {hcn_count}")
print(f"Number of stations in the US Climate Reference Network (CRN): {crn_count}")
print(f"Number of stations that are NOT in HCN or CRN: {not_hcn_crn_count}")
print(f"Number of stations that are in both HCN or CRN: {both_hcn_crn_count}")

Number of stations in the US Historical Climatology Network (HCN): 1218
Number of stations in the US Climate Reference Network (CRN): 234
Number of stations that are NOT in HCN or CRN: 126542
Number of stations that are in both HCN or CRN: 0


In [39]:
# GCOS surface network count 
df_stations.select("GSN_FLAG").distinct().show()
gsn_count = df_stations.filter(F.col("GSN_FLAG") == "GSN").count()
print(f"Number of stations in GCOS surface network: {gsn_count}")

+--------+
|GSN_FLAG|
+--------+
|        |
|     GSN|
+--------+

Number of stations in GCOS surface network: 991


In [40]:
# stations in more than one network
more_than_one_network = df_stations.filter(
    (F.col("GSN_FLAG") == "GSN") & ((F.col("HCN_CRN_FLAG") == "HCN") | (F.col("HCN_CRN_FLAG") == "CRN")))
more_than_one_network.show()

print(f"Count of statiosn which are in more than one network are: {more_than_one_network.count()}")

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|USW00003870| 34.8833| -82.2197|    325.8|   SC|GREER            ...|     GSN|         HCN| 72312|
|USW00012836| 24.5569| -81.7553|      0.3|   FL|KEY W INTL AP    ...|     GSN|         HCN| 72201|
|USW00012921| 29.5442| -98.4839|    243.5|   TX|SAN ANTONIO INTL ...|     GSN|         HCN| 72253|
|USW00013782|  32.775| -79.9239|      3.0|   SC|DWTN CHARLESTON  ...|     GSN|         HCN| 72208|
|USW00014742| 44.4683|   -73.15|    101.2|   VT|BURLINGTON INTL A...|     GSN|         HCN| 72617|
|USW00014771| 43.1111| -76.1039|    125.0|   NY|SYRACUSE HANCOCK ...|     GSN|         HCN| 72519|
|USW00014922| 44.8853| -93.2314|    254.5|   MN|MINNEAPOLIS-ST PA...|     GSN|         HCN| 72658|
|USW000230

In [41]:
# Q1.b
# Southern Hemisphere Latitude is less than zero
# Stations in Southern Hemisphere
# Stations where LATITUDE is less than 0 (Southern Hemisphere)
southern_hemisphere_stations = df_stations.filter(F.col("LATITUDE") < 0)

# Show the result
print(f"Number of stations in the Southern Hemisphere: {southern_hemisphere_stations.count()}")

Number of stations in the Southern Hemisphere: 25357


In [42]:
# Identifying Territories from Countries 
# Filter rows where the 'NAME' column contains'[' and ']'
territories_df = df_countries.filter(F.col("NAME").contains('['))

# Show the resulting territories
territories_df.show(truncate=False)

# Count the number of territories
territories_count = territories_df.count()

print(f"Number of territories: {territories_count}")

+----+---------------------------------------------------+
|CODE|NAME                                               |
+----+---------------------------------------------------+
|AQ  |American Samoa [United States]                     |
|BD  |Bermuda [United Kingdom]                           |
|CJ  |Cayman Islands [United Kingdom]                    |
|CK  |Cocos (Keeling) Islands [Australia]                |
|CQ  |Northern Mariana Islands [United States]           |
|CW  |Cook Islands [New Zealand]                         |
|EU  |Europa Island [France]                             |
|FG  |French Guiana [France]                             |
|FK  |Falkland Islands (Islas Malvinas) [United Kingdom] |
|FS  |French Southern and Antarctic Lands [France]       |
|GI  |Gibraltar [United Kingdom]                         |
|GL  |Greenland [Denmark]                                |
|GP  |Guadeloupe [France]                                |
|GQ  |Guam [United States]                              

In [43]:
# Identifying United States Territories
united_states_territories = df_countries.filter(F.col("NAME").contains('[United States]'))
united_states_territories.show(truncate=False)

+----+-----------------------------------------+
|CODE|NAME                                     |
+----+-----------------------------------------+
|AQ  |American Samoa [United States]           |
|CQ  |Northern Mariana Islands [United States] |
|GQ  |Guam [United States]                     |
|JQ  |Johnston Atoll [United States]           |
|LQ  |Palmyra Atoll [United States]            |
|RQ  |Puerto Rico [United States]              |
|VQ  |Virgin Islands [United States]           |
|WQ  |Wake Island [United States]              |
+----+-----------------------------------------+



In [44]:
#Broadcast join to check stations are there in total in the territories of the United States around the world
#Broadcast the united_states_territories DataFrame
broadcast_territories = F.broadcast(united_states_territories)

# broadcast join
stations_in_us_territories = df_stations.join(
    broadcast_territories,
    df_stations["STATE"] == broadcast_territories["CODE"],
    "inner"
)

# Count the number of stations in the U.S. territories
stations_in_us_territories_count = stations_in_us_territories.count()


print(f"Number of stations in U.S. territories (excluding the U.S. itself): {stations_in_us_territories_count}")

Number of stations in U.S. territories (excluding the U.S. itself): 0


In [45]:
df_stations.select("STATE").distinct().show()

+-----+
|STATE|
+-----+
|   NT|
|   NH|
|   ND|
|   MB|
|   AZ|
|   NM|
|     |
|   AR|
|   VI|
|   KS|
|   LA|
|   NL|
|   NY|
|   BC|
|   PR|
|   WA|
|   UT|
|   AK|
|   YT|
|   IA|
+-----+
only showing top 20 rows



In [46]:
##Q1. C Count the total number of stations in each country, and join these counts onto countries so that we can use these counts later if desired.
# Stations in each country 
# Total number of stations in each country

# Extract the first two characters from the 'ID' column to get the country code
df_stations_with_country_code = df_stations.withColumn(
    "COUNTRY_CODE", F.expr("substring(ID, 1, 2)")
)

# Group by the country code and count the number of stations in each country
station_counts_by_country = df_stations_with_country_code.groupBy("COUNTRY_CODE").count()
station_counts_by_country.show(truncate=False)

+------------+-----+
|COUNTRY_CODE|count|
+------------+-----+
|TI          |62   |
|SW          |1721 |
|UG          |8    |
|MX          |5249 |
|NI          |10   |
|GM          |1123 |
|TO          |10   |
|HU          |10   |
|NH          |6    |
|MB          |2    |
|RS          |1123 |
|CJ          |1    |
|EG          |23   |
|HO          |8    |
|IV          |21   |
|PS          |12   |
|TL          |1    |
|AR          |101  |
|CG          |13   |
|SU          |28   |
+------------+-----+
only showing top 20 rows



In [47]:
# Join total number of stations in each country with countries dataframe
# Join the counts onto the df_countries DataFrame
countries_with_stations_count = df_countries.join(
    station_counts_by_country,
    df_countries["CODE"] == station_counts_by_country["COUNTRY_CODE"],
    "left"
).drop("COUNTRY_CODE")

countries_with_stations_count.show(truncate=False)

# Cross check with sample
df_stations.filter(F.col("ID").startswith("AY")).count()

+----+--------------------------------+-----+
|CODE|NAME                            |count|
+----+--------------------------------+-----+
|TI  |Tajikistan                      |62   |
|MX  |Mexico                          |5249 |
|NI  |Nigeria                         |10   |
|SW  |Sweden                          |1721 |
|UG  |Uganda                          |8    |
|GM  |Germany                         |1123 |
|HU  |Hungary                         |10   |
|NH  |Vanuatu                         |6    |
|TO  |Togo                            |10   |
|MB  |Martinique [France]             |2    |
|RS  |Russia                          |1123 |
|CJ  |Cayman Islands [United Kingdom] |1    |
|EG  |Egypt                           |23   |
|HO  |Honduras                        |8    |
|IV  |Cote D'Ivoire                   |21   |
|PS  |Palau                           |12   |
|AR  |Argentina                       |101  |
|CG  |Congo (Kinshasa)                |13   |
|TL  |Tokelau [New Zealand]       

102

In [48]:
# Saving modified countries to hdfs output directory
parquet_path = "/user/dca129/assignment1/output/countries/"

countries_with_stations_count.write.mode("overwrite").parquet(parquet_path)

In [49]:
# Modifying states to join number of counts of stations
# Join df_stations with df_states on STATE and CODE
# Count total number of stations in each state
df_joined = df_stations.join(
    df_states,
    df_stations["STATE"] == df_states["CODE"],
    "inner"
)
#df_joined.show()
# Group by the state and count the number of stations in each state
station_counts_by_state = df_joined.groupBy(F.col("STATE")).count()

# Rename the count column
station_counts_by_state = station_counts_by_state.withColumnRenamed("count", "StationCount")

station_counts_by_state.show(truncate=False)

+-----+------------+
|STATE|StationCount|
+-----+------------+
|NT   |137         |
|NH   |482         |
|ND   |580         |
|MB   |734         |
|AZ   |1676        |
|NM   |2273        |
|AR   |952         |
|VI   |74          |
|NL   |335         |
|KS   |2312        |
|LA   |836         |
|NY   |1872        |
|BC   |1714        |
|PR   |253         |
|WA   |1679        |
|UT   |982         |
|AK   |1046        |
|YT   |128         |
|IA   |1070        |
|GU   |29          |
+-----+------------+
only showing top 20 rows



In [50]:
# Join total number of stations in each state
# Join the counts onto the df_countries DataFrame
states_with_stations_count = df_states.join(
    station_counts_by_state,
    df_states["CODE"] == station_counts_by_state["STATE"],
    "left"
)
#.drop("COUNTRY_CODE)
states_with_stations_count.show(truncate=False)

# Cross check with one sample 
df_stations.filter(F.col("STATE")=="IL").count()

+----+-------------------------+-----+------------+
|CODE|NAME                     |STATE|StationCount|
+----+-------------------------+-----+------------+
|NT  |NORTHWEST TERRITORIES    |NT   |137         |
|ND  |NORTH DAKOTA             |ND   |580         |
|NH  |NEW HAMPSHIRE            |NH   |482         |
|AZ  |ARIZONA                  |AZ   |1676        |
|MB  |MANITOBA                 |MB   |734         |
|NM  |NEW MEXICO               |NM   |2273        |
|AR  |ARKANSAS                 |AR   |952         |
|VI  |VIRGIN ISLANDS           |VI   |74          |
|KS  |KANSAS                   |KS   |2312        |
|LA  |LOUISIANA                |LA   |836         |
|NL  |NEWFOUNDLAND AND LABRADOR|NL   |335         |
|BC  |BRITISH COLUMBIA         |BC   |1714        |
|NY  |NEW YORK                 |NY   |1872        |
|PR  |PUERTO RICO              |PR   |253         |
|AK  |ALASKA                   |AK   |1046        |
|UT  |UTAH                     |UT   |982         |
|WA  |WASHIN

2193

In [51]:
# save the output
parquet_path = "/user/dca129/assignment1/output/states/"

states_with_stations_count.write.mode("overwrite").parquet(parquet_path)

### Q2 

In [52]:
# Q2.a. function that computes the geographical distance between two stations using their latitude and longitude as arguments.
# Geographical Distance between two stations

#Haversine formula to calculate the distance between two lat-long coordinates
def haversine_distance_calculator(lat1, lon1, lat2, lon2):
    R = 6371.0  # Radius of Earth in kilometers

    lat1 = math.radians(lat1)
    lon1 = math.radians(lon1)
    lat2 = math.radians(lat2)
    lon2 = math.radians(lon2)

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c
    return distance

In [53]:
# Register the Haversine function as a UDF
haversine_udf = F.udf(haversine_distance_calculator, DoubleType())

In [54]:
# Selecting small subset of stations
df_stations_subset = df_stations.limit(2)
df_stations_subset.show()

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|ACW00011604| 17.1167| -61.7833|     10.1|     |ST JOHNS COOLIDGE...|        |            |      |
|ACW00011647| 17.1333| -61.7833|     19.2|     |ST JOHNS         ...|        |            |      |
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+



In [56]:
# Cross join on subset of stations
# Perform CROSS JOIN on the small subset of stations
df_crossed_subset = df_stations_subset.alias('df1').crossJoin(df_stations_subset.alias('df2')) \
    .select(
        F.col('df1.ID').alias('ID1'),
        F.col('df1.LATITUDE').alias('LAT1'),
        F.col('df1.LONGITUDE').alias('LON1'),
        F.col('df2.ID').alias('ID2'),
        F.col('df2.LATITUDE').alias('LAT2'),
        F.col('df2.LONGITUDE').alias('LON2')
    )

In [57]:
# Apply the Haversine UDF to calculate distances
# Apply the UDF to calculate the distance between station pairs
df_with_distances_subset = df_crossed_subset.withColumn(
    'Distance KM',
    haversine_udf(F.col('LAT1'), F.col('LON1'), F.col('LAT2'), F.col('LON2'))
)

df_with_distances_subset.show(truncate=False)

# https://latlongdata.com/distance-calculator/

+-----------+-------+---------+-----------+-------+--------+------------------+
|ID1        |LAT1   |LON1     |ID2        |LAT2   |LON2    |Distance KM       |
+-----------+-------+---------+-----------+-------+--------+------------------+
|US1WAKG0076|47.3308|-122.2673|ACW00011604|17.1167|-61.7833|6407.0777760856   |
|US1WAKG0076|47.3308|-122.2673|ACW00011647|17.1333|-61.7833|6405.75676345013  |
|US1WAKG0077|47.5755|-122.2134|ACW00011604|17.1167|-61.7833|6407.861135721299 |
|US1WAKG0077|47.5755|-122.2134|ACW00011647|17.1333|-61.7833|6406.5335512881675|
+-----------+-------+---------+-----------+-------+--------+------------------+



In [58]:
# Q2.b. What two stations are geographically closest together in New Zealand?
# New Zealnd range of Latitide and Longitude as follows - Latitude - (-34.5 to -47.5) Longitude - (166.5 to 178.5)
df_nz_stations = df_stations.filter(
    (F.col("LATITUDE") >= -47.5) & (F.col("LATITUDE") <= -34.5) &
    (F.col("LONGITUDE") >= 166.5) & (F.col("LONGITUDE") <= 178.5)
)

df_nz_stations.show()

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|NZ000093012|   -35.1|  173.267|     54.0|     |KAITAIA          ...|        |            | 93119|
|NZ000093292|  -38.65|  177.983|      5.0|     |GISBORNE AERODROM...|     GSN|            | 93292|
|NZ000093417|   -40.9|  174.983|      7.0|     |PARAPARAUMU AWS  ...|     GSN|            | 93420|
|NZ000093844| -46.417|  168.333|      2.0|     |INVERCARGILL AIRP...|     GSN|            | 93845|
|NZ000933090| -39.017|  174.183|     32.0|     |NEW PLYMOUTH AWS ...|     GSN|            | 93309|
|NZ000936150| -42.717|  170.983|     40.0|     |HOKITIKA AERODROM...|        |            | 93781|
|NZ000937470| -44.517|    169.9|    488.0|     |TARA HILLS       ...|     GSN|            | 93747|
|NZM000931

In [59]:
# Cross join on New Zealand stations
# Generate pairs of New Zealand stations
df_nz_crossed = df_nz_stations.alias('df1').crossJoin(df_nz_stations.alias('df2')) \
    .select(
        F.col('df1.ID').alias('ID1'),
        F.col('df1.LATITUDE').alias('LAT1'),
        F.col('df1.LONGITUDE').alias('LON1'),
        F.col('df2.ID').alias('ID2'),
        F.col('df2.LATITUDE').alias('LAT2'),
        F.col('df2.LONGITUDE').alias('LON2')
    )


In [60]:
# Applying the Haversine UDF to compute the distance between station pairs
df_nz_distances = df_nz_crossed.withColumn(
    'Distance KM',
    haversine_udf(F.col('LAT1'), F.col('LON1'), F.col('LAT2'), F.col('LON2'))
)

In [61]:
# Finding Closest pair of stations
# Filter out rows where ID1 and ID2 are the same (comparing the same station)
df_non_same_stations = df_nz_distances.filter(F.col('ID1') != F.col('ID2'))

# Find the closest pair of stations by ordering by distance and selecting the minimum
df_closest_stations = df_non_same_stations.orderBy(F.col('Distance KM')).limit(1)

# Show the closest pair of stations
df_closest_stations.show(truncate=False)

df_closest_stations.select(F.col("ID1"),F.col("ID2")).show()

+-----------+-----+-------+-----------+-------+-----+-----------------+
|ID1        |LAT1 |LON1   |ID2        |LAT2   |LON2 |Distance KM      |
+-----------+-----+-------+-----------+-------+-----+-----------------+
|NZ000093417|-40.9|174.983|NZM00093439|-41.333|174.8|50.52902648213863|
+-----------+-----+-------+-----------+-------+-----+-----------------+

+-----------+-----------+
|        ID1|        ID2|
+-----------+-----------+
|NZ000093417|NZM00093439|
+-----------+-----------+



In [62]:
# Q3.a . # Daily climate summaries
# Count number of rows in daily
hdfs_daily_path = "hdfs:///data/ghcnd/daily/"

daily_data = (
    spark.read.format("com.databricks.spark.csv")
    .option("header","false")
    .option("inferSchema", "false")
    .schema(daily_schema)
    .load(hdfs_daily_path)
)

daily_data.count()

3119374043

In [63]:
# Q3.b. 
# Observations in daily involving core elements
# The five core elements:
#PRCP = Precipitation (tenths of mm)
#SNOW = Snowfall (mm)
#SNWD = Snow depth (mm)
#TMAX = Maximum temperature (tenths of degrees C)
#TMIN = Minimum temperature (tenths of degrees C)

# Define the five core elements
core_elements = ['TMAX', 'TMIN', 'PRCP', 'SNOW', 'SNWD']

# Filter the data for the five core elements
filtered_data = daily_data.filter(F.col("Element").isin(core_elements))

# Count the number of observations for each core element
element_counts = (
    filtered_data.groupBy("Element")
    .agg(F.count("*").alias("Observation_Count"))
    .orderBy(F.desc("Observation_Count"))
)

element_counts.show()

+-------+-----------------+
|Element|Observation_Count|
+-------+-----------------+
|   PRCP|       1073530896|
|   TMAX|        457927581|
|   TMIN|        456739567|
|   SNOW|        356187192|
|   SNWD|        299076145|
+-------+-----------------+



In [64]:
# Find the element with the most observations
most_observed_element = element_counts.limit(1)
most_observed_element.show()

+-------+-----------------+
|Element|Observation_Count|
+-------+-----------------+
|   PRCP|       1073530896|
+-------+-----------------+



In [65]:
# Q3.c
# Filter daily_data for TMAX and TMIN observations
# Counting observations of TMAX which do not have corresponding TMIN
tmax_data = daily_data.filter(F.col("Element") == "TMAX")
tmin_data = daily_data.filter(F.col("Element") == "TMIN")

In [66]:
# Perform a left join to find TMAX observations without corresponding TMIN observations
# Join on ID (station ID) and Date columns
tmax_without_tmin = tmax_data.join(
    tmin_data, 
    on=["ID", "Date"], 
    how="left_anti"  # This gives us TMAX rows that do not have matching TMIN
)

In [67]:
# Count the total number of TMAX observations without a corresponding TMIN
#tmax_without_tmin_count = tmax_without_tmin.count()
print(f"Total TMAX observations without corresponding TMIN: {tmax_without_tmin.count()}")

Total TMAX observations without corresponding TMIN: 10567304


In [68]:
# Count the number of unique stations that contributed to these observations
#unique_stations_count = tmax_without_tmin.select("ID").distinct().count()
print(f"Number of unique stations contributing to these observations: {tmax_without_tmin.select('ID').distinct().count()}")

Number of unique stations contributing to these observations: 28716


In [268]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()