Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: André Srinivasan <andre.srinivasan@gmail.com>
  • Loading branch information
andresrinivasan committed Apr 23, 2024
1 parent 8a5aae8 commit aad8443
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.venv
mqtt-hello-world.yaml
exercise.yaml
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false,
}
]
}
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
# cumulocity-first-device-agent
# Cumulocity First Device Agent

74 changes: 74 additions & 0 deletions exercise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python3

import random
import time
import paho.mqtt.client as mqtt
import yaml

with open("exercise.yaml", "r") as f:
config = yaml.safe_load(f)

serverUrl = config["serverUrl"]
clientId = config["clientId"]
device_name = config["device_name"]
tenant = config["tenant"]
username = config["username"]
password = config["password"]


def on_publish(client, userdata, mid, reason_code, properties):
print(f"Published message: {mid}")


def publish():
temperature = random.randint(10, 20)
mqttc.publish("s/us", f"211,{temperature}")
return


def on_message(client, userdata, message):
payload = message.payload.decode("utf-8")

c8yType = ""
if payload.startswith("511"):
print(f"Received shell command: {payload}")
c8yType = "c8y_Command"
elif payload.startswith("513"):
print(f"Received config change: {payload}")
c8yType = "c8y_Configuration"
else:
print(f"Received unrecognized payload: {payload}")
exit(1)

client.publish("s/us", f"501,{c8yType}", qos=1).wait_for_publish(1)
client.publish("s/us", f"503,{c8yType},Success")


mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, clientId)
mqttc.username_pw_set(tenant + "/" + username, password)
mqttc.on_publish = on_publish
mqttc.on_message = on_message

mqttc.connect(serverUrl)
mqttc.loop_start()

mqttc.publish("s/us", f"100,{device_name},c8y_MQTTDevice", qos=1).wait_for_publish(1)
print("Device registered.")

mqttc.publish("s/us", "114,c8y_Command,c8y_Configuration")
print("Command and configuration enabled.")

mqttc.subscribe("s/ds")
print("Subscribed.")

try:
while True:
publish()
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
pass

mqttc.disconnect()
mqttc.loop_stop()

exit(0)
100 changes: 100 additions & 0 deletions mqtt-hello-world.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3

## From https://cumulocity.com/guides/device-integration/mqtt-examples/#hello-mqtt-python

import paho.mqtt.client as mqtt
import time, random, threading
import multiprocessing as mp
import yaml

with open("mqtt-hello-world.yaml", "r") as f:
config = yaml.safe_load(f)

# client, user and device details
serverUrl = config["serverUrl"]
clientId = config["clientId"]
device_name = config["device_name"]
tenant = config["tenant"]
username = config["username"]
password = config["password"]

# task queue to overcome issue with paho when using multiple threads:
# https://github.com/eclipse/paho.mqtt.python/issues/354
task_queue = mp.Queue()


# display all incoming messages
def on_message(client, userdata, message):
payload = message.payload.decode("utf-8")
print(" < received message " + payload)
if payload.startswith("510"):
task_queue.put(perform_restart)


# simulate restart
def perform_restart():
print("Simulating device restart...")
publish("s/us", "501,c8y_Restart", wait_for_ack=True)

print("...restarting...")
time.sleep(1)

publish("s/us", "503,c8y_Restart", wait_for_ack=True)
print("...restart completed")


# send temperature measurement
def send_measurement():
print("Sending temperature measurement...")
temperature = random.randint(10, 20)
publish("s/us", "211,{}".format(temperature))


# publish a message
def publish(topic, message, wait_for_ack=False):
QoS = 2 if wait_for_ack else 0
message_info = client.publish(topic, message, QoS)
if wait_for_ack:
print(" > awaiting ACK for {}".format(message_info.mid))
message_info.wait_for_publish()
print(" < received ACK for {}".format(message_info.mid))


# display all outgoing messages
def on_publish(client, userdata, mid, reason_code, properties):
print(" > published message: {}".format(mid))


# main device loop
def device_loop():
while True:
task_queue.put(send_measurement)
time.sleep(7)


# connect the client to Cumulocity IoT and register a device
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, clientId)
client.username_pw_set(tenant + "/" + username, password)
client.on_message = on_message
client.on_publish = on_publish

client.connect(serverUrl)
client.loop_start()

client.publish("s/us", "100," + device_name + ",c8y_MQTTDevice")
print("Device created")

client.subscribe("s/ds")

device_loop_thread = threading.Thread(target=device_loop)
device_loop_thread.daemon = True
device_loop_thread.start()

# process all tasks on queue
try:
while True:
task = task_queue.get()
task()
except (KeyboardInterrupt, SystemExit):
print("Received keyboard interrupt, quitting ...")
exit(0)

0 comments on commit aad8443

Please sign in to comment.