In [1]:
import json, random, time, datetime
import redis

server = redis.Redis(host='localhost', port=6379, db=0)

SENSOR_ID = "wtf-pipe-1"

def generate_reading():
    return {
        "timestamp": datetime.datetime.now().isoformat() + 'Z',
        "sensor_id": SENSOR_ID,
        "temperature": round(random.uniform(10, 35), 1),
        "pressure": round(random.uniform(1.0, 3.0), 2),
        "flow": round(random.uniform(20, 100), 1)
    }
count = 0
while count < 20:
    reading = generate_reading()
    print(f"Publishing reading: {reading}")
    server.publish('sensor_data', str(reading)) #json.dumps(reading))
    count += 1
    time.sleep(2)



Publishing reading: {'timestamp': '2025-05-27T12:54:43.207045Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 13.6, 'pressure': 1.52, 'flow': 40.5}
Publishing reading: {'timestamp': '2025-05-27T12:54:45.212780Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 22.5, 'pressure': 2.2, 'flow': 43.9}
Publishing reading: {'timestamp': '2025-05-27T12:54:47.214884Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 26.4, 'pressure': 1.81, 'flow': 30.7}
Publishing reading: {'timestamp': '2025-05-27T12:54:49.216363Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 31.4, 'pressure': 2.97, 'flow': 86.8}
Publishing reading: {'timestamp': '2025-05-27T12:54:51.218125Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 21.7, 'pressure': 1.89, 'flow': 93.9}
Publishing reading: {'timestamp': '2025-05-27T12:54:53.219451Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 29.6, 'pressure': 2.5, 'flow': 53.8}
Publishing reading: {'timestamp': '2025-05-27T12:54:55.221136Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 28.9, 'pressure': 1.45, '

In [2]:
import json, time, datetime
import redis
from collections import deque

r = redis.Redis(host='localhost', port=6379, db=0)

sub = r.pubsub()
sub.subscribe('sensor_data')

WINDOW = deque(maxlen=10)  # for drift
DRIFT_THRESHOLD = 38.0
SPIKE_THRESHOLDS = {'pressure': 4.0, 'flow': 120.0}
DROPOUT_SECONDS = 10

last_timestamp = time.time()

def detect_anomalies(data):
    global last_timestamp
    now = time.time()
    last_timestamp = now
    anomalies = []

    # Spike detection
    for param, threshold in SPIKE_THRESHOLDS.items():
        if data[param] > threshold:
            anomalies.append({
                "type": "spike",
                "timestamp": data["timestamp"],
                "sensor_id": data["sensor_id"],
                "parameter": param,
                "value": data[param],
                "message": f"{param.title()} spike detected: {data[param]}"
            })

    # Drift detection
    WINDOW.append(data)
    temps = [x['temperature'] for x in WINDOW]
    if all(t > DRIFT_THRESHOLD for t in temps) and len(WINDOW) == WINDOW.maxlen:
        anomalies.append({
            "type": "drift",
            "timestamp": data["timestamp"],
            "sensor_id": data["sensor_id"],
            "parameter": "temperature",
            "value": temps[-1],
            "duration_seconds": 20,
            "message": f"Temperature drift detected over 20 seconds."
        })
        WINDOW.clear()

    if anomalies:
        with open("./storage/anomaly_log.json", "a") as f:
            for a in anomalies:
                f.write(json.dumps(a) + "\n")

for msg in sub.listen():
    print(msg, msg['data'])
    if msg['type'] == 'message':
        print(f"Received message: {msg['data'].decode('utf-8')}")
    continue
        # detect_anomalies(json.loads(msg['data']))


{'type': 'subscribe', 'pattern': None, 'channel': b'sensor_data', 'data': 1} 1


KeyboardInterrupt: 

In [3]:
import json, time, datetime
import redis
from collections import deque
import random
import os


def generate_reading():
    return {
        "timestamp": datetime.datetime.now().isoformat() + 'Z',
        "sensor_id": SENSOR_ID,
        "temperature": round(random.uniform(10, 40), 1),
        "pressure": round(random.uniform(1.0, 4.0), 2),
        "flow": round(random.uniform(20, 130), 1)
    }


WINDOW = deque(maxlen=10)  # for drift
DRIFT_THRESHOLD = 38.0
SPIKE_THRESHOLDS = {'pressure': 4.0, 'flow': 120.0}
DROPOUT_SECONDS = 10

def detect_anomalies(data:dict):
    global last_timestamp
    now = time.time()
    last_timestamp = now
    anomalies = []

    # Spike detection
    for param, threshold in SPIKE_THRESHOLDS.items():
        if data[param] > threshold:
            print(f"Spike detected: {param} = {data[param]}")
            anomalies.append({
                "type": "spike",
                "timestamp": data["timestamp"],
                "sensor_id": data["sensor_id"],
                "parameter": param,
                "value": data[param],
                "message": f"{param.title()} spike detected: {data[param]}"
            })

    # Drift detection
    WINDOW.append(data)
    temps = [x['temperature'] for x in WINDOW]
    if all(t > DRIFT_THRESHOLD for t in temps) and len(WINDOW) == WINDOW.maxlen:
        print(f"Drift detected: temperature = {temps[-1]}")
        anomalies.append({
            "type": "drift",
            "timestamp": data["timestamp"],
            "sensor_id": data["sensor_id"],
            "parameter": "temperature",
            "value": temps[-1],
            "duration_seconds": 20,
            "message": f"Temperature drift detected over 20 seconds."
        })
        WINDOW.clear()

    if now - last_timestamp > DROPOUT_SECONDS:
        anomalies.append({
            "type": "dropout",
            "timestamp": datetime.datetime.now().isoformat(),
            "sensor_id": SENSOR_ID,
            "parameter": "all",
            "message": f"No data received in {int(now - last_timestamp)} seconds."
        })


    if anomalies:
        log_path = os.path.join("storage", "anomaly_log.json")
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        with open(log_path, "a") as f:
            for a in anomalies:
                f.write(json.dumps(a) + "\n")


count = 0
while count < 20:
    reading = generate_reading()
    print(f"Publishing reading: {reading}")

    detect_anomalies(reading)
    count += 1
    time.sleep(2) # Sleep to avoid busy waiting

Publishing reading: {'timestamp': '2025-05-27T12:56:34.191635Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 27.5, 'pressure': 3.31, 'flow': 91.9}
Publishing reading: {'timestamp': '2025-05-27T12:56:36.191922Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 17.3, 'pressure': 3.3, 'flow': 73.0}
Publishing reading: {'timestamp': '2025-05-27T12:56:38.192521Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 38.3, 'pressure': 3.58, 'flow': 92.6}
Publishing reading: {'timestamp': '2025-05-27T12:56:40.193040Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 20.9, 'pressure': 3.63, 'flow': 45.6}
Publishing reading: {'timestamp': '2025-05-27T12:56:42.193573Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 37.0, 'pressure': 3.13, 'flow': 47.1}
Publishing reading: {'timestamp': '2025-05-27T12:56:44.194214Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 11.0, 'pressure': 3.05, 'flow': 48.7}
Publishing reading: {'timestamp': '2025-05-27T12:56:46.194994Z', 'sensor_id': 'wtf-pipe-1', 'temperature': 39.1, 'pressure': 2.27, 

In [5]:
from langchain_ollama import OllamaLLM
from langchain.prompts import PromptTemplate

MODEL_NAME = "llama3.2"

llm = OllamaLLM(model=MODEL_NAME)
prompt = PromptTemplate.from_template("""
            As a water treatment system analyst. 
            Analyze the following anomalies and provide a very concise report/summary.

            Focus on:
            1. Most critical and recent issues
            2. Patterns or trends in the anomalies

            Anomaly Data:
            {anomaly_text}

            Provide a concise and structured summary with severity levels and actionable insights.
            Do not Hallucinate or make up data.
        """)
llm_chain = prompt | llm

def generate_summary():
    with open("storage/anomaly_log.json") as f:
        lines = f.readlines()[-10:]
    text = "\n".join(lines)
    # return llm_chain.invoke({"anomaly_text": text})
    formatted_prompt = prompt.format(anomaly_text=text)
    return llm_chain.invoke(input=formatted_prompt)

generated_summary = generate_summary()

In [6]:
print(generated_summary)

**Anomaly Report Summary**

**Most Critical and Recent Issues:**

1. **High Flow Spike (Severity: High)**
	* Timestamp: 2025-05-27T12:57:12.205779Z
	* Sensor ID: wtf-pipe-1
	* Value: 125.4
	* Message: "Flow spike detected: 125.4"
2. **High Flow Spike (Severity: High)**
	* Timestamp: 2025-05-27T12:57:06.202199Z
	* Sensor ID: wtf-pipe-1
	* Value: 127.0
	* Message: "Flow spike detected: 127.0"

**Patterns or Trends in Anomalies:**

1. **Frequency:** The anomaly frequency is relatively high, with multiple spikes observed in a short period (approximately 10 minutes).
2. **Time of Day:** The anomalies occur during the afternoon (12:56-12:57 PM) when water demand is typically higher.
3. **Sensor ID:** All anomalies are detected at sensor wtf-pipe-1, which may indicate an issue with this specific pipe or a broader system issue.

**Actionable Insights:**

1. Investigate and isolate the cause of the high flow spikes to prevent potential overloading of the water treatment system.
2. Review system

In [10]:
import threading
import time
import redis

server = redis.Redis()

def subscriber():
    sub = server.pubsub()
    sub.subscribe('sensor_data')
    print("Subscribed. Waiting for messages...")
    for msg in sub.listen():
        if msg['type'] == 'message':
            print(f"Received: {msg['data'].decode('utf-8')}")

def publisher():
    SENSOR_ID = "wtf-pipe-1"
    for count in range(5):
        reading = f"Testing {SENSOR_ID}: Count - {count}"
        server.publish('sensor_data', reading)
        print(f"Published: {reading}")
        time.sleep(1)

# Start subscriber in a separate thread
threading.Thread(target=subscriber, daemon=True).start()

# Wait a bit and then start publishing
time.sleep(2)
publisher()


Subscribed. Waiting for messages...
Published: Testing wtf-pipe-1: Count - 0
Received: Testing wtf-pipe-1: Count - 0
Published: Testing wtf-pipe-1: Count - 1
Received: Testing wtf-pipe-1: Count - 1
Published: Testing wtf-pipe-1: Count - 2
Received: Testing wtf-pipe-1: Count - 2
Published: Testing wtf-pipe-1: Count - 3Received: Testing wtf-pipe-1: Count - 3

Received: Testing wtf-pipe-1: Count - 4Published: Testing wtf-pipe-1: Count - 4

