<center><h1>Real Time Monitoring of Water Distribution System</h1></center>


# Installing Required Libraries

These cells install the libraries the notebook depends on: `ipywidgets`, which provides interactive widgets for user input, and `scipy`, which supplies the Mahalanobis distance function used later. `pip` downloads a library only if it is not already present in the environment.
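
As a rough sketch of the install-only-if-missing idea, the standard library can report whether a package is importable before `pip` is invoked. The helper name `is_installed` is hypothetical and is not part of the notebook.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be imported in this environment."""
    return importlib.util.find_spec(module_name) is not None


# Report which of the notebook's dependencies still need `pip install`
for name in ("ipywidgets", "scipy"):
    if is_installed(name):
        print(f"{name}: already installed")
    else:
        print(f"{name}: missing, run `pip install {name}`")
```

Running this before the install cells shows which of them will actually download anything.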



In [11]:
pip install ipywidgets

Defaulting to user installation because normal site-packages is not writeable
Collecting ipywidgets
  Downloading ipywidgets-8.1.5-py3-none-any.whl (139 kB)
Collecting jupyterlab-widgets~=3.0.12
  Downloading jupyterlab_widgets-3.0.13-py3-none-any.whl (214 kB)
Collecting widgetsnbextension~=4.0.12
  Downloading widgetsnbextension-4.0.13-py3-none-any.whl (2.3 MB)
Installing collected packages: widgetsnbextension, jupyterlab-widgets, ipywidgets
Successfully installed ipywidgets-8.1.5 jupy

In [10]:
pip install scipy

Defaulting to user installation because normal site-packages is not writeable
Collecting scipy
  Downloading scipy-1.14.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (41.2 MB)
Installing collected packages: scipy
Successfully installed scipy-1.14.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
import warnings
warnings.filterwarnings("ignore")

# Understanding the Water Monitoring Dashboard

## 1. Data Acquisition:

`Kafka Consumer:` The code sets up a Kafka consumer subscribed to a single topic (`'test-topic'`). The consumer pulls sensor readings from the Kafka stream, one message per graph update.<br>
`Data Parsing:` Each message is decoded from UTF-8 and deserialized from JSON, and the `flow_rate` and `turbidity` values are extracted from the resulting dictionary.
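
The parsing step can be tried without a running Kafka broker. The sketch below applies the same decode-then-`json.loads` logic as the consumer's `value_deserializer` to a made-up payload; the sample values and the helper name `deserialize` are assumptions for illustration.

```python
import json


def deserialize(raw: bytes) -> dict:
    """Decode a UTF-8 JSON payload, mirroring the consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical payload; real values come from the sensor producer
raw_message = b'{"flow_rate": 151.2, "turbidity": 3.4}'
data = deserialize(raw_message)

# dict.get() returns None for a missing key instead of raising KeyError
flow_rate = data.get("flow_rate")
turbidity = data.get("turbidity")
pressure = data.get("pressure")  # absent from this payload

print(flow_rate, turbidity, pressure)  # 151.2 3.4 None
```

Using `.get()` is what lets the dashboard skip a reading whose field is missing rather than fail on it.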

## 2. Data Storage and Visualization:

`Data Queues:` The code maintains two deques (double-ended queues, `collections.deque`) created with `maxlen=50`, so each holds only the latest 50 data points for flow rate and turbidity. Older points are discarded automatically, which bounds memory use and keeps each redraw cheap.<br>
`Dash App:` A Dash app is initialized to create a web-based dashboard. The layout consists of two graphs: one for flow rate and one for turbidity.<br>
`Graph Updates:` The `update_flow_rate_graph` and `update_turbidity_graph` functions are called periodically to update the graphs.
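
The sliding-window behavior of a bounded deque is easiest to see with a small window. This sketch uses `maxlen=5` and stand-in integer readings instead of the notebook's `maxlen=50` and live sensor values.

```python
from collections import deque

window = deque(maxlen=5)  # the notebook uses maxlen=50

# Append eight stand-in readings; only the newest five are kept
for reading in range(8):
    window.append(reading)

print(list(window))  # [3, 4, 5, 6, 7]
print(len(window))   # 5
```

Once the deque is full, each `append` silently drops the oldest element from the left, so no explicit trimming code is needed.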

## 3. Real-time Monitoring:

`Interval Component:` A `dcc.Interval` component triggers the graph updates every second (`interval=1*1000` milliseconds).<br>
`Threshold-Based Alerts:` The code checks whether each flow rate or turbidity value falls below its minimum threshold or rises above its maximum threshold (130 to 170 for flow rate, 0 to 7 for turbidity). Out-of-range points are drawn as colored markers on the graph, flagging a possible issue in the water distribution system.
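
The highlighting relies on building marker series that contain a value only where a threshold is crossed and `None` elsewhere, so that nothing is drawn for in-range points. A minimal sketch of that split, using a hypothetical helper `split_by_threshold` and made-up readings:

```python
from typing import List, Optional, Sequence, Tuple


def split_by_threshold(
    values: Sequence[float], low: float, high: float
) -> Tuple[List[Optional[float]], List[Optional[float]]]:
    """Return (below, above) series with None wherever a value is in range."""
    below = [v if v < low else None for v in values]
    above = [v if v > high else None for v in values]
    return below, above


# Made-up flow-rate readings checked against the notebook's 130 to 170 band
readings = [125, 150, 172, 168]
below, above = split_by_threshold(readings, low=130, high=170)

print(below)  # [125, None, None, None]
print(above)  # [None, None, 172, None]
```

Both series keep the same length as the input, so they can share the x-axis of the main line trace.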

In [2]:
import json
import time
from kafka import KafkaConsumer
from collections import deque
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go

# Threshold values
MIN_FLOW_RATE_THRESHOLD = 130
MAX_FLOW_RATE_THRESHOLD = 170
MIN_TURBIDITY_THRESHOLD = 0
MAX_TURBIDITY_THRESHOLD = 7

# Initialize Kafka Consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Data queues for both metrics
flow_rate_queue = deque(maxlen=50)
turbidity_queue = deque(maxlen=50)

# Initialize Dash app
app = dash.Dash(__name__)
app.layout = html.Div([
    html.H1("Real-Time Flow Rate and Turbidity Dashboard"),
    dcc.Graph(id='flow-rate-graph'),
    dcc.Graph(id='turbidity-graph'),
    dcc.Interval(
        id='interval-component',
        interval=1*1000,  # Update every second
        n_intervals=0
    )
])

# Function to update queues with Kafka data
def update_data():
    for message in consumer:
        data = message.value
        flow_rate = data.get('flow_rate')
        turbidity = data.get('turbidity')

        if flow_rate is not None:
            flow_rate_queue.append(flow_rate)
        if turbidity is not None:
            turbidity_queue.append(turbidity)
        
        # We break here to allow for periodic updates in Dash
        break

# Flow rate graph callback
@app.callback(
    Output('flow-rate-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_flow_rate_graph(n):
    update_data()

    # Prepare flow rate plot
    flow_rate_x = list(range(len(flow_rate_queue)))
    below_flow_rate = [y if y < MIN_FLOW_RATE_THRESHOLD else None for y in flow_rate_queue]
    above_flow_rate = [y if y > MAX_FLOW_RATE_THRESHOLD else None for y in flow_rate_queue]

    figure = go.Figure()
    figure.add_trace(go.Scatter(x=flow_rate_x, y=list(flow_rate_queue), mode='lines', name='Flow Rate', line=dict(color='blue')))
    figure.add_trace(go.Scatter(x=flow_rate_x, y=below_flow_rate, mode='markers', name='Below Threshold', marker=dict(color='red', size=8)))
    figure.add_trace(go.Scatter(x=flow_rate_x, y=above_flow_rate, mode='markers', name='Above Threshold', marker=dict(color='orange', size=8)))
    figure.update_layout(
        title="Flow Rate",
        xaxis=dict(title='Time Steps', range=[0, 50]),
        yaxis=dict(title='Flow Rate', range=[min(flow_rate_queue, default=0)-1, max(flow_rate_queue, default=1)+1])
    )
    return figure

# Turbidity graph callback
@app.callback(
    Output('turbidity-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_turbidity_graph(n):
    update_data()

    # Prepare turbidity plot
    turbidity_x = list(range(len(turbidity_queue)))
    below_turbidity = [y if y < MIN_TURBIDITY_THRESHOLD else None for y in turbidity_queue]
    above_turbidity = [y if y > MAX_TURBIDITY_THRESHOLD else None for y in turbidity_queue]

    figure = go.Figure()
    figure.add_trace(go.Scatter(x=turbidity_x, y=list(turbidity_queue), mode='lines', name='Turbidity', line=dict(color='green')))
    figure.add_trace(go.Scatter(x=turbidity_x, y=below_turbidity, mode='markers', name='Below Threshold', marker=dict(color='purple', size=8)))
    figure.add_trace(go.Scatter(x=turbidity_x, y=above_turbidity, mode='markers', name='Above Threshold', marker=dict(color='pink', size=8)))
    figure.update_layout(
        title="Turbidity",
        xaxis=dict(title='Time Steps', range=[0, 50]),
        yaxis=dict(title='Turbidity', range=[min(turbidity_queue, default=0)-1, max(turbidity_queue, default=1)+1])
    )
    return figure

# Run the app
if __name__ == '__main__':
    #app.run_server(debug=True)
    app.run_server(port=8051)  # Change 8051 to any other available port



[2024-11-11 19:21:01,753] ERROR in app: Exception on /_dash-update-component [POST]
Traceback (most recent call last):
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 1473, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 882, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 880, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 865, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
  File "/home/ishan/.local/lib/python3.10/site-packages/dash/dash.py", line 1376, in dispatch
    ctx.run(
  File "/home/ishan/.local/lib/python3.10/site-packages/dash/_callback.py", line 514, in add_context
    raise err
  File "/home/ishan/.local/l

## Retrieve the model

In [1]:
mean_vector = None
cov_matrix = None

from kafka import KafkaConsumer
import pickle
import numpy as np

# Initialize Kafka consumer
consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')
received_arrays = []  # filled in arrival order: mean vector first, then covariance matrix

try:
    # Receive and deserialize each array (only unpickle data from a trusted producer)
    for message in consumer:
        received_array = pickle.loads(message.value)
        received_arrays.append(received_array)

        # Check if we have received both mean_vector and cov_matrix
        if len(received_arrays) >= 2:
            mean_vector = np.array(received_arrays[0])
            cov_matrix = np.array(received_arrays[1])
            print("mean_vector:", mean_vector)
            print("cov_matrix:", cov_matrix)
            break  # Stop after receiving the two arrays

except KeyboardInterrupt:
    print("Interrupted manually.")

finally:
    consumer.close()


mean_vector: [50.00704514  2.19076574 56.36229051 19.98691505]
cov_matrix: [[ 1.00309648e+02  9.41805566e-02  8.48376953e-01  5.17668780e-01]
 [ 9.41805566e-02  3.63305804e+00  4.53765516e-02 -2.20363993e-02]
 [ 8.48376953e-01  4.53765516e-02  2.05170088e+02 -2.36254089e-01]
 [ 5.17668780e-01 -2.20363993e-02 -2.36254089e-01  2.51695017e+01]]


# Understanding the Enhanced Water Monitoring Dashboard with Anomaly Detection

## 1. Anomaly Detection:

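`Mahalanobis Distance:` This statistical measure gives the distance of a data point from the mean of a multivariate distribution, scaled by the covariance of the variables. Here each point is a four-value reading (flow rate, turbidity, pressure, temperature), and a larger distance indicates a likelier outlier or anomaly.<br>
`Threshold-Based Detection:` If the calculated Mahalanobis distance exceeds a predefined threshold (`distance_threshold = 7`), the data point is flagged as an anomaly.<br>
`Anomaly Tracking:` The `anomaly_indexes` list keeps track of the absolute indices of detected anomalies, counted from the first message received, so markers stay attached to the right points as the 50-point window slides.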
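
For a reading $x$, mean vector $\mu$ and covariance matrix $\Sigma$, the Mahalanobis distance is $D_M(x) = \sqrt{(x - \mu)^\top \Sigma^{-1} (x - \mu)}$, which is the quantity `scipy.spatial.distance.mahalanobis` returns when given the inverse covariance matrix. The sketch below computes it directly with NumPy and applies a threshold. The two-variable mean, covariance, threshold and readings are made up for illustration and are not the model retrieved above.

```python
import numpy as np


def mahalanobis_distance(point, mean, inv_cov) -> float:
    """Distance of `point` from `mean`, scaled by the inverse covariance."""
    delta = np.asarray(point, dtype=float) - np.asarray(mean, dtype=float)
    return float(np.sqrt(delta @ inv_cov @ delta))


# Illustrative two-variable model (assumed values, not the notebook's)
mean = np.array([150.0, 3.0])
cov = np.array([[100.0, 0.0],
                [0.0, 4.0]])
inv_cov = np.linalg.inv(cov)
threshold = 3.0  # illustrative; the notebook uses distance_threshold = 7

readings = [(150.0, 3.0), (160.0, 5.0), (190.0, 3.0)]
anomaly_indexes = []  # absolute positions of flagged readings

for index, reading in enumerate(readings):
    distance = mahalanobis_distance(reading, mean, inv_cov)
    if distance > threshold:
        anomaly_indexes.append(index)
    print(index, round(distance, 3))

print(anomaly_indexes)  # [2]
```

Only the third reading is flagged: it lies four standard deviations from the mean along the first variable, while the second reading is one standard deviation away along each variable and stays under the threshold.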

In [4]:
import json
import numpy as np
from kafka import KafkaConsumer
from collections import deque
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from scipy.spatial.distance import mahalanobis

# Threshold for Mahalanobis distance
distance_threshold = 7
inv_cov_matrix = np.linalg.inv(cov_matrix)

# Kafka Consumer initialization
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Data queues for real-time plots
flow_rate_queue = deque(maxlen=50)
turbidity_queue = deque(maxlen=50)
pressure_queue = deque(maxlen=50)
temperature_queue = deque(maxlen=50)
anomaly_indexes = []  # Holds absolute indexes of anomalies

# Initialize Dash app
app = dash.Dash(__name__)
app.layout = html.Div([
    html.H1("Real-Time Flow Rate and Turbidity Dashboard with Anomaly Detection"),
    dcc.Graph(id='flow-rate-graph'),
    dcc.Graph(id='turbidity-graph'),
    dcc.Interval(
        id='interval-component',
        interval=1*1000,  # Update every second
        n_intervals=0
    )
])

# Initialize a message counter to track absolute positions
message_counter = 0

# Function to update data from Kafka and detect anomalies
def update_data():
    global message_counter
    for message in consumer:
        data = message.value
        flow_rate = data.get('flow_rate')
        turbidity = data.get('turbidity')
        pressure = data.get('pressure')
        temperature = data.get('temperature')

        if None not in [flow_rate, turbidity, pressure, temperature]:
            # Append new data to queues
            flow_rate_queue.append(flow_rate)
            turbidity_queue.append(turbidity)
            pressure_queue.append(pressure)
            temperature_queue.append(temperature)

            # Calculate Mahalanobis distance
            new_point = np.array([flow_rate, turbidity, pressure, temperature])
            distance = mahalanobis(new_point, mean_vector, inv_cov_matrix)

            # Check if distance exceeds threshold, if so, mark as anomaly with absolute index
            if distance > distance_threshold:
                anomaly_indexes.append(message_counter)

            message_counter += 1
        break  # Update one message per interval

# Flow rate graph callback
@app.callback(
    Output('flow-rate-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_flow_rate_graph(n):
    update_data()

    # Prepare x-axis based on the current count of messages
    flow_rate_x = list(range(message_counter - len(flow_rate_queue), message_counter))

    # Filter anomaly points that are within the visible range of flow_rate_x
    anomaly_x = [i for i in anomaly_indexes if i in flow_rate_x]
    anomaly_y = [flow_rate_queue[flow_rate_x.index(i)] for i in anomaly_x]

    figure = go.Figure()
    figure.add_trace(go.Scatter(x=flow_rate_x, y=list(flow_rate_queue), mode='lines', name='Flow Rate', line=dict(color='blue')))
    figure.add_trace(go.Scatter(x=anomaly_x, y=anomaly_y, mode='markers', name='Anomaly', marker=dict(color='red', size=8)))
    figure.update_layout(
        title="Flow Rate",
        # Fall back to autorange while the queue is empty to avoid an IndexError
        xaxis=dict(title='Time Steps', range=[flow_rate_x[0], flow_rate_x[-1]] if flow_rate_x else None),
        yaxis=dict(title='Flow Rate', range=[min(flow_rate_queue, default=0)-1, max(flow_rate_queue, default=1)+1])
    )
    return figure

# Turbidity graph callback
@app.callback(
    Output('turbidity-graph', 'figure'),
    Input('interval-component', 'n_intervals')
)
def update_turbidity_graph(n):
    update_data()

    # Prepare x-axis for turbidity
    turbidity_x = list(range(message_counter - len(turbidity_queue), message_counter))

    figure = go.Figure()
    figure.add_trace(go.Scatter(x=turbidity_x, y=list(turbidity_queue), mode='lines', name='Turbidity', line=dict(color='green')))
    figure.update_layout(
        title="Turbidity",
        # Fall back to autorange while the queue is empty to avoid an IndexError
        xaxis=dict(title='Time Steps', range=[turbidity_x[0], turbidity_x[-1]] if turbidity_x else None),
        yaxis=dict(title='Turbidity', range=[min(turbidity_queue, default=0)-1, max(turbidity_queue, default=1)+1])
    )
    return figure

# Run the app
if __name__ == '__main__':
    app.run_server(port=8051)


[2024-11-11 19:22:52,324] ERROR in app: Exception on /_dash-update-component [POST]
Traceback (most recent call last):
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 1473, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 882, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 880, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/ishan/.local/lib/python3.10/site-packages/flask/app.py", line 865, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
  File "/home/ishan/.local/lib/python3.10/site-packages/dash/dash.py", line 1376, in dispatch
    ctx.run(
  File "/home/ishan/.local/lib/python3.10/site-packages/dash/_callback.py", line 514, in add_context
    raise err
  File "/home/ishan/.local/l