# Process Data




In this script we'll create two files:

- single series file: contains each series (sensor, rain gauge) with additional metadata
- combined file: contains all the series combined based on timestamp

In [1]:
import os
import time

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import h5py
import nexusformat.nexus as nx
import pickle
import json
from tqdm import tqdm
import datetime


from fault_management_uds.data.hdf_tools import delete_group, create_group, print_tree, save_dataframe_in_HDF5, load_dataframe_from_HDF5, update_filtered_data_in_HDF5
from fault_management_uds.data.process import ensure_data_is_from_start_to_end, remove_nans_from_start_end
from fault_management_uds.data.load import import_external_metadata, import_metadata
from fault_management_uds.config import bools_2_meta, error_indicators



from fault_management_uds.config import PROJ_ROOT
from fault_management_uds.config import DATA_DIR, RAW_DATA_DIR, INTERIM_DATA_DIR, PROCESSED_DATA_DIR, EXTERNAL_DATA_DIR
from fault_management_uds.config import MODELS_DIR, REPORTS_DIR, FIGURES_DIR, REFERENCE_DIR
from fault_management_uds.config import natural_sensor_order


2024-11-10 20:42:41.277 | INFO     | fault_management_uds.config:<module>:11 - PROJ_ROOT path is: /Users/arond.jacobsen/Documents/GitHub/fault_management_uds


In [2]:
runtime_start = time.time()

---

# TODO:

- what does the comment; scaling factor wrong initially mean?


---

### Understanding the data

check the bellinge article, 

- Figure 4: how the sewage system works and what the sensors measure
- Table 1: the sensors
- Figure 7: the sewer data, when available, when outliers, when nan
- Figure 13: The available data



#### Create HDF5 file

In [3]:
data_file_path = PROCESSED_DATA_DIR / 'Bellinge.h5'

In [4]:

mode = 'w' if not os.path.exists(data_file_path) else 'a'
# Open or create an HDF5 file in append mode
with h5py.File(data_file_path, mode) as hdf:
    pass

print("HDF5 file created")

HDF5 file created


In [5]:
# the data will contain two groups
single_series_group = "single_series"
create_group(data_file_path, single_series_group, verbose=False)
# single series will be groups by series type, e.g. rain gauge, sewer data..

combined_data_group = "combined_data"
create_group(data_file_path, combined_data_group, verbose=False)
# combined data will combine each series by date and contain raw, cleaned, indicator

In [6]:
print_tree(data_file_path)

root
├── combined_data
│   ├── clean
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   └── raw
│       ├── columns
│       ├── data
│       └── timestamps
└── single_series
    ├── rain_gauge_data
    │   ├── 5425
    │   │   ├── columns
    │   │   ├── data
    │   │   └── timestamps
    │   └── 5427
    │       ├── columns
    │       ├── data
    │       └── timestamps
    └── sewer_data
        ├── G71F04R_Level1
        │   ├── bools
        │   │   ├── columns
        │   │   ├── data
        │   │   └── timestamps
        │   ├── clean
        │   │   ├── columns
        │   │   ├── data
        │   │   └── timestamps
        │   └── raw
        │       ├── columns
        │       ├── data
        │       └── timestamps
        ├── G71F04R_Level2
        │   ├── bools
        │   │   ├── columns
        │   │   ├── data
        │   │   └── timestamps
        │   ├── clean
        │   │   ├── columns
        │   │   ├── data
        │   │   └── timestamps
        │ 

NXroot('Bellinge')

---
# Single Series Data

missing:
- temperature
- band

### Create sensor data

Goal:
- for each sensor, combine its multiple files into a single series
- quicker and selective loading

In [7]:
# get the paths
interim_sensor_path = INTERIM_DATA_DIR / 'Bellinge' / 'sensor-data' 
# list all the files in the folder
files = os.listdir(interim_sensor_path)
print(len(files))
print(files)

37
['G71F68Y_LevelPS_System2000_p1.pkl', 'G71F68Yp1_System2000_p1.pkl', 'G71F68Yp1_power_System2000_p1.pkl', 'G71F05R_LevelBasin_System2000_p1.pkl', 'G71F68Yp1_power_iFix_p1.pkl', 'G71F05R_LevelBasin_iFix_p1.pkl', 'G80F11B_Level2_iFix_p1.pkl', 'G71F05R_LevelInlet_System2000_p1.pkl', 'G72F040_Danova_p1.pkl', 'G80F66Y_Level1_iFix_p1.pkl', 'G80F66Y_Level2_iFix_p1.pkl', 'G80F11B_Level1_iFix_p1.pkl', 'G71F68Y_LevelPS_iFix_p1.pkl', 'G71F68Yp2_power_System2000_p1.pkl', 'G71F68Yp2_power_iFix_p1.pkl', 'G71F05R_position_iFix_p1.pkl', 'G73F010_Danova_p1.pkl', 'G71F04R_Level2_System2000_p1.pkl', 'G71F04R_Level2_iFix_p4.pkl', 'G71F04R_Level2_System2000_p2.pkl', 'G71F05R_LevelInlet_iFix_p1.pkl', 'G80F13P_LevelPS_iFix_p1.pkl', 'G80F13Pp2_power_iFix_p1.pkl', 'metadata.csv', 'G80F13Pp2_power_System2000_p1.pkl', 'G71F04R_Level2_iFix_p3.pkl', 'G71F68Yp1_iFix_p1.pkl', 'G71F04R_Level1_iFix_p3.pkl', 'G80F13Pp1_power_System2000_p1.pkl', 'G80F13Pp1_power_iFix_p1.pkl', 'G71F06R_LevelInlet_iFix_p1.pkl', 'G71F06

In [8]:
external_metadata = import_metadata(REFERENCE_DIR / 'external_metadata.csv')

In [9]:
not_check_unique = ['IdMeasurement', 'Folderpath', 'Filename', 'TagSRO', 'Navn', 'sensor_orientation']
for i, col in enumerate(external_metadata.columns):
    if col not in not_check_unique:
        print(f"{col}: {external_metadata[col].unique()}")
        print('')

Area: ['Bellinge']

Source: ['System2000' 'iFix' 'Danova']

Version: ['p1' 'p2' 'p3' 'p4']

Type: ['Level' 'Position' 'Discharge' 'Power']

StartTime: <DatetimeArray>
['2010-08-01 00:00:00', '2020-01-06 00:00:00', '2020-10-12 00:00:00',
 '2020-11-19 00:00:00', '2020-10-13 00:00:00', '2017-03-07 00:00:00',
 '2020-01-08 00:00:00', '2019-06-27 00:00:00', '2019-10-23 00:00:00',
 '2010-01-01 00:00:00', '2018-09-05 00:00:00']
Length: 11, dtype: datetime64[ns]

EndTime: <DatetimeArray>
['2020-01-06 00:00:00', '2020-10-12 00:00:00', '2020-11-19 00:00:00',
 '2021-08-19 00:00:00', '2020-03-23 00:00:00', '2018-09-04 00:00:00']
Length: 6, dtype: datetime64[ns]

Conversion: [1.000000e+02 6.896552e+01 6.896552e-01 1.000000e+00 3.600000e+03]

comment: ['cm -> m' 'cm -> m + scaling factor wrong initially (2.9/2)'
 'scaling factor wrong initially' nan
 'cm -> m + scaling factor wrong initially' 'm3/h -> m3/s']

unit: ['cm' 'm' 'm3/h' 'A' nan]

obvious_min: [ 1.e-03  0.e+00  2.e-03 -5.e+00]

obvious_max

In [10]:
assert external_metadata.shape[0] == len(files)-1, f"metadata shape: {external_metadata.shape[0]}, files: {len(files)}"

In [11]:
# create a group if it does not exist
sensor_group_path = single_series_group + '/sewer_data'
create_group(data_file_path, sensor_group_path, verbose=False)


In [12]:
# Iterate over the metadata
for sensor_name, sensor_metadata in external_metadata.groupby('IdMeasurement'):
    # if sensor_name != "G72F040":
    #     # go to the next sensor
    #     continue
    print(f"Sensor name: {sensor_name}")

    # Create a group for the sensor
    sensor_path = f"{sensor_group_path}/{sensor_name}"
    create_group(data_file_path, sensor_path, verbose=False)


    # create an empty dataframe to concatenate the data
    full_data = pd.DataFrame(columns=['time', 'raw_value', 'value_no_errors'] + error_indicators)
    
    # Iterate over the rows in the metadata
    for i, file_metadata in sensor_metadata.iterrows():
        print(f"    File: {file_metadata['SaveName']}")
        
        # Load and prepare
        data_path = interim_sensor_path / f"{file_metadata['SaveName']}.pkl"
        # load the pickle file
        with open(data_path, 'rb') as f:
            data = pickle.load(f)
        data['time'] = pd.to_datetime(data['time'])
        data.sort_values('time', inplace=True)
        data.reset_index(drop=True, inplace=True)
        # remove the NaNs from the start and end
        data = remove_nans_from_start_end(data, 'raw_value')
        # concatenate the data
        full_data = pd.concat([full_data, data], axis=0)


    # sort values by value_no_errors and drop duplicates
    full_data.sort_values(['time', 'value_no_errors', 'raw_value'], inplace=True)
    full_data.drop_duplicates(subset='time', keep='first', inplace=True)
    full_data.reset_index(drop=True, inplace=True)


    # sort the data by time
    full_data.sort_values('time', inplace=True)
    full_data.reset_index(drop=True, inplace=True)

    # Save to the HDF5 file
    # save the raw
    raw_series = full_data[['time', 'raw_value']].copy()
    raw_series.columns = ['time', 'value']
    save_dataframe_in_HDF5(data_file_path, sensor_path, f"raw", raw_series)
    del raw_series
    # save the clean
    clean_series = full_data[['time', 'value_no_errors']].copy()
    clean_series.columns = ['time', 'value']
    save_dataframe_in_HDF5(data_file_path, sensor_path, f"clean", clean_series)
    del clean_series

    # save the errors
    bool_series = full_data[['time'] + ['ffill'] + error_indicators].copy()
    # convert to boolean
    for col in ['ffill'] + error_indicators:
        bool_series[col] = bool_series[col].astype(bool)
    save_dataframe_in_HDF5(data_file_path, sensor_path, f"bools", bool_series)
    del bool_series
    
    print('    Done')
    print('')

# clean up memory
del full_data



Sensor name: G71F04R_Level1
    File: G71F04R_Level1_System2000_p1
    File: G71F04R_Level1_System2000_p2
    File: G71F04R_Level1_iFix_p3
    File: G71F04R_Level1_iFix_p4
    Done

Sensor name: G71F04R_Level2
    File: G71F04R_Level2_System2000_p1
    File: G71F04R_Level2_System2000_p2
    File: G71F04R_Level2_iFix_p3
    File: G71F04R_Level2_iFix_p4
    Done

Sensor name: G71F05R_LevelBasin
    File: G71F05R_LevelBasin_System2000_p1
    File: G71F05R_LevelBasin_iFix_p1
    Done

Sensor name: G71F05R_LevelInlet
    File: G71F05R_LevelInlet_System2000_p1
    File: G71F05R_LevelInlet_iFix_p1
    Done

Sensor name: G71F05R_position
    File: G71F05R_position_System2000_p1
    File: G71F05R_position_iFix_p1
    Done

Sensor name: G71F06R_LevelInlet
    File: G71F06R_LevelInlet_System2000_p1
    File: G71F06R_LevelInlet_iFix_p1
    Done

Sensor name: G71F68Y_LevelPS
    File: G71F68Y_LevelPS_System2000_p1
    File: G71F68Y_LevelPS_iFix_p1
    Done

Sensor name: G71F68Yp1
    File: G71F68Yp

In [13]:
#f = print_tree(data_file_path, save_path=FIGURES_DIR / 'single_series_dir')
f = print_tree(data_file_path, group=sensor_group_path)

sewer_data
├── G71F04R_Level1
│   ├── bools
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   ├── clean
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   └── raw
│       ├── columns
│       ├── data
│       └── timestamps
├── G71F04R_Level2
│   ├── bools
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   ├── clean
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   └── raw
│       ├── columns
│       ├── data
│       └── timestamps
├── G71F05R_LevelBasin
│   ├── bools
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   ├── clean
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   └── raw
│       ├── columns
│       ├── data
│       └── timestamps
├── G71F05R_LevelInlet
│   ├── bools
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   ├── clean
│   │   ├── columns
│   │   ├── data
│   │   └── timestamps
│   └── raw
│       ├── columns
│       ├── data
│       └── timestamps
├── G71F05R_position
│   

#### Create new metadata that is for each sensor

In [14]:
len(f.keys())

19

In [15]:
columns = ['IdMeasurement', 'Type', 'Navn', 'Unit', 'UnitAlias', 'obvious_min', 'obvious_max']
metadata = external_metadata[columns].copy()
# remove duplicates
metadata.drop_duplicates(keep='first', inplace=True)

assert len(f.keys()) == metadata.shape[0], f"metadata sensors: {metadata.shape[0]}, HDF5 sensors: {len(f.keys())}"

# fix the StartTime and EndTime
for sensor_name, sensor_group in external_metadata.groupby('IdMeasurement'):
    mask = metadata['IdMeasurement'] == sensor_name
    metadata.loc[mask, 'StartTime'] = sensor_group['StartTime'].min()
    metadata.loc[mask, 'EndTime'] = sensor_group['EndTime'].max()
    print(f"Sensor name: {sensor_name}")
    print(f"    StartTime: {metadata.loc[mask, 'StartTime'].values[0]}")
    print(f"    EndTime: {metadata.loc[mask, 'EndTime'].values[0]}")

Sensor name: G71F04R_Level1
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F04R_Level2
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F05R_LevelBasin
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F05R_LevelInlet
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F05R_position
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F06R_LevelInlet
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F68Y_LevelPS
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F68Yp1
    StartTime: 2010-08-01T00:00:00.000000000
    EndTime: 2021-08-19T00:00:00.000000000
Sensor name: G71F68Yp1_power
    StartTime: 2017-03-07T00:00:0

In [16]:
# save the metadata
# Save the metadata as a csv file
metadata.to_csv(REFERENCE_DIR / 'sensor_metadata.csv', index=False)
print("Metadata is saved")

Metadata is saved


In [17]:
# load the metadata
metadata = pd.read_csv(REFERENCE_DIR / 'sensor_metadata.csv')
metadata.head()

Unnamed: 0,IdMeasurement,Type,Navn,Unit,UnitAlias,obvious_min,obvious_max,StartTime,EndTime
0,G71F04R_Level1,Level,Niv. Indløb 1,m,Meter,0.001,1.5,2010-08-01,2021-08-19
1,G71F04R_Level2,Level,Niv. Indløb 2,m,Meter,0.001,1.5,2010-08-01,2021-08-19
2,G71F05R_LevelBasin,Level,Niv. Skyllevandsbeh.,m,Meter,0.0,5.0,2010-08-01,2021-08-19
3,G71F05R_LevelInlet,Level,Niv. Indløb,m,Meter,0.001,1.5,2010-08-01,2021-08-19
4,G71F05R_position,Position,Position throttle,m,Meter,0.0,3.0,2010-08-01,2021-08-19


## Similar for the Rain Gauge Data

- Assume the rain data is of high quality
- Only stores data when it is more than 0
- Thus all data is available

In [18]:
# get the paths
#rain_gauges_path = EXTERNAL_DATA_DIR / 'Bellinge' / 'rain-gauge-data' / '#3a_Raingauges'
rain_gauges_path = INTERIM_DATA_DIR / 'Bellinge' / 'rain-gauge-data'

# contains = "_ts"
# # list all the files in the folder
files = os.listdir(rain_gauges_path)

from fault_management_uds.config import rain_gauges

assert len(files) == len(rain_gauges), f"files: {len(files)}, rain_gauges: {len(rain_gauges)}"

In [19]:
# create a group if it does not exist
rain_gauge_group_path = single_series_group + '/rain_gauge_data'
create_group(data_file_path, rain_gauge_group_path, verbose=False)

for name in rain_gauges:
    filename = f"{name}.pkl"
    print(f"Loading data from file: {filename}")
    data = pd.read_pickle(rain_gauges_path / filename)
    # Save to the HDF5 file
    save_dataframe_in_HDF5(data_file_path, rain_gauge_group_path, name, data)
    print('    Done')

# clean up memory
del data

Loading data from file: 5425.pkl
    Done
Loading data from file: 5427.pkl
    Done


In [20]:
f = print_tree(data_file_path, group=rain_gauge_group_path)

rain_gauge_data
├── 5425
│   ├── columns
│   ├── data
│   └── timestamps
└── 5427
    ├── columns
    ├── data
    └── timestamps


---

# Creating the combined data


We'll create three datasets
- raw
- clean
- indicator

where indicator will contain the following values
- -1 is normal
- 0 is missing
- 1+ is error

#### Initializing

In [21]:
combined_data_group = "combined_data"

In [22]:
min_time = external_metadata['StartTime'].min()
max_time = external_metadata['EndTime'].max()

print(f"Min time: {min_time}, Max time: {max_time}")

Min time: 2010-01-01 00:00:00, Max time: 2021-08-19 00:00:00


In [23]:
from fault_management_uds.config import single_series_order

#### First create empty datasets

In [24]:
# create a template df for the combined data
time_range = pd.date_range(start=min_time, end=max_time, freq='1min')
# default to zeros
empty_data = np.zeros((len(time_range), len(single_series_order)))
empty_df = pd.DataFrame(empty_data, columns=single_series_order)
empty_df['time'] = time_range

In [25]:
# the combined data to be created
combined_series = ['raw', 'clean', 'indicator']

series_name = 'raw'
# save the data, 0 by default
save_dataframe_in_HDF5(data_file_path, combined_data_group, series_name, empty_df)

series_name = 'clean'
# save the data, 0 by default
save_dataframe_in_HDF5(data_file_path, combined_data_group, series_name, empty_df)

# series_name = 'indicator'
# # save the data, 0 by default
# # indicators will be 0 for no data, 1 for normal, 2 for error
# save_dataframe_in_HDF5(data_file_path, combined_data_group, series_name, empty_df)

# series_name = 'full_indicator'
# # save the data, 0 by default
# # indicators will be 0 for no data, 1 for normal, 2 for error
# save_dataframe_in_HDF5(data_file_path, combined_data_group, series_name, empty_df)


In [26]:
#f = print_tree(data_file_path, save_path=FIGURES_DIR / 'single_series_dir')
f = print_tree(data_file_path, group="combined_data")

combined_data
├── clean
│   ├── columns
│   ├── data
│   └── timestamps
└── raw
    ├── columns
    ├── data
    └── timestamps


#### Adding the data to combined

Rain data

In [27]:
rain_gauge_group_path

'single_series/rain_gauge_data'

In [28]:
# iterate the combined data
for combined in ['raw', 'clean']:
    print(f"Combined data: {combined}")
    combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")
    # iterate the rain gauges
    for rain_gauge in rain_gauges:
        print(f"    Rain gauge: {rain_gauge}")
        # load all the data
        rain_gauge_path = f"{rain_gauge_group_path}/{rain_gauge}"
        df, _, _, _ = load_dataframe_from_HDF5(data_file_path, rain_gauge_path)
        # rename the columns
        df = df.rename(columns={'value': rain_gauge})

        # Insert the rain gauge data from df into combined_data without changing combined_data's index
        combined_data[rain_gauge] = df[rain_gauge].reindex(combined_data.index)

    # save the combined data
    update_filtered_data_in_HDF5(data_file_path, f"{combined_data_group}/{combined}", combined_data, start_idx, end_idx, column_indices)

# NOTE: no need to iterate the indicator data as it the rain gauges have no errors or missing data

# clean up memory
del combined_data

Combined data: raw
    Rain gauge: 5425
    Rain gauge: 5427
        Data saved in group '/combined_data/raw'
Combined data: clean
    Rain gauge: 5425
    Rain gauge: 5427
        Data saved in group '/combined_data/clean'


In-sewer data

In [29]:
# next step is to insert the in-sewer data into the combined data
# iterate the combined data
for combined in ['raw', 'clean']:
    print(f"Combined data: {combined}")
    combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")
    # iterate the sensors
    for sensor in natural_sensor_order:
        print(f"    Sensor: {sensor}")
        # load all the data, NOTE: use the combined suffix
        sensor_path = f"{sensor_group_path}/{sensor}/{combined}"
        df, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_path)
        # rename the columns
        df = df.rename(columns={'value': sensor})

        # Insert the sensor data from df into combined_data without changing combined_data's index
        combined_data[sensor] = df[sensor].reindex(combined_data.index)

    # save the combined data
    update_filtered_data_in_HDF5(data_file_path, f"{combined_data_group}/{combined}", combined_data, start_idx, end_idx, column_indices)


# clean up memory
del combined_data

Combined data: raw
    Sensor: G80F11B_Level1
    Sensor: G80F11B_Level2
    Sensor: G80F66Y_Level1
    Sensor: G80F66Y_Level2
    Sensor: G80F13P_LevelPS
    Sensor: G80F13Pp1_power
    Sensor: G80F13Pp2_power
    Sensor: G73F010
    Sensor: G72F040
    Sensor: G71F05R_LevelInlet
    Sensor: G71F05R_LevelBasin
    Sensor: G71F05R_position
    Sensor: G71F04R_Level1
    Sensor: G71F04R_Level2
    Sensor: G71F06R_LevelInlet
    Sensor: G71F68Y_LevelPS
    Sensor: G71F68Yp1
    Sensor: G71F68Yp1_power
    Sensor: G71F68Yp2_power
        Data saved in group '/combined_data/raw'
Combined data: clean
    Sensor: G80F11B_Level1
    Sensor: G80F11B_Level2
    Sensor: G80F66Y_Level1
    Sensor: G80F66Y_Level2
    Sensor: G80F13P_LevelPS
    Sensor: G80F13Pp1_power
    Sensor: G80F13Pp2_power
    Sensor: G73F010
    Sensor: G72F040
    Sensor: G71F05R_LevelInlet
    Sensor: G71F05R_LevelBasin
    Sensor: G71F05R_position
    Sensor: G71F04R_Level1
    Sensor: G71F04R_Level2
    Sensor: G71F06R_

In [30]:

# stop system from running
raise SystemExit("Stop right there!")

SystemExit: Stop right there!

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


Indicator data    

In [None]:
# next step is to insert the in-sewer data into the combined data
# iterate the combined data
combined = 'indicator'
print(f"Combined data: {combined}")
combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")


# iterate the sensors
for sensor in natural_sensor_order:
    print(f"    Sensor: {sensor}")
    # Indicator with 1 for data and 0 for no data using the raw data
    sensor_path = f"{sensor_group_path}/{sensor}/{sensor}_clean"
    df, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_path)

    # TODO: create indicator based on the function in processed!
    # TODO: probably save it somewhere else than in load data? maybe in processed data?


    # Indicator with 1 for data and 0 for no data
    df['indicator'] = df['value'].notna().astype(int)
    # Set 0 valued data to -1 indicator
    df.loc[df['value'] == 0, 'indicator'] = -1

    # Insert the sensor data from df into combined_data without changing combined_data's index
    #combined_data[sensor] = df['indicator'].reindex(combined_data.index)
    combined_data[sensor] = df['indicator'].reindex(combined_data.index).combine_first(combined_data[sensor])

    # Indicator with 2 for error otherwise remove rows
    sensor_path = f"{sensor_group_path}/{sensor}/{sensor}_bools"
    df, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_path, columns=error_indicators)
    # Indicator with 2 for error otherwise remove rows
    df['indicator'] = df.any(axis=1).astype(int) * 2
    # drop rows with no errors
    df = df[df['indicator'] > 0]

    # Insert the sensor data from df into combined_data without changing combined_data's index
    #combined_data[sensor] = df['indicator'].reindex(combined_data.index)
    combined_data[sensor] = df['indicator'].reindex(combined_data.index).combine_first(combined_data[sensor])

# similar for the rain gauges
for rain_gauge in rain_gauges:
    print(f"    Rain gauge: {rain_gauge}")
    # Indicator with 1 for data and 0 for no data using the raw data
    rain_gauge_path = f"{rain_gauge_group_path}/{rain_gauge}"
    df, _, _, _ = load_dataframe_from_HDF5(data_file_path, rain_gauge_path)

    # Indicator with 1 for data and 0 for no data
    df['indicator'] = df['value'].notna().astype(int)
    # Set 0 valued data to -1 indicator
    df.loc[df['value'] == 0, 'indicator'] = -1

    # there are no errors

    # Insert the rain gauge data from df into combined_data without changing combined_data's index
    #combined_data[rain_gauge] = df['indicator'].reindex(combined_data.index)
    combined_data[rain_gauge] = df['indicator'].reindex(combined_data.index).combine_first(combined_data[rain_gauge])


# save the combined data
update_filtered_data_in_HDF5(data_file_path, f"{combined_data_group}/{combined}", combined_data, start_idx, end_idx, column_indices)
    
# clean up memory
del combined_data

Combined data: indicator
    Sensor: G80F11B_Level1
    Sensor: G80F11B_Level2
    Sensor: G80F66Y_Level1
    Sensor: G80F66Y_Level2
    Sensor: G80F13P_LevelPS
    Sensor: G80F13Pp1_power
    Sensor: G80F13Pp2_power
    Sensor: G73F010
    Sensor: G72F040
    Sensor: G71F05R_LevelInlet
    Sensor: G71F05R_LevelBasin
    Sensor: G71F05R_position
    Sensor: G71F04R_Level1
    Sensor: G71F04R_Level2
    Sensor: G71F06R_LevelInlet
    Sensor: G71F68Y_LevelPS
    Sensor: G71F68Yp1
    Sensor: G71F68Yp1_power
    Sensor: G71F68Yp2_power
    Rain gauge: 5425
    Rain gauge: 5427
        Data saved in group '/combined_data/indicator'


In [None]:
combined = 'indicator'
print(f"Combined data: {combined}")
combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")


# save to each sensor
for sensor_name in natural_sensor_order:
    print(f"Sensor: {sensor_name}")
    sensor_indicator = combined_data[sensor_name].copy().to_frame()
    sensor_indicator.reset_index(inplace=True, drop=False, names='time')
    sensor_indicator.columns = ['time', 'value']
    sensor_path = f"{sensor_group_path}/{sensor_name}"
    save_dataframe_in_HDF5(data_file_path, sensor_path, f"{sensor_name}_{combined}", sensor_indicator)

# clean up memory
del combined_data

Combined data: indicator
Sensor: G80F11B_Level1
Sensor: G80F11B_Level2
Sensor: G80F66Y_Level1
Sensor: G80F66Y_Level2
Sensor: G80F13P_LevelPS
Sensor: G80F13Pp1_power
Sensor: G80F13Pp2_power
Sensor: G73F010
Sensor: G72F040
Sensor: G71F05R_LevelInlet
Sensor: G71F05R_LevelBasin
Sensor: G71F05R_position
Sensor: G71F04R_Level1
Sensor: G71F04R_Level2
Sensor: G71F06R_LevelInlet
Sensor: G71F68Y_LevelPS
Sensor: G71F68Yp1
Sensor: G71F68Yp1_power
Sensor: G71F68Yp2_power


#### We want to create a indicator columns, but what if some errors occur together?

In [None]:
# so we have 5 errros, and how often can we combine them in sets of 1, 2, 3, 4, 5
(5*4/2) * 5
# i.e. 50 different indicators if we run multilabel error indicators...
# instead do a smart check to see what errors occur together and create a new indicator for that

50.0

In [None]:
f = print_tree(data_file_path, print_tree=False)

# collect all the errors for all the sensors
more_one_error = np.array([[False]*len(error_indicators)])

for sensor in natural_sensor_order:
    sensor_bools_path = sensor_group_path + '/' + sensor + '/' + sensor + '_bools'
    data, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_bools_path, columns=error_indicators)
    data = data.astype(bool)
    if data.sum(axis=1).max() > 1:
        print(f"Sensor: {sensor}, max sum: {data.sum(axis=1).max()}, total higher than 1: {data.sum(axis=1).sum()}")
        # combine the data, data[data.sum(axis=1) > 1]
        more_one_error = np.concatenate([more_one_error, data[data.sum(axis=1) > 1]])

# remove the first row
more_one_error = more_one_error[1:]

Sensor: G80F66Y_Level1, max sum: 2, total higher than 1: 485
Sensor: G80F66Y_Level2, max sum: 2, total higher than 1: 481
Sensor: G80F13P_LevelPS, max sum: 2, total higher than 1: 1759
Sensor: G71F05R_LevelInlet, max sum: 3, total higher than 1: 48288
Sensor: G71F05R_LevelBasin, max sum: 2, total higher than 1: 81701
Sensor: G71F04R_Level1, max sum: 2, total higher than 1: 7217059
Sensor: G71F04R_Level2, max sum: 2, total higher than 1: 455816
Sensor: G71F06R_LevelInlet, max sum: 2, total higher than 1: 46626
Sensor: G71F68Y_LevelPS, max sum: 2, total higher than 1: 13751


In [None]:
# check how many combinations we have
error_combinations = {}
for row in more_one_error:
    # get the index of there the error is True
    errors = list(set(np.where(row)[0]))
    error_name_list = [error_indicators[int(error)] for error in errors]

    # check if list is in the dictionary
    if str(error_name_list) in error_combinations:
        error_combinations[str(error_name_list)] += 1
    else:
        error_combinations[str(error_name_list)] = 1

In [None]:
error_combinations

{"['stamp', 'outbound']": 2098,
 "['outbound', 'frozen']": 4281,
 "['stamp', 'outbound', 'frozen']": 1,
 "['man_remove', 'outbound']": 3249413}

In [None]:
# iterate the keys
print(f"Total combinations: {len(error_combinations)}\n")
for error_name_list in error_combinations.keys():
    # # extract the errors
    # errors = key[1:-1].split(',')
    # errors = [error_indicators[int(error)] for error in errors]
    print(f"Errors: {error_name_list}, count: {error_combinations[error_name_list]}")

Total combinations: 4

Errors: ['stamp', 'outbound'], count: 2098
Errors: ['outbound', 'frozen'], count: 4281
Errors: ['stamp', 'outbound', 'frozen'], count: 1
Errors: ['man_remove', 'outbound'], count: 3249413


When creating an indicator for the combined that, we'll use these combinations as well as indicators for errors

#### Creating an indicator to data quality

In [None]:
# lets create a json file with indicator to error
error_indicator_2_dq = {}
error_list_2_indicator = {}

# add the no error indicator
error_indicator_2_dq[0] = 'No errors'
error_list_2_indicator[tuple([False]*len(error_indicators))] = 0

# iterate the errors
for error in error_indicators:
    highest_indicator = len(error_indicator_2_dq.keys())
    error_indicator_2_dq[highest_indicator] = bools_2_meta[error]['alias']
    #error.replace('_', ' ').capitalize().replace('Man', 'Manual').replace('remove', 'removal')
    error_list = [True if error == error_indicator else False for error_indicator in error_indicators]
    error_list_2_indicator[tuple(error_list)] = highest_indicator

# save the json file
with open(REFERENCE_DIR / 'error_indicator_2_dq.json', 'w') as f:
    json.dump(error_indicator_2_dq, f)

with open(REFERENCE_DIR / 'error_list_2_indicator.json', 'w') as f:
    # cannot save keys as tuple, convert to str
    error_list_2_indicator = {str(key): value for key, value in error_list_2_indicator.items()}
    json.dump(error_list_2_indicator, f)


In [None]:
# load the json file
with open(REFERENCE_DIR / 'error_indicator_2_dq.json', 'r') as f:
    error_indicator_2_dq = json.load(f)
    # convert the keys to integers
    error_indicator_2_dq = {int(key): value for key, value in error_indicator_2_dq.items()}

with open(REFERENCE_DIR / 'error_list_2_indicator.json', 'r') as f:
    error_list_2_indicator = json.load(f)
    # convert the keys to integers
    error_list_2_indicator = {eval(key): value for key, value in error_list_2_indicator.items()}

In [None]:
# add the combinations to the dictionary

for error_name_list in error_combinations.keys():
    # extract the errors
    error_name_list = eval(error_name_list)
    error_list = [True if error_name in error_name_list else False for error_name in error_indicators]
    errors_alias_list = [bools_2_meta[error_name]['alias'] for error_name in error_name_list]
    # create a error1, error2,... 
    errors_str = ', '.join(errors_alias_list)
    # add the indicator
    highest_indicator = len(error_indicator_2_dq.keys())
    error_indicator_2_dq[highest_indicator] = errors_str
    error_list_2_indicator[tuple(error_list)] = highest_indicator

# save the json file
with open(REFERENCE_DIR / 'error_indicator_2_dq.json', 'w') as f:
    json.dump(error_indicator_2_dq, f)
with open(REFERENCE_DIR / 'error_list_2_indicator.json', 'w') as f:
    # cannot save keys as tuple, convert to str
    error_list_2_indicator = {str(key): value for key, value in error_list_2_indicator.items()}
    json.dump(error_list_2_indicator, f)

In [None]:
# load the json file
with open(REFERENCE_DIR / 'error_indicator_2_dq.json', 'r') as f:
    error_indicator_2_dq = json.load(f)
    # convert the keys to integers
    error_indicator_2_dq = {int(key): value for key, value in error_indicator_2_dq.items()}

with open(REFERENCE_DIR / 'error_list_2_indicator.json', 'r') as f:
    error_list_2_indicator = json.load(f)
    # convert the keys to integers
    error_list_2_indicator = {eval(key): value for key, value in error_list_2_indicator.items()}

In [None]:
# total runtime
runtime_end = time.time()
runtime = runtime_end - runtime_start
print(f"Total runtime: {runtime:.2f} seconds, i.e. {runtime/60:.2f} minutes")

# current time
print(f"Current time: {runtime_end}")

Total runtime: 315.38 seconds, i.e. 5.26 minutes
Current time: 1729838200.324862


In [None]:

# stop system from running
raise SystemExit("Stop right there!")

SystemExit: Stop right there!

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


---

# Not going to use the full indicator

### Adding a indicator series for each sensor

In [None]:
# next step is to insert the in-sewer data into the combined data
# iterate the combined data
combined = 'full_indicator'
print(f"Combined data: {combined}")
combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")

# iterate the sensors
for sensor in natural_sensor_order:
    print(f"Sensor: {sensor}")
    # Indicator with 1 for data and 0 for no data using the raw data
    sensor_path = f"{sensor_group_path}/{sensor}/{sensor}_raw"
    df, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_path)

    # Indicator with 1 for data and 0 for no data
    df['indicator'] = df['value'].notna().astype(int)

    # Insert the sensor data from df into combined_data without changing combined_data's index
    combined_data[sensor] = df['indicator'].reindex(combined_data.index).combine_first(combined_data[sensor])

    # Indicator with 2 for error otherwise remove rows
    sensor_path = f"{sensor_group_path}/{sensor}/{sensor}_errors"
    df, _, _, _ = load_dataframe_from_HDF5(data_file_path, sensor_path)

    # First filter out the rows with no errors
    df['any_error'] = df.any(axis=1).astype(int)
    # drop rows with no errors
    df = df[df['any_error'] > 0]
    df = df[error_indicators]
    df['error_list'] = df.apply(tuple, axis=1)
    df['indicator'] = df['error_list'].map(error_list_2_indicator)

    # Insert the sensor data from df into combined_data without changing combined_data's index
    combined_data[sensor] = df['indicator'].reindex(combined_data.index).combine_first(combined_data[sensor])

# save the combined data
update_filtered_data_in_HDF5(data_file_path, f"{combined_data_group}/{combined}", combined_data, start_idx, end_idx, column_indices)


# clean up memory
del combined_data



Combined data: full_indicator
    Loading data from group: combined_data/full_indicator
Sensor: G80F11B_Level1
    Loading data from group: single_series/sewer_data/G80F11B_Level1/G80F11B_Level1_raw
    Loading data from group: single_series/sewer_data/G80F11B_Level1/G80F11B_Level1_errors
Sensor: G80F11B_Level2
    Loading data from group: single_series/sewer_data/G80F11B_Level2/G80F11B_Level2_raw
    Loading data from group: single_series/sewer_data/G80F11B_Level2/G80F11B_Level2_errors
Sensor: G80F66Y_Level1
    Loading data from group: single_series/sewer_data/G80F66Y_Level1/G80F66Y_Level1_raw
    Loading data from group: single_series/sewer_data/G80F66Y_Level1/G80F66Y_Level1_errors
Sensor: G80F66Y_Level2
    Loading data from group: single_series/sewer_data/G80F66Y_Level2/G80F66Y_Level2_raw
    Loading data from group: single_series/sewer_data/G80F66Y_Level2/G80F66Y_Level2_errors
Sensor: G80F13P_LevelPS
    Loading data from group: single_series/sewer_data/G80F13P_LevelPS/G80F13P_Le

In [None]:
combined = 'full_indicator'
print(f"Combined data: {combined}")
combined_data, start_idx, end_idx, column_indices = load_dataframe_from_HDF5(data_file_path, f"{combined_data_group}/{combined}")


# save to each sensor
for sensor_name in natural_sensor_order:
    print(f"Sensor: {sensor_name}")
    sensor_indicator = combined_data[sensor_name].copy().to_frame()
    sensor_indicator.reset_index(inplace=True, drop=False, names='time')
    sensor_indicator.columns = ['time', 'value']
    sensor_path = f"{sensor_group_path}/{sensor_name}"
    save_dataframe_in_HDF5(data_file_path, sensor_path, f"{sensor_name}_{combined}", sensor_indicator)

# clean up memory
del combined_data

Combined data: full_indicator
    Loading data from group: combined_data/full_indicator
Sensor: G80F11B_Level1
Sensor: G80F11B_Level2
Sensor: G80F66Y_Level1
Sensor: G80F66Y_Level2
Sensor: G80F13P_LevelPS
Sensor: G80F13Pp1_power
Sensor: G80F13Pp2_power
Sensor: G73F010
Sensor: G72F040
Sensor: G71F05R_LevelInlet
Sensor: G71F05R_LevelBasin
Sensor: G71F05R_position
Sensor: G71F04R_Level1
Sensor: G71F04R_Level2
Sensor: G71F06R_LevelInlet
Sensor: G71F68Y_LevelPS
Sensor: G71F68Yp1
Sensor: G71F68Yp1_power
Sensor: G71F68Yp2_power


#### How often is clean nan but not raw?

In [None]:
f = print_tree(data_file_path, print_tree=False)

raw_clean_mismatch = {}

for sensor in natural_sensor_order:
    sensor_raw_path = sensor_group_path + '/' + sensor + '/' + sensor + '_raw' + '/data'
    raw = f[sensor_raw_path].nxdata
    sensor_clean_path = sensor_group_path + '/' + sensor + '/' + sensor + '_clean' + '/data'
    clean = f[sensor_clean_path].nxdata
    # count how often clean is nan but raw is not (numpy arrays)
    mismatch = np.sum(np.isnan(clean) & ~np.isnan(raw))
    raw_clean_mismatch[sensor] = mismatch

This is how often the interpolation failed to fill the data gaps