## **MQTT Subscriber**

In [7]:
!pip install -r requirements.txt

In [2]:
import paho.mqtt.client as mqtt
import redis
import time
import uuid
import json

TOPIC = 's295406'
client = mqtt.Client()

REDIS_CONNECTION_ARGS = {
    'host': 'redis-14517.c85.us-east-1-2.ec2.cloud.redislabs.com',
    'port': '14517',
    'user': 'default',
    'password': 'jEeozJnipzeEjfRNADfZmOkxUAJcg7x4',
}

`Redis Client` definition:

In [3]:
class RedisClient():

    def __init__(self, redis_client = None):
        self.redis_client = redis_client
        self.timeseries_list = []

    def create_redis_connection(self, args):

        print('Creating connection to Redis ...')

        redis_cl = redis.Redis(host=args['host'], port=args['port'], username=args['user'], password=args['password'])
        is_connected = redis_cl.ping()

        print('Redis Connected: ', is_connected)

        self.redis_client = redis_cl

    def get_ts(self):
        return self.timeseries_list

    def get_client(self):
        return self.redis_client
    
    def create_ts(self, ts_name, retention_period=0, chnk_size=128):

        if(ts_name not in self.timeseries_list):
            self.timeseries_list.append(ts_name)

        try:
            print('Creating timeseries %s' % (ts_name))

            if(self.redis_client.exists(ts_name) == 0):
                self.redis_client.ts().create(
                    ts_name, 
                    retention_msecs=retention_period,
                    uncompressed=False,
                    chunk_size=chnk_size
                )
                print(f'Timeseries {ts_name} created!')

            else:
                print('WARNING: Time series %s already exists.' % (ts_name))

        except redis.ResponseError:
            print(f'An error occured while creating the timeseries {ts_name}.')
            pass

In [4]:
redis_client = RedisClient()
redis_client.create_redis_connection(REDIS_CONNECTION_ARGS)
mac_address_list = []

Creating connection to Redis ...
Redis Connected:  True


`Callback` definition and setting:

In [5]:
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    conn_res = {0:'SUCCESSFUL',1:'FAILED'}
    print(f"Connection {conn_res[rc]} with result code "+str(rc))
    sub_res = client.subscribe(TOPIC)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    message = msg.payload.decode()
    topic = msg.topic
    print(f"Received message '{message}' on topic {topic}")

    result = json.loads(message)
    mac_address = result['mac_address']
    if mac_address not in mac_address_list:
        print()
        redis_client.create_ts(str(mac_address + ':battery'))
        redis_client.create_ts(str(mac_address + ':power'))
        print()
        mac_address_list.append(mac_address)
    
    redis_client.get_client().ts().add(str(mac_address + ':battery'), result['timestamp'], result['battery_level'])
    redis_client.get_client().ts().add(str(mac_address + ':power'), result['timestamp'], int(result['power_plugged']))

    print("-------- Time Series stored! --------")
    print(redis_client.get_client().ts().get(redis_client.get_ts()[0]))
    print(redis_client.get_client().ts().get(redis_client.get_ts()[1]))
    print()


client.on_connect = on_connect
client.on_message = on_message

Client connection and Data Receiver

In [6]:
client.connect('mqtt.eclipseprojects.io', 1883, 60)
client.loop_forever()

Connection SUCCESSFUL with result code 0
Received message '{
    "mac_address": "0x2ea63389558f",
    "timestamp": 1674150462368,
    "battery_level": 36,
    "power_plugged": false
}' on topic s295406

Creating timeseries 0x2ea63389558f:battery
Creating timeseries 0x2ea63389558f:power

-------- Time Series stored! --------
(1674150462368, 36.0)
(1674150462368, 0.0)

Received message '{
    "mac_address": "0x2ea63389558f",
    "timestamp": 1674150463374,
    "battery_level": 36,
    "power_plugged": false
}' on topic s295406
-------- Time Series stored! --------
(1674150463374, 36.0)
(1674150463374, 0.0)

Received message '{
    "mac_address": "0x2ea63389558f",
    "timestamp": 1674150464376,
    "battery_level": 36,
    "power_plugged": false
}' on topic s295406
-------- Time Series stored! --------
(1674150464376, 36.0)
(1674150464376, 0.0)

Received message '{
    "mac_address": "0x2ea63389558f",
    "timestamp": 1674150465380,
    "battery_level": 36,
    "power_plugged": false
}' 

KeyboardInterrupt: 

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=28d88834-5f02-41a4-b15c-b006e84e3419' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>