<a href="https://colab.research.google.com/github/Method-for-Software-System-Development/Cloud_Computing/blob/develop/logic/mqqt_sim_outdoor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Sensor Data Streaming Module: Code Overview**

This module provides an abstraction for streaming live or simulated sensor data to the OptiLine dashboard (or other applications). It supports both **real-time MQTT-based data acquisition** and **simulation mode** for testing and development.

---

## Code Structure and Functionality

- **MQTT Configuration:**  
  - Defines the MQTT broker address, topic, and port for subscribing to live sensor data.

- **Thread-Safe Data Queue:**  
  - Uses a `queue.Queue()` object to safely buffer incoming MQTT messages between network callbacks and the main data consumer.

- **Simulation Mode:**  
  - `_simulate_data_stream(delay_seconds=1)`:  
    An infinite generator that yields randomly generated sensor readings (`Temperature`, `Humidity`, and `Dlight`, the daylight level), pausing `delay_seconds` seconds between readings. Useful for development or when live data is unavailable.

- **MQTT Mode:**  
  - `_on_mqtt_message`:  
    Callback function for the MQTT client. Decodes each incoming JSON payload and places the result in the data queue. Payloads that are not valid JSON are silently dropped.
  - `_start_mqtt_listener`:  
    Initializes the MQTT client, subscribes to the specified topic, and runs the listener in a background thread.
  - `_mqtt_data_stream`:  
    Generator that yields sensor data as received via MQTT, blocking on `data_queue.get()` until new data arrives.

- **Unified Data Stream Interface:**  
  - `get_live_data_stream(mode="mqtt")`:  
    Returns a generator according to the selected mode.  
    - `"mqtt"`: Yields real-time data from the MQTT broker.
    - `"simulation"`: Yields synthetic/random data for testing.
    - Raises a `ValueError` if any other mode is specified.
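
The hand-off between the MQTT callback and the consumer described above can be sketched without a broker. This is a minimal illustration, not the module's own code: `on_message`, `drain`, and `run_demo` are hypothetical names, the payloads are made up, and the consumer uses a timeout so the demo terminates (the module's `_mqtt_data_stream` blocks indefinitely instead).

```python
import json
import queue
import threading


def on_message(payload: bytes, out: queue.Queue) -> None:
    """Hypothetical stand-in for the MQTT callback: decode JSON and enqueue it."""
    try:
        out.put(json.loads(payload.decode("utf-8")))
    except (UnicodeDecodeError, json.JSONDecodeError):
        pass  # drop malformed payloads, as the module does for invalid JSON


def drain(q: queue.Queue, timeout: float = 1.0):
    """Yield queued items until the queue stays empty for `timeout` seconds."""
    while True:
        try:
            yield q.get(timeout=timeout)
        except queue.Empty:
            return


def run_demo():
    """Feed three payloads from a producer thread and collect what survives."""
    q = queue.Queue()
    payloads = [b'{"Temperature": 24.5}', b"not json", b'{"Humidity": 55.0}']

    def produce():
        for p in payloads:
            on_message(p, q)

    producer = threading.Thread(target=produce)
    producer.start()
    producer.join()
    return list(drain(q, timeout=0.1))


result = run_demo()
```

Because `queue.Queue` handles its own locking, the producer thread and the consuming generator need no extra synchronization; the malformed payload never reaches the consumer.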

---

## Extending or Integrating

- To use in a dashboard, call `get_live_data_stream(mode=...)` and iterate over the generator to retrieve up-to-date sensor values.
- The module can be easily extended to support additional data sources, more complex simulation logic, or multiple sensor types/topics.
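
Because the streams never end on their own, a dashboard usually pulls a bounded number of readings per refresh. The sketch below shows one way to do that with `itertools.islice`. It is only an illustration: `fake_stream`, `take_readings`, and `mean_of` are hypothetical helpers, and `fake_stream` stands in for `get_live_data_stream(mode="simulation")` so the example runs without the module or a broker.

```python
from itertools import islice


def fake_stream():
    """Hypothetical stand-in for get_live_data_stream(mode="simulation")."""
    n = 0
    while True:
        # Deterministic values keep the example reproducible.
        yield {"Temperature": 22.0 + n, "Humidity": 50.0, "Dlight": 30000.0}
        n += 1


def take_readings(stream, count):
    """Pull exactly `count` readings from an endless generator."""
    return list(islice(stream, count))


def mean_of(readings, key):
    """Average one field across readings, skipping readings that lack it."""
    values = [r[key] for r in readings if key in r]
    return sum(values) / len(values) if values else None


readings = take_readings(fake_stream(), 3)
avg_temperature = mean_of(readings, "Temperature")
```

With the real module, the same pattern would apply by passing the generator returned from `get_live_data_stream(...)` in place of `fake_stream()`. In `"mqtt"` mode each pull blocks until a message arrives.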

In [None]:
import paho.mqtt.client as mqtt
import json
import random
import time
import queue
import threading

# MQTT Configuration
MQTT_BROKER = "test.mosquitto.org"
MQTT_TOPIC = "braude/D106/outdoor"
MQTT_PORT = 1883

# Thread-safe queue for storing messages (for MQTT mode)
data_queue = queue.Queue()

def _simulate_data_stream(delay_seconds=1):
    """
    Generator that yields simulated sensor data indefinitely.

    Args:
        delay_seconds (float): Delay between data points in seconds.
    Yields:
        dict: Simulated sensor data.
    """
    while True:
        simulated_data = {
            "Temperature": round(random.uniform(22, 28), 2),
            "Humidity": round(random.uniform(40, 70), 2),
            "Dlight": round(random.uniform(10000, 60000), 2),  # daylight level
        }
        yield simulated_data
        time.sleep(delay_seconds)

def _on_mqtt_message(client, userdata, msg):
    """
    Callback for incoming MQTT messages.

    Parses the payload and adds the result to the data queue.
    """
    try:
        data = json.loads(msg.payload.decode())
        data_queue.put(data)
    except (UnicodeDecodeError, json.JSONDecodeError):
        pass  # Ignore payloads that are not valid UTF-8 encoded JSON

def _start_mqtt_listener():
    """
    Starts a background thread that listens for MQTT messages.
    """
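    # Note: paho-mqtt 2.x added a callback_api_version argument to Client();
    # consult its migration notes if this call fails on a newer install.
    client = mqtt.Client()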
    client.on_message = _on_mqtt_message
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.subscribe(MQTT_TOPIC)

    thread = threading.Thread(target=client.loop_forever, daemon=True)
    thread.start()

def _mqtt_data_stream():
    """
    Generator that yields sensor data received via MQTT.

    Yields:
        dict: Real sensor data as received from the MQTT broker.
    """
    _start_mqtt_listener()
    while True:
        data = data_queue.get()  # Waits until data is available
        yield data

def get_live_data_stream(mode="mqtt"):
    """
    Returns a generator that yields sensor data indefinitely.

    Args:
        mode (str): Either 'simulation' or 'mqtt'.

    Returns:
        generator: A generator yielding sensor data dictionaries.
    """
    if mode == "simulation":
        return _simulate_data_stream()
    elif mode == "mqtt":
        return _mqtt_data_stream()
    else:
        raise ValueError("Invalid mode. Use 'simulation' or 'mqtt'.")