# Combination of the HSAF-H60B observation and nowcast scripts

This script combines the previous scripts for downloading and pre-processing HSAF observations, and subsequently creating a nowcast. Unlike the earlier scripts, which included numerous plotting and visualization steps to explain the workflow in detail, this version focuses solely on producing observation and nowcast outputs the same output.

You can specify your preferred settings in the first cell of the script. After that, simply use "Run All," and all parameters will automatically be applied throughout the script. This streamlined approach makes the script more suitable for operational use.

In [53]:
import os
import re
import math
import gzip
import shutil
import warnings
from pathlib import Path
from datetime import datetime, timedelta, date
from ftplib import FTP
import numpy as np
import pandas as pd
import xarray as xr
import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.colors import BoundaryNorm, ListedColormap
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import rioxarray
from pyproj import CRS
from pyresample.geometry import AreaDefinition
try:
    from satpy.readers.core._geos_area import get_area_extent
except ImportError:
    from satpy.readers._geos_area import get_area_extent
from enhanced_steps import EnhancedStepsNowcast
warnings.filterwarnings("ignore")
import os
import re
import math
import gzip
import shutil
import warnings
from pathlib import Path
from datetime import datetime, timedelta, date
from ftplib import FTP
import numpy as np
import pandas as pd
import xarray as xr
import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.colors import BoundaryNorm, ListedColormap
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import rioxarray
from pyproj import CRS
from pyresample.geometry import AreaDefinition
try:
    from satpy.readers.core._geos_area import get_area_extent
except ImportError:
    from satpy.readers._geos_area import get_area_extent
from enhanced_steps import EnhancedStepsNowcast
from PIL import Image, ImageSequence, ImageDraw, ImageFont
warnings.filterwarnings("ignore")


print("All libraries have been successfully imported!")

All libraries have been successfully imported!


## Script Settings and Output Options

To run the script, you have a few configuration options.
- If you set selection_mode to "Current", the script will automatically select the most recent available data.
- If you want to run the script for a historical period, use the "Select_target_data" option.

Additionally, you can choose which outputs to generate: GIF animations, Excel time series, and graphs. These outputs will not be displayed within the script itself, but will be automatically saved in the designated output folder structure.

You can control which outputs are created by setting the corresponding options to True or False according to your preferences.

In [54]:
username = "xxxxxxx"                                                    # Replace with your actual username
password = "xxxxxxx"                                                    # Replace with your actual password

# Selection mode: "Select_target_data" or "Current"

selection_mode = "Select_target_data"  
# selection_mode = "Current"  

if selection_mode == "Current":
    now = datetime.utcnow()  
    minute = (now.minute // 15) * 15                                    # Round down to nearest 15 minutes
    target_datetime = now.replace(minute=minute, second=0, microsecond=0)
    print(target_datetime)

if selection_mode == "Select_target_data":
    target_datetime = datetime(2025, 8, 30, 15, 0)                      # Start point for data selection
    print(target_datetime)

make_gif = True                                                         # True to create a GIF animation, False otherwise
make_timeseries_Excel = True                                            # True to generate an Excel file with the time series
make_graph = True                                                       # True to create plots

n_steps = 10                                                            # Number of steps to process
region = {"lat_min": 4, "lat_max": 16, "lon_min": -6, "lon_max": 3}     # Region of interest (latitude/longitude)
ensemble_number = 2                                                     # Ensemble member number to use

points_of_interest = [
    {"name": "Nouna", "lat": 12.76, "lon": -3.84},                      # Point of interest 1
    {"name": "Kante", "lat": 9.961861, "lon": 1.042944},                # Point of interest 2
]

# Visualization parameters
bounds = [0, 0.5, 2, 5, 10, 15, 25, 40, 100]                            # Colorbar boundaries
colors = ["#ffffff", "#add8e6", "#0000ff", "#00ff00",
          "#ffff00", "#ffa500", "#ff0000", "#ff69b4", "#ff69b4"]        # Colors for each interval

2025-08-30 15:00:00


In [55]:
# Create folders
data_folder = Path("./h60b_data")
raw_folder = data_folder / "raw"
processed_folder = data_folder / "processed"
nowcasting_folder = data_folder / "nowcast"

# Create directories if they do not exist yet
raw_folder.mkdir(parents=True, exist_ok=True)
processed_folder.mkdir(parents=True, exist_ok=True)
Path(nowcasting_folder).mkdir(parents=True, exist_ok=True)

In [56]:
# HSAF FTP server details
ftp_server = "ftphsaf.meteoam.it"
ftp_directory = "./h60B/h60_cur_mon_data/"

def parse_h60b_timestamp(filename):
    try:
        parts = filename.split("_")
        date_str = parts[1]                  # YYYYMMDD
        time_str = parts[2].split(".")[0]    # HHMM
        timestamp = datetime.strptime(f"{date_str}{time_str}", "%Y%m%d%H%M")
        return timestamp
    except:
        return None
    
def list_h60b_files_full():
    try:
        print("Connecting to the HSAF FTP server...")
        with FTP(ftp_server) as ftp:
            # Login with credentials
            ftp.login(username, password)
            print("Connection successful!")
            ftp.cwd(ftp_directory)
            files = []
            ftp.dir(lambda line: files.append(line.split()[-1]))
            h60b_files = [f for f in files if f.startswith("h60")]
            timestamps_files = [(parse_h60b_timestamp(f), f) 
                                for f in h60b_files 
                                if parse_h60b_timestamp(f) is not None]
            
            if not timestamps_files:
                print("No files with valid timestamp found.")
                return []
            
            timestamps_files.sort()
            
            first_file = timestamps_files[0]
            last_file  = timestamps_files[-1]
            print(f"Latest available file: {last_file[1]} ({last_file[0]})")
            return [f[1] for f in timestamps_files]
    
    except Exception as e:
        print(f"Error connecting to FTP: {e}")
        print("Make sure you are registered at https://hsaf.meteoam.it/ and have valid credentials")
        return []
    
all_files = list_h60b_files_full()
timestamps_files = [(parse_h60b_timestamp(f), f) for f in all_files if parse_h60b_timestamp(f)]
timestamps_files.sort()


Connecting to the HSAF FTP server...
Connection successful!
Latest available file: h60_20251010_1115_fdk.nc.gz (2025-10-10 11:15:00)


In [57]:
def select_files_before_date(target_time=None, n_steps=10, timestamps_files=timestamps_files, current=False):
    if not timestamps_files:
        print("No files available for selection.")
        return []

    if current:
        selected = timestamps_files[-n_steps:] if n_steps <= len(timestamps_files) else timestamps_files
        return [f[1] for f in selected]

    if target_time is None:
        print("Error: target_time must be provided if 'current' is False.")
        return []

    selected_files = []
    current_ts = target_time

    for _ in range(n_steps):
        previous_file = None
        for ts, f in reversed(timestamps_files):
            if ts <= current_ts:
                previous_file = (ts, f)
                break

        if previous_file:
            selected_files.append(previous_file[1])
            current_ts = previous_file[0] - timedelta(minutes=15)
        else:
            print(f"No file available before {current_ts}")
            break

    return selected_files[::-1]

if selection_mode == "Select_target_data":
    selected_files = select_files_before_date(target_datetime, n_steps=n_steps, current=False)
elif selection_mode == "Current":
    selected_files = select_files_before_date(target_datetime, n_steps=n_steps, current=True)

In [58]:
def download_h60b_file(filename, output_folder=None):

    target_folder = Path(output_folder) if output_folder else raw_folder
    target_folder.mkdir(parents=True, exist_ok=True)

    gz_path = target_folder / filename
    nc_path = target_folder / filename.replace(".gz", "")

    if nc_path.exists():
        print(f"The file already exists: {nc_path.name}")
        return str(nc_path)

    try:
        print(f"Downloading {filename}...")

        with FTP(ftp_server) as ftp:
            ftp.login(username, password)
            ftp.cwd(ftp_directory)

            with open(gz_path, "wb") as f:
                ftp.retrbinary(f"RETR {filename}", f.write)

        file_size = gz_path.stat().st_size / (1024*1024)  
        with gzip.open(gz_path, "rb") as f_in:
            with open(nc_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        gz_path.unlink()

        final_size = nc_path.stat().st_size / (1024*1024) 

        return str(nc_path)

    except Exception as e:
        print(f"Error downloading {filename}: {e}")
        return None

In [59]:
def preprocess_file(raw_file: str, output_folder=None):
    
    target_folder = Path(output_folder) if output_folder else processed_folder
    target_folder.mkdir(parents=True, exist_ok=True)
    filename = Path(raw_file).name

    # Extract timestamp from the filename and define the output path
    timestamp = parse_h60b_timestamp(filename)
    hsaf_filename = f"HSAF-H60B_{timestamp.strftime('%Y%m%dT%H%M%S')}.nc"
    output_path_HSAF = target_folder / hsaf_filename

    # Check if preprocessed file already exists
    if output_path_HSAF.exists():
        print(f"File already preprocessed: {hsaf_filename} -> Skipping preprocessing")
        return str(output_path_HSAF)

    # --- Define coordinate systems ---
    crs_in = "+proj=geos +a=6378.169 +b=6356.584 +h=35785.831 +lat_0=0 +lon_0=0.000000"  # Original CRS (satellite)
    crs_out = "EPSG:4326"  # Output CRS: latitude/longitude

    ds = xr.open_dataset(raw_file, decode_coords="all", decode_cf=False)
    ds = ds.rename({"nx": "x", "ny": "y"})
    rr_raw = ds["rr"]
    scale_factor = rr_raw.encoding.get("scale_factor", 0.1)
    add_offset  = rr_raw.encoding.get("add_offset", 0.0)
    ds["rr"] = rr_raw.astype("float32") * scale_factor + add_offset
    ds["rr"].encoding.clear()
    ds["rr"].attrs.clear()
    ds = ds[["rr"]].astype("float32")

    source_crs = CRS.from_proj4(ds.attrs["gdal_projection"])
    crs_in = source_crs.to_proj4()  # Exact CRS of the file
    crs_out = "EPSG:4326"

    cgms_projection = (
        "+proj=geos +coff=1856.000000 +cfac=13642337.000000 "
        "+loff=1856.000000 +lfac=13642337.000000 "
        "+spp=0.000000 +r_eq=6378.169000 +r_pol=6356.583800 +h=42164.000000"
    )
    matches = re.findall(r"\+?(\w+)\s*=\s*([^\s]+)", cgms_projection)
    parse_dict = {k: v for k, v in matches}
    cd = source_crs.to_dict()
    area_extent = get_area_extent({
        "scandir": "N2S",                             # Scan direction of satellite (N2S = North to South)
        "h": float(parse_dict["h"]) * 1000 - float(parse_dict["r_eq"]) * 1000,  # Satellite height above Earth
        "loff": float(parse_dict["loff"]),            # Line and column offsets
        "coff": float(parse_dict["coff"]),           
        "lfac": float(parse_dict["lfac"]),            # Scale factors for lines and columns
        "cfac": float(parse_dict["cfac"]),
        "ncols": ds.x.size,                           # Number of columns and rows
        "nlines": ds.y.size,
    })

    area_def_src = AreaDefinition(
        "areaD",
        cd["proj"],
        "areaD",
        {"lon_0": cd["lon_0"], "a": cd["a"], "b": cd["b"], "h": cd["h"], "proj": cd["proj"]},
        ds.y.size,
        ds.x.size,
        (area_extent[0], area_extent[1], area_extent[2], area_extent[3]),
    )

    # Create new X and Y coordinates
    x, y = area_def_src.get_proj_coords()
    new_x_coords = np.linspace(x.min(), x.max(), num=ds.sizes["x"])
    new_y_coords = np.linspace(y.max(), y.min(), num=ds.sizes["y"])
    ds = ds.assign_coords(y=("y", new_y_coords), x=("x", new_x_coords))

    # Write CRS corresponding to coordinate units
    ds = ds.rio.write_crs(crs_in)
    ds = ds.rename({"rr": "precip_intensity"}).sortby("y")
    ds = ds.expand_dims("time").assign_coords(time=("time", [timestamp]))
    ds = ds.transpose("time", "y", "x")
    ds.precip_intensity.attrs = {
        "standard_name": "precipitation_flux",
        "long_name": "Precipitation flux derived from cloud optical properties",
        "units": "kg m-2 h-1",
    }

    ds["precip_intensity"] = ds["precip_intensity"].where(ds["precip_intensity"] >= 0, np.nan)
    da = ds["precip_intensity"].rio.write_nodata(np.nan, encoded=True)
    ds["precip_intensity"] = da
    ds = ds.rio.reproject(crs_out, nodata=np.nan)

    # Clear encoding after reprojection
    for var_name in ds.data_vars:
        ds[var_name].encoding.clear()
        for attr in ["_FillValue", "missing_value", "fill_value", "FillValue"]:
            ds[var_name].attrs.pop(attr, None)

    # Final dimension order
    ds = ds.transpose("time", "y", "x")

    # Set CF attributes for coordinates
    ds.x.attrs = {"standard_name": "longitude", "long_name": "longitude", "units": "degrees_east", "axis": "X"}
    ds.y.attrs = {"standard_name": "latitude", "long_name": "latitude", "units": "degrees_north", "axis": "Y"}
    ds.time.attrs = {"standard_name": "time", "long_name": "time", "axis": "T"}

    # Set global dataset attributes
    ds.attrs = {
        "Conventions": "CF-1.6",
        "title": "RAINSAT H60B MSG SEVIRI Precipitation",
        "source": "EUMETSAT H-SAF H60B",
        "creator": "HKV services",
        "creation_date": date.today().strftime("%Y-%m-%d"),
        "time_coverage_start": timestamp.strftime("%Y-%m-%dT%H:%M:%S"),
        "time_coverage_end": (timestamp + timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%S"),
        "geospatial_lat_min": float(ds.y.min()),
        "geospatial_lat_max": float(ds.y.max()),
        "geospatial_lon_min": float(ds.x.min()),
        "geospatial_lon_max": float(ds.x.max()),
        "crs": crs_out,
        "product_details": "https://hsaf.meteoam.it/Products/Detail?prod=H60B",
        "data_source": "hsaf-h60b",
    }

    # NetCDF encoding
    encoding = {
        "precip_intensity": {"dtype": "float32", "zlib": True, "complevel": 4, "_FillValue": -999.0},
        "time": {"units": "minutes since 1970-01-01 00:00:00", "dtype": "float64"},
    }

    # Save preprocessed NetCDF file
    ds.to_netcdf(output_path_HSAF, encoding=encoding)
    ds.close()
    print(f"H60B file successfully preprocessed: {hsaf_filename}")
    return str(output_path_HSAF)

In [60]:
# Final loop to download and preprocess the selected files
for selected_file in selected_files:
    downloaded_file = download_h60b_file(selected_file)
    timestamp = parse_h60b_timestamp(Path(downloaded_file).name)
    hsaf_filename = f"HSAF-H60B_{timestamp.strftime('%Y%m%dT%H%M%S')}.nc"
    output_path_HSAF = processed_folder / hsaf_filename

    if output_path_HSAF.exists():
        print(f"{hsaf_filename} already exists, skipping to the next file\n" + "-"*40)
        continue
    
    preprocess_file(downloaded_file)
    print(f"{selected_file} processed\n" + "-"*40)

Downloading h60_20250830_1245_fdk.nc.gz...


H60B file successfully preprocessed: HSAF-H60B_20250830T124500.nc
h60_20250830_1245_fdk.nc.gz processed
----------------------------------------
Downloading h60_20250830_1300_fdk.nc.gz...
H60B file successfully preprocessed: HSAF-H60B_20250830T130000.nc
h60_20250830_1300_fdk.nc.gz processed
----------------------------------------
Downloading h60_20250830_1315_fdk.nc.gz...
H60B file successfully preprocessed: HSAF-H60B_20250830T131500.nc
h60_20250830_1315_fdk.nc.gz processed
----------------------------------------
Downloading h60_20250830_1330_fdk.nc.gz...
H60B file successfully preprocessed: HSAF-H60B_20250830T133000.nc
h60_20250830_1330_fdk.nc.gz processed
----------------------------------------
Downloading h60_20250830_1345_fdk.nc.gz...
H60B file successfully preprocessed: HSAF-H60B_20250830T134500.nc
h60_20250830_1345_fdk.nc.gz processed
----------------------------------------
Downloading h60_20250830_1400_fdk.nc.gz...
H60B file successfully preprocessed: HSAF-H60B_20250830T1400

In [61]:
settings = {
    "data_source": "h60b",        # Data source
    "ensemble": 5,                # Number of ensemble members
    "n_input_files": 10,          # Number of input files to use
    "n_lead_times": 12,           # Number of forecast time steps (each step is 15 minutes)
    "frequency": 15,              # Minutes between each time step
    "transform": "dB",            # Transformation used by pysteps ("dB", "log", etc.)
    "threshold": 0.05,            # Threshold (mm/h) to define rain/no rain
    "buffer_distance": 5000,      # Buffer distance (meters) around the country for clipping
    "crs_out": "EPSG:4326",       # Output coordinate reference system
    "norain_thr": 0.005,          # Parameter for the Norain method
    "zerovalue": -15.0,           # Parameter for the Norain method
    "max_workers": 2,             # Number of processors to use
}

In [62]:
def match_files_by_timestamp(selected_files, processed_folder):

    processed_folder = Path(processed_folder)
    all_files = list(processed_folder.glob("*"))

    # Extract timestamps from the selected files
    ts_list = []
    pattern_selected = r"(\d{8}_\d{4})"
    for f in selected_files:
        match = re.search(pattern_selected, f)
        if match:
            dt = datetime.strptime(match.group(), "%Y%m%d_%H%M")
            ts_list.append(dt)

    # Match with processed files based on timestamp
    matched_files = []
    pattern_processed = r"(\d{8}T\d{6})"  
    for f in all_files:
        match = re.search(pattern_processed, f.name)
        if match:
            dt_f = datetime.strptime(match.group(), "%Y%m%dT%H%M%S")
            if dt_f in ts_list:
                matched_files.append(f)

    matched_files.sort(key=lambda x: re.search(pattern_processed, x.name).group())
    return matched_files

observations_input_nowcast = match_files_by_timestamp(selected_files, processed_folder)


def open_as_time_stack(input_files):
    """
    Open multiple NetCDF files as a single time-stacked xarray Dataset.
    """
    if not input_files:
        raise FileNotFoundError("No input files found.")
    datasets = xr.open_mfdataset(input_files)                       
    return datasets

# Load the preprocessed observations as a time-stacked dataset
input_dataset = open_as_time_stack(observations_input_nowcast)

In [63]:
# Remove existing nowcast file if it exists
nc_path = Path("h60b_data/nowcast/pysteps_h60b_latest.nc")
if nc_path.exists():
    nc_path.unlink()

steps_settings = {
    "datafolder": data_folder,                      # Folder containing preprocessed HSAF files
    "ensemble": settings["ensemble"],               # Number of ensemble members
    "n_lead_times": settings["n_lead_times"],       # Number of lead times to forecast
    "frequency": settings["frequency"],             # Time step frequency (minutes)
    "transform": settings["transform"],             # PySTEPS transformation ("dB", "log", etc.)
    "threshold": settings["threshold"],             # Threshold to distinguish rain / no rain
    "buffer_distance": settings["buffer_distance"], # Buffer distance around the region of interest
    "crs_out": settings["crs_out"],                 # Output coordinate reference system
    "norain_thr": settings["norain_thr"],           # Parameter for "no rain" handling
    "zerovalue": settings["zerovalue"],             # Minimum value for "no rain"
    "max_workers": settings["max_workers"],         # Number of CPUs to use
}

engine = EnhancedStepsNowcast(steps_settings, "h60b")
dataset_roi = input_dataset.sel(
    x=slice(region["lon_min"], region["lon_max"]),
    y=slice(region["lat_max"], region["lat_min"])  # Note: Y coordinates might be inverted
)

nowcast_arrays, metadata = engine.nowcast_steps_pysteps(dataset_roi)

Rain fraction is: 0.008419090835426626, while minimum fraction is 0.005
unknown projection longlat
Inputs validated and initialized successfully.
Computing STEPS nowcast
-----------------------

Inputs
------
input dimensions: 287x215
km/pixel:         3.0
time step:        15 minutes

Methods
-------
extrapolation:          semilagrangian
bandpass filter:        gaussian
decomposition:          fft
noise generator:        nonparametric
noise adjustment:       no
velocity perturbator:   None
conditional statistics: no
precip. mask method:    incremental
probability matching:   cdf
FFT method:             numpy
domain:                 spatial

Parameters
----------
number of time steps:     12
ensemble size:            5
parallel threads:         2
number of cascade levels: 6
order of the AR(p) model: 2
precip. intensity threshold: -13.010299956639813
Nowcast components initialized successfully.
Rain fraction is: 0.005234583907300867, while minimum fraction is 0.0
Extrapolation complete

In [64]:
# Create output folder for the nowcast results
output_path_nowcast = "/workspaces/Tools-for-weather-and-climate-services-in-Africa/2_H60B_Nowcast/h60b_data/nowcast"
Path(output_path_nowcast).mkdir(parents=True, exist_ok=True)

engine.settings["threddsdata"] = output_path_nowcast

country_name = f"Country_{target_datetime.strftime('%Y%m%dT%H%M')}"

engine.export_nowcast_to_netcdf(
    country=country_name,
    nowcasting_arrays=nowcast_arrays,
    date_start=None,       # Automatically use the first time step
    metadata=metadata,
    reproject=True,
    data_source=settings["data_source"],
)

## Options to visualise and save results

In [65]:
# Create GIFs for the observations
if make_gif:
    obs_files = []
    pattern = r"\d{8}T\d{4}"  # Timestamp pattern  

    for f in observations_input_nowcast:
        match = re.search(pattern, str(f))  
        if match:
            ts_str = match.group()
            matching = list(processed_folder.glob(f"*{ts_str}*.nc"))
            obs_files.extend(matching)
        else:
            print(f"No timestamp found in: {f}")

    if not obs_files:
        print("No observation files found for the selected time steps.")
    else:
        ds_obs = xr.open_mfdataset([str(f) for f in obs_files])

        # Region of Interest
        ds_roi = ds_obs.sel(
            x=slice(region["lon_min"], region["lon_max"]),
            y=slice(region["lat_max"], region["lat_min"])
        )

        var = ds_roi["precip_intensity"] if "precip_intensity" in ds_roi.data_vars else ds_roi["precipitation"]
        x = var["x"].values
        y = var["y"].values

        if y[0] > y[-1]:
            extent = [x.min(), x.max(), y.max(), y.min()]
            origin = "upper"
        else:
            extent = [x.min(), x.max(), y.min(), y.max()]
            origin = "lower"

        cmap = ListedColormap(colors)
        norm = BoundaryNorm(boundaries=bounds, ncolors=cmap.N, extend='max')
        cmap.set_bad("none")
        
        fig, ax = plt.subplots(figsize=(5, 6), subplot_kw={"projection": ccrs.PlateCarree()})
        plt.close(fig)
        ax.set_extent([x.min(), x.max(), y.min(), y.max()])
        ax.coastlines(resolution='10m', color='black')
        ax.add_feature(cfeature.BORDERS, edgecolor='black')

        last_frame = var.isel(time=-1)
        last_frame_flipped = np.ma.masked_invalid(last_frame.values[::-1, :]).astype(np.float32)

        img = ax.imshow(
            last_frame_flipped,
            extent=extent,
            origin=origin,
            cmap=cmap,
            norm=norm
        )

        cbar = fig.colorbar(img, ax=ax, fraction=0.05, pad=0.02, ticks=bounds)
        cbar.set_label("Precipitation (mm/h)")
        ax.set_title(np.datetime_as_string(last_frame.time.values, unit='m'))

        def update(i):
            frame = var.isel(time=i)
            frame_flipped = np.ma.masked_invalid(frame.values[::-1, :]).astype(np.float32)
            img.set_data(frame_flipped)
            ax.set_title(np.datetime_as_string(frame.time.values, unit='m'))
            return (img,)

        ani = FuncAnimation(fig, update, frames=var.sizes["time"], interval=600, blit=True)

        results_folder = Path("results")
        results_folder.mkdir(parents=True, exist_ok=True)

        last_time = var.time.values[9]
        last_time_dt = pd.to_datetime(last_time)
        safe_time_str = last_time_dt.strftime("%Y%m%dT%H%M")

        time_folder = results_folder / f"ens_{ensemble_number}_{safe_time_str}"
        time_folder.mkdir(parents=True, exist_ok=True)
        
        gif_filename = f"observations_animation_{safe_time_str}.gif"
        gif_path = time_folder / gif_filename
        ani.save(gif_path, writer="pillow", fps=2)

In [66]:
# Create GIFs for the nowcast
if make_gif:
    ds_nowcast = xr.open_dataset(nowcasting_folder / "pysteps_h60b_latest.nc")
    varname = "precip_intensity" if "precip_intensity" in ds_nowcast.data_vars else "precipitation"
    var = ds_nowcast[varname]
    ds_nowcast.close()

    if "ens_number" in var.dims:
        var = var.isel(ens_number=ensemble_number)

    def fmt_time(tarr, i):
        v = tarr.isel(time=i).values
        try:
            return np.datetime_as_string(np.asarray(v, dtype="datetime64[m]"), unit='m')
        except Exception:
            try:
                return v.strftime("%Y-%m-%d %H:%M")
            except Exception:
                return str(v)

    cmap = ListedColormap(colors)
    norm = BoundaryNorm(boundaries=bounds, ncolors=cmap.N, extend='max')
    cmap.set_bad("none")

    xname = next((c for c in ("x", "lon", "longitude") if c in var.coords), None)
    yname = next((c for c in ("y", "lat", "latitude") if c in var.coords), None)
    x = var[xname].values
    y = var[yname].values

    if y[0] > y[-1]:
        extent = [x.min(), x.max(), y.max(), y.min()]
        origin = "upper"
    else:
        extent = [x.min(), x.max(), y.min(), y.max()]
        origin = "lower"

    fig, ax = plt.subplots(figsize=(5, 6), subplot_kw={"projection": ccrs.PlateCarree()})
    plt.close(fig)
    ax.set_extent([x.min(), x.max(), y.min(), y.max()])
    ax.coastlines(resolution='10m', color='black')
    ax.add_feature(cfeature.BORDERS, edgecolor='black')

    first_frame = var.isel(time=0).where(var.isel(time=0) > 0)
    first_frame_flipped = np.ma.masked_invalid(first_frame.values[::-1, :]).astype(np.float32)

    img = ax.imshow(
        first_frame_flipped,
        extent=extent,
        origin=origin,
        cmap=cmap,
        norm=norm
    )

    cbar = fig.colorbar(img, ax=ax, fraction=0.05, pad=0.02, ticks=bounds)
    cbar.set_label("Precipitation (mm/h)")
    ax.set_title(fmt_time(var["time"], 0))

    def update(i):
        frame = var.isel(time=i).where(var.isel(time=i) > 0)
        frame_flipped = np.ma.masked_invalid(frame.values[::-1, :]).astype(np.float32)
        img.set_data(frame_flipped)
        ax.set_title(fmt_time(var["time"], i))
        return (img,)

    ani = FuncAnimation(fig, update, frames=var.sizes["time"], interval=600, blit=True)

    gif_filename = f"nowcast_animation_{target_datetime.strftime('%Y%m%dT%H%M')}.gif"
    gif_path = time_folder / gif_filename
    ani.save(gif_path, writer="pillow", fps=2)


In [67]:
## Create Excel file with observation time series
if make_timeseries_Excel:
    ds_obs = xr.open_mfdataset([str(f) for f in observations_input_nowcast])

    last_time = ds_obs.time.values[-1]
    last_time_str = np.datetime_as_string(last_time, unit='m')
    safe_time_str = last_time_str.replace(":", "").replace("-", "")

    data_dict = {}
    for pt in points_of_interest:
        varname = "precip_intensity" if "precip_intensity" in ds_obs.data_vars else "precipitation"
        rain_series = ds_obs.sel(
            x=ds_obs["x"].sel(x=pt["lon"], method="nearest"),
            y=ds_obs["y"].sel(y=pt["lat"], method="nearest")
        )[varname]
        data_dict[pt["name"]] = rain_series.values

    df_points = pd.DataFrame(data_dict, index=rain_series["time"].values)
    df_points.index.name = "time"

    results_folder = Path("results")
    results_folder.mkdir(parents=True, exist_ok=True)

    # Folder named by the time step
    time_folder = results_folder / f"ens_{ensemble_number}_{safe_time_str}"
    time_folder.mkdir(parents=True, exist_ok=True)
    excel_filename = f"observations_HSAF_points_timeseries_{safe_time_str}.xlsx"
    excel_path = time_folder / excel_filename
    df_points.to_excel(excel_path, index=True)

In [68]:
## Create Excel file with nowcast time series
if make_timeseries_Excel:
    nowcast_file = nowcasting_folder / "pysteps_h60b_latest.nc"
    ds_nowcast = xr.open_dataset(nowcast_file)

    varname = "precip_intensity" if "precip_intensity" in ds_nowcast.data_vars else "precipitation"
    var = ds_nowcast[varname]
    ds_nowcast.close()
    if "ens_number" in var.dims:
        var = var.isel(ens_number=ensemble_number)

    data_dict = {}
    y_vals = var["y"].values
    for pt in points_of_interest:
        x_idx = abs(var["x"].values - pt["lon"]).argmin()
        y_idx = abs(var["y"].values - pt["lat"]).argmin()
        y_idx_flipped = len(y_vals) - 1 - y_idx  # flip Y-axis 
        rain_series = var[:, y_idx_flipped, x_idx]
        data_dict[pt["name"]] = rain_series.values

    first_time = var.time.values[0]
    first_time_str = np.datetime_as_string(first_time, unit='m')
    safe_time_str = first_time_str.replace(":", "").replace("-", "")

    df_points = pd.DataFrame(data_dict, index=rain_series["time"].values)
    df_points.index.name = "time"
    excel_filename = f"nowcast_points_timeseries_{safe_time_str}.xlsx"
    excel_path = time_folder / excel_filename
    df_points.to_excel(excel_path, index=True)

In [69]:
## Create a plot with observation data
if make_graph:
    ds_obs = xr.open_mfdataset([str(f) for f in observations_input_nowcast])
    varname = "precip_intensity" if "precip_intensity" in ds_obs.data_vars else "precipitation"
    var = ds_obs[varname]

    data_dict = {}
    for pt in points_of_interest:
        x_idx = abs(var["x"].values - pt["lon"]).argmin()
        y_idx = abs(var["y"].values - pt["lat"]).argmin()
        rain_series = var[:, y_idx, x_idx]
        data_dict[pt["name"]] = rain_series.values

    df_points = pd.DataFrame(data_dict, index=var.time.values)
    df_points.index.name = "time"

    last_time_dt = pd.to_datetime(var.time.values[-1])
    safe_time_str = last_time_dt.strftime("%Y%m%dT%H%M")
    results_folder = Path("results")
    time_folder = results_folder / f"ens_{ensemble_number}_{safe_time_str}"
    time_folder.mkdir(parents=True, exist_ok=True)

    fig, ax1 = plt.subplots(figsize=(14, 7))
    x = np.arange(len(df_points.index))
    n_locations = len(df_points.columns)
    width = 0.8 / n_locations

    for i, col in enumerate(df_points.columns):
        ax1.bar(
            x + i * width,
            df_points[col].values,
            width=width,
            label=col
        )

    ax1.set_xticks(x + width * (n_locations - 1) / 2)
    ax1.set_xticklabels(pd.to_datetime(df_points.index).strftime("%Y-%m-%d %H:%M"), rotation=45, ha="right")
    ax1.set_ylabel("Precipitation intensity (mm/h)")
    ax1.set_xlabel("Time")

    # Cumulative precipitation (multiply by 0.25 because time step = 15 min)
    df_mm = df_points * 0.25
    ax2 = ax1.twinx()
    for col in df_mm.columns:
        ax2.plot(
            x,
            df_mm[col].cumsum(),
            linestyle="--",
            label=f"Cumulative {col} (mm)"
        )
    ax2.set_ylabel("Cumulative sum (mm)")

    lines_labels = [ax.get_legend_handles_labels() for ax in [ax1, ax2]]
    lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
    ax1.legend(lines, labels, loc="upper left")

    plt.title(f"Precipitation and cumulative sum")
    plt.tight_layout()
    graph_filename = f"observations_graph_{safe_time_str}.png"
    graph_path = time_folder / graph_filename
    plt.savefig(graph_path, dpi=150)
    plt.close()


In [70]:
## Create a plot with nowcast data
if make_graph:
    nowcast_file = nowcasting_folder / "pysteps_h60b_latest.nc"
    ds_nowcast = xr.open_dataset(nowcast_file)

    varname = "precip_intensity" if "precip_intensity" in ds_nowcast.data_vars else "precipitation"
    var = ds_nowcast[varname]
    ds_nowcast.close()
    if "ens_number" in var.dims:
        var = var.isel(ens_number=ensemble_number)

    first_time = var.time.values[0]
    first_time_str = np.datetime_as_string(first_time, unit='m')
    safe_time_str = first_time_str.replace(":", "").replace("-", "")

    data_dict = {}
    y_vals = var["y"].values
    for pt in points_of_interest:
        x_idx = abs(var["x"].values - pt["lon"]).argmin()
        y_idx = abs(var["y"].values - pt["lat"]).argmin()
        y_idx_flipped = len(y_vals) - 1 - y_idx  # Flip y-axis
        rain_series = var[:, y_idx_flipped, x_idx]
        data_dict[pt["name"]] = rain_series.values

    df_points = pd.DataFrame(data_dict, index=rain_series["time"].values)
    df_points.index.name = "time"

    fig, ax1 = plt.subplots(figsize=(12, 6))
    x = np.arange(len(df_points.index))
    width = 0.8 / len(df_points.columns)

    for i, col in enumerate(df_points.columns):
        ax1.bar(
            x + i * width,
            df_points[col].values,
            width=width,
            label=col
        )

    ax1.set_xticks(x + width * (len(df_points.columns)-1)/2)
    ax1.set_xticklabels(df_points.index.strftime("%Y-%m-%d %H:%M"), rotation=45, ha="right")
    ax1.set_ylabel("Precipitation intensity (mm/h)")
    ax1.set_xlabel("Time")

    # Convert to cumulative precipitation in mm (assuming 15 min timestep)
    df_points_mm = df_points * 0.25

    ax2 = ax1.twinx()
    for col in df_points.columns:
        ax2.plot(
            x,
            df_points_mm[col].cumsum(),
            linestyle="--",
            label=f"Cumulative {col} (mm)"
        )
    ax2.set_ylabel("Cumulative sum (mm)")

    # Combine legends from both axes
    lines_labels = [ax.get_legend_handles_labels() for ax in [ax1, ax2]]
    lines, labels = [sum(lol, []) for lol in zip(*lines_labels)]
    ax1.legend(lines, labels, loc="upper left")

    plt.title(f"Predicted precipitation intensity in Nowcast")
    plt.tight_layout()

    graph_filename = f"nowcast_graph_{safe_time_str}.png"
    graph_path = time_folder / graph_filename
    plt.savefig(graph_path, dpi=150)
    plt.close()

In [71]:
## Create a combination GIF with the observed and nowcasted data 

if make_gif:
    safe_time_str = target_datetime.strftime("%Y%m%dT%H%M")
    time_folder = results_folder / f"ens_{ensemble_number}_{safe_time_str}"
    gif_files = list(time_folder.glob("*.gif"))

    obs_gif = next((f for f in gif_files if "observations" in f.name.lower()), None)
    nowcast_gif = next((f for f in gif_files if "nowcast" in f.name.lower()), None)

    def add_title(frame, title):
        frame = frame.convert("RGBA")
        draw = ImageDraw.Draw(frame)
        try:
            font = ImageFont.truetype("arial.ttf", 500)
        except:
            font = ImageFont.load_default()

        text_bbox = draw.textbbox((0, 0), title, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        x = (frame.width - text_width) // 2
        y = 20

        draw.text((x, y), title, fill="black", font=font)
        return frame

    if obs_gif and nowcast_gif:
        gif1 = Image.open(obs_gif)
        gif2 = Image.open(nowcast_gif)

        frames = []
        for frame in ImageSequence.Iterator(gif1):
            frames.append(add_title(frame.copy(), "OBSERVATIONS"))
        for frame in ImageSequence.Iterator(gif2):
            frames.append(add_title(frame.copy(), "NOWCAST"))

        combination_path = time_folder / "combined.gif"
        frames[0].save(
            combination_path,
            save_all=True,
            append_images=frames[1:],
            duration=600,
            loop=0
        )