## Real-time processing of Air Quality Data for Anomaly detection


To integrate our Python program with Bytewax for building a dataflow architecture, where input and deserialization are stateless and anomaly detection is stateful, we'll follow these steps:

1. Set up the Bytewax Dataflow: Define the dataflow to ingest data, perform the deserialization, imputation, and pass the data through the anomaly detection which is stateful.
2. Integrate Stateless Steps: These include reading input, deserializing data, and imputing missing values using KNN.
3. Integrate Stateful Step: This will be your anomaly detection, which maintains state over the window of data it analyzes.

In [11]:
!pip install bytewax==0.19 python-dotenv scipy==1.13.0 kafka-python==2.0.2
!pip install pandas==2.0.3 river
!pip install scikit-learn==1.4.2

Collecting river
  Downloading river-0.21.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
INFO: pip is looking at multiple versions of river to determine which version is compatible with other requirements. This could take a while.
  Downloading river-0.20.1.tar.gz (796 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m796.8/796.8 kB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Downloading river-0.19.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m29.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: river
Successfully installed river-0.19

In [2]:
import bytewax.operators as op

from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

from bytewax.inputs import StatelessSourcePartition, DynamicSource


import json
from bytewax.testing import run_main


In [3]:
import requests
import json
from datetime import datetime, timezone
from river import anomaly
from sklearn.impute import KNNImputer
import pandas as pd
import numpy as np

def impute_data_with_knn(deserialized_data):
    """
    Takes a list of dictionaries from deserialized data, converts it into a DataFrame,
    performs KNN imputation, and converts it back into a list of dictionaries.

    Args:
    deserialized_data: A list of dictionaries containing sensor data.

    Returns:
    A list of dictionaries with imputed data.
    """
    # Convert list of dictionaries to DataFrame
    df = pd.DataFrame(deserialized_data)

    # Ensure all numeric columns are in appropriate data types
    for column in df.columns:
        if df[column].dtype == 'object':
            try:
                df[column] = pd.to_numeric(df[column])
            except ValueError:
                continue  # Keep non-convertible columns as object if needed

    # Apply KNN imputer to numeric columns
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    imputer = KNNImputer(n_neighbors=5, weights='uniform')
    imputed_array = imputer.fit_transform(df[numeric_columns])

    # Update numeric columns in DataFrame with imputed values
    df[numeric_columns] = imputed_array

    # Convert DataFrame back to a list of dictionaries
    imputed_data = df.to_dict(orient='records')

    return imputed_data


def serialize(data):
    """
    This function serializes the data by converting it
    to a JSON string and then encoding it to bytes.

    Args:
    data: A dictionary containing the data to be serialized.

    Returns:
    A list of serialized data in bytes format.
    """
    headers = data['fields']
    serialized_data = []

    for entry in data['data']:
        try:
            # Create a dictionary for each entry, matching fields with values
            entry_data = {headers[i]: entry[i] for i in range(len(headers))}
            # Convert the dictionary to a JSON string and then encode it to bytes
            entry_bytes = json.dumps(entry_data).encode('utf-8')
            serialized_data.append(entry_bytes)
        except IndexError:
            # This block catches cases where the entry might not have all the fields
            print("IndexError with entry:", entry)
            continue

    return serialized_data

def deserialize(byte_objects_list):
    """
    This function deserializes the data by decoding the bytes
    it converts epoch time to a datetime object and converts
    "pm2.5_cf_1" to a float.

    Args:
    byte_objects_list: A list of byte objects to be deserialized.

    Returns:
    A list of dictionaries containing the deserialized data.
    """
    results = []  # List to hold the processed sensor data
    for byte_object in byte_objects_list:
        if byte_object:  # Check if byte_object is not empty
            sensor_data = json.loads(byte_object.decode('utf-8'))  # Decode and load JSON from bytes

            # Convert "pm2.5_cf_1" to a float, check if the value exists and is not None
            if 'pm2.5_cf_1' in sensor_data and sensor_data['pm2.5_cf_1'] is not None:
                sensor_data['pm2.5_cf_1'] = float(sensor_data['pm2.5_cf_1'])

            # Convert "date_created" from Unix epoch time to a datetime object, check if the value exists
            if 'date_created' in sensor_data and sensor_data['date_created'] is not None:
                sensor_data['date_created'] = datetime.fromtimestamp(sensor_data['date_created'], tz=timezone.utc)

            results.append(sensor_data)  # Add the processed data to the results list

    return results

class AnomalyDetector:
    """
    Anomaly detector using HalfSpaceTrees from River library

    This class is used to detect anomalies in the data using online ML models
    with the River library
    """

    def __init__(self, n_trees=10, height=8, window_size=72, seed=11):
        """
        Initialize the anomaly detector
        """
        self.detector = anomaly.HalfSpaceTrees(
            n_trees=n_trees,
            height=height,
            window_size=window_size,
            limits={'x': (0.0, 1200)},  # ensure these limits make sense for your data
            seed=seed
        )

    def update(self, data):
        """
        Update the anomaly detector with new data
        """
        # Check if 'pm1.0_cf_1' is not None and is a floatable type
        if data['pm1.0_cf_1'] is not None:
            try:
                value = float(data['pm1.0_cf_1'])
                score = self.detector.score_one({'x': value})
                self.detector.learn_one({'x': value})
                data['score'] = score
            except ValueError:
                print(f"Skipping entry, invalid data for pm1.0_cf_1: {data['pm1.0_cf_1']}")
        else:
            print(f"Skipping entry, missing data for pm1.0_cf_1: {data}")
        return data

def mapper(state, value):
  if state is None:
    state = AnomalyDetector(n_trees=4, height=3, window_size=50, seed=11)

    state.update(value)
    return (state, state.summarize())



In [16]:
# Opening JSON file
f = open('data.json')

# returns JSON object as
# a dictionary
data = json.load(f)



To convert the `serialize` function into a Bytewax stream-equivalent format, we need to create a data source that behaves as a generator or a source of streaming data. Below, I will define two classes to model this behavior: one for partition-specific streaming data (`SerializedData`), and another to encapsulate the dynamic data generation across potentially multiple workers (`SerializedInput`).

Step 1: Define `SerializedData` as a `StatelessSourcePartition`
This class will act as a source partition that iterates over a dataset, serializing each entry according to the provided headers and fields.

Step 2: Define `SerializedInput` as a `DynamicSource`
This class encapsulates the partition management for the data source, ensuring that each worker in a distributed environment gets a proper instance of the source partition.

In [24]:
class SerializedData(StatelessSourcePartition):
    """
    Emit serialized data directly for simplicity. This class will serialize
    each entry in the 'data' list by mapping it to the corresponding 'fields'.
    """
    def __init__(self, full_data):
        self.fields = full_data['fields']
        self.data_entries = full_data['data']
        self.metadata = {k: v for k, v in full_data.items() if k not in ['fields', 'data']}
        self._it = iter(self.data_entries)

    def next_batch(self):
        try:
            data_entry = next(self._it)
            # Map each entry in 'data' with the corresponding field in 'fields'
            data_dict = dict(zip(self.fields, data_entry))
            # Merge metadata with data_dict to form the complete record
            complete_record = {**self.metadata, **{"data": data_dict}}
            # Serialize the complete record
            serialized = json.dumps(complete_record).encode('utf-8')
            return [serialized]
        except StopIteration:
            raise StopIteration


class SerializedInput(DynamicSource):
    """
    Dynamic data source that partitions the input data among workers.
    """
    def __init__(self, data):
        self.data = data
        self.total_entries = len(data['data'])

    def build(self, step_id, worker_index, worker_count):
        # Calculate the slice of data each worker should handle
        part_size = self.total_entries // worker_count
        start = part_size * worker_index
        end = start + part_size if worker_index != worker_count - 1 else self.total_entries

        # Create a partition of the data for the specific worker
        # Note: This partitions only the 'data' array. Metadata and fields are assumed
        # to be common and small enough to be replicated across workers.
        data_partition = {
            "api_version": self.data['api_version'],
            "time_stamp": self.data['time_stamp'],
            "data_time_stamp": self.data['data_time_stamp'],
            "max_age": self.data['max_age'],
            "firmware_default_version": self.data['firmware_default_version'],
            "fields": self.data['fields'],
            "data": self.data['data'][start:end]
        }

        return SerializedData(data_partition)

* Data Initialization: The `SerializedData` class now takes the entire data structure, keeps the metadata, and iterates over the data list. Each entry in data is mapped to the corresponding field specified in fields, combined with the metadata, serialized into a JSON string, and then encoded.

* Integration into Dataflow: The class is used directly within a Bytewax dataflow as an input source, demonstrating how serialized data would be produced from the structured input.

We can then deserialize the data with a simple function.


In [25]:
def process_deserialized_data(byte_data):
    """Convert serialized byte data back to dictionary."""
    sensor_data = json.loads(byte_data.decode('utf-8'))
    return sensor_data

In [27]:
flow = Dataflow("air-quality-flow")
inp = op.input("inp", flow, SerializedInput(data))
deserialize = op.map("deserialize", inp, process_deserialized_data)
# impute = op.map("impute", deserialize, impute_data_with_knn)
# anomaly = op.stateful_map("anomaly_detector", impute, mapper )
op.inspect("inspect_deserialized", deserialize)
run_main(flow)

air-quality-flow.inspect_deserialized: {'api_version': 'V1.0.12-0.0.54', 'time_stamp': 1712793708, 'data_time_stamp': 1712793665, 'max_age': 604800, 'firmware_default_version': '7.02', 'data': {'sensor_index': 53, 'date_created': 1454548891, 'rssi': -50, 'uptime': 10183, 'latitude': 40.246742, 'longitude': -111.7048, 'humidity': None, 'temperature': None, 'pressure': None, 'pm1.0': 0.0, 'pm2.5_alt': 2.1, 'pm10.0': 0.0, 'pm1.0_cf_1': 0.0, 'pm2.5_atm': 0.0, 'pm2.5_cf_1': 0.0, 'pm10.0_cf_1': 0.0}}
air-quality-flow.inspect_deserialized: {'api_version': 'V1.0.12-0.0.54', 'time_stamp': 1712793708, 'data_time_stamp': 1712793665, 'max_age': 604800, 'firmware_default_version': '7.02', 'data': {'sensor_index': 77, 'date_created': 1456896339, 'rssi': -58, 'uptime': 2320, 'latitude': 40.750816, 'longitude': -111.82529, 'humidity': None, 'temperature': None, 'pressure': None, 'pm1.0': 15.5, 'pm2.5_alt': 14.5, 'pm10.0': 15.9, 'pm1.0_cf_1': 15.5, 'pm2.5_atm': 15.8, 'pm2.5_cf_1': 15.8, 'pm10.0_cf_1': 

KeyboardInterrupt: (src/run.rs:150:17) interrupt signal received, all processes have been shut down

In [8]:
from bytewax.inputs import ManualInputConfig

ImportError: cannot import name 'ManualInputConfig' from 'bytewax.inputs' (/usr/local/lib/python3.10/dist-packages/bytewax/inputs.py)