# Use-Case: Machine Learning for Water Quality State Estimation

Water quality state estimation is a crucial task for the successful operation of Water Distribution Networks. However, given the complex hydraulic and water quality dynamics, estimating water quality states in the network is also very challenging.
In this context, Machine Learning-based approaches offer promising solutions.

### Outline

This notebook demonstrates the complete workflow of building a **simple** Machine Learning-based water quality state estimator using [EPyT-Flow](https://github.com/WaterFutures/EPyT-Flow) and [TensorFlow](https://www.tensorflow.org/):

1. Dataset Generation
    - Generate and simulate scenarios
    - Post-process the simulation results to prepare data for Machine Learning
2. Training a Machine Learning Model
    - Pre-process (scale) the data to avoid numerical problems
    - Specify a *Deep Recurrent Neural Network* for water quality state estimation
    - Train the *Deep Recurrent Neural Network*
3. Evaluation
    - Compute several standard evaluation metrics
    - Visualize the results in a time series plot

In [None]:
%pip install epyt-flow tensorflow scikit-learn

In [None]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=ImportWarning)

import os
import random
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, max_error

from epyt_flow.simulation import ScenarioSimulator, ToolkitConstants, ScenarioConfig
from epyt_flow.topology import NetworkTopology
from epyt_flow.data.benchmarks import load_leakdb_scenarios
from epyt_flow.utils import to_seconds, plot_timeseries_prediction, plot_timeseries_data, create_path_if_not_exist

### 1. Dataset Generation

#### Scenario Generation

For this notebook, we collect leak-free scenarios from the [LeakDB](https://github.com/KIOS-Research/LeakDB) Hanoi dataset and modify them as follows:

- Simulation duration of 30 days
- Hydraulic and reporting time steps of 30 min (water quality time step of 5 min)
- Initial Cl concentration of zero at all nodes
- All (bulk and wall) reaction coefficients set to zero
- Cl injection (spike pattern) at the reservoir
- Flow sensors at all links
- Cl concentration sensors at all nodes
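The spike injection pattern is easiest to understand on a small, reproducible example. The sketch below is a hypothetical standalone re-implementation of the idea behind the notebook's `create_spike_pattern` (pure Python, with a seeded `random.Random` instead of the global, unseeded `random` module used in the notebook): each spike spans 9 pattern time steps, and consecutive spikes are separated by 50 to 100 zero-valued time steps. The names `spike_pattern_sketch` and `spike_starts` are illustrative only.

```python
import math
import random


def spike_pattern_sketch(pattern_length: int, rng: random.Random,
                         min_gap: int = 50, max_extra_gap: int = 50) -> list[float]:
    """Hypothetical re-implementation of the spike pattern idea with a seeded RNG."""
    # One spike: nine samples of a shifted/scaled sine, with the last value set (almost) to zero
    spike = [0.5 * math.sin(0.5 * k) + 0.5 for k in range(1, 10)]
    spike[-1] = 0.0001

    pattern = [0.0] * pattern_length
    pattern[:len(spike)] = spike  # the first spike starts at time step 0
    cur_idx = len(spike)
    while cur_idx + min_gap + max_extra_gap + len(spike) < pattern_length:
        cur_idx += min_gap + rng.randint(0, max_extra_gap)  # zero gap of 50 to 100 steps
        pattern[cur_idx:cur_idx + len(spike)] = spike
        cur_idx += len(spike)
    return pattern


def spike_starts(pattern: list[float]) -> list[int]:
    """Indices where a spike begins: a non-zero value preceded by a zero (or the start)."""
    return [i for i, v in enumerate(pattern) if v > 0 and (i == 0 or pattern[i - 1] == 0)]


pattern = spike_pattern_sketch(2000, random.Random(42))
starts = spike_starts(pattern)
gaps = [b - a - 9 for a, b in zip(starts, starts[1:])]  # number of zeros between spikes
print(len(starts), min(gaps), max(gaps))
```

Seeding the generator makes the injection pattern reproducible across runs, which can be handy when comparing models trained on regenerated data.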

In [None]:
# Function for generating spike patterns
def create_spike_pattern(pattern_length: int) -> np.ndarray:
    # Shifted and scaled sine wave with values in [0, 1], one value per pattern time step
    steps = np.arange(1, pattern_length + 1)
    pattern_mult = .5*np.sin(steps*.5) + .5

    # The first 9 values form a single spike; everything after it is reset to zero
    pattern_mult[8] = 0.0001
    spike_pattern = np.copy(pattern_mult[:9])
    pattern_mult[9:] = 0

    delay = 50        # Minimum gap (in pattern time steps) between two spikes
    rand_offset = 50  # Maximum additional random gap, i.e. gaps of 50 to 100 steps
    cur_idx = 9

    spike_len = len(spike_pattern)
    while (cur_idx + delay + rand_offset + spike_len) < pattern_length:
        cur_idx += delay + random.randint(0, rand_offset)
        pattern_mult[cur_idx:cur_idx+spike_len] = spike_pattern
        cur_idx += spike_len

    return pattern_mult

In [None]:
# Simulate a given scenario
def run_sim(scenario_config: ScenarioConfig, f_out: str) -> None:
    with ScenarioSimulator(scenario_config=scenario_config) as sim:
        # Set general parameters
        sim.set_general_parameters(simulation_duration=to_seconds(days=30),
                                   hydraulic_time_step=to_seconds(minutes=30),
                                   reporting_time_step=to_seconds(minutes=30),
                                   quality_time_step=to_seconds(minutes=5))

        # Make things easy:
        # Set the initial Cl concentration at all nodes to zero
        # Set all (bulk and wall) reaction coefficients to zero, i.e. no Cl reactions in the pipes
        n_nodes = sim.epanet_api.getNodeCount()
        n_links = sim.epanet_api.getLinkCount()
        sim.epanet_api.setNodeInitialQuality([0] * n_nodes)
        sim.epanet_api.setLinkBulkReactionCoeff([0] * n_links)
        sim.epanet_api.setLinkWallReactionCoeff([0] * n_links)

        # Add chlorine (spike pattern) injection at the reservoir
        sim.enable_chemical_analysis()

        pattern_length = max(sim.epanet_api.getPatternLengths())
        pattern = create_spike_pattern(pattern_length)

        reservoir_id = sim.epanet_api.getNodeReservoirNameID()[0]
        reservoir_idx = sim.epanet_api.getNodeIndex(reservoir_id)
        sim.add_quality_source(node_id=reservoir_id,
                               pattern=pattern,
                               source_type=ToolkitConstants.EN_CONCEN)

        # Place flow sensors at all links and Cl (quality) sensors at all nodes
        sim.set_flow_sensors(sensor_locations=sim.sensor_config.links)
        sim.set_node_quality_sensors(sensor_locations=sim.sensor_config.nodes)

        # Run simulation and store results
        res = sim.run_simulation()

        sim.get_topology().save_to_file(os.path.join(os.path.dirname(f_out), "hanoi"))  # Store topology information

        np.savez(f_out,
                 injection_node_idx=reservoir_idx - 1,  # NOTE: Indices start at zero in Python, while EPANET starts at 1!
                 injection_pattern=pattern,
                 node_ids=sim.sensor_config.nodes,
                 link_ids=sim.sensor_config.links,
                 flow_data=res.get_data_flows(),
                 node_quality=res.get_data_nodes_quality())

In [None]:
# Create new directory for storing all generated data
path_to_data = "quality-example-data"
create_path_if_not_exist(path_to_data)

In [None]:
# Collect leak-free scenarios from the first 100 LeakDB Hanoi scenarios
scenarios = load_leakdb_scenarios(range(100), use_net1=False)
scenarios = list(filter(lambda s: len(s.system_events) == 0, scenarios))  # Filter out scenarios with leakages
print(f"Number of scenarios: {len(scenarios)}")

In [None]:
# Simulate all scenarios and store results
for i, scenario in enumerate(scenarios):
    run_sim(scenario, os.path.join(path_to_data, f"{i}.npz"))

#### Data Preparation

Before we can build a Machine Learning model, we have to prepare the data accordingly, i.e. we need input data and the corresponding outputs.

Here, we use the current flows at all links and the current Cl concentration at the reservoir as input to predict the Cl concentration (output) at a specified target node. This means that the model has to learn how the flows determine the way Cl travels from the reservoir through the network.
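To make the input/output layout concrete, here is a minimal NumPy sketch on made-up toy arrays (4 time steps, 3 links, 2 nodes). The array names mirror the notebook's, but the values and node indices are purely illustrative. Note that the input at time t only contains quantities at time t; any dependence on earlier time steps, such as the transport delay, has to be captured by the model itself.

```python
import numpy as np

# Toy simulation output (made-up numbers): 4 time steps, 3 links, 2 nodes
flows = np.array([[1.0, 2.0, 3.0],
                  [1.5, 2.5, 3.5],
                  [2.0, 3.0, 4.0],
                  [2.5, 3.5, 4.5]])   # shape: (time steps, links)
cl_conc = np.array([[0.9, 0.0],
                    [0.5, 0.2],
                    [0.0, 0.6],
                    [0.0, 0.3]])      # shape: (time steps, nodes)
injection_node_idx, target_node_idx = 0, 1  # hypothetical node indices

# Input at time t: all flows at t plus the Cl concentration at the injection node at t
X = np.column_stack((flows, cl_conc[:, injection_node_idx]))  # shape: (time steps, links + 1)
# Output at time t: the Cl concentration at the target node at t
y = cl_conc[:, target_node_idx]                                # shape: (time steps,)

print(X.shape, y.shape)
```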

In [None]:
# Create input-output data from simulation results
def prepare_data(flows: np.ndarray, cl_conc: np.ndarray, target_node_idx: int,
                 injection_nodes_idx: int) -> tuple[np.ndarray, np.ndarray]:
    n_steps = flows.shape[0]

    # Input at each time step: all flows plus the Cl concentration at the injection node
    injection_conc = cl_conc[:n_steps, int(injection_nodes_idx)].reshape(-1, 1)
    X = np.concatenate((flows.reshape(n_steps, -1), injection_conc), axis=1)

    # Output at each time step: the Cl concentration at the target node
    y = cl_conc[:n_steps, target_node_idx]

    return X, y

In our example, we want to predict the Cl concentration at node/junction "25":

![Task: predict the Cl concentration at node/junction 25](quality_task.png)

In [None]:
# Load topology information
topo = NetworkTopology.load_from_file(os.path.join(path_to_data, "hanoi.epytflow_topology"))

# Specify index of the node for which we want to predict the Cl concentration over time
target_node_idx = topo.get_all_nodes().index("25")    # We want to predict Cl concentration at node/junction "25"

In [None]:
# Process simulated scenarios
X = []
y = []
for i in range(len(scenarios)):
    data = np.load(os.path.join(path_to_data, f"{i}.npz"))

    X_i, y_i = prepare_data(data["flow_data"], data["node_quality"],
                            target_node_idx=target_node_idx,
                            injection_nodes_idx=data["injection_node_idx"])

    X.append(X_i)
    y.append(y_i)

In [None]:
# Plot one output (first 500 time steps) for illustrative purposes
plot_timeseries_data(np.array(y[0][:500]).reshape(1, -1),
                     x_axis_label="Time steps (30min)",
                     y_axis_label="Cl concentration")

Finally, we have to split the data into train, validation, and test sets to properly evaluate the generalizability of our Machine Learning model.

Here, we use the first 50% of the scenarios as the training set, the next 25% as the validation set, and the last 25% as the test set that we are going to use to evaluate our Machine Learning model.
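The split is done over whole scenarios rather than over time steps, so every test scenario (with its own demand and spike patterns) is completely unseen during training. The sketch below reproduces the index arithmetic of the split cell in plain Python; `split_sizes` and the scenario IDs are hypothetical, and because the sizes are rounded down, any leftover scenarios end up in the test set.

```python
def split_sizes(n_scenarios: int) -> tuple[int, int, int]:
    """Number of train/validation/test scenarios for a 50%/25%/25% split (rounding down)."""
    n_train = n_scenarios // 2
    n_val = n_scenarios // 4
    n_test = n_scenarios - n_train - n_val  # the test set takes the remainder
    return n_train, n_val, n_test


scenario_ids = list(range(10))  # hypothetical scenario indices
n_train, n_val, _ = split_sizes(len(scenario_ids))
train_ids = scenario_ids[:n_train]
val_ids = scenario_ids[n_train:n_train + n_val]
test_ids = scenario_ids[n_train + n_val:]
print(train_ids, val_ids, test_ids)
```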

In [None]:
# Split data into train, validation, and test set
split_points = [int(len(X) / 2), int(len(X) / 2) + int(len(X) / 4)]
X_train, X_val, X_test = np.array(X[:split_points[0]]), np.array(X[split_points[0]:split_points[1]]), np.array(X[split_points[1]:])
y_train, y_val, y_test = np.array(y[:split_points[0]]), np.array(y[split_points[0]:split_points[1]]), np.array(y[split_points[1]:])

print((X_train.shape, y_train.shape), (X_val.shape, y_val.shape), (X_test.shape, y_test.shape))

### 2. Training a Machine Learning Model

After having prepared the data, we can finally build and train a Machine Learning model to predict the Cl concentration at a specific node.
This involves the following steps:

1. Pre-process the data, i.e. scale it, to avoid numerical problems.
2. Specify the Machine Learning model, here a *Deep Recurrent Neural Network*.
3. Train the *Deep Recurrent Neural Network* on the training data to predict the Cl concentration.
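What the scaling cell computes can be sketched in a few lines of NumPy, assuming the 3-D layout (scenarios, time steps, features) used in this notebook. The statistics are computed per feature over all training scenarios and time steps, and the same statistics are reused for the validation and test data, so nothing from those sets leaks into training. This is meant to mirror `StandardScaler` with its default settings; the helper names and the random toy data are hypothetical.

```python
import numpy as np


def fit_standardizer(X_train: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Per-feature mean and standard deviation, pooled over scenarios and time steps."""
    flat = X_train.reshape(-1, X_train.shape[-1])  # (scenarios * time steps, features)
    mean = flat.mean(axis=0)
    std = flat.std(axis=0)   # population standard deviation (ddof=0)
    std[std == 0] = 1.0      # leave constant features unscaled
    return mean, std


def transform(X: np.ndarray, mean: np.ndarray, std: np.ndarray) -> np.ndarray:
    # Broadcasting applies the same per-feature statistics to every scenario and time step
    return (X - mean) / std


rng = np.random.default_rng(0)
X_train = rng.normal(loc=[5.0, -2.0, 0.0], scale=[10.0, 0.1, 1.0], size=(4, 50, 3))
X_test = rng.normal(loc=[5.0, -2.0, 0.0], scale=[10.0, 0.1, 1.0], size=(2, 50, 3))

mean, std = fit_standardizer(X_train)  # fit on the training data only
X_train_s = transform(X_train, mean, std)
X_test_s = transform(X_test, mean, std)
print(X_train_s.reshape(-1, 3).mean(axis=0), X_train_s.reshape(-1, 3).std(axis=0))
```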

In [None]:
# Scale data to avoid numeric instabilities
scaler = StandardScaler()
scaler.fit(X_train.reshape(-1, X_train.shape[-1]))

X_train = scaler.transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_val = scaler.transform(X_val.reshape(-1, X_val.shape[-1])).reshape(X_val.shape)
X_test = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

We propose a *Deep Recurrent Neural Network* (DRNN) architecture that maps the time series of all flows and of the chlorine concentration at the reservoir to the time series of the chlorine concentration at the target node, producing one prediction per time step.
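Because the LSTM layers below use `return_sequences=True`, they output one hidden vector per time step, and Keras applies the final `Dense` layer along the last axis, i.e. independently at every time step. The NumPy sketch below uses random hidden states as a stand-in for the LSTM outputs (all sizes are hypothetical) and checks that this matches applying the output layer in a loop over time steps, giving one non-negative Cl prediction per time step.

```python
import numpy as np

rng = np.random.default_rng(1)
n_scenarios, n_steps, n_units = 2, 5, 64                  # hypothetical sizes
h = rng.normal(size=(n_scenarios, n_steps, n_units))      # stand-in for LSTM outputs
W = rng.normal(size=(n_units, 1))                         # Dense kernel (output_size = 1)
b = np.zeros(1)                                           # Dense bias


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(z, 0.0)


# Vectorized: matrix product over the last axis -> shape (scenarios, time steps, 1)
y_vec = relu(h @ W + b)

# Same computation, one time step at a time
y_loop = np.stack([relu(h[:, t, :] @ W + b) for t in range(n_steps)], axis=1)

print(y_vec.shape, np.allclose(y_vec, y_loop), (y_vec >= 0).all())
```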

The architecture is *recurrent* because it has a memory component (here, LSTM layers), which allows the neural network to internally model the transport delay between the reservoir and the target node.
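As a toy illustration only (this is not the LSTM used below): in an idealized pipe with plug flow and a constant, known travel time of `d` time steps, the concentration at the target is simply the source concentration delayed by `d` steps. A recurrence can reproduce this by keeping the last `d` inputs in its hidden state, whereas a model that only sees the current time step cannot know what was injected `d` steps earlier. In the real network the delay changes with the flows and mixing occurs at junctions, which is why the LSTM has to learn its memory from data. The function `delay_line` is hypothetical.

```python
import numpy as np


def delay_line(u: np.ndarray, d: int) -> np.ndarray:
    """Toy recurrence whose hidden state is a shift register of the last d inputs (d >= 1)."""
    state = np.zeros(d)               # state[k] holds u[t - 1 - k]
    y = np.empty_like(u, dtype=float)
    for t, u_t in enumerate(u):
        y[t] = state[-1]              # output u[t - d] (zero for t < d)
        state = np.roll(state, 1)     # shift the memory by one step ...
        state[0] = u_t                # ... and store the newest input
    return y


u = np.array([0.0, 0.7, 0.9, 1.0, 0.9, 0.6, 0.0, 0.0, 0.0, 0.0])  # a spike at the source
print(delay_line(u, d=3))
```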

<img src="quality_drnn.png"/>

In [None]:
# Specifying the Neural Network architecture
class RecurrentDeepNeuralNetwork:
    def __init__(self, input_shape: tuple[int, int], output_size: int = 1,
                 hidden_layer_sizes: tuple[int, ...] = (128, 64), activation: str = "tanh"):
        # input_shape = (time steps, features); the network outputs one value per time step
        self.model = tf.keras.Sequential(
                [tf.keras.layers.Input(shape=input_shape)] +        # Input layer
                [tf.keras.layers.LSTM(ls, activation=activation, return_sequences=True)  # Recurrent hidden layers
                 for ls in hidden_layer_sizes] +
                [tf.keras.layers.Dense(output_size, activation="relu")])    # No negative outputs in the last layer: Cl concentration cannot be negative!
        self.solver = "adam"

    def save(self, f_out: str = "recurrent_pred_state.keras") -> None:
        self.model.save(f_out)

    def load(self, f_in: str = "recurrent_pred_state.keras") -> None:
        self.model = tf.keras.models.load_model(f_in)

    def fit(self, X: np.ndarray, y: np.ndarray, n_epochs: int = 500,
            callbacks: list = None, val: tuple = None) -> None:
        # Fit model by minimizing the loss function, i.e. the mean squared error
        self.model.compile(optimizer=self.solver, loss=tf.keras.losses.MeanSquaredError(), metrics=["mse"])
        self.model.fit(X, y, epochs=n_epochs, verbose=True, callbacks=callbacks or [], validation_data=val)

    def __call__(self, X: np.ndarray) -> np.ndarray:
        return self.predict(X)

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model(X, training=False).numpy()

In [None]:
# Build and train a neural network for predicting the Cl concentration
model = RecurrentDeepNeuralNetwork(input_shape=(X_train.shape[1], X_train.shape[2]))

# Fit neural network for at most 100 epochs (early stopping will stop training if the validation MSE does not improve for 10 epochs)
earlystopping_mgr = tf.keras.callbacks.EarlyStopping(monitor='val_mse', min_delta=0, patience=10, verbose=0, mode='min',
                                                     baseline=None, restore_best_weights=True, start_from_epoch=0)
model.fit(X_train, y_train, n_epochs=100, callbacks=[earlystopping_mgr], val=(X_val, y_val))

# Save final (trained) model
model.save(os.path.join(path_to_data, f"model_node{target_node_idx}.keras"))

### 3. Evaluation

As the last step, we evaluate our trained Neural Network on the unseen test set to assess its generalizability, i.e. can it predict the Cl concentration for unseen scenarios (with different demand patterns and different spike patterns)?
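For reference, the four reported metrics can be written out directly. This NumPy sketch follows the standard definitions behind scikit-learn's `mean_squared_error`, `r2_score`, `mean_absolute_error` and `max_error` (1-D inputs, no sample weights), pooled over all scenarios and time steps just like the flattening in the evaluation cell. The helper name and the toy arrays are hypothetical, and the R^2 formula assumes the ground truth is not constant.

```python
import numpy as np


def regression_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict[str, float]:
    """MSE, R^2, MAE and max error, pooled over all scenarios and time steps."""
    y_true = np.asarray(y_true, dtype=float).ravel()  # flatten (scenarios, time steps[, 1])
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    residuals = y_true - y_pred
    ss_res = np.sum(residuals ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)  # assumes y_true is not constant
    return {
        "MSE": float(np.mean(residuals ** 2)),
        "R^2": float(1.0 - ss_res / ss_tot),
        "MAE": float(np.mean(np.abs(residuals))),
        "max_error": float(np.max(np.abs(residuals))),
    }


y_true = np.array([[0.0, 0.5, 1.0, 0.5]])  # made-up ground truth (1 scenario, 4 time steps)
y_pred = np.array([[0.0, 0.4, 0.8, 0.6]])  # made-up predictions
print(regression_metrics(y_true, y_pred))
```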

In [None]:
# Compute standard evaluation metrics
y_test_pred = model.predict(X_test)

print(f"MSE: {mean_squared_error(y_test.flatten(), y_test_pred.flatten())}\n" +
      f"R^2: {r2_score(y_test.flatten(), y_test_pred.flatten())}\n" +
      f"MAE: {mean_absolute_error(y_test.flatten(), y_test_pred.flatten())}\n" +
      f"Max error: {max_error(y_test.flatten(), y_test_pred.flatten())}")

In [None]:
s_id = 0   # Index of test scenario -- note that we have 6 test scenarios

# Time window to plot -- i.e. first 1000 time steps
t0 = 0
t1 = 1000

# Plot ground truth and predicted Cl concentration
plot_timeseries_prediction(y_test[s_id, t0:t1].flatten(), y_test_pred[s_id, t0:t1].flatten(),
                           x_axis_label="Time steps (30min)", y_axis_label="Cl concentration")