# PZEM MQTT Data Simulator

This notebook simulates PZEM-004T electrical measurements and publishes the data via MQTT for real-time monitoring. The simulated parameters include:
- Voltage (V)
- Current (A) 
- Frequency (Hz)
- Power Factor
- Power (W)
- Energy (kWh)

## 1. Import Required Libraries

In [None]:
!pip install paho-mqtt numpy

: 

In [None]:
import paho.mqtt.client as mqtt
import json
import time
import random
import threading
from datetime import datetime
from IPython.display import display, HTML, clear_output
import numpy as np

## 2. Configure MQTT Settings

In [None]:
# MQTT Configuration
MQTT_BROKER = "test.mosquitto.org"  # Public MQTT broker for testing
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

# MQTT Topics
TOPIC_PREFIX = "pzem"
TOPIC_VOLTAGE =  TOPIC_PREFIX + "/voltage"
TOPIC_CURRENT =  TOPIC_PREFIX + "/current"
TOPIC_FREQUENCY =  TOPIC_PREFIX + "/frequency"
TOPIC_POWER_FACTOR =  TOPIC_PREFIX + "/power_factor"
TOPIC_POWER =  TOPIC_PREFIX + "/power"
TOPIC_ENERGY = TOPIC_PREFIX + "/energy"
TOPIC_ALL_DATA = TOPIC_PREFIX + "/all_data"

# Simulation parameters
UPDATE_INTERVAL = 2  # seconds

print("MQTT Configuration initialized")
print(f"Broker: {MQTT_BROKER}:{MQTT_PORT}")
print(f"Update interval: {UPDATE_INTERVAL} seconds")

## 3. Create PZEM Data Simulation Functions

In [None]:
class PZEMSimulator:
    def __init__(self):
        self.base_voltage = 220.0  # Base voltage in V
        self.base_current = 5.0    # Base current in A
        self.base_frequency = 50.0 # Base frequency in Hz
        self.energy_accumulator = 0.0  # Accumulated energy in kWh
        self.last_time = time.time()
        
    def get_voltage(self):
        """Simulate voltage with ±10% variation around 220V"""
        return round(self.base_voltage + random.uniform(-22, 22), 2)
    
    def get_current(self):
        """Simulate current with realistic variations (0-30A)"""
        # Add some noise and periodic variation
        base = self.base_current + 2 * np.sin(time.time() * 0.1)
        current = max(0, base + random.uniform(-1, 3))
        return round(current, 2)
    
    def get_frequency(self):
        """Simulate frequency with ±2% variation around 50Hz"""
        return round(self.base_frequency + random.uniform(-1, 1), 2)
    
    def get_power_factor(self):
        """Simulate power factor between 0.8 and 1.0"""
        return round(random.uniform(0.8, 1.0), 3)
    
    def calculate_power(self, voltage, current, power_factor):
        """Calculate apparent power"""
        return round(voltage * current * power_factor, 2)
    
    def update_energy(self, power):
        """Update accumulated energy"""
        current_time = time.time()
        time_diff = (current_time - self.last_time) / 3600  # Convert to hours
        energy_increment = power * time_diff / 1000  # Convert W to kWh
        self.energy_accumulator += energy_increment
        self.last_time = current_time
        return round(self.energy_accumulator, 4)
    
    def get_all_measurements(self):
        """Get complete PZEM measurements"""
        voltage = self.get_voltage()
        current = self.get_current()
        frequency = self.get_frequency()
        power_factor = self.get_power_factor()
        power = self.calculate_power(voltage, current, power_factor)
        energy = self.update_energy(power)
        
        return {
            'timestamp': datetime.now().isoformat(),
            'voltage': voltage,
            'current': current,
            'frequency': frequency,
            'power_factor': power_factor,
            'power': power,
            'energy': energy
        }

# Initialize simulator
pzem = PZEMSimulator()
print("PZEM Simulator initialized successfully!")

# Test simulation
test_data = pzem.get_all_measurements()
print("Sample data:")
for key, value in test_data.items():
    print(f"  {key}: {value}")

## 4. Set Up MQTT Publisher

In [None]:
class MQTTPublisher:
    def __init__(self, broker_host, port, keepalive=60):
        self.client = mqtt.Client()
        self.broker_host = broker_host
        self.port = port
        self.keepalive = keepalive
        self.is_connected = False
        
        # Set up callbacks
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_publish = self.on_publish
        
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.is_connected = True
            print(f"Connected to MQTT broker at {self.broker_host}:{self.port}")
        else:
            print(f"Failed to connect, return code {rc}")
    
    def on_disconnect(self, client, userdata, rc):
        self.is_connected = False
        print("Disconnected from MQTT broker")
    
    def on_publish(self, client, userdata, mid):
        pass  # Could add publish confirmation here
    
    def connect(self):
        try:
            self.client.connect(self.broker_host, self.port, self.keepalive)
            self.client.loop_start()
            return True
        except Exception as e:
            print(f"Connection error: {e}")
            return False
    
    def disconnect(self):
        if self.is_connected:
            self.client.loop_stop()
            self.client.disconnect()
    
    def publish_data(self, topic, data):
        if self.is_connected:
            try:
                json_data = json.dumps(data)
                result = self.client.publish(topic, json_data)
                return result.rc == mqtt.MQTT_ERR_SUCCESS
            except Exception as e:
                print(f"Publish error: {e}")
                return False
        else:
            print("Not connected to MQTT broker")
            return False

# Initialize MQTT publisher
mqtt_publisher = MQTTPublisher(MQTT_BROKER, MQTT_PORT)
print("MQTT Publisher initialized")

## 5. Create Data Publishing Loop

In [None]:
# Global variable to control the publishing loop
is_publishing = False
publish_thread = None

def publish_pzem_data():
    """Continuous data publishing function"""
    global is_publishing
    
    while is_publishing:
        try:
            # Get measurements from simulator
            measurements = pzem.get_all_measurements()
            
            # Publish to individual topics
            mqtt_publisher.publish_data(TOPIC_VOLTAGE, {'voltage': measurements['voltage']})
            mqtt_publisher.publish_data(TOPIC_CURRENT, {'current': measurements['current']})
            mqtt_publisher.publish_data(TOPIC_FREQUENCY, {'frequency': measurements['frequency']})
            mqtt_publisher.publish_data(TOPIC_POWER_FACTOR, {'power_factor': measurements['power_factor']})
            mqtt_publisher.publish_data(TOPIC_POWER, {'power': measurements['power']})
            mqtt_publisher.publish_data(TOPIC_ENERGY, {'energy': measurements['energy']})
            
            # Publish all data to combined topic
            mqtt_publisher.publish_data(TOPIC_ALL_DATA, measurements)
            
            # Display current data
            clear_output(wait=True)
            print("=== PZEM MQTT Data Publisher ===")
            print(f"Status: {'Publishing' if is_publishing else 'Stopped'}")
            print(f"MQTT Connected: {mqtt_publisher.is_connected}")
            print(f"Timestamp: {measurements['timestamp']}")
            print(f"Voltage: {measurements['voltage']} V")
            print(f"Current: {measurements['current']} A")
            print(f"Frequency: {measurements['frequency']} Hz")
            print(f"Power Factor: {measurements['power_factor']}")
            print(f"Power: {measurements['power']} W")
            print(f"Energy: {measurements['energy']} kWh")
            print("\nPress 'Stop Publishing' button to stop...")
            
            # Wait for next update
            time.sleep(UPDATE_INTERVAL)
            
        except Exception as e:
            print(f"Error in publishing loop: {e}")
            time.sleep(1)

def start_publishing():
    """Start the data publishing"""
    global is_publishing, publish_thread
    
    if not is_publishing:
        # Connect to MQTT broker
        if mqtt_publisher.connect():
            # Wait a moment for connection to establish
            time.sleep(1)
            
            is_publishing = True
            publish_thread = threading.Thread(target=publish_pzem_data)
            publish_thread.daemon = True
            publish_thread.start()
            print("Data publishing started!")
        else:
            print("Failed to connect to MQTT broker. Please check your broker settings.")

def stop_publishing():
    """Stop the data publishing"""
    global is_publishing
    
    is_publishing = False
    if publish_thread and publish_thread.is_alive():
        publish_thread.join(timeout=2)
    
    mqtt_publisher.disconnect()
    print("Data publishing stopped!")

print("Publishing functions ready!")
print("Use start_publishing() to begin and stop_publishing() to end.")

## 6. Start/Stop Publishing Controls

In [None]:
# Run this cell to START publishing MQTT data
start_publishing()

In [None]:
# Run this cell to STOP publishing MQTT data
stop_publishing()