# Insitu reference network

## Import packages

In [None]:
import os

import cdsapi
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats
import xarray as xr
from c3s_eqc_automatic_quality_control import download

plt.style.use("seaborn-v0_8-notebook")

## Define Parameters

In [None]:
# Time period: "YYYY-MM" bounds, later expanded to one request per month start
start = "2006-05"
stop = "2020-03"

# Stations
stations = ["BAR", "SOD", "NYA"]  # Use None to analyse all stations
# Fail early if the station filter is neither a list nor None
assert isinstance(stations, list | None)

# Directory for csv files
csv_dir = "./csv_files"

# CDS credentials: CDSAPI_RC is set to a user-specific configuration file path
os.environ["CDSAPI_RC"] = os.path.expanduser("~/ciardini_virginia/.cdsapirc")

## Define request

In [None]:
# Dataset identifier and the part of the request shared by every month
collection_id = "insitu-observations-gruan-reference-network"
request = {
    "version": "1_0_0",
    "variable": [
        "air_temperature",
        "relative_humidity",
        "air_pressure",
        "altitude",
        "water_vapour_volume_mixing_ratio",
    ],
    "data_format": "netcdf",
}

client = cdsapi.Client()
# Build one request per month between start and stop (month-start frequency)
requests = []
for date in pd.date_range(start, stop, freq="1MS"):
    time_request = {"year": date.strftime("%Y"), "month": date.strftime("%m")}
    # Ask the service which "day" values are valid for this year/month selection
    # (presumably the days with available data — confirm against the CDS API)
    time_request["day"] = client.client.apply_constraints(
        collection_id, request | time_request
    )["day"]
    # Keep the month only if at least one day was returned
    if time_request["day"]:
        requests.append(request | time_request)

## Functions to cache

In [None]:
def _reorganize_dataset(ds):
    """Turn the rows of a single observed variable into named data variables.

    ``ds`` must contain exactly one distinct value in ``observed_variable``;
    the tuple unpacking below raises ``ValueError`` otherwise.

    The generic ``observation_value`` variable is renamed after the observed
    variable, ``uncertainty*`` variables are prefixed with that name, and
    variables whose name contains ``units`` or ``type`` are dropped after
    their (single, constant) value has been stored as an attribute on the
    variable they describe.

    Returns the reorganized dataset.
    """
    # Rename
    (varname,) = set(ds["observed_variable"].values)
    ds = ds.rename(observation_value=str(varname)).drop_vars("observed_variable")
    # e.g. "uncertainty_value1" becomes "<varname>_uncertainty1"
    ds = ds.rename(
        {
            var: "_".join([varname, var.replace("_value", "")])
            for var in ds.data_vars
            if var.startswith("uncertainty")
        }
    )
    # Update attrs
    # NOTE: `ds` is rebound inside this loop, but the iteration keeps running
    # over the data variables of the dataset as it was when the loop started.
    for var, da in ds.data_vars.items():
        # NOTE(review): the request asks for "air_pressure" and
        # "water_vapour_volume_mixing_ratio"; confirm that the names matched
        # here are the ones actually present in the downloaded data.
        # NOTE(review): "Relative" and "Mixing" look like truncated labels
        # (e.g. "Relative Humidity", "Mixing Ratio") — confirm the intent.
        match var:
            case "pressure":
                da.attrs["long_name"] = "Pressure"
            case "air_temperature":
                da.attrs["long_name"] = "Temperature"
            case "altitude":
                da.attrs["long_name"] = "Altitude"
            case "relative_humidity":
                da.attrs["long_name"] = "Relative"
            case "water_vapour_mixing_ratio":
                da.attrs["long_name"] = "Mixing"
        # Fold "units"/"type" helper variables into attributes of their target
        for string in ("units", "type"):
            if string in var:
                ds = ds.drop_vars(var)
                # The helper variable must hold one unique value, else ValueError
                (value,) = set(da.values)
                # A bare "units"/"type" variable describes the observed variable
                # itself; otherwise strip the "_units"/"_type" part of the name
                attrs_var = varname if var == string else var.replace("_" + string, "")
                ds[attrs_var].attrs[string] = value
    return ds


def reorganize_dataset(ds):
    """Decode byte strings and split the dataset by observed variable.

    Byte-string data variables of ``ds`` are decoded to UTF-8 text in place.
    If the ``index`` dimension is empty the dataset is returned as is.
    Otherwise each ``observed_variable`` group is reorganized separately and
    the per-variable datasets are merged into one.
    """
    for name, array in ds.data_vars.items():
        if np.issubdtype(array.dtype, np.bytes_):
            ds[name].values = np.char.decode(array.values, "utf-8")

    # Nothing to regroup when there are no observations
    if ds.sizes["index"] == 0:
        return ds

    per_variable = [
        _reorganize_dataset(group) for _, group in ds.groupby("observed_variable")
    ]
    with xr.set_options(use_new_combine_kwarg_defaults=True):
        merged = xr.merge(per_variable)
    return merged


def compute_specific_humidity_from_water_vapour_mixing_ratio(
    water_vapour_mixing_ratio,
    molar_mass_water=18.01528,
    molar_mass_dry_air=28.9647,
):
    """Convert a water vapour mixing ratio into specific humidity in g/kg.

    The result is ``Mw * r / (Md + Mw * r) * 1000`` where ``r`` is the mixing
    ratio and ``Mw`` / ``Md`` are the molar masses of water and dry air.
    The returned object has its ``attrs`` replaced with a long name and the
    ``g/kg`` unit, so the input must produce an object exposing ``attrs``.
    """
    water_term = molar_mass_water * water_vapour_mixing_ratio
    mass_fraction = water_term / (molar_mass_dry_air + water_term)
    specific_humidity = mass_fraction * 1000  # kg/kg → g/kg
    specific_humidity.attrs = dict(long_name="Specific Humidity", units="g/kg")
    return specific_humidity


def compute_integrated_water_vapour(specific_humidity):
    """Sum specific humidity times layer thickness along ``altitude``.

    ``specific_humidity`` is divided by 1000 (g/kg → kg/kg), multiplied by
    the difference between consecutive ``altitude`` coordinate values and
    summed over ``altitude``. The result's ``attrs`` are replaced with a long
    name and the unit string ``kg/m²``.

    NOTE(review): the sum is q·Δz without an air-density factor, so the value
    is dimensionally a length (m) rather than kg/m² — confirm whether density
    weighting was intended.
    """
    humidity_kg_per_kg = specific_humidity / 1.0e3  # g/kg → kg/kg
    altitude = humidity_kg_per_kg["altitude"]
    layer_thickness = altitude.diff("altitude").fillna(0)  # m

    weighted = humidity_kg_per_kg * layer_thickness
    integrated_water_vapour = weighted.sum("altitude")
    integrated_water_vapour.attrs = dict(
        long_name="Integrated Water Vapour",
        units="kg/m²",
    )
    return integrated_water_vapour


def compute_insitu_profiles(ds):
    """Build regularly gridded vertical profiles from raw observations.

    The dataset is reorganized per observed variable, specific humidity and a
    ``time`` variable (parsed from ``report_timestamp``) are added, and one
    profile per station and time is extracted. Each profile is sorted by
    altitude, cleaned of missing values and duplicate altitudes, and
    interpolated onto a 50 m grid from 50 m to 30 000 m. Profiles containing
    an altitude gap larger than 2 000 m are discarded. The profiles are
    concatenated along ``time`` and integrated water vapour is added.
    """
    ds = reorganize_dataset(ds)

    # Derived variables
    mixing_ratio = ds["water_vapour_mixing_ratio"]
    ds["specific_humidity"] = compute_specific_humidity_from_water_vapour_mixing_ratio(
        mixing_ratio
    )
    timestamps = pd.to_datetime(ds["report_timestamp"]).values
    ds["time"] = ("index", timestamps)

    # Profile extraction settings
    subset = ["air_temperature", "relative_humidity", "specific_humidity", "altitude"]
    target_altitudes = range(50, 30_001, 50)
    max_gap = 2_000

    profiles = []
    for station, ds_station in ds.groupby("primary_station_id"):
        for time, raw_profile in ds_station.groupby("time"):
            profile = (
                raw_profile.swap_dims(index="altitude")[subset]
                .sortby("altitude")
                .dropna("altitude", how="any", subset=subset)
                .drop_duplicates("altitude")
            )
            # Skip soundings with a vertical gap too large to interpolate over
            gaps = profile["altitude"].diff("altitude")
            if (gaps > max_gap).any():
                continue
            profile = (
                profile.interp(altitude=target_altitudes)
                .expand_dims(time=[time])
                .assign_coords(station=("time", [station]))
            )
            profiles.append(profile)
    ds = xr.concat(profiles, "time")

    # Column-integrated quantity derived from the gridded humidity
    ds["integrated_water_vapour"] = compute_integrated_water_vapour(
        ds["specific_humidity"]
    )
    return ds

## Download and transform

In [None]:
# Download the monthly requests and transform them with compute_insitu_profiles
# (transform_func is presumably applied per year/month chunk — confirm against
# the c3s_eqc_automatic_quality_control documentation); results are combined
# along "time" using nested concatenation.
ds = download.download_and_transform(
    collection_id,
    requests,
    chunks={"year": 1, "month": 1},
    transform_func=compute_insitu_profiles,
    cached_open_mfdataset_kwargs={"concat_dim": "time", "combine": "nested"},
)
# Keep only the requested stations; the mask is computed eagerly before use
if stations is not None:
    ds = ds.where(ds["station"].isin(stations).compute(), drop=True)
# Load everything into memory for the analysis cells below
ds = ds.compute()

## Seasonal profiles

In [None]:
# One colour per meteorological season
season_colors = {
    "DJF": "tab:blue",
    "MAM": "tab:green",
    "JJA": "tab:orange",
    "SON": "tab:red",
}

# Setup figure: one column per station. squeeze=False keeps `axs` an array even
# when a single station is selected, so the zip and axs[0] below always work.
fig, axs = plt.subplots(
    1,
    len(set(ds["station"].values)),
    sharey=True,
    figsize=(18, 6),
    squeeze=False,
)
axs = axs.ravel()
for ax, (station, ds_station) in zip(axs, ds.groupby("station")):
    # Compute seasonal mean and standard deviation of temperature
    da = ds_station["air_temperature"] - 273.15  # °C
    grouped = da.groupby("time.season")
    ds_seasonal = xr.merge([grouped.mean().rename("mean"), grouped.std().rename("std")])
    for season, color in season_colors.items():
        # Plot the mean profile with a ±1σ envelope
        ds_season = ds_seasonal.sel(season=season)
        ax.plot(
            ds_season["mean"], ds_season["altitude"], label=f"{season}", color=color
        )
        ax.fill_betweenx(
            ds_season["altitude"],
            ds_season["mean"] - ds_season["std"],
            ds_season["mean"] + ds_season["std"],
            color=color,
            alpha=0.3,
        )
    # Ax settings
    ax.set_title(f"Station: {station}")
    ax.set_xlabel("Temperature [°C]")
    ax.grid()
    ax.legend(title="Season")
# Figure settings (the y axis is shared, so only the first panel is labelled)
axs[0].set_ylabel("Altitude [m]")
_ = fig.suptitle("Seasonal profiles ±1σ")

## Monthly mean temperature

In [None]:
# One column per station. squeeze=False keeps `axs` an array even when a single
# station is selected, so the zip below works for any number of stations.
fig, axs = plt.subplots(
    1,
    len(set(ds["station"].values)),
    sharey=True,
    figsize=(18, 6),
    squeeze=False,
)
axs = axs.ravel()
# Monthly mean temperature per station
dataarrays = {
    station: da.resample(time="1MS").mean()
    for station, da in ds["air_temperature"].groupby("station")
}
# Common colour limits so the panels are directly comparable
vmin = min(da.min().values for da in dataarrays.values())
vmax = max(da.max().values for da in dataarrays.values())
for ax, (station, da_station) in zip(axs, dataarrays.items()):
    # Drop months without any data so they are not drawn as empty columns
    da_station = da_station.dropna("time", how="all")
    pcm = da_station.plot(
        x="time", cmap="coolwarm", vmin=vmin, vmax=vmax, ax=ax, add_colorbar=False
    )
    ax.set_title(f"Station: {station}")
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
    ax.tick_params(axis="x", rotation=45)
# Single colourbar shared by all panels
fig.colorbar(pcm, ax=axs, orientation="vertical", label="Temperature [K]")
_ = fig.suptitle("Monthly Mean Temperature")

## Temperature monthly trend

In [None]:
def linregress_slope_significant(x, y, min_points=3):
    """Fit ``y`` against ``x`` and flag whether the slope exceeds its std error.

    Non-finite pairs are ignored. A regression through exactly two points is
    an exact fit with zero standard error, which would flag any non-zero
    slope as significant; therefore at least ``min_points`` finite pairs
    (never fewer than 2) are required.

    Parameters
    ----------
    x, y : array-like
        One-dimensional samples of equal length.
    min_points : int, optional
        Minimum number of finite pairs needed to attempt the fit.

    Returns
    -------
    tuple of (float, float)
        The slope and a flag that is ``1.0`` when ``|slope| > stderr`` and
        ``0.0`` otherwise. Both are ``nan`` when there are too few points.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    mask = np.isfinite(x) & np.isfinite(y)
    if mask.sum() < max(min_points, 2):
        return np.nan, np.nan
    res = scipy.stats.linregress(x[mask], y[mask])
    # Return the flag as a float so both outputs share one dtype with the NaN case
    return res.slope, float(np.abs(res.slope) > res.stderr)


# Per station and calendar month, regress the monthly mean temperature on the
# year at every altitude level and collect slope and significance flag.
datasets = []
for station, da_station in ds["air_temperature"].groupby("station"):
    da = da_station.resample(time="1MS").mean("time") - 273.15
    for month, da_month in da.groupby("time.month"):
        # "time" is the core dimension consumed by the regression; the function
        # is vectorized over the remaining dimensions
        slope, significant = xr.apply_ufunc(
            linregress_slope_significant,
            da_month["time"].dt.year,
            da_month,
            input_core_dims=[["time"], ["time"]],
            output_core_dims=[[], []],
            vectorize=True,
            dask="parallelized",
            output_dtypes=[float, float],
        )
        ds_slope = xr.Dataset({"slope": slope, "significant": significant})
        # Add month/station dimensions so the pieces can be combined by coords
        datasets.append(ds_slope.expand_dims(month=[month], station=[station]))
ds_slope = xr.combine_by_coords(datasets)
ds_slope["altitude"].attrs = ds["altitude"].attrs
# The regressor is the calendar year, hence a trend per year
ds_slope["slope"].attrs = {"long_name": "Temperature trend", "units": "°C/year"}

In [None]:
# One panel per station: trend as a function of month (x) and altitude (y)
facet = ds_slope["slope"].plot(
    col="station",
    x="month",
    robust=True,
    figsize=(18, 6),
    cmap="coolwarm",
)
for ax, sel_dict in zip(facet.axs.flatten(), facet.name_dicts.flatten()):
    ds_station = ds_slope.sel(sel_dict)
    x = ds_station["month"]
    y = ds_station["altitude"]
    # Mask everything that is not flagged significant, then overlay a fully
    # transparent pcolor so only the hatching marks the significant cells
    z = np.ma.masked_array(
        ds_station["slope"].transpose(),
        mask=(ds_station["significant"] != 1).transpose(),
    )
    ax.pcolor(x, y, z, hatch="///", alpha=0)
# Title the facet grid's own figure; a bare `fig` here would refer to the
# figure created by an earlier cell.
_ = facet.fig.suptitle("Temperature Monthly Trend", fontsize=16)

## Integrated water vapour

In [None]:
# One row per station. squeeze=False keeps `axs` an array even when a single
# station is selected, so the zip below works for any number of stations.
fig, axs = plt.subplots(
    len(set(ds["station"].values)),
    1,
    sharex=True,
    figsize=(10, 10),
    squeeze=False,
)
axs = axs.ravel()
for ax, (station, ds_station) in zip(axs, ds.groupby("station")):
    da = ds_station["integrated_water_vapour"].resample(time="1MS").mean()
    # Regress on the month index of the full series so that gaps keep their
    # true spacing; months without data are excluded from the fit only
    da_no_nan = da.assign_coords(idx=("time", range(da.sizes["time"]))).dropna("time")
    res = scipy.stats.linregress(da_no_nan["idx"], da_no_nan)
    print(f"{station}: Trend = {res.slope:.3f} kg/m²/month, p = {res.pvalue:.3f}")
    da.plot(ax=ax, label="IWV")
    # Trend line evaluated on the same month index used for the fit
    ax.plot(
        da["time"],
        np.arange(da.sizes["time"]) * res.slope + res.intercept,
        "r--",
        label="Trend",
    )
    ax.grid()
    ax.legend()
    ax.set_xlabel("")
    ax.set_title(f"IWV – {station}")
# The x axis is shared, so only the bottom panel is labelled
_ = axs[-1].set_xlabel("Time")

## Specific humidity and integrated water vapour

In [None]:
# One row per station (left: specific humidity, right: IWV). The row count
# follows the selected stations and squeeze=False keeps `axs` two-dimensional
# even for a single station.
n_stations = len(set(ds["station"].values))
fig, axs = plt.subplots(
    nrows=n_stations,
    ncols=2,
    figsize=(16, 4 * n_stations),
    constrained_layout=True,
    squeeze=False,
)
for i, (station, ds_station) in enumerate(ds.groupby("station")):
    ds_station = ds_station.resample(time="1MS").mean()
    ds_station["specific_humidity"].plot(ax=axs[i, 0], x="time")
    # NOTE(review): the panel shows specific humidity but is titled "WVMR";
    # confirm which label is intended.
    axs[i, 0].set_title(f"{station} – WVMR")

    ds_station["integrated_water_vapour"].plot(ax=axs[i, 1], x="time", marker="o")
    # Title the right-hand panel (previously this overwrote the left title)
    axs[i, 1].set_title(f"{station} – IWV")
    axs[i, 1].grid()
_ = fig.suptitle("Spec hum e IWV")