# Notebook for processing IMERG Rainfall data in Blob

This notebook is used to process the rainfall data on Blob for each cyclone since Favio. It extracts the daily on-land rainfall values from -2 days to +5 days to landfall in a 250km radius around the landfall location.

In [183]:
%load_ext jupyter_black
%load_ext autoreload
%autoreload 2

The jupyter_black extension is already loaded. To reload it, use:
  %reload_ext jupyter_black
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [184]:
import os
import tqdm
import pandas as pd
from dotenv import load_dotenv
from pathlib import Path
import geopandas as gpd
from datetime import datetime, timedelta
import rioxarray as rxr
import rasterio as rio
from azure.storage.blob import ContainerClient
import xarray as xr
import matplotlib.pyplot as plt
import numpy as np
import warnings
from shapely.geometry import Point
from pyproj import CRS, Transformer
from shapely.ops import transform
from rasterio.features import geometry_mask

load_dotenv()
ADMS = ["Sofala", "Inhambane", "Nampula", "Zambezia"]
AA_DATA_DIR = Path(os.getenv("AA_DATA_DIR"))
AA_DATA_DIR_NEW = Path(os.getenv("AA_DATA_DIR_NEW"))

DEV_BLOB_SAS = os.getenv("DSCI_AZ_SAS_DEV")
DEV_BLOB_NAME = "imb0chd0dev"
DEV_BLOB_URL = f"https://{DEV_BLOB_NAME}.blob.core.windows.net/"
DEV_BLOB_PROJ_URL = DEV_BLOB_URL + "projects" + "?" + DEV_BLOB_SAS
GLOBAL_CONTAINER_NAME = "global"
DEV_BLOB_GLB_URL = DEV_BLOB_URL + GLOBAL_CONTAINER_NAME + "?" + DEV_BLOB_SAS

dev_glb_container_client = ContainerClient.from_container_url(DEV_BLOB_GLB_URL)

In [185]:
warnings.filterwarnings("ignore")

In [186]:
adm1_path = (
    AA_DATA_DIR
    / "public"
    / "raw"
    / "moz"
    / "cod_ab"
    / "moz_admbnda_adm1_ine_20190607.shp"
)

gdf_adm1 = gpd.read_file(adm1_path)
gdf_sel = gdf_adm1[gdf_adm1.ADM1_PT.isin(ADMS)]
ibtracs_path = (
    Path(AA_DATA_DIR)
    / "public"
    / "raw"
    / "glb"
    / "ibtracs"
    / "IBTrACS.SI.list.v04r01.points/IBTrACS.SI.list.v04r01.points.shp"
)
adm2_path = (
    AA_DATA_DIR
    / "public"
    / "raw"
    / "moz"
    / "cod_ab"
    / "moz_admbnda_adm2_ine_20190607.shp"
)

gdf_adm2 = gpd.read_file(adm2_path)
gdf_sel_adm2 = gdf_adm2[gdf_adm2.ADM1_PT.isin(ADMS)]

minx, miny, maxx, maxy = gdf_sel.total_bounds

In [5]:
#blob_names = existing_files = [
#    x.name for x in dev_glb_container_client.list_blobs(name_starts_with="imerg/v6/")
#]

In [12]:
gdf_ibtracs = gpd.read_file(ibtracs_path)

In [89]:
gdf_adm1_sel_buff = gdf_adm1[gdf_adm1.ADM1_PT.isin(ADMS)].buffer(250 / 111)
# also making sure to take one time step before landfall since some storms even off shore can cause a lot of rain
gdf_ibtracs_time = gdf_ibtracs[gdf_ibtracs["ISO_TIME"] >= "2003-03-11"]
# which cyclones made landfall or came close by around 50km to land
landfall_cyclones = gpd.sjoin(
    gdf_ibtracs_time, gdf_sel, how="inner", predicate="intersects"
)["NAME"].unique()

In [182]:
# das = []
# for blob_name in tqdm.tqdm(blob_names):
#    cog_url = (
#        f"https://{DEV_BLOB_NAME}.blob.core.windows.net/global/"
#        f"{blob_name}?{DEV_BLOB_SAS}"
#    )
#    da_in = rxr.open_rasterio(cog_url, masked=True)
#    da_in = da_in.sel(x=slice(minx, maxx), y=slice(miny, maxy))
#    date_in = pd.to_datetime(blob_name.split(".")[0][-10:])
#    da_in["date"] = date_in

#    # Persisting to reduce the number of downstream Dask layers
#    da_in = da_in.persist()
#    das.append(da_in)

In [195]:
combined_df = []
dates = pd.date_range(start="2003-03-11", periods=len(das), freq="D")

for cyc in landfall_cyclones:
    cyc_df = gdf_ibtracs_time[gdf_ibtracs_time["NAME"] == cyc]
    cyc_df["date"] = pd.to_datetime(cyc_df["ISO_TIME"]).dt.date
    cyc_sjoin = gpd.sjoin(cyc_df, gdf_sel, how="left", predicate="within")
    cyc_df["ADM1_PT"] = cyc_sjoin["ADM1_PT"]
    cyc_df["actual_within_land"] = cyc_sjoin["index_right"].notna()
    cyc_df["point_location"] = np.where(
        cyc_df["actual_within_land"], "Within", "Outside"
    )
    first_landfall = (
        cyc_df[cyc_df["actual_within_land"]].index[0]
        if not cyc_df[cyc_df["actual_within_land"]].empty
        else None
    )
    if first_landfall is not None:
        cyc_df.loc[cyc_df.index == first_landfall, "point_location"] = "Landfall"
        landfall_time = pd.to_datetime(
            cyc_df[cyc_df["point_location"] == "Landfall"]["ISO_TIME"].values[0]
        )
        lf_dt = cyc_df[cyc_df["point_location"] == "Landfall"]

        storm_df = pd.DataFrame(
            {
                "storm": cyc,
                "date": pd.date_range(
                    start=landfall_time - pd.Timedelta(days=2),
                    end=landfall_time + pd.Timedelta(days=5),
                ),
                "time_step": range(-2, 6),
            }
        )

        # Initialize a list to store extracted values for each row
        extracted_values_list = []

        # Iterate over each row in the DataFrame
        for index, row in storm_df.iterrows():
            target_date = row["date"].normalize()  # Ensure target_date is a Timestamp
            # Create a GeoDataFrame
            gdf_lf = gpd.GeoDataFrame(
                {"geometry": [Point(lf_dt["LON"].values[0], lf_dt["LAT"].values[0])]},
                crs=CRS("EPSG:4326"),
            )
            gdf_lf["geometry"] = gdf_lf.buffer(250 / 111)
            gdf_lf = gpd.overlay(gdf_lf, gdf_sel, how="intersection")

            # Find the index of the target_date in the dates list
            target_timestamp = pd.Timestamp(target_date)  # Convert to Timestamp

            # Get the corresponding DataArray
            blob_name = (
                f"imerg/v6/imerg-daily-late-{target_date.strftime('%Y-%m-%d')}.tif"
            )
            cog_url = (
                f"https://{DEV_BLOB_NAME}.blob.core.windows.net/global/"
                f"{blob_name}?{DEV_BLOB_SAS}"
            )
            # da_in = rxr.open_rasterio(cog_url, masked=True)
            # da_in = da_in.sel(x=slice(minx, maxx), y=slice(miny, maxy))
            # date_in = pd.to_datetime(blob_name.split(".")[0][-10:])
            # da_in["date"] = date_in

            # Persisting to reduce the number of downstream Dask layers
            # da_in = da_in.persist()
            # Load the raster data
            try:
                da_in = rxr.open_rasterio(cog_url, masked=True)
                # Persist the DataArray in memory
                da_in = da_in.persist()
                # Ensure CRS is set
                if da_in.rio.crs is None:
                    da_in.rio.write_crs(
                        "EPSG:4326", inplace=True
                    )  # Set the correct EPSG code if needed

                # Load the polygon data
                polygon_gdf = gdf_lf

                # Create a mask from the polygon
                polygon_union = polygon_gdf.unary_union
                mask = geometry_mask(
                    [polygon_union],
                    transform=da_in.rio.transform(),
                    invert=True,
                    out_shape=da_in.rio.shape,
                )

                # Apply the mask to the DataArray
                masked_da = da_in.where(mask)

                # Compute the median of the values within the polygon
                values = masked_da.values[~np.isnan(masked_da.values)]
                median_value = np.median(values)
            except Exception as e:
                median_value = np.nan
            # Append the median value to the list
            extracted_values_list.append(median_value)

        # Add the extracted values to the DataFrame
        storm_df["median_precip_250km"] = extracted_values_list
        combined_df.append(storm_df)

# Combine all DataFrames into a single DataFrame
rain_df = pd.concat(combined_df, ignore_index=True)

In [196]:
rain_df.to_csv(
    AA_DATA_DIR / "public" / "processed" / "moz" / "daily_imerg_cyclone_landfall.csv"
)

In [None]:
#ds = xr.concat(das, dim="date", join="override", combine_attrs="drop")

In [None]:
# Now clip to the specific geometry
#ds = ds.rio.write_crs(4326)
#ds = ds.rio.set_spatial_dims(x_dim="x", y_dim="y")
#ds_clip = ds.rio.clip(gdf_sel.geometry)
#results = []

# TODO: Is there a better way to aggregate here?
# Pauline's suggestion: We would be looking at landfall point so best to leave as clipped rasters. 
# We can have a trigger as a radius around the point only and not by admin level. 
# These loops will be v slow...
#for day in ds_clip.date.values:

#    ds_time = ds_clip.sel(date=day)

#    for idx, row in gdf_sel.iterrows():
#        admin_name = row["ADM1_PT"]
#        polygon = row["geometry"]

#        ds_clipped = ds_time.rio.clip([polygon], all_touched=True)
#        total_precipitation = int(ds_clipped.sum(dim=["x", "y"]).values[0])

#        results.append(
#            {
#                "ADM1": admin_name,
#                "date": pd.to_datetime(day),
#                "total_precipitation": total_precipitation,
#            }
#        )

In [None]:
#df_precipitation = pd.DataFrame.from_dict(results)

In [None]:
# Create a plot to sanity check
#plt.figure(figsize=(12, 6))

# Group by ADM1 and plot each group
#for adm1, group in df_precipitation.groupby("ADM1"):
#    plt.plot(group["date"], group["total_precipitation"], label=adm1)

#plt.xlabel("Date")
##plt.ylabel("Precipitation")
#plt.title("Total daily precipitation per Province in Mozambique")
#plt.legend(title="ADM1")
#plt.tight_layout()
#plt.grid(False)
#plt.show()

In [None]:
# Now further check by plotting some specific dates
# Just observationally, the plots here make sense with the aggregations plotted above
# ds_clip.plot(x="x", y="y", col="date", col_wrap=5)

In [None]:
#df_precipitation.to_csv(
#    AA_DATA_DIR / "public" / "processed" / "moz" / "daily_imerg_precip_adm1_sel.csv"
#)