## Q1(a): Data structure and compression

**Assignment Question:**
How is the data structured? Are any of the datasets compressed?

## Q1(b): Years in daily and data size changes

**Assignment Question:**
How many years are contained in daily, and how does the size of the data change?

## Q1(c): Total data size

**Assignment Question:**
What is the total size of all of the data, and how much of that is daily?

## Q2(a): Define schema for daily

**Assignment Question:**
Define a schema for daily based on the description above or in the GHCN Daily README, using the types defined in pyspark.sql. What do you think is the best way to load the DATE and OBSERVATION TIME columns?

## Q2(b): Load daily with schema

**Assignment Question:**
Modify the spark.read.csv command to load a subset of the most recent year of daily into Spark so that it uses the schema that you defined in step (a). Did anything go wrong when you tried to use the schema? What data types did you end up using and why?

## Q2(d): Row counts in metadata tables

**Assignment Question:**
How many rows are there in each of the metadata tables?

## Q2(e): Row count in daily

**Assignment Question:**
How many rows are there in daily?

## Q4(a): Join daily and stations

**Assignment Question:**
LEFT JOIN a subset of daily and your stations table from Q3 step (e). How expensive do you think it would be to join all of daily and stations? Can you think of an efficient way to check if there are any stations in stations that are not in daily at all without using LEFT JOIN?

## Q4(b): Stations not in daily

**Assignment Question:**
Based on step (a) count the total number of stations in stations that are not in daily.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Constants used to interact with Azure Blob Storage using the hdfs command or Spark

global username

username = re.sub('@.*', '', getpass.getuser())

global azure_account_name
global azure_data_container_name
global azure_user_container_name
global azure_user_token

azure_account_name = "madsstorage002"
azure_data_container_name = "campus-data"
azure_user_container_name = "campus-user"
azure_user_token = r"sp=racwdl&st=2025-08-01T09:41:33Z&se=2026-12-30T16:56:33Z&spr=https&sv=2024-11-04&sr=c&sig=GzR1hq7EJ0lRHj92oDO1MBNjkc602nrpfB5H8Cl7FFY%3D"


# Functions used below

def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")

        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://localhost:{sc.uiWebUrl.split(":")[-1]}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username} (notebook)</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{username}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.driver.memory", f'{master_memory}g')
        .config("spark.executor.memory", f'{worker_memory}g')
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.kubernetes.container.image", "madsregistry001.azurecr.io/hadoop-spark:v3.3.5-openjdk-8")
        .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
        .config("spark.kubernetes.memoryOverheadFactor", "0.3")
        .config("spark.memory.fraction", "0.1")
        .config(f"fs.azure.sas.{azure_user_container_name}.{azure_account_name}.blob.core.windows.net",  azure_user_token)
        .config("spark.app.name", f"{username} (notebook)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Spark notebook ###

This notebook will only work in a Jupyter notebook or Jupyter lab session running on the cluster master node in the cloud.

Follow the instructions on the computing resources page to start a cluster and open this notebook.

**Steps**

1. Connect to the Windows server using Windows App.
2. Connect to Kubernetes.
3. Start Jupyter and open this notebook from Jupyter in order to connect to Spark.

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

25/10/01 19:22:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


0,1
spark.dynamicAllocation.enabled,false
spark.fs.azure.sas.campus-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2025-08-01T09:41:33Z&se=2026-12-30T16:56:33Z&spr=https&sv=2024-11-04&sr=c&sig=GzR1hq7EJ0lRHj92oDO1MBNjkc602nrpfB5H8Cl7FFY%3D"""
spark.app.submitTime,1759299776697
spark.kubernetes.driver.pod.name,spark-master-driver
spark.executor.instances,4
spark.driver.memory,4g
spark.kubernetes.namespace,dew59
spark.kubernetes.container.image.pullPolicy,IfNotPresent
spark.sql.shuffle.partitions,32
spark.app.id,spark-5fbaae6ca4304df78d25254a2781f4db


In [3]:
# Write your imports here or insert cells below

from IPython.display     import display  # calls between environments
from math                import acos, atan2, cos, radians, sin, sqrt
from matplotlib.ticker   import FuncFormatter, MaxNLocator
from pathlib             import Path
from pyspark.sql         import DataFrame
from pyspark.sql         import DataFrame as SparkDF
from pyspark.sql         import functions as F, types as T
from pyspark.sql.types   import *
from pyspark.sql.utils   import AnalysisException
from pyspark.sql.window  import Window
from typing              import List, Optional, Tuple
from rich.tree           import Tree
from rich.console        import Console
import itertools         as it
import matplotlib.dates  as mdates
import matplotlib.pyplot as plt
import numpy             as np
import pandas            as pd
import warnings

warnings.filterwarnings("ignore", category=UserWarning)
onsole = Console()

import math, os, platform, re
import subprocess, sys, time


In [4]:
print("_" * 35 + "HELPER / DIAGNOSTIC FUNCTIONS" + "_" * 35)

notebook_run_time = time.time()

def df_as_html(df, n: int = 5, right_align: bool = False, show_index: bool = False):
    """
    HTML preview via pandas with no truncation. If right_align=True,
    only numeric columns are right-justified; everything else is 
    explicitly left-aligned.
    """
    
    pdf = df.limit(n).toPandas()
    print("[INFO] Converting Spark → pandas for HTML display (rows:", len(pdf), ")")
    print("[INFO] right_align (numeric columns):", right_align)

    with pd.option_context(
        "display.max_colwidth", None,   
        "display.max_columns", None,    
        "display.width", None            
    ):
        styler = pdf.style if show_index else pdf.style.hide(axis="index")

        #   table alignment: left for both headers and cells
        styler = styler.set_table_styles(
            [
                {"selector": "th", "props": [("text-align", "left")]},
                {"selector": "td", "props": [("text-align", "left")]},
            ],
            overwrite=True,  # make this the baseline
        )
         
        if right_align:
            numeric_cols = list(pdf.select_dtypes(include=["number"]).columns)
            print("[INFO] Right-aligning numeric columns:", numeric_cols)
            if numeric_cols:
                styler = styler.set_properties(subset=numeric_cols,
                                               **{"text-align": "right"})
        display(styler)

def _normalise_dir(s: str) -> str:
    """
    Ensure trailing slash so we point to
    the dataset directory (not a file)
    """
    return s if s.endswith("/") else s + "/"

def ensure_dir(path: str) -> str:
    """
    ensures that path is a path 
    and not representing a file;
    add trailing slash if needed
    """
    if path is None:
        raise ValueError("Path is None")
    path = _normalise_dir(path)
#   print("ensure_dir -> ",path)
    return path
    
def show_df(df, n: int = 10, name: str = "", right_align: bool = False):
    """
    Print schema, 
    show an HTML sample,
    and row count.
    """
    bprint()
    print("name : ",name)
    df.printSchema()
    print("[check] sample:")
    df_as_html(df, n=n, right_align=right_align)
  
def write_parquet(df, dir_as_path: str, df_name:str = ""):    
    funct_time = time.time()
    path = _normalise_dir(dir_as_path)
    print(f"[file] write_parquet  : {path}")
    try:      
        show_df(df,df_name)
    except Exception as e:
        print("[cathch] sample failed:", e)
        os.system(f'hdfs dfs -rm -r -f "{path}"')   # idempotent cleanup
    df.write.mode("overwrite").format("parquet").save(path)
    os.system(f'hdfs dfs -ls -R "{path}"')
    funct_time = time.time() - funct_time 
    print(f"[time] write_parquet (min)   : {funct_time/60:5.2f}")
    print(f"[time] write_parquet (sec)   : {funct_time:5.2f}")
  
def has_parquet(dir_as_path: str) -> bool:
    path   = _normalise_dir( dir_as_path)
    marker = path + '_SUCCESS'
    #print("\n[check] dir_path:", dir_path)
    #print("\n[check] path    :", path)
    print("\n[check] marker  :", marker)
    rc = os.system(f'hdfs dfs -test -e "{marker}"')
    print("[check] rc:", rc, "->", ("exists" if rc == 0 else "missing"))
    return (rc == 0)
    
def _to_spark(df_like, schema=None):
    """
    Return a Spark DataFrame  .
    """
    if isinstance(df_like, SparkDF):
        return df_like
    return spark.createDataFrame(df_like, schema=schema) if schema else spark.createDataFrame(df_like)

def _success_exists(target_dir: str) -> bool:
    """
    Check for the Hadoop/Spark _SUCCESS marker;  
    """
    jvm = spark._jvm
    hconf = spark._jsc.hadoopConfiguration()
    try:
        uri = jvm.java.net.URI(target_dir)
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(uri, hconf)
        success = jvm.org.apache.hadoop.fs.Path(target_dir + "_SUCCESS")
        exists = fs.exists(success)
        print(f"[status] _SUCCESS check at: {target_dir}_SUCCESS -> {exists}")
        return bool(exists)
    except Exception as e:
        print(f"[status] _SUCCESS check failed ({e}); attempting read-probe …")
        try:
            spark.read.parquet(target_dir).limit(1).count()
            print(f"[dewstatus59] read-probe succeeded at: {target_dir}")
            return True
        except Exception as e2:
            print(f"[status] read-probe failed ({e2}); treating as not existing.")
            return False
 

def normalise_ids(df: DataFrame, col: str = "ID") -> DataFrame:
    """
    # Single source of truth for ID normalisation 
    Upper + trim + distinct on the given ID column.
    """
    print(f"[INFO] normalise_ids() on column: {col}")
    df.printSchema()
    df.show(20)
    return df.select(F.upper(F.trim(F.col(col))).alias("ID")).distinct()
    df.printSchema()
    df.show(20)
# hack 
_ids       = normalise_ids
canon_ids  = normalise_ids
_canon_ids = normalise_ids
 
def probe_universe(daily_df, stations_df, inv_agg_df, tag=""):
    """
    DIAGNOSTIC
    """
    # quick previews
    daily_df.show(20)
    stations_df.show(20)
    inv_agg_df.show(20)

    print(tag)
    daily_df.printSchema()
    stations_df.printSchema()
    inv_agg_df.printSchema()
    print(tag)

    print("\n" + "_"*70)
    print(f"[PROBE] Station universe check :: {tag}")

    # id universes
    daily_ids   = _ids(daily_df)
    station_ids = _ids(stations_df)
    inv_ids     = _ids(inv_agg_df)

    # counts
    print("[COUNT] daily IDs         :", daily_ids.count())
    print("[COUNT] station IDs (cat) :", station_ids.count())
    print("[COUNT] inventory IDs     :", inv_ids.count())

    # set differences
    print("[DIFF ] daily - station   :", daily_ids.join(station_ids, "ID", "left_anti").count())
    print("[DIFF ] station - daily   :", station_ids.join(daily_ids, "ID", "left_anti").count())
    print("[DIFF ] station - inv     :", station_ids.join(inv_ids,    "ID", "left_anti").count())
    print("[DIFF ] inv - daily       :", inv_ids.join(daily_ids,      "ID", "left_anti").count())
    print("[DIFF ] inv - station     :", inv_ids.join(station_ids,    "ID", "left_anti").count())

    bprint("[done] probe_universe")


def _count_unique_ids(df: DataFrame) -> int:
    return normalise_ids(df).count()

def pick_unfiltered_daily(preferred_path: str = None) -> DataFrame:
    """Return an unfiltered daily DF (~129k unique station IDs)."""
    cand_names = ["daily", "read_daily", "daily_df", "daily_all", "ghcnd_daily"]
    print("[INFO] Candidate DataFrames:", [n for n in cand_names if n in globals()])
    for name in cand_names:
        obj = globals().get(name)
        if isinstance(obj, DataFrame):
            try:
                n = normalise_ids(obj).count()
                print(f"[CHECK] {name} unique station IDs:", n)
                if n >= 120_000:
                    print(f"[INFO] Using {name} as the unfiltered daily.")
                    return obj
            except Exception as e:
                print(f"[WARN] Could not inspect {name}:", repr(e))
    if preferred_path:
        print(f"[INFO] Trying preferred_path: {preferred_path}")
        df = spark.read.parquet(str(preferred_path))
        n = normalise_ids(df).count()
        print("[CHECK] preferred_path unique station IDs:", n)
        if n >= 120_000:
            print("[INFO] Using preferred_path as the unfiltered daily.")
            return df
    for var in ["DAILY_READ_NAME","DAILY_WRITE_NAME","daily_read_name","daily_write_name","DAILY_NAME"]:
        if var in globals():
            path = globals()[var]
            try:
                print(f"[INFO] Trying {var} = {path}")
                df = spark.read.parquet(str(path))
                n = normalise_ids(df).count()
                print(f"[CHECK] {var} unique station IDs:", n)
                if n >= 120_000:
                    print(f"[INFO] Using {var} as the unfiltered daily.")
                    return df
            except Exception as e:
                print(f"[WARN] Could not read {var}:", repr(e))
    raise SystemExit("[FATAL] Could not find an unfiltered daily dataset (expected ~129k unique station IDs).")

def bprint(text: str="", l=50):
    n = len(text)
    n = abs(n - l)//2
    
    print("\n" + "_" * n + text + "_" * n)

# Back-compat aliases hack to account for non-disciplined naming un-convention
_ids       = normalise_ids
canon_ids  = normalise_ids
_canon_ids = normalise_ids

#print("[TEST] Using _canon_ids:", _canon_ids(stations).count())
#print("[TEST] Using canon_ids :", canon_ids(stations).count())
#print("[TEST] Using _ids      :", _ids(stations).count())


# : pairwise city distances in km using Spark built-ins 
def pairwise_city_distances_spark(cities, radius_km=6371.0):
    """
    cities: list[tuple[str, float, float]] -> [(name, lat_deg, lon_deg), ...]
    returns: Spark DataFrame with columns:
             city_a, city_b, haversine_km, slc_km, delta_km, delta_pct
    """
  #  from pyspark.sql import SparkSession, functions as F, types as T

    spark = SparkSession.getActiveSession()
    if spark is None:
        raise RuntimeError("No active Spark session.")

    schema = T.StructType([
        T.StructField("city", T.StringType(), False),
        T.StructField("lat",  T.DoubleType(), False),
        T.StructField("lon",  T.DoubleType(), False),
        ])
    df = spark.createDataFrame(cities, schema)

    a, b = df.alias("a"), df.alias("b")
    pairs = (a.join(b, F.col("a.city") < F.col("b.city"))
               .select(F.col("a.city").alias("city_a"),
                       F.col("b.city").alias("city_b"),
                       F.col("a.lat").alias("lat1"),
                       F.col("a.lon").alias("lon1"),
                       F.col("b.lat").alias("lat2"),
                       F.col("b.lon").alias("lon2")))

    R = F.lit(float(radius_km))
    lat1 = F.radians(F.col("lat1"));  lat2 = F.radians(F.col("lat2"))
    dlat = lat2 - lat1
    dlon = F.radians(F.col("lon2") - F.col("lon1"))

    a_term = F.sin(dlat/2)**2 + F.cos(lat1)*F.cos(lat2)*F.sin(dlon/2)**2
    c_term = 2*F.atan2(F.sqrt(a_term), F.sqrt(1 - a_term))
    hav_km = R * c_term

    cos_val = F.sin(lat1)*F.sin(lat2) + F.cos(lat1)*F.cos(lat2)*F.cos(dlon)
    cos_val = F.greatest(F.lit(-1.0), F.least(F.lit(1.0), cos_val))
    slc_km = R * F.acos(cos_val)

    delta_km  = F.abs(hav_km - slc_km)
    delta_pct = F.when(hav_km == 0, F.lit(0.0)).otherwise(delta_km / hav_km * 100.0)

    out_df = (pairs
              .withColumn("haversine_km", F.round(hav_km, 2))
              .withColumn("slc_km",       F.round(slc_km, 2))
              .withColumn("delta_km",     F.round(delta_km, 4))
              .withColumn("delta_pct",    F.round(delta_pct, 6))
              .select("city_a", "city_b", "haversine_km", "slc_km", "delta_km", "delta_pct")
              .orderBy("haversine_km"))
    return out_df


# --- Timing helpers for Spark & pure Python (no extra deps)
 

def benchmark_python_distances(cities, radius_km=6371.0, repeats=50000):
    """
    benchmark:
    cities: [(name, lat_deg, lon_deg), ...]  (3 cities => 3 pairs)
    repeats: loop count to make timings stable
    returns: dict with seconds for haversine/slc
    """
    pairs = []
    for i in range(len(cities)):
        for j in range(i+1, len(cities)):
            (_, lat1, lon1), (_, lat2, lon2) = cities[i], cities[j]
            pairs.append((lat1, lon1, lat2, lon2))

    # haversine
    t0 = perf_counter()
    for _ in range(repeats):
        for lat1, lon1, lat2, lon2 in pairs:
            φ1, λ1, φ2, λ2 = map(radians, (lat1, lon1, lat2, lon2))
            dφ, dλ = (φ2 - φ1), (λ2 - λ1)
            a = sin(dφ/2)**2 + cos(φ1)*cos(φ2)*sin(dλ/2)**2
            c = 2*atan2(sqrt(a), sqrt(1 - a))
            _ = radius_km * c
    t1 = perf_counter()

    # spherical law of cosines (SLC)
    t2 = perf_counter()
    for _ in range(repeats):
        for lat1, lon1, lat2, lon2 in pairs:
            φ1, λ1, φ2, λ2 = map(radians, (lat1, lon1, lat2, lon2))
            cosv = sin(φ1)*sin(φ2) + cos(φ1)*cos(φ2)*cos(λ2 - λ1)
            cosv = max(-1.0, min(1.0, cosv))
            _ = radius_km * acos(cosv)
    t3 = perf_counter()

    return {
        "python_haversine_sec": t1 - t0,
        "python_slc_sec":       t3 - t2,
        "repeats": repeats,
        "pairs": len(pairs),
    }


def _parse_ls_bytes(line): 
    parts = line.split()
    if len(parts) < 8:
        return None, None
    try:
        size = int(parts[4])
    except ValueError:
        return None, None
    return size, parts[-1]

def _parse_du_bytes(line):
    parts = line.split()
    if len(parts) < 2:
        return None, None
    try:
        size = int(parts[0])
    except ValueError:
        return None, None
    return size, parts[-1]

def du_bytes(path):
    lines = get_ipython().getoutput(f'hdfs dfs -du "{path}"')
    total = 0
    for ln in lines:
        parts = ln.split()
        if len(parts) >= 2:
            try:
                total += int(parts[0])
            except ValueError:
                pass
    return total
    
def benchmark_spark_distances(cities, radius_km=6368.6, repeats=3):
    """
    Uses Spark built-ins only. Measures full execution
    time by forcing an action.
    
    returns: dict with seconds for haversine/slc and
    row counts used.
    
    For the radius:
    
    The Earth is slightly flattened, so the geocentric 
    radius depends on latitude.  For context: 
    
    * equatorial radius = 6,378.137 km; 
    * polar radius      = 6,356.752 km 
    
    Across New Zealand’s latitudes (≈36–47°S), using the
    WGS-84 ellipsoid, you get roughly:

    Auckland (37°S):       ~6,370.4 km
    Christchurch (43.5°S): ~6,368.0 km
    Dunedin (45.9°S):      ~6,367.2 km
    __________________________________
    Wellington (41°S):     ~6,369.0 km
    mean                  ≈ 6,368.6 km
    """

    
    try:
        from pyspark.sql import SparkSession, functions as F, types as T
    except Exception:
        return None  # no Spark therefore save cannot run in vs code

    spark = SparkSession.getActiveSession()
    if spark is None:
        return None

    # build pairs once and cache
    schema = T.StructType([
        T.StructField("city", T.StringType(), False),
        T.StructField("lat",  T.DoubleType(), False),
        T.StructField("lon",  T.DoubleType(), False),
    ])
    df = spark.createDataFrame(cities, schema)
    a, b = df.alias("a"), df.alias("b")
    pairs = (a.join(b, F.col("a.city") < F.col("b.city"))
               .select(F.col("a.lat").alias("lat1"),
                       F.col("a.lon").alias("lon1"),
                       F.col("b.lat").alias("lat2"),
                       F.col("b.lon").alias("lon2"))
               .cache())
    _ = pairs.count()

    R = F.lit(float(radius_km))
    lat1 = F.radians(F.col("lat1")); lat2 = F.radians(F.col("lat2"))
    dlat = lat2 - lat1
    dlon = F.radians(F.col("lon2") - F.col("lon1"))

    # Haversine expr
    a_term = F.sin(dlat/2)**2 + F.cos(lat1)*F.cos(lat2)*F.sin(dlon/2)**2
    c_term = 2*F.atan2(F.sqrt(a_term), F.sqrt(1 - a_term))
    hav    = R * c_term

    # SLC expr
    cosv = F.sin(lat1)*F.sin(lat2) + F.cos(lat1)*F.cos(lat2)*F.cos(dlon)
    cosv = F.greatest(F.lit(-1.0), F.least(F.lit(1.0), cosv))
    slc = R * F.acos(cosv)

    # time Haversine
    t0 = perf_counter()
    for _ in range(repeats):
        _ = pairs.select(hav.alias("d")).agg(F.sum("d")).collect()
    t1 = perf_counter()

    # time SLC
    t2 = perf_counter()
    for _ in range(repeats):
        _ = pairs.select(slc.alias("d")).agg(F.sum("d")).collect()
    t3 = perf_counter()

    return {
        "spark_pairs": pairs.count(),
        "spark_repeats": repeats,
        "spark_haversine_sec": t1 - t0,
        "spark_slc_sec":       t3 - t2,
    }

def visualise_directory_tree(root_path, max_depth=1):
    """
    Visualise directory tree structure using Rich library for enhanced display.
    Falls back to simple print if Rich is not available.
    Shows immediate children with * for subdirectories.
    """
    try:
        if 'Tree' not in globals() or Tree is None or 'console' not in globals() or console is None:
            print(f"[INFO] Rich library not available. Directory root: {root_path}")
            return
    except NameError:
        print(f"[INFO] Rich library not available. Directory root: {root_path}")
        return
    
    def build_tree(path, tree, depth=0):
        if depth > max_depth:
            return
        try:
            # For remote paths, use hdfs dfs -ls
            if path.startswith("wasbs://"):
                cmd = f"hdfs dfs -ls {path}"
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                if result.returncode == 0:
                    lines = result.stdout.strip().split('\n')
                    for line in lines:
                        if line:
                            parts = line.split()
                            if len(parts) >= 8:
                                item_type = parts[0][0]  # d for directory, - for file
                                full_name = ' '.join(parts[7:])
                                # Extract relative name from full path
                                if full_name.startswith(path):
                                    name = full_name[len(path):].lstrip('/')
                                else:
                                    name = full_name
                                if item_type == 'd':
                                    tree.add(f"📁 {name}/*")
                                else:
                                    tree.add(f"📄 {name}")
                else:
                    tree.add(f"[ERROR] Could not list {path}")
            else:
                # For local paths
                for item in os.listdir(path):
                    item_path = os.path.join(path, item)
                    if os.path.isdir(item_path):
                        tree.add(f"📁 {item}/*")
                    else:
                        tree.add(f"📄 {item}")
        except Exception as e:
            tree.add(f"[ERROR] {str(e)}")
    
    tree = Tree(f"📁 {root_path}")
    build_tree(root_path, tree)
    console.print(tree)


___________________________________HELPER / DIAGNOSTIC FUNCTIONS___________________________________


In [5]:
bprint("SECTION 1: ENVIRONMENT SETUP")
# supports: Assignment Setup — "Configure global variables and paths required for GHCN data processing"
# does: initializes runtime tracking, configures Azure storage paths, defines data locations,
#       establishes file paths, and sets debug flags for conditional processing

notebook_run_time = time.time()
val               = spark.range(1).select(F.date_format(F.current_timestamp(), 'yyyy.MM.dd HH:mm').alias('t')).first()['t']
bprint()
print(f"[time] current time           :  {val}")
bprint()


bprint("ENVIRONMENT") 
print("Spark       :", spark.version)
print("Python tuple:", sys.version_info[:3]) 
print("username    :", username)
print()

bprint("DEEBUG BOOLEANS")
#FORCE_OVERWRITE = False  # False means that if the file exists then we wont re-write it 
#FORCE_OVERWRITE = True   # True means overwrite all resultant files
FORCE_REBUILD_ENRICHED  = True   #has_parquet(enriched_write_name)
FORCE_REBUILD_INV_AGG = True    # has_parquet(inv_agg_write_name)

FORCE_REBUILD_STATIONS  = True    #has_parquet(stations_write_name)
FORCE_REBUILD_INVENTORY = True    # has_parquet(inventory_write_name)
FORCE_REBUILD_STATES    = True    #has_parquet(states_write_name)
FORCE_REBUILD_COUNTRIES = True    #has_parquet(countries_write_name)
FORCE_REBUILD_INV_AGG = True    # has_parquet(inv_agg_write_name)

FORCE_REBUILD_OVERLAP   = True    #has_parquet(overlap_write_name)
FORCE_REBUILD_PRECIP    = True    #has_parquet(precip_write_path)

print(f"[status] FORCE_REBUILD_ENRICHED  :", FORCE_REBUILD_ENRICHED)
print(f"[status] FORCE_REBUILD_INV_AGG   :", FORCE_REBUILD_INV_AGG)
print(f"[status] FORCE_REBUILD_STATIONS  :", FORCE_REBUILD_STATIONS)
print(f"[status] FORCE_REBUILD_INVENTORY :", FORCE_REBUILD_INVENTORY)
print(f"[status] FORCE_REBUILD_STATES    :", FORCE_REBUILD_STATES)
print(f"[status] FORCE_REBUILD_COUNTRIES :", FORCE_REBUILD_COUNTRIES)

print(f"[status] FORCE_REBUILD_OVERLAP   :", FORCE_REBUILD_OVERLAP)
print(f"[status] FORCE_REBUILD_PRECIP    :", FORCE_REBUILD_PRECIP)

bprint("SOURCE FOLDERS")
print()

azure_account_name        = "madsstorage002"
azure_data_container_name = "campus-data"
azure_user_container_name = "campus-user"
previous_year             = 2024  # full hear
most_recent_year          = 2025  # currently building


print("azure_account_name        :", azure_account_name)
print("azure_data_container_name :", azure_data_container_name)
print("azure_user_container_name :", azure_user_container_name)
print("previous_year             :", previous_year)
print("most_recent_year          :", most_recent_year)
print()

data_root      = f"wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/ghcnd/"
user_root      = f"wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{username}/"
 
data_root      = ensure_dir(data_root)
user_root      = ensure_dir(user_root) 

print("data_root           :", data_root) 
print("user_root           :", user_root)
bprint()

daily_root     = ensure_dir(f"{data_root}daily/")

print("daily_root          :", daily_root)
print()

aux_root = "../auxiliary/"
aux_root = ensure_dir(aux_root)

reports_dir  = ensure_dir(f"{aux_root}reports/")
images_dir   = ensure_dir(f"{aux_root}images/")
figures_dir  = ensure_dir(f"{aux_root}figures/") 


print("aux_root    :", aux_root)
print("reports_dir :", reports_dir)
print("images_dir  :", images_dir)
print("figures_dir :", figures_dir)
print()



bprint("SOURCE FILES")
stations_read_name   = f'{data_root}ghcnd-stations.txt'
inventory_read_name  = f'{data_root}ghcnd-inventory.txt'
countries_read_name  = f'{data_root}ghcnd-countries.txt'
states_read_name     = f'{data_root}ghcnd-states.txt'


print("stations_read_name  :", stations_read_name)
print("inventory_read_name :", inventory_read_name)
print("countries_read_name :", countries_read_name)
print("states_read_name    :", states_read_name)
print()

previous_csvgz_path  = f'{daily_root}2024.csv.gz' 
current_csvgz_path   = f'{daily_root}2025.csv.gz' 


print("previous_csvgz_path  :", previous_csvgz_path)
print("current_csvgz_path   :", current_csvgz_path)
print()
bprint("USER FOLDERS")
  
stations_write_name  =  ensure_dir(f'{user_root}stations.parquet')      #parquest file referenced by folder
inventory_write_name =  ensure_dir(f'{user_root}inventory.parquet')
countries_write_name =  ensure_dir(f'{user_root}countries.parquet')
states_write_name    =  ensure_dir(f'{user_root}states.parquet') 

inv_agg_write_name   = ensure_dir(f'{user_root}inv_agg.parquet')
enriched_write_name  = ensure_dir(f'{user_root}enriched_write_name.parquet')

print()
print("stations_write_name  :", stations_write_name)
print("inventory_write_name :", inventory_write_name)
print("countries_write_name :", countries_write_name)
print("states_write_name    :", states_write_name)
print("inv_agg_write_name   :", inv_agg_write_name)
print()
print("enriched_write_name :", enriched_write_name)
print("enriched_write_name :", enriched_write_name)

#overlap_write_pathh  = ensure_dir(f'{user_root}q1b32_overlap_counts.parquet')
#precip_write_path    = ensure_dir(f'{user_root}q2a-agg-precipitation.parquet')
 
print("inv_agg_write_name  :", inv_agg_write_name)
station_date_element = ensure_dir(f"{user_root}q2a_station_date_element.parquet")
overlap_counts_name  = ensure_dir(f'{user_root}q1b32_overlap_counts.parquet')
overlap_write_name   = ensure_dir(f'{user_root}q1b32_overlap_counts.parquet')
precip_write_name    = ensure_dir(f'{user_root}q2a-agg-precipitation.parquet') 
print()

inv_agg_write_name   = ensure_dir(f'{user_root}inv_agg.parquet') 

print()
print("stations_write_name :", stations_write_name)
print("overlap_counts_name :", overlap_counts_name)
print("overlap_write_name  :", overlap_write_name)
print("precip_write_name   :", precip_write_name) 



___________SECTION 1: ENVIRONMENT SETUP___________


[Stage 0:>                                                          (0 + 8) / 8]


__________________________________________________
[time] current time           :  2025.10.01 19:23

__________________________________________________

___________________ENVIRONMENT___________________
Spark       : 3.5.1
Python tuple: (3, 8, 10)
username    : dew59


_________________DEEBUG BOOLEANS_________________
[status] FORCE_REBUILD_ENRICHED  : True
[status] FORCE_REBUILD_INV_AGG   : True
[status] FORCE_REBUILD_STATIONS  : True
[status] FORCE_REBUILD_INVENTORY : True
[status] FORCE_REBUILD_STATES    : True
[status] FORCE_REBUILD_COUNTRIES : True
[status] FORCE_REBUILD_OVERLAP   : True
[status] FORCE_REBUILD_PRECIP    : True

__________________SOURCE FOLDERS__________________

azure_account_name        : madsstorage002
azure_data_container_name : campus-data
azure_user_container_name : campus-user
previous_year             : 2024
most_recent_year          : 2025

data_root           : wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/
user_root           : wasbs:/

                                                                                

In [6]:

# Visualise directory trees for data and user roots
visualise_directory_tree(data_root)
visualise_directory_tree(user_root) 

[INFO] Rich library not available. Directory root: wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/
[INFO] Rich library not available. Directory root: wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/


In [7]:
bprint("SECTION 1.1: DATA PREPARATION")
# does: builds the has_parquet() variables) 
 
has_stations  = has_parquet(stations_write_name)
has_inventory = has_parquet(inventory_write_name)
has_states    = has_parquet(states_write_name)
has_countries = has_parquet(countries_write_name)

has_inv_agg   = has_parquet(inv_agg_write_name)
has_enriched  = has_parquet(enriched_write_name)


__________SECTION 1.1: DATA PREPARATION__________

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/_SUCCESS
[check] rc: 0 -> exists

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/_SUCCESS
[check] rc: 0 -> exists

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/states.parquet/_SUCCESS
[check] rc: 0 -> exists

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/_SUCCESS
[check] rc: 0 -> exists

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inv_agg.parquet/_SUCCESS
[check] rc: 0 -> exists

[check] marker  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/enriched_write_name.parquet/_SUCCESS
[check] rc: 0 -> exists


In [8]:
bprint("SECTION 5: UNIFIED PARQUET BUILD & LOAD")
# One cell to rule them all: build parquet files if needed, then load clean DataFrames

cell_time = time.time()

# ===== COUNTRIES =====
if FORCE_REBUILD_COUNTRIES or not has_countries:
    print("[rebuild] Countries - reading source and rebuilding...")
    read_countries = spark.read.text(countries_read_name)
    countries_temp = (
        read_countries.select(
            F.substring("value", 1, 2).alias("CODE"),
            F.trim(F.substring("value", 4, 61)).alias("COUNTRY_NAME")
        )
    )
    write_parquet(countries_temp, countries_write_name, "countries")
else:
    print("[skip] Countries - parquet exists and FORCE_REBUILD_COUNTRIES=False")

# Always read from parquet (the "clean" version)
countries = spark.read.parquet(countries_write_name)
print(f"[loaded] countries from parquet: {countries.count():,} rows")

# ===== STATES =====
if FORCE_REBUILD_STATES or not has_states:
    print("[rebuild] States - reading source and rebuilding...")
    read_states = spark.read.text(states_read_name)
    states_temp = (
        read_states.select(
            F.substring("value", 1, 2).alias("CODE"),
            F.trim(F.substring("value", 4, 47)).alias("STATE_NAME")
        )
    )
    write_parquet(states_temp, states_write_name, "states")
else:
    print("[skip] States - parquet exists and FORCE_REBUILD_STATES=False")

# Always read from parquet
states = spark.read.parquet(states_write_name)
print(f"[loaded] states from parquet: {states.count():,} rows")

# ===== STATIONS =====
if FORCE_REBUILD_STATIONS or not has_stations:
    print("[rebuild] Stations - reading source and rebuilding...")
    read_stations = spark.read.text(stations_read_name)
    stations_temp = (
        read_stations.select(
            F.trim(F.substring("value",  1, 11)).alias("ID"),
            F.trim(F.substring("value", 13,  8)).cast("double").alias("LATITUDE"),
            F.trim(F.substring("value", 22,  9)).cast("double").alias("LONGITUDE"),
            F.trim(F.substring("value", 32,  6)).cast("double").alias("ELEVATION"),
            F.trim(F.substring("value", 39,  2)).alias("STATE"),
            F.trim(F.substring("value", 42, 30)).alias("NAME"),
            F.trim(F.substring("value", 73,  3)).alias("GSN_FLAG"),
            F.trim(F.substring("value", 77,  3)).alias("HCNCRN_FLAG"),
            F.trim(F.substring("value", 81,  5)).alias("WMO_ID")
        )
    )
    write_parquet(stations_temp, stations_write_name, "stations")
else:
    print("[skip] Stations - parquet exists and FORCE_REBUILD_STATIONS=False")

# Always read from parquet
stations = spark.read.parquet(stations_write_name)
print(f"[loaded] stations from parquet: {stations.count():,} rows")

# ===== INVENTORY =====
if FORCE_REBUILD_INVENTORY or not has_inventory:
    print("[rebuild] Inventory - reading source and rebuilding...")
    read_inventory = spark.read.text(inventory_read_name)
    inventory_temp = (
        read_inventory.select(
            F.substring("value",  1, 11).alias("ID"),
            F.trim(F.substring("value", 13,  8)).cast("double").alias("LATITUDE"),
            F.trim(F.substring("value", 22,  9)).cast("double").alias("LONGITUDE"),
            F.substring("value", 32,  4).alias("ELEMENT"),
            F.substring("value", 37,  4).cast("int").alias("FIRSTYEAR"),
            F.substring("value", 42,  4).cast("int").alias("LASTYEAR")
        )
    )
    write_parquet(inventory_temp, inventory_write_name, "inventory")
else:
    print("[skip] Inventory - parquet exists and FORCE_REBUILD_INVENTORY=False")

# Always read from parquet
inventory = spark.read.parquet(inventory_write_name)
print(f"[loaded] inventory from parquet: {inventory.count():,} rows")
 

print()
bprint("FINAL SUMMARY")
print(f"[final] countries : {countries.count():8,d} rows")
print(f"[final] states    : {states.count():8,d} rows") 
print(f"[final] stations  : {stations.count():8,d} rows")
print(f"[final] inventory : {inventory.count():8,d} rows")


cell_time = time.time() - cell_time
print(f"[time] Total cell time (sec): {cell_time:6.2f}")
print(f"[time] Total cell time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")


_____SECTION 5: UNIFIED PARQUET BUILD & LOAD_____
[rebuild] Countries - reading source and rebuilding...
[file] write_parquet  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/

__________________________________________________
name :  
root
 |-- CODE: string (nullable = true)
 |-- COUNTRY_NAME: string (nullable = true)

[check] sample:
[cathch] sample failed: An error occurred while calling o279.limit. Trace:
py4j.Py4JException: Method limit([class java.lang.String]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at 

2025-10-01 19:23:22,698 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum


Deleted wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet


25/10/01 19:23:25 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


-rw-r--r--   1 dew59 supergroup          0 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/_SUCCESS
-rw-r--r--   1 dew59 supergroup       4080 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/countries.parquet/part-00000-8f156b4b-aa1c-4be3-91a3-7c83c789b824-c000.snappy.parquet
[time] write_parquet (min)   :  0.11
[time] write_parquet (sec)   :  6.36


                                                                                

[loaded] countries from parquet: 219 rows
[rebuild] States - reading source and rebuilding...
[file] write_parquet  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/states.parquet/

__________________________________________________
name :  
root
 |-- CODE: string (nullable = true)
 |-- STATE_NAME: string (nullable = true)

[check] sample:
[cathch] sample failed: An error occurred while calling o302.limit. Trace:
py4j.Py4JException: Method limit([class java.lang.String]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.

2025-10-01 19:23:31,712 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum


Deleted wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/states.parquet


25/10/01 19:23:33 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


-rw-r--r--   1 dew59 supergroup          0 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/states.parquet/_SUCCESS
-rw-r--r--   1 dew59 supergroup       1879 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/states.parquet/part-00000-1f4f2ca0-126c-45f5-af93-cf83b06027b9-c000.snappy.parquet
[time] write_parquet (min)   :  0.08
[time] write_parquet (sec)   :  4.81
[loaded] states from parquet: 74 rows
[rebuild] Stations - reading source and rebuilding...
[file] write_parquet  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/

__________________________________________________
name :  
root
 |-- ID: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- GSN_FLAG: string (nullable = true)
 |-- HCNCRN_FLAG: string (nullable = true

2025-10-01 19:23:37,296 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum


Deleted wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet


25/10/01 19:23:39 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


-rw-r--r--   1 dew59 supergroup          0 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/_SUCCESS
-rw-r--r--   1 dew59 supergroup    1330337 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/part-00001-01bc187e-7c7b-48ad-be77-9f78c3be4cf3-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup     909343 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/part-00002-01bc187e-7c7b-48ad-be77-9f78c3be4cf3-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup    1320264 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/stations.parquet/part-00000-01bc187e-7c7b-48ad-be77-9f78c3be4cf3-c000.snappy.parquet
[time] write_parquet (min)   :  0.09
[time] write_parquet (sec)   :  5.69


                                                                                

[loaded] stations from parquet: 129,657 rows
[rebuild] Inventory - reading source and rebuilding...
[file] write_parquet  : wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/

__________________________________________________
name :  
root
 |-- ID: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- FIRSTYEAR: integer (nullable = true)
 |-- LASTYEAR: integer (nullable = true)

[check] sample:
[cathch] sample failed: An error occurred while calling o397.limit. Trace:
py4j.Py4JException: Method limit([class java.lang.String]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at

2025-10-01 19:23:44,592 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum


Deleted wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet


25/10/01 19:23:47 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


-rw-r--r--   1 dew59 supergroup    1009903 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/part-00001-a6ffa03e-5c0c-4344-b7d0-2a32e61afee0-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup     470447 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/part-00006-a6ffa03e-5c0c-4344-b7d0-2a32e61afee0-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup     998728 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/part-00000-a6ffa03e-5c0c-4344-b7d0-2a32e61afee0-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup     961163 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/part-00003-a6ffa03e-5c0c-4344-b7d0-2a32e61afee0-c000.snappy.parquet
-rw-r--r--   1 dew59 supergroup      71677 2025-10-01 19:23 wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/inventory.parquet/part-00007-a6ffa03e-5c0

In [9]:
# Q2(b): For each station, count the number of days with precipitation data
bprint("Process Answer: Q2(b)")
cell_time = time.time()
prcp_days_per_station = (
    daily_for_overlap
    .filter((F.col("ELEMENT") == "PRCP") & (F.col("VALUE").isNotNull()))
    .groupBy("ID")
    .agg(F.countDistinct("DATE").alias("days_with_prcp"))
    .orderBy("ID")
)
prcp_days_per_station.show(10)
cell_time = time.time() - cell_time
print(f"[time] Q2(b) cell time (sec): {cell_time:6.2f}")


______________Process Answer: Q2(b)______________


NameError: name 'daily_for_overlap' is not defined

## Q2(b): For each station, count the number of days with precipitation data

**Assignment Question:**
For each station, count the number of days with precipitation data (i.e., non-missing PRCP values).

In [None]:
# Q2(a): Aggregate daily precipitation (PRCP) by station and date
bprint("Process Answer: Q2(a)")
cell_time = time.time()
prcp_daily = (
    daily_for_overlap
    .filter(F.col("ELEMENT") == "PRCP")
    .groupBy("ID", "DATE")
    .agg(F.sum("VALUE").alias("total_prcp"))
    .orderBy("ID", "DATE")
)
prcp_daily.show(10)
cell_time = time.time() - cell_time
print(f"[time] Q2(a) cell time (sec): {cell_time:6.2f}")

## Q2(a): Aggregate daily precipitation (PRCP) by station and date

**Assignment Question:**
Aggregate daily precipitation (PRCP) by station and date, producing a table with total precipitation per station per day.

## Question 2(a) and 2(b)
**Q2(a):** Aggregate daily precipitation (PRCP) by station and date, producing a table with total precipitation per station per day.
**Q2(b):** For each station, count the number of days with precipitation data (i.e., non-missing PRCP values).

In [None]:
# ===== INV_AGG (requires inventory) =====
if FORCE_REBUILD_INV_AGG or not has_inv_agg:
    print("[rebuild] Inv_agg - building aggregation from inventory...")
    core_elements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]
    inv_agg_temp = (inventory
                   .groupBy("ID")
                   .agg(
                       F.min("FIRSTYEAR").alias("FIRSTYEAR"),
                       F.max("LASTYEAR").alias("LASTYEAR"),
                       F.countDistinct("ELEMENT").alias("ELEMENT_COUNT"),
                       F.countDistinct(
                           F.when(F.col("ELEMENT").isin(core_elements), F.col("ELEMENT"))
                       ).alias("CORE_ELEMENT_COUNT"),
                       F.countDistinct(
                           F.when(~F.col("ELEMENT").isin(core_elements), F.col("ELEMENT"))
                       ).alias("OTHER_ELEMENT_COUNT")
                   ).orderBy(F.col("CORE_ELEMENT_COUNT").desc(),
                            F.col("ELEMENT_COUNT").desc(),
                            F.col("ID").asc())
                   )
    write_parquet(inv_agg_temp, inv_agg_write_name, "inv_agg")
else:
    print("[skip] Inv_agg - parquet exists and FORCE_REBUILD_INV_AGG=False")

# Always read from parquet
inv_agg = spark.read.parquet(inv_agg_write_name)
print(f"[loaded] inv_agg from parquet: {inv_agg.count():,} rows")

In [None]:
bprint("SECTION 5: ENANCED PARQUET BUILD & LOAD")
# One cell to rule them all: build parquet files if needed, then load clean DataFrames

if FORCE_REBUILD_ENRICHED or not has_enriched:
    print("[rebuild] Enriched - building enriched stations from joins...")
    
    # Derive country code from station ID
    stations_cc = stations.withColumn("COUNTRY_CODE", F.substring("ID", 1, 2))
    
    # Join stations with countries
    stn_countries = (
        stations_cc
        .join(countries, stations_cc.COUNTRY_CODE == countries.CODE, "left")
        .drop(countries.CODE)   # keep COUNTRY_CODE from stations, drop duplicate
    )
    
    # Build enriched by joining with states and inventory aggregation
    enriched_temp = (
        stn_countries   # already has station + country info
        .join(states, stn_countries.STATE == states.CODE, "left")
        .join(inv_agg, on="ID", how="left")
        .orderBy(F.col("ID").asc(), F.col("LASTYEAR").asc(), F.col("ELEMENT_COUNT").asc())
    )
    
    write_parquet(enriched_temp, enriched_write_name, "enriched")
else:
    print("[skip] Enriched - parquet exists and FORCE_REBUILD_ENRICHED=False")

# Always read from parquet
enriched = spark.read.parquet(enriched_write_name)
print(f"[loaded] enriched from parquet: {enriched.count():,} rows")

In [None]:
bprint("CSV AND FILE HANDLING")

# Schema for reading CSV files
daily_schema = T.StructType([
    T.StructField("ID",       T.StringType(), True),
    T.StructField("DATE",     T.StringType(), True),  # parsed to DateType below
    T.StructField("ELEMENT",  T.StringType(), True),
    T.StructField("VALUE",    T.IntegerType(), True),
    T.StructField("MFLAG",    T.StringType(), True),
    T.StructField("QFLAG",    T.StringType(), True),
    T.StructField("SFLAG",    T.StringType(), True),
    T.StructField("OBSTIME",  T.StringType(), True),
])

# CSV reading function
def read_csv_with_schema(path: str, schema=None) -> DataFrame:
    """Read a CSV file using the specified schema."""
    return (spark.read.csv(
        path,
        schema=schema or daily_schema,
        header=False,
        mode="PERMISSIVE"
    ))

In [None]:
bprint("Process A1(a)1")
 

#Q1(a): Describe the structure of the GHCN-Daily dataset in storage — identify the container and the top-level items (e.g., daily/, inventory, stations, countries, states), note naming patterns, and briefly state what each contains (with an example path or two).


#!hdfs dfs -ls -h {data_root}
notebook_run_time = time.time() 
print("daily_root -> ", daily_root)
!hdfs dfs -ls -h {data_root} 

In [None]:
bprint("Process A1(a)2")


#Q1(a): Describe the structure of the GHCN-Daily dataset in storage — identify the container and the top-level items (e.g., daily/, inventory, stations, countries, states), note naming patterns, and briefly state what each contains (with an example path or two).
#!hdfs dfs -du -s -h {daily_root} 
print(f"daily_root -> {daily_root}")
!hdfs dfs -du -s -h {daily_root} 
!hdfs dfs -ls    -h {daily_root} 
 

In [None]:
bprint("Plot A1(c).pie — total vs daily")
# supports: Q1(c). Q1(c) (verbatim): "What is the total size of all of the data, and how much of that is daily?"

#Daily folder size, meta-data size 
bprint("Process A1(c)")
cell_time = time.time() 
result = get_ipython().getoutput(f"hdfs dfs -du -s {daily_root}")
print("Raw result:", result)
print()
daily_size_MByte = int(result[0].split()[0])
daily_size_MByte = daily_size_MByte/ (1024**2)
daily_size_Bytes = int(result[0].split()[0])
print("Daily size (bytes):", daily_size_Bytes)
print("Daily size (MB)   :", daily_size_MByte)
 
lines = get_ipython().getoutput(f"hdfs dfs -ls {data_root}")
print(lines)
meta_size_Bytes  = 0
other_size_MByte = 0
other_size_Mbyte = meta_size_Bytes / (1024**2)
for line in lines:
    parts = line.split()
    if len(parts) >= 6 and parts[0].startswith('-'):   # file, not directory
        size = int(parts[2])                           # file size is parts[2]  
        meta_size_Bytes += size
        
print()
bprint() 
print(f"[result] daily size (bytes): {daily_size_Bytes:,d}")
print(f"[result] daily size (MB)   : {daily_size_MByte:.2f}")
print(f"[result] meta-data (bytes) : {meta_size_Bytes:,d}")
print(f"[result] meta-data (MB)    : {meta_size_Bytes / (1024**2):.2f}")

cell_time = time.time() - cell_time
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}") 
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")


In [None]:
 bprint("Plot A1(c).pie — total vs daily") 
# supports: Q1(c). Q1(c) (verbatim): "What is the total size of all of the data, and how much of that is daily?"


cell_time = time.time()  
print("current_csvgz_path :",current_csvgz_path)
cmd        = f"hdfs dfs -ls {current_csvgz_path       }"
result     = subprocess.run(cmd, shell=True, capture_output=True, text=True)
lines      = result.stdout.strip().split("\n")
rows       = []
#print(lines)
for line in lines:
    print(line)
    parts = line.split()
    #print(parts)
    if len(parts) < 6:
        #print("continue")
        continue
    size = int(parts[2])
    path = parts[-1]
    if path.endswith(".csv.gz"):
        year = int(path.split("/")[-1].replace(".csv.gz", ""))
        rows.append((year, size))
        #print(year)
        
 
bprint()
print("Sample parsed rows:", rows[:5])
print("rows :",rows)
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: 1(b)8 indirectly")
# supports: Q1(b). Q1(b) (verbatim): "How many years are contained in daily, and how does the size of the data change?" 


# Build Spark DataFrame with exactly the 2 integer columns  

cell_time = time.time()  
# Define schema 
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("compressed_bytes", IntegerType(), True)
 ])

# Create Spark DataFrame with schema
year_sizes_df = spark.createDataFrame(rows, schema)

print("Schema:")
year_sizes_df.printSchema() 

cell_time =(time.time() - cell_time)  
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: 1(b)4") 
#  (“How many years are contained in daily, and how does the size of the data change?”).
# Build Spark DataFrame with exactly the 2 integer columns  
cell_time = time.time() 
year_sizes_df.show(10, truncate=False)
print("Row count:", year_sizes_df.count())
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("SECTION 2: DATA INGESTION")
# supports: Q1(b). Q1(b) (verbatim): "How many years are contained in daily, and how does the size of the data change?"

cell_time = time.time()  
# — build FULL `daily` from all years (wildcard)
# Ensure the schema exists (uses your column names, incl. OBSTIME)
if "daily_schema" not in globals():
    daily_schema = T.StructType([
        T.StructField("ID",       T.StringType(), True),
        T.StructField("DATE",     T.StringType(), True),  # parsed to DateType below
        T.StructField("ELEMENT",  T.StringType(), True),
        T.StructField("VALUE",    T.IntegerType(), True),
        T.StructField("MFLAG",    T.StringType(), True),
        T.StructField("QFLAG",    T.StringType(), True),
        T.StructField("SFLAG",    T.StringType(), True),
        T.StructField("OBSTIME",  T.StringType(), True),
    ])

print("Reading all years:", f"{daily_root}*.csv.gz")
print()

_df = spark.read.csv(
    f"{daily_root}*.csv.gz",
    schema=daily_schema,
    header=False,            # flip to True if your files have a header row
    mode="PERMISSIVE"
)

# Some dumps use STATION instead of ID
if "STATION" in _df.columns and "ID" not in _df.columns:
    _df = _df.withColumnRenamed("STATION", "ID")

daily_for_overlap = (
    _df.withColumn(
        "DATE",
        F.coalesce(F.to_date("DATE", "yyyy-MM-dd"),
                   F.to_date("DATE", "yyyyMMdd"))
    )
    .withColumn("ID", F.upper(F.trim(F.col("ID"))))
    .select("ID", "DATE", "ELEMENT", "VALUE", "MFLAG", "QFLAG", "SFLAG", "OBSTIME")
    .cache()
)

# Touch to materialise cache
_ = daily_for_overlap.limit(1).count()

show_df(daily_for_overlap.limit(10), name="daily (full, wildcard)")
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")


In [None]:
bprint("Process Answer: 1(b)6")
# supports: Q1(b) (robust HDFS parsing variant, "NEW -ER", for year/size extraction). Q1(b) (verbatim): "How many years are contained in daily, and how does the size of the data change?"


cell_time = time.time() 
rows        = []
# NOTE:  -du with a files-only --- size + path are stable (behaves like the GOOD run)
lines       = get_ipython().getoutput(f'hdfs dfs -du "{data_root}/ghcnd-*.txt"')
print(lines)
for line in lines:                 # <-- was lines[15:] (skipped everything)
    #print()
    parts = line.split()
    #print(line)
    #print(parts)
    #print(len(parts))
    #print(parts[0])

    if len(parts) >= 2:
        size = int(parts[0])                 # bytes from `hdfs dfs -du`
        path = parts[-1].strip()             # full path
        #print("size:",size)
        print(path)
        # if not path.startswith(daily_root):   # files-only glob excludes /daily already
        rows.append((path, size))             # not compressed

print("\nMetadata file count:", len(rows))
print("Sample parsed rows:", rows[:5])
# Spark schema
schema = StructType([
    StructField("path", StringType(), False),
    StructField("uncompressed_bytes", LongType(), False),
])

metadata_files_df = spark.createDataFrame(rows, schema)

print("\n[spark] other_files_df schema:")
metadata_files_df.printSchema()
print("[spark] sample:")
metadata_files_df.show( truncate=False)
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Q1(b)25") 

has_enriched  = has_parquet(enriched_write_name)
has_stations  = has_parquet(stations_write_name)
has_inventory = has_parquet(inventory_write_name)
has_states    = has_parquet(states_write_name)
has_countries = has_parquet(countries_write_name)

In [None]:
bprint("Process Answer: 1(c)2 — dataset sizes (HDFS) + est. uncompressed daily")
# supports: Q1(c). Q1(c) (verbatim): "What is the total size of all of the data, and how much of that is daily?"

cell_time = time.time()
# 
print(f"daily_root          : {daily_root}")
print(f"inventory_read_name : {inventory_read_name}")
print(f"stations_read_name  : {stations_read_name}")
print(f"countries_read_name : {countries_read_name}")
print(f"states_read_name    : {states_read_name}")
sizes = {
    "daily (folder)":      du_bytes(daily_root),
    "ghcnd-inventory.txt": du_bytes(inventory_read_name),
    "ghcnd-stations.txt":  du_bytes(stations_read_name),
    "ghcnd-countries.txt": du_bytes(countries_read_name),
    "ghcnd-states.txt":    du_bytes(states_read_name),
}
total_bytes = sum(sizes.values())

# Simple gzip expansion estimate  
gzip_expansion_factor = 3.3
est_uncomp_daily = int(sizes["daily (folder)"] * gzip_expansion_factor)

print("[status] gzip_expansion_factor ->", gzip_expansion_factor)
print("[status] sizes (bytes) ->", sizes)
print("[status] total (bytes)  ->", total_bytes)

# Present as a small Spark table (sizes in MB for readability)
to_mb = 1024**2
rows = []
for k, v in sizes.items():
    rows.append((k, round(v/to_mb, 2)))
rows.append(("TOTAL", round(total_bytes/to_mb, 2)))

sizes_df = spark.createDataFrame(rows, ["dataset", "size_mb"])
sizes_df.show(truncate=False)

print(f"[status] estimated uncompressed daily (MB): {est_uncomp_daily/to_mb:,.2f}")

 
user_out = f"wasbs://campus-user@madsstorage002.blob.core.windows.net/dew59/processing"
spark.createDataFrame(
    [(k, v, round(v/to_mb,2)) for k, v in sizes.items()] + [("TOTAL", total_bytes, round(total_bytes/to_mb, 2))]
    , ["dataset","size_bytes","size_mb"]
).coalesce(1).write.mode("overwrite").option("header","true").csv(f"{user_out}/dew59_sizes_mb_csv")

cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")

print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: Q2(c)71")
# supports: Q2(c) — [awaiting verbatim text] load fixed-width metadata into Spark and extract columns using substring by character ranges.
# does: reads the fixed-width STATIONS text via spark.read.text and extracts ID, LATITUDE, LONGITUDE, ELEVATION, STATE, NAME, GSN_FLAG, HCNCRN_FLAG, WMO_ID with F.substring; prints schema/sample.

read_stations = spark.read.text(stations_read_name)

stations = (
    read_stations.select(
        F.trim(F.substring("value",  1, 11)).alias("ID"),                 # 1–11
        F.trim(F.substring("value", 13,  8)).cast("double").alias("LATITUDE"),   # 13–20
        F.trim(F.substring("value", 22,  9)).cast("double").alias("LONGITUDE"),  # 22–30
        F.trim(F.substring("value", 32,  6)).cast("double").alias("ELEVATION"),  # 32–37
        F.trim(F.substring("value", 39,  2)).alias("STATE"),                     # 39–40
        F.trim(F.substring("value", 42, 30)).alias("NAME"),                      # 42–71
        F.trim(F.substring("value", 73,  3)).alias("GSN_FLAG"),                  # 73–75
        F.trim(F.substring("value", 77,  3)).alias("HCNCRN_FLAG"),               # 77–79
        F.trim(F.substring("value", 81,  5)).alias("WMO_ID")                     # 81–85
    )
)
print("stations")
stations.printSchema()
stations.show(10, truncate=False)
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

## Q2(c): Extract columns from fixed-width metadata

**Assignment Question:**
Load fixed-width metadata into Spark and extract columns using substring by character ranges.

In [None]:
bprint("SECTION 3: DATA PROCESSING")
# supports: Q3(a–c) — "Derive country_code and join stations with countries/states."
# does: parses COUNTRIES fixed-width into [CODE, COUNTRY_NAME]; derives COUNTRY_CODE from station IDs; left-joins stations↔countries to create stn_countries and previews results for verification.

# countries
read_countries = spark.read.text(countries_read_name)
countries = (
    read_countries.select(
        F.substring("value", 1, 2).alias("CODE"),                # 1–2
        F.trim(F.substring("value", 4, 61)).alias("COUNTRY_NAME")# 4–64
    )
)
countries.show()
# derive country code 
stations_cc = stations.withColumn("COUNTRY_CODE", F.substring("ID", 1, 2))
# join country code 
stn_countries = (
    stations_cc
    .join(countries, stations_cc.COUNTRY_CODE == countries.CODE, "left")
    .drop(countries.CODE)   # keep COUNTRY_CODE from stations, drop duplicate
)
stations_cc.show()
stn_countries.show()
stn_countries.select("ID","NAME","COUNTRY_CODE","COUNTRY_NAME").show(20, False)

cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")  
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

## Q3(a–c): Derive country code and join stations with countries/states

**Assignment Question:**
Derive country_code from station IDs, join stations with countries and states, and preview the results.

In [None]:
bprint("Process Answer: Q3(a–c)62")
# supports: Q3(a–c) — "Derive country_code and join stations with countries/states."
# does: reads STATES fixed-width from states_read_name, extracts CODE and STATE_NAME to build the states DataFrame, then prints schema and a sample; preparation for the stations↔states join.

# states
cell_time = time.time()  
read_states = spark.read.text(states_read_name)

states = (
    read_states.select(
        F.substring("value", 1, 2).alias("CODE"),                 # 1–2
        F.trim(F.substring("value", 4, 47)).alias("STATE_NAME")   # 4–50  (length = 47)
    )
)

states.printSchema()
states.show(20, truncate=False)
cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")  
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: Q3(d)60")  # indirectly
# supports: Q3(d) — aggregate inventory per station (FIRSTYEAR/LASTYEAR, element counts).
# does: parses the fixed-width inventory file (inventory_read_name) into columns [ID, LATITUDE, LONGITUDE, ELEMENT, FIRSTYEAR, LASTYEAR], then prints schema and a sample to verify ingestion before aggregation.


cell_time = time.time()   
read_inventory = spark.read.text(inventory_read_name)

inventory = (
    read_inventory.select(
        F.substring("value",  1, 11).alias("ID"),                  # 1–11
        F.trim(F.substring("value", 13,  8)).cast("double").alias("LATITUDE"),   # 13–20
        F.trim(F.substring("value", 22,  9)).cast("double").alias("LONGITUDE"),  # 22–30
        F.substring("value", 32,  4).alias("ELEMENT"),             # 32–35
        F.substring("value", 37,  4).cast("int").alias("FIRSTYEAR"),# 37–40
        F.substring("value", 42,  4).cast("int").alias("LASTYEAR")  # 42–45
    )
)

inventory.printSchema()
inventory.show(20, truncate=False)
cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")  
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

## Q3(d): Aggregate inventory per station

**Assignment Question:**
Aggregate inventory per station (FIRSTYEAR/LASTYEAR, element counts), and verify ingestion before aggregation.

In [None]:
bprint("Process Answer: Q3(a–c)49 ") # indirectly
# supports: Q3(a–c) — derive country_code and join stations with countries/states.
# does: persists the countries table to Parquet (if missing) at countries_write_name, so it’s available for the upcoming stations↔countries/states joins; prints the write path and timing.

cell_time = time.time()
if(not has_countries):
    write_parquet(countries,countries_write_name)
    bprint()
    print("[written] ",countries_write_name)
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")                      

In [None]:
bprint("Process Answer: Q3(a–c)50")
# supports: Q3(a–c) — derive country_code and join stations with countries/states.
# does: writes the STATES table to Parquet at states_write_name when missing (has_states is False) so it’s ready for the upcoming stations↔states joins; prints the write path and timing.

cell_time = time.time()
if(not has_states):
    write_parquet(states,states_write_name)
    bprint()
    print("[written] ",states_write_name)
    
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
 bprint("Process Answer: Q3(d)51")
# supports: Q3(d) — aggregate inventory per station (first/last year, element counts) as input to enrichment.
# does: writes the INVENTORY table to Parquet at inventory_write_name when missing (has_inventory is False), ensuring inventory is available for the Q3(d) aggregations and later joins.

cell_time = time.time() 

if(not has_inventory):
    write_parquet(inventory,inventory_write_name)
    bprint()
    print("[written] ",inventory_write_name)    
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: Q3(a–c)52")
# supports: Q3(a–c) — "Derive country_code and join stations with countries/states."
# does: writes the STATIONS table to Parquet at stations_write_name when missing (has_stations is False), ensuring the base stations dataset is persisted for the upcoming joins.

cell_time = time.time()
if(not has_stations):
    write_parquet(stations,stations_write_name)
    bprint()
    print("[written] ",stations_write_name)    
cell_time = time.time() - cell_time
print(f"[time] cell_time (sec): {cell_time:6.2f}")
print(f"[time] cell_time (min): {cell_time/60:6.2f}")
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("SECTION 4: INVENTORY AGGREGATION")
# supports: Q2(d) — row counts, previews, and schemas for metadata + daily; write a small counts artifact.
# does: counts rows for stations/states/countries/inventory/daily_for_overlap, shows a table, writes the counts CSV to your user area, then previews 3 rows and prints schemas for each dataset.

cell_time = time.time() 

core_elements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]

inv_agg = (inventory
           .groupBy("ID")
           .agg(
               F.min("FIRSTYEAR").alias("FIRSTYEAR"),
               F.max("LASTYEAR").alias("LASTYEAR"),
               F.countDistinct("ELEMENT").alias("ELEMENT_COUNT"),
               F.countDistinct(
                   F.when(F.col("ELEMENT").isin(core_elements), F.col("ELEMENT"))
               ).alias("CORE_ELEMENT_COUNT"),
               F.countDistinct(
                   F.when(~F.col("ELEMENT").isin(core_elements), F.col("ELEMENT"))
              ).alias("OTHER_ELEMENT_COUNT")
                   ).orderBy(F.col("CORE_ELEMENT_COUNT").desc(),
                        F.col("ELEMENT_COUNT").desc(),
                        F.col("ID").asc())
                        )
print()
bprint()
inv_agg.printSchema()
inv_agg.show(20, truncate=False)

print(f"[result] Aggregated inventory rows : {inv_agg.count():12,d}")
cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")  
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
# Q4(b): How many station IDs are in stations but not in daily?
bprint("Process Answer: Q4(b)58")
# supports: Q4(b) — "How many station IDs are in stations but not in daily?"
cell_time = time.time()
station_ids = stations.select(F.col("ID").alias("ID")).distinct()
daily_ids = daily_for_overlap.select(F.col("ID").alias("ID")).distinct()
stations_not_in_daily = station_ids.join(daily_ids, on="ID", how="left_anti")
count_not_in_daily = stations_not_in_daily.count()
print(f"[result] Station IDs in stations but not in daily: {count_not_in_daily:,}")
stations_not_in_daily.show(10)
cell_time = time.time() - cell_time
print(f"[time] Q4(b) cell time (sec): {cell_time:6.2f}")

## Q4(b): How many station IDs are in stations but not in daily?

**Assignment Question:**
How many station IDs are in stations but not in daily?

## Q2(d): Inventory aggregation and row counts

**Assignment Question:**
Aggregate inventory per station (FIRSTYEAR/LASTYEAR, element counts) and provide row counts, previews, and schemas for metadata and daily data.

In [None]:
bprint("Process Answer: Q3(e)56")
# supports: Q3(e) — "Join aggregated inventory into stations to produce an enriched stations table."
# does: builds `enriched` by joining station+country (`stn_countries`) with `states` (on STATE=CODE, left) and `inv_agg` (on ID, left); orders rows, prints row count and a preview of key columns, and records timing.

cell_time = time.time()
enriched = (stn_countries   # already has station + country info
            .join(states, stn_countries.STATE == states.CODE, "left")
            .join(inv_agg, on="ID", how="left")
           # ---- order the result (adjust) ----
             .orderBy(F.col("ID").asc(), F.col("LASTYEAR").asc(), F.col("ELEMENT_COUNT").asc())
)

print()
bprint()
print(f"[result] Enriched stations rows : {enriched.count():12,d}")
enriched.select(
                "ID"           ,"NAME"    ,"COUNTRY_NAME" ,"STATE_NAME",
                "FIRSTYEAR"    ,"LASTYEAR","ELEMENT_COUNT","CORE_ELEMENT_COUNT" ,"OTHER_ELEMENT_COUNT"
               ).show(20, truncate=False)

cell_time = time.time() - cell_time 
print(f"[time]   Cell time (sec)   : {cell_time:5.2f}") 
print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")  
print(f"[time] notebook_run_time (min): {(time.time() - notebook_run_time)/60:5.2f}")

In [None]:
bprint("Process Answer: Q3(e)53")
# supports: Q3(e) — join inventory aggregates into stations to produce an enriched stations table.
# does: conditionally writes the enriched stations DataFrame to Parquet (if not already present), logging the output path and timing.

#build parquet files conditionally 
cell_time = time.time()
if(not has_enriched):
    write_parquet(enriched,enriched_write_name)
    bprint()
    print("[written] ",enriched_write_name)
    cell_time = time.time()
    print(f"[time]   Cell time (min)   : {cell_time/60:5.2f}")

In [None]:
bprint("Process Answer: Q3(d)57")
# supports: Q3(d) — "Aggregate inventory per station (FIRSTYEAR, LASTYEAR, element counts) and summarise coverage."
# does: prints row counts for stations, countries, states, and inventory (plus check lines), serving as a sanity/baseline count before inventory-by-station aggregations and later joins.


bprint()
print("Row counts (with inventory):")
print(f"[result] stations      : {stations.count() :12,d}")
print(f"[result] countries     : {countries.count():12,d}")
print(f"[result] states        : {states.count()   :12,d}")
print(f"[result] inventory     : {inventory.count():12,d}")
print(f"[check ] stations_cc   : {inventory.count():12,d}")
print(f"[check ] stn_countries : {inventory.count():12,d}")

In [None]:
bprint("Process A X (11) ")
val = spark.range(1).select(F.date_format(F.current_timestamp(), 'yyyy.MM.dd HH:mm').alias('t')).first()['t']
cell_time = time.time() - cell_time  
print(f"[time] current time           :  {val}")
print(f"[time] Cell time (sec)        : {cell_time:6.2f}") 
stop_spark()
print(f"[time] Cell time (min)        : {cell_time/60:6.2f}") 
print(f"[time] notebook_run_time (min):  {(time.time() - notebook_run_time)/60:5.2f}")