# QC protocol for Private Weather Stations

This notebook presents how to use the Python package `pypwsqc`, a quality assurance protocol developed for automated private weather stations (PWS).
The protocol consists of three filters from de Vos et al (2019) the Faulty Zero filter, the High Influx filter and the Station Outlier filter as well as the Indicator Correlation Filter (IC) from Bardossy et al. (2021) 



Publications: 
* de Vos, L. W., Leijnse, H., Overeem, A., & Uijlenhoet, R. (2019). Quality control for crowdsourced personal weather stations to enable operational rainfall monitoring. Geophysical Research Letters, 46(15), 8820-8829 with original R code at available at https://github.com/LottedeVos/PWSQC/.
* Bárdossy, A., Seidel, J., and El Hachem, A.: The use of personal weather station observations to improve precipitation estimation and interpolation, Hydrol. Earth Syst. Sci., 25, 583–601, https://doi.org/10.5194/hess-25-583-2021, 2021. 



In [None]:
import sys

sys.path.append('../poligrain/src')
sys.path.append('../pypwsqc/src')

In [None]:
import matplotlib.pyplot as plt
import poligrain as plg
import xarray as xr
from tqdm import tqdm
import numpy as np

import pypwsqc

import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)

## Download example data from poligrain

In this example, we use an open PWS dataset from Amsterdam, called the "AMS PWS" dataset. By running the cell below, a NetCDF-file will be downloaded to your current repository (if your machine is connected to the internet).

In [None]:
(
    ds_pws,
    ds_gauges,
) = plg.example_data.load_ams_pws(data_dir="example_data", subset="full_period")


## Data preparations

This package handles rainfall data as `xarray`  Datasets. The data set must have `time` and `id` dimensions, `latitude` and `longitude` as coordinates, and `rainfall` as data variable.

An example of how to convert .csv data to a `xarray` dataset is found [here](https://github.com/OpenSenseAction/OS_data_format_conventions/blob/main/notebooks/PWS_example_dataset.ipynb).

In [None]:
ds_pws.load()

In [None]:
ds_gauges.load()

### Reproject coordinates
First we reproject the coordinates to a local metric coordinate reference system to allow for distance calculations. In the Amsterdam example we use EPSG:25832. **Remember to use a local metric reference system for your use case!** We use the function `spatial.project_point_coordinates` in the `poligrain`package. 

In [None]:
ds_pws.coords["x"], ds_pws.coords["y"] = plg.spatial.project_point_coordinates(
    x=ds_pws.lon, y=ds_pws.lat, target_projection="EPSG:25832"
)

ds_gauges.coords["x"], ds_gauges.coords["y"] = plg.spatial.project_point_coordinates(
    ds_gauges.lon,
    ds_gauges.lat,
    target_projection="EPSG:25832",
)

### Create distance matrix

Then, we calculate the distances between all stations in our data set. If your data set has a large number of stations this can take some time.

In [None]:
distance_matrix = plg.spatial.calc_point_to_point_distances(ds_pws, ds_pws)

In [None]:
plt.pcolor(distance_matrix.values)
plt.colorbar(label='distance [m]')
plt.xlabel('pws_id')
plt.ylabel('pws_id');

### Calculate data variables 
Next, we will calculate the data variables `nbrs_not_nan` and `reference` that are needed to perform the quality control. 

`nbrs_not_nan`:
Number of neighbours within a specificed range `max_distance` around the station that are reporting rainfall for each time step. The selected range depends on the use case and area of interest. In this example we use 10'000 meters. 

 `reference`:
Median rainfall of all stations within range `max_distance` from each station.

`max_distance` is called `d` in the original publication.

In [None]:
max_distance = 10e3

In [None]:
nbrs_not_nan = []
reference = []

for pws_id in tqdm(ds_pws.id.data):
    neighbor_ids = distance_matrix.id.data[
        (distance_matrix.sel(id=pws_id) < max_distance)
        & (distance_matrix.sel(id=pws_id) > 0)
    ]

    N = ds_pws.rainfall.sel(id=neighbor_ids).isnull().sum(dim="id")
    nbrs_not_nan.append(N)

    median = ds_pws.sel(id=neighbor_ids).rainfall.median(dim="id")
    reference.append(median)

ds_pws["nbrs_not_nan"] = xr.concat(nbrs_not_nan, dim="id")
ds_pws["reference"] = xr.concat(reference, dim="id")

## Plot some data

In [None]:
# pws and ref time series in hourl resulution
ds_pws.rainfall.isel(id=16).resample(time='1h').sum().plot(figsize=(12,2),alpha=0.5,label='pws')
ds_gauges.rainfall.isel(id=0).plot(color='C1',alpha=0.5,label='reference')
plt.legend();

In [None]:
ds_pws.rainfall.cumsum(dim='time').plot.line(x='time',lw=2, alpha=0.2,color='blue',add_legend=False)
ds_gauges.rainfall.cumsum(dim='time').plot.line(x='time',color='red', alpha=0.2, add_legend=False)
plt.ylim(0,2500);

# Quality control methods

## Faulty Zero Filter (FZ)
Conditions for raising Faulty Zeros flag:

* Median rainfall of neighbouring stations within range max_distance is larger than zero for at least nint time intervals while the station itself reports zero rainfall.
* The FZ flag remains 1 until the station reports nonzero rainfall.
* Filter cannot be applied if less than `n_stat` neighbours are reporting data (FZ flag is set to -1)
* NOTE! The filter cannot be applied if the station has reported NaN data in the last `nint` time steps. This gives more -1 flags than in the original R-implementation that does not use this condition. This choice was done to ensure that timesteps without data at the evaluated station is not mistakenly being interpreted as timesteps who have passed the quality control (if they would have been flagged with 0) or as time steps with a Faulty Zero issue (if they would have been flagged with 1).
  
For settings for parameter `nint` and `nstat`, see table 1 in [de Vos et al. (2019)](https://agupubs.onlinelibrary.wiley.com/doi/full/10.1029/2019GL083731)


In [None]:
%%time 
# takes 2-3 minutes
# compute filter
ds_pws = pypwsqc.flagging.fz_filter(
    ds_pws=ds_pws,
    nint=3,
    n_stat=5,
)

## High Influx Filter (HI)
Conditions for raising High Influx flag:

* If median rainfall of neighbours is below threshold ϕA, then high influx if rainfall above threshold ϕB
* If median rainfall of neighbours is above ϕA, then high influx if rainfall exceeds median times ϕB/ϕA
* Filter cannot be applied if less than n_stat neighbours are reporting data (HI flag is set to -1)
* NOTE! The filter cannot be applied if the station has reported NaN data in the last `nint` time steps. This gives more -1 flags than in the original R-implementation that does not use this condition. This choice was done to ensure that timesteps without data at the evaluated station is not mistakenly being interpreted as timesteps who have passed the quality control (if they would have been flagged with 0) or as time steps with a High Influx issue (if they would have been flagged with 1).
  
For settings for parameter `ϕA`, `ϕB`, `nstat`, and `nint`, see table 1 in [de Vos et al. (2019)](https://agupubs.onlinelibrary.wiley.com/doi/full/10.1029/2019GL083731)

In [None]:
%%time
# compute filter
ds_pws = pypwsqc.flagging.hi_filter(
    ds_pws=ds_pws,
    hi_thres_a=0.4,
    hi_thres_b=10,
    nint=6,
    n_stat=5,
)

## Station outlier filter (SO)
Conditions for raising Station Outlier flag:

* Median of the rolling pearson correlation with all neighboring stations within range `max_distance` is less than threshold `gamma`
* Filter cannot be applied if less than `n_stat` neighbours are reporting data (SO flag is set to -1)
* Filter cannot be applied if there are less than `n_stat` neighbours with less than `mmatch` intervals overlapping with the evaluated station (SO flag is set to -1)

For settings for parameter `evaluation_period`, `mmatch`, `gamma`, and `n_stat`,  see table 1 in [de Vos et al. (2019)](https://agupubs.onlinelibrary.wiley.com/doi/full/10.1029/2019GL083731)

Note! The SO-filter is different compared with the original R-code. In its original implementation, any interval with at least `mrain` intervals of nonzero rainfall measurements is evaluated. In this implementation, only a fixed rolling time window is evaluated. Therefore, the `mrain` variable from the orignal code is not needed. In the original publication, the variable `evaluation_period` (the evaluation period) is set to 4032. For 5-minute data, this is equivalent of two weeks. When the option of a variable evaluation period is excluded, two weeks is often too short as there might not be enough wet periods in the last two weeks to calculate the correlation. This results in a lot of '-1'-flags (filter cannot be applied). It is suggested to use a longer evaluation period, for example four weeks (`evaluation_period` = 8064 for 5-minute data).

The first `evaluation_period` timesteps (here set to 8064 time steps), the rollig median correlation is computed with the last time steps in the time series. Therefore, the resulting `median_corr_nbrs` should be disregarded the first `evaluation_period` time steps.

`evaluation_period` is called `mint`in the original publication.

We initialize data variables for the resulting SO-flags and the median pearson correlation with neighboring stations with the value -999. If the variables have the value 0 (passed the test), 1 (did not pass the test) or -1 (not enough information) after running the SO-filter, we know that these time series have been evaluated. If the value is still -999, this means that something went wrong as the data has not been processed. 

We also save the threshold `gamma` as a variable. In this way we can easily visualize if the median correlation with neighbors drops below this threshold, which is the condition for raising a SO-flag.

In [None]:
evaluation_period = 8064
mmatch = 200
gamma = 0.15
n_stat = 5
max_distance = 10e3

In [None]:
ds_pws["so_flag"] = xr.DataArray(
    np.ones((len(ds_pws.id), len(ds_pws.time))) * -999, dims=("id", "time")
)
ds_pws["median_corr_nbrs"] = xr.DataArray(
    np.ones((len(ds_pws.id), len(ds_pws.time))) * -999, dims=("id", "time")
)
ds_pws["gamma"] = xr.DataArray(
    np.ones((len(ds_pws.id), len(ds_pws.time))) * gamma, dims=("id", "time")
)

### Run filter

In [None]:
%%time
# takes 1min per month 
ds_pws_sel = pypwsqc.flagging.so_filter(
    ds_pws = ds_pws.sel(time="2018"),
    distance_matrix = distance_matrix,
    evaluation_period = 8064,
    mmatch = 200,
    gamma = 0.15,
    n_stat = 5,
    max_distance = 10e3,
)

## Exercise 1
1.1 Plot a PWS that is flagged by the each of the filters and their neigbhoring station values (reference) for comparison.  
1.2 Plot an overview of all three flags for the year 2018.

In [None]:
# 1.1 - your solution here


In [None]:
if input("Enter 'Solution' to display solutions: ")=='Solution':
    %load solutions/3_1_1_solution.py

In [None]:
# 1.2 - your solution here



In [None]:
if input("Enter 'Solution' to display solutions: ")=='Solution':
    %load solutions/3_1_2_solution.py

## Indicator Correlation Filter (IC)
The PWS data needs to be in hourly values as the indocator correlation filter by Bárdossy et al. (2021) does not work with 5 minute data

For the aggreation, the new value for an hour is considered as valid if at least 10 out 12 5-min values within one hour have valid data. This can be set by the min count parameter.

In [None]:
ds_pws_hourly = ds_pws.resample(time="1h").sum(min_count=10)

### Indicator correlation vs distance
First, we calculate the indicator correlations over distance for the reference data set. This is assumed to be the correct spatial pattern of precipitation which is used for checking the PWS later on.

In [None]:
import pypwsqc.indicator_correlation as ic

In [None]:
# Distance and indicator correlations of reference data
dist_mtx_ref, indcorr_mtx_ref = ic.indicator_distance_matrix(
    ds_gauges.rainfall,
    ds_gauges.rainfall,
    max_distance=30e3,
    prob=0.99,
    min_valid_overlap=2 * 24 * 30,
)

In [None]:
# plot the indicator time series of all reference stations
(ds_gauges.rainfall>ds_gauges.rainfall.quantile(0.99)).plot(figsize=(15,5));

In [None]:
plt.scatter(dist_mtx_ref, indcorr_mtx_ref, color="red", s=10, label="Ref")
plt.ylim(0, 1)
plt.xlim(0, 30e3)
plt.ylabel("Indicator Correlation [-]")
plt.xlabel("Distance [m]")
plt.title("Indicator Correlation vs. Distance for Reference Data")
plt.legend();

In [None]:
# Distance and indicator correlations of PWS
dist_mtx_pws, indcorr_mtx_pws = ic.indicator_distance_matrix(
    ds_pws_hourly.rainfall,
    ds_pws_hourly.rainfall,
    prob=0.99,
    max_distance=30e3,
    min_valid_overlap=2 * 24 * 30,
)

In [None]:
plt.scatter(dist_mtx_pws, indcorr_mtx_pws, color="b", alpha=0.2, s=10, label="PWS")
plt.scatter(dist_mtx_ref, indcorr_mtx_ref, color="red", s=10, label="Ref")
plt.ylim(0, 1)
plt.xlim(0, 30e3)
plt.ylabel("Indicator Correlation [-]")
plt.xlabel("Distance [m]")
plt.title("Indicator Correlation over Distance for PWS and Reference Data")
plt.legend();

We can see that the PWS data is very "noisy", i.e. the indicator correlation of nearby PWS stations is very low which we would not expect from the reference. Such PWS are likely to have data quality issues ans will be removed by the Indicator Correlation Filter.

Finally the distance and indicator correlations matrices between PWS and reference data are calculated.

In [None]:
dist_mtx_ref_pws, indcorr_mtx_ref_pws = ic.indicator_distance_matrix(
    da_a=ds_gauges.rainfall,
    da_b=ds_pws_hourly.rainfall,
    prob=0.99,
    min_valid_overlap=2 * 24 * 30,
)

### Apply filter

In [None]:
indcorr_results = ic.ic_filter(
    indicator_correlation_matrix_ref=indcorr_mtx_ref,
    distance_correlation_matrix_ref=dist_mtx_ref,
    indicator_correlation_matrix=indcorr_mtx_ref_pws,
    distance_matrix=dist_mtx_ref_pws,
    max_distance=20000,
    bin_size=1000,
    quantile_bin_ref=0.1,
    quantile_bin_pws=0.5,
    threshold=0.05,
);

In [None]:
indcorr_results

The results are returned as `xarray.Dataset` with four variables:

`indcorr`: Indicator correlation matrix between `Ref` and `PWS`

`dist`: Distance matrix between `Ref` and `PWS`

`indcorr_good`: Bool Array indicating whether a PWS was accepted ('True') or rejected ('False') by the filter

`indcorr_score`: A metric which indicates how well a PWS fit's into the correlation structure of the Reference

In [None]:
print(
    str(indcorr_results.indcorr_good.data.sum())
    + " of "
    + str(len(indcorr_results.indcorr_good))
    + " PWS were accepted"
)

## Exercise 2
2.1 Plot time series of following pws and colorize their points in the ic vs distane plot: 'ams74', 'ams134', 'ams113', 'ams36'. 
You can use following code for the ic vs. distance plot:
```python
distance_matrix_pws_ref = plg.spatial.calc_point_to_point_distances(ds_pws, ds_gauges)
pws_id='ams74'
indcorr_results.plot.scatter(x="dist", y="indcorr", color="grey", alpha=0.5, s=10, )
plt.scatter(dist_mtx_ref, indcorr_mtx_ref, color="r", alpha=0.5, s=10, label="Ref")
indcorr_results.sel(id_neighbor=pws_id).plot.scatter(
    x="dist", y="indcorr", color="lime", s=15, label=pws_id,
)
```
2.2 Check if these pws where flagged by the FZ, HI or SO filter.  

In [None]:
# 2.1 your solution here




In [None]:
# 2.2 your solution here




In [None]:
# one solution for both exercises
if input("Enter 'Solution' to display solutions: ")=='Solution':
    %load solutions/3_2_solution.py