Skip to content

Commit

Permalink
Merge pull request #273 from RWTH-EBC/272-mqtt-v2-does-not-work
Browse files Browse the repository at this point in the history
272 mqtt v2 does not work

BREAKING CHANGE: migrate to paho-mqtt 2.0.0
  • Loading branch information
djs0109 committed Jun 21, 2024
2 parents 0e4290c + 3527677 commit d9cad8c
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 23 deletions.
5 changes: 3 additions & 2 deletions examples/ngsi_v2/e04_ngsi_v2_context_subscriptions_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"type": "Integer"}
}
room_entity = ContextEntity(**room_001)
cb_client.post_entity(entity=room_entity)
cb_client.post_entity(entity=room_entity, update=True)


# # 2 Setup a subscription and MQTT notifications
Expand Down Expand Up @@ -132,7 +132,7 @@ def on_message(client, userdata, msg):
logger.info("Received this message:\n" + message.json(indent=2))


def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

Expand All @@ -141,6 +141,7 @@ def on_disconnect(client, userdata, reasonCode, properties=None):

mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add callbacks to the mqtt-client
mqtt_client.on_connect = on_connect
Expand Down
3 changes: 2 additions & 1 deletion examples/ngsi_v2/e08_ngsi_v2_iota_paho_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,13 @@ def on_message(client, userdata, msg):
f"/{device.device_id}/cmdexe",
payload=json.dumps(res))

def on_disconnect(client, userdata, reasonCode, properties):
def on_disconnect(client, userdata, flags, reasonCode, properties):
logger.info("MQTT client disconnected" + str(reasonCode))

mqtt_client = mqtt.Client(client_id="filip-iot-example",
userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down
6 changes: 3 additions & 3 deletions examples/ngsi_v2/e09_ngsi_v2_iota_filip_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@
#
mqttc = IoTAMQTTClient()

def on_connect(mqttc, obj, flags, rc):
def on_connect(mqttc, obj, flags, rc, properties=None):
mqttc.logger.info("rc: " + str(rc))

def on_connect_fail(mqttc, obj):
mqttc.logger.info("Connect failed")

def on_publish(mqttc, obj, mid):
def on_publish(mqttc, obj, mid, rc, properties=None):
mqttc.logger.info("mid: " + str(mid))

def on_subscribe(mqttc, obj, mid, granted_qos):
def on_subscribe(mqttc, obj, mid, granted_qos, properties=None):
mqttc.logger.info("Subscribed: " + str(mid)
+ " " + str(granted_qos))

Expand Down
3 changes: 3 additions & 0 deletions filip/clients/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import warnings
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple, Union

import paho.mqtt.client as mqtt

from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight
Expand Down Expand Up @@ -126,6 +127,7 @@ def __init__(self,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
devices: List[Device] = None,
service_groups: List[ServiceGroup] = None,
custom_encoder: Dict[str, BaseEncoder] = None):
Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(self,
clean_session=clean_session,
userdata=userdata,
protocol=protocol,
callback_api_version=callback_api_version,
transport=transport)

# setup logging functionality
Expand Down
8 changes: 1 addition & 7 deletions filip/clients/ngsi_v2/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def update_existing_entity_attributes(
The entity attributes are updated with the ones in the payload.
In addition to that, if one or more attributes in the payload doesn't
exist in the entity, an error is returned. This corresponds to a
'PATcH' request.
'PATCH' request.
Args:
entity_id: Entity id to be updated
Expand Down Expand Up @@ -2140,12 +2140,6 @@ def _value_is_not_none(value):
continue
else:
return False
if not _value_is_not_none(v) or not _value_is_not_none(ex_value):
warnings.warn(
"Different field found:{"
f"{k}: ({v}, {ex_value})"
"}"
)
if v != ex_value:
self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
if not _value_is_not_none(v) and not _value_is_not_none(ex_value) or k == "timesSent":
Expand Down
1 change: 1 addition & 0 deletions filip/clients/ngsi_v2/iota.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ def post_devices(self, *, devices: Union[Device, List[Device]],
devices = [devices]
url = urljoin(self.base_url, 'iot/devices')
headers = self.headers

data = {"devices": [json.loads(device.model_dump_json(exclude_none=True)
) for device in devices]}
try:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ python-Levenshtein~=0.23.0
python-dateutil~=2.8.2
wget~=3.2
stringcase~=1.2.0
paho-mqtt~=1.6.1
paho-mqtt~=2.0.0
datamodel_code_generator[http]~=0.25.0
pyjexl~=0.3.0
# tutorials
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

INSTALL_REQUIRES = ['aenum~=3.1.15',
'datamodel_code_generator[http]~=0.25.0',
'paho-mqtt~=1.6.1',
'paho-mqtt~=2.0.0',
'pandas_datapackage_reader~=0.18.0',
'pydantic~=2.5.2',
'pydantic-settings~=2.0.0',
'geojson_pydantic~=1.0.2',
'stringcase>=1.2.0',
'rdflib~=6.0.0',
'regex~=2023.10.3',
Expand Down
6 changes: 3 additions & 3 deletions tests/clients/test_mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ def setUp(self) -> None:

self.mqttc = IoTAMQTTClient()

def on_connect(mqttc, obj, flags, rc):
def on_connect(mqttc, obj, flags, rc,properties):
mqttc.logger.info("rc: " + str(rc))

def on_connect_fail(mqttc, obj):
mqttc.logger.info("Connect failed")

def on_publish(mqttc, obj, mid):
def on_publish(mqttc, obj, mid,rc,properties):
mqttc.logger.info("mid: " + str(mid))

def on_subscribe(mqttc, obj, mid, granted_qos):
def on_subscribe(mqttc, obj, mid, granted_qos,properties):
mqttc.logger.info("Subscribed: " + str(mid)
+ " " + str(granted_qos))

Expand Down
9 changes: 6 additions & 3 deletions tests/clients/test_ngsi_v2_cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,13 +708,14 @@ def on_message(client, userdata, msg):
nonlocal sub_message
sub_message = Message.model_validate_json(msg.payload)

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

import paho.mqtt.client as mqtt
mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down Expand Up @@ -933,13 +934,14 @@ def on_message(client, userdata, msg):
sub_message = Message.model_validate_json(msg.payload)
sub_messages[sub_message.subscriptionId] = sub_message

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

import paho.mqtt.client as mqtt
mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down Expand Up @@ -1324,12 +1326,13 @@ def on_message(client, userdata, msg):
f"/{device.device_id}/cmdexe",
payload=json.dumps(res))

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
pass

mqtt_client = mqtt.Client(client_id="filip-test",
userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")

# add our callbacks to the client
Expand Down
3 changes: 2 additions & 1 deletion tests/models/test_ngsi_v2_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List
import warnings
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import CallbackAPIVersion
import pyjexl

from filip.models.base import FiwareHeader
Expand Down Expand Up @@ -163,7 +164,7 @@ def test_expression_language(self):
)
self.iota_client.post_device(device=device1)

mqtt_cl = mqtt_client.Client()
mqtt_cl = mqtt_client.Client(callback_api_version=CallbackAPIVersion.VERSION2)
mqtt_cl.connect(settings.MQTT_BROKER_URL.host, settings.MQTT_BROKER_URL.port)
mqtt_cl.loop_start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
history_weather_station = []

# ToDo: Create an MQTTv5 client with paho-mqtt.
mqttc = mqtt.Client(protocol=mqtt.MQTTv5)
mqttc = mqtt.Client(protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
# set user data if required
mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)

Expand Down

0 comments on commit d9cad8c

Please sign in to comment.