# DATA420-25S2 Assignment 1 — GHCN Data Analysis using Spark  
**Notebook Framework (Scaffold)** · 2025-08-14

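> This notebook is a **runnable scaffold**: code cells and output placeholders are laid out in question order, and a *grading checklist* is built in, so you can complete the assignment while working through Lectures 9–12.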

**Author**: Yu Xia  
**Student ID**: 62380486

---


## ✅ Grading Checklist (aligned with the marking criteria)

> Compiled from *DATA420-25S2 Assignment 1 (Grading)* (Answers 25, Reasoning 25, Tables 7, Visualizations 18, Writing 13, Coding 12).  
Tick the corresponding item once each task is complete.

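- [ ] **Answers**: every answer is **computed with Spark** and has clear units (rows / years / stations).
- [ ] **Reasoning**: explain *what was done and why* (data structures, type choices, join cost, visualization method, and so on).
- [ ] **Tables**: provide the required summary tables (dataset sizes and row counts, core element counts, country/state summaries, and so on).
- [ ] **Visualizations**: directory tree, daily size by year, NZ station map, TMIN/TMAX subplots and national average, 2024 precipitation choropleth.
- [ ] **Writing**: report structure (Background / Processing / Analysis / Visualizations / Conclusions / References), concise professional language, and correct citation of external resources and AI use.
- [ ] **Coding**: clear notebook structure, no failing cells, thorough comments, consistent style, well-organised supplementary material.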

---


## 0. Environment & Spark Session


In [None]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Constants used to interact with Azure Blob Storage using the hdfs command or Spark

global username

username = re.sub('@.*', '', getpass.getuser())

global azure_account_name
global azure_data_container_name
global azure_user_container_name
global azure_user_token

azure_account_name = "madsstorage002"
azure_data_container_name = "campus-data"
azure_user_container_name = "campus-user"
azure_user_token = r"sp=racwdl&st=2025-08-01T09:41:33Z&se=2026-12-30T16:56:33Z&spr=https&sv=2024-11-04&sr=c&sig=GzR1hq7EJ0lRHj92oDO1MBNjkc602nrpfB5H8Cl7FFY%3D"


# Functions used below

def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")

        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://localhost:{sc.uiWebUrl.split(":")[-1]}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username} (notebook)</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{username}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.driver.memory", f'{master_memory}g')
        .config("spark.executor.memory", f'{worker_memory}g')
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.kubernetes.container.image", "madsregistry001.azurecr.io/hadoop-spark:v3.3.5-openjdk-8")
        .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
        .config("spark.kubernetes.memoryOverheadFactor", "0.3")
        .config("spark.memory.fraction", "0.1")
        .config(f"fs.azure.sas.{azure_user_container_name}.{azure_account_name}.blob.core.windows.net",  azure_user_token)
        .config("spark.app.name", f"{username} (notebook)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Assignment 1 ###

The code below demonstrates how to explore and load the data provided for the assignment from Azure Blob Storage and how to save any outputs that you generate to a separate user container.

**Key points**

- The data provided for the assignment is stored in Azure Blob Storage, and any outputs that you generate will be stored there as well. Hadoop and Spark can both interact with Azure Blob Storage much as they interact with HDFS, except that replication and distribution are handled by Azure. Data can therefore be read or written in Azure over HTTPS using paths prefixed with `wasbs://`.
- There are two containers, one for the data, which is read only, and one for any outputs that you generate:
  - `wasbs://campus-data@madsstorage002.blob.core.windows.net/`
  - `wasbs://campus-user@madsstorage002.blob.core.windows.net/`
- You can use variable interpolation to insert your global username variable into paths automatically.
  - This works for bash commands as well.
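
As a minimal sketch of the path interpolation described above, the helper below builds `wasbs://` URLs from a container, an account and optional path parts. The function name `wasbs_url` and the username `abc123` are hypothetical and used only for illustration.

```python
def wasbs_url(container: str, account: str, *parts: str) -> str:
    """Build a wasbs:// URL for a container, joining optional path parts with '/'."""
    base = f"wasbs://{container}@{account}.blob.core.windows.net/"
    path = "/".join(p.strip("/") for p in parts if p)
    return base + (path + "/" if path else "")


# Example with a hypothetical username "abc123"
data_root = wasbs_url("campus-data", "madsstorage002", "ghcnd")
user_root = wasbs_url("campus-user", "madsstorage002", "abc123")
print(data_root)
print(user_root)
```

In the notebook itself the last argument would be the global `username` variable rather than a literal.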

In [None]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

25/08/01 21:36:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


0,1
spark.dynamicAllocation.enabled,false
spark.fs.azure.sas.uco-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2024-09-19T08:00:18Z&se=2025-09-19T16:00:18Z&spr=https&sv=2022-11-02&sr=c&sig=qtg6fCdoFz6k3EJLw7dA8D3D8wN0neAYw8yG4z4Lw2o%3D"""
spark.kubernetes.driver.pod.name,spark-master-driver
spark.executor.instances,4
spark.driver.extraJavaOptions,-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dderby.system.home=/tmp/jsw93/spark/
spark.driver.memory,4g
spark.app.id,spark-80de32c5f2824931896179ff15cd5530
spark.fs.azure.sas.campus-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2024-09-19T08:03:31Z&se=2025-09-19T16:03:31Z&spr=https&sv=2022-11-02&sr=c&sig=kMP%2BsBsRzdVVR8rrg%2BNbDhkRBNs6Q98kYY695XMRFDU%3D"""
spark.kubernetes.container.image.pullPolicy,IfNotPresent
spark.sql.shuffle.partitions,32


## 0.1 Paths & Helpers

- **Input (read-only)**: `wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/`
- **Output (per-user)**: `wasbs://campus-user@madsstorage002.blob.core.windows.net/<username>/`
- **Local cache (optional)**: only small intermediate results used for plotting.


In [None]:
# Path configuration: USER_ROOT is built from the global `username` defined in the setup cell
DATA_ROOT = "wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/"
USER_ROOT = f"wasbs://campus-user@madsstorage002.blob.core.windows.net/{username}/"

paths = {
    "daily":     DATA_ROOT + "daily/",
    "stations":  DATA_ROOT + "stations/ghcnd-stations.txt",
    "countries": DATA_ROOT + "countries/ghcnd-countries.txt",
    "states":    DATA_ROOT + "states/ghcnd-states.txt",
    "inventory": DATA_ROOT + "inventory/ghcnd-inventory.txt",
}

paths


In [None]:
# Common imports
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

# pandas / plotting (use only on small aggregated results)
import pandas as pd
import numpy as np


# 1. Processing

> Follows Assignment 1, Processing Q1–Q4. When finished, output the summary tables and save the **enriched stations** to `USER_ROOT`.


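## 1.1 Q1: Explore the data with `hdfs` (data layout, compression, years)

**Goal (Answers)**: directory structure, compression status, range of years, size statistics.  
**Reasoning**: differences between compressed and uncompressed data, estimated uncompressed size; how the volume of daily changes over time.  
**Tables/Vis**: dataset size table, directory tree, plot of size by year (optional).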
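
One way to turn a pasted `hdfs dfs -du` listing into records for the size table is sketched below. It assumes the command was run without `-h`, so that each line has the form `<size> <disk usage> <path>` with sizes in bytes, and that daily files are named like `2024.csv.gz`. The helper names and the sample sizes are made up for illustration.

```python
import re


def parse_du(lines):
    """Parse `hdfs dfs -du` output lines into {'path', 'bytes'} records.

    Assumes '<size> <disk_usage> <path>' per line, sizes in bytes (no -h flag).
    """
    records = []
    for line in lines:
        fields = line.split()
        if len(fields) < 3:
            continue  # skip blank or malformed lines
        records.append({"path": fields[-1], "bytes": int(fields[0])})
    return records


def year_of(path):
    """Return the four-digit year in a name such as '.../2024.csv.gz', else None."""
    m = re.search(r"/(\d{4})\.csv(?:\.gz)?$", path)
    return int(m.group(1)) if m else None


# Synthetic sample lines (sizes are invented, not measured)
sample = [
    "1000  1000  wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/2023.csv.gz",
    "2500  2500  wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/2024.csv.gz",
]
records = parse_du(sample)
years = sorted(year_of(r["path"]) for r in records)
total_bytes = sum(r["bytes"] for r in records)
print(years, total_bytes)
```

The resulting list of dictionaries can be passed straight to `pd.DataFrame(records)` for tabulating or plotting size by year.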


In [None]:
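# Where the environment allows it, shell commands can be called directly (some environments need a leading '!' or Py4J access to the file system).
# Placeholder: run the hdfs commands in the cluster terminal or notebook and paste the results into a markdown cell or table.
# Example: !hdfs dfs -du -h $DATA_ROOT

# TODO: collect the hdfs dfs listing and size statistics into a pandas DataFrame for tables and plots.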


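## 1.2 Q2: Define schemas and load the data (CSV + fixed-width text)

**Key points**:  
- `daily` is CSV (missing fields are empty) and needs an explicit schema (for DATE/OBSERVATION_TIME, discuss `StringType/DateType/TimestampType`).  
- `stations/countries/states/inventory` are fixed-width text; use `substring` to extract the columns.  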
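
Before running the fixed-width extraction on the cluster, the column offsets can be checked in plain Python. The sketch below uses the same 1-based `(start, length)` convention as `substring` and maps it to a 0-based slice. The sample line is synthetic, built to the station layout used in this notebook, and is not a real station record.

```python
# (name, start, length) using the same 1-based positions as F.substring
STATION_FIELDS = [
    ("ID", 1, 11), ("LATITUDE", 13, 8), ("LONGITUDE", 22, 9),
    ("ELEVATION", 32, 6), ("STATE", 39, 2), ("NAME", 42, 30),
    ("GSN_FLAG", 73, 3), ("HCN_CRN", 77, 3), ("WMO_ID", 81, 5),
]


def parse_fixed_width(line, fields):
    """Slice a fixed-width line; 1-based (start, length) maps to line[start-1:start-1+length]."""
    return {name: line[start - 1:start - 1 + length].strip()
            for name, start, length in fields}


# Synthetic line built to the layout above (not a real station record)
line = (f"{'XX000000001':<11} {-41.25:>8.4f} {174.75:>9.4f} {12.0:>6.1f} "
        f"{'':<2} {'EXAMPLE STATION':<30} {'GSN':<3} {'':<3} {'12345':<5}")
row = parse_fixed_width(line, STATION_FIELDS)
print(row)
```

Note the `.strip()`: fixed-width fields are space padded, so a blank STATE or flag comes back as spaces rather than as an empty string unless it is trimmed.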


In [None]:
# Q2a: define the daily schema; adjust the types to match the README/brief if needed
daily_schema = T.StructType([
    T.StructField("ID", T.StringType(), True),
    T.StructField("DATE", T.StringType(), True),           # can also be converted with to_date after loading
    T.StructField("ELEMENT", T.StringType(), True),
    T.StructField("VALUE", T.DoubleType(), True),
    T.StructField("MEASUREMENT_FLAG", T.StringType(), True),
    T.StructField("QUALITY_FLAG", T.StringType(), True),
    T.StructField("SOURCE_FLAG", T.StringType(), True),
    T.StructField("OBSERVATION_TIME", T.StringType(), True) # normalise HHMM after loading
])
daily_schema


In [None]:
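# Q2b: load the daily subset for the most recent year (example: 2024)
latest_year = "2024"  # TODO: to detect this dynamically, parse the years from the directory listing first
daily_df = spark.read.csv(paths['daily'] + f"{latest_year}.csv.gz", header=False, schema=daily_schema)

# Optional: normalise the date and time columns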
daily_df = (daily_df
            .withColumn("DATE", F.to_date("DATE", "yyyyMMdd"))
            .withColumn("OBS_HH", F.substring("OBSERVATION_TIME", 1, 2).cast("int"))
            .withColumn("OBS_MM", F.substring("OBSERVATION_TIME", 3, 2).cast("int"))
           )

daily_df.printSchema()
daily_df.show(5)


In [None]:
# Q2c: parse the fixed-width text files (stations/countries/states/inventory)
# Read each file as a single-column 'value' DataFrame, then extract character ranges with substring (positions are 1-based).
# Text fields are trimmed because fixed-width columns are space padded.

stations_raw = spark.read.text(paths['stations'])

stations_df = (stations_raw
    .withColumn("ID",        F.substring("value", 1, 11))
    .withColumn("LATITUDE",  F.substring("value", 13, 8).cast("double"))
    .withColumn("LONGITUDE", F.substring("value", 22, 9).cast("double"))
    .withColumn("ELEVATION", F.substring("value", 32, 6).cast("double"))
    .withColumn("STATE",     F.trim(F.substring("value", 39, 2)))
    .withColumn("NAME",      F.trim(F.substring("value", 42, 30)))
    .withColumn("GSN_FLAG",  F.trim(F.substring("value", 73, 3)))
    .withColumn("HCN_CRN",   F.trim(F.substring("value", 77, 3)))
    .withColumn("WMO_ID",    F.trim(F.substring("value", 81, 5)))
    .drop("value")
)

countries_df = (spark.read.text(paths['countries'])
    .withColumn("CODE", F.substring("value", 1, 2))
    .withColumn("COUNTRY_NAME", F.trim(F.substring("value", 4, 61)))
    .drop("value")
)

states_df = (spark.read.text(paths['states'])
    .withColumn("CODE", F.substring("value", 1, 2))
    .withColumn("STATE_NAME", F.trim(F.substring("value", 4, 47)))
    .drop("value")
)

inventory_df = (spark.read.text(paths['inventory'])
    .withColumn("ID",        F.substring("value", 1, 11))
    .withColumn("LATITUDE",  F.substring("value", 13, 8).cast("double"))
    .withColumn("LONGITUDE", F.substring("value", 22, 9).cast("double"))
    .withColumn("ELEMENT",   F.substring("value", 32, 4))
    .withColumn("FIRSTYEAR", F.substring("value", 37, 4).cast("int"))
    .withColumn("LASTYEAR",  F.substring("value", 42, 4).cast("int"))
    .drop("value")
)

stations_df.printSchema()
countries_df.printSchema()
states_df.printSchema()
inventory_df.printSchema()


In [None]:
# Q2d–e: row counts
counts = {
    "stations_rows": stations_df.count(),
    "countries_rows": countries_df.count(),
    "states_rows": states_df.count(),
    "inventory_rows": inventory_df.count(),
    "daily_rows_2024": daily_df.count()
}
counts


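## 1.3 Q3: Build enriched stations (country/state/inventory aggregation)

**Goal**: extract the country code, LEFT JOIN countries & states; compute first/last year and core/other element counts per station; save the enriched table.  
**Optimisation tips**: filter `inventory` by element category before joining; avoid materialising wide tables unnecessarily.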
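
The per-station inventory summary can be validated on a handful of rows with a plain-Python reference, sketched below. It assumes one inventory row per (station, element); `N_OTHER_ELEMENTS` is a name introduced here for the non-core count, and the rows are synthetic.

```python
from collections import defaultdict

CORE = {"TMAX", "TMIN", "PRCP", "SNOW", "SNWD"}


def summarise_inventory(rows):
    """Aggregate (station_id, element, first_year, last_year) rows per station."""
    elements = defaultdict(set)
    first, last = {}, {}
    for sid, element, fy, ly in rows:
        elements[sid].add(element)
        first[sid] = min(fy, first.get(sid, fy))
        last[sid] = max(ly, last.get(sid, ly))
    return {
        sid: {
            "FIRSTYEAR_ANY": first[sid],
            "LASTYEAR_ANY": last[sid],
            "N_ELEMENTS": len(elems),
            "N_CORE_ELEMENTS": len(elems & CORE),
            "N_OTHER_ELEMENTS": len(elems - CORE),
        }
        for sid, elems in elements.items()
    }


# Synthetic inventory rows for one hypothetical station
rows = [
    ("XX000000001", "TMAX", 1950, 2024),
    ("XX000000001", "PRCP", 1940, 2025),
    ("XX000000001", "WT01", 1990, 2000),
]
summary = summarise_inventory(rows)
print(summary["XX000000001"])
```

Comparing this output with the Spark aggregate for the same few stations is a cheap way to confirm that the grouping and the core/other split behave as intended.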


In [None]:
# Q3a: extract the country code from the station ID
stations_enriched = stations_df.withColumn("COUNTRY_CODE", F.substring("ID", 1, 2))

# Q3b: LEFT JOIN countries
stations_enriched = (stations_enriched
    .join(countries_df.withColumnRenamed("CODE", "COUNTRY_CODE"), on="COUNTRY_CODE", how="left")
)

# Q3c: LEFT JOIN states (state codes are populated for only some stations, mainly US)
stations_enriched = (stations_enriched
    .join(states_df.withColumnRenamed("CODE", "STATE"), on="STATE", how="left")
)

# Q3d: aggregate the inventory per station
core_elements = ["TMAX", "TMIN", "PRCP", "SNOW", "SNWD"]
is_core = F.col("ELEMENT").isin(core_elements)

inv_by_station = (inventory_df
    .groupBy("ID")
    .agg(
        F.min("FIRSTYEAR").alias("FIRSTYEAR_ANY"),
        F.max("LASTYEAR").alias("LASTYEAR_ANY"),
        F.countDistinct("ELEMENT").alias("N_ELEMENTS"),
        F.sum(F.when(is_core, 1).otherwise(0)).alias("N_CORE_ELEMENTS"),
        F.sum(F.when(is_core, 0).otherwise(1)).alias("N_OTHER_ELEMENTS"),
    )
)

stations_enriched = stations_enriched.join(inv_by_station, on="ID", how="left")

stations_enriched.printSchema()
stations_enriched.show(5)


In [None]:
# Q3e: save the enriched stations (parquet recommended)
(stations_enriched
 .write.mode("overwrite")
 .parquet(USER_ROOT + "enriched_stations.parquet"))


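## 1.4 Q4: Check for stations missing from daily

**Goal**: count the stations that are in `stations` but never appear in `daily`.  
**Hint**: avoid a full join; first take the station IDs for the most recent year, or join against the distinct station IDs and broadcast the small table.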
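
The logic of the check is a left anti join, which on a small sample is just a set difference. The sketch below shows that semantics in plain Python; the function name and the station IDs are hypothetical.

```python
def stations_missing_from_daily(station_ids, daily_ids):
    """Return station IDs that never appear among the daily IDs (left anti join semantics)."""
    seen = set(daily_ids)  # distinct IDs, analogous to select("ID").distinct()
    return sorted(sid for sid in set(station_ids) if sid not in seen)


# Hypothetical IDs for illustration
stations = ["XX000000001", "XX000000002", "XX000000003"]
daily = ["XX000000001", "XX000000001", "XX000000003"]
missing = stations_missing_from_daily(stations, daily)
print(missing)
```

Reducing daily to distinct IDs first is what keeps the Spark version cheap: the join then runs against one row per station rather than one row per observation.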


In [None]:
# Station IDs that appear in daily for 2024 (extend the range of years as needed)
daily_station_ids_2024 = daily_df.select("ID").distinct().withColumnRenamed("ID", "ID_IN_DAILY")

# Use a left anti join to find stations that never appear in daily 2024
missing_in_daily = (stations_df
    .join(daily_station_ids_2024, stations_df.ID == daily_station_ids_2024.ID_IN_DAILY, how="left_anti")
)

missing_count = missing_in_daily.count()
missing_count


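# 2. Analysis

> Follows Assignment 1, Analysis Q1–Q3. Emphasise **Reasoning** (explaining the method) and **efficiency**.

## 2.1 Q1: Station overview (total, active in 2025, network membership, southern hemisphere, US territories, country/state counts)

**Key points**:  
- Active in 2025: `LASTYEAR_ANY >= 2025`, or check whether daily 2025 has observations.  
- Southern hemisphere: `LATITUDE < 0`.  
- Network fields: `GSN_FLAG` / `HCN_CRN`.  
- Country/state distribution: compute and save.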


In [None]:
# Total number of stations
total_stations = stations_df.count()

# Active in 2025 (based on the inventory summary)
active_2025 = stations_enriched.filter(F.col("LASTYEAR_ANY") >= 2025).count()

# Network membership (GSN / HCN / CRN); verify the actual flag values against a sample of the data.
# Flags are trimmed before comparison because fixed-width fields are space padded.
network_counts = (stations_enriched
    .select(
        F.when(F.trim(F.col("GSN_FLAG")) != "", 1).otherwise(0).alias("is_GSN"),
        F.when(F.col("HCN_CRN").contains("HCN"), 1).otherwise(0).alias("is_HCN"),
        F.when(F.col("HCN_CRN").contains("CRN"), 1).otherwise(0).alias("is_CRN"),
    )
    .agg(F.sum("is_GSN").alias("GSN"),
         F.sum("is_HCN").alias("HCN"),
         F.sum("is_CRN").alias("CRN"))
)

# Number of stations in the southern hemisphere
southern_hemisphere = stations_df.filter(F.col("LATITUDE") < 0).count()

# US territories (country name contains "United States" but is not exactly "United States")
us_territories = stations_enriched.filter(
    (F.col("COUNTRY_NAME").contains("United States")) & (F.trim(F.col("COUNTRY_NAME")) != "United States")
).count()

# Counts by country and by state (blank state codes are excluded)
by_country = (stations_enriched.groupBy("COUNTRY_CODE", "COUNTRY_NAME").count())
by_state = (stations_enriched.filter(F.trim(F.col("STATE")) != "")
            .groupBy("STATE","STATE_NAME").count())

by_country.write.mode("overwrite").parquet(USER_ROOT + "stations_by_country.parquet")
by_state.write.mode("overwrite").parquet(USER_ROOT + "stations_by_state.parquet")

(total_stations, active_2025, network_counts.collect(), southern_hemisphere, us_territories)


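## 2.2 Q2: UDF for geographic distance (Haversine) and pairwise comparison of NZ stations

**Key points**:  
- Compute pairwise distances only for **New Zealand** stations (the subset is far cheaper).  
- Use the **Haversine formula** for great-circle distance (in kilometres).  
- Find the two closest stations.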
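
A quick way to sanity check the distance function before wrapping it in a UDF is to run it locally on points with a known answer: one degree of latitude along a meridian is roughly 111.195 km for a mean Earth radius of 6371.0088 km. The sketch below does that and finds the closest pair by brute force; the coordinates are hypothetical, not real stations.

```python
from itertools import combinations
from math import asin, cos, radians, sin, sqrt


def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0088):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi, dlam = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return 2 * radius_km * asin(min(1.0, sqrt(a)))  # clamp against rounding above 1


def closest_pair(stations):
    """stations: list of (id, lat, lon). Returns (distance_km, id_a, id_b)."""
    return min(
        (haversine_km(lat_a, lon_a, lat_b, lon_b), a, b)
        for (a, lat_a, lon_a), (b, lat_b, lon_b) in combinations(stations, 2)
    )


# Hypothetical coordinates, not real station records
pts = [("A", 0.0, 0.0), ("B", 1.0, 0.0), ("C", 0.0, 3.0)]
dist, id_a, id_b = closest_pair(pts)
print(round(dist, 3), id_a, id_b)
```

`combinations` yields each unordered pair once, which mirrors the `ID_A < ID_B` filter applied to the cross join.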

In [None]:
# Select New Zealand stations
nz_stations = (stations_enriched
    .filter(F.col("COUNTRY_NAME").contains("New Zealand"))
    .select("ID","NAME","LATITUDE","LONGITUDE")
)

# Build all pairwise combinations (self cross join; watch the size, and sample or use a grid index if further optimisation is needed)
left = nz_stations.select(
    F.col("ID").alias("ID_A"),
    F.col("NAME").alias("NAME_A"),
    F.col("LATITUDE").alias("LAT_A"),
    F.col("LONGITUDE").alias("LON_A"),
)
right = nz_stations.select(
    F.col("ID").alias("ID_B"),
    F.col("NAME").alias("NAME_B"),
    F.col("LATITUDE").alias("LAT_B"),
    F.col("LONGITUDE").alias("LON_B"),
)

pairs = left.crossJoin(right).filter(F.col("ID_A") < F.col("ID_B"))

# Haversine UDF
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    if None in (lat1, lon1, lat2, lon2):
        return None  # propagate nulls instead of raising inside the UDF
    R = 6371.0088  # mean Earth radius in km
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlambda = radians(lon2 - lon1)
    a = sin(dphi/2)**2 + cos(phi1)*cos(phi2)*sin(dlambda/2)**2
    c = 2*asin(min(1.0, sqrt(a)))  # clamp to guard against rounding just above 1
    return float(R*c)

haversine_udf = F.udf(haversine_km, T.DoubleType())

pairs = pairs.withColumn("DIST_KM", haversine_udf("LAT_A","LON_A","LAT_B","LON_B"))

closest_pair = pairs.orderBy(F.col("DIST_KM").asc()).limit(1)
closest_pair.show(truncate=False)


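## 2.3 Q3: Core element counts and TMAX observations without a paired TMIN

**Key points**:  
- Core elements = {TMAX, TMIN, PRCP, SNOW, SNWD}.  
- Count the observations of each element; count the unpaired cases (a TMAX with no TMIN for the same station and day).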
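
The pairing rule can be stated precisely as a set difference over (station, date) keys. The plain-Python sketch below illustrates it on a few synthetic observations; the function name and the IDs are hypothetical.

```python
def tmax_without_tmin(observations):
    """observations: list of (station_id, date, element).

    Returns the set of (station_id, date) keys that have a TMAX but no TMIN.
    """
    tmax_keys = {(sid, d) for sid, d, e in observations if e == "TMAX"}
    tmin_keys = {(sid, d) for sid, d, e in observations if e == "TMIN"}
    return tmax_keys - tmin_keys


# Synthetic observations for two hypothetical stations
obs = [
    ("XX000000001", "2024-01-01", "TMAX"),
    ("XX000000001", "2024-01-01", "TMIN"),
    ("XX000000001", "2024-01-02", "TMAX"),  # no TMIN that day
    ("XX000000002", "2024-01-01", "TMAX"),  # no TMIN at this station
    ("XX000000002", "2024-01-01", "PRCP"),
]
unpaired = tmax_without_tmin(obs)
stations_affected = {sid for sid, _ in unpaired}
print(len(unpaired), len(stations_affected))
```

The two numbers correspond to the unpaired observation count and the number of distinct stations contributing to it.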

In [None]:
# Core elements only
core_elems = ["TMAX","TMIN","PRCP","SNOW","SNWD"]
daily_core = daily_df.filter(F.col("ELEMENT").isin(core_elems))

# Number of observations per element
elem_counts = daily_core.groupBy("ELEMENT").count()
elem_counts.show()

# TMAX without a paired TMIN, matched on (ID, DATE)
tmax = daily_df.filter(F.col("ELEMENT")=="TMAX").select(F.col("ID").alias("ID_T"), F.col("DATE").alias("DATE_T"))
tmin = daily_df.filter(F.col("ELEMENT")=="TMIN").select(F.col("ID").alias("ID_N"), F.col("DATE").alias("DATE_N"))

tmax_no_tmin = (tmax.join(tmin, (tmax.ID_T==tmin.ID_N) & (tmax.DATE_T==tmin.DATE_N), how="left_anti"))
missing_pairs_count = tmax_no_tmin.count()

# Number of distinct stations contributing to these observations
unique_stations_missing = tmax_no_tmin.select("ID_T").distinct().count()

(missing_pairs_count, unique_stations_missing)


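# 3. Visualizations

> Follows Assignment 1, Visualization Q1–Q2. Aggregate in Spark first, then call `.toPandas()` to plot.

## 3.1 Q1: New Zealand: TMIN/TMAX time series (per-station subplots + national average)

**Workflow**:  
1) Filter NZ station IDs → filter daily to TMIN/TMAX → choose a suitable time aggregation (e.g. monthly mean).  
2) Station level: one subplot per station.  
3) National: combine into a single large plot.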
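
When plotting monthly means, months with no observations should appear as gaps rather than being joined by a line. The sketch below builds a complete month index for one series and fills missing months with `None`. It assumes values are in tenths of a degree Celsius, as in GHCN-Daily; the function name and the sample rows are made up.

```python
from collections import defaultdict


def monthly_means_celsius(rows, start, end):
    """rows: (year, month, value_tenths_c); start/end: inclusive (year, month).

    Returns [((year, month), mean_celsius_or_None), ...] covering every month
    in the range, so missing months show up as gaps rather than being skipped.
    """
    buckets = defaultdict(list)
    for year, month, value in rows:
        buckets[(year, month)].append(value / 10.0)  # tenths of degC -> degC
    out = []
    y, m = start
    while (y, m) <= end:
        vals = buckets.get((y, m))
        out.append(((y, m), sum(vals) / len(vals) if vals else None))
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return out


# Synthetic TMAX values for one hypothetical station
rows = [(2024, 11, 150), (2024, 11, 170), (2025, 1, 200)]
series = monthly_means_celsius(rows, (2024, 11), (2025, 1))
print(series)
```

With pandas the same effect is usually obtained by reindexing the aggregated frame onto a full monthly range, which leaves `NaN` in the empty months so that matplotlib breaks the line there.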

In [None]:
# Get the set of NZ station IDs
nz_ids_df = nz_stations.select("ID").distinct()
daily_nz_tm = (daily_df
    .join(nz_ids_df, on="ID", how="inner")
    .filter(F.col("ELEMENT").isin("TMIN","TMAX"))
)

# Use the monthly mean as the smoothing level
daily_nz_tm = daily_nz_tm.withColumn("YEAR", F.year("DATE")).withColumn("MONTH", F.month("DATE"))
monthly_nz = (daily_nz_tm
    .groupBy("ID","ELEMENT","YEAR","MONTH")
    .agg(F.avg("VALUE").alias("AVG_VALUE"))
)

# Collect to pandas for plotting (keep the result small)
pdf = monthly_nz.toPandas()

# TODO: use matplotlib/plotly to draw the subplots (one per station) and the national average curve.
# Note: VALUE may need unit conversion and labelling (GHCN-Daily temperatures are in tenths of a degree C); gaps (missing months) should be shown explicitly.


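## 3.2 Q2: Global annual precipitation (PRCP): country-level choropleth (2024)

**Workflow**:  
1) Filter daily to PRCP → aggregate by country and year → compute the annual average daily precipitation.  
2) Save the summary table to `USER_ROOT`.  
3) Match country names with `geopandas` or `plotly` and draw the 2024 choropleth.  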
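
The aggregation in step 1 can be checked on a few rows in plain Python, as sketched below. It assumes the country code is the first two characters of the station ID and that PRCP is stored in tenths of a millimetre, as in GHCN-Daily; the function name, the country codes `XX`/`YY` and the values are hypothetical.

```python
from collections import defaultdict


def mean_daily_prcp_mm(rows):
    """rows: (station_id, prcp_tenths_mm or None). Returns {country_code: mean mm per observation}."""
    totals = defaultdict(lambda: [0.0, 0])  # country -> [sum_mm, n_observations]
    for sid, value in rows:
        if value is None:
            continue  # skip missing observations, as avg() skips nulls
        acc = totals[sid[:2]]
        acc[0] += value / 10.0  # tenths of mm -> mm
        acc[1] += 1
    return {cc: s / n for cc, (s, n) in totals.items()}


# Synthetic observations for two hypothetical country codes
rows = [
    ("XX000000001", 20),
    ("XX000000002", 40),
    ("YY000000001", 0),
    ("YY000000002", None),
]
means = mean_daily_prcp_mm(rows)
print(means)
```

This pools every observation in a country, so countries with many stations in one region are weighted towards that region; averaging per station first and then across stations is an alternative worth discussing in the reasoning.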


In [None]:
# Compute the average daily precipitation per country and year (example: 2024)
year_target = 2024
daily_prcp = daily_df.filter(F.col("ELEMENT")=="PRCP").withColumn("YEAR", F.year("DATE"))

# Join stations to countries (using the enriched stations)
daily_prcp_country = (daily_prcp
    .join(stations_enriched.select("ID","COUNTRY_CODE","COUNTRY_NAME"), on="ID", how="left")
    .filter(F.col("YEAR")==year_target)
)

prcp_by_country_year = (daily_prcp_country
    .groupBy("COUNTRY_CODE","COUNTRY_NAME","YEAR")
    .agg(F.avg("VALUE").alias("AVG_DAILY_PRCP"))
)

# Save the result
(prcp_by_country_year
 .write.mode("overwrite")
 .parquet(USER_ROOT + f"prcp_by_country_year_{year_target}.parquet"))


In [None]:
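# TODO (visualization):
# 1) Read prcp_by_country_year_{year}.parquet into pandas
# 2) Match against Natural Earth country boundaries (the geopandas built-in dataset or naturalearth_lowres)
# 3) Draw the choropleth (consider projection, colour scale, handling of missing countries, annotation of outliers)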


# 4. Writing and Export

- After completing each question, record: **Methodology** → **Results** → **Reasoning**.  
- Export tables and figures to the supplementary material (zip).  
- Reference figures and tables in the report body (Figure X / Table Y) and explain what they show.


In [None]:
# Optional: export key statistics to CSV so they are easy to cite in the report
# Example: elem_counts.toPandas().to_csv('/mnt/data/core_element_counts.csv', index=False)



---

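## Finish

- Check that all paths are correct, outputs have been written to `USER_ROOT`, and the notebook runs without errors.  
- Submit: report PDF (3,000–5,000 words) + supplementary zip (code, figures, scripts; no data outputs).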

> Good luck! 🚀
