In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.driver.memory,4g
spark.driver.port,41965
spark.executor.memory,4g
spark.ui.port,4935
spark.master,spark://masternode2:7077
spark.app.id,app-20240429170049-0306
spark.driver.extraJavaOptions,-Dderby.system.home=/tmp/cgo82/spark/
spark.executor.id,driver


In [None]:
#Q2 (a) and (b)

In [3]:
# Write your imports and code here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *


In [4]:
# Daily

schema_daily = StructType([
    StructField("ID", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("ELEMENT", StringType(), True),
    StructField("VALUE", FloatType(), True),
    StructField("M_FLAG", StringType(), True),
    StructField("Q_FLAG", StringType(), True),
    StructField("S_FLAG", StringType(), True),
    StructField("OBS_TIME", StringType(), True),
   
])
daily_data = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2023.csv.gz")
    .limit(1000)
)
daily_data.show()

+-----------+--------+-------+-----+------+------+------+--------+
|         ID|    DATE|ELEMENT|VALUE|M_FLAG|Q_FLAG|S_FLAG|OBS_TIME|
+-----------+--------+-------+-----+------+------+------+--------+
|AE000041196|20230101|   TMAX|252.0|  null|  null|     S|    null|
|AE000041196|20230101|   TMIN|149.0|  null|  null|     S|    null|
|AE000041196|20230101|   PRCP|  0.0|     D|  null|     S|    null|
|AE000041196|20230101|   TAVG|207.0|     H|  null|     S|    null|
|AEM00041194|20230101|   TMAX|255.0|  null|  null|     S|    null|
|AEM00041194|20230101|   TMIN|186.0|  null|  null|     S|    null|
|AEM00041194|20230101|   PRCP|  0.0|  null|  null|     S|    null|
|AEM00041194|20230101|   TAVG|223.0|     H|  null|     S|    null|
|AEM00041217|20230101|   TMAX|248.0|  null|  null|     S|    null|
|AEM00041217|20230101|   TMIN|184.0|  null|  null|     S|    null|
|AEM00041217|20230101|   TAVG|215.0|     H|  null|     S|    null|
|AEM00041218|20230101|   TMAX|254.0|  null|  null|     S|    n

In [5]:
daily_data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- VALUE: float (nullable = true)
 |-- M_FLAG: string (nullable = true)
 |-- Q_FLAG: string (nullable = true)
 |-- S_FLAG: string (nullable = true)
 |-- OBS_TIME: string (nullable = true)



In [6]:
show_as_html(daily_data, 100)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AE000041196,20230101,TMAX,252.0,,,S,
1,AE000041196,20230101,TMIN,149.0,,,S,
2,AE000041196,20230101,PRCP,0.0,D,,S,
3,AE000041196,20230101,TAVG,207.0,H,,S,
4,AEM00041194,20230101,TMAX,255.0,,,S,
5,AEM00041194,20230101,TMIN,186.0,,,S,
6,AEM00041194,20230101,PRCP,0.0,,,S,
7,AEM00041194,20230101,TAVG,223.0,H,,S,
8,AEM00041217,20230101,TMAX,248.0,,,S,
9,AEM00041217,20230101,TMIN,184.0,,,S,


In [7]:
!hdfs dfs -ls hdfs:///data/ghcnd/

Found 5 items
drwxr-xr-x   - jsw93 supergroup          0 2024-03-19 00:11 hdfs:///data/ghcnd/daily
-rwxr-xr-x   8 jsw93 supergroup       3659 2024-03-11 00:45 hdfs:///data/ghcnd/ghcnd-countries.txt
-rwxr-xr-x   8 jsw93 supergroup   34380032 2024-03-11 00:39 hdfs:///data/ghcnd/ghcnd-inventory.txt
-rwxr-xr-x   8 jsw93 supergroup       1086 2024-03-11 00:45 hdfs:///data/ghcnd/ghcnd-states.txt
-rwxr-xr-x   8 jsw93 supergroup   10834968 2024-03-11 00:39 hdfs:///data/ghcnd/ghcnd-stations.txt


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring

In [None]:
#(c)

In [9]:
#Station data
spark = SparkSession.builder.appName("GHCN Data Analysis").getOrCreate()
get_stations = spark.read.text('hdfs:///data/ghcnd/ghcnd-stations.txt')





In [10]:
station_data = get_stations.select(
    substring('value', 1, 11).alias('ID'),
    substring('value', 13, 8).alias('LATITUDE'),  # Fix lengths and use alias, not alise
    substring('value', 22, 9).alias('LONGITUDE'),  # Fix lengths
    substring('value', 32, 6).alias('ELEVATION'),  # Fix lengths
    substring('value', 39, 2).alias('STATE'),
    substring('value', 42, 30).alias('NAME'),
    substring('value', 73, 3).alias('GSN_FLAG'),
    substring('value', 77, 3).alias('HCN_CRN_FLAG'),
    substring('value', 81, 5).alias('WMO_ID')
)




In [11]:
station_data.show()

+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|STATE|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+-----+--------------------+--------+------------+------+
|ACW00011604| 17.1167| -61.7833|     10.1|     |ST JOHNS COOLIDGE...|        |            |      |
|ACW00011647| 17.1333| -61.7833|     19.2|     |ST JOHNS         ...|        |            |      |
|AE000041196| 25.3330|  55.5170|     34.0|     |SHARJAH INTER. AI...|     GSN|            | 41196|
|AEM00041194| 25.2550|  55.3640|     10.4|     |DUBAI INTL       ...|        |            | 41194|
|AEM00041217| 24.4330|  54.6510|     26.8|     |ABU DHABI INTL   ...|        |            | 41217|
|AEM00041218| 24.2620|  55.6090|    264.9|     |AL AIN INTL      ...|        |            | 41218|
|AF000040930| 35.3170|  69.0170|   3366.0|     |NORTH-SALANG     ...|     GSN|            | 40930|
|AFM000409

In [12]:
show_as_html(station_data, 10)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0
5,AEM00041218,24.262,55.609,264.9,,AL AIN INTL,,,41218.0
6,AF000040930,35.317,69.017,3366.0,,NORTH-SALANG,GSN,,40930.0
7,AFM00040938,34.21,62.228,977.2,,HERAT,,,40938.0
8,AFM00040948,34.566,69.212,1791.3,,KABUL INTL,,,40948.0
9,AFM00040990,31.5,65.85,1010.0,,KANDAHAR AIRPORT,,,40990.0


In [None]:
# Number of stations without WMO_ID

In [13]:
from pyspark.sql.functions import trim

# Count rows in the DataFrame
station_count = station_data.count()

# Count stations without a WMO ID considering spaces as missing values
stations_without_wmo_id = station_data.filter(trim(station_data.WMO_ID) == '').count()
print(f"No of Stations: {station_count}")
print(f"Stations without WMO_ID: {stations_without_wmo_id}")


No of Stations: 125983
Stations without WMO_ID: 118023


In [14]:
from pyspark.sql.functions import substring

states_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-states.txt")
states_data = states_df.select(
    substring('value', 1, 2).alias('STATE_CODE'),  # Corrected .alias and added a comma
    substring('value', 4, 47).alias('STATE_NAME')  # Adjusted length to 47 based on metadata
)

# To count the number of rows, make sure to use the transformed DataFrame
states_count = states_data.count()
print(f"Number of rows in states: {states_count}")



Number of rows in states: 74


In [15]:
show_as_html(states_data, 10)

Unnamed: 0,STATE_CODE,STATE_NAME
0,AB,ALBERTA
1,AK,ALASKA
2,AL,ALABAMA
3,AR,ARKANSAS
4,AS,AMERICAN SAMOA
5,AZ,ARIZONA
6,BC,BRITISH COLUMBIA
7,CA,CALIFORNIA
8,CO,COLORADO
9,CT,CONNECTICUT


In [16]:
from pyspark.sql.functions import substring

countries_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-countries.txt")
countries_data = countries_df.select(
    substring('value', 1, 2).alias('CODE'),
    substring('value', 4, 50).alias('NAME')
)


countries_count = countries_data.count()
print(f"Number of rows in countries: {countries_count}")


Number of rows in countries: 219


In [17]:
show_as_html(countries_data, 10)

Unnamed: 0,CODE,NAME
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan
5,AL,Albania
6,AM,Armenia
7,AO,Angola
8,AQ,American Samoa [United States]
9,AR,Argentina


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring

In [20]:
inventory_df = spark.read.text("hdfs:///data/ghcnd/ghcnd-inventory.txt")
inventory_data = inventory_df.select(
    substring('value', 1, 11).alias('ID'),
    substring('value', 13, 8).alias('LATITUDE'),
    substring('value', 22, 9).alias('LONGITUTE'),
    substring('value', 32, 4).alias('ELEMENT'),
    substring('value', 37, 4).alias('FIRSTYEAR'),
    substring('value', 42, 4).alias('LASTYEAR')
)
    
inventory_count = inventory_data.count()
print(f"Number of rows in inventory: {inventory_count}")


Number of rows in inventory: 747382


In [21]:
show_as_html(inventory_data, 10)

Unnamed: 0,ID,LATITUDE,LONGITUTE,ELEMENT,FIRSTYEAR,LASTYEAR
0,ACW00011604,17.1167,-61.7833,TMAX,1949,1949
1,ACW00011604,17.1167,-61.7833,TMIN,1949,1949
2,ACW00011604,17.1167,-61.7833,PRCP,1949,1949
3,ACW00011604,17.1167,-61.7833,SNOW,1949,1949
4,ACW00011604,17.1167,-61.7833,SNWD,1949,1949
5,ACW00011604,17.1167,-61.7833,PGTM,1949,1949
6,ACW00011604,17.1167,-61.7833,WDFG,1949,1949
7,ACW00011604,17.1167,-61.7833,WSFG,1949,1949
8,ACW00011604,17.1167,-61.7833,WT03,1949,1949
9,ACW00011604,17.1167,-61.7833,WT08,1949,1949


In [None]:
#

In [21]:
# Q3

In [None]:
# (a)

In [22]:
from pyspark.sql.functions import substring

station_data = station_data.withColumn("COUNTRY_CODE", substring("ID", 1, 2))
show_as_html(station_data,5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,AC
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,AC
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,AE
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0,AE
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0,AE


In [23]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/stations_with_country_code"

station_data.write.mode("overwrite").parquet(output_path)


In [None]:
# (b) LEFT JOIN stations with countries using your output from part (a).

In [24]:
countries_data = countries_data.withColumnRenamed("NAME", "COUNTRY_NAME")


stations_countries_df = station_data.join(
    countries_data,
    station_data.COUNTRY_CODE == countries_data.CODE,
    "left"
)

In [25]:
show_as_html(stations_countries_df, 10)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,CODE,COUNTRY_NAME
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,AC,AC,Antigua and Barbuda
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,AC,AC,Antigua and Barbuda
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,AE,AE,United Arab Emirates
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0,AE,AE,United Arab Emirates
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0,AE,AE,United Arab Emirates
5,AEM00041218,24.262,55.609,264.9,,AL AIN INTL,,,41218.0,AE,AE,United Arab Emirates
6,AF000040930,35.317,69.017,3366.0,,NORTH-SALANG,GSN,,40930.0,AF,AF,Afghanistan
7,AFM00040938,34.21,62.228,977.2,,HERAT,,,40938.0,AF,AF,Afghanistan
8,AFM00040948,34.566,69.212,1791.3,,KABUL INTL,,,40948.0,AF,AF,Afghanistan
9,AFM00040990,31.5,65.85,1010.0,,KANDAHAR AIRPORT,,,40990.0,AF,AF,Afghanistan


In [26]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/stations_countries.csv"
stations_countries_df.write.mode("overwrite").option("header", "true").csv(output_path)





In [27]:
# Perform the LEFT JOIN
country_state_df = stations_countries_df.join(
    states_data, stations_countries_df.STATE == states_data.STATE_CODE, "left_outer")
country_state_df=country_state_df.drop(stations_countries_df['STATE'])



In [28]:
show_as_html(country_state_df)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,CODE,COUNTRY_NAME,STATE_CODE,STATE_NAME
0,ACW00011604,17.1167,-61.7833,10.1,ST JOHNS COOLIDGE FLD,,,,AC,AC,Antigua and Barbuda,,
1,ACW00011647,17.1333,-61.7833,19.2,ST JOHNS,,,,AC,AC,Antigua and Barbuda,,
2,AE000041196,25.333,55.517,34.0,SHARJAH INTER. AIRP,GSN,,41196.0,AE,AE,United Arab Emirates,,
3,AEM00041194,25.255,55.364,10.4,DUBAI INTL,,,41194.0,AE,AE,United Arab Emirates,,
4,AEM00041217,24.433,54.651,26.8,ABU DHABI INTL,,,41217.0,AE,AE,United Arab Emirates,,
5,AEM00041218,24.262,55.609,264.9,AL AIN INTL,,,41218.0,AE,AE,United Arab Emirates,,
6,AF000040930,35.317,69.017,3366.0,NORTH-SALANG,GSN,,40930.0,AF,AF,Afghanistan,,
7,AFM00040938,34.21,62.228,977.2,HERAT,,,40938.0,AF,AF,Afghanistan,,
8,AFM00040948,34.566,69.212,1791.3,KABUL INTL,,,40948.0,AF,AF,Afghanistan,,
9,AFM00040990,31.5,65.85,1010.0,KANDAHAR AIRPORT,,,40990.0,AF,AF,Afghanistan,,


In [29]:
output_path1 = "hdfs:///user/cgo82/outputs/ghcnd/country_state_df.csv"
country_state_df.write.mode("overwrite").option("header", "true").csv(output_path1)



In [30]:
# Left join using ['US']
station_data = station_data.withColumnRenamed('NAME', 'STATION_NAME')

stations_states_df  = station_data.join(
    states_data,
    (station_data.STATE == states_data.STATE_CODE) & (station_data.COUNTRY_CODE == 'US'),
    'left')

In [31]:
show_as_html(stations_states_df, 5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,STATION_NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,STATE_CODE,STATE_NAME
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,AC,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,AC,,
2,AE000041196,25.333,55.517,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,AE,,
3,AEM00041194,25.255,55.364,10.4,,DUBAI INTL,,,41194.0,AE,,
4,AEM00041217,24.433,54.651,26.8,,ABU DHABI INTL,,,41217.0,AE,,


In [32]:
output_path1 = "hdfs:///user/cgo82/outputs/ghcnd/stations_states.csv"
stations_states_df.write.mode("overwrite").option("header", "true").csv(output_path1)

In [34]:
# First and last year of activity of each station

In [33]:
from pyspark.sql.functions import min, max

inventory_activity_years_df = inventory_data.groupBy("ID").agg(
    min("FIRSTYEAR").alias("FIRST_ACTIVE_YEAR"),
    max("LASTYEAR").alias("LAST_ACTIVE_YEAR")
)



In [34]:
show_as_html(inventory_activity_years_df, 10)

Unnamed: 0,ID,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR
0,AGE00147719,1888,2024
1,ALE00100939,1940,2000
2,AQC00914873,1955,1967
3,AR000000002,1981,2000
4,AR000875850,1908,2024
5,ARM00087022,1973,2024
6,ARM00087480,1965,2024
7,ARM00087904,2003,2024
8,ASN00001003,1909,1940
9,ASN00002033,1920,1965


In [36]:
# Count of Different Elements Collected by Each Station

In [35]:
from pyspark.sql.functions import countDistinct

inventory_elements_count_df = inventory_data.groupBy("ID").agg(
    countDistinct("ELEMENT").alias("NUM_ELEMENTS")
)


In [36]:
show_as_html(inventory_elements_count_df)

Unnamed: 0,ID,NUM_ELEMENTS
0,AR000875850,5
1,ASN00003069,6
2,ASN00007031,4
3,ASN00009802,4
4,ASN00009965,7
5,ASN00014074,4
6,ASN00017001,4
7,ASN00017124,4
8,ASN00018040,10
9,ASN00018094,4


In [37]:
new_inventory_data = inventory_activity_years_df.join(
    inventory_elements_count_df, inventory_activity_years_df.ID == inventory_elements_count_df.ID, "left" )
new_inventory_data = new_inventory_data.drop(inventory_activity_years_df['ID'])
show_as_html(new_inventory_data,5)

Unnamed: 0,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,ID,NUM_ELEMENTS
0,1888,2024,AGE00147719,4
1,1940,2000,ALE00100939,2
2,1955,1967,AQC00914873,12
3,1981,2000,AR000000002,1
4,1908,2024,AR000875850,5


In [38]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/new_invenroty.csv"
new_inventory_data.write.mode("overwrite").option("header", "true").csv(output_path)

In [40]:
#  Count of Core and Other Elements Collected by Each Station

In [39]:
from pyspark.sql.functions import when, count, lit

core_elements = ['PRCP', 'TMAX', 'TMIN', 'SNOW', 'SNWD']

# Adding a column to classify elements as 'core' or 'other'
core_elements_df = inventory_data.filter(inventory_data['ELEMENT'].isin(core_elements)).groupBy('ID').agg(
    F.countDistinct('ELEMENT').alias('CORE_ELEMENT_CT')

)

other_elements_df = inventory_data.filter(~inventory_data['ELEMENT'].isin(core_elements)).groupBy('ID').agg(
    F.countDistinct('ELEMENT').alias('OTHER_ELEMENT_CT')

)



In [40]:
show_as_html(core_elements_df, 5)
show_as_html(other_elements_df, 5)

Unnamed: 0,ID,CORE_ELEMENT_CT
0,US1COEP0053,3
1,US1COEP0228,3
2,US1COJF0537,3
3,US1COLP0061,3
4,US1COLR0215,3


Unnamed: 0,ID,OTHER_ELEMENT_CT
0,CA007041166,3
1,CA007077571,3
2,CA008400800,2
3,CA1NL000069,4
4,CA1ON000224,4


In [41]:
new_inventory_data = new_inventory_data.join(
    core_elements_df, new_inventory_data.ID == core_elements_df.ID, "left" )
new_inventory_data = new_inventory_data.drop(core_elements_df['ID'])
show_as_html(new_inventory_data,5)

Unnamed: 0,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,ID,NUM_ELEMENTS,CORE_ELEMENT_CT
0,1888,2024,AGE00147719,4,3
1,1940,2000,ALE00100939,2,2
2,1955,1967,AQC00914873,12,5
3,1981,2000,AR000000002,1,1
4,1908,2024,AR000875850,5,4


In [42]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/invenroty_with_core_elements.csv"
new_inventory_data.write.mode("overwrite").option("header", "true").csv(output_path)

In [43]:
new_inventory_data = new_inventory_data.join(
    other_elements_df, new_inventory_data.ID == other_elements_df.ID, "left" )
new_inventory_data = new_inventory_data.drop(other_elements_df['ID'])
show_as_html(new_inventory_data,5)

Unnamed: 0,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,ID,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT
0,1888,2024,AGE00147719,4,3,1.0
1,1940,2000,ALE00100939,2,2,
2,1955,1967,AQC00914873,12,5,7.0
3,1981,2000,AR000000002,1,1,
4,1908,2024,AR000875850,5,4,1.0


In [44]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/invenroty_with_other_elements.csv"
new_inventory_data.write.mode("overwrite").option("header", "true").csv(output_path)

In [53]:
#  Stations Collecting All Five Core Elements

In [45]:
from pyspark.sql import functions as F
all_core_elements_df = core_elements_df.filter(F.col("CORE_ELEMENT_CT") == 5)

all_core_elements_ct = all_core_elements_df.count()


print(f"Number of stations with all five core elements: {all_core_elements_ct}")


Number of stations with all five core elements: 20467


In [46]:
show_as_html(all_core_elements_df,5)

Unnamed: 0,ID,CORE_ELEMENT_CT
0,CA007013100,5
1,CA007037400,5
2,CA007063090,5
3,CA00706CP09,5
4,CA007111080,5


In [43]:
# Stations Collecting Only Precipitation

In [47]:
only_prep_df = inventory_data.filter(
    (inventory_data['ELEMENT'] == 'PRCP') & (~inventory_data['ID'].isin(other_elements_df.select('ID').rdd.flatMap(lambda x: x).collect()))
)
only_prep_count = only_prep_df.select('ID').distinct().count()
print(f"Stations collecting only precipitation and no other elements: {only_prep_count}")


Stations collecting only precipitation and no other elements: 35662


In [48]:
collected_elements_df = inventory_data.groupBy('ID').agg(
    F.collect_set('ELEMENT').alias('COLLECTED_ELEMENTS')
)
show_as_html(collected_elements_df,5)

Unnamed: 0,ID,COLLECTED_ELEMENTS
0,AGE00147719,"[TMAX, TMIN, PRCP, TAVG]"
1,ALE00100939,"[TMAX, PRCP]"
2,AQC00914873,"[WT03, TMAX, TMIN, PRCP, SNWD, MDPR, DAPR, SNO..."
3,AR000000002,[PRCP]
4,AR000875850,"[TMAX, TMIN, PRCP, SNWD, TAVG]"


In [49]:
new_inventory_data = new_inventory_data.join(
    collected_elements_df, new_inventory_data.ID == collected_elements_df.ID, "left" )
new_inventory_data = new_inventory_data.drop(collected_elements_df['ID'])
show_as_html(new_inventory_data,5)

Unnamed: 0,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,ID,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT,COLLECTED_ELEMENTS
0,1888,2024,AGE00147719,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
1,1940,2000,ALE00100939,2,2,,"[TMAX, PRCP]"
2,1955,1967,AQC00914873,12,5,7.0,"[WT03, TMAX, TMIN, PRCP, SNWD, MDPR, DAPR, SNO..."
3,1981,2000,AR000000002,1,1,,[PRCP]
4,1908,2024,AR000875850,5,4,1.0,"[TMAX, TMIN, PRCP, SNWD, TAVG]"


In [51]:
output_path = "hdfs:///user/cgo82/outputs/ghcnd/invenroty_with_coll_elements.parquet"
new_inventory_data.write.mode("overwrite").option("header", "true").parquet(output_path)

In [52]:
enriched_stations_df = country_state_df.join(
    new_inventory_data, country_state_df.ID == new_inventory_data.ID, "left_outer")
enriched_stations_df=enriched_stations_df.drop(new_inventory_data['ID'])
show_as_html(enriched_stations_df,5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,CODE,COUNTRY_NAME,STATE_CODE,STATE_NAME,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT,COLLECTED_ELEMENTS
0,AGE00147719,33.7997,2.89,767.0,LAGHOUAT,,,60545.0,AG,AG,Algeria,,,1888,2024,4,3,1.0,"[TMAX, TMIN, PRCP, TAVG]"
1,ALE00100939,41.3331,19.7831,89.0,TIRANA,,,,AL,AL,Albania,,,1940,2000,2,2,,"[TMAX, PRCP]"
2,AQC00914873,-14.35,-170.7667,14.9,TAPUTIMU TUTUILA,,,,AQ,AQ,American Samoa [United States],AS,AMERICAN SAMOA,1955,1967,12,5,7.0,"[WT03, TMAX, TMIN, PRCP, SNWD, MDPR, DAPR, SNO..."
3,AR000000002,-29.82,-57.42,75.0,BONPLAND,,,,AR,AR,Argentina,,,1981,2000,1,1,,[PRCP]
4,AR000875850,-34.583,-58.483,25.0,BUENOS AIRES OBSERV,,,87585.0,AR,AR,Argentina,,,1908,2024,5,4,1.0,"[TMAX, TMIN, PRCP, SNWD, TAVG]"


In [53]:
output_path2 = "hdfs:///user/cgo82/outputs/ghcnd/enriched_stations.parquet"
enriched_stations_df.write.mode("overwrite").parquet(output_path2)


In [None]:
# f  LEFT JOIN the 1000 Rows Subset of Daily Data with the Enriched Stations Table

In [54]:
# Perform the LEFT JOIN
daily_stations_df = daily_data.join(
    enriched_stations_df,
    daily_data.ID == enriched_stations_df.ID,
    "left"
)
show_as_html(daily_stations_df,5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME,ID.1,LATITUDE,...,CODE,COUNTRY_NAME,STATE_CODE,STATE_NAME,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT,COLLECTED_ELEMENTS
0,AE000041196,20230101,TMAX,252.0,,,S,,AE000041196,25.333,...,AE,United Arab Emirates,,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
1,AE000041196,20230101,TMIN,149.0,,,S,,AE000041196,25.333,...,AE,United Arab Emirates,,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
2,AE000041196,20230101,PRCP,0.0,D,,S,,AE000041196,25.333,...,AE,United Arab Emirates,,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
3,AE000041196,20230101,TAVG,207.0,H,,S,,AE000041196,25.333,...,AE,United Arab Emirates,,,1944,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"
4,AEM00041194,20230101,TMAX,255.0,,,S,,AEM00041194,25.255,...,AE,United Arab Emirates,,,1983,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]"


In [None]:
# Determining Missing Stations Using LEFT JOIN

In [55]:
missing_stations_df = daily_stations_df.filter(enriched_stations_df["ID"].isNull())
show_as_html(missing_stations_df,10)
stations_only_left_count = missing_stations_df.count()

print(f"Number of stations not in stations using left-join: {stations_only_left_count}")


Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME,ID.1,LATITUDE,...,CODE,COUNTRY_NAME,STATE_CODE,STATE_NAME,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT,COLLECTED_ELEMENTS
0,AGM00060640,20230101,TMAX,211.0,,,S,,,,...,,,,,,,,,,
1,AGM00060640,20230101,TMIN,58.0,,,S,,,,...,,,,,,,,,,
2,AGM00060640,20230101,PRCP,0.0,,,S,,,,...,,,,,,,,,,
3,AGM00060640,20230101,TAVG,131.0,H,,S,,,,...,,,,,,,,,,
4,AGM00060656,20230101,TMIN,86.0,,,S,,,,...,,,,,,,,,,
5,AGM00060656,20230101,TAVG,150.0,H,,S,,,,...,,,,,,,,,,
6,AGM00060670,20230101,TAVG,102.0,H,,S,,,,...,,,,,,,,,,


Number of stations not in stations using left-join: 7


In [50]:
# Determining Missing Stations Using without LEFT JOIN

In [56]:
stations_only_in_daily_df  = daily_data.join(
    enriched_stations_df, daily_data.ID == enriched_stations_df.ID, "left_anti")
show_as_html(stations_only_in_daily_df,10)
stations_only_in_daily_df_anti_count = stations_only_in_daily_df.count()

print(f"Number of stations in daily not in stations without left join: {stations_only_in_daily_df_anti_count}")

Unnamed: 0,ID,DATE,ELEMENT,VALUE,M_FLAG,Q_FLAG,S_FLAG,OBS_TIME
0,AGM00060640,20230101,TMAX,211.0,,,S,
1,AGM00060640,20230101,TMIN,58.0,,,S,
2,AGM00060640,20230101,PRCP,0.0,,,S,
3,AGM00060640,20230101,TAVG,131.0,H,,S,
4,AGM00060656,20230101,TMIN,86.0,,,S,
5,AGM00060656,20230101,TAVG,150.0,H,,S,
6,AGM00060670,20230101,TAVG,102.0,H,,S,


Number of stations in daily not in stations without left join: 7


In [57]:
daily_stations_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- VALUE: float (nullable = true)
 |-- M_FLAG: string (nullable = true)
 |-- Q_FLAG: string (nullable = true)
 |-- S_FLAG: string (nullable = true)
 |-- OBS_TIME: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- GSN_FLAG: string (nullable = true)
 |-- HCN_CRN_FLAG: string (nullable = true)
 |-- WMO_ID: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- COUNTRY_NAME: string (nullable = true)
 |-- STATE_CODE: string (nullable = true)
 |-- STATE_NAME: string (nullable = true)
 |-- FIRST_ACTIVE_YEAR: string (nullable = true)
 |-- LAST_ACTIVE_YEAR: string (nullable = true)
 |-- NUM_ELEMENTS: long (nullable = true)
 |-- CORE_ELEMENT_CT: long (nullabl

In [None]:
# Analysis

In [None]:
# Q1

In [58]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.driver.memory,4g
spark.driver.port,41965
spark.executor.memory,4g
spark.ui.port,4935
spark.master,spark://masternode2:7077
spark.app.id,app-20240429170049-0306
spark.driver.extraJavaOptions,-Dderby.system.home=/tmp/cgo82/spark/
spark.executor.id,driver


In [None]:
# (a)

In [None]:
# Total number of stations

In [59]:
total_stations = enriched_stations_df.select("ID").distinct().count()
print(f"Total number of stations: {total_stations}")


Total number of stations: 125983


In [None]:
# How many stations were active so far in 2024

In [60]:

active_in_2024_df = inventory_data.filter((inventory_data.FIRSTYEAR <= 2024) & (inventory_data.LASTYEAR >= 2024))

active_stations_2024_count = active_in_2024_df.select("ID").distinct().count()
print(f"Number of stations active in 2024: {active_stations_2024_count}")



Number of stations active in 2024: 31840


In [None]:
# Counting stations in each network

In [61]:
from pyspark.sql.functions import col, count


gsn_count = enriched_stations_df.filter(col("GSN_FLAG") == "GSN").count()
hcn_count = enriched_stations_df.filter(col("HCN_CRN_FLAG") == "HCN").count()
crn_count = enriched_stations_df.filter(col("HCN_CRN_FLAG") == "CRN").count()

print(f"Number of stations in GSN: {gsn_count}")
print(f"Number of stations in HCN: {hcn_count}")
print(f"Number of stations in CRN: {crn_count}")


Number of stations in GSN: 991
Number of stations in HCN: 1218
Number of stations in CRN: 234


In [None]:
# Identifying stations in more than one network

In [62]:
# Filter stations that are in GSN and also marked as either HCN or CRN
stations_in_multiple_networks = enriched_stations_df.filter(
    (col("GSN_FLAG") == "GSN") & 
    ((col("HCN_CRN_FLAG") == "HCN") | (col("HCN_CRN_FLAG") == "CRN"))
)

stations_in_multiple_count = stations_in_multiple_networks.count()

print(f"Number of stations in more than one network: {stations_in_multiple_count}")


Number of stations in more than one network: 15


In [None]:
# (b)

In [None]:
# Count the total number of stations in each country

In [63]:
from pyspark.sql.functions import col, count

# Count the number of stations per country
stations_per_country_count = stations_countries_df.groupBy("COUNTRY_NAME").agg(
    count("*").alias("NUM_STATIONS")
)

pandas_df = stations_per_country_count.toPandas()
print(pandas_df)


                                          COUNTRY_NAME  NUM_STATIONS
0                                              Sweden           1721
1                                             Ukraine            204
2                                             Bolivia             36
3                                          Costa Rica              6
4                                    Equatorial Guinea             2
5                                                Ghana            18
6                                             Hungary             10
7                                                Iran             35
8                                               Libya             28
9                                                Oman             29
10                                          Swaziland              6
11                                            Algeria             82
12                                          Australia          17088
13                                

In [None]:
# Joining Counts Back onto Countries

In [64]:

countries_with_station_counts = countries_data.join(
    stations_per_country_count,
    on="COUNTRY_NAME",
    how="left"
)

countries_with_station_counts.show()

+--------------------+----+------------+
|        COUNTRY_NAME|CODE|NUM_STATIONS|
+--------------------+----+------------+
|            Bolivia |  BL|          36|
|         Costa Rica |  CS|           6|
|   Equatorial Guinea|  EK|           2|
|               Ghana|  GH|          18|
|            Hungary |  HU|          10|
|               Iran |  IR|          35|
|              Libya |  LY|          28|
|               Oman |  MU|          29|
|          Swaziland |  WZ|           6|
|             Sweden |  SW|        1721|
|            Ukraine |  UP|         204|
|            Algeria |  AG|          82|
|          Australia |  AS|       17088|
|             Croatia|  HR|          14|
|            Germany |  GM|        1123|
|            Ireland |  EI|          14|
|Norfolk Island [A...|  NF|           1|
|          Sri Lanka |  CE|           6|
|Federated States ...|  FM|          38|
|               Iraq |  IZ|           1|
+--------------------+----+------------+
only showing top

In [None]:
# Save the data frame

In [66]:

output = "hdfs:///user/cgo82/outputs/ghcnd/station_countries.parquet"


countries_with_station_counts.write.mode("overwrite").parquet(output)


In [None]:
# Count the total number of stations in each state

In [67]:
from pyspark.sql.functions import col, count

# Count the number of stations per state within the US
stations_per_state_count = stations_states_df.groupBy("STATE_NAME").agg(
    count("*").alias("NUM_STATIONS")
)

stations_per_state_count.show()

+--------------+------------+
|    STATE_NAME|NUM_STATIONS|
+--------------+------------+
|     TENNESSEE|        1655|
|      KENTUCKY|         960|
|SOUTH CAROLINA|        1485|
|        ALASKA|        1040|
|      COLORADO|        4640|
|       INDIANA|        1924|
|  NORTH DAKOTA|         574|
|      MICHIGAN|        1422|
|NORTH CAROLINA|        2612|
|        KANSAS|        2217|
|          OHIO|        1397|
|     WISCONSIN|        1412|
|         MAINE|         573|
|       FLORIDA|        2142|
|     MINNESOTA|        2199|
|  SOUTH DAKOTA|        1120|
|   CONNECTICUT|         417|
|          null|       53694|
|     LOUISIANA|         769|
|    WASHINGTON|        1646|
+--------------+------------+
only showing top 20 rows



In [None]:
# Joining counts back to states

In [68]:
states_with_station_counts = states_data.join(
    stations_per_state_count,
    on="STATE_NAME",
    how="left"
)

states_with_station_counts.show()

+--------------------+----------+------------+
|          STATE_NAME|STATE_CODE|NUM_STATIONS|
+--------------------+----------+------------+
|            KENTUCKY|        KY|         960|
|              QUEBEC|        QC|        null|
|      SOUTH CAROLINA|        SC|        1485|
|           TENNESSEE|        TN|        1655|
|              ALASKA|        AK|        1040|
|     PACIFIC ISLANDS|        PI|        null|
|            COLORADO|        CO|        4640|
|             INDIANA|        IN|        1924|
|        NORTH DAKOTA|        ND|         574|
|            MICHIGAN|        MI|        1422|
|NEWFOUNDLAND AND ...|        NL|        null|
|      NORTH CAROLINA|        NC|        2612|
|             ONTARIO|        ON|        null|
|              KANSAS|        KS|        2217|
|    MARSHALL ISLANDS|        MH|        null|
|NORTHWEST TERRITO...|        NT|        null|
|                OHIO|        OH|        1397|
|           WISCONSIN|        WI|        1412|
|            

In [None]:
# save the results

In [69]:

output_path = "hdfs:///user/cgo82/outputs/ghcnd/states_station_counts"


states_with_station_counts.write.mode("overwrite").parquet(output_path)


In [None]:
# (d)

In [None]:
# How many stations are in southern heemisphere

In [70]:
from pyspark.sql.functions import col

# Count stations in the Southern Hemisphere
southern_hemisphere_stations_count = enriched_stations_df.filter(col("LATITUDE") < 0).count()

print(f"Number of stations in the Southern Hemisphere: {southern_hemisphere_stations_count}")


Number of stations in the Southern Hemisphere: 25316


In [69]:
# Count Stations in US territories

In [71]:
stations_per_country_count.printSchema()

root
 |-- COUNTRY_NAME: string (nullable = true)
 |-- NUM_STATIONS: long (nullable = false)



In [72]:
from pyspark.sql.functions import col

# Filter stations in US territories
stations_in_us_territories = stations_countries_df.filter(stations_countries_df["Country_Name"].contains("[United States]"))

# Count the stations
num_stations_in_us_territories = stations_in_us_territories.select('ID').count()
show_as_html(stations_in_us_territories, 20)

print(f"Number of stations in US territories: {num_stations_in_us_territories}")


Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,CODE,COUNTRY_NAME
0,AQC00914000,-14.3167,-170.7667,408.4,AS,AASUFOU,,,,AQ,AQ,American Samoa [United States]
1,AQC00914005,-14.2667,-170.65,182.9,AS,AFONO,,,,AQ,AQ,American Samoa [United States]
2,AQC00914021,-14.2667,-170.5833,6.1,AS,AMOULI TUTUILA,,,,AQ,AQ,American Samoa [United States]
3,AQC00914060,-14.2667,-170.6833,80.8,AS,ATUU,,,,AQ,AQ,American Samoa [United States]
4,AQC00914135,-14.3,-170.7,249.9,AS,FAGA ALU RSVR,,,,AQ,AQ,American Samoa [United States]
5,AQC00914138,-14.2833,-170.6833,24.1,AS,FAGA ALU STREAM,,,,AQ,AQ,American Samoa [United States]
6,AQC00914141,-14.2667,-170.6167,4.6,AS,FAGAITUA,,,,AQ,AQ,American Samoa [United States]
7,AQC00914145,-14.2833,-170.7167,14.9,AS,FAGASA TUTUILA,,,,AQ,AQ,American Samoa [United States]
8,AQC00914149,-14.2833,-170.6833,57.0,AS,FAGA TOGO,,,,AQ,AQ,American Samoa [United States]
9,AQC00914188,-14.2167,-168.5333,6.1,AS,FALEASAO TAU,,,,AQ,AQ,American Samoa [United States]


Number of stations in US territories: 383


In [64]:
# Q2

In [62]:
# (a)

In [63]:
# Defining for UDF for distance Calculation

In [73]:
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def haversine(lon1, lat1, lon2, lat2):
    # Convert input to floats
    lon1, lat1, lon2, lat2 = map(float, [lon1, lat1, lon2, lat2])
    
    # Convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])

    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    r = 6371  # Radius of Earth in kilometers. Use 3956 for miles.
    return c * r

# Wrap the Python function with udf, specifying the return type as FloatType
haversine_udf = udf(haversine, FloatType())


In [74]:
subset_stations_df = enriched_stations_df.limit(10)

# Perform a CROSS JOIN to generate all pairs of stations
cross_joined_stations = subset_stations_df.alias("station1").crossJoin(subset_stations_df.alias("station2"))

# Calculate the distance between each pair of stations with updated UDF
distance_df = cross_joined_stations.withColumn(
    "distance_km",
    haversine_udf(
        "station1.LONGITUDE",
        "station1.LATITUDE",
        "station2.LONGITUDE",
        "station2.LATITUDE"
    )
)

distance_df.select("station1.ID", "station2.ID", "distance_km").show(10)



+-----------+-----------+-----------+
|         ID|         ID|distance_km|
+-----------+-----------+-----------+
|AGM00060555|AGM00060555|        0.0|
|AGM00060555|AJ000037756|  3864.5713|
|AGM00060555|AJ000037923|   3906.477|
|AGM00060555|AM000037801|  3543.1982|
|AGM00060555|AMM00037717|  3532.2854|
|AGM00060555|AQC00914594|  17907.771|
|AGM00060555|AQW00061705|  17906.658|
|AGM00060555|AR000087925|  11924.059|
|AGM00060555|ARM00087178|   9384.323|
|AGM00060555|ASM00094995|   17477.89|
+-----------+-----------+-----------+
only showing top 10 rows



In [72]:
# (b)

In [75]:
# Filter new Zealand stations

In [75]:
nz_stations = enriched_stations_df.filter(stations_countries_df.COUNTRY_CODE == 'NZ')
nz_stations.show(5)

+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|CODE|COUNTRY_NAME|STATE_CODE|STATE_NAME|FIRST_ACTIVE_YEAR|LAST_ACTIVE_YEAR|NUM_ELEMENTS|CORE_ELEMENT_CT|OTHER_ELEMENT_CT|  COLLECTED_ELEMENTS|
+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+
|NZ000936150|-42.7170| 170.9830|     40.0|HOKITIKA AERODROM...|        |            | 93781|          NZ|  NZ|New Zealand |      null|      null|             1964|            2024|           4|              3|               1|[TMAX, TMIN, PRCP...|
|NZ00009

In [None]:
# number of unque stations in NZ

In [76]:
nz_station_count = nz_stations.select('ID').count()
print(f"Number of stations in New Zealand: {nz_station_count}")

Number of stations in New Zealand: 15


In [None]:
 # Cross JOIN New Zealand Stations

In [77]:
nz_station_pairs = nz_stations.alias("station1").crossJoin(nz_stations.alias("station2"))
nz_station_pairs.show(5)

+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|CODE|COUNTRY_NAME|STATE_CODE|STATE_NAME|FIRST_ACTIVE_YEAR|LAST_ACTIVE_YEAR|NUM_ELEMENTS|CORE_ELEMENT_CT|OTHER_ELEMENT_CT|  COLLECTED_ELEMENTS|         ID|LATITUDE|LONGITUDE|ELEVATION|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|CODE|COUNTRY_NAME|STATE_CODE|STATE_NAME|FIRST_ACTIVE_YEAR|LAST_ACTIVE_YEAR|NUM_ELEMENTS|CORE_ELEMENT_CT|OTHER_ELEMENT_CT|  COLLECTED_ELEMENTS|
+-----------

In [None]:
# Calculate pairwise distance

In [78]:
nz_distances = nz_station_pairs.withColumn(
    "distance_km",
    haversine_udf(
        "station1.LONGITUDE",
        "station1.LATITUDE",
        "station2.LONGITUDE",
        "station2.LATITUDE"
    )
)


In [79]:
show_as_html(nz_distances, 5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE,CODE,...,COUNTRY_NAME,STATE_CODE,STATE_NAME,FIRST_ACTIVE_YEAR,LAST_ACTIVE_YEAR,NUM_ELEMENTS,CORE_ELEMENT_CT,OTHER_ELEMENT_CT,COLLECTED_ELEMENTS,distance_km
0,NZ000936150,-42.717,170.983,40.0,HOKITIKA AERODROME,,,93781,NZ,NZ,...,New Zealand,,,1964,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]",0.0
1,NZ000936150,-42.717,170.983,40.0,HOKITIKA AERODROME,,,93781,NZ,NZ,...,New Zealand,,,1965,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]",869.623535
2,NZ000936150,-42.717,170.983,40.0,HOKITIKA AERODROME,,,93781,NZ,NZ,...,New Zealand,,,1962,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]",743.137268
3,NZ000936150,-42.717,170.983,40.0,HOKITIKA AERODROME,,,93781,NZ,NZ,...,New Zealand,,,1997,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]",224.981277
4,NZ000936150,-42.717,170.983,40.0,HOKITIKA AERODROME,,,93781,NZ,NZ,...,New Zealand,,,1948,2024,4,3,1,"[TMAX, TMIN, PRCP, TAVG]",461.815979


In [None]:
# Find the closest pair stations

In [80]:
from pyspark.sql.functions import col

closest_pair = nz_distances.filter(
    nz_distances["station1.ID"] != nz_distances["station2.ID"]
).orderBy("distance_km", ascending=True).limit(1)
closest_pair.show(5)

+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+-----------+--------+---------+---------+--------------------+--------+------------+------+------------+----+------------+----------+----------+-----------------+----------------+------------+---------------+----------------+--------------------+-----------+
|         ID|LATITUDE|LONGITUDE|ELEVATION|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|CODE|COUNTRY_NAME|STATE_CODE|STATE_NAME|FIRST_ACTIVE_YEAR|LAST_ACTIVE_YEAR|NUM_ELEMENTS|CORE_ELEMENT_CT|OTHER_ELEMENT_CT|  COLLECTED_ELEMENTS|         ID|LATITUDE|LONGITUDE|ELEVATION|                NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_CODE|CODE|COUNTRY_NAME|STATE_CODE|STATE_NAME|FIRST_ACTIVE_YEAR|LAST_ACTIVE_YEAR|NUM_ELEMENTS|CORE_ELEMENT_CT|OTHER_ELEMENT_CT|  COLLECTED_ELEMENTS|d

In [81]:
closest_pair_row = closest_pair.first()
print(f"The distance between the closest stations in New Zealand is {closest_pair_row['distance_km']:.2f} kilometers.")


The distance between the closest stations in New Zealand is 50.53 kilometers.


In [None]:
#Q3

In [None]:
# (a)

In [None]:
# Default block size of HDFS

In [82]:
!hdfs getconf -confKey "dfs.blocksize"

134217728


In [None]:
#File size in daily - 2024

In [83]:
!hdfs dfs -du -h /data/ghcnd/daily/2024.csv.gz

26.2 M  209.8 M  /data/ghcnd/daily/2024.csv.gz


In [None]:
#File size in daily - 2023

In [84]:
!hdfs dfs -du -h /data/ghcnd/daily/2023.csv.gz

158.7 M  1.2 G  /data/ghcnd/daily/2023.csv.gz


In [None]:
# Number of Blocks in both years

In [85]:
!hdfs fsck /data/ghcnd/daily/2023.csv.gz -files -blocks -locations


Connecting to namenode via http://masternode2:9870/fsck?ugi=cgo82&files=1&blocks=1&locations=1&path=%2Fdata%2Fghcnd%2Fdaily%2F2023.csv.gz
FSCK started by cgo82 (auth:SIMPLE) from /192.168.40.11 for path /data/ghcnd/daily/2023.csv.gz at Mon Apr 29 17:16:55 NZST 2024

/data/ghcnd/daily/2023.csv.gz 166367488 bytes, replicated: replication=8, 2 block(s):  OK
0. BP-700027894-132.181.129.68-1626517177804:blk_1074057666_316853 len=134217728 Live_repl=8  [DatanodeInfoWithStorage[192.168.40.158:9866,DS-60dfaa6a-f7a4-4d6c-8e44-70a79274b7af,DISK], DatanodeInfoWithStorage[192.168.40.105:9866,DS-995c6d40-6fd7-4f6e-8812-148b6aac3e9f,DISK], DatanodeInfoWithStorage[192.168.40.134:9866,DS-0a049076-30db-4346-b90c-2e52cc905458,DISK], DatanodeInfoWithStorage[192.168.40.180:9866,DS-8012b6d5-226a-489a-bfa7-a2e221b8d1ff,DISK], DatanodeInfoWithStorage[192.168.40.142:9866,DS-18323328-25b7-46b9-beea-a2d0a1ab6423,DISK], DatanodeInfoWithStorage[192.168.40.173:9866,DS-81ea4712-59d5-45a7-8d68-848697c8fac8,DISK]

In [86]:
!hdfs fsck /data/ghcnd/daily/2024.csv.gz -files -blocks -locations

Connecting to namenode via http://masternode2:9870/fsck?ugi=cgo82&files=1&blocks=1&locations=1&path=%2Fdata%2Fghcnd%2Fdaily%2F2024.csv.gz
FSCK started by cgo82 (auth:SIMPLE) from /192.168.40.11 for path /data/ghcnd/daily/2024.csv.gz at Mon Apr 29 17:17:43 NZST 2024

/data/ghcnd/daily/2024.csv.gz 27492832 bytes, replicated: replication=8, 1 block(s):  OK
0. BP-700027894-132.181.129.68-1626517177804:blk_1074057694_316881 len=27492832 Live_repl=8  [DatanodeInfoWithStorage[192.168.40.183:9866,DS-d547a5f3-b56f-45ab-81b8-e9d492a4e1d3,DISK], DatanodeInfoWithStorage[192.168.40.106:9866,DS-39db1648-d1af-450f-999c-277feea9beeb,DISK], DatanodeInfoWithStorage[192.168.40.159:9866,DS-93ea3ab4-4367-4d25-8560-df00f71281a3,DISK], DatanodeInfoWithStorage[192.168.40.133:9866,DS-1395ddcb-8542-431a-8b4e-48509cab5de9,DISK], DatanodeInfoWithStorage[192.168.40.102:9866,DS-33686d75-da76-41d8-97de-5c93688abb7a,DISK], DatanodeInfoWithStorage[192.168.40.173:9866,DS-81ea4712-59d5-45a7-8d68-848697c8fac8,DISK], 

In [None]:
# (b)

In [None]:
# no of observations

In [87]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GHCN Daily Analysis").getOrCreate()

# Load 2023 data
daily_2023_df = spark.read.csv("hdfs:///data/ghcnd/daily/2023.csv.gz", header=False, inferSchema=True)
count_2023 = daily_2023_df.count()
print(f"Number of observations in 2023: {count_2023}")



Number of observations in 2023: 37395852


In [88]:
# Load 2024 data
daily_2024_df = spark.read.csv("hdfs:///data/ghcnd/daily/2024.csv.gz", header=False, inferSchema=True)
count_2024 = daily_2024_df.count()
print(f"Number of observations in 2024: {count_2024}")


Number of observations in 2024: 6061827


In [89]:
num_partitions_2023 = daily_2023_df.rdd.getNumPartitions()
num_partitions_2024 = daily_2024_df.rdd.getNumPartitions()
print(f"Number of partitions in 2023 data: {num_partitions_2023}")
print(f"Number of partitions in 2024 data: {num_partitions_2024}")


Number of partitions in 2023 data: 1
Number of partitions in 2024 data: 1


In [None]:
#(c)

In [None]:
#Load and count the total number of observations in the years from 2014 to 2023

In [90]:
# Creating a list of paths for each year
paths = [f"hdfs:///data/ghcnd/daily/{year}.csv.gz" for year in range(2014, 2024)]

# Pass the list of paths directly to spark.read.text()
daily_2014_2023_df = spark.read.text(paths)


In [91]:
total_count_2014_2023 = daily_2014_2023_df.count()
print(f"Total number of observations from 2014 to 2023: {total_count_2014_2023}")


Total number of observations from 2014 to 2023: 369419065


In [92]:
num_partitions = daily_2014_2023_df.rdd.getNumPartitions()
print(f"Number of partitions (and likely number of tasks for reading): {num_partitions}")


Number of partitions (and likely number of tasks for reading): 10


In [93]:
stop_spark()