![lop](../../images/logo_diive1_128px.png)

<span style='font-size:40px; display:block;'>
<b>
    Flux Processing Chain
</b>
</span>

---
**Notebook version**: `5` (XX Jan 2024)  
**Author**: Lukas Hörtnagl (holukas@ethz.ch)  

</br>

# **Background**

- This notebook demonstrates part of the flux post-processing used for fluxes from Swiss FluxNet research stations
- For a description of the different flux levels, see [Flux Processing Chain](https://www.swissfluxnet.ethz.ch/index.php/data/ecosystem-fluxes/flux-processing-chain/)
- Flux calculations (Level-1) were done in a previous step
- This notebook uses the calculated fluxes (Level-1) and applies several post-processing steps:
    - Quality flag extension (Level-2)
    - Storage correction (Level-3.1)
    - Outlier removal (Level-3.2)
- Other flux levels are currently not produced in this example:
    - Gap-filling (Level-4.1)
    - NEE Partitioning (Level-4.2)

</br>

# **User settings**
`FLUXVAR` is the name of the flux variable in the data files. In the EddyPro `_fluxnet_` or - alternatively - `_full_output_` output files, the flux variables we primarily use are:
  - `FC`, `co2_flux` ... CO2 flux, becomes `NEE` after storage correction (Level-3.1)
  - `LE` ... Latent heat flux (water)
  - `H` ... Sensible heat flux
  - `FN2O`, `n2o_flux` ... Nitrous oxide flux
  - `FCH4`, `ch4_flux` ... Methane flux
  
There are more flux variables in the output file, but we rarely need them:
  - `FH2O`, `h2o_flux` ... H2O flux, very important but it is the same as `ET` and `LE` but with different units
  - `ET` ... Evapotranspiration, very important but it is the same as `FH2O` and `LE` but with different units. We can easily calculte `ET` later from `LE`, e.g. in `ReddyProc`.
  - `TAU` ... Momentum flux, a measure of the turbulent transfer of momentum between the land surface and the atmosphere

In [None]:
FLUXVAR = "co2_flux"  # Name of the flux variable
SOURCEDIRS = [r'L:\Sync\luhk_work\TMP\fru']  # Folders where the EddyPro output files are located
SITE_LAT = 47.115833  # Latitude of site
SITE_LON = 8.537778  # Longitude of site
FILETYPE = 'EDDYPRO_FULL_OUTPUT_30MIN'  # Filetype of EddyPro output files, can be 'EDDYPRO_FLUXNET_30MIN' or 'EDDYPRO_FULL_OUTPUT_30MIN'
UTC_OFFSET = 1  # Time stamp offset in relation to UTC, e.g. 1 for UTC+01:00 (CET), important for the calculation of potential radiation for detecting daytime and nighttime
NIGHTTIME_THRESHOLD = 50  # Threshold for potential radiation in W m-2, conditions below threshold are nighttime
DAYTIME_ACCEPT_QCF_BELOW = 2
NIGHTTIMETIME_ACCEPT_QCF_BELOW = 2

</br>

# **Imports**
- This notebook uses `diive` ([source code](https://gitlab.ethz.ch/diive/diive)) to check eddy covariance fluxes for quality

In [None]:
import os
import importlib.metadata
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
from pathlib import Path
from diive.core.io.filereader import MultiDataFileReader, search_files
from diive.core.io.files import save_parquet, load_parquet
# from diive.core.times.times import TimestampSanitizer
# from diive.pkgs.fluxprocessingchain.level2_qualityflags import FluxQualityFlagsLevel2EddyPro
# from diive.pkgs.fluxprocessingchain.level31_storagecorrection import FluxStorageCorrectionSinglePointEddyPro
# from diive.pkgs.outlierdetection.stepwiseoutlierdetection import StepwiseOutlierDetection
from diive.pkgs.qaqc.qcf import FlagQCF
from diive.core.plotting.timeseries import TimeSeries  # For simple (interactive) time series plotting
from diive.core.dfun.stats import sstats  # Time series stats
from diive.pkgs.fluxprocessingchain.fluxprocessingchain import FluxProcessingChain, LoadEddyProOutputFiles


version_diive = importlib.metadata.version("diive")
print(f"diive version: v{version_diive}")

</br>

# **Docstring** for `FluxProcessingChain`

In [None]:
help(FluxProcessingChain)

</br>

</br>

# **Load data** (2 options)

## Option 1: Load data from single or multiple output files
- Used to read data from the EddyPro _fluxnet_ output files

In [None]:
ep = LoadEddyProOutputFiles(sourcedir=SOURCEDIRS, filetype=FILETYPE)

In [None]:
ep.searchfiles()

In [None]:
ep.loadfiles()

In [None]:
level1_df = ep.level1_df
level1_metadata = ep.metadata

</br>

## Option 2: Load data from `parquet` file
- Used to continue a previous session where another flux variable was already post-processed
- For example, if you have already post-processed CO2 flux and now want to post-process H2O flux
- Also detects time resolution of time series, this info was lost when saving to the parquet file

<div class="alert alert-block alert-success">
    <b>NEEDS CHECK</b>
</div>

In [None]:
df_orig = load_parquet(filepath='df_level32_qcf.parquet')

</br>

## Check data

In [None]:
level1_df.head()

In [None]:
sstats(level1_df[FLUXVAR])

In [None]:
TimeSeries(series=level1_df[FLUXVAR]).plot_interactive()
# TimeSeries(series=level1_df[FLUXVAR]).plot()

</br>

</br>

# **Flux processing chain**

In [None]:
fpc = FluxProcessingChain(
    level1_df=level1_df,    
    filetype=FILETYPE,    
    fluxcol=FLUXVAR,    
    site_lat=SITE_LAT,
    site_lon=SITE_LON,
    utc_offset=UTC_OFFSET,
    level1_metadata=level1_metadata
)

</br>

</br>

# **Level-2: Quality flag extension**

> Extract additional quality information from the EddyPro output and store it in newly added quality flags.



Note that the USTAR filtering is not part of the Level-2 calculations.

</br>

## User settings
- A test for missing values is always included: flag calculated here from missing flux values in the EddyPro output file

</br>

### Flag: **SSITC** tests (default: `True`)
- Flag calculated in EddyPro
- Combination of the two partial tests *steady state test* and *developed turbulent conditions test*
- This notebook expects the SSITC flag to follow the flagging policy according to Mauder and Foken 2004: 0 for best quality fluxes, 1 for fluxes suitable for general analysis such as annual budgets (although this is debatable) and 2 for fluxes that should be discarded from the dataset

In [None]:
TEST_SSITC = True  # Default True

</br>

### Flag: **Gas completeness** test (default: `True`)
- Flag calculated here from the gas number of records percentage in EddyPro output file
- Checks gas number of records available for each averaging Interval

In [None]:
TEST_GAS_COMPLETENESS = True  # Default True

</br>

### Flag: **Spectral correction factor** test (default: `True`)
- Flag calculated here from the gas `scf` variable in EddyPro output file

In [None]:
TEST_SPECTRAL_CORRECTION_FACTOR = True  # Default True

</br>

### Flag: **Signal strength** test (always recommended if flux was calculated using a gas analyzer)

<div class="alert alert-block alert-danger">
<b>Do not use for H (sensible heat flux).</b> This test is only relevant for fluxes where the concentration was measured by a gas analyzer, e.g. FC, FH2O, LE, ET, N2O, CH4, etc ... 
</div>  

- Signal strength / AGC / window dirtiness test (if available)
- Flag calculated here from the signal strength / AGC variable for the gas analyzer in EddyPro output file
- `SIGNAL_STRENGTH_COL`: Name of the column storing the signal strength, typically 'CUSTOM_AGC_MEAN' for LI-7500, 'CUSTOM_SIGNAL_STRENGTH_IRGA72_MEAN' for LI-7200, or something similar
- `SIGNAL_STRENGTH_THRESHOLD`: Signal strength threshold, flux values where threshold is exceeded are flagged as rejected
- `SIGNAL_STRENGTH_METHOD`: `discard above` flags fluxes where signal strength > threshold, `discard below` where signal strength < threshold

In [None]:
# Signal strength
# SIGNAL_STRENGTH_COL = 'CUSTOM_AGC_MEAN'
TEST_SIGNAL_STRENGTH_COL = 'agc_mean'
TEST_SIGNAL_STRENGTH_METHOD = 'discard above'
TEST_SIGNAL_STRENGTH_THRESHOLD = 90

In [None]:
# TimeSeries(series=df_orig[SIGNAL_STRENGTH_COL]).plot_interactive()
TimeSeries(series=level1_df[TEST_SIGNAL_STRENGTH_COL]).plot()

</br>

### Flags: **Raw data screening** tests
- Flags were calculated in EddyPro

In [None]:
TEST_RAWDATA_SPIKES = True  # Default True
TEST_RAWDATA_AMPLITUDE = True  # Default True
TEST_RAWDATA_DROPOUT = True  # Default True
TEST_RAWDATA_ABSLIM = False  # Default False
TEST_RAWDATA_SKEWKURT_HF = False  # Default False
TEST_RAWDATA_SKEWKURT_SF = False  # Default False
TEST_RAWDATA_DISCONT_HF = False  # Default False
TEST_RAWDATA_DISCONT_SF = False  # Default False

</br>

### Flag: **Angle-of-attack** test (default: `False`)
> This test calculates sample-wise Angle of Attacks throughout the current flux averaging period, and flags it if the percentage of angles of attack exceeding a user-defined range is beyond a (user-defined) threshold.  
> Source: [EddyPro help](https://www.licor.com/env/support/EddyPro/topics/despiking-raw-statistical-screening.html?Highlight=angle%20of%20attack#Angleofattack)  *(3 Jan 2024)*
- Flag was calculated in EddyPro

In [None]:
TEST_RAWDATA_ANGLE_OF_ATTACK = False  # Default False

</br>

### Flag: **Steadiness of horizontal wind** test (default: `False`)
> This test assesses whether the along-wind and crosswind components of the wind vector undergo a systematic reduction (or increase) throughout the file. If the quadratic combination of such systematic variations is beyond the user-selected limit, the flux averaging period is hard-flagged for instationary horizontal wind (Vickers and Mahrt, 1997, Par. 6g).  
> Source: [EddyPro help](https://www.licor.com/env/support/EddyPro/topics/despiking-raw-statistical-screening.html?Highlight=angle%20of%20attack#Steadinessofhorizontalwind)  *(3 Jan 2024)*
- Flag was calculated in EddyPro

In [None]:
TEST_RAWDATA_STEADINESS_OF_HORIZONTAL_WIND = False  # Default False

</br>

## Run

In [None]:
LEVEL2_SETTINGS = {
    'signal_strength': {'signal_strength_col': TEST_SIGNAL_STRENGTH_COL, 'method': TEST_SIGNAL_STRENGTH_METHOD, 'threshold': TEST_SIGNAL_STRENGTH_THRESHOLD},
    'raw_data_screening_vm97': {'spikes': TEST_RAWDATA_SPIKES, 'amplitude': TEST_RAWDATA_AMPLITUDE, 'dropout': TEST_RAWDATA_DROPOUT, 'abslim': TEST_RAWDATA_ABSLIM,
                                'skewkurt_hf': TEST_RAWDATA_SKEWKURT_HF, 'skewkurt_sf': TEST_RAWDATA_SKEWKURT_SF, 'discont_hf': TEST_RAWDATA_DISCONT_HF, 
                                'discont_sf': TEST_RAWDATA_DISCONT_SF},
    'ssitc': TEST_SSITC,
    'gas_completeness': TEST_GAS_COMPLETENESS,
    'spectral_correction_factor': TEST_SPECTRAL_CORRECTION_FACTOR,
    'angle_of_attack': TEST_RAWDATA_ANGLE_OF_ATTACK,
    'steadiness_of_horizontal_wind': TEST_RAWDATA_STEADINESS_OF_HORIZONTAL_WIND
}
fpc.level2_quality_flag_expansion(**LEVEL2_SETTINGS)
fpc.finalize_level2(nighttime_threshold=NIGHTTIME_THRESHOLD, daytime_accept_qcf_below=DAYTIME_ACCEPT_QCF_BELOW, nighttimetime_accept_qcf_below=NIGHTTIMETIME_ACCEPT_QCF_BELOW)

</br>

## Available `Level-2` variables
- This shows all available Level-2 variables, also the ones created previously for other fluxes

In [None]:
[x for x in fpc.fpc_df.columns if 'L2' in x]

</br>

</br>

# **Level-3.1: Storage correction**

- The flux storage term (single point) is added to the flux
- For some records, the storage term can be missing. In such cases, missing terms are gap-filled using random forest
- Without gap-filling the storage term, we can lose an additional e.g. 2-3% of flux data

In [None]:
fpc.level31_storage_correction(gapfill_storage_term=True)

In [None]:
fpc.finalize_level31()

In [None]:
fpc.level31.showplot(maxflux=50)

In [None]:
fpc.level31.report()

</br>

## Available `Level-3.1` variables before preliminary QCF

In [None]:
_vars = [print(x) for x in df_level31.columns if 'L3.1' in x]
if FLUXVAR == 'FC':
    FLUXVAR31 = f"NEE_L3.1"
else:
    FLUXVAR31 = f"{FLUXVAR}_L3.1"  # Storage-corrected flux after Level-3.1
print(f"\nName of the storage-corrected flux variable after Level-3.1:  {FLUXVAR31}")

</br>

## Generate `QCF`, needed for Level-3.2
- `QCF` is the `quality control flag` that combines the quality tests so far into one single `QCF` flag (0-1-2)

In [None]:
qcf = FlagQCF(series=df_level31[FLUXVAR31], df=df_level31, levelid='L3.1', swinpot=df_level31['SW_IN_POT'], nighttime_threshold=50)
qcf.calculate(daytime_accept_qcf_below=2, nighttimetime_accept_qcf_below=2)
df_level31 = qcf.get()

In [None]:
qcf.report_qcf_evolution()

In [None]:
qcf.showplot_qcf_heatmaps()

In [None]:
qcf.report_qcf_series()

In [None]:
qcf.report_qcf_flags()

In [None]:
qcf.showplot_qcf_timeseries()

## Available `Level-3.1` variables after preliminary QCF
- This shows all available Level-3.1 variables, also the ones created previously for other fluxes

In [None]:
_vars = [print(x) for x in df_level31.columns if 'L3.1' in x]
FLUXVAR31QCF = f"{FLUXVAR31}_L3.1_QCF"  # Quality-controlled flux after Level-3.1
print(f"\nName of the storage-corrected and quality-controlled flux variable after Level-3.1:  {FLUXVAR31QCF}  (this variable will be used in the following outlier detection)")

</br>

</br>

# **Level-3.2: Outlier detection**
- Needs quality-controlled data (so far)

</br>

## Plot time series

In [None]:
print(f"{FLUXVAR31QCF} \n(quality-controlled Level-3.1 version of {FLUXVAR31}) \n(originally based on {FLUXVAR})")

In [None]:
# TimeSeries(series=df_level31[FLUXVAR31QCF]).plot_interactive()
TimeSeries(series=df_level31[FLUXVAR31QCF]).plot()

</br>

## Initiate calculations

In [None]:
sod = StepwiseOutlierDetection(dataframe=df_level31,
                               col=FLUXVAR31QCF,
                               site_lat=SITE_LAT,
                               site_lon=SITE_LON,
                               utc_offset=1)

</br>

</br>

## Flag, outlier detection: **absolute limits, separate for daytime and nighttime data**

In [None]:
MIN_DT = -50
MAX_DT = 50
MIN_NT = -10
MAX_NT = 50
print(sod.flag_outliers_abslim_dtnt_test.__doc__)
sod.flag_outliers_abslim_dtnt_test(daytime_minmax=[MIN_DT, MAX_DT], nighttime_minmax=[MIN_NT, MAX_NT], showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Absolute limits**

In [None]:
MIN = -50
MAX = 50
print(sod.flag_outliers_abslim_test.__doc__)
sod.flag_outliers_abslim_test(minval=MIN, maxval=MAX, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **z-score over all data, separate for daytime and nighttime**

In [None]:
print(sod.flag_outliers_zscore_dtnt_test.__doc__)
sod.flag_outliers_zscore_dtnt_test(threshold=4, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, oulier detection: **Local standard deviation**

In [None]:
print(sod.flag_outliers_localsd_test.__doc__)
sod.flag_outliers_localsd_test(n_sd=4, winsize=480, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Manual flagging of datapoints**

In [None]:
# sod.showplot_cleaned(interactive=True)
sod.showplot_cleaned(interactive=False)

In [None]:
print(sod.flag_manualremoval_test.__doc__)
sod.flag_manualremoval_test(remove_dates=[['2022-03-05 19:45:00', '2022-04-05 19:45:00']],
                            showplot=True, verbose=True)

In [None]:
sod.addflag()

In [None]:
# sod.showplot_cleaned(interactive=True)
sod.showplot_cleaned(interactive=False)

</br>

</br>

## Flag, outlier detection: **Increments z-score**

In [None]:
print(sod.flag_outliers_increments_zcore_test.__doc__)
sod.flag_outliers_increments_zcore_test(threshold=8, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **z-score over all data**

In [None]:
print(sod.flag_outliers_zscore_test.__doc__)
sod.flag_outliers_zscore_test(threshold=5, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Seasonal trend decomposition with z-score on residuals**

In [None]:
print(sod.flag_outliers_stl_rz_test.__doc__)
sod.flag_outliers_stl_rz_test(zfactor=3, decompose_downsampling_freq='6H', repeat=False, showplot=True)

In [None]:
sod.showplot_orig()
sod.showplot_cleaned()

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Thymeboost**
- More info about [thymeboost](https://github.com/tblume1992/ThymeBoost)

In [None]:
print(sod.flag_outliers_thymeboost_test.__doc__)
sod.flag_outliers_thymeboost_test(showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Local outlier factor**

In [None]:
print(sod.flag_outliers_lof_test.__doc__)
sod.flag_outliers_lof_test(n_neighbors=None, contamination=0.005, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Flag, outlier detection: **Local outlier factor, daytime/nighttime**

In [None]:
print(sod.flag_outliers_lof_dtnt_test.__doc__)
sod.flag_outliers_lof_dtnt_test(n_neighbors=None, contamination=0.0005, showplot=True, verbose=True)

In [None]:
sod.addflag()

</br>

</br>

## Show outlier-cleaned flux

In [None]:
# sod.showplot_cleaned(interactive=True)
sod.showplot_cleaned(interactive=False)

</br>

</br>

## Finalize outlier detection: **Collect all flags**

In [None]:
df_level32 = sod.get()

</br>

## Generate `QCF`, needed for Level-3.3

In [None]:
qcf = FlagQCF(series=df_level32[FLUXVAR31], df=df_level32, levelid='L3.2', swinpot=df_level32['SW_IN_POT'], nighttime_threshold=50)
qcf.calculate(daytime_accept_qcf_below=2, nighttimetime_accept_qcf_below=2)
df_level32_qcf = qcf.get()

In [None]:
qcf.report_qcf_evolution()

In [None]:
qcf.showplot_qcf_heatmaps()

In [None]:
qcf.report_qcf_series()

In [None]:
qcf.report_qcf_flags()

In [None]:
qcf.showplot_qcf_timeseries()

</br>

## Available `Level-3.2` variables after preliminary QCF
- This shows all available Level-3.2 variables, also the ones created previously for other fluxes

In [None]:
_vars = [print(x) for x in df_level32_qcf.columns if 'L3.2' in x]
FLUXVAR32QCF = f"{FLUXVAR31}_L3.2_QCF"  # Quality-controlled flux after Level-3.2
print(f"\nName of the storage-corrected and quality-controlled flux variable after Level-3.2:  {FLUXVAR32QCF}")

</br>

## Plot quality-controlled flux after `Level-3.2`
- Plot flux after storage-correction, flux quality control and outlier removal

In [None]:
# TimeSeries(series=df_level32_qcf[FLUXVAR32QCF]).plot_interactive()
TimeSeries(series=df_level32_qcf[FLUXVAR32QCF]).plot()

In [None]:
# Creating a dictionary by passing Series objects as values
frame = {
    f'original ({FLUXVAR})': df_level32_qcf[FLUXVAR],
    f'+ storage correction + flux quality flags ({FLUXVAR31QCF})': df_level32_qcf[FLUXVAR31QCF],    
    f'+ storage correction + flux quality flags + outlier removal ({FLUXVAR32QCF})': df_level32_qcf[FLUXVAR32QCF]
}

overview = pd.DataFrame(frame)
overview.cumsum().plot(title=f"Cumulative", figsize=(12, 6));

</br>

# **Overview after `Level-3.2`**

## Available `Level-3.2` fluxes

In [None]:
_fluxcols = [x for x in df_level32_qcf.columns if 'L3.1' and 'L3.2' in x and str(x).endswith('_QCF') and not str(x).startswith('FLAG_') ]
_fluxcols

In [None]:
_subset = df_level32_qcf[_fluxcols]
_subset

In [None]:
_subset.plot(subplots=True, title="Available Level-3.2 fluxes after application of all quality checks (so far)", figsize=(12, 4.5));

In [None]:
boxplots_df = df_level32_qcf[[FLUXVAR32QCF, "NIGHT"]].copy()
boxplots_df["MONTH"] = boxplots_df.index.month
boxplots_df

In [None]:
# Draw boxplots
sns.set_theme(style="ticks", palette="pastel")
sns.boxplot(x="MONTH", y=FLUXVAR32QCF, palette=["r", "b"], hue="NIGHT", data=boxplots_df)
sns.despine(offset=10, trim=True)
plt.axhline(0, color="black");

In [None]:
# Draw a nested violinplot and split the violins for easier comparison
sns.violinplot(data=boxplots_df, x="MONTH", y=FLUXVAR32QCF, hue="NIGHT", split=True, palette=["r", "b"]);
plt.axhline(0, color="black");

</br>

# **Save results to file**
- Save results to file for futher processing
- This can be useful if you want to use the data in another software, e.g. continuing post-processing using the library `ReddyProc` in `R` 
- `Parquet` format is recommended for large datasets

## Option 1: Save to CSV (large and slow)

In [None]:
df_level32_qcf.to_csv("mylovelyhorse.csv")

## Option 2: Save to Parquet (small and fast)
- Needed if you want to continue post-processing in notebooks
- Can also be used in `R` with the `arrow` package

In [None]:
save_parquet(data=df_level32_qcf, filename="df_level32_qcf")

</br>

# *(Preliminary) USTAR*

<div class="alert alert-block alert-danger">
    No USTAR filtering for: <b>H, LE, ET and FH2O.</b> 
</div>

> The USTAR filtering is not applied to H and LE, because it has not been proved that when there are CO2 advective fluxes, these also impact energy fluxes, specifically due to the fact that when advection is in general large (nighttime), energy fluxes are small.

source: [Pastorello et al. (2020). The FLUXNET2015 dataset and the ONEFlux processing pipeline for eddy covariance data](https://doi.org/10.1038/s41597-020-0534-3)



## *(Preliminary) Impact of different USTAR thresholds on data availability*

In [None]:
from diive.pkgs.flux.ustarthreshold import UstarThresholdConstantScenarios
ust = UstarThresholdConstantScenarios(series=df_level32_qcf[FLUXVAR32QCF],
                                      swinpot=df_level32_qcf['SW_IN_POT'],
                                      ustar=df_level32_qcf['USTAR'])
ust.calc(ustarthresholds=[0.05, 0.1, 0.15], showplot=True, verbose=True)

## *(Preliminary) Apply constant USTAR threshold*
- Use constant USTAR threshold for all data

(TODO)

</br>

# **End of notebook**
Congratulations, you reached the end of this notebook! Before you go let's store your finish time.

In [None]:
dt_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Finished. {dt_string}")