In [None]:

# Load library imports
import sys
import torch
import random
import joblib
import logging
import importlib
import numpy as np
import pandas as pd

# Load project Imports
from src.utils.config_loader import load_project_config
from src.data_ingestion.gwl_data_ingestion import process_station_coordinates, \
    fetch_and_process_station_data, download_and_save_station_readings
from src.preprocessing.gwl_preprocessing import load_timeseries_to_dict, outlier_detection, \
    resample_timestep_average, remove_spurious_data, interpolate_short_gaps, handle_short_gaps
from src.preprocessing.gap_imputation import handle_large_gaps
from src.preprocessing.gwl_feature_engineering import build_lags, build_seasonality_features, \
    trim_and_save
from src.graph_building.graph_construction import define_catchment_polygon, build_mesh

In [None]:
# Configure root logging so pipeline messages are written to the notebook output
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s - %(message)s',
    # Verbose alternative, kept for debugging:
    # format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)

# Logger for this notebook, plus the project config that holds all paths and params
logger = logging.getLogger(__name__)
config = load_project_config(config_path="config/project_config.yaml")
notebook = True  # forwarded as `notebook=` to the pipeline steps in later cells

# Catchments listed in the config; the first one is used as the notebook demo catchment
catchments_to_process = config["global"]["pipeline_settings"]["catchments_to_process"]
if not catchments_to_process:
    # Fail with an explicit message instead of a bare IndexError further down
    raise ValueError(
        "config['global']['pipeline_settings']['catchments_to_process'] is empty; "
        "at least one catchment is required to run this notebook."
    )

# Substitute the `{raw_data_root}` placeholder in every string path, first for the
# global paths and then for each catchment's paths. Non-string values are left as is.
raw_data_root = config["global"]["paths"]["raw_data_root"]
for section in ["global", *catchments_to_process]:
    section_paths = config[section]["paths"]
    for key, val in section_paths.items():
        if isinstance(val, str):
            section_paths[key] = val.format(raw_data_root=raw_data_root)

# Seed every random number generator used here (Python, NumPy, PyTorch CPU and CUDA)
random_seed = config["global"]["pipeline_settings"]["random_seed"]
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)
# NOTE(review): cuDNN flags as suggested by the PyTorch reproducibility notes - confirm
# they are still wanted if training speed becomes a concern.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Define notebook demo catchment
catchment = catchments_to_process[0]
run_defra_API_calls = config["global"]["pipeline_settings"]["run_defra_api"]  # True to run API calls

logger.info(f"Show Notebook Outputs: {notebook}")
logger.info(f"Notebook Demo Catchment: {catchment.capitalize()}")

### DATA INGESTION ###

Load gwl station list with grid references and convert grid references to easting, northing, longitude and latitude form for plotting and data alignment.

In [None]:
# --- Process Catchment Stations List ----
# Look up the two path tables once so the call below stays short.
global_paths = config["global"]["paths"]
catchment_paths = config[catchment]["paths"]

stations_with_coords_df = process_station_coordinates(
    os_grid_squares=global_paths["gis_os_grid_squares"],
    station_list_input=catchment_paths["gwl_station_list"],
    station_list_output=catchment_paths["gwl_station_list_with_coords"],
    catchment=catchment,
)

logger.info(f"Pipeline step 'Process Station Coordinates for {catchment}' complete.\n")

**API Documentation notes:**

1. The API calls that return readings data have a soft limit of 100,000 rows per-call which can be overridden by setting a _limit parameter. There is a hard limit of 2,000,000 rows, which cannot be overridden.
2. The primary identifier for most stations uses a GUID style identifier called an SUID. These are used in the URL for the station and given as the value of the notation property in the station metadata.  
    a. Wiski identifier (wiskiID) is also available for my subset of stations and data type  
3. All monitoring stations can be filtered by name, location and other parameters. See https://environment.data.gov.uk/hydrology/doc/reference#stations-summary for full metadata details

In [None]:
if run_defra_API_calls:
    # Retrieve gwl monitoring station metadata and measures from DEFRA API
    stations_with_metadata_measures = fetch_and_process_station_data(
        stations_df=stations_with_coords_df,
        base_url=config["global"]["paths"]["defra_station_base_url"],
        output_path=config[catchment]["paths"]["gwl_station_metadata_measures"]
    )

    logger.info(f"Pipeline step 'Pull Hydrological Station Metadata for {catchment}' complete.\n")

    # An expression nested inside `if` is not the cell's last top-level expression,
    # so Jupyter would not render it on its own; display the preview explicitly.
    display(stations_with_metadata_measures.head())

In [None]:
if not run_defra_API_calls:
    # API calls are switched off: read the station metadata/measures CSV from the
    # configured path instead (presumably written by an earlier API run - verify).
    loaded_csv_path = config[catchment]["paths"]["gwl_station_metadata_measures"]
    stations_with_metadata_measures = pd.read_csv(loaded_csv_path)
else:
    # API calls are switched on: download the readings for every station over the
    # configured date window and save them to the catchment output directory.
    ingestion_settings = config["global"]["data_ingestion"]
    download_and_save_station_readings(
        stations_df=stations_with_metadata_measures,
        start_date=ingestion_settings["api_start_date"],
        end_date=ingestion_settings["api_end_date"],
        gwl_data_output_dir=config[catchment]["paths"]["gwl_data_output_dir"],
    )

    logger.info(f"All timeseries groundwater level data saved for {catchment} catchment.")

### PREPROCESSING ###

Remove stations with insufficient data and clean ts data from outliers and incorrect measurements. Interpolate between small data gaps using rational spline.

1. Load station DataFrames into a dict, dropping stations with insufficient data

In [None]:
# Load timeseries CSVs from API into reference dict
catchment_settings = config[catchment]
catchment_paths = catchment_settings["paths"]

gwl_time_series_dict = load_timeseries_to_dict(
    stations_df=stations_with_metadata_measures,
    col_order=config["global"]["data_ingestion"]["col_order"],
    data_dir=catchment_paths["gwl_data_output_dir"],
    inclusion_threshold=catchment_settings["preprocessing"]["inclusion_threshold"],
    station_list_output=catchment_paths["gwl_station_list_output"],
    catchment=catchment,
)

logger.info(f"All timeseries data converted to dict for {catchment} catchment.\n")

2. Remove outlying and incorrect data points

In [None]:
# These settings are the same for every station, so look them up once up front.
ts_plot_output = config[catchment]["visualisations"]["ts_plots"]["time_series_gwl_output"]
prediction_resolution = config["global"]["pipeline_settings"]["prediction_resolution"]

# Replace each station's frame with the result of remove_spurious_data (same keys)
for station_name, df in gwl_time_series_dict.items():
    gwl_time_series_dict[station_name] = remove_spurious_data(
        target_df=df,
        station_name=station_name,
        path=ts_plot_output,
        pred_frequency=prediction_resolution,
        notebook=notebook,
    )

In [None]:
# Config switch: True runs outlier detection here; the next cell checks the same flag
run_outlier_processing = config["global"]["pipeline_settings"]["run_outlier_detection"]

if run_outlier_processing:
    # run outlier detection and processing
    ts_plot_settings = config[catchment]["visualisations"]["ts_plots"]
    processed_gwl_time_series_dict = outlier_detection(
        gwl_time_series_dict=gwl_time_series_dict,
        output_path=ts_plot_settings["time_series_gwl_output"],
        dpi=ts_plot_settings["dpi_save"],
        dict_output=config[catchment]["paths"]["gwl_outlier_dict"],
        notebook=notebook,
    )

3. Aggregate to timestep frequency

In [None]:
# Outlier detection was skipped in the previous cell, so load the dict stored at the
# configured `gwl_outlier_dict` path instead.
if not run_outlier_processing:
    # NOTE(review): joblib.load deserialises via pickle - only load files produced
    # by this pipeline.
    processed_gwl_time_series_dict = joblib.load(config[catchment]["paths"]["gwl_outlier_dict"])

# Resample every station series to the configured prediction resolution
ingestion_settings = config["global"]["data_ingestion"]
timestep_data = resample_timestep_average(
    gwl_data_dict=processed_gwl_time_series_dict,
    start_date=ingestion_settings["api_start_date"],
    end_date=ingestion_settings["api_end_date"],
    path=config[catchment]["visualisations"]["ts_plots"]["time_series_gwl_output"],
    pred_frequency=config["global"]["pipeline_settings"]["prediction_resolution"],
    notebook=notebook,
)

4. Interpolate across small gaps in the time series data using a rational spline or PCHIP (try both), define a threshold on the number of missing time steps for interpolation eligibility, and add a binary interpolation flag column

In [None]:
# Handle short gaps in each station series. The call returns the updated data, the
# list of gaps and the per-station maximum gap lengths, all of which are passed to
# handle_large_gaps in a later cell.
ingestion_settings = config["global"]["data_ingestion"]

timestep_data, gaps_list, station_max_gap_lengths_calculated = handle_short_gaps(
    timestep_data=timestep_data,
    path=config[catchment]["visualisations"]["ts_plots"]["time_series_gwl_output"],
    max_steps=ingestion_settings["max_interp_length"],
    start_date=ingestion_settings["api_start_date"],
    end_date=ingestion_settings["api_end_date"],
    pred_frequency=config["global"]["pipeline_settings"]["prediction_resolution"],
    notebook=notebook,
)

Handle large gaps

In [None]:
# Group the config sections used below so each argument reads as a single lookup.
pipeline_settings = config["global"]["pipeline_settings"]
global_preprocessing = config["global"]["preprocessing"]
catchment_preprocessing = config[catchment]["preprocessing"]
catchment_visualisations = config[catchment]["visualisations"]

# Handle the gaps left over from the short-gap step; returns the synthetic imputation
# performance results and the cleaned per-station dict.
synthetic_imputation_performace, cleaned_df_dict = handle_large_gaps(
    df_dict=timestep_data,
    gaps_list=gaps_list,
    catchment=catchment,
    spatial_path=config[catchment]["paths"]["gwl_station_list_output"],
    path=catchment_visualisations["ts_plots"]["time_series_gwl_output"],
    threshold_m=catchment_preprocessing["large_catchment_threshold_m"],
    radius=global_preprocessing["radius"],
    output_path=catchment_visualisations["corr_dist_score_scatters"],
    threshold=catchment_preprocessing["dist_corr_score_threshold"],
    predefined_large_gap_lengths=global_preprocessing["gap_lengths_days"],
    max_imputation_length_threshold=global_preprocessing["max_imputation_threshold"],
    min_around=global_preprocessing["min_data_points_around_gap"],
    station_max_gap_lengths=station_max_gap_lengths_calculated,
    pred_frequency=pipeline_settings["prediction_resolution"],
    k_decay=catchment_preprocessing["dist_corr_score_k_decay"],
    random_seed=pipeline_settings["random_seed"],
)

5. Lagged: Add lagged features (by timestep across 7 days?) + potentially rolling averages (3-day/7-day?) + THEN trim

In [None]:
# Step 1: build lag features from the gap-filled station frames
df_with_lags = build_lags(df_dict=cleaned_df_dict, catchment=catchment)

# Step 2: build seasonality features at the configured prediction resolution
df_with_seasons = build_seasonality_features(
    df_dict=df_with_lags,
    catchment=catchment,
    pred_frequency=config["global"]["pipeline_settings"]["prediction_resolution"],
)

# Step 3: trim to the configured model start/end dates and save the result
ingestion_settings = config["global"]["data_ingestion"]
catchment_settings = config[catchment]
trimmed_df_dict = trim_and_save(
    df_dict=df_with_seasons,
    model_start_date=ingestion_settings["model_start_date"],
    model_end_date=ingestion_settings["model_end_date"],
    trimmed_output_dir=catchment_settings["paths"]["trimmed_output_dir"],
    ts_path=catchment_settings["visualisations"]["ts_plots"]["time_series_gwl_output"],
    notebook=notebook,
    catchment=catchment,
    init_with_dip_value=catchment_settings["model"]["architecture"]["initialise_with_dipped"],
)

In [None]:
# Select Catchment area from country wide gdf
define_catchment_polygon(
    england_catchment_gdf_path=config[catchment]['paths']['gis_catchment_boundary'],
    target_mncat=config[catchment]['target_mncat'],
    catchment=catchment,
    polygon_output_path=config[catchment]['paths']['gis_catchment_dir']
)

# Build catchment mesh
mesh_nodes_table, mesh_nodes_gdf, mesh_cells_gdf_polygons, catchment_polygon = build_mesh(
    shape_filepath=config[catchment]['paths']['gis_catchment_dir'],
    output_path=config[catchment]['paths']['mesh_nodes_output'],
    catchment=catchment,
    grid_resolution=config[catchment]['preprocessing']['graph_construction']['grid_resolution']
)

logger.info(f"Pipeline step 'Build Mesh' complete for {catchment} catchment.")

### FINAL PROCESSED STATION DATAFRAMES ###

In [None]:
# Show every processed station frame, each preceded by its station name.
for station, df in trimmed_df_dict.items():
    # `{station}` prints the same text as the old `{station:}`, whose colon
    # introduced an empty (no-op) format spec.
    print(f"\n{station}")
    display(df)

**Use the following code to zoom in on a specific area of a station's data:**

In [None]:
# import matplotlib.pyplot as plt
# import matplotlib.dates as mdates
# import pandas as pd

# start_date = '2023-10-21'
# end_date = '2023-10-25'
# test_station = 'renwick'

# # Select station data
# df = trimmed_df_dict[test_station]

# # Filter using index (since it's already datetime)
# filtered_df = df.loc[start_date:end_date]

# # Plot
# fig, ax = plt.subplots(figsize=(15, 6))
# ax.plot(filtered_df.index, filtered_df['value'])
# ax.set_title(f"{test_station} Groundwater Level: {start_date} to {end_date}")
# ax.set_xlabel("Date")
# ax.set_ylabel("Groundwater Level (mAOD)")

# (matplotlib.dates is already imported as mdates at the top of this snippet)
# ax.xaxis.set_minor_locator(mdates.WeekdayLocator(interval=1))
# ax.yaxis.set_minor_locator(plt.MultipleLocator(0.1))
# ax.grid(True, which='major', linestyle='-', linewidth=0.5)
# ax.grid(True, which='minor', linestyle=':', linewidth=0.2)

# fig.autofmt_xdate()
# plt.show()

# print(filtered_df.head())