Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

272 mqtt v2 does not work #273

Merged
merged 10 commits into from
Jun 21, 2024
5 changes: 3 additions & 2 deletions examples/ngsi_v2/e04_ngsi_v2_context_subscriptions_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"type": "Integer"}
}
room_entity = ContextEntity(**room_001)
cb_client.post_entity(entity=room_entity)
cb_client.post_entity(entity=room_entity, update=True)


# # 2 Setup a subscription and MQTT notifications
Expand Down Expand Up @@ -132,7 +132,7 @@ def on_message(client, userdata, msg):
logger.info("Received this message:\n" + message.json(indent=2))


def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

Expand All @@ -141,6 +141,7 @@ def on_disconnect(client, userdata, reasonCode, properties=None):

mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add callbacks to the mqtt-client
mqtt_client.on_connect = on_connect
Expand Down
3 changes: 2 additions & 1 deletion examples/ngsi_v2/e08_ngsi_v2_iota_paho_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,13 @@ def on_message(client, userdata, msg):
f"/{device.device_id}/cmdexe",
payload=json.dumps(res))

def on_disconnect(client, userdata, reasonCode, properties):
def on_disconnect(client, userdata, flags, reasonCode, properties):
logger.info("MQTT client disconnected" + str(reasonCode))

mqtt_client = mqtt.Client(client_id="filip-iot-example",
userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down
6 changes: 3 additions & 3 deletions examples/ngsi_v2/e09_ngsi_v2_iota_filip_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@
#
mqttc = IoTAMQTTClient()

def on_connect(mqttc, obj, flags, rc):
def on_connect(mqttc, obj, flags, rc, properties=None):
mqttc.logger.info("rc: " + str(rc))

def on_connect_fail(mqttc, obj):
mqttc.logger.info("Connect failed")

def on_publish(mqttc, obj, mid):
def on_publish(mqttc, obj, mid, rc, properties=None):
mqttc.logger.info("mid: " + str(mid))

def on_subscribe(mqttc, obj, mid, granted_qos):
def on_subscribe(mqttc, obj, mid, granted_qos, properties=None):
mqttc.logger.info("Subscribed: " + str(mid)
+ " " + str(granted_qos))

Expand Down
3 changes: 3 additions & 0 deletions filip/clients/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import warnings
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple, Union

import paho.mqtt.client as mqtt

from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight
Expand Down Expand Up @@ -126,6 +127,7 @@ def __init__(self,
userdata=None,
protocol=mqtt.MQTTv311,
transport="tcp",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
devices: List[Device] = None,
service_groups: List[ServiceGroup] = None,
custom_encoder: Dict[str, BaseEncoder] = None):
Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(self,
clean_session=clean_session,
userdata=userdata,
protocol=protocol,
callback_api_version=callback_api_version,
transport=transport)

# setup logging functionality
Expand Down
8 changes: 1 addition & 7 deletions filip/clients/ngsi_v2/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def update_existing_entity_attributes(
The entity attributes are updated with the ones in the payload.
In addition to that, if one or more attributes in the payload doesn't
exist in the entity, an error is returned. This corresponds to a
'PATcH' request.
'PATCH' request.

Args:
entity_id: Entity id to be updated
Expand Down Expand Up @@ -2140,12 +2140,6 @@ def _value_is_not_none(value):
continue
else:
return False
if not _value_is_not_none(v) or not _value_is_not_none(ex_value):
warnings.warn(
"Different field found:{"
f"{k}: ({v}, {ex_value})"
"}"
)
if v != ex_value:
self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
if not _value_is_not_none(v) and not _value_is_not_none(ex_value) or k == "timesSent":
Expand Down
1 change: 1 addition & 0 deletions filip/clients/ngsi_v2/iota.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ def post_devices(self, *, devices: Union[Device, List[Device]],
devices = [devices]
url = urljoin(self.base_url, 'iot/devices')
headers = self.headers

data = {"devices": [json.loads(device.model_dump_json(exclude_none=True)
) for device in devices]}
try:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ python-Levenshtein~=0.23.0
python-dateutil~=2.8.2
wget~=3.2
stringcase~=1.2.0
paho-mqtt~=1.6.1
paho-mqtt~=2.0.0
datamodel_code_generator[http]~=0.25.0
pyjexl~=0.3.0
# tutorials
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

INSTALL_REQUIRES = ['aenum~=3.1.15',
'datamodel_code_generator[http]~=0.25.0',
'paho-mqtt~=1.6.1',
'paho-mqtt~=2.0.0',
'pandas_datapackage_reader~=0.18.0',
'pydantic~=2.5.2',
'pydantic-settings~=2.0.0',
'geojson_pydantic~=1.0.2',
'stringcase>=1.2.0',
'rdflib~=6.0.0',
'regex~=2023.10.3',
Expand Down
6 changes: 3 additions & 3 deletions tests/clients/test_mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ def setUp(self) -> None:

self.mqttc = IoTAMQTTClient()

def on_connect(mqttc, obj, flags, rc):
def on_connect(mqttc, obj, flags, rc,properties):
mqttc.logger.info("rc: " + str(rc))

def on_connect_fail(mqttc, obj):
mqttc.logger.info("Connect failed")

def on_publish(mqttc, obj, mid):
def on_publish(mqttc, obj, mid,rc,properties):
mqttc.logger.info("mid: " + str(mid))

def on_subscribe(mqttc, obj, mid, granted_qos):
def on_subscribe(mqttc, obj, mid, granted_qos,properties):
mqttc.logger.info("Subscribed: " + str(mid)
+ " " + str(granted_qos))

Expand Down
9 changes: 6 additions & 3 deletions tests/clients/test_ngsi_v2_cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,13 +708,14 @@ def on_message(client, userdata, msg):
nonlocal sub_message
sub_message = Message.model_validate_json(msg.payload)

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

import paho.mqtt.client as mqtt
mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down Expand Up @@ -933,13 +934,14 @@ def on_message(client, userdata, msg):
sub_message = Message.model_validate_json(msg.payload)
sub_messages[sub_message.subscriptionId] = sub_message

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
logger.info("MQTT client disconnected with reasonCode "
+ str(reasonCode))

import paho.mqtt.client as mqtt
mqtt_client = mqtt.Client(userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")
# add our callbacks to the client
mqtt_client.on_connect = on_connect
Expand Down Expand Up @@ -1324,12 +1326,13 @@ def on_message(client, userdata, msg):
f"/{device.device_id}/cmdexe",
payload=json.dumps(res))

def on_disconnect(client, userdata, reasonCode, properties=None):
def on_disconnect(client, userdata, flags, reasonCode, properties=None):
pass

mqtt_client = mqtt.Client(client_id="filip-test",
userdata=None,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
transport="tcp")

# add our callbacks to the client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
history_weather_station = []

# ToDo: Create an MQTTv5 client with paho-mqtt.
mqttc = mqtt.Client(protocol=mqtt.MQTTv5)
mqttc = mqtt.Client(protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
# set user data if required
mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)

Expand Down