In [None]:
import ssl
import paho.mqtt.client as mqtt
import json
import os 
import psycopg2
import psycopg2.extras
from datetime import datetime

# MQTT Broker Configuration (Local Mosquitto)
BROKER = os.getenv("MQTT_BROKER", "localhost")
PORT = int(os.getenv("MQTT_PORT", 1883))
USERNAME = os.getenv("MQTT_USER", "")
PASSWORD = os.getenv("MQTT_PASS", "")
TOPIC = os.getenv("MQTT_TOPIC", "#")

# PostgreSQL Configuration
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", 5432)),
    "database": os.getenv("DB_NAME", "sensordata"),
    "user": os.getenv("DB_USER", "user"),
    "password": os.getenv("DB_PASSWORD", "password")
}

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("‚úÖ Connected to MQTT Broker successfully")
        client.subscribe(TOPIC)
        print(f"üì° Subscribed to topic: {TOPIC}")
    else:
        print(f"‚ùå Connection failed with code {rc}")

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"‚ö†Ô∏è Unexpected disconnection: {rc}")
    else:
        print("üëã Disconnected from MQTT Broker")

def on_message(client, userdata, msg):
    print(f"üì® Topic: {msg.topic}")
    print(f"üì¶ Payload: {msg.payload.decode(errors='ignore')}")
    print("-" * 50)

def on_log(client, userdata, level, buf):
    print(f"üîç LOG: {buf}")


In [None]:
def get_db_connection():
    """Create and return a PostgreSQL connection"""
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        return conn
    except Exception as e:
        print(f"‚ùå Database connection error: {e}")
        raise

def insert_int(topic: str, payload: dict, value: int):
    """Insert integer value into lake_raw_data_int table"""
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        
        cur.execute(
            """
            INSERT INTO lake_raw_data_int (topic, payload, value, timestamp)
            VALUES (%s, %s, %s, %s)
            """,
            (topic, json.dumps(payload), value, datetime.now())
        )
        
        conn.commit()
        print(f"‚úÖ INT inserted: topic={topic}, value={value}")
        cur.close()
        conn.close()
    except Exception as e:
        print(f"‚ùå Error inserting INT: {e}")
        if conn:
            conn.rollback()
            conn.close()

def insert_float(topic: str, payload: dict, value: float):
    """Insert float value into lake_raw_data_float table"""
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        
        cur.execute(
            """
            INSERT INTO lake_raw_data_float (topic, payload, value, timestamp)
            VALUES (%s, %s, %s, %s)
            """,
            (topic, json.dumps(payload), value, datetime.now())
        )
        
        conn.commit()
        print(f"‚úÖ FLOAT inserted: topic={topic}, value={value}")
        cur.close()
        conn.close()
    except Exception as e:
        print(f"‚ùå Error inserting FLOAT: {e}")
        if conn:
            conn.rollback()
            conn.close()

# Example usage
# insert_int("lake/raw/int", {"value": 42}, 42)
# insert_float("lake/raw/float", {"value": 13.37}, 13.37)


In [6]:
isinstance(4,(float))

False

In [None]:
def main():
    """Main function to start MQTT subscriber"""
    print("üöÄ Starting MQTT Subscriber...")
    print(f"üìç Broker: {BROKER}:{PORT}")
    print(f"üë§ Database: {DB_CONFIG['database']}@{DB_CONFIG['host']}")
    print("-" * 50)
    
    client = mqtt.Client(protocol=mqtt.MQTTv311, client_id="iot_subscriber")
    
    # Set credentials (if needed)
    if USERNAME and PASSWORD:
        client.username_pw_set(USERNAME, PASSWORD)
    
    # Set callbacks
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message_handler
    client.on_log = on_log
    
    # Connect and start listening
    try:
        print(f"üîó Connecting to {BROKER}:{PORT}...")
        client.connect(BROKER, PORT, keepalive=60)
        print("üîÑ Starting network loop...")
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n‚õî Subscriber interrupted by user")
        client.disconnect()
    except Exception as e:
        print(f"‚ùå Error: {e}")
        client.disconnect()

# Uncomment to run:
# main()


In [None]:
import json

def on_message(client, userdata, msg):
    print(f"Topic: {msg.topic}")

    try:
        # 1Ô∏è‚É£ bytes ‚Üí string
        payload_str = msg.payload.decode("utf-8")

        # 2Ô∏è‚É£ string ‚Üí dict
        payload_json = json.loads(payload_str)

        # 3Ô∏è‚É£ extract value
        value = payload_json["value"]

        print(f"Value: {value} ({type(value).__name__})")

    except (UnicodeDecodeError, json.JSONDecodeError, KeyError) as e:
        print("Invalid payload:", e)
        print("Raw payload:", msg.payload)

    print("-" * 50)

In [None]:
def on_message_handler(client, userdata, msg):
    """
    Process incoming MQTT messages and route them to the correct database table.
    - lake/raw/int ‚Üí lake_raw_data_int
    - lake/raw/float ‚Üí lake_raw_data_float
    """
    try:
        # Decode payload
        payload_str = msg.payload.decode("utf-8")
        payload_json = json.loads(payload_str)
        value = payload_json.get("value")
        
        if value is None:
            print(f"‚ö†Ô∏è No 'value' key in payload: {payload_json}")
            return
        
        # Determine data type and insert
        if msg.topic == "lake/raw/int":
            if isinstance(value, int) or (isinstance(value, float) and value.is_integer()):
                insert_int(msg.topic, payload_json, int(value))
            else:
                print(f"‚ö†Ô∏è Expected INT but got {type(value).__name__}: {value}")
                
        elif msg.topic == "lake/raw/float":
            if isinstance(value, (int, float)):
                insert_float(msg.topic, payload_json, float(value))
            else:
                print(f"‚ö†Ô∏è Expected FLOAT but got {type(value).__name__}: {value}")
        else:
            print(f"üì® Message from {msg.topic}: {value}")
    
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError) as e:
        print(f"‚ùå Error processing message: {e}")
        print(f"   Raw payload: {msg.payload}")
    except Exception as e:
        print(f"‚ùå Unexpected error: {e}")


In [2]:
import json

In [3]:
'{"value":40}'

'{"value":40}'

In [5]:
temp = json.loads('{"value":40}')

In [8]:
valor = temp['value']

In [10]:
isinstance(valor, (int))

True

In [None]:
'{"value": 45.66}'