<h2 style="color:Black;">Module Code: CSMPR - MSc Project</h2>

<h2 style="color:Black;">Project Title: Predictive Modelling of Extreme Rainfalls</h2>

<h3 style="color:Black;">Student Number: <span style="color:green;">32822955</span></h3>

<h3 style="color:Black;">Acknowledgments: Supervisor - Professor Atta Badii, Researcher - Kieran Hunt</h3>

<hr style="border:2px solid black">

## Table of Contents
[1. Data Extraction](#Overview)
  - [1.1 Setup and helpers](#Setup_and_helpers)
  - [1.2 Download Section](#Download_Section)
    
[2. Overview of the data](#Overview_of_the_data)
  - [2.1 Statistical Report](#Statistics)
  - [2.2 Visualizations](#Visualizations)


## 1. Data Extraction

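Goal: Download ERA5 single-level and pressure-level data (hourly, stored as monthly files) and IMERG Half-Hourly (30 min) data for 2015 to 2024.
- IMERG path: raw/imerg/YYYY/MM/<original granule name>.HDF5
- ERA5: one file per variable per month (and per pressure level for pressure-level data)
- IMERG: downloaded month by month, one file per half-hourly granule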
- Region: dict(north=62.0, south=49.0, west=-13.0, east=3.5)
- Root path: D:\\extreme_rainfalls\\
- Set up Copernicus API: create ~/.cdsapirc with your key.
- Set up NASA Earthdata: create ~/.netrc with credentials for urs.earthdata.nasa.gov.
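
Before running any download, it can help to confirm that both credential files are in place. The sketch below is not part of the original notebook: `check_credentials` is a hypothetical helper, and it assumes only that `~/.cdsapirc` contains a `key:` line and that `~/.netrc` has an entry for `urs.earthdata.nasa.gov`. It does not verify that the credentials are valid.

```python
import netrc
from pathlib import Path
from typing import Optional

EARTHDATA_HOST = "urs.earthdata.nasa.gov"

def check_credentials(home: Optional[Path] = None) -> dict:
    """Report whether the two credential files exist and look usable (hypothetical helper)."""
    home = Path(home) if home is not None else Path.home()
    report = {"cdsapirc": False, "netrc_earthdata": False}

    # ~/.cdsapirc: only check that a 'key:' line is present.
    cds_file = home / ".cdsapirc"
    if cds_file.is_file():
        lines = cds_file.read_text().splitlines()
        report["cdsapirc"] = any(line.strip().startswith("key:") for line in lines)

    # ~/.netrc: check for an entry matching the Earthdata login host.
    netrc_file = home / ".netrc"
    if netrc_file.is_file():
        try:
            auth = netrc.netrc(str(netrc_file)).authenticators(EARTHDATA_HOST)
            report["netrc_earthdata"] = auth is not None
        except netrc.NetrcParseError:
            pass  # malformed file: leave the flag as False
    return report

print(check_credentials())
```

A `False` entry in the printed report points to the file that still needs to be created.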

### 1.1 Setup and helpers

#### Environment Setup
- To set up all packages with Anaconda, run `conda env create -f environment.yml` in the Anaconda Prompt.
- To set up all packages with pip on Windows, run `pip install -r requirements.txt` (or `%pip install -r requirements.txt` inside the notebook).

In [1]:
from __future__ import annotations  # must be the first statement in the cell

import sys, os, subprocess, shutil, site, re, time, glob
import json, math, errno
import traceback
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime, timedelta
from urllib.parse import urljoin

In [2]:
# Imports and config
import os, time, json, calendar, requests
from pathlib import Path
from typing import List, Dict
from datetime import datetime, timedelta
from tqdm import tqdm

try:
    import cdsapi
except ImportError:
    print("Install cdsapi if you plan to download ERA5: pip install cdsapi")

# Keep BASE_DIR a Path, not a str, so it stays portable.
# Use the Windows path if it exists, otherwise fall back to a home-based folder.
win_base = Path(r"D:\extreme_rainfalls")
BASE_DIR = win_base if win_base.exists() else (Path.home() / "extreme_rainfalls")
BASE_DIR = BASE_DIR.resolve()

RAW_DIR = (BASE_DIR / "data" / "raw")
RAW_DIR.mkdir(parents=True, exist_ok=True)  # optional: ensure it exists

YEARS = list(range(2015, 2025))

# UK window
REGION = dict(north=62.0, south=49.0, west=-13.0, east=3.5)

ERA5_SINGLE_VARS = [
    "total_precipitation", "convective_precipitation", "2m_temperature",
    "10m_u_component_of_wind", "10m_v_component_of_wind",
    "surface_pressure", "total_column_water_vapour",
    "total_column_cloud_liquid_water", "boundary_layer_height", "mean_sea_level_pressure"
]
ERA5_PL_LEVELS = ["850","700","500","300"]
ERA5_PL_VARS = ["specific_humidity","u_component_of_wind","v_component_of_wind",
                "geopotential","relative_humidity","temperature"]

IMERG_VARS = [
    "precipitationCal",
    "precipitationUncal",
    "precipitationQualityIndex",
    "randomError",
    "probabilityLiquidPrecipitation"
]

# Subregions for credible evaluation and weighting
SUBREGIONS = {
    "West_Highlands": dict(north=58.0, south=56.0, west=-6.5, east=-4.0),
    "Lake_District":  dict(north=55.0, south=54.2, west=-3.5, east=-2.7),
    "Snowdonia":      dict(north=53.3, south=52.7, west=-4.2, east=-3.3),
}

imerg_months = list(range(1, 13))  

def dir_era5_single(y):  return RAW_DIR / "era5_single"   / str(y)
def dir_era5_pl(y):      return RAW_DIR / "era5_pressure" / str(y)
def dir_imerg(y):        return RAW_DIR / "imerg"         / str(y)


for y in YEARS:
    print(
        y,
        "single:", "OK" if dir_era5_single(y).exists() else "MISSING",
        "| pl:",   "OK" if dir_era5_pl(y).exists()     else "MISSING",
        "| imerg:","OK" if dir_imerg(y).exists()        else "MISSING"
    )

STATUS_DIR = Path(BASE_DIR) / "data" / "_status"
STATUS_DIR.mkdir(parents=True, exist_ok=True)
STATUS_FILE = STATUS_DIR / "download_status.json"
print("Raw data dir:", RAW_DIR)
print("Status file:", STATUS_FILE)

2015 single: OK | pl: OK | imerg: OK
2016 single: OK | pl: OK | imerg: OK
2017 single: OK | pl: OK | imerg: OK
2018 single: OK | pl: OK | imerg: OK
2019 single: OK | pl: OK | imerg: OK
2020 single: OK | pl: OK | imerg: OK
2021 single: OK | pl: OK | imerg: OK
2022 single: OK | pl: OK | imerg: OK
2023 single: OK | pl: OK | imerg: OK
2024 single: OK | pl: OK | imerg: OK
Raw data dir: D:\extreme_rainfalls\data\raw
Status file: D:\extreme_rainfalls\data\_status\download_status.json


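- The output above shows that a raw-data directory exists for every year from 2015 to 2024 for ERA5 single-level, ERA5 pressure-level, and IMERG data. The check confirms that the folders exist, not that every file inside them is complete.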
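
The directory check only tests that each year folder exists. A finer check, sketched below, lists the monthly ERA5 single-level files that are absent. `missing_era5_single` is a hypothetical helper, not part of the original notebook; it assumes the file naming pattern used by the download helper (`era5_single_<variable>_<YYYY><MM>.nc`) and searches all nested folders.

```python
from pathlib import Path

def missing_era5_single(root: Path, years, variables):
    """Return (variable, year, month) triples with no matching monthly file under root."""
    # Index every .nc file name once; rglob searches all nested year folders.
    present = {p.name for p in Path(root).rglob("*.nc")}
    missing = []
    for var in variables:
        for year in years:
            for month in range(1, 13):
                # Same naming pattern as the ERA5 single-level download helper.
                name = f"era5_single_{var}_{year}{month:02d}.nc"
                if name not in present:
                    missing.append((var, year, month))
    return missing
```

In the notebook this could be called as `missing_era5_single(RAW_DIR / "era5_single", YEARS, ERA5_SINGLE_VARS)`; an empty list would mean every expected file name is present.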

In [3]:
# Status helpers
import json
from datetime import datetime

def load_status(path: Path):
    if path.exists():
        try:
            return json.loads(path.read_text())
        except Exception:
            pass
    return {}

def save_status(path: Path, status: dict):
    path.write_text(json.dumps(status, indent=2))

def mark_status(status: dict, group: str, key: str, value: str):
    """
    Mark a (group, key) entry with a state and UTC timestamp.
    States: 'queued' | 'downloading' | 'done' | 'skipped' | 'error'
    Example keys (as written by the download helpers):
      group='era5_single', key='total_precipitation_2015_01'
      group='era5_pl',     key='specific_humidity_L850_2015_01'
      group='imerg_raw',   key='<original HDF5 name>'
    """
    status.setdefault(group, {})
    status[group][key] = {"state": value, "timestamp": datetime.utcnow().isoformat() + "Z"}
    return status

def pretty_print_status(status: dict):
    import pandas as pd
    rows = []
    for group, items in status.items():
        for k, v in items.items():
            rows.append({"Group": group, "Item": k, "State": v.get("state"), "Updated": v.get("timestamp")})
    if rows:
        df = pd.DataFrame(rows).sort_values(["Group","Item"])
        try:
            # Render as a rich table when running inside Jupyter
            from IPython.display import display
            display(df)
        except Exception:
            print(df)
    else:
        print("No status yet.")
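
For a long download run, a compact count of entries per state is often easier to read than the full table. The sketch below is not part of the original notebook: `summarise_status` is a hypothetical helper, and the example dict, including its timestamps, is made up for illustration. It assumes the status structure written by `mark_status` (group, then key, then `{"state", "timestamp"}`).

```python
from collections import Counter

def summarise_status(status: dict) -> dict:
    """Count entries per (group, state) in a status dict shaped like download_status.json."""
    counts = Counter()
    for group, items in status.items():
        for entry in items.values():
            counts[(group, entry.get("state", "unknown"))] += 1
    return dict(counts)

# Illustrative data only; keys and timestamps are made up.
example = {
    "era5_single": {
        "total_precipitation_2015_01": {"state": "done", "timestamp": "2025-01-01T00:00:00Z"},
        "total_precipitation_2015_02": {"state": "error", "timestamp": "2025-01-01T00:05:00Z"},
    },
    "imerg_raw": {
        "granule_a.HDF5": {"state": "done", "timestamp": "2025-01-01T00:10:00Z"},
    },
}
print(summarise_status(example))
```

Any `error` counts indicate items worth retrying before moving on to the analysis.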

In [4]:
def _area_from_region(region: dict):
    return [region["north"], region["west"], region["south"], region["east"]]

def download_era5_single_by_variable(year, variables, region, out_dir, status_path):
    """
    Download ERA5 single-level variables for every month of one year.
    
    Args:
        year        (int)     : Year to download (e.g., 2016).
        variables   (list)    : ERA5 single-level variables.
        region      (dict)    : Bounding box {north, south, west, east}.
        out_dir     (Path)    : Output directory; a str(year) subfolder is created inside it.
        status_path (Path)    : Path to status JSON file.
    """
    status = load_status(status_path)
    area   = _area_from_region(region)
    times  = [f"{h:02d}:00" for h in range(24)]
    c      = cdsapi.Client()

    for var in tqdm(variables, desc="ERA5 single vars"):        
            for month in range(1, 13):
                mstr = f"{month:02d}"
                key  = f"{var}_{year}_{mstr}"
                year_dir = out_dir / str(year)
                year_dir.mkdir(parents=True, exist_ok=True)
                target = year_dir / f"era5_single_{var}_{year}{mstr}.nc"
            

                if target.exists():
                    status = mark_status(status, 'era5_single', key, 'skipped')
                    save_status(status_path, status)
                    continue

                status = mark_status(status, 'era5_single', key, 'downloading')
                save_status(status_path, status)

                days = [f"{d:02d}" for d in range(1, calendar.monthrange(year, month)[1] + 1)]
                try:
                    r = c.retrieve(
                        "reanalysis-era5-single-levels",
                        {
                            "product_type": "reanalysis",
                            "variable": [var],
                            "year": str(year),
                            "month": mstr,
                            "day": days,
                            "time": times,
                            "area": area,
                            "format": "netcdf",
                        }
                    )
                    r.download(str(target))
                    status = mark_status(status, 'era5_single', key, 'done')
                except Exception as e:
                    print(f"Error for {key}:", e)
                    status = mark_status(status, 'era5_single', key, 'error')

                save_status(status_path, status)

    return load_status(status_path)


def download_era5_pl_by_var_level(year, variables, levels, region, out_dir, status_path):
    """
    Download ERA5 pressure-level variables for every month of one year, one level at a time.

    Args:
        year        (int)     : Year to download (e.g., 2016).
        variables   (list)    : ERA5 pressure-level variables.
        levels      (list)    : Pressure levels in hPa (e.g., ["925", "850", "700"]).
        region      (dict)    : Bounding box {north, south, west, east}.
        out_dir     (Path)    : Output directory; a str(year) subfolder is created inside it.
        status_path (Path)    : Path to status JSON file.
    """
    status = load_status(status_path)
    area   = _area_from_region(region)
    times  = [f"{h:02d}:00" for h in range(24)]
    c      = cdsapi.Client()

    for var in tqdm(variables, desc="ERA5 PL vars"):
        for lev in tqdm(levels, leave=False, desc=f"{var} levels"):           
                for month in range(1, 13):
                    mstr = f"{month:02d}"
                    key  = f"{var}_L{lev}_{year}_{mstr}"
                    year_dir = out_dir / str(year)
                    year_dir.mkdir(parents=True, exist_ok=True)
                    target = year_dir / f"era5_pl_{var}_L{lev}_{year}{mstr}.nc"

                    if target.exists():
                        status = mark_status(status, 'era5_pl', key, 'skipped')
                        save_status(status_path, status)
                        continue

                    status = mark_status(status, 'era5_pl', key, 'downloading')
                    save_status(status_path, status)

                    days = [f"{d:02d}" for d in range(1, calendar.monthrange(year, month)[1] + 1)]
                    try:
                        r = c.retrieve(
                            "reanalysis-era5-pressure-levels",
                            {
                                "product_type": "reanalysis",
                                "variable": [var],
                                "pressure_level": [lev],
                                "year": str(year),
                                "month": mstr,
                                "day": days,
                                "time": times,
                                "area": area,
                                "format": "netcdf",
                            }
                        )
                        r.download(str(target))
                        status = mark_status(status, 'era5_pl', key, 'done')
                    except Exception as e:
                        print(f"Error for {key}:", e)
                        status = mark_status(status, 'era5_pl', key, 'error')

                    save_status(status_path, status)

    return load_status(status_path)

In [5]:
# -------------------- IMERG via GES DISC Subset Service --------------------
def _earthdata_session():
    """
    Return a requests.Session that authenticates via ~/.netrc for GES DISC.
    """
    import netrc, requests
    s = requests.Session()
    try:
        _ = netrc.netrc()
    except FileNotFoundError:
        print("~/.netrc missing. Add Earthdata creds.")
    s.trust_env = True
    try:
        s.get("https://gpm1.gesdisc.eosdis.nasa.gov/data/", timeout=30)
    except Exception:
        pass
    return s

def list_imerg_files(year, month):
    """
    Query the NASA CMR API for all IMERG V07 half-hourly granules in a given month.
    Returns a list of download URLs ending in .HDF5, with the full original granule names.
    """
    import requests
    from datetime import datetime
    from calendar import monthrange

    start = datetime(year, month, 1)
    end = datetime(year, month, monthrange(year, month)[1])

    temporal = f"{start:%Y-%m-%dT00:00:00Z},{end:%Y-%m-%dT23:59:59Z}"
    url = (
        "https://cmr.earthdata.nasa.gov/search/granules.json"
        "?short_name=GPM_3IMERGHH"
        "&version=07"
        f"&temporal={temporal}"
        "&page_size=2000"
    )

    r = requests.get(url, timeout=60)  # avoid hanging indefinitely on a stalled request
    r.raise_for_status()
    data = r.json()

    files = []
    for entry in data.get("feed", {}).get("entry", []):
        for link in entry.get("links", []):
            if "href" in link and link["href"].endswith(".HDF5"):
                files.append(link["href"])
    return files



def validate_imerg_file(nc_path, expected_vars=IMERG_VARS):
    """
    Check if all expected IMERG variables exist and contain data.
    """
    try:
        import xarray as xr
        ds = xr.open_dataset(nc_path, engine="netcdf4")  # force the netCDF4 backend
        missing = [v for v in expected_vars if v not in ds.variables]
        if missing:
            print(f"[WARN] {nc_path.name}: missing {missing}")
            return False
        for v in expected_vars:
            if ds[v].size == 0:
                print(f"[WARN] {nc_path.name}: variable {v} has no data")
                return False
        print(f"[OK] {nc_path.name}: all variables present with data")
        return True
    except Exception as e:
        print(f"[ERROR] {nc_path.name}: {e}")
        return False

        
def imerg_hhr_file_urls_for_day(date):
    """
    Build all 30-min IMERG V07 granule URLs for a given day.
    """

    base = "https://gpm1.gesdisc.eosdis.nasa.gov/data/GPM_L3/IMERG/HH"
    yyyy = date.strftime("%Y")
    mm = date.strftime("%m")
    yyyymmdd = date.strftime("%Y%m%d")
    urls = []
    for hh in range(24):
        for mm_ in (0, 30):
            start = f"S{hh:02d}{mm_:02d}00"
            end_dt = (datetime(date.year, date.month, date.day, hh, mm_)
                      + timedelta(minutes=30) - timedelta(seconds=1))
            end = f"E{end_dt.strftime('%H%M%S')}"
            # The four-digit field is the granule start time in minutes
            # since 00:00 UTC (0000, 0030, ..., 1410), not a constant.
            seq = f"{hh * 60 + mm_:04d}"
            urls.append(
                f"{base}/{yyyy}/{mm}/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-{start}-{end}.{seq}.V07B.HDF5"
            )
    return urls


def _subset_params_for_day(date, variables, region, label, bbox_order="swne"):
    """
    Construct parameters for GES DISC OTF Subset Service.
    """
    filenames = [u.replace("https://gpm1.gesdisc.eosdis.nasa.gov", "")
                 for u in imerg_hhr_file_urls_for_day(date)]
    params = []
    for f in filenames:
        if not f.startswith("/data"):
            f = "/data" + f
        params.append(("FILENAME", f))

    if bbox_order == "swne":
        bbox = f"{region['south']},{region['west']},{region['north']},{region['east']}"
    else:
        bbox = f"{region['north']},{region['west']},{region['south']},{region['east']}"

    params.extend([
        ("FORMAT", "NetCDF4"),
        ("BBOX", bbox),
        ("VARIABLES", ",".join(variables)),
        ("LABEL", f"{label}.nc"),
    ])
    return params


def _stream_subset_request(session, params, out_nc, timeout=1800):
    """
    Make the subset request and save output NetCDF to disk.
    Returns (ok: bool, msg: str).
    """
    import requests
    url = "https://gpm1.gesdisc.eosdis.nasa.gov/daac-bin/OTF/HTTP_services.cgi"
    headers = {"User-Agent": "subset-client/0.1"}
    with session.get(url, params=params, stream=True, timeout=timeout, headers=headers) as r:
        if r.status_code != 200:
            return False, f"HTTP {r.status_code}"
        with open(out_nc, "wb") as f:
            for chunk in r.iter_content(1024 * 1024):
                if chunk:
                    f.write(chunk)
        return True, "ok"

def download_imerg_raw_day(date, out_dir, status_path):
    """
    Download all IMERG V07 half-hourly HDF5 granules for a given day.
    Saves each granule in out_dir under its original file name
    (out_dir is expected to be raw/imerg/YYYY/MM).
    """
    status = load_status(status_path)
    day_key = date.strftime("%Y%m%d")
    out_dir.mkdir(parents=True, exist_ok=True)

    urls = imerg_hhr_file_urls_for_day(date)
    s = _earthdata_session()

    for url in urls:
        fname = url.split("/")[-1]
        out_file = out_dir / fname

        if out_file.exists():
            status = mark_status(status, 'imerg_raw', fname, 'skipped')
            continue

        r = s.get(url, stream=True, timeout=600)
        if r.status_code == 200:
            with open(out_file, "wb") as f:
                for chunk in r.iter_content(1024*1024):
                    if chunk:
                        f.write(chunk)
            status = mark_status(status, 'imerg_raw', fname, 'done')
        else:
            print(f"[ERROR] {url} → HTTP {r.status_code}")
            status = mark_status(status, 'imerg_raw', fname, 'error')

    save_status(status_path, status)


def download_imerg_raw_month(year, month, out_dir, status_path):
    """
    Download all IMERG V07 half-hourly HDF5 granules for a given month
    using CMR-discovered URLs.
    """
    status = load_status(status_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    files = list_imerg_files(year, month)
    if not files:
        print(f"[WARN] No files found for {year}-{month:02d}")
        return

    s = _earthdata_session()

    for url in files:
        fname = url.split("/")[-1]
        out_file = out_dir / fname

        if out_file.exists():
            status = mark_status(status, 'imerg_raw', fname, 'skipped')
            continue

        r = s.get(url, stream=True, timeout=600)
        if r.status_code == 200:
            with open(out_file, "wb") as f:
                for chunk in r.iter_content(1024*1024):
                    if chunk:
                        f.write(chunk)
            status = mark_status(status, 'imerg_raw', fname, 'done')
            print(f"[OK] {fname}")
        else:
            print(f"[ERROR] {url} → HTTP {r.status_code}")
            status = mark_status(status, 'imerg_raw', fname, 'error')

    save_status(status_path, status)
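
The download helpers above skip any file that already exists at the target path, so a file left half-written by an interrupted run would be treated as complete on the next run. One possible safeguard, sketched below, is to stream into a temporary `.part` file and rename it only after the last chunk is written. `write_atomically` is a hypothetical helper, not part of the original notebook.

```python
import os
from pathlib import Path

def write_atomically(chunks, target: Path) -> Path:
    """Write an iterable of byte chunks to target via a temporary '.part' file."""
    target = Path(target)
    tmp = target.with_name(target.name + ".part")
    try:
        with open(tmp, "wb") as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
        # Rename only after every chunk has been written successfully.
        os.replace(tmp, target)
    finally:
        # If anything failed before the rename, remove the partial file.
        if tmp.exists():
            tmp.unlink()
    return target
```

Inside the IMERG download loops, the inner `with open(...)` block could then become `write_atomically(r.iter_content(1024 * 1024), out_file)`, so that `out_file.exists()` is true only for completed downloads.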


### 1.2 Download Section

In [6]:
# ===========================================================
# Download Section (DISABLED because data is already downloaded)
# ===========================================================

RUN_DOWNLOADS = False   # set to True only to re-download

if RUN_DOWNLOADS:
    # ----------------- ERA5 single-level -----------------
    print("\n=== ERA5 single-level downloads ===")
    for y in YEARS:
        out_dir_single = dir_era5_single(y)  # RAW_DIR/era5_single/YYYY; the helper appends str(year) again, so files land in .../YYYY/YYYY
        out_dir_single.mkdir(parents=True, exist_ok=True)
        download_era5_single_by_variable(
            year=y,
            variables=ERA5_SINGLE_VARS,
            region=REGION,
            out_dir=out_dir_single,
            status_path=STATUS_FILE,
        )   
    pretty_print_status(load_status(STATUS_FILE))    

    # ----------------- ERA5 pressure-level -----------------
    print("\n=== ERA5 pressure-level downloads ===")
    for y in YEARS:
        out_dir_pl = dir_era5_pl(y)  # RAW_DIR/era5_pressure/YYYY; the helper appends str(year) again, so files land in .../YYYY/YYYY
        out_dir_pl.mkdir(parents=True, exist_ok=True)
        download_era5_pl_by_var_level(
            year=y,
            variables=ERA5_PL_VARS,
            levels=ERA5_PL_LEVELS,
            region=REGION,
            out_dir=out_dir_pl,
            status_path=STATUS_FILE,
        )    
    pretty_print_status(load_status(STATUS_FILE))

    # ----------------- IMERG (raw HDF5 via catalog) -----------------
    print("\n=== IMERG (Raw HDF5 via Catalog) downloads ===")
    for y in YEARS:
        for month in imerg_months:
            out_dir = dir_imerg(y) / f"{month:02d}"   # RAW_DIR/imerg/YYYY/MM
            out_dir.mkdir(parents=True, exist_ok=True)
            download_imerg_raw_month(
                year=y,
                month=month,
                out_dir=out_dir,
                status_path=STATUS_FILE,
            )
    pretty_print_status(load_status(STATUS_FILE))

else:
    print("Downloads skipped — data already available locally.")


Downloads skipped — data already available locally.


## 2. Overview of the data
- We inspect ERA5 single-level, ERA5 pressure-level, and IMERG rainfall datasets.  
- This helps confirm temporal coverage, spatial extent, and variable availability.

### 2.1 Statistical Report

#### Load File Lists

In [12]:
from pathlib import Path
import xarray as xr

# Define raw data directories
ERA5_SINGLE_RAW = RAW_DIR / "era5_single"
ERA5_PL_RAW     = RAW_DIR / "era5_pressure"
IMERG_RAW       = RAW_DIR / "imerg"

# Recursively search all subfolders (year/month)
era5_single_files = sorted(ERA5_SINGLE_RAW.rglob("*.nc"))
era5_pl_files     = sorted(ERA5_PL_RAW.rglob("*.nc"))
imerg_files       = sorted(list(IMERG_RAW.rglob("*.HDF5")) + list(IMERG_RAW.rglob("*.nc4")))

print("ERA5 single files:", len(era5_single_files))
print("ERA5 pressure files:", len(era5_pl_files))
print("IMERG files:", len(imerg_files))



ERA5 single files: 1280
ERA5 pressure files: 3072
IMERG files: 181104


#### Display sample files
- We load one file from each source using `xarray.open_dataset()` to check its variables, dimensions, and metadata.

In [13]:

# --- Load sample files (only if available) ---
if era5_single_files:
    ds_single = xr.open_dataset(era5_single_files[0])
    print("ERA5 single sample:\n", ds_single)

if era5_pl_files:
    ds_pl = xr.open_dataset(era5_pl_files[0])
    print("ERA5 pressure sample:\n", ds_pl)

if imerg_files:
    ds_imerg = xr.open_dataset(imerg_files[0])
    print("IMERG sample:\n", ds_imerg)

ERA5 single sample:
 <xarray.Dataset> Size: 11MB
Dimensions:     (valid_time: 744, latitude: 53, longitude: 67)
Coordinates:
    number      int64 8B ...
  * valid_time  (valid_time) datetime64[ns] 6kB 2015-01-01 ... 2015-01-31T23:...
  * latitude    (latitude) float64 424B 62.0 61.75 61.5 ... 49.5 49.25 49.0
  * longitude   (longitude) float64 536B -13.0 -12.75 -12.5 ... 3.0 3.25 3.5
    expver      (valid_time) <U4 12kB ...
Data variables:
    u10         (valid_time, latitude, longitude) float32 11MB ...
Attributes:
    GRIB_centre:             ecmf
    GRIB_centreDescription:  European Centre for Medium-Range Weather Forecasts
    GRIB_subCentre:          0
    Conventions:             CF-1.7
    institution:             European Centre for Medium-Range Weather Forecasts
    history:                 2025-08-26T23:33 GRIB to CDM+CF via cfgrib-0.9.1...
ERA5 pressure sample:
 <xarray.Dataset> Size: 11MB
Dimensions:         (valid_time: 744, pressure_level: 1, latitude: 53,
           

#### Variables and Dimensions
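- `ds.data_vars` lists the variables stored in the file.
- `ds.dims` gives the dimension names and sizes, i.e. the number of time steps and grid points; the resolution itself follows from the coordinate spacing.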
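
The dimension sizes can be cross-checked against the download settings with a little arithmetic. The sketch below is not part of the original notebook: `expected_era5_dims` is a hypothetical helper, and it assumes a 0.25° grid spacing (as seen in the coordinate values of the ERA5 single-level sample) and one time step per hour.

```python
import calendar

def expected_era5_dims(region: dict, year: int, month: int, step: float = 0.25):
    """Return (time steps, latitude points, longitude points) for one monthly hourly file."""
    n_time = calendar.monthrange(year, month)[1] * 24             # days in month x 24 hours
    n_lat = round((region["north"] - region["south"]) / step) + 1  # both edges included
    n_lon = round((region["east"] - region["west"]) / step) + 1
    return n_time, n_lat, n_lon

uk_region = dict(north=62.0, south=49.0, west=-13.0, east=3.5)
print(expected_era5_dims(uk_region, 2015, 1))  # (744, 53, 67)
```

These values match the `valid_time: 744, latitude: 53, longitude: 67` dimensions printed for the January 2015 single-level sample.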

In [14]:
print("ERA5 single variables:", list(ds_single.data_vars))
print("ERA5 pressure variables:", list(ds_pl.data_vars))
print("IMERG variables:", list(ds_imerg.data_vars))

print("\nERA5 single dimensions:", ds_single.dims)
print("ERA5 pressure dimensions:", ds_pl.dims)
print("IMERG dimensions:", ds_imerg.dims)


ERA5 single variables: ['u10']
ERA5 pressure variables: ['z']
IMERG variables: []
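
The empty `IMERG variables: []` list is what one would expect if the HDF5 file is opened at its root: IMERG half-hourly granules keep their fields inside a group rather than at the top level. The sketch below is not part of the original notebook. `open_imerg_grid` is a hypothetical helper, and it assumes the group is named `Grid`. Depending on the installed backends, an explicit `engine` argument may also be needed.

```python
from pathlib import Path

def open_imerg_grid(path: Path, group: str = "Grid"):
    """Open one IMERG HDF5 granule at the given group (assumed to be 'Grid')."""
    import xarray as xr  # imported lazily, as in validate_imerg_file
    # 'group' selects a sub-group of the file instead of the root.
    return xr.open_dataset(path, group=group)
```

In the notebook this could be tried as `ds_imerg = open_imerg_grid(imerg_files[0])` followed by `print(list(ds_imerg.data_vars))` to see which variables the group actually contains.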



#### Spatial Coverage
- We confirm that the latitude/longitude ranges of the ERA5 samples match the UK domain.
- This catches files that were not cropped to the region and still contain global data.

In [19]:
print("ERA5 single lat/lon:", float(ds_single.latitude.min()), "→", float(ds_single.latitude.max()),
      float(ds_single.longitude.min()), "→", float(ds_single.longitude.max()))

print("ERA5 pressure lat/lon:", float(ds_pl.latitude.min()), "→", float(ds_pl.latitude.max()),
      float(ds_pl.longitude.min()), "→", float(ds_pl.longitude.max()))


ERA5 single lat/lon: 49.0 → 62.0 -13.0 → 3.5
ERA5 pressure lat/lon: 49.0 → 62.0 -13.0 → 3.5
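
The visual comparison above can also be expressed as an explicit check. The sketch below is not part of the original notebook: `matches_region` is a hypothetical helper that compares a dataset's extent with the `REGION` bounding box within a small tolerance.

```python
def matches_region(lat_min, lat_max, lon_min, lon_max, region: dict, tol: float = 1e-6) -> bool:
    """True if the extent equals the region's bounding box within tol degrees."""
    return (
        abs(lat_min - region["south"]) <= tol
        and abs(lat_max - region["north"]) <= tol
        and abs(lon_min - region["west"]) <= tol
        and abs(lon_max - region["east"]) <= tol
    )

uk_region = dict(north=62.0, south=49.0, west=-13.0, east=3.5)
print(matches_region(49.0, 62.0, -13.0, 3.5, uk_region))       # True
print(matches_region(-90.0, 90.0, -180.0, 180.0, uk_region))   # False
```

Passing the `min()` and `max()` values of `ds.latitude` and `ds.longitude` would turn the printed ranges into a pass/fail result that can be applied to every file.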


### Summary
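- File counts confirm that files from all three sources are available locally.
- Variables and dimensions confirm the expected structure for the ERA5 samples; the IMERG sample lists no variables when opened this way.
- Latitude/longitude ranges confirm that the ERA5 files align with the project domain.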