# Neighbour checks for quality control flags
Covers QC16-25

## Table of contents
[Neighbourhood functions](#neighbourhood-functions)  
[QC16 Daily neighbours (wet)](#QC16---Daily-neighbours-wet)  
[QC17 Hourly neighbours (wet)](#QC17---Hourly-neighbours-wet)  
[QC18 Daily neighbours (dry)](#QC18---Daily-neighbours-dry)  
[QC19 Hourly neighbours (dry)](#QC19---Hourly-neighbours-dry)  
[QC20 Monthly neighbours](#QC20---Monthly-neighbours)  
[QC21 Timing offset](#QC21---Timing-offset)  
[QC22 Pre-QC Affinity](#QC22---Pre-QC-Affinity)  
[QC23 Pre-QC Pearson](#QC23---Pre-QC-Pearson)  
[QC24 Daily factor](#QC24---Daily-factor)  
[QC25 Monthly factor](#QC25---Monthly-factor)  

See '3.3.4 Neighbouring gauge checks on large values' in Lewis et al. (2021)

In [320]:
import calendar
import datetime
import glob

import zipfile  ## used once for Intense format data
import pandas as pd  ## used once for Intense format data

import polars as pl
import numpy as np

import scipy.stats
import geopy.distance

In [321]:
TARGET_STATION_ID = "DE_02483"
DISTANCE_THRESHOLD = 50  # 50 km
OVERLAP_THRESHOLD = 365 * 3  # three years

## Data reading globals
GAUGE_DATA_PATH = "../data/gauge_data"
DATA_ROWS_TO_SKIP = 20  ## First 20 rows are metadata TODO: what if not?
UNIT_COL = "new_units"  ## There is an original_units col too TODO: think of way to do this for different data

MULTIPLYING_FACTORS = {"hourly": 24, "daily": 1}  ## compared to daily reference

In [322]:
def read_metadata(data_path):
    metadata = {}

    with open(data_path, "r") as f:
        while True:
            key, val = f.readline().strip().split(":", maxsplit=1)
            key = key.lower().replace(" ", "_")
            metadata[key.strip()] = val.strip()
            if key == "other":
                break
    return metadata

In [323]:
station_metadata = read_metadata(
    data_path=f"../data/gauge_data/{TARGET_STATION_ID}.txt"
)
station_metadata["start_datetime"] = datetime.datetime.strptime(
    station_metadata["start_datetime"], "%Y%m%d%H"
)
station_metadata["end_datetime"] = datetime.datetime.strptime(
    station_metadata["end_datetime"], "%Y%m%d%H"
)

In [324]:
def add_datetime_to_gauge_data(station_metadata, gauge_data, time_multiplying_factor):
    """
    Add datetime column to gauge data using metadata from that gauge.
    NOTE: Could maybe extend so can find metadata if not provided?
    """
    startdate = station_metadata["start_datetime"]
    enddate = station_metadata["end_datetime"]
    assert isinstance(
        startdate, datetime.datetime
    ), "Please convert start_ and end_datetime to datetime.datetime objects"

    date_interval = []
    delta_days = (enddate + datetime.timedelta(days=1) - startdate).days
    for i in range(delta_days * time_multiplying_factor):
        date_interval.append(startdate + datetime.timedelta(hours=i))

    ## add datetime column
    assert len(gauge_data) == len(date_interval)
    gauge_data = gauge_data.with_columns(
        time=pl.Series(date_interval)
    )  ## set time columns

    return gauge_data

In [325]:
rain_col = f"rain_{station_metadata['original_units']}"

In [326]:
## read in gauge data
target_gauge = pl.read_csv(
    f"../data/gauge_data/{TARGET_STATION_ID}.txt",
    skip_rows=20,
    schema_overrides={rain_col: pl.Float64},
)

In [327]:
target_gauge = add_datetime_to_gauge_data(
    station_metadata,
    target_gauge,
    time_multiplying_factor=MULTIPLYING_FACTORS["hourly"],
)
target_gauge = target_gauge.select(["time", rain_col])  ## Reorder (to look nice)

In [328]:
def replace_no_data_with_nan(data, no_data_value):
    return data.with_columns(
        pl.when(pl.col(rain_col) == no_data_value)
        .then(np.nan)
        .otherwise(pl.col(rain_col))
        .alias(rain_col)
    )

In [329]:
## make no data vals nans
target_gauge = replace_no_data_with_nan(
    target_gauge, no_data_value=int(station_metadata["no_data_value"])
)

In [330]:
target_gauge.head()

time,rain_mm
datetime[μs],f64
2006-01-01 00:00:00,0.9
2006-01-01 01:00:00,0.3
2006-01-01 02:00:00,0.3
2006-01-01 03:00:00,0.0
2006-01-01 04:00:00,0.0


# Neighbourhood functions
TODO: convert to Classes

### Part 1. Make or read summary metadata of stations

In [331]:
## Could work by checking if metadata already exists (or user can input)
all_gauge_data_paths = glob.glob(f"{GAUGE_DATA_PATH}/*.txt")
all_gauge_data_paths

['../data/gauge_data/DE_02718.txt',
 '../data/gauge_data/DE_00389.txt',
 '../data/gauge_data/DE_01300.txt',
 '../data/gauge_data/DE_00390.txt',
 '../data/gauge_data/DE_06264.txt',
 '../data/gauge_data/DE_02483.txt',
 '../data/gauge_data/DE_04488.txt',
 '../data/gauge_data/DE_03215.txt',
 '../data/gauge_data/DE_04313.txt',
 '../data/gauge_data/DE_00310.txt',
 '../data/gauge_data/DE_06303.txt']

In [332]:
all_station_metadata_list = []
for ind, file in enumerate(all_gauge_data_paths):
    one_station_metadata = read_metadata(data_path=file)
    all_station_metadata_list.append(one_station_metadata)

In [333]:
all_station_metadata = pl.from_dicts(all_station_metadata_list)
all_station_metadata = all_station_metadata.with_columns(
    pl.col("latitude").cast(pl.Float64),
    pl.col("longitude").cast(pl.Float64),
    (pl.col("start_datetime") + "00").str.strptime(pl.Datetime, "%Y%m%d%H%M"),
    (pl.col("end_datetime") + "00").str.strptime(pl.Datetime, "%Y%m%d%H%M"),
)
all_station_metadata.head()

station_id,country,original_station_number,original_station_name,path_to_original_data,latitude,longitude,start_datetime,end_datetime,elevation,number_of_records,percent_missing_data,original_timestep,new_timestep,original_units,new_units,time_zone,daylight_saving_info,no_data_value,resolution,other
str,str,str,str,str,f64,f64,datetime[μs],datetime[μs],str,str,str,str,str,str,str,str,str,str,str,str
"""DE_02718""","""Germany""","""02718""","""NA""","""B:/INTENSE data/Original data/…",51.288,8.7928,2006-01-01 00:00:00,2010-12-31 23:00:00,"""458m""","""43824""","""0.00""","""1hr""","""1hr""","""mm""","""mm""","""CET""","""NA""","""-999""","""0.10""",""""""
"""DE_00389""","""Germany""","""00389""","""NA""","""B:/INTENSE data/Original data/…",51.0148,8.4318,2009-11-01 00:00:00,2010-12-31 23:00:00,"""436m""","""10224""","""0.00""","""1hr""","""1hr""","""mm""","""mm""","""CET""","""NA""","""-999""","""0.10""",""""""
"""DE_01300""","""Germany""","""01300""","""NA""","""B:/INTENSE data/Original data/…",51.254,8.1565,2006-01-01 00:00:00,2010-12-31 23:00:00,"""351m""","""43824""","""0.00""","""1hr""","""1hr""","""mm""","""mm""","""CET""","""NA""","""-999""","""0.10""",""""""
"""DE_00390""","""Germany""","""00390""","""NA""","""B:/INTENSE data/Original data/…",50.9837,8.3679,2006-01-01 00:00:00,2010-12-31 23:00:00,"""610m""","""43824""","""0.00""","""1hr""","""1hr""","""mm""","""mm""","""CET""","""NA""","""-999""","""0.10""",""""""
"""DE_06264""","""Germany""","""06264""","""NA""","""B:/INTENSE data/Original data/…",51.4143,8.6498,2006-01-01 00:00:00,2010-12-31 23:00:00,"""457m""","""43824""","""0.00""","""1hr""","""1hr""","""mm""","""mm""","""CET""","""NA""","""-999""","""0.10""",""""""


### Part 2. Compute distance from target station
TODO: What if the location data is in a different projection i.e. EPSG: 27700?  
TODO: Remove duplicates

In [334]:
def compute_distance_from_target_id(metadata, target_id):
    target_station = metadata.filter(pl.col("station_id") == target_id)
    target_latlon = (
        target_station["latitude"].item(),
        target_station["longitude"].item(),
    )

    neighbour_distances = {}
    for other_station_id, other_lat, other_lon in metadata[
        ["station_id", "latitude", "longitude"]
    ].rows():

        neighbour_distances[other_station_id] = geopy.distance.geodesic(
            target_latlon, (other_lat, other_lon)
        ).kilometers
    return neighbour_distances

In [335]:
neighbours_distances = compute_distance_from_target_id(
    all_station_metadata, TARGET_STATION_ID
)
neighbours_distances

{'DE_02718': 24.361752991036987,
 'DE_00389': 18.844342416100808,
 'DE_01300': 24.64263807685722,
 'DE_00390': 23.462778369145934,
 'DE_06264': 28.343769567230837,
 'DE_02483': 0.0,
 'DE_04488': 15.929311137997068,
 'DE_03215': 15.710073576478786,
 'DE_04313': 35.397225659812456,
 'DE_00310': 13.134594190689885,
 'DE_06303': 13.96385857171046}

In [336]:
# ALTERNATIVE: do we want to avoid using geopy.distance and simply write a distance function?
# # ALTERNATIVE: maybe precompute a matrix of distances?
# all_station_dist_mtx = scipy.spatial.distance.cdist(all_station_metadata[['latitude', 'longitude']].rows(),
#                                         all_station_metadata[['latitude', 'longitude']].rows(),
#                                         metric=lambda pnt1, pnt2: geopy.distance.geodesic(pnt1, pnt2).kilometers)

In [337]:
neighbours_distances_df = pl.DataFrame(
    {
        "station_id": neighbours_distances.keys(),
        "distances": neighbours_distances.values(),
    }
)
neighbours_distances_df

station_id,distances
str,f64
"""DE_02718""",24.361753
"""DE_00389""",18.844342
"""DE_01300""",24.642638
"""DE_00390""",23.462778
"""DE_06264""",28.34377
…,…
"""DE_04488""",15.929311
"""DE_03215""",15.710074
"""DE_04313""",35.397226
"""DE_00310""",13.134594


In [338]:
## Subset based on 50 km
close_neighbours = neighbours_distances_df.filter(
    (pl.col("distances") <= DISTANCE_THRESHOLD) & (pl.col("distances") != 0)
)

## closest 10
close_neighbours.sort("distances")[:10]

station_id,distances
str,f64
"""DE_00310""",13.134594
"""DE_06303""",13.963859
"""DE_03215""",15.710074
"""DE_04488""",15.929311
"""DE_00389""",18.844342
"""DE_00390""",23.462778
"""DE_02718""",24.361753
"""DE_01300""",24.642638
"""DE_06264""",28.34377
"""DE_04313""",35.397226


### Part 3. Compute the temporal overlap

In [339]:
def compute_overlap_days(start_1, end_1, start_2, end_2):
    ## TODO: add cast to datetime functionality/checks
    ## compute overlap
    overlap_start = max(start_1, start_2)
    overlap_end = min(end_1, end_2)

    overlap_days = max(0, (overlap_end - overlap_start).days)

    return overlap_days

In [340]:
def compute_overlap_days_from_target_id(metadata, target_id):
    target_station = metadata.filter(pl.col("station_id") == target_id)
    start_1, end_1 = (
        target_station["start_datetime"].item(),
        target_station["end_datetime"].item(),
    )

    neighbour_overlap_days = {}
    for other_station_id, start_2, end_2 in metadata[
        ["station_id", "start_datetime", "end_datetime"]
    ].rows():
        if target_id == other_station_id:
            continue

        neighbour_overlap_days[other_station_id] = compute_overlap_days(
            start_1, end_1, start_2, end_2
        )
    return neighbour_overlap_days

In [341]:
neighbour_overlap_days = compute_overlap_days_from_target_id(
    all_station_metadata, TARGET_STATION_ID
)
neighbour_overlap_days

{'DE_02718': 1825,
 'DE_00389': 425,
 'DE_01300': 1825,
 'DE_00390': 1825,
 'DE_06264': 1825,
 'DE_04488': 1613,
 'DE_03215': 1309,
 'DE_04313': 1825,
 'DE_00310': 1825,
 'DE_06303': 1825}

#### Subset based on 3 years

In [342]:
neighbour_overlap_days_df = pl.DataFrame(
    {
        "station_id": neighbour_overlap_days.keys(),
        "overlap_days": neighbour_overlap_days.values(),
    }
)
neighbour_overlap_days_df

station_id,overlap_days
str,i64
"""DE_02718""",1825
"""DE_00389""",425
"""DE_01300""",1825
"""DE_00390""",1825
"""DE_06264""",1825
"""DE_04488""",1613
"""DE_03215""",1309
"""DE_04313""",1825
"""DE_00310""",1825
"""DE_06303""",1825


In [343]:
neighbour_overlap_days_df.filter(pl.col("overlap_days") >= OVERLAP_THRESHOLD)

station_id,overlap_days
str,i64
"""DE_02718""",1825
"""DE_01300""",1825
"""DE_00390""",1825
"""DE_06264""",1825
"""DE_04488""",1613
"""DE_03215""",1309
"""DE_04313""",1825
"""DE_00310""",1825
"""DE_06303""",1825


## Part 4. Bring together to get neighbours both close and overlapping

In [344]:
num_closest_gauges = 10  ## based on IntenseQC

In [345]:
## Subset based on 50 km
close_neighbour_ids = neighbours_distances_df.filter(
    (pl.col("distances") <= DISTANCE_THRESHOLD) & (pl.col("distances") != 0)
)
## closest 10 values
closest_neighbour_ids = close_neighbour_ids.sort("distances")[:num_closest_gauges][
    "station_id"
].to_list()

## Subset based on 3 years
overlapping_neighbour_ids = neighbour_overlap_days_df.filter(
    pl.col("overlap_days") >= OVERLAP_THRESHOLD
)["station_id"].to_list()

In [346]:
all_neighbour_ids = set(overlapping_neighbour_ids).intersection(
    set(closest_neighbour_ids)
)
all_neighbour_ids

{'DE_00310',
 'DE_00390',
 'DE_01300',
 'DE_02718',
 'DE_03215',
 'DE_04313',
 'DE_04488',
 'DE_06264',
 'DE_06303'}

In [355]:
all_neighbour_ids_paths = {}
for n_id in all_neighbour_ids:
    ids_path = glob.glob(f"{GAUGE_DATA_PATH}/*{n_id}.txt")
    assert len(ids_path) == 1, f"There are {len(ids_path)} data files for {n_id}"
    all_neighbour_ids_paths[n_id] = ids_path[0]

In [356]:
all_neighbour_ids_paths

{'DE_03215': '../data/gauge_data/DE_03215.txt',
 'DE_02718': '../data/gauge_data/DE_02718.txt',
 'DE_04488': '../data/gauge_data/DE_04488.txt',
 'DE_06264': '../data/gauge_data/DE_06264.txt',
 'DE_00390': '../data/gauge_data/DE_00390.txt',
 'DE_01300': '../data/gauge_data/DE_01300.txt',
 'DE_04313': '../data/gauge_data/DE_04313.txt',
 'DE_06303': '../data/gauge_data/DE_06303.txt',
 'DE_00310': '../data/gauge_data/DE_00310.txt'}

## Part 5. Get neighbouring GDSR gauge by ID (an example)

In [357]:
def get_neighbouring_gdsr_data(neighbour_gdsr_id, time_multiplying_factor):
    data_path = all_neighbour_ids_paths[neighbour_gdsr_id]
    stn_metadata = all_station_metadata.filter(
        pl.col("station_id") == neighbour_gdsr_id
    )
    assert (
        len(stn_metadata) == 1
    ), f"There are {len(stn_metadata)} metadata values for {neighbour_gdsr_id}. Investigate because there should only be one"
    stn_metadata = stn_metadata.to_dicts()[0]  ## convert df to a dict

    ## Read in gauge data
    units = stn_metadata[UNIT_COL]
    rain_col_name = f"rain_{units}"
    gdsr_data = pl.read_csv(
        data_path, skip_rows=DATA_ROWS_TO_SKIP, schema_overrides={rain_col_name: pl.Float64}
    )

    ## make datetime column
    gdsr_data_w_dates = add_datetime_to_gauge_data(
        stn_metadata, gdsr_data, time_multiplying_factor=time_multiplying_factor
    )
    gdsr_data_w_dates = gdsr_data_w_dates.select(
        ["time", rain_col_name]
    )  ## Reorder (to look nice)

    return gdsr_data_w_dates

In [358]:
get_neighbouring_gdsr_data(
    neighbour_gdsr_id="DE_06264", time_multiplying_factor=MULTIPLYING_FACTORS["hourly"]
)

time,rain_mm
datetime[μs],f64
2006-01-01 00:00:00,0.0
2006-01-01 01:00:00,0.1
2006-01-01 02:00:00,0.0
2006-01-01 03:00:00,0.0
2006-01-01 04:00:00,0.0
…,…
2010-12-31 19:00:00,0.0
2010-12-31 20:00:00,0.0
2010-12-31 21:00:00,0.0
2010-12-31 22:00:00,0.0


## Part 6. Get neighbouring GPCC gauge by ID (an example)

#### Note:
In the original methodology, GPCC is extracted on the fly

Hence, this needs to be refactored

In [359]:
# TODO: check whether GPCC name is same ID as GDSR i.e. gauge_data ID

In [360]:
existing_gpcc_daily_paths = {}
existing_gpcc_monthly_paths = {}
for neighbour_id in all_neighbour_ids_paths.keys():
    gpcc_id = neighbour_id.split("DE_")[1].lstrip("0")
    existing_gpcc_daily_paths[neighbour_id] = glob.glob(
        f"../data/GPCC/tw_{gpcc_id}.zip"
    )
    existing_gpcc_monthly_paths[neighbour_id] = glob.glob(
        f"../data/GPCC/mw_{gpcc_id}.zip"
    )

In [361]:
gpcc_id_to_use = "DE_00310"
gpcc_id = gpcc_id_to_use.split("DE_")[-1].lstrip("0")  # like 6303
example_dat_path = existing_gpcc_daily_paths[gpcc_id_to_use][0]
f = zipfile.ZipFile(example_dat_path).open(f"tw_{gpcc_id}.dat")
example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

## drop unnecessary columns
example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

## make datetime column (apparently it's 7am-7pm)
example_gpcc = example_gpcc.with_columns(
    pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
).drop(["0", "1", "2"])

## rename and reorder
example_gpcc = example_gpcc.rename({"3": rain_col})
example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

example_gpcc

time,rain_mm
datetime[μs],f64
1951-01-01 07:00:00,5.2
1951-01-02 07:00:00,10.3
1951-01-03 07:00:00,0.0
1951-01-04 07:00:00,3.7
1951-01-05 07:00:00,3.5
…,…
2018-12-27 07:00:00,0.0
2018-12-28 07:00:00,0.2
2018-12-29 07:00:00,6.9
2018-12-30 07:00:00,0.9


In [362]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points
target_gauge_daily

time,rain_mm
datetime[μs],f64
2006-01-01 07:00:00,0.2
2006-01-02 07:00:00,0.0
2006-01-03 07:00:00,0.0
2006-01-04 07:00:00,0.0
2006-01-05 07:00:00,0.0
…,…
2010-12-26 07:00:00,0.2
2010-12-27 07:00:00,0.0
2010-12-28 07:00:00,0.0
2010-12-29 07:00:00,0.0


In [363]:
joined_gauges_gpcc = target_gauge_daily.join(
    example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id}"
)
joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()
joined_gauges_gpcc.head()

time,rain_mm,rain_mm_GPCC_310
datetime[μs],f64,f64
2006-01-01 07:00:00,0.2,0.2
2006-01-02 07:00:00,0.0,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0


## Step 7 Compute factor, affinity index and correlation

In [364]:
## get gauge non-0 minima
def get_target_neighbour_non_zero_minima(
    data, target_col, other_col, default_minima=0.1
):
    target_col_min = np.around(
        data.filter(pl.col(target_col) >= default_minima).min()[target_col], 1
    )[0]
    other_col_min = np.around(
        data.filter(pl.col(other_col) >= default_minima).min()[other_col],
        1,
    )[0]
    non_zero_minima = max(target_col_min, other_col_min, default_minima)
    return non_zero_minima


## get gauge non-0 minima
non_zero_minima = get_target_neighbour_non_zero_minima(
    joined_gauges_gpcc, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id}"
)

In [365]:
def get_target_and_neighbour_not_minima_column(
    data, data_minima, target_col, other_col
):
    """

    Parameters
    ----------
    data : pl.DataFrame
    data_minima : float or int

    Returns
    -------

    """
    return data.with_columns(
        pl.when((pl.col(target_col) > data_minima) & (pl.col(other_col) > data_minima))
        .then(1)
        .when(
            (pl.col(target_col) == data_minima) & (pl.col(other_col) == data_minima),
        )
        .then(1)
        .when(
            (pl.col(target_col) == data_minima) & (pl.col(other_col) > data_minima),
        )
        .then(0)
        .when((pl.col(target_col) > data_minima) & (pl.col(other_col) == data_minima))
        .then(0)
        .otherwise(np.nan)
        .alias("gauges_not_minima")
    )

In [366]:
joined_gauges_gpcc_minima = get_target_and_neighbour_not_minima_column(
    joined_gauges_gpcc,
    data_minima=non_zero_minima,
    target_col=rain_col,
    other_col=f"{rain_col}_GPCC_{gpcc_id}",
)
joined_gauges_gpcc_minima

time,rain_mm,rain_mm_GPCC_310,gauges_not_minima
datetime[μs],f64,f64,f64
2006-01-01 07:00:00,0.2,0.2,1.0
2006-01-02 07:00:00,0.0,0.0,
2006-01-03 07:00:00,0.0,0.0,
2006-01-04 07:00:00,0.0,0.0,
2006-01-05 07:00:00,0.0,0.0,
…,…,…,…
2010-12-26 07:00:00,0.2,2.9,1.0
2010-12-27 07:00:00,0.0,0.2,
2010-12-28 07:00:00,0.0,0.0,
2010-12-29 07:00:00,0.0,0.0,


In [367]:
def compute_factor_diff_target_and_neighbour(data, target_col, other_col):
    return data.with_columns(
        pl.when((pl.col(target_col) > 0) & (pl.col(other_col) > 0))
        .then(pl.col(target_col) / pl.col(other_col))
        .otherwise(np.nan)
        .alias("factor_diff")
    )

In [368]:
joined_gauges_gpcc_factor = compute_factor_diff_target_and_neighbour(joined_gauges_gpcc, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id}")
joined_gauges_gpcc_factor

time,rain_mm,rain_mm_GPCC_310,factor
datetime[μs],f64,f64,f64
2006-01-01 07:00:00,0.2,0.2,1.0
2006-01-02 07:00:00,0.0,0.0,
2006-01-03 07:00:00,0.0,0.0,
2006-01-04 07:00:00,0.0,0.0,
2006-01-05 07:00:00,0.0,0.0,
…,…,…,…
2010-12-26 07:00:00,0.2,2.9,0.068966
2010-12-27 07:00:00,0.0,0.2,
2010-12-28 07:00:00,0.0,0.0,
2010-12-29 07:00:00,0.0,0.0,


In [369]:
def calc_affinity_index(data, return_match_and_diff=False):
    match = (
        data["gauges_not_minima"]
        .value_counts()
        .filter(pl.col("gauges_not_minima") == 1)["count"]
        .item()
    )
    diff = (
        data["gauges_not_minima"]
        .value_counts()
        .filter(pl.col("gauges_not_minima") == 0)["count"]
        .item()
    )
    affinity = match / (match + diff)
    if return_match_and_diff:
        return match, diff, affinity
    return affinity


def calc_gauge_correlation(data, rain_col, other_col):
    return np.ma.corrcoef(
        np.ma.masked_invalid(data[rain_col]), np.ma.masked_invalid(data[other_col])
    )[0, 1]

In [370]:
match, diff, affinity = calc_affinity_index(
    joined_gauges_gpcc_minima, return_match_and_diff=True
)
print(f"different rows: {diff}, matching rows: {match}, affinity :{affinity}")

different rows: 157, matching rows: 540, affinity :0.7747489239598279


In [372]:
gauge_corr = calc_gauge_correlation(
    joined_gauges_gpcc, rain_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id}"
)
factor_mean = joined_gauges_gpcc_factor["factor_diff"].drop_nans().mean()
print(f"diff: {diff}, match:{match}")
print(
    f"gauge affinity: {affinity}, gauge correlation: {gauge_corr}, mean factor diff: {factor_mean}"
)

diff: 157, match:540
gauge affinity: 0.7747489239598279, gauge correlation: 0.0114714970965478, mean factor diff: 0.17114986225908063


## Part 8 Compare target with neighbour (hourly and daily)
- For hourly data, the data is first converted to daily to do comparison

_Output:_ df with long list of neighbour columns and flags

Works by computing differences from target and each of its neighbours then collates all those differences and associated difference flags into a single flag/column that describes how similar target is from neighbours


In [373]:
gpcc_id_to_use = "DE_00310"
gpcc_id = gpcc_id_to_use.split("DE_")[-1].lstrip("0")  # like 6303

In [374]:
joined_gauges_gpcc = target_gauge_daily.join(
    example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id}"
)
joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()
joined_gauges_gpcc.head()

time,rain_mm,rain_mm_GPCC_310
datetime[μs],f64,f64
2006-01-01 07:00:00,0.2,0.2
2006-01-02 07:00:00,0.0,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0


## Part 8.1 Wet neighbours
- This is normalised difference
TODO: Problem that there is not enough data which is wetter in the target than in the GPCC neighbour

In [375]:
def normalise_data(data):
    return (data - data.min()) / (data.max() - data.min())

In [376]:
joined_gauges_gpcc_normalised_diff = joined_gauges_gpcc.with_columns(
    # get normalised difference between target and neighbour
    rain_mm_normalised_diff=normalise_data(pl.col(f"{rain_col}"))
    - normalise_data(pl.col(f"{rain_col}_GPCC_{gpcc_id}"))
)

In [377]:
WET_THRESHOLD = 0.3  # default in original methodology is 1
joined_gauges_gpcc_normalised_diff_filtered = joined_gauges_gpcc_normalised_diff.filter(
    (pl.col(f"{rain_col}") >= WET_THRESHOLD)
    & (pl.col(f"{rain_col}").is_finite())
    & (pl.col(f"{rain_col}_GPCC_{gpcc_id}").is_finite())
    & (pl.col(f"{rain_col}_normalised_diff") > 0.0)
)

In [378]:
joined_gauges_gpcc_normalised_diff_filtered

time,rain_mm,rain_mm_GPCC_310,rain_mm_normalised_diff
datetime[μs],f64,f64,f64
2006-04-25 07:00:00,0.3,0.0,0.000941
2006-05-13 07:00:00,0.3,0.0,0.000941
2006-12-23 07:00:00,112.5,0.0,0.352886
2006-12-24 07:00:00,318.8,0.0,1.0
2007-03-16 07:00:00,3.1,0.0,0.009724
…,…,…,…
2009-11-06 07:00:00,9.8,0.9,0.015486
2009-11-18 07:00:00,1.2,0.0,0.003764
2009-11-19 07:00:00,0.8,0.0,0.002509
2010-08-09 07:00:00,1.3,0.0,0.004078


In [379]:
if not len(joined_gauges_gpcc_normalised_diff_filtered) >= 30:
    print(
        "Original methodology needs there to be at least 30 values to fit exponential function"
    )

Original methodology needs there to be at least 30 values to fit exponential function


In [380]:
expon_params = scipy.stats.expon.fit(
    joined_gauges_gpcc_normalised_diff_filtered[f"{rain_col}_normalised_diff"]
)

In [381]:
# Calculate thresholds at key percentiles of fitted distribution
q95 = scipy.stats.expon.ppf(0.95, expon_params[0], expon_params[1])
q99 = scipy.stats.expon.ppf(0.99, expon_params[0], expon_params[1])
q999 = scipy.stats.expon.ppf(0.999, expon_params[0], expon_params[1])

q95, q99, q999

(np.float64(0.3444910378117832),
 np.float64(0.5294660929815043),
 np.float64(0.7941055682411567))

In [382]:
## Assign flags
joined_gauges_gpcc_normalised_wet_flags = (
    joined_gauges_gpcc_normalised_diff.with_columns(
        pl.when(
            (pl.col(rain_col) >= 1.0) & (pl.col(f"{rain_col}_normalised_diff") <= q95)
        )
        .then(0)
        .when(
            (pl.col(rain_col) >= 1.0)
            & (pl.col(f"{rain_col}_normalised_diff") > q95)
            & (pl.col(f"{rain_col}_normalised_diff") <= q99),
        )
        .then(1)
        .when(
            (pl.col(rain_col) >= 1.0)
            & (pl.col(f"{rain_col}_normalised_diff") > q99)
            & (pl.col(f"{rain_col}_normalised_diff") <= q999),
        )
        .then(2)
        .when((pl.col(rain_col) >= 1.0) & (pl.col(f"{rain_col}_normalised_diff") > q95))
        .then(3)
        .otherwise(0)
        .alias("wet_flags")
    )
)

In [383]:
joined_gauges_gpcc_normalised_wet_flags["wet_flags"].value_counts()

wet_flags,count
i32,u32
2,1
1,1
3,1
0,1757


## Part 8.2 Dry neighbours

In [384]:
DRY_PERIOD_DAYS = 15
gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id}"

In [385]:
joined_gauges_gpcc.head()

time,rain_mm,rain_mm_GPCC_310
datetime[μs],f64,f64
2006-01-01 07:00:00,0.2,0.2
2006-01-02 07:00:00,0.0,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0


In [386]:
## TODO: This might use preexisting code, as this code is essentially looking for dry periods
def convert_to_dry_spell(rain_daily, dry_period_days, col_name):
    ## Convert to polars dataframe to use 'with_columns'
    if not isinstance(rain_daily, pl.DataFrame):
        rain_daily = rain_daily.to_frame()

    ## make dry day column
    rain_daily_dry_day = add_dry_day_col(rain_daily, col_name)
    rain_daily_dry_day = rain_daily_dry_day.with_columns(
        dry_spell_fraction=pl.col("dry_day").rolling_sum(
            window_size=dry_period_days, min_samples=dry_period_days
        )
        / dry_period_days
    )
    return rain_daily_dry_day["dry_spell_fraction"]


def add_dry_day_col(rain_daily, col_name):
    rain_daily_dry_day = rain_daily.with_columns(
        (pl.col(col_name) == 0).cast(pl.Int8()).alias("dry_day")
    )
    return rain_daily_dry_day

In [387]:
joined_gauges_gpcc_dry_spell = joined_gauges_gpcc.with_columns(
    pl.col(rain_col)
    .map_batches(
        lambda row: convert_to_dry_spell(row, DRY_PERIOD_DAYS, col_name=rain_col)
    )
    .alias("dry_spell_fraction"),
    pl.col(gpcc_col_name)
    .map_batches(
        lambda row: convert_to_dry_spell(row, DRY_PERIOD_DAYS, col_name=gpcc_col_name)
    )
    .alias(f"dry_spell_fraction_{gpcc_id}"),
)

In [388]:
joined_gauges_gpcc_dry_spell.head()

time,rain_mm,rain_mm_GPCC_310,dry_spell_fraction,dry_spell_fraction_310
datetime[μs],f64,f64,f64,f64
2006-01-01 07:00:00,0.2,0.2,,
2006-01-02 07:00:00,0.0,0.0,,
2006-01-03 07:00:00,0.0,0.0,,
2006-01-04 07:00:00,0.0,0.0,,
2006-01-05 07:00:00,0.0,0.0,,


In [389]:
## TODO: why have this part?
## consider only whether dry 15-day periods are corrobated as dry by neighbours
## dry flag works on the basis of fraction of dry days within 15-day
## check based on whether 0, 1, 2 or >= 3 wet days are recorded at the neighbour when the target is dry over the 15-day period
## truncating these fractions to 2 dp below and manipulating equalities to work with these fractions, but could work in days not fractions if change the convertToDrySpell function
dry_period = DRY_PERIOD_DAYS
fraction_drydays = {}
for d in range(1, 3 + 1):
    fraction_drydays[str(d)] = np.trunc((1.0 - (float(d) / dry_period)) * 10**2) / (
        10**2
    )

fraction_drydays  # moving window, so 1 = all dry, 0 = all wet

{'1': np.float64(0.93), '2': np.float64(0.86), '3': np.float64(0.8)}

In [390]:
## Assign flags
joined_gauges_gpcc_dry_flags = joined_gauges_gpcc_dry_spell.with_columns(
    pl.when(
        (pl.col("dry_spell_fraction") == 1.0)
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") == 1.0)
    )
    .then(0)
    .when(
        (pl.col("dry_spell_fraction") == 1.0)
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") < 1.0)
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") >= fraction_drydays["1"]),
    )
    .then(1)
    .when(
        (pl.col("dry_spell_fraction") == 1.0)
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") < fraction_drydays["1"])
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") >= fraction_drydays["2"]),
    )
    .then(2)
    .when(
        (pl.col("dry_spell_fraction") == 1.0)
        & (pl.col(f"dry_spell_fraction_{gpcc_id}") < fraction_drydays["2"])
    )
    .then(3)
    .otherwise(0)
    .alias("dry_flags")
)

In [391]:
joined_gauges_gpcc_dry_flags["dry_flags"].value_counts()

dry_flags,count
i32,u32
1,1
3,126
0,1629
2,4


## Part 9 Compare target with neighbour (monthly) 
- Works differently from hourly and daily
- original methodology uses bfill which uses the next available value to fillNA

In [392]:
gpcc_id_to_use = "DE_03215"
gpcc_id = gpcc_id_to_use.split("DE_")[-1].lstrip("0")  # like 6303

In [393]:
## TODO: get gpcc monthly
joined_gauges_gpcc = target_gauge_daily.join(
    example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id}"
)
joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()
joined_gauges_gpcc.head()

time,rain_mm,rain_mm_GPCC_3215
datetime[μs],f64,f64
2006-01-01 07:00:00,0.2,0.2
2006-01-02 07:00:00,0.0,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0


In [394]:
gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id}"

In [395]:
## resample into monthly
joined_gauges_gpcc_monthly = joined_gauges_gpcc.group_by_dynamic(
    "time", every="1mo", offset="7h"
).agg(
    [
        pl.col(rain_col).mean().round(1).alias(rain_col),
        pl.col(gpcc_col_name).mean().round(1).alias(gpcc_col_name),
    ]
)  # TODO: Ensure at least 24*n_days_in_month data points

In [396]:
joined_gauges_gpcc_monthly.head()

time,rain_mm,rain_mm_GPCC_3215
datetime[μs],f64,f64
2006-01-01 07:00:00,0.1,1.1
2006-02-01 07:00:00,0.3,3.0
2006-03-01 07:00:00,0.4,3.6
2006-04-01 07:00:00,0.3,2.3
2006-05-01 07:00:00,0.6,4.6


In [397]:
joined_gauges_gpcc_monthly_perc_diff = joined_gauges_gpcc_monthly.with_columns(
    perc_diff=((pl.col(rain_col) - pl.col(gpcc_col_name)) * 100.0)
    / pl.col(gpcc_col_name),
    factor_diff=pl.col(rain_col) / pl.col(gpcc_col_name),
)

In [398]:
joined_gauges_gpcc_monthly_flags = joined_gauges_gpcc_monthly_perc_diff.with_columns(
    pl.when(pl.col("perc_diff") <= -100.0)
    .then(-3)
    .when((pl.col("perc_diff") <= -50.0) & (pl.col("perc_diff") > -100.0))
    .then(-2)
    .when((pl.col("perc_diff") <= -25.0) & (pl.col("perc_diff") > -50.0))
    .then(-1)
    .when((pl.col("perc_diff") < 25.0) & (pl.col("perc_diff") > -25.0))
    .then(0)
    .when((pl.col("perc_diff") >= 25.0) & (pl.col("perc_diff") < 50.0))
    .then(1)
    .when((pl.col("perc_diff") >= 50.0) & (pl.col("perc_diff") < 100.0))
    .then(2)
    .when(pl.col("perc_diff") >= 100.0)
    .then(3)
    .otherwise(None)
    .alias("precip_flags"),
    pl.when((pl.col("factor_diff") < 11) & (pl.col("factor_diff") > 9))
    .then(1)
    .when((pl.col("factor_diff") < 26) & (pl.col("factor_diff") > 24))
    .then(2)
    .when((pl.col("factor_diff") < 3) & (pl.col("factor_diff") > 2))
    .then(3)
    .when((pl.col("factor_diff") > 1 / 11) & (pl.col("factor_diff") < 1 / 9))
    .then(4)
    .when((pl.col("factor_diff") > 1 / 26) & (pl.col("factor_diff") < 1 / 24))
    .then(5)
    .when((pl.col("factor_diff") > 1 / 3) & (pl.col("factor_diff") < 1 / 2))
    .then(6)
    .otherwise(0)
    .alias("factor_flags"),
)

In [399]:
joined_gauges_gpcc_monthly_flags["precip_flags"].value_counts()

precip_flags,count
i32,u32
-3,6
3,2
-2,52


In [400]:
joined_gauges_gpcc_monthly_flags["factor_flags"].value_counts()

factor_flags,count
i32,u32
0,55
4,5


In [401]:
## Set up NaN
joined_gauges_gpcc_monthly_flags = joined_gauges_gpcc_monthly_flags.with_columns(
    pl.when(pl.col(rain_col).is_nan())
    .then(np.nan)
    .otherwise(pl.col("factor_flags"))
    .alias("factor_flags"),
    pl.when(pl.col(rain_col).is_nan())
    .then(np.nan)
    .otherwise(pl.col("precip_flags"))
    .alias("precip_flags"),
)

# QC16 - Daily neighbours (wet)
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
- Many, although mainly this implementation opens the neighbour wet and dry functions to parameter tweaking

#### Step 1. resample to daily resolution

In [74]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points

In [75]:
target_gauge_daily.head()

time,rain_mm
datetime[μs],f64
2006-01-01 07:00:00,0.2
2006-01-02 07:00:00,0.0
2006-01-03 07:00:00,0.0
2006-01-04 07:00:00,0.0
2006-01-05 07:00:00,0.0


#### Step 2. Join each GPCC data & compare to target

In [76]:
existing_gpcc_daily_paths

{'DE_03215': ['../data/GPCC/tw_3215.zip'],
 'DE_02718': [],
 'DE_04488': [],
 'DE_06264': [],
 'DE_00390': [],
 'DE_01300': [],
 'DE_04313': [],
 'DE_06303': ['../data/GPCC/tw_6303.zip'],
 'DE_00310': ['../data/GPCC/tw_310.zip']}

In [77]:
all_data = target_gauge_daily.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_daily_paths.items():
    ## 0. Load GPCC data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id_name}"
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"tw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

    ## 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
    ).drop(["0", "1", "2"])

    ## 3. rename and reorder
    example_gpcc = example_gpcc.rename({"3": rain_col})
    example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

    ## 4. join to target data
    joined_gauges_gpcc = target_gauge_daily.join(
        example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
    )
    joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

    ## 5. get normalised diff
    joined_gauges_gpcc_normalised_diff = joined_gauges_gpcc.with_columns(
        # get normalised difference between target and neighbour
        rain_mm_normalised_diff=normalise_data(pl.col(f"{rain_col}"))
        - normalise_data(pl.col(f"{rain_col}_GPCC_{gpcc_id_name}"))
    )

    ## 6. filter values
    joined_gauges_gpcc_normalised_diff_filtered = (
        joined_gauges_gpcc_normalised_diff.filter(
            (pl.col(f"{rain_col}") >= WET_THRESHOLD)
            & (pl.col(f"{rain_col}").is_finite())
            & (pl.col(f"{rain_col}_GPCC_{gpcc_id_name}").is_finite())
            & (pl.col(f"{rain_col}_normalised_diff") > 0.0)
        )
    )

    ## 7. Calculate exponential function of normalised diff
    expon_params = scipy.stats.expon.fit(
        joined_gauges_gpcc_normalised_diff_filtered[f"{rain_col}_normalised_diff"]
    )
    # 8. Calculate thresholds at key percentiles of fitted distribution
    q95 = scipy.stats.expon.ppf(0.95, expon_params[0], expon_params[1])
    q99 = scipy.stats.expon.ppf(0.99, expon_params[0], expon_params[1])
    q999 = scipy.stats.expon.ppf(0.999, expon_params[0], expon_params[1])

    ## 9. Assign flags
    joined_gauges_gpcc_normalised_wet_flags = (
        joined_gauges_gpcc_normalised_diff.with_columns(
            pl.when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") <= q95)
            )
            .then(0)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q95)
                & (pl.col(f"{rain_col}_normalised_diff") <= q99),
            )
            .then(1)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q99)
                & (pl.col(f"{rain_col}_normalised_diff") <= q999),
            )
            .then(2)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q95)
            )
            .then(3)
            .otherwise(0)
            .alias(f"wet_flags_{gpcc_id_name}")
        )
    )

    ## 10. Join to all data
    all_data = all_data.join(
        joined_gauges_gpcc_normalised_wet_flags[["time", f"wet_flags_{gpcc_id_name}"]],
        on="time",
        how="left",
    )
    print(
        joined_gauges_gpcc_normalised_wet_flags[
            f"wet_flags_{gpcc_id_name}"
        ].value_counts()
    )

shape: (4, 2)
┌────────────────┬───────┐
│ wet_flags_3215 ┆ count │
│ ---            ┆ ---   │
│ i32            ┆ u32   │
╞════════════════╪═══════╡
│ 2              ┆ 1     │
│ 1              ┆ 4     │
│ 0              ┆ 1250  │
│ 3              ┆ 1     │
└────────────────┴───────┘
shape: (4, 2)
┌────────────────┬───────┐
│ wet_flags_6303 ┆ count │
│ ---            ┆ ---   │
│ i32            ┆ u32   │
╞════════════════╪═══════╡
│ 2              ┆ 1     │
│ 3              ┆ 1     │
│ 1              ┆ 1     │
│ 0              ┆ 1756  │
└────────────────┴───────┘
shape: (4, 2)
┌───────────────┬───────┐
│ wet_flags_310 ┆ count │
│ ---           ┆ ---   │
│ i32           ┆ u32   │
╞═══════════════╪═══════╡
│ 3             ┆ 1     │
│ 1             ┆ 1     │
│ 2             ┆ 1     │
│ 0             ┆ 1756  │
└───────────────┴───────┘


#### Step 3. Calculate num neighbours online

In [78]:
## global determining the minimum number of non-null neighbours to count flags
MIN_NUM_NEIGHBOURS = 2  # original method is 3

In [79]:
num_neighbours = len(all_data.columns[2:])
num_neighbours

3

In [80]:
all_data_num_neighbours = all_data.with_columns(
    (
        num_neighbours
        - pl.sum_horizontal(pl.all().exclude("time").is_null().cast(pl.Int16))
    ).alias("num_neighbours_online")
)
all_data_num_neighbours

time,rain_mm,wet_flags_3215,wet_flags_6303,wet_flags_310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i16
2006-01-01 07:00:00,0.2,,0,0,2
2006-01-02 07:00:00,0.0,,0,0,2
2006-01-03 07:00:00,0.0,,0,0,2
2006-01-04 07:00:00,0.0,,0,0,2
2006-01-05 07:00:00,0.0,,0,0,2
…,…,…,…,…,…
2010-12-26 07:00:00,0.2,0,0,0,3
2010-12-27 07:00:00,0.0,0,0,0,3
2010-12-28 07:00:00,0.0,0,0,0,3
2010-12-29 07:00:00,0.0,0,0,0,3


In [81]:
all_data_num_neighbours["num_neighbours_online"].plot.line()

#### Step 4. Majority voting flags for the target

In [82]:
all_data_wet_flags = all_data_num_neighbours.with_columns(
    pl.when(pl.col("num_neighbours_online") < MIN_NUM_NEIGHBOURS)
    .then(np.nan)
    .otherwise(
        pl.min_horizontal(pl.all().exclude("time", rain_col, "num_neighbours_online"))
    )
    .alias("wet_flags")
)

In [83]:
all_data_wet_flags["wet_flags"].value_counts()

wet_flags,count
f64,u32
1.0,1
2.0,1
3.0,1
,66
0.0,1756


##### Extra note about method:
We are checking for minimum 2 neighbours, but original method needs 3

In [84]:
all_data_num_neighbours.filter(pl.col("wet_flags_310") == 3)

time,rain_mm,wet_flags_3215,wet_flags_6303,wet_flags_310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i16
2006-12-24 07:00:00,300.0,,3,3,2


# QC17 - Hourly neighbours (wet) 
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
- Need to consider how the 7am-7pm can fit into RainfallQC

#### Step 0. resample to daily resolution

In [85]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points

In [86]:
target_gauge_daily.head()

time,rain_mm
datetime[μs],f64
2006-01-01 07:00:00,0.2
2006-01-02 07:00:00,0.0
2006-01-03 07:00:00,0.0
2006-01-04 07:00:00,0.0
2006-01-05 07:00:00,0.0


#### Step 1. Loop through each neighbour and compare to target

In [87]:
all_data = (
    target_gauge_daily.clone()
)  ## currently daily instead of hourly because this works best with later flood-fill

for n_id in all_neighbour_ids_paths.keys():
    ## 1. Get neighbouring GDSR data nd resample into daily
    print(n_id)
    neighbouring_gdsr_gauge = get_neighbouring_gdsr_data(
        neighbour_gdsr_id=n_id, time_multiplying_factor=MULTIPLYING_FACTORS["hourly"]
    )
    ## make no data vals nans
    station_metadata = read_metadata(data_path=all_neighbour_ids_paths[n_id])
    neighbouring_gdsr_gauge = replace_no_data_with_nan(
        neighbouring_gdsr_gauge, no_data_value=int(station_metadata["no_data_value"])
    )

    joined_gauges_gdsr = target_gauge.join(
        neighbouring_gdsr_gauge, on="time", suffix=f"_{n_id}"
    )
    joined_gauges_gdsr = joined_gauges_gdsr.drop_nans()

    ## resample into daily (also round to 1 decimal place) TODO: think about offset part
    joined_gauges_gdsr = (
        joined_gauges_gdsr.group_by_dynamic("time", every="1d", offset="7h")
        .agg(
            [
                pl.len().alias("n_hours"),
                pl.col(rain_col).mean().round(1).alias(rain_col),
                pl.col(f"{rain_col}_{n_id}")
                .mean()
                .round(1)
                .alias(f"{rain_col}_GDSR_{n_id}"),
            ]
        )
        .filter(pl.col("n_hours") == 24)
        .drop("n_hours")
    )  # Ensure at least 24 data points

    ## 2. get normalised diff
    joined_gauges_gdsr_normalised_diff = joined_gauges_gdsr.with_columns(
        # get normalised difference between target and neighbour
        rain_mm_normalised_diff=normalise_data(pl.col(f"{rain_col}"))
        - normalise_data(pl.col(f"{rain_col}_GDSR_{n_id}"))
    )

    ## 3. filter values
    joined_gauges_gdsr_normalised_diff_filtered = (
        joined_gauges_gdsr_normalised_diff.filter(
            (pl.col(f"{rain_col}") >= WET_THRESHOLD)
            & (pl.col(f"{rain_col}").is_finite())
            & (pl.col(f"{rain_col}_GDSR_{n_id}").is_finite())
            & (pl.col(f"{rain_col}_normalised_diff") > 0.0)
        )
    )

    ## 4. Calcualte exponetial function of normalised diff
    expon_params = scipy.stats.expon.fit(
        joined_gauges_gdsr_normalised_diff_filtered[f"{rain_col}_normalised_diff"]
    )
    # 5. Calculate thresholds at key percentiles of fitted distribution
    q95 = scipy.stats.expon.ppf(0.95, expon_params[0], expon_params[1])
    q99 = scipy.stats.expon.ppf(0.99, expon_params[0], expon_params[1])
    q999 = scipy.stats.expon.ppf(0.999, expon_params[0], expon_params[1])

    ## 6. Assign flags
    joined_gauges_gdsr_normalised_wet_flags = (
        joined_gauges_gdsr_normalised_diff.with_columns(
            pl.when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") <= q95)
            )
            .then(0)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q95)
                & (pl.col(f"{rain_col}_normalised_diff") <= q99),
            )
            .then(1)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q99)
                & (pl.col(f"{rain_col}_normalised_diff") <= q999),
            )
            .then(2)
            .when(
                (pl.col(rain_col) >= 1.0)
                & (pl.col(f"{rain_col}_normalised_diff") > q95)
            )
            .then(3)
            .otherwise(0)
            .alias(f"wet_flags_{n_id}")
        )
    )
    all_data = all_data.join(
        joined_gauges_gdsr_normalised_wet_flags[["time", f"wet_flags_{n_id}"]],
        on="time",
        how="left",
    )

    print(
        "before turning back into hourly", all_data[f"wet_flags_{n_id}"].value_counts()
    )

DE_03215
before turning back into hourly shape: (5, 2)
┌────────────────────┬───────┐
│ wet_flags_DE_03215 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 2                  ┆ 1     │
│ 1                  ┆ 4     │
│ 0                  ┆ 1244  │
│ null               ┆ 575   │
│ 3                  ┆ 1     │
└────────────────────┴───────┘
DE_02718
before turning back into hourly shape: (3, 2)
┌────────────────────┬───────┐
│ wet_flags_DE_02718 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 3                  ┆ 3     │
│ null               ┆ 68    │
│ 0                  ┆ 1754  │
└────────────────────┴───────┘
DE_04488
before turning back into hourly shape: (4, 2)
┌────────────────────┬───────┐
│ wet_flags_DE_04488 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 2                  ┆ 1     │
│ null               ┆ 274   

#### Step 2. Calculate num neighbours online

In [88]:
## global determining the minimum number of non-null neighbours to count flags
MIN_NUM_NEIGHBOURS = 2  # original method is 3

In [89]:
all_data.head()

time,rain_mm,wet_flags_DE_03215,wet_flags_DE_02718,wet_flags_DE_04488,wet_flags_DE_06264,wet_flags_DE_00390,wet_flags_DE_01300,wet_flags_DE_04313,wet_flags_DE_06303,wet_flags_DE_00310
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32
2006-01-01 07:00:00,0.2,,0,,0,0,0,0,0,0
2006-01-02 07:00:00,0.0,,0,,0,0,0,0,0,0
2006-01-03 07:00:00,0.0,,0,,0,0,0,0,0,0
2006-01-04 07:00:00,0.0,,0,,0,0,0,0,0,0
2006-01-05 07:00:00,0.0,,0,,0,0,0,0,0,0


In [90]:
num_neighbours = len(all_data.columns[2:])
num_neighbours

9

In [91]:
all_data_num_neighbours = all_data.with_columns(
    (
        num_neighbours
        - pl.sum_horizontal(pl.all().exclude("time").is_null().cast(pl.Int16))
    ).alias("num_neighbours_online")
)
all_data_num_neighbours

time,rain_mm,wet_flags_DE_03215,wet_flags_DE_02718,wet_flags_DE_04488,wet_flags_DE_06264,wet_flags_DE_00390,wet_flags_DE_01300,wet_flags_DE_04313,wet_flags_DE_06303,wet_flags_DE_00310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32,i16
2006-01-01 07:00:00,0.2,,0,,0,0,0,0,0,0,7
2006-01-02 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-03 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-04 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-05 07:00:00,0.0,,0,,0,0,0,0,0,0,7
…,…,…,…,…,…,…,…,…,…,…,…
2010-12-26 07:00:00,0.2,0,0,0,0,0,0,0,0,0,9
2010-12-27 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9
2010-12-28 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9
2010-12-29 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9


In [92]:
all_data_num_neighbours["num_neighbours_online"].value_counts()

num_neighbours_online,count
i16,u32
0,65
4,1
3,1
9,1210
6,9
7,203
5,1
8,335


In [93]:
all_data.filter(pl.col("wet_flags_DE_00310") == 2)[0]

time,rain_mm,wet_flags_DE_03215,wet_flags_DE_02718,wet_flags_DE_04488,wet_flags_DE_06264,wet_flags_DE_00390,wet_flags_DE_01300,wet_flags_DE_04313,wet_flags_DE_06303,wet_flags_DE_00310
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32
2006-12-23 07:00:00,131.3,,3,2,3,3,2,3,2,2


#### Step 3. Majority voting flags for the target

In [94]:
all_data_wet_flags = all_data_num_neighbours.with_columns(
    pl.when(pl.col("num_neighbours_online") < MIN_NUM_NEIGHBOURS)
    .then(np.nan)
    .otherwise(
        pl.min_horizontal(pl.all().exclude("time", rain_col, "num_neighbours_online"))
    )
    .alias("wet_flags")
)

In [95]:
all_data_wet_flags["wet_flags"].value_counts()

wet_flags,count
f64,u32
2.0,1
,65
3.0,2
0.0,1757


## Step 4. Join back to hourly data

In [96]:
target_gauge_w_wet_flags = target_gauge.join(
    all_data_wet_flags[["time", "wet_flags"]], on="time", how="left"
)

## Step 5. Forward flood-fill

In [97]:
## Forward flood-fill data to convert the flags back to hourly
target_gauge_w_wet_flags = target_gauge_w_wet_flags.with_columns(
    pl.col("wet_flags").forward_fill(limit=23)
)

In [98]:
target_gauge_w_wet_flags["wet_flags"].value_counts()

wet_flags,count
f64,u32
2.0,24
3.0,48
,1560
0.0,42168
,24


# QC18 - Daily neighbours (dry) 
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
- 

In [99]:
DRY_PERIOD_DAYS = 15

In [100]:
fraction_drydays = {}
for d in range(1, 3 + 1):
    fraction_drydays[str(d)] = np.trunc(
        (1.0 - (float(d) / DRY_PERIOD_DAYS)) * 10**2
    ) / (10**2)

#### Step 1. Loop through each neighbour and compare to target

In [101]:
all_data = target_gauge_daily.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_daily_paths.items():
    ## 0. Load GPCC data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id_name}"
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"tw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

    ## 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
    ).drop(["0", "1", "2"])

    ## 3. rename and reorder
    example_gpcc = example_gpcc.rename({"3": rain_col})
    example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

    ## 4. join to target data
    joined_gauges_gpcc = target_gauge_daily.join(
        example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
    )
    joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

    ## 5. convert to dry spell
    joined_gauges_gpcc_dry_spell = joined_gauges_gpcc.with_columns(
        pl.col(rain_col)
        .map_batches(
            lambda row: convert_to_dry_spell(row, DRY_PERIOD_DAYS, col_name=rain_col)
        )
        .alias("dry_spell_fraction"),
        pl.col(gpcc_col_name)
        .map_batches(
            lambda row: convert_to_dry_spell(
                row, DRY_PERIOD_DAYS, col_name=gpcc_col_name
            )
        )
        .alias(f"dry_spell_fraction_{gpcc_id}"),
    )

    ## 7. Assign dry flags
    joined_gauges_gpcc_dry_flags = joined_gauges_gpcc_dry_spell.with_columns(
        pl.when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") == 1.0)
        )
        .then(0)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") < 1.0)
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") >= fraction_drydays["1"]),
        )
        .then(1)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") < fraction_drydays["1"])
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") >= fraction_drydays["2"]),
        )
        .then(2)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{gpcc_id}") < fraction_drydays["2"])
        )
        .then(3)
        .otherwise(0)
        .alias(f"dry_flags_{gpcc_id_to_use}")
    )

    print(
        gpcc_id_to_use,
        joined_gauges_gpcc_dry_flags[f"dry_flags_{gpcc_id_to_use}"].value_counts(),
    )

    ## 8. Join to all data
    all_data = all_data.join(
        joined_gauges_gpcc_dry_flags[["time", f"dry_flags_{gpcc_id_to_use}"]],
        on="time",
        how="left",
    )

DE_03215 shape: (4, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_03215 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 0                  ┆ 1125  │
│ 3                  ┆ 126   │
│ 1                  ┆ 1     │
│ 2                  ┆ 4     │
└────────────────────┴───────┘
DE_06303 shape: (3, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_06303 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 0                  ┆ 1628  │
│ 2                  ┆ 1     │
│ 3                  ┆ 130   │
└────────────────────┴───────┘
DE_00310 shape: (4, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_00310 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 0                  ┆ 1628  │
│ 1                  ┆ 1     │
│ 2                  ┆ 4     │
│ 3                  ┆ 126   │
└────────────────────┴───────┘


#### Step 2. Calculate num neighbours online

In [102]:
## global determining the minimum number of non-null neighbours to count flags
MIN_NUM_NEIGHBOURS = 2  # original method is 3

In [103]:
all_data.head()

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_06303,dry_flags_DE_00310
datetime[μs],f64,i32,i32,i32
2006-01-01 07:00:00,0.2,,0,0
2006-01-02 07:00:00,0.0,,0,0
2006-01-03 07:00:00,0.0,,0,0
2006-01-04 07:00:00,0.0,,0,0
2006-01-05 07:00:00,0.0,,0,0


In [104]:
num_neighbours = len(all_data.columns[2:])
num_neighbours

3

In [105]:
all_data_num_neighbours = all_data.with_columns(
    (
        num_neighbours
        - pl.sum_horizontal(pl.all().exclude("time").is_null().cast(pl.Int16))
    ).alias("num_neighbours_online")
)
all_data_num_neighbours

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i16
2006-01-01 07:00:00,0.2,,0,0,2
2006-01-02 07:00:00,0.0,,0,0,2
2006-01-03 07:00:00,0.0,,0,0,2
2006-01-04 07:00:00,0.0,,0,0,2
2006-01-05 07:00:00,0.0,,0,0,2
…,…,…,…,…,…
2010-12-26 07:00:00,0.2,0,0,0,3
2010-12-27 07:00:00,0.0,0,0,0,3
2010-12-28 07:00:00,0.0,0,0,0,3
2010-12-29 07:00:00,0.0,0,0,0,3


In [106]:
all_data_num_neighbours["num_neighbours_online"].value_counts()

num_neighbours_online,count
i16,u32
0,66
3,1256
2,503


In [107]:
all_data.filter(pl.col("dry_flags_DE_03215") == 1)

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_06303,dry_flags_DE_00310
datetime[μs],f64,i32,i32,i32
2010-07-04 07:00:00,0.0,1,3,1


#### Step 3. Majority voting flags for the target

In [108]:
all_data_dry_flags = all_data_num_neighbours.with_columns(
    pl.when(pl.col("num_neighbours_online") < MIN_NUM_NEIGHBOURS)
    .then(np.nan)
    .otherwise(
        pl.min_horizontal(pl.all().exclude("time", rain_col, "num_neighbours_online"))
    )
    .alias("dry_flags")
)

In [109]:
all_data_dry_flags["dry_flags"].value_counts()

dry_flags,count
f64,u32
1.0,1
,66
0.0,1628
3.0,125
2.0,5


In [110]:
all_data_dry_flags.filter((pl.col("dry_flags") > 0) & (pl.col("dry_flags").is_finite()))

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online,dry_flags
datetime[μs],f64,i32,i32,i32,i16,f64
2007-10-16 07:00:00,0.0,2,3,2,3,2.0
2007-12-26 07:00:00,0.0,2,3,2,3,2.0
2007-12-27 07:00:00,0.0,3,3,3,3,3.0
2007-12-28 07:00:00,0.0,3,2,2,3,2.0
2008-05-13 07:00:00,0.0,2,3,2,3,2.0
…,…,…,…,…,…,…
2010-10-26 07:00:00,0.0,3,3,3,3,3.0
2010-10-27 07:00:00,0.0,3,3,3,3,3.0
2010-10-28 07:00:00,0.0,3,3,3,3,3.0
2010-10-29 07:00:00,0.0,3,3,3,3,3.0


### Step 4. Back propogate flags so period previous to flag is also given the same flag
Number of days of period set by DRY_PERIOD_DAYS

In [111]:
def back_propagate_flags(df, flag_column, num_days):
    """
    Back fill-in flags a number of days. This will prioritise higher flag values

    Parameters
    ----------
    df : polars.DataFrame
        Data with `flag_column` series
    flag_column : str
        column with flags
    num_days: int
        Number of days to back-propagate
    Returns
    -------
    df

    """
    df = df.clone()
    # TODO: automatically extract flag values
    flag_values = (
        flag for flag in df[flag_column].unique() if (np.isfinite(flag) & (flag > 0))
    )
    for flag, df_filtered in [
        (flag, df.filter(pl.col(flag_column) == flag)) for flag in flag_values
    ]:
        for time_to_use in df_filtered["time"]:
            assert isinstance(
                time_to_use, datetime.datetime
            ), "time_to_use must be datetime.datetime"
            start_time = time_to_use - datetime.timedelta(days=num_days)
            df = df.with_columns(
                pl.when(
                    (pl.col("time") >= start_time) & (pl.col("time") <= time_to_use)
                )
                .then(flag)
                .otherwise(pl.col(flag_column))
                .alias(flag_column)
            )

    return df

In [112]:
all_data_dry_flags_back_prop = back_propagate_flags(
    all_data_dry_flags, flag_column="dry_flags", num_days=(DRY_PERIOD_DAYS - 1)
)

In [113]:
## before:
all_data_dry_flags["dry_flags"].value_counts()

dry_flags,count
f64,u32
2.0,5
3.0,125
,66
1.0,1
0.0,1628


In [114]:
## after
all_data_dry_flags_back_prop["dry_flags"].value_counts()

dry_flags,count
f64,u32
0.0,1516
2.0,33
,66
3.0,195
1.0,15


In [115]:
all_data_dry_flags_back_prop[845:950]

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online,dry_flags
datetime[μs],f64,i32,i32,i32,i16,f64
2008-04-25 07:00:00,0.0,0,0,0,3,0.0
2008-04-26 07:00:00,0.0,0,0,0,3,0.0
2008-04-27 07:00:00,0.0,0,0,0,3,0.0
2008-04-28 07:00:00,0.6,0,0,0,3,0.0
2008-04-29 07:00:00,0.0,0,0,0,3,2.0
…,…,…,…,…,…,…
2008-08-03 07:00:00,0.7,0,0,0,3,0.0
2008-08-04 07:00:00,0.1,0,0,0,3,0.0
2008-08-05 07:00:00,0.0,0,0,0,3,0.0
2008-08-06 07:00:00,0.0,0,0,0,3,0.0


# QC19 - Hourly neighbours (dry) 
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
- NOTE: the order in which the data is back-propogated and forward flood-fill will matter
- TODO: sort out whether you want to do back-propogation or forward flood fill first (original metho does back prop then flood fill)

#### Step 0. resample to daily resolution

In [116]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points

In [117]:
target_gauge_daily.head()

time,rain_mm
datetime[μs],f64
2006-01-01 07:00:00,0.2
2006-01-02 07:00:00,0.0
2006-01-03 07:00:00,0.0
2006-01-04 07:00:00,0.0
2006-01-05 07:00:00,0.0


## Step 1. Loop through each neighbour

In [118]:
all_data = (
    target_gauge_daily.clone()
)  ## currently daily instead of hourly because this works best with later flood-fill

for n_id in all_neighbour_ids_paths.keys():
    ## 0. Get neighbouring GDSR data
    print(n_id)
    gdsr_col_name = f"{rain_col}_GDSR_{n_id}"
    neighbouring_gdsr_gauge = get_neighbouring_gdsr_data(
        neighbour_gdsr_id=n_id, time_multiplying_factor=MULTIPLYING_FACTORS["hourly"]
    )
    ## 1. make no data vals nans
    station_metadata = read_metadata(data_path=all_neighbour_ids_paths[n_id])
    neighbouring_gdsr_gauge = replace_no_data_with_nan(
        neighbouring_gdsr_gauge, no_data_value=int(station_metadata["no_data_value"])
    )

    joined_gauges_gdsr = target_gauge.join(
        neighbouring_gdsr_gauge, on="time", suffix=f"_{n_id}"
    )
    joined_gauges_gdsr = joined_gauges_gdsr.drop_nans()

    ## 2. resample into daily (also round to 1 decimal place) TODO: think about offset part
    joined_gauges_gdsr = (
        joined_gauges_gdsr.group_by_dynamic("time", every="1d", offset="7h")
        .agg(
            [
                pl.len().alias("n_hours"),
                pl.col(rain_col).mean().round(1).alias(rain_col),
                pl.col(f"{rain_col}_{n_id}").mean().round(1).alias(gdsr_col_name),
            ]
        )
        .filter(pl.col("n_hours") == 24)
        .drop("n_hours")
    )  # Ensure at least 24 data points

    ## 3. convert to dry spell
    joined_gauges_gdsr_dry_spell = joined_gauges_gdsr.with_columns(
        pl.col(rain_col)
        .map_batches(
            lambda row: convert_to_dry_spell(row, DRY_PERIOD_DAYS, col_name=rain_col)
        )
        .alias("dry_spell_fraction"),
        pl.col(gdsr_col_name)
        .map_batches(
            lambda row: convert_to_dry_spell(
                row, DRY_PERIOD_DAYS, col_name=gdsr_col_name
            )
        )
        .alias(f"dry_spell_fraction_{n_id}"),
    )

    ## 4. Assign dry flags
    joined_gauges_gdsr_dry_flags = joined_gauges_gdsr_dry_spell.with_columns(
        pl.when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{n_id}") == 1.0)
        )
        .then(0)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{n_id}") < 1.0)
            & (pl.col(f"dry_spell_fraction_{n_id}") >= fraction_drydays["1"]),
        )
        .then(1)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{n_id}") < fraction_drydays["1"])
            & (pl.col(f"dry_spell_fraction_{n_id}") >= fraction_drydays["2"]),
        )
        .then(2)
        .when(
            (pl.col("dry_spell_fraction") == 1.0)
            & (pl.col(f"dry_spell_fraction_{n_id}") < fraction_drydays["2"])
        )
        .then(3)
        .otherwise(0)
        .alias(f"dry_flags_{n_id}")
    )

    ## 5. Join to all data
    all_data = all_data.join(
        joined_gauges_gdsr_dry_flags[["time", f"dry_flags_{n_id}"]],
        on="time",
        how="left",
    )

    print(
        "Hourly grouped into daily:",
        gpcc_id_to_use,
        joined_gauges_gdsr_dry_flags[f"dry_flags_{n_id}"].value_counts(),
    )

DE_03215
Hourly grouped into daily: DE_00310 shape: (4, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_03215 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 0                  ┆ 1126  │
│ 1                  ┆ 6     │
│ 2                  ┆ 7     │
│ 3                  ┆ 111   │
└────────────────────┴───────┘
DE_02718
Hourly grouped into daily: DE_00310 shape: (4, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_02718 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 1                  ┆ 2     │
│ 3                  ┆ 110   │
│ 0                  ┆ 1633  │
│ 2                  ┆ 12    │
└────────────────────┴───────┘
DE_04488
Hourly grouped into daily: DE_00310 shape: (4, 2)
┌────────────────────┬───────┐
│ dry_flags_DE_04488 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 3                  ┆ 103   │
│ 2              

## Step 2. Calculate num neighbours online

In [119]:
## global determining the minimum number of non-null neighbours to count flags
MIN_NUM_NEIGHBOURS = 2  # original method is 3

In [120]:
num_neighbours = len(all_data.columns[2:])
num_neighbours

9

In [121]:
all_data_num_neighbours = all_data.with_columns(
    (
        num_neighbours
        - pl.sum_horizontal(pl.all().exclude("time").is_null().cast(pl.Int16))
    ).alias("num_neighbours_online")
)
all_data_num_neighbours

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_02718,dry_flags_DE_04488,dry_flags_DE_06264,dry_flags_DE_00390,dry_flags_DE_01300,dry_flags_DE_04313,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32,i16
2006-01-01 07:00:00,0.2,,0,,0,0,0,0,0,0,7
2006-01-02 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-03 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-04 07:00:00,0.0,,0,,0,0,0,0,0,0,7
2006-01-05 07:00:00,0.0,,0,,0,0,0,0,0,0,7
…,…,…,…,…,…,…,…,…,…,…,…
2010-12-26 07:00:00,0.2,0,0,0,0,0,0,0,0,0,9
2010-12-27 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9
2010-12-28 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9
2010-12-29 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9


In [122]:
all_data_num_neighbours["num_neighbours_online"].value_counts()

num_neighbours_online,count
i16,u32
4,1
9,1210
0,65
3,1
8,335
7,203
5,1
6,9


In [123]:
all_data.filter(pl.col("dry_flags_DE_03215") == 1)

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_02718,dry_flags_DE_04488,dry_flags_DE_06264,dry_flags_DE_00390,dry_flags_DE_01300,dry_flags_DE_04313,dry_flags_DE_06303,dry_flags_DE_00310
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32
2007-10-16 07:00:00,0.0,1,0,0,0,,,1,0,0
2008-09-28 07:00:00,0.0,1,3,0,2,1.0,1.0,2,2,1
2008-09-29 07:00:00,0.0,1,3,2,2,1.0,1.0,2,3,1
2009-07-02 07:00:00,0.0,1,3,2,3,3.0,3.0,2,2,3
2009-07-03 07:00:00,0.0,1,3,2,3,3.0,3.0,2,2,3
2009-07-04 07:00:00,0.0,1,3,2,3,3.0,3.0,2,2,3


## Step 3. Majority voting flags for the target

In [124]:
all_data_dry_flags = all_data_num_neighbours.with_columns(
    pl.when(pl.col("num_neighbours_online") < MIN_NUM_NEIGHBOURS)
    .then(np.nan)
    .otherwise(
        pl.min_horizontal(pl.all().exclude("time", rain_col, "num_neighbours_online"))
    )
    .alias("dry_flags")
)

In [125]:
all_data_dry_flags["dry_flags"].value_counts()

dry_flags,count
f64,u32
2.0,25
0.0,1637
1.0,6
,65
3.0,92


In [126]:
all_data_dry_flags.filter((pl.col("dry_flags") > 0) & (pl.col("dry_flags").is_finite()))

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_02718,dry_flags_DE_04488,dry_flags_DE_06264,dry_flags_DE_00390,dry_flags_DE_01300,dry_flags_DE_04313,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online,dry_flags
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32,i16,f64
2008-09-29 07:00:00,0.0,1,3,2,2,1,1,2,3,1,9,1.0
2008-09-30 07:00:00,0.0,2,3,3,3,2,2,3,3,2,9,2.0
2008-10-01 07:00:00,0.0,3,3,3,3,3,3,3,,3,8,3.0
2008-10-02 07:00:00,0.0,3,3,3,,3,3,3,3,3,8,3.0
2008-10-03 07:00:00,0.0,3,3,3,3,3,3,3,3,3,9,3.0
…,…,…,…,…,…,…,…,…,…,…,…,…
2010-10-26 07:00:00,0.0,3,3,3,3,3,3,3,3,3,9,3.0
2010-10-27 07:00:00,0.0,3,3,3,3,3,3,3,3,3,9,3.0
2010-10-28 07:00:00,0.0,3,3,3,3,3,3,3,3,3,9,3.0
2010-10-29 07:00:00,0.0,3,3,3,3,3,3,3,3,3,9,3.0


## Step 4. Back propogate flags so period previous to flag is also given the same flag
Number of days of period set by DRY_PERIOD_DAYS

In [127]:
def back_propagate_flags(df, flag_column, num_days):
    """
    Back fill-in flags a number of days. This will prioritise higher flag values

    Parameters
    ----------
    df : polars.DataFrame
        Data with `flag_column` series
    flag_column : str
        column with flags
    num_days: int
        Number of days to back-propagate
    Returns
    -------
    df

    """
    df = df.clone()
    # TODO: automatically extract flag values
    flag_values = (
        flag for flag in df[flag_column].unique() if (np.isfinite(flag) & (flag > 0))
    )
    for flag, df_filtered in [
        (flag, df.filter(pl.col(flag_column) == flag)) for flag in flag_values
    ]:
        for time_to_use in df_filtered["time"]:
            assert isinstance(
                time_to_use, datetime.datetime
            ), "time_to_use must be datetime.datetime"
            start_time = time_to_use - datetime.timedelta(days=num_days)
            df = df.with_columns(
                pl.when(
                    (pl.col("time") >= start_time) & (pl.col("time") <= time_to_use)
                )
                .then(flag)
                .otherwise(pl.col(flag_column))
                .alias(flag_column)
            )

    return df

In [128]:
all_data_dry_flags_back_prop = back_propagate_flags(
    all_data_dry_flags, flag_column="dry_flags", num_days=(DRY_PERIOD_DAYS - 1)
)

In [129]:
## whilst still daily data:
all_data_dry_flags["dry_flags"].value_counts()

dry_flags,count
f64,u32
0.0,1637
3.0,92
2.0,25
,65
1.0,6


In [130]:
## after
all_data_dry_flags_back_prop["dry_flags"].value_counts()

dry_flags,count
f64,u32
0.0,1581
,65
2.0,8
1.0,1
3.0,170


In [131]:
all_data_dry_flags_back_prop[845:950]

time,rain_mm,dry_flags_DE_03215,dry_flags_DE_02718,dry_flags_DE_04488,dry_flags_DE_06264,dry_flags_DE_00390,dry_flags_DE_01300,dry_flags_DE_04313,dry_flags_DE_06303,dry_flags_DE_00310,num_neighbours_online,dry_flags
datetime[μs],f64,i32,i32,i32,i32,i32,i32,i32,i32,i32,i16,f64
2008-04-25 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0
2008-04-26 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0
2008-04-27 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0
2008-04-28 07:00:00,0.6,0,0,0,0,0,0,0,0,0,9,0.0
2008-04-29 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0
…,…,…,…,…,…,…,…,…,…,…,…,…
2008-08-03 07:00:00,0.7,0,0,0,0,0,0,0,0,0,9,0.0
2008-08-04 07:00:00,0.1,0,0,0,0,0,0,0,0,0,9,0.0
2008-08-05 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0
2008-08-06 07:00:00,0.0,0,0,0,0,0,0,0,0,0,9,0.0


## Step 5. Join back to hourly data

In [132]:
target_gauge_w_dry_flags = target_gauge.join(
    all_data_dry_flags_back_prop[["time", "dry_flags"]], on="time", how="left"
)

## Step 6. Forward flood-fill

In [133]:
## Forward flood-fill data to convert the flags back to hourly
target_gauge_w_dry_flags = target_gauge_w_dry_flags.with_columns(
    pl.col("dry_flags").forward_fill(limit=23)
)

In [134]:
target_gauge_w_dry_flags["dry_flags"].value_counts()

dry_flags,count
f64,u32
,1560
3.0,4080
1.0,24
2.0,192
0.0,37944
,24


In [135]:
target_gauge_w_dry_flags.filter(
    (pl.col("dry_flags") > 0) & (pl.col("dry_flags").is_finite())
)

time,rain_mm,dry_flags
datetime[μs],f64,f64
2008-09-15 07:00:00,0.0,1.0
2008-09-15 08:00:00,0.0,1.0
2008-09-15 09:00:00,0.0,1.0
2008-09-15 10:00:00,0.0,1.0
2008-09-15 11:00:00,0.0,1.0
…,…,…
2010-10-31 02:00:00,0.0,3.0
2010-10-31 03:00:00,0.0,3.0
2010-10-31 04:00:00,0.0,3.0
2010-10-31 05:00:00,0.0,3.0


# QC20 - Monthly neighbours
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
- The intenseQC code is wrong for the +/-4, 5 flags, so I am going off the description in the paper

## Step 0. Resample into monthly
Also includes step of extracting year and month and computing expected days in month

In [136]:
PERC_MONTH_NEEDED = 95  ## 95% stated in paper. In IntenseQC it says: "count valid values within month and set flag as nan if more than 5% of data is missing"

In [137]:
# NOTE: Annoyingly, there is no days_in_month functionality for polars yet
def make_monthyear_col(data):
    # Make year and month columns
    return data.with_columns(
        [
            pl.col("time").dt.year().alias("year"),
            pl.col("time").dt.month().alias("month"),
        ]
    )


def get_expected_days_in_month(data):
    """

    Parameters
    ----------
    data : pl.DataFrame
        Data with 'year' and 'month' columns

    Returns
    -------

    """
    # Use map_elements + calendar.monthrange to compute days in each month
    return data.with_columns(
        [
            pl.struct(["year", "month"])
            .map_elements(
                lambda x: calendar.monthrange(x["year"], x["month"])[1],
                return_dtype=pl.Int64,
            )
            .alias("expected_days_in_month")
        ]
    )

In [138]:
target_gauge_monthyear = make_monthyear_col(target_gauge)
target_gauge_monthyear = get_expected_days_in_month(target_gauge_monthyear)

In [139]:
target_gauge_monthly = (
    target_gauge_monthyear.group_by_dynamic("time", every="1mo", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).sum().alias(rain_col),
            pl.col("expected_days_in_month").first(),
        ]
    )
    .filter(
        pl.col("n_hours")
        >= (24 * pl.col("expected_days_in_month") * PERC_MONTH_NEEDED / 100)
    )  # TODO: Ensure at least 95% values for month are available
    .drop("n_hours", "expected_days_in_month")
)

In [140]:
target_gauge_monthly

time,rain_mm
datetime[μs],f64
2006-01-01 00:00:00,
2006-02-01 00:00:00,
2006-03-01 00:00:00,302.9
2006-04-01 00:00:00,224.2
2006-05-01 00:00:00,402.2
…,…
2010-08-01 00:00:00,417.1
2010-09-01 00:00:00,30.0
2010-10-01 00:00:00,40.0
2010-11-01 00:00:00,192.5


## Step 1. Loop through monthly GPCC neighbours and compare to target

TODO: need to find a way to programatically parse TW (daily) and MW (monthly) data

In [141]:
def calculate_perc_diff(target, other):
    """
    Parameters
    ----------
    target: pl.Series
        Target data to compare other too
    other: pl.Series
        Other data
    Returns
    -------
    perc_diff: pl.Series
    """
    return (target - other) * 100 / other



In [142]:
all_data = target_gauge_monthly.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_monthly_paths.items():
    ## Step 0. Load GPCC monthly data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"mw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## Step 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(3, 15)])

    ## Step 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(year=pl.col("1"), month=pl.col("0"), day=1).alias("time")
    ).drop(["0", "1"])

    ## Step 3. rename and reorder
    example_gpcc = example_gpcc.rename({"2": rain_col})
    example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

    ## Step 4. join to target data
    joined_gauges_gpcc = target_gauge_monthly.join(
        example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
    )
    joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

    ## Step 5. Calculate perc_diff between target and neighbour
    joined_gauges_gpcc_perc_diff = joined_gauges_gpcc.with_columns(
        perc_diff=calculate_perc_diff(
            pl.col(f"{rain_col}"), pl.col(f"{rain_col}_GPCC_{gpcc_id_name}")
        )
    )

    ## Step 6. Assign flags
    joined_gauges_gpcc_perc_flags = joined_gauges_gpcc_perc_diff.with_columns(
        pl.when((pl.col("perc_diff") <= -100.0))
        .then(-3)
        .when((pl.col("perc_diff") <= -50.0) & (pl.col("perc_diff") > -100.0))
        .then(-2)
        .when((pl.col("perc_diff") <= -25.0) & (pl.col("perc_diff") > -50.0))
        .then(-1)
        .when((pl.col("perc_diff") <= 25.0) & (pl.col("perc_diff") > -25.0))
        .then(0)
        .when((pl.col("perc_diff") >= 25.0) & (pl.col("perc_diff") < 50.0))
        .then(1)
        .when((pl.col("perc_diff") >= 50.0) & (pl.col("perc_diff") < 100.0))
        .then(2)
        .when((pl.col("perc_diff") >= 100.0))
        .then(3)
        .otherwise(0)
        .alias(f"monthly_flags_{gpcc_id_name}")
    )

    all_data = all_data.join(
        joined_gauges_gpcc_perc_flags[["time", f"monthly_flags_{gpcc_id_name}"]],
        on="time",
        how="left",
    )

    print(all_data[f"monthly_flags_{gpcc_id_name}"].value_counts())

shape: (7, 2)
┌────────────────────┬───────┐
│ monthly_flags_3215 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ 1                  ┆ 3     │
│ null               ┆ 27    │
│ 2                  ┆ 13    │
│ -3                 ┆ 2     │
│ 3                  ┆ 10    │
│ 0                  ┆ 4     │
│ -2                 ┆ 1     │
└────────────────────┴───────┘
shape: (8, 2)
┌────────────────────┬───────┐
│ monthly_flags_6303 ┆ count │
│ ---                ┆ ---   │
│ i32                ┆ u32   │
╞════════════════════╪═══════╡
│ -1                 ┆ 2     │
│ -3                 ┆ 2     │
│ 3                  ┆ 10    │
│ 0                  ┆ 16    │
│ 2                  ┆ 3     │
│ null               ┆ 17    │
│ -2                 ┆ 1     │
│ 1                  ┆ 9     │
└────────────────────┴───────┘
shape: (8, 2)
┌───────────────────┬───────┐
│ monthly_flags_310 ┆ count │
│ ---               ┆ ---   │
│ i32               ┆ u32   │
╞

In [143]:
all_data

time,rain_mm,monthly_flags_3215,monthly_flags_6303,monthly_flags_310
datetime[μs],f64,i32,i32,i32
2006-01-01 00:00:00,,,,
2006-02-01 00:00:00,,,,
2006-03-01 00:00:00,302.9,,3,3
2006-04-01 00:00:00,224.2,,3,3
2006-05-01 00:00:00,402.2,,3,3
…,…,…,…,…
2010-08-01 00:00:00,417.1,3,3,2
2010-09-01 00:00:00,30.0,-2,-2,-2
2010-10-01 00:00:00,40.0,1,0,0
2010-11-01 00:00:00,192.5,3,1,1


## Step 2. Calculate num neighbours online

In [144]:
## global determining the minimum number of non-null neighbours to count flags
MIN_NUM_NEIGHBOURS = 2  # original method is 3

In [145]:
num_neighbours = len(all_data.columns[2:])
num_neighbours

3

In [146]:
all_data_num_neighbours = all_data.with_columns(
    (
        num_neighbours
        - pl.sum_horizontal(pl.all().exclude("time").is_null().cast(pl.Int16))
    ).alias("num_neighbours_online")
)
all_data_num_neighbours

time,rain_mm,monthly_flags_3215,monthly_flags_6303,monthly_flags_310,num_neighbours_online
datetime[μs],f64,i32,i32,i32,i16
2006-01-01 00:00:00,,,,,0
2006-02-01 00:00:00,,,,,0
2006-03-01 00:00:00,302.9,,3,3,2
2006-04-01 00:00:00,224.2,,3,3,2
2006-05-01 00:00:00,402.2,,3,3,2
…,…,…,…,…,…
2010-08-01 00:00:00,417.1,3,3,2,3
2010-09-01 00:00:00,30.0,-2,-2,-2,3
2010-10-01 00:00:00,40.0,1,0,0,3
2010-11-01 00:00:00,192.5,3,1,1,3


In [147]:
all_data_num_neighbours["num_neighbours_online"].value_counts()

num_neighbours_online,count
i16,u32
2,10
0,17
3,33


In [148]:
all_data.filter(pl.col("monthly_flags_3215") == -3)

time,rain_mm,monthly_flags_3215,monthly_flags_6303,monthly_flags_310
datetime[μs],f64,i32,i32,i32
2009-06-01 00:00:00,0.0,-3,-3,-3
2009-07-01 00:00:00,0.0,-3,-3,-3


## Step 3. Majority voting flags for the target

In [149]:
all_data_monthly_flags = all_data_num_neighbours.with_columns(
    pl.when(pl.col("num_neighbours_online") < MIN_NUM_NEIGHBOURS)
    .then(np.nan)
    .otherwise(
        pl.min_horizontal(pl.all().exclude("time", rain_col, "num_neighbours_online"))
    )
    .alias("monthly_flags")
)

In [150]:
all_data_monthly_flags["monthly_flags"].value_counts()

monthly_flags,count
f64,u32
-1.0,3
-3.0,2
0.0,19
3.0,9
-2.0,1
1.0,5
,17
2.0,4


In [151]:
all_data_monthly_flags.filter(
    (pl.col("monthly_flags") > 0) & (pl.col("monthly_flags").is_finite())
)

time,rain_mm,monthly_flags_3215,monthly_flags_6303,monthly_flags_310,num_neighbours_online,monthly_flags
datetime[μs],f64,i32,i32,i32,i16,f64
2006-03-01 00:00:00,302.9,,3,3,2,3.0
2006-04-01 00:00:00,224.2,,3,3,2,3.0
2006-05-01 00:00:00,402.2,,3,3,2,3.0
2006-08-01 00:00:00,372.5,,3,3,2,3.0
2006-09-01 00:00:00,89.1,,3,3,2,3.0
…,…,…,…,…,…,…
2009-04-01 00:00:00,41.9,2,1,1,3,1.0
2009-11-01 00:00:00,1099.5,3,3,3,3,3.0
2010-08-01 00:00:00,417.1,3,3,2,3,2.0
2010-11-01 00:00:00,192.5,3,1,1,3,1.0


## Step 4. Calculate neighbour monthly min/max climatology

In [152]:
all_gpcc_data = target_gauge_monthly.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_monthly_paths.items():
    ## Step 0. Load GPCC monthly data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"mw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## Step 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(3, 15)])

    ## Step 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(year=pl.col("1"), month=pl.col("0"), day=1).alias("time")
    ).drop(["0", "1"])

    ## Step 3. rename and reorder
    example_gpcc = example_gpcc.rename({"2": f"{rain_col}_GPCC_{gpcc_id_name}"})
    example_gpcc = example_gpcc.select(
        ["time", f"{rain_col}_GPCC_{gpcc_id_name}"]
    )  ## Reorder (to look nice)

    ## Step 4. Add rows together climatology
    all_gpcc_data = all_gpcc_data.join(
        example_gpcc,
        on="time",
        how="left",
    )

In [153]:
all_gpcc_data_monthyear = make_monthyear_col(all_gpcc_data)

In [154]:
all_gpcc_data_month_min_max = all_gpcc_data_monthyear.group_by("month").agg(
    [
        pl.min_horizontal(pl.all().exclude("time", "month", "year").min()).alias(
            "neighbour_min"
        ),
        pl.max_horizontal(pl.all().exclude("time", "month", "year").max()).alias(
            "neighbour_max"
        ),
    ]
)

In [155]:
all_gpcc_data_month_min_max

month,neighbour_min,neighbour_max
i8,f64,f64
9,17.1,206.0
1,34.0,226.3
12,51.5,160.25
5,43.3,402.2
4,1.0,4996.5
…,…,…
11,70.5,1099.5
2,59.7,246.35
3,54.7,302.9
7,0.0,136.8


## Step 5. Adjust flags to look for values outside of climatological range

In [156]:
all_data_monthly_flags = all_data_monthly_flags.select(
    ["time", rain_col, "num_neighbours_online", "monthly_flags"]
)
all_data_monthly_flags_w_monthyear = make_monthyear_col(all_data_monthly_flags)

all_data_monthly_flags_w_climatology = all_data_monthly_flags_w_monthyear.join(
    all_gpcc_data_month_min_max, on="month"
)
all_data_monthly_flags_w_climatology = all_data_monthly_flags_w_climatology.drop(
    "year", "month"
)

In [157]:
all_data_monthly_flags_w_climatology

time,rain_mm,num_neighbours_online,monthly_flags,neighbour_min,neighbour_max
datetime[μs],f64,i16,f64,f64,f64
2006-01-01 00:00:00,,0,,34.0,226.3
2006-02-01 00:00:00,,0,,59.7,246.35
2006-03-01 00:00:00,302.9,2,3.0,54.7,302.9
2006-04-01 00:00:00,224.2,2,3.0,1.0,4996.5
2006-05-01 00:00:00,402.2,2,3.0,43.3,402.2
…,…,…,…,…,…
2010-08-01 00:00:00,417.1,3,2.0,40.0,417.1
2010-09-01 00:00:00,30.0,3,-2.0,17.1,206.0
2010-10-01 00:00:00,40.0,3,0.0,22.8,188.9
2010-11-01 00:00:00,192.5,3,1.0,70.5,1099.5


In [158]:
## The intenseQC code is wrong here, so I am going off the description in the paper
all_data_monthly_flags_extreme = all_data_monthly_flags_w_climatology.with_columns(
    pl.when(
        (pl.col("monthly_flags") == -3)
        & (pl.col("num_neighbours_online") >= MIN_NUM_NEIGHBOURS)
        & (pl.col(rain_col) <= (0.75 * pl.col("neighbour_min")))
        & (pl.col(rain_col) > (0.5 * pl.col("neighbour_min")))
    )
    .then(-4)
    .when(
        (pl.col("monthly_flags") == 3)
        & (pl.col("num_neighbours_online") >= MIN_NUM_NEIGHBOURS)
        & (pl.col(rain_col) < (0.5 * pl.col("neighbour_min")))
    )
    .then(-5)
    .when(
        (pl.col("monthly_flags") == 3)
        & (pl.col("num_neighbours_online") >= MIN_NUM_NEIGHBOURS)
        & (pl.col(rain_col) > (1.25 * pl.col("neighbour_max")))
        & (pl.col(rain_col) < (2 * pl.col("neighbour_max")))
    )
    .then(4)
    .when(
        (pl.col("monthly_flags") == 3)
        & (pl.col("num_neighbours_online") >= MIN_NUM_NEIGHBOURS)
        & (pl.col(rain_col) > (2 * pl.col("neighbour_max")))
    )
    .then(5)
    .otherwise(pl.col("monthly_flags"))
    .alias("monthly_flags")
)

In [159]:
## NOTE: there are no -4, -5, 4 or 5 flags in this target data
all_data_monthly_flags_extreme["monthly_flags"].value_counts()

monthly_flags,count
f64,u32
2.0,4
-2.0,1
0.0,19
3.0,9
-1.0,3
-3.0,2
1.0,5
,17


# QC21 - Timing offset 
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
-

#### Notes on method:
- Only for daily neighbours

### Step 1. Read in target data and join to nearest neighbour

In [402]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points

In [403]:
existing_gpcc_daily_paths

{'DE_03215': ['../data/GPCC/tw_3215.zip'],
 'DE_02718': [],
 'DE_04488': [],
 'DE_06264': [],
 'DE_00390': [],
 'DE_01300': [],
 'DE_04313': [],
 'DE_06303': ['../data/GPCC/tw_6303.zip'],
 'DE_00310': ['../data/GPCC/tw_310.zip']}

In [404]:
## TODO: function that will extract closest neighbour
gpcc_id_to_use = "DE_06303"
gpcc_path = existing_gpcc_daily_paths[gpcc_id_to_use]  ## might not be closest

## 0. Load GPCC data
gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id_name}"

f = zipfile.ZipFile(gpcc_path[0]).open(f"tw_{gpcc_id_name}.dat")
example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

## 1. drop unnecessary columns
example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

## 2. make datetime column (apparently it's 7am-7pm)
example_gpcc = example_gpcc.with_columns(
    pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
).drop(["0", "1", "2"])

## 3. rename and reorder
example_gpcc = example_gpcc.rename({"3": rain_col})
example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

## 4. join to target data
joined_gauges_gpcc = target_gauge_daily.join(
    example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
)
joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

### Step 2. Get lagged data
+/- 1 day

Notes:
- need to test for big gaps in days

In [405]:
def get_lagged_data(data, shift_in_time, target_col, time_res):
    """
    NOTE: Do I want this to take n-amount of columns and loop over and shift?

    Parameters
    ----------
    data : pl.DataFrame
    shift_in_time : int
        Amount to shift data by i.e. 1 for 1 day if time_res set to '1d'
    target_col : str
    time_res : str
        Time resolution like '1d' or '1mo'

    Returns
    -------

    """
    ## Upsample data to 1 day to fill in gaps
    data = data.upsample("time", every=time_res)

    return data.with_columns(
        pl.col(target_col)
        .last()
        .over(pl.col("time").dt.truncate(time_res))
        .shift(shift_in_time),
    )


joined_gauges_gpcc_lag_minus1 = get_lagged_data(
    joined_gauges_gpcc, shift_in_time=-1, target_col=rain_col, time_res="1d"
)
joined_gauges_gpcc_lag_plus1 = get_lagged_data(
    joined_gauges_gpcc, shift_in_time=1, target_col=rain_col, time_res="1d"
)

In [406]:
joined_gauges_gpcc_lag_minus1

time,rain_mm,rain_mm_GPCC_6303
datetime[μs],f64,f64
2006-01-01 07:00:00,0.0,0.2
2006-01-02 07:00:00,0.0,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0
…,…,…
2010-12-26 07:00:00,0.0,3.0
2010-12-27 07:00:00,0.0,0.4
2010-12-28 07:00:00,0.0,0.0
2010-12-29 07:00:00,0.0,0.0


In [407]:
joined_gauges_gpcc_lag_plus1

time,rain_mm,rain_mm_GPCC_6303
datetime[μs],f64,f64
2006-01-01 07:00:00,,0.2
2006-01-02 07:00:00,0.2,0.0
2006-01-03 07:00:00,0.0,0.0
2006-01-04 07:00:00,0.0,0.0
2006-01-05 07:00:00,0.0,0.0
…,…,…
2010-12-26 07:00:00,0.0,3.0
2010-12-27 07:00:00,0.2,0.4
2010-12-28 07:00:00,0.0,0.0
2010-12-29 07:00:00,0.0,0.0


### Step 3. Loop through -1, 0, +1 lags and compute offset flag

In [408]:
affinities = {}
gauge_corrs = {}
for lag, gauge_data in zip(
    [-1, 0, 1],
    [joined_gauges_gpcc_lag_minus1, joined_gauges_gpcc, joined_gauges_gpcc_lag_plus1],
):
    ## Step 1. get gauge non-0 minima
    non_zero_minima = get_target_neighbour_non_zero_minima(
        gauge_data, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}"
    )
    gauge_data_minima = get_target_and_neighbour_not_minima_column(
        gauge_data,
        data_minima=non_zero_minima,
        target_col=rain_col,
        other_col=f"{rain_col}_GPCC_{gpcc_id_name}",
    )

    ### Step 2. Calculate affinity and gauge correlation
    match, diff, affinity = calc_affinity_index(
        gauge_data_minima, return_match_and_diff=True
    )
    gauge_corr = calc_gauge_correlation(
        gauge_data, rain_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}"
    )

    affinities[lag] = affinity
    gauge_corrs[lag] = gauge_corr

    print(f"Lag: {lag}")
    print(
        f"different rows: {diff}, matching rows: {match}, affinity :{affinity}, correlation {gauge_corr}"
    )
    print("#" * 10)

Lag: -1
different rows: 131, matching rows: 417, affinity :0.7609489051094891, correlation -0.013916554390363284
##########
Lag: 0
different rows: 158, matching rows: 542, affinity :0.7742857142857142, correlation 0.009957123786108265
##########
Lag: 1
different rows: 131, matching rows: 444, affinity :0.7721739130434783, correlation -0.010228388153778127
##########


### Step 4. Flag data

In [409]:
print(f"{affinities}, \nIndex of max: {max(affinities, key=affinities.get)}")

{-1: 0.7609489051094891, 0: 0.7742857142857142, 1: 0.7721739130434783}, 
Index of max: 0


In [410]:
print(f"{gauge_corrs}, \nIndex of max: {max(gauge_corrs, key=gauge_corrs.get)}")

{-1: np.float64(-0.013916554390363284), 0: np.float64(0.009957123786108265), 1: np.float64(-0.010228388153778127)}, 
Index of max: 0


In [411]:
if max(affinities, key=affinities.get) == max(gauge_corrs, key=gauge_corrs.get):
    offset_flag = max(affinities, key=affinities.get)
else:
    offset_flag = 0

print(offset_flag)

0


In [412]:
## test
affinities[-1] = 1
gauge_corrs[-1] = 1

if max(affinities, key=affinities.get) == max(gauge_corrs, key=gauge_corrs.get):
    offset_flag = max(affinities, key=affinities.get)
else:
    offset_flag = 0

print(offset_flag)

-1


# QC22 - Pre-QC Affinity  
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
-

### Step 1. Read in target data and join to nearest neighbour

In [413]:
## resample into daily (also round to 1 decimal place) TODO: check offset='7h' part
target_gauge_daily = (
    target_gauge.group_by_dynamic("time", every="1d", offset="7h", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).mean().round(1).alias(rain_col),
        ]
    )
    .filter(pl.col("n_hours") == 24)
    .drop("n_hours")
)  # Ensure at least 24 data points

In [414]:
existing_gpcc_daily_paths

{'DE_03215': ['../data/GPCC/tw_3215.zip'],
 'DE_02718': [],
 'DE_04488': [],
 'DE_06264': [],
 'DE_00390': [],
 'DE_01300': [],
 'DE_04313': [],
 'DE_06303': ['../data/GPCC/tw_6303.zip'],
 'DE_00310': ['../data/GPCC/tw_310.zip']}

In [415]:
## TODO: function that will extract closest neighbour
gpcc_id_to_use = "DE_06303"
gpcc_path = existing_gpcc_daily_paths[gpcc_id_to_use]  ## might not be closest

## 0. Load GPCC data
gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id_name}"

f = zipfile.ZipFile(gpcc_path[0]).open(f"tw_{gpcc_id_name}.dat")
example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

## 1. drop unnecessary columns
example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

## 2. make datetime column (apparently it's 7am-7pm)
example_gpcc = example_gpcc.with_columns(
    pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
).drop(["0", "1", "2"])

## 3. rename and reorder
example_gpcc = example_gpcc.rename({"3": rain_col})
example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

## 4. join to target data
joined_gauges_gpcc = target_gauge_daily.join(
    example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
)
joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

### Step 2. Get non-0 minima

In [416]:
## Step 1. get gauge non-0 minima
non_zero_minima = get_target_neighbour_non_zero_minima(
    joined_gauges_gpcc, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}"
)
joined_gauges_gpcc_minima = get_target_and_neighbour_not_minima_column(
    joined_gauges_gpcc,
    data_minima=non_zero_minima,
    target_col=rain_col,
    other_col=f"{rain_col}_GPCC_{gpcc_id_name}",
)
joined_gauges_gpcc_minima

time,rain_mm,rain_mm_GPCC_6303,gauges_not_minima
datetime[μs],f64,f64,f64
2006-01-01 07:00:00,0.2,0.2,1.0
2006-01-02 07:00:00,0.0,0.0,
2006-01-03 07:00:00,0.0,0.0,
2006-01-04 07:00:00,0.0,0.0,
2006-01-05 07:00:00,0.0,0.0,
…,…,…,…
2010-12-26 07:00:00,0.2,3.0,1.0
2010-12-27 07:00:00,0.0,0.4,
2010-12-28 07:00:00,0.0,0.0,
2010-12-29 07:00:00,0.0,0.0,


### Step 3. Compute affinity

In [417]:
### Step 2. Calculate affinity and gauge correlation
match, diff, affinity = calc_affinity_index(
    joined_gauges_gpcc_minima, return_match_and_diff=True
)

print(f"different rows: {diff}, matching rows: {match}, affinity :{affinity}")

different rows: 158, matching rows: 542, affinity :0.7742857142857142


# QC23 - Pre-QC Pearson
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
-

### See Steps 1-2 of QC22

### Step 3. Calculate gauge correlation

In [418]:
gauge_corr = calc_gauge_correlation(
    joined_gauges_gpcc, rain_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}"
)
print(gauge_corr)

0.009957123786108265


# QC24 - Daily factor  
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
-

### Step 1. Compute factor diff

In [419]:
joined_gauges_gpcc_factor = compute_factor_diff_target_and_neighbour(joined_gauges_gpcc, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}")

In [420]:
joined_gauges_gpcc_factor

time,rain_mm,rain_mm_GPCC_6303,factor
datetime[μs],f64,f64,f64
2006-01-01 07:00:00,0.2,0.2,1.0
2006-01-02 07:00:00,0.0,0.0,
2006-01-03 07:00:00,0.0,0.0,
2006-01-04 07:00:00,0.0,0.0,
2006-01-05 07:00:00,0.0,0.0,
…,…,…,…
2010-12-26 07:00:00,0.2,3.0,0.066667
2010-12-27 07:00:00,0.0,0.4,
2010-12-28 07:00:00,0.0,0.0,
2010-12-29 07:00:00,0.0,0.0,


In [426]:
factor_mean = joined_gauges_gpcc_factor.filter(
    (pl.col(rain_col) > 0)  & (pl.col(f"{rain_col}_GPCC_{gpcc_id_name}") > 0)
)['factor'].mean()

print("Factor mean:", factor_mean)

Factor mean: 0.21474557905460745


In [429]:
all_data = target_gauge_daily.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_daily_paths.items():
    ## 0. Load GPCC data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    gpcc_col_name = f"{rain_col}_GPCC_{gpcc_id_name}"
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"tw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(4, 16)])

    ## 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(pl.col("2"), pl.col("1"), pl.col("0"), 7).alias("time")
    ).drop(["0", "1", "2"])

    ## 3. rename and reorder
    example_gpcc = example_gpcc.rename({"3": rain_col})
    example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

    ## 4. join to target data
    joined_gauges_gpcc = target_gauge_daily.join(
        example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
    )
    joined_gauges_gpcc = joined_gauges_gpcc.drop_nans()

    joined_gauges_factor = compute_factor_diff_target_and_neighbour(joined_gauges_gpcc, target_col=rain_col, other_col=f"{rain_col}_GPCC_{gpcc_id_name}")

    f_mean = joined_gauges_factor["factor_diff"].drop_nans().mean()
    print(f"{gpcc_id_to_use}: factor mean: {f_mean}")

DE_03215: factor mean: 0.2748925805169023
DE_06303: factor mean: 0.21474557905460745
DE_00310: factor mean: 0.17670482524262948


# QC25 - Monthly factor
[Back to Index](#Table-of-contents)

#### Differences from `intense-qc`: 
-

### Step 1. Load in monthly data and check completeness

In [430]:
target_gauge_monthyear = make_monthyear_col(target_gauge)
target_gauge_monthyear = get_expected_days_in_month(target_gauge_monthyear)

In [431]:
target_gauge_monthly = (
    target_gauge_monthyear.group_by_dynamic("time", every="1mo", closed="right")
    .agg(
        [
            pl.len().alias("n_hours"),
            pl.col(rain_col).sum().alias(rain_col),
            pl.col("expected_days_in_month").first(),
        ]
    )
    .filter(
        pl.col("n_hours")
        >= (24 * pl.col("expected_days_in_month") * PERC_MONTH_NEEDED / 100)
    )  # TODO: Ensure at least 95% values for month are available
    .drop("n_hours", "expected_days_in_month")
)

In [434]:
target_gauge_monthly

time,rain_mm
datetime[μs],f64
2006-01-01 00:00:00,
2006-02-01 00:00:00,
2006-03-01 00:00:00,302.9
2006-04-01 00:00:00,224.2
2006-05-01 00:00:00,402.2
…,…
2010-08-01 00:00:00,417.1
2010-09-01 00:00:00,30.0
2010-10-01 00:00:00,40.0
2010-11-01 00:00:00,192.5


### Step 2. Loop through and compute factor diff

In [448]:
all_data = target_gauge_monthly.clone()

for gpcc_id_to_use, gpcc_path in existing_gpcc_monthly_paths.items():
    ## Step 0. Load GPCC monthly data
    gpcc_id_name = gpcc_id_to_use.split("DE_")[1].lstrip("0")
    if len(gpcc_path) == 0:
        continue
    f = zipfile.ZipFile(gpcc_path[0]).open(f"mw_{gpcc_id_name}.dat")
    example_gpcc = pl.from_pandas(pd.read_csv(f, skiprows=1, header=None, sep=r"\s+"))

    ## Step 1. drop unnecessary columns
    example_gpcc = example_gpcc.drop([str(i) for i in range(3, 15)])

    ## Step 2. make datetime column (apparently it's 7am-7pm)
    example_gpcc = example_gpcc.with_columns(
        pl.datetime(year=pl.col("1"), month=pl.col("0"), day=1).alias("time")
    ).drop(["0", "1"])

    ## Step 3. rename and reorder
    example_gpcc = example_gpcc.rename({"2": rain_col})
    example_gpcc = example_gpcc.select(["time", rain_col])  ## Reorder (to look nice)

    ## Step 4. join to target data
    joined_gauges_gpcc_monthly = target_gauge_monthly.join(
        example_gpcc, on="time", suffix=f"_GPCC_{gpcc_id_name}"
    )
    joined_gauges_gpcc_monthly = joined_gauges_gpcc_monthly.drop_nans()

    ## Step 5. Calculate factor between target and neighbour
    # TODO: check whether it is okay or whether this function should ignore zeros
    joined_gauges_gpcc_monthly_factor = joined_gauges_gpcc_monthly.with_columns(
        factor_diff=pl.col(rain_col) / pl.col(f"{rain_col}_GPCC_{gpcc_id_name}"),
    )

    joined_gauges_gpcc_monthly_factor_flags = joined_gauges_gpcc_monthly_factor.with_columns(
        pl.when((pl.col("factor_diff") < 11) & (pl.col("factor_diff") > 9))
        .then(1)
        .when((pl.col("factor_diff") < 26) & (pl.col("factor_diff") > 24))
        .then(2)
        .when((pl.col("factor_diff") < 3) & (pl.col("factor_diff") > 2))
        .then(3)
        .when((pl.col("factor_diff") > 1 / 11) & (pl.col("factor_diff") < 1 / 9))
        .then(4)
        .when((pl.col("factor_diff") > 1 / 26) & (pl.col("factor_diff") < 1 / 24))
        .then(5)
        .when((pl.col("factor_diff") > 1 / 3) & (pl.col("factor_diff") < 1 / 2))
        .then(6)
        .otherwise(0)
        .alias(f"factor_flags_{gpcc_id_name}")
    )

    all_data = all_data.join(
        joined_gauges_gpcc_monthly_factor_flags[["time", f"factor_flags_{gpcc_id_name}"]],
        on="time",
        how="left",
    )

    print(all_data[f"factor_flags_{gpcc_id_name}"].value_counts())

shape: (4, 2)
┌───────────────────┬───────┐
│ factor_flags_3215 ┆ count │
│ ---               ┆ ---   │
│ i32               ┆ u32   │
╞═══════════════════╪═══════╡
│ 6                 ┆ 1     │
│ 0                 ┆ 25    │
│ null              ┆ 27    │
│ 3                 ┆ 7     │
└───────────────────┴───────┘
shape: (3, 2)
┌───────────────────┬───────┐
│ factor_flags_6303 ┆ count │
│ ---               ┆ ---   │
│ i32               ┆ u32   │
╞═══════════════════╪═══════╡
│ 3                 ┆ 6     │
│ null              ┆ 17    │
│ 0                 ┆ 37    │
└───────────────────┴───────┘
shape: (4, 2)
┌──────────────────┬───────┐
│ factor_flags_310 ┆ count │
│ ---              ┆ ---   │
│ i32              ┆ u32   │
╞══════════════════╪═══════╡
│ null             ┆ 17    │
│ 6                ┆ 1     │
│ 0                ┆ 35    │
│ 3                ┆ 7     │
└──────────────────┴───────┘


In [451]:
all_data

time,rain_mm,factor_flags_3215,factor_flags_6303,factor_flags_310
datetime[μs],f64,i32,i32,i32
2006-01-01 00:00:00,,,,
2006-02-01 00:00:00,,,,
2006-03-01 00:00:00,302.9,,3,3
2006-04-01 00:00:00,224.2,,3,0
2006-05-01 00:00:00,402.2,,3,3
…,…,…,…,…
2010-08-01 00:00:00,417.1,0,3,0
2010-09-01 00:00:00,30.0,6,0,6
2010-10-01 00:00:00,40.0,0,0,0
2010-11-01 00:00:00,192.5,3,0,0


In [452]:
joined_gauges_gpcc_monthly_factor_flags

time,rain_mm,rain_mm_GPCC_310,factor_diff,factor_flags_310
datetime[μs],f64,f64,f64,i32
2006-03-01 00:00:00,302.9,112.5,2.692444,3
2006-04-01 00:00:00,224.2,68.4,3.277778,0
2006-05-01 00:00:00,402.2,138.2,2.910275,3
2006-08-01 00:00:00,372.5,139.5,2.670251,3
2006-09-01 00:00:00,89.1,17.1,5.210526,0
…,…,…,…,…
2010-08-01 00:00:00,417.1,231.8,1.799396,0
2010-09-01 00:00:00,30.0,84.8,0.353774,6
2010-10-01 00:00:00,40.0,42.1,0.950119,0
2010-11-01 00:00:00,192.5,140.2,1.373039,0
