### Spark notebook ###

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

**Steps**

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.app.name,kda115 (jupyter)
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.sql.warehouse.dir,file:/users/home/kda115/Spark/Assignment/Analysis/spark-warehouse
spark.ui.port,4634
spark.driver.memory,4g
spark.executor.memory,4g
spark.master,spark://masternode2:7077
spark.app.startTime,1725142988074
spark.executor.id,driver


In [3]:
# Import the pyspark API to defined data types
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Row

In [4]:
# Load the enriched_stations parquet file stored in HDFS 
stations = spark.read.parquet('hdfs:///user/kda115/ghcnd/result/enriched_stations.parquet')

# Show the station dataset 
stations.printSchema()
show_as_html(stations, 10)

root
 |-- Station_ID: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Elevation: double (nullable = true)
 |-- Station_Name: string (nullable = true)
 |-- GSN_Flag: string (nullable = true)
 |-- HCN_CRN_Flag: string (nullable = true)
 |-- WMO_ID: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- Country_Name: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- State_Name: string (nullable = true)
 |-- First_Year: integer (nullable = true)
 |-- Last_Year: integer (nullable = true)
 |-- Total_Years_Active: integer (nullable = true)
 |-- Total_Unique_Elements: long (nullable = true)
 |-- Core_Element_Count: integer (nullable = true)
 |-- Other_Element_Count: integer (nullable = true)



Unnamed: 0,Station_ID,Latitude,Longitude,Elevation,Station_Name,GSN_Flag,HCN_CRN_Flag,WMO_ID,COUNTRY_CODE,Country_Name,State_Code,State_Name,First_Year,Last_Year,Total_Years_Active,Total_Unique_Elements,Core_Element_Count,Other_Element_Count
0,AFM00040990,31.5,65.85,1010.0,KANDAHAR AIRPORT,,,40990,AF,Afghanistan,,,1973,2020,48,5,4,1
1,AGE00147718,34.85,5.72,125.0,BISKRA,,,60525,AG,Algeria,,,1880,2024,145,4,3,1
2,AGM00060417,36.383,3.883,560.0,BOUIRA,,,60417,AG,Algeria,,,1995,2024,30,5,4,1
3,AGM00060421,35.867,7.117,891.0,OUM EL BOUAGHI,,,60421,AG,Algeria,,,1985,2024,40,5,4,1
4,AGM00060531,35.017,-1.45,248.1,ZENATA,,,60531,AG,Algeria,,,1981,2024,44,5,4,1
5,AJ000037895,39.983,46.75,828.0,KHANKANDY,,,37895,AJ,Azerbaijan,,,1936,1991,56,5,4,1
6,AM000037782,40.4,44.3,1893.0,AMBERD (KOSHABULAKH),,,37782,AM,Armenia,,,1919,1992,74,1,1,0
7,AM000037791,40.4,44.683,1800.0,FANTAN,,,37791,AM,Armenia,,,1936,1992,57,5,4,1
8,AO000066447,-15.833,20.35,1088.0,MAVINGA,GSN,,66447,AO,Angola,,,1957,1975,19,4,3,1
9,AR000087828,-43.2,-65.266,43.0,TRELEW AERO,GSN,,87828,AR,Argentina,,,1956,2024,69,5,4,1


## Question 02

### A. Write a Spark function that computes the geographical distance between two stations using lat and longitude. 

The Haversine formula calculates the surface distance between two points on an object, like the Earth, using their latitude and longitude coordinates. It accounts for the object's curvature to provide an accurate distance measurement between the two points.  It assumes that the shape of the object (Earth in our case) is a sphere. (Bielski, 2019)

- Cite: (Bielski, N. (2019, May 6). Using a Custom UDF in PySpark to Compute Haversine Distances. Medium. https://medium.com/@nikolasbielski/using-a-custom-udf-in-pyspark-to-compute-haversine-distances-d877b77b4b18)

In [5]:
import math
# Define the haversine function
def haversine(longit_a, latit_a, longit_b, latit_b):
    "Write a haversine function to calculate the distance"
    # Check if the same station
    if longit_a == longit_b and latit_a == latit_b:
        return None  
    # Radius of Earth in kilometers. Use 3956 for miles.
    radius = 6371.0
    longit_a, latit_a, longit_b, latit_b = map(math.radians, [longit_a,  latit_a, longit_b, latit_b])
    
    # Difference in coordinates
    difference_lat = latit_b - latit_a
    difference_lon = longit_b - longit_a
    
    # Haversine formula
    area = math.sin(difference_lat/2)**2 + math.cos(latit_a) * math.cos(latit_b) * math.sin(difference_lon/2)**2
    central_angle = 2 * math.asin(math.sqrt(area))
    
    # Calculate Distance
    distance = central_angle * radius
    
    # Round this into 2 decimals
    return abs(round(distance, 2))

In [6]:
# Convert the function as a UDF in PySpark
udf_haversine_distance = F.udf(haversine)

#### A.1 Create the small subset of stations to test the function

In [7]:
# Create the small subset stations in United State to test the function
US_stations_subset = (stations
                    .where(F.col("Country_Code") == "US")
                    .select("Station_ID", "Station_Name","Latitude","Longitude")
                    .limit(10))
        
# Show the subet 
show_as_html(US_stations_subset, 10)

Unnamed: 0,Station_ID,Station_Name,Latitude,Longitude
0,US10adam007,HASTINGS 5.4 WSW,40.5389,-98.4713
1,US10adam023,JUNIATA 1.8 ENE,40.5981,-98.4732
2,US10box_007,ALLIANCE 5.9 NE,42.1675,-102.8005
3,US10box_020,HEMINGFORD 0.4 SW,42.3167,-103.0787
4,US10box_023,ALLIANCE 21.7 NNE,42.4093,-102.7894
5,US10brow008,AINSWORTH 18.7 SW,42.3248,-100.061
6,US10cass011,PLATTSMOUTH 3.1 WNW,41.0289,-95.9874
7,US10chas010,IMPERIAL 1.3 ENE,40.5253,-101.6226
8,US10cher011,VALENTINE 15.0 SSE,42.6624,-100.4779
9,US10cher026,WHITMAN 26.0 N,42.4186,-101.557


In [8]:
# Perform a CROSS JOIN to create pairs of stations
station_pairs = US_stations_subset.alias("station_a").crossJoin(US_stations_subset.alias("station_b"))

# Calculate the distance between the two stations in each row
total_distance = station_pairs.withColumn(
    "Distance_km", 
    udf_haversine_distance(
        F.col("station_a.Latitude"), F.col("station_a.Longitude"),
        F.col("station_b.Latitude"), F.col("station_b.Longitude")
    )
)

# Select and rename the necessary columns for clarity
total_distance = total_distance.select(
    F.col("station_a.Station_ID").alias("Station_ID_1"),
    F.col("station_a.Station_Name").alias("Station_1"),
    F.col("station_a.Latitude").alias("Latitude_1"),
    F.col("station_a.Longitude").alias("Longitude_1"),
    F.col("station_b.Station_ID").alias("Station_ID_2"),
    F.col("station_b.Station_Name").alias("Station_2"),
    F.col("station_b.Latitude").alias("Latitude_2"),
    F.col("station_b.Longitude").alias("Longitude_2"),
    F.col("Distance_km").alias("Distance")
)

# Show the result with renamed columns
show_as_html(total_distance, 10)

Unnamed: 0,Station_ID_1,Station_1,Latitude_1,Longitude_1,Station_ID_2,Station_2,Latitude_2,Longitude_2,Distance
0,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10buff020,KEARNEY 3.0 NNE,40.7407,-99.0647,486.23
1,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10ceda014,LAUREL 3.9 ESE,42.3951,-97.0319,711.4
2,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10chas006,CHAMPION 5.2 WNW,40.4949,-101.8424,180.81
3,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10chas027,CHAMPION 0.0 WNW,40.4702,-101.748,191.16
4,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10chey010,DALTON 4.5 WSW,41.3766,-103.0495,46.26
5,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10cumi012,BANCROFT 0.2 NW,42.0149,-96.5756,762.12
6,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10cust021,MERNA 11.4 W,41.4517,-99.9745,384.46
7,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10doug020,OMAHA 5.2 WNW,41.2903,-96.0279,823.15
8,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10fill019,OHIOWA 2.2 NW,40.4362,-97.483,662.02
9,US10box_010,ALLIANCE 29.1 W,42.116,-103.4295,US10fran002,HILDRETH 4.5 SSE,40.2732,-99.0295,490.82


### B. Apply this function to compute the pairwise distances between all stations in New Zealand

In [9]:
# Filter stations located in New Zealand using "Country_Code"
NZ_stations = (stations
                .where(F.col("Country_Code") == "NZ")
                .select("Station_ID","Station_Name","Latitude","Longitude"))

# Show the result 
show_as_html(NZ_stations,10)

Unnamed: 0,Station_ID,Station_Name,Latitude,Longitude
0,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183
1,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.333
2,NZ000939450,CAMPBELL ISLAND AWS,-52.55,169.167
3,NZM00093929,ENDERBY ISLAND AWS,-50.483,166.3
4,NZ000093417,PARAPARAUMU AWS,-40.9,174.983
5,NZM00093781,CHRISTCHURCH INTL,-43.489,172.532
6,NZ000937470,TARA HILLS,-44.517,169.9
7,NZ000939870,CHATHAM ISLANDS AWS,-43.95,-176.567
8,NZ000093292,GISBORNE AERODROME,-38.65,177.983
9,NZ000093012,KAITAIA,-35.1,173.267


In [10]:
# Perform a cross join to create pairs of New Zealand stations 
NZ_station_pairs = NZ_stations.alias("station_a").crossJoin(NZ_stations.alias("station_b"))

# Calculate the distance between the two stations in each row
NZ_station_pairs = NZ_station_pairs.withColumn(
    "Distance_km",
    udf_haversine_distance(
        F.col("station_a.Latitude"), F.col("station_a.Longitude"),
        F.col("station_b.Latitude"), F.col("station_b.Longitude")
    ).cast('double')
)

# Step 4: Select and rename the necessary columns for clarity
NZ_Stations_Distance = NZ_station_pairs.select(
    F.col("station_a.Station_ID").alias("Station_ID_1"),
    F.col("station_a.Station_Name").alias("Station_Name_1"),
    F.col("station_a.Latitude").alias("Latitude_1"),
    F.col("station_a.Longitude").alias("Longitude_1"),
    F.col("station_b.Station_ID").alias("Station_ID_2"),
    F.col("station_b.Station_Name").alias("Station_Name_2"),
    F.col("station_b.Latitude").alias("Latitude_2"),
    F.col("station_b.Longitude").alias("Longitude_2"),
    F.col("Distance_km").alias("Distance")
)

# Show the result 
show_as_html(NZ_Stations_Distance, 5)

Unnamed: 0,Station_ID_1,Station_Name_1,Latitude_1,Longitude_1,Station_ID_2,Station_Name_2,Latitude_2,Longitude_2,Distance
0,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,0.0
1,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.333,1041.12
2,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,NZ000939450,CAMPBELL ISLAND AWS,-52.55,169.167,1589.44
3,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,NZM00093929,ENDERBY ISLAND AWS,-50.483,166.3,1531.11
4,NZ000933090,NEW PLYMOUTH AWS,-39.017,174.183,NZ000093417,PARAPARAUMU AWS,-40.9,174.983,226.63


In [None]:
# Save the NZ_Stations_Distiance result to HDFS directory
(NZ_Stations_Distance.write.option('header', True).mode('overwrite')
.parquet('hdfs:///user/kda115/ghcnd/result/NZ_Stations_Distance.parquet'))

In [None]:
# Check the save file 
! hdfs dfs -ls 'hdfs:///user/kda115/ghcnd/result/'

#### B.1 What 2 stations are geographically closest together in New Zealand?

In [11]:
# Filter out pairs where Station_ID_1 is equal to Station_ID_2 (i.e., self-pairs)
Shortest_Distance = NZ_Stations_Distance.filter(F.col("Station_ID_1") != F.col("Station_ID_2"))

# Find the pair with the minimum distance
show_as_html(Shortest_Distance.sort('Distance', ascending = True),1)

Unnamed: 0,Station_ID_1,Station_Name_1,Latitude_1,Longitude_1,Station_ID_2,Station_Name_2,Latitude_2,Longitude_2,Distance
0,NZM00093439,WELLINGTON AERO AWS,-41.333,174.8,NZ000093417,PARAPARAUMU AWS,-40.9,174.983,52.09


In [12]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()