In [1]:
import paho.mqtt.client as mqtt
from dataclasses import dataclass
import json
import redis
import time
import datetime 

In [2]:
@dataclass
class RedisCredential:
    """
    The RedisCredential is a class used to store users data.
    """
    username: str
    password: str
    host: str
    port: str

TanguyRedis = RedisCredential(
    "default",
    "2llMg9E9AkqLcDdx6HDpLQ7nzkQKgtCC",
    "redis-17061.c300.eu-central-1-1.ec2.redns.redis-cloud.com",
    17061
)

In [3]:

class RedisWriter:
    """
    The RedisWriter class is used to manage the redis database.
    """

    def __init__(
        self,
        credentials
    ):
        # Create the redis client
        self.redis_client = redis.Redis(
            host=credentials.host,
            port=credentials.port,
            username=credentials.username,
            password=credentials.password
        )

        # Verify the connection
        if self.redis_client.ping():
            print("Succesfully connected to Redis")
        else:
            print("Not connected to Redis.")
        
        
    def create(
        self, 
        ts: str, 
        chunk_size_bytes: int = 4096,
        retention_hour: int = 24,
        compresses: bool = True, 
        delete_if_exists: bool = False):

        # Create the timeseries
        try:
            self.redis_client.ts().create(
                ts,
                retention_msecs=retention_hour*60*60*1000,
                chunk_size=chunk_size_bytes,
                uncompressed = not compresses
            )
        except redis.ResponseError:
            # Ignore error if the timeseries already exists
            pass
        
    
    def add(self, ts, timestamp, value):
        """Add a new value to the time series."""
        self.redis_client.ts().add(ts, timestamp, value)
    
    def get(self, ts):
        return self.redis_client.ts().get(ts)


In [5]:
rdwriter = RedisWriter(TanguyRedis)

In [6]:
client = mqtt.Client()

In [7]:
def on_connect(client, userdata, flags, rc):
    print(f'Connected with result code {str(rc)}')
    # Subscribe to a topic when the client connects
    client.subscribe('s329112/#')

In [8]:
def on_message(client, userdata, msg):
    # Decode the message
    message = msg.payload.decode()
    data = json.loads(message)

    mac_address = data['mac_address']
    timestamp = data["timestamp"]
    temperature = data['temperature']
    humidity = data['humidity']
    temperature_timeseries_name = f'{mac_address}:temperature'
    humidity_timeseries_name = f'{mac_address}:humidity'

    rdwriter.create(temperature_timeseries_name)
    rdwriter.create(humidity_timeseries_name)
    rdwriter.add(temperature_timeseries_name, timestamp, temperature)
    rdwriter.add(humidity_timeseries_name, timestamp, humidity)
    # Print the message and its topic
    print(f"Received message '{message}' on topic {msg.topic}")

In [9]:
client.on_connect = on_connect
client.on_message = on_message

In [10]:
client.connect('mqtt.eclipseprojects.io', 1883)

# Start the client loop to process network events and call the callbacks
client.loop_forever()

Connected with result code 0
Succesfully connected to Redis
Succesfully connected to Redis
Received message '{"mac_address": "0xe45f01d8f467", "timestamp": 1736408933822, "temperature": 4, "humidity": 3}' on topic s329112
Succesfully connected to Redis
Succesfully connected to Redis
Received message '{"mac_address": "0xe45f01d8f467", "timestamp": 1736408936075, "temperature": 10, "humidity": 7}' on topic s329112
Succesfully connected to Redis
Succesfully connected to Redis
Received message '{"mac_address": "0xe45f01d8f467", "timestamp": 1736408938328, "temperature": 8, "humidity": 0}' on topic s329112
Succesfully connected to Redis
Succesfully connected to Redis
Received message '{"mac_address": "0xe45f01d8f467", "timestamp": 1736408940581, "temperature": 4, "humidity": 2}' on topic s329112
Succesfully connected to Redis
Succesfully connected to Redis
Received message '{"mac_address": "0xe45f01d8f467", "timestamp": 1736408942834, "temperature": 8, "humidity": 1}' on topic s329112
Succe

KeyboardInterrupt: 

In [15]:
last_timestamp_ms, last_value = rdwriter.get('0xe45f01d8f467:humidity')
print(last_timestamp_ms, last_value)
last_timestamp_ms, last_value = rdwriter.get('0xe45f01d8f467:temperature')
print(last_timestamp_ms, last_value)

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=b6b1fe17-4020-4f88-867f-48004baa1058' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>