### Spark notebook ###

This notebook will only work in a Jupyter notebook or Jupyter lab session running on the cluster master node in the cloud.

Follow the instructions on the computing resources page to start a cluster and open this notebook.

**Steps**

1. Connect to the Windows server using Windows App.
2. Connect to Kubernetes.
3. Start Jupyter and open this notebook from Jupyter in order to connect to Spark.

In [None]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Constants used to interact with Azure Blob Storage using the hdfs command or Spark

global username

username = re.sub('@.*', '', getpass.getuser())

global azure_account_name
global azure_data_container_name
global azure_user_container_name
global azure_user_token

azure_account_name = "madsstorage002"
azure_data_container_name = "campus-data"
azure_user_container_name = "campus-user"
azure_user_token = r"sp=racwdl&st=2025-08-01T09:41:33Z&se=2026-12-30T16:56:33Z&spr=https&sv=2024-11-04&sr=c&sig=GzR1hq7EJ0lRHj92oDO1MBNjkc602nrpfB5H8Cl7FFY%3D"


# Functions used below

def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """
    html = []
    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')
    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():
        name = sc.getConf().get("spark.app.name")
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://localhost:{sc.uiWebUrl.split(":")[-1]}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else: 
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username} (notebook)</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
    
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{username}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.driver.memory", f'{master_memory}g')
        .config("spark.executor.memory", f'{worker_memory}g')
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.kubernetes.container.image", "madsregistry001.azurecr.io/hadoop-spark:v3.3.5-openjdk-8")
        .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
        .config("spark.kubernetes.memoryOverheadFactor", "0.3")
        .config("spark.memory.fraction", "0.1")
        .config(f"fs.azure.sas.{azure_user_container_name}.{azure_account_name}.blob.core.windows.net",  azure_user_token)
        .config("spark.app.name", f"{username} (notebook)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """
    global spark
    global sc
    if 'spark' in globals() and 'sc' in globals():
        spark.stop()
        del spark
        del sc
    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Assignment 2 ###

- MSD containers:
  - `wasbs://campus-data@madsstorage002.blob.core.windows.net/msd/` 

- MY containers:
  - `wasbs://campus-user@madsstorage002.blob.core.windows.net/`


In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

25/10/06 10:41:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


0,1
spark.dynamicAllocation.enabled,false
spark.fs.azure.sas.campus-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2025-08-01T09:41:33Z&se=2026-12-30T16:56:33Z&spr=https&sv=2024-11-04&sr=c&sig=GzR1hq7EJ0lRHj92oDO1MBNjkc602nrpfB5H8Cl7FFY%3D"""
spark.kubernetes.driver.pod.name,spark-master-driver
spark.executor.instances,4
spark.driver.memory,4g
spark.kubernetes.namespace,dew59
spark.kubernetes.container.image.pullPolicy,IfNotPresent
spark.sql.shuffle.partitions,32
spark.driver.extraJavaOptions,-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dderby.system.home=/tmp/dew59/spark/
spark.serializer.objectStreamReset,100


In [None]:
# My Imports
 
from IPython.display     import display  # calls between environments
from math                import acos, atan2, cos, radians, sin, sqrt
from matplotlib.ticker   import FuncFormatter, MaxNLocator
from pathlib             import Path
from pyspark.sql         import DataFrame
from pyspark.sql         import DataFrame as SparkDF
from pyspark.sql         import functions as F, types as T
from pyspark.sql.types   import *
from pyspark.sql.utils   import AnalysisException
from pyspark.sql.window  import Window
from time                import perf_counter  # Add this line for benchmark functions
from typing              import List, Optional, Tuple
from rich.tree           import Tree
from rich.console        import Console
import itertools         as it
import matplotlib.dates  as mdates
import matplotlib.pyplot as plt
import numpy             as np
import pandas            as pd
import warnings

warnings.filterwarnings("ignore", category=UserWarning)
console = Console()

import math, os, platform, re
import subprocess, sys, time

#The following shows the data structure

In [None]:
# overall time metric
notebook_run_time = time.time()

# Use the hdfs command to explore the data in Azure Blob Storage
#USERNAME    = "dew59"
WASBS_DATA  = "wasbs://campus-data@madsstorage002.blob.core.windows.net/msd/"
WASBS_USER  = f"wasbs://campus-user@madsstorage002.blob.core.windows.net/{username}/A2/"

WASBS_USER          = "wasbs://campus-user@madsstorage002.blob.core.windows.net/{}".format(USERNAME)
#WASBS_YEAR_SIZE     = "{}/years_size_metrics.parquet/".format(WASBS_USER)

 
#stations_path = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/{stations_write_path}'
#common_data_path    = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/'
#stations_read_name  =  inventory_read_name = ""
#stations_read_name  =  inventory_read_name = ""
 

print("Spark:", spark.version)
hprint("PATHS")
print("WASBS_DATA          :", WASBS_DATA)
print("WASBS_USER          :", WASBS_USER) 
print()

Spark: 3.5.1
------------ PATHS ------------
USERNAME            : dew59
WASBS_DATA          : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/
WASBS_DAILY         : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/
WASBS_USER          : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59
WASBS_YEAR_SIZE     : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/years_size_metrics.parquet/
WASBS_METADATA_SIZE : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/metadata_size_metrics.parquet/

stations_read_name   : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/ghcnd-stations.txt
inventory_read_name  : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/ghcnd-inventory.txt
countries_read_name  : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/ghcnd-countries.txt
states_read_name     : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/ghcnd-states.txt

station_date_

In [None]:
# HELPER AND DIAGNOSTIC FUNCTIONS

notebook_run_time = time.time()
print("_" * 35 + "HELPER / DIAGNOSTIC FUNCTIONS" + "_" * 35)

def hprint(text: str="", l=50):
    """Print formatted section header"""
    n = len(text)
    n = abs(n - l) // 2
    print("\n" + "_" * n + text + "_" * n)

def cleanup_parquet_files(cleanup=False):
    """Clean up existing parquet files in user directory.
    
    Args:
        cleanup (bool): When True, actually DELETES FILES. 
                        When False, only LISTS files.
    """
    hprint("Clean up existing parquet files")

    print("[cleanup] Listing files BEFORE cleanup:")
    get_ipython().system(f'hdfs dfs -ls {WASBS_USER}/*.parquet')
    
    if cleanup:
        print("\n[cleanup] Deleting all parquet folders...")
        get_ipython().system(f'hdfs dfs -rm -r -f {WASBS_USER}/*.parquet')
        
        print("\n[info] Listing files AFTER cleanup:")
        get_ipython().system(f'hdfs dfs -ls {WASBS_USER}/*.parquet')
        print("\n[cleanup] Parquet file cleanup complete - ready to restart Processing run with clean schema")

    else:
        print("\n[info] To actually delete files, call: cleanup_parquet_files(cleanup=True)")

def normalise_ids(df: DataFrame, col: str = "ID") -> DataFrame:
    """
    # Single source of truth for ID normalisation 
    Upper + trim + distinct on the given ID column.
    """
    print(f"[INFO] normalise_ids() on column: {col}")
    df.printSchema()
    df.show(20)
    return df.select(F.upper(F.trim(F.col(col))).alias("ID")).distinct()
    df.printSchema()
    df.show(20)

def df_as_html(df, n: int = 5, right_align: bool = False, show_index: bool = False):
    """
    HTML preview via pandas with no truncation. If right_align=True,
    only numeric columns are right-justified; everything else is 
    explicitly left-aligned.
    """
    pdf = df.limit(n).toPandas()
    print("[INFO] Converting Spark → pandas for HTML display (rows:", len(pdf), ")")
    print("[INFO] right_align (numeric columns):", right_align)

    with pd.option_context(
        "display.max_colwidth", None,   
        "display.max_columns", None,    
        "display.width", None            
    ):
        styler = pdf.style if show_index else pdf.style.hide(axis="index")

        #   table alignment: left for both headers and cells
        styler = styler.set_table_styles(
            [
                {"selector": "th", "props": [("text-align", "left")]},
                {"selector": "td", "props": [("text-align", "left")]},
            ],
            overwrite=True,  # make this the baseline
        )
         
        if right_align:
            numeric_cols = list(pdf.select_dtypes(include=["number"]).columns)
            print("[INFO] Right-aligning numeric columns:", numeric_cols)
            if numeric_cols:
                styler = styler.set_properties(subset=numeric_cols,
                                               **{"text-align": "right"})
        display(styler)

def show_df(df, n: int = 10, name: str = "", right_align: bool = False):
    """
    Print schema, 
    show an HTML sample,
    and row count.
    """
    bprint()
    print("name : ",name)
    df.printSchema()
    print("[check] sample:")
    df_as_html(df, n=n, right_align=right_align)

def write_parquet(df, dir_as_path: str, df_name:str = ""):    
    funct_time = time.time()
    path = _normalise_dir(dir_as_path)
    print(f"[file] write_parquet  : {path}")
    try:      
        show_df(df,df_name)
    except Exception as e:
        print("[catch] sample failed:", e)
        os.system(f'hdfs dfs -rm -r -f "{path}"')   # idempotent cleanup
    df.write.mode("overwrite").format("parquet").save(path)
    os.system(f'hdfs dfs -ls -R "{path}"')
    funct_time = time.time() - funct_time 
    print(f"[time] write_parquet (min)   : {funct_time/60:5.2f}")
    print(f"[time] write_parquet (sec)   : {funct_time:5.2f}")

def has_parquet(dir_as_path: str) -> bool:
    path   = _normalise_dir( dir_as_path)
    marker = path + '_SUCCESS'
    #print("\n[check] dir_path:", dir_path)
    #print("\n[check] path    :", path)
    print("\n[check] marker  :", marker)
    rc = os.system(f'hdfs dfs -test -e "{marker}"')
    print("[check] rc:", rc, "->", ("exists" if rc == 0 else "missing"))
    return (rc == 0)

def _to_spark(df_like, schema=None):
    """
    Return a Spark DataFrame  .
    """
    if isinstance(df_like, SparkDF):
        return df_like
    return spark.createDataFrame(df_like, schema=schema) if schema else spark.createDataFrame(df_like)

def ensure_dir(path: str) -> str:
    """
    ensures that path is a path 
    and not representing a file;
    add trailing slash if needed
    """
    if path is None:
        raise ValueError("Path is None")
    path = _normalise_dir(path)
#   print("ensure_dir -> ",path)
    return path

def _normalise_dir(s: str) -> str:
    """
    Ensure trailing slash so we point to
    the dataset directory (not a file)
    """
    return s if s.endswith("/") else s + "/"

def _success_exists(target_dir: str) -> bool:
    """
    Check for the Hadoop/Spark _SUCCESS marker;  
    """
    jvm = spark._jvm
    hconf = spark._jsc.hadoopConfiguration()
    try:
        uri = jvm.java.net.URI(target_dir)
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(uri, hconf)
        success = jvm.org.apache.hadoop.fs.Path(target_dir + "_SUCCESS")
        exists = fs.exists(success)
        print(f"[status] _SUCCESS check at: {target_dir}_SUCCESS -> {exists}")
        return bool(exists)
    except Exception as e:
        print(f"[status] _SUCCESS check failed ({e}); attempting read-probe …")
        try:
            spark.read.parquet(target_dir).limit(1).count()
            print(f"[dewstatus59] read-probe succeeded at: {target_dir}")
            return True
        except Exception as e2:
            print(f"[status] read-probe failed ({e2}); treating as not existing.")
            return False

def _count_unique_ids(df: DataFrame) -> int:
    return normalise_ids(df).count()

 
# Where to save diagnostics (use your username as requested)

# Back-compat aliases hack to account for non-disciplined naming un-convention
# hack 
_ids       = normalise_ids
canon_ids  = normalise_ids
_canon_ids = normalise_ids

#print("[TEST] Using _canon_ids:", _canon_ids(stations).count())
#print("[TEST] Using canon_ids :", canon_ids(stations).count())
#print("[TEST] Using _ids      :", _ids(stations).count())

# : pairwise city distances in km using Spark built-ins 
def pairwise_city_distances_spark(cities, radius_km=6371.0):
    """
    cities: list[tuple[str, float, float]] -> [(name, lat_deg, lon_deg), ...]
    returns: Spark DataFrame with columns:
             city_a, city_b, haversine_km, slc_km, delta_km, delta_pct
    """
  #  from pyspark.sql import SparkSession, functions as F, types as T

    spark = SparkSession.getActiveSession()
    if spark is None:
        raise RuntimeError("No active Spark session.")

    schema = T.StructType([
        T.StructField("city", T.StringType(), False),
        T.StructField("lat",  T.DoubleType(), False),
        T.StructField("lon",  T.DoubleType(), False),
        ])
    df = spark.createDataFrame(cities, schema)

    a, b = df.alias("a"), df.alias("b")
    pairs = (a.join(b, F.col("a.city") < F.col("b.city"))
               .select(F.col("a.city").alias("city_a"),
                       F.col("b.city").alias("city_b"),
                       F.col("a.lat").alias("lat1"),
                       F.col("a.lon").alias("lon1"),
                       F.col("b.lat").alias("lat2"),
                       F.col("b.lon").alias("lon2")))

    R = F.lit(float(radius_km))
    lat1 = F.radians(F.col("lat1"));  lat2 = F.radians(F.col("lat2"))
    dlat = lat2 - lat1
    dlon = F.radians(F.col("lon2") - F.col("lon1"))

    a_term = F.sin(dlat/2)**2 + F.cos(lat1)*F.cos(lat2)*F.sin(dlon/2)**2
    c_term = 2*F.atan2(F.sqrt(a_term), F.sqrt(1 - a_term))
    hav_km = R * c_term

    cos_val = F.sin(lat1)*F.sin(lat2) + F.cos(lat1)*F.cos(lat2)*F.cos(dlon)
    cos_val = F.greatest(F.lit(-1.0), F.least(F.lit(1.0), cos_val))
    slc_km = R * F.acos(cos_val)

    delta_km  = F.abs(hav_km - slc_km)
    delta_pct = F.when(hav_km == 0, F.lit(0.0)).otherwise(delta_km / hav_km * 100.0)

    out_df = (pairs
              .withColumn("haversine_km", F.round(hav_km, 2))
              .withColumn("slc_km",       F.round(slc_km, 2))
              .withColumn("delta_km",     F.round(delta_km, 4))
              .withColumn("delta_pct",    F.round(delta_pct, 6))
              .select("city_a", "city_b", "haversine_km", "slc_km", "delta_km", "delta_pct")
              .orderBy("haversine_km"))
    return out_df

# --- Timing helpers for Spark & pure Python (no extra deps)

def benchmark_python_distances(cities, radius_km=6371.0, repeats=50000):
    """
    cities: [(name, lat_deg, lon_deg), ...]  (3 cities => 3 pairs)
    repeats: loop count to make timings stable
    returns: dict with seconds for haversine/slc
    """
    pairs = []
    for i in range(len(cities)):
        for j in range(i+1, len(cities)):
            (_, lat1, lon1), (_, lat2, lon2) = cities[i], cities[j]
            pairs.append((lat1, lon1, lat2, lon2))

    # haversine
    t0 = perf_counter()
    for _ in range(repeats):
        for lat1, lon1, lat2, lon2 in pairs:
            φ1, λ1, φ2, λ2 = map(radians, (lat1, lon1, lat2, lon2))
            dφ, dλ = (φ2 - φ1), (λ2 - λ1)
            a = sin(dφ/2)**2 + cos(φ1)*cos(φ2)*sin(dλ/2)**2
            c = 2*atan2(sqrt(a), sqrt(1 - a))
            _ = radius_km * c
    t1 = perf_counter()

    # spherical law of cosines (SLC)
    t2 = perf_counter()
    for _ in range(repeats):
        for lat1, lon1, lat2, lon2 in pairs:
            φ1, λ1, φ2, λ2 = map(radians, (lat1, lon1, lat2, lon2))
            cosv = sin(φ1)*sin(φ2) + cos(φ1)*cos(φ2)*cos(λ2 - λ1)
            cosv = max(-1.0, min(1.0, cosv))
            _ = radius_km * acos(cosv)
    t3 = perf_counter()

    return {
        "python_haversine_sec": t1 - t0,
        "python_slc_sec":       t3 - t2,
        "repeats": repeats,
        "pairs": len(pairs),
    }

def _parse_ls_bytes(line): 
    parts = line.split()
    if len(parts) < 8:
        return None, None
    try:
        size = int(parts[4])
    except ValueError:
        return None, None
    return size, parts[-1]

def _parse_du_bytes(line):
    parts = line.split()
    if len(parts) < 2:
        return None, None
    try:
        size = int(parts[0])
    except ValueError:
        return None, None
    return size, parts[-1]

def du_bytes(path):
    lines = get_ipython().getoutput(f'hdfs dfs -du "{path}"')
    total = 0
    for ln in lines:
        parts = ln.split()
        if len(parts) >= 2:
            try:
                total += int(parts[0])
            except ValueError:
                pass
    return total
    
def benchmark_spark_distances(cities, radius_km=6368.6, repeats=3):
    """
    Uses Spark built-ins only. Measures full execution
    time by forcing an action.
    
    returns: dict with seconds for haversine/slc and
    row counts used.
    
    For the radius:
    
    The Earth is slightly flattened, so the geocentric 
    radius depends on latitude.  For context: 
    
    * equatorial radius = 6,378.137 km; 
    * polar radius      = 6,356.752 km 
    
    Across New Zealand's latitudes (≈36–47°S), using the
    WGS-84 ellipsoid, you get roughly:

    Auckland (37°S):       ~6,370.4 km
    Christchurch (43.5°S): ~6,368.0 km
    Dunedin (45.9°S):      ~6,367.2 km
    __________________________________
    Wellington (41°S):     ~6,369.0 km
    mean                  ≈ 6,368.6 km
    """

    
    try:
        from pyspark.sql import SparkSession, functions as F, types as T
    except Exception:
        return None  # no Spark therefore save cannot run in vs code

    spark = SparkSession.getActiveSession()
    if spark is None:
        return None

    # build pairs once and cache
    schema = T.StructType([
        T.StructField("city", T.StringType(), False),
        T.StructField("lat",  T.DoubleType(), False),
        T.StructField("lon",  T.DoubleType(), False),
    ])
    df = spark.createDataFrame(cities, schema)
    a, b = df.alias("a"), df.alias("b")
    pairs = (a.join(b, F.col("a.city") < F.col("b.city"))
               .select(F.col("a.lat").alias("lat1"),
                       F.col("a.lon").alias("lon1"),
                       F.col("b.lat").alias("lat2"),
                       F.col("b.lon").alias("lon2"))
               .cache())
    _ = pairs.count()

    R = F.lit(float(radius_km))
    lat1 = F.radians(F.col("lat1")); lat2 = F.radians(F.col("lat2"))
    dlat = lat2 - lat1
    dlon = F.radians(F.col("lon2") - F.col("lon1"))

    # Haversine expr
    a_term = F.sin(dlat/2)**2 + F.cos(lat1)*F.cos(lat2)*F.sin(dlon/2)**2
    c_term = 2*F.atan2(F.sqrt(a_term), F.sqrt(1 - a_term))
    hav    = R * c_term

    # SLC expr
    cosv = F.sin(lat1)*F.sin(lat2) + F.cos(lat1)*F.cos(lat2)*F.cos(dlon)
    cosv = F.greatest(F.lit(-1.0), F.least(F.lit(1.0), cosv))
    slc = R * F.acos(cosv)

    # time Haversine
    t0 = perf_counter()
    for _ in range(repeats):
        _ = pairs.select(hav.alias("d")).agg(F.sum("d")).collect()
    t1 = perf_counter()

    # time SLC
    t2 = perf_counter()
    for _ in range(repeats):
        _ = pairs.select(slc.alias("d")).agg(F.sum("d")).collect()
    t3 = perf_counter()

    return {
        "spark_pairs": pairs.count(),
        "spark_repeats": repeats,
        "spark_haversine_sec": t1 - t0,
        "spark_slc_sec":       t3 - t2,
    }


def list_hdfs_csvgz_files(hdfs_path = WASBS_DATA, debug=False):
    """
    Lists .csv.gz files from an HDFS directory, extracting year and file size.

    Parameters
    ----------
    hdfs_path : str
        The HDFS path to list, e.g. 'wasbs://campus-data@...'
    debug : bool, optional
        If True, prints intermediate parsing steps.

    Returns
    -------
    list of tuple
        A list of (year, size) tuples for each .csv.gz file.
    """
    cmd = f"hdfs dfs -ls {hdfs_path}"
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

    lines = result.stdout.strip().split("\n")
    rows = []

    for line in lines:
        parts = line.split()
        if debug:
            print("Parts:", parts)
        if len(parts) < 6:
            continue
        try:
            size = int(parts[2])
        except ValueError:
            continue
        path = parts[-1]
        if path.endswith(".csv.gz"):
            try:
                year = int(path.split("/")[-1].replace(".csv.gz", ""))
                rows.append((year, size))
            except ValueError:
                continue

    if debug:
        print("_____________________________________________________")
        print("Sample parsed rows:", rows[:5])

    return rows


___________________________________HELPER / DIAGNOSTIC FUNCTIONS___________________________________


In [None]:

# USE SPARINGLY - these are for diagnostics only
# Set cleanup=True to actually delete files, or False to just list them 
# LEAVE cleanup=False after running this cell once! 
# if they have been created and are correct, change cleanup=False for quicker runs. 
cleanup_parquet_files(cleanup=False)


_________Clean up existing parquet files_________
[info] Listing files BEFORE cleanup:
Found 2 items
-rw-r--r--   1 dew59 supergroup          0 2025-10-05 11:21 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/_SUCCESS
-rw-r--r--   1 dew59 supergroup       4080 2025-10-05 11:21 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/part-00000-ebb561da-0fa0-4105-8b8b-9d96a72518f2-c000.snappy.parquet
Found 9 items
-rw-r--r--   1 dew59 supergroup          0 2025-10-05 19:07 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/enriched_stations.parquet/_SUCCESS
-rw-r--r--   1 dew59 supergroup     523589 2025-10-05 19:07 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/enriched_stations.parquet/part-00000-610ec7d9-6381-4c13-a1f8-bf3a93358628-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup     470777 2025-10-05 19:07 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/enriched_stations.parquet/p

In [1]:
# overall time metric
start_notebook = time.time() 
start_time = start_notebook.strftime("%Y.%m.%d %H:%M")
hprint(f"started at: {start_time}")

# Use the hdfs command to explore the data in Azure Blob Storage
#!hdfs dfs -ls wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/msd/
!hdfs dfs -ls    -h {WASBS_DATA} 
!hdfs dfs -du -s -h {WASBS_DATA} 
!hdfs dfs -ls    -h {WASBS_USER} 
!hdfs dfs -du -s -h {WASBS_USER} 

NameError: name 'time' is not defined

In [None]:
ghcnd/daily/


13.0 G  13.0 G  wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily
Found 264 items
-rwxrwxrwx   1      1.3 M 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1750.csv.gz
-rwxrwxrwx   1      3.3 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1763.csv.gz
-rwxrwxrwx   1      3.2 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1764.csv.gz
-rwxrwxrwx   1      3.3 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1765.csv.gz
-rwxrwxrwx   1      3.3 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1766.csv.gz
-rwxrwxrwx   1      3.3 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1767.csv.gz
-rwxrwxrwx   1      3.2 K 2025-08-01 21:30 wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/1768.csv.gz
-rwxrwxrwx   1      3.3 K 2025-

In [None]:
cell_time = time.time() 
result = get_ipython().getoutput(f"hdfs dfs -du -s {WASBS_DATA}") 

print("Raw result:", result)
print()
data_size_bytes = int(result[0].split()[0])
print("firstpass size (bytes):", data_size_bytes)
print("firstpass size (MB)   :", data_size_bytes / (1024**2), "MB")
 
lines = get_ipython().getoutput(f"hdfs dfs -ls {WASBS_DATA}")
print()
#other_size_bytes = 0
#for line in lines:
#    parts = line.split()
#    if len(parts) >= 6 and parts[0].startswith('-'):   # file, not directory
#        size = int(parts[2])                           # file size is parts[2] in your env
#        other_size_bytes += size
#print()
#print("_____________________________________________________") 
#print(f"[result] daily size (bytes): {daily_size_bytes:,d}")
#print(f"[result] daily size (MB)   : {daily_size_bytes / (1024**2):.2f}")
#print(f"[result] meta-data (bytes) : {other_size_bytes:,d}")
#print(f"[result] meta-data (MB)    : {other_size_bytes / (1024**2):.2f}")


cell_time = time.time() - cell_time
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}") 

Raw result: ['13993455698  13993455698  wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily']

Daily size (bytes): 13993455698
Daily size (MB)   : 13345.199296951294


_____________________________________________________
[result] daily size (bytes): 13,993,455,698
[result] daily size (MB)   : 13345.20
[result] meta-data (bytes) : 46,427,311
[result] meta-data (MB)    : 44.28
[time]   Cell time (sec)   :  3.76
[time]   Cell time (min)   :  0.06


In [None]:
# Q1(b)2 – capture directory listing
cell_time  = time.time() 

list_hdfs_csvgz_files(hdfs_path = WASBS_DATA, debug=False):
list_hdfs_csvgz_files(hdfs_path = WASBS_DATA, debug=True):


cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}") 


_____________________________________________________
Sample parsed rows: [(1750, 1385743), (1763, 3358), (1764, 3327), (1765, 3335), (1766, 3344)]
[time]   Cell time (sec)   :  1.89
[time]   Cell time (min)   :  0.03


In [None]:
# Q1(b)4 - Directory Tree Visualisation
# Directory tree visualisation for data sources and parquet outputs

def visualise_directory_tree(root_path, max_depth=1):
    """
    Visualise a directory tree using Rich library
    """
    from rich.tree import Tree
    from rich.console import Console
    import subprocess
    import re
    
    console = Console()
    
    def build_tree(path, tree, depth=0):
        if depth >= max_depth:
            return
        
        try:
            # Use hdfs dfs -ls to list directory contents
            result = subprocess.run(['hdfs', 'dfs', '-ls', path], 
                                  capture_output=True, text=True, check=True)
            lines = result.stdout.strip().split('\n')
            
            # Skip the "Found N items" line if present
            if lines and lines[0].startswith('Found'):
                lines = lines[1:]
            
            for line in lines:
                if not line.strip():
                    continue
                    
                # Parse hdfs ls output: permissions user group size date time name
                parts = line.split()
                if len(parts) < 8:
                    continue
                    
                name = parts[-1]  # Last part is the full path
                permissions = parts[0]
                size = parts[4] if parts[4] != '-' else '0'
                
                # Extract just the filename/dirname from full path
                item_name = name.split('/')[-1]
                if not item_name:  # Handle trailing slashes
                    item_name = name.split('/')[-2]
                
                # Determine if it's a directory or file
                if permissions.startswith('d'):
                    # Directory
                    subtree = tree.add(f"{item_name}/")
                    if depth + 1 < max_depth:
                        build_tree(name, subtree, depth + 1)
                else:
                    # File - show size
                    size_mb = int(size) / (1024*1024) if size.isdigit() else 0
                    if size_mb > 1:
                        tree.add(f"{item_name} ({size_mb:.1f} MB)")
                    else:
                        tree.add(f"{item_name}")
                        
        except subprocess.CalledProcessError as e:
            tree.add(f"Error accessing {path}: {e}")
        except Exception as e:
            tree.add(f"Unexpected error: {e}")
    
    # Create and build the tree
    tree = Tree(f"{root_path}")
    build_tree(root_path, tree)
    console.print(tree)

# Visualise directory trees for data sources and parquet outputs
print("=" * 60)
print("DIRECTORY STRUCTURE VISUALISATION")
print("=" * 60)

print("\nDATA SOURCES (GHCN Dataset):")
print("-" * 40)
visualise_directory_tree(WASBS_USER, max_depth=2)

print("\nPARQUET OUTPUTS (Processed Data):")
print("-" * 40)
visualise_directory_tree(WASBS_USER, max_depth=2)

print("\n" + "=" * 60)
print("Directory tree visualisation complete")
print("=" * 60)

cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}") 

DIRECTORY STRUCTURE VISUALISATION

DATA SOURCES (GHCN Dataset):
----------------------------------------



PARQUET OUTPUTS (Processed Data):
----------------------------------------



Directory tree visualisation complete
[time]   Cell time (sec)   : 1759700512.32
[time]   Cell time (min)   : 29328341.87


In [54]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI
 
stop_spark()

25/10/06 12:51:26 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
