# DOsat data processing for Ricky 👨‍💻

>Code to add weather data extracted from gridded reanalysis data (ERA5 Land) to DOsat measurement catalog for further downstream analysis.

Christian Werner, 2021-10-26

In [None]:
import xarray as xr
import pandas as pd
import geopandas as gpd
import datetime
from joblib import delayed, Parallel
from tqdm import tqdm
from functools import partial
import numpy as np
import os


import hvplot.pandas

WORKERS = 16

In [None]:
# source thredds login secrets: THREDDS_USER, THREDDS_PASSWORD
%load_ext dotenv
%dotenv .env

THREDDS_USER=os.getenv('THREDDS_USER')
THREDDS_PASSWORD=os.getenv('THREDDS_PASSWORD')

## Data loading and cleanup 🧹

In [None]:
df = pd.read_csv("data/raw/DOSAT_GRQA_R.csv", low_memory=False)
df = df.dropna(subset=['DOsat'])                                   # drop na rows
df.loc[:,"obs_date"] = pd.to_datetime(df.obs_date)                 # convert date/ time to proper dt dtype 
df.loc[:,"obs_time"] = pd.to_timedelta(df.obs_time)
df["datetime"] = df.obs_date + df.obs_time
df = df[(df.datetime.dt.year >= 1981) & (df.datetime.dt.year <= 2019)]                               # limit data to years >= 1981
df = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon_wgs84, df.lat_wgs84), crs='epsg:4326')
df = df.drop(columns=['obs_date', 'obs_time',                      # drop unnecessary columns
                      'source_param_name', 'source_param_code', 
                      'param_code', 'param_name', 
                      'lat_wgs84', 'lon_wgs84'], axis=1)
df.head(2)

In [None]:
(df.drop_duplicates(subset=['site_id'])
   .to_crs(epsg=3857)
   .hvplot.points(figsize=(14,5), color='red', tiles='StamenTerrainRetina', hover_cols=['datetime', 'DOsat'])
)

## ERA5 Land reanalysis data

... loaded from internal THREDDs Server at [IMK-IFU, KIT](https://www.imk-ifu.kit.edu) (source: [ERA5 Land](https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview))

In [None]:
LEAD_TIME = 3 # days (incl. obs day)

In [None]:
def fix_coords(ds: xr.Dataset, lon_name: str = "longitude") -> xr.Dataset:
    """fix coords and remove bnds dim if present"""
    ds.coords['longitude'] = (ds.coords['longitude'] + 180) % 360 - 180
    
    if 'bnds' in list(ds.dims.keys()):
        ds = ds.drop_dims("bnds")
        ds.coords['time'] = ds.time.dt.floor(freq='1d')
    return ds.sortby(ds.longitude)

In [None]:
def process_data(sample_date, sample, *, ds_tp=None, ds_t2m=None) -> xr.Dataset:
    YEAR = sample_date.year
    

    # NOTE: we currently do not check if date extends into the previous year!
    interval = slice(sample_date - datetime.timedelta(days=LEAD_TIME-1), sample_date)
    subset = xr.merge(
        [
            # 3day lead time for precip 
            fix_coords(ds_tp).sel(time=interval).sum(dim='time', skipna=False),
            # actual date only for temp
            fix_coords(ds_t2m).sel(time=sample_date.strftime(format="%Y-%m-%d"), drop=True) - 273.15 
        ]
    )

    lats = xr.DataArray(sample.geometry.y, dims='points') 
    lons = xr.DataArray(sample.geometry.x, dims='points')
    
    # select by coords
    points = subset.sel(latitude = lats, longitude = lons, method = 'nearest')
    
    # round and merge data
    sample["temp"] = points.t2m.round(1)
    sample["precip_3h"] = points.tp.round(1)

    return sample

In [None]:
# get data
all_samples = []

URL = "thredds.imk-ifu.kit.edu:9670/thredds/dodsC/regclim/raster/global/era5_land/daily"

for year, sample_year in df.groupby(df.datetime.dt.year):
    
    ds_tp = xr.open_dataset(f"https://{THREDDS_USER}:{THREDDS_PASSWORD}@{URL}/ERA5_Land_daily_tp_{year}.nc")
    ds_t2m = xr.open_dataset(f"https://{THREDDS_USER}:{THREDDS_PASSWORD}@{URL}/ERA5_Land_daily_t2m_{year}.nc")

    samples = Parallel(n_jobs=WORKERS)(
        delayed(
            partial(process_data, ds_tp=ds_tp, ds_t2m=ds_t2m)
        )(sample_date, sample) for sample_date, sample in tqdm(sample_year.groupby(sample_year.datetime.dt.date), desc=f"{year}")
    )

    all_samples.extend(samples)
    

In [None]:
def sanitize_for_shp_output(df):
    """trim colnames and convert datetime cols for shp export"""
    df.columns = [x[:10] for x in df.columns]
    coltypes = gpd.io.file.infer_schema(df)['properties']
    for colname, coltype in coltypes.items():
        if coltype == 'datetime':
            df[colname] = df[colname].astype('str')
    return df

In [None]:
def extract_lat_lon_from_geometry(df):
    """convert point geometry to latitude, longitude cols"""
    df["longitude"] = df.geometry.x
    df["latitude"] = df.geometry.y
    return df.drop("geometry", axis=1)

In [None]:
df_out = pd.concat(all_samples, axis=0).sort_index()
df_out.pipe(extract_lat_lon_from_geometry).to_csv("dosat_with_weather.csv", index=False)
df_out.pipe(sanitize_for_shp_output).to_file("dosat_with_weather.shp")
df_out.head()

## Some plotting 

In [None]:
q_low = df_out["precip_3h"].quantile(0.01)
q_hi  = df_out["precip_3h"].quantile(0.99)

df_out = df_out[(df_out["precip_3h"] < q_hi) & (df_out["precip_3h"] > q_low)]

In [None]:
q_low = df_out["DOsat"].quantile(0.05)
q_hi  = df_out["DOsat"].quantile(0.95)

df_out = df_out[(df_out["DOsat"] < q_hi) & (df_out["DOsat"] > q_low)]

In [None]:
df_out.plot.hexbin(
    x='precip_3h', y='temp', C='DOsat', 
    gridsize=25, 
    reduce_C_function=np.mean, 
    cmap='inferno', 
    figsize=(9,9)
);