# MQTT Subscriber

## Import the required module

In [1]:
import paho.mqtt.client as mqtt
import json

### Connection to Redis

In [2]:
import redis

REDIS_HOST = 'redis-11392.c300.eu-central-1-1.ec2.redns.redis-cloud.com'
REDIS_PORT = 11392
REDIS_USERNAME = 'default'
REDIS_PASSWORD = 'nVukRxv3hvJTuYLkK6n4XdWF8etIzoOO'


redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, username=REDIS_USERNAME, password=REDIS_PASSWORD)
is_connected = redis_client.ping()
print('Redis Connected:', is_connected)

Redis Connected: True


## Create a new MQTT client

In [3]:
client = mqtt.Client()

## Define the callback for when the client receives a response from the MQTT broker 

In [4]:
def on_connect(client, userdata, flags, rc):
    print(f'Connected with result code {str(rc)}')
    # Subscribe to a topic when the client connects
    client.subscribe('s333550')

## Define the callback for when a message is published on a subscribed topic

In [5]:
def on_message(client, userdata, msg):
    # Decode the message
    message = msg.payload.decode()

    # Decode JSON string to dict
    my_dict = json.loads(message)

    #parameters of the message
    mac_address = my_dict['mac_address']
    timestamp = int(my_dict['timestamp'])*1000  # Ensure timestamp is an integer and is in milliseconds
    temperature = float(my_dict['temperature'])  # Ensure temperature is numeric
    humidity = float(my_dict['humidity'])  # Ensure humidity is numeric

    """create the time-series"""
    # Set the retention of "temperature" and "humidity" to 365 days
    thirty_day_in_ms = 365 * 24 * 60 * 60 * 1000
    try:
        redis_client.ts().create(f'{mac_address}:temperature', retention_msecs=thirty_day_in_ms, uncompressed=False, chunk_size=128,)
    except redis.ResponseError:
        pass

    try:
        redis_client.ts().create(f'{mac_address}:humidity', retention_msecs=thirty_day_in_ms, uncompressed=False, chunk_size=128,)
    except redis.ResponseError:
        pass


    #Add temperature and humidity data to Redis time series
    redis_client.ts().add(f'{mac_address}:temperature', timestamp, temperature)
    redis_client.ts().add(f'{mac_address}:humidity', timestamp, humidity)

    # Print the message and its topic
    print(f"Received message '{message}' on topic {msg.topic}")


## Set the callbacks to the client object

In [6]:
client.on_connect = on_connect
client.on_message = on_message

## Connect to the MQTT broker and wait for messages

In [7]:
client.connect('mqtt.eclipseprojects.io', 1883)

# Start the client loop to process network events and call the callbacks
client.loop_forever()

Connected with result code 0
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467300.3312886, "temperature": 33, "humidity": 6}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467302.586188, "temperature": 33, "humidity": 5}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467304.8405037, "temperature": 33, "humidity": 6}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467307.0948474, "temperature": 33, "humidity": 6}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467309.3493822, "temperature": 33, "humidity": 6}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467311.6034698, "temperature": 33, "humidity": 6}' on topic s333550
Received message '{"mac_address": "0xe45f01e89bcc", "timestamp": 1735467313.8578596, "temperature": 33, "humidity": 6}' on topic s333550
Received mess

KeyboardInterrupt: 

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=8a9d9526-dc21-42d6-ba37-8f708634743d' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>