Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
bdaef39
Fix message sending
dastier Oct 21, 2021
c52a9b6
Fix revoking
dastier Oct 21, 2021
e0b69e8
Fix MqttMessagingService
dastier Oct 21, 2021
81bae13
Update examples.txt
dastier Oct 25, 2021
0ad04c9
Fix messaging in examples
dastier Oct 25, 2021
9abf794
Refactor Mqtt client
dastier Oct 25, 2021
c90ba11
Update examples
dastier Oct 25, 2021
f944e84
add test script
dastier Oct 25, 2021
68674df
Update example
dastier Oct 25, 2021
210aba9
Merge remote-tracking branch 'origin/Refactor_Project' into Refactor_…
dastier Oct 25, 2021
005c7ca
remove odd prints
dastier Oct 25, 2021
5a4aed5
Merge remote-tracking branch 'origin/Refactor_Project' into Refactor_…
dastier Oct 25, 2021
2cc7707
Refactor http client
dastier Oct 26, 2021
f6b8fb1
Merge remote-tracking branch 'origin/Refactor_Project' into Refactor_…
dastier Oct 26, 2021
a44be3d
Fix examples
dastier Oct 26, 2021
a1362f7
Add subscription to examples
dastier Oct 26, 2021
f0bc39f
Refactor SubscriptionParameters
dastier Oct 26, 2021
0c47fb8
Fix Http Client
dastier Oct 26, 2021
738db91
Fix revoking test
dastier Oct 27, 2021
6b389f1
Fix outbox
dastier Oct 27, 2021
4c9e6d1
Fix TypeUrl
dastier Oct 27, 2021
0a3afaa
Fix OutboxMessage
dastier Oct 27, 2021
2c9c247
Fix OutboxMessage
dastier Oct 27, 2021
565c69a
Implement SubscriptionItemBuilder, CapabilityBuilder
dastier Oct 27, 2021
26d8971
Refactor encode_header method
dastier Oct 28, 2021
f01f841
Fix CapabilityService
dastier Oct 28, 2021
ddacb85
Refactor CapabilityParameters
dastier Oct 28, 2021
6f62a2b
Refactor Parameters
dastier Oct 28, 2021
c711db3
Remove redundant methods in HttpMessagingService
dastier Oct 28, 2021
ba81a1c
Move AuthorizationResultUrl, AuthorizationToken, AuthorizationResult …
dastier Oct 28, 2021
8c29477
Create helper method let_agrirouter_process_the_message
dastier Oct 28, 2021
4c4927b
Remove redundant CU-classes in onboarding
dastier Oct 28, 2021
4ce5468
Refactor BaseEnvieonment
dastier Oct 28, 2021
819c60c
Refactor Authorization.__init__
dastier Oct 28, 2021
4aac735
Refactor auth
dastier Oct 28, 2021
8005b50
Refactor SoftwareOnboardingResponse
dastier Oct 28, 2021
55ecf8f
Refactor SoftwareOnboardingParameter
dastier Oct 28, 2021
3658d11
Implement tests for
dastier Oct 28, 2021
a11821b
Refactor messaging test
dastier Oct 28, 2021
5707874
Refactor Dtos
dastier Nov 9, 2021
9a4edb4
[WIP] Fix MqttClient
dastier Nov 9, 2021
344f735
Fix SoftwareOnboardingResponse
dastier Nov 11, 2021
dbdeb6f
Fix MqttClient.__init__ method
dastier Nov 11, 2021
4373604
Merge branch 'develop' into Fix_MqttClient
dastier Nov 11, 2021
8558c45
Remove redundant files
dastier Nov 11, 2021
0c6da94
Fix SoftwareOnboardingResponse.json_deserialize method
dastier Nov 11, 2021
869b774
Fix let_agrirouter_process_the_message method
dastier Nov 12, 2021
9915d36
Refactor MqttMessagingService
dastier Nov 12, 2021
972ccdb
Implement __str__ and __repr__ magic methods for dtos
dastier Nov 12, 2021
9067872
Refactor MqttClient
dastier Nov 12, 2021
2d06436
Remove redundant prints
dastier Nov 12, 2021
8de531a
Refactor SoftwareOnboardingResponse
dastier Nov 13, 2021
8e1d24e
Update examp,es script
dastier Nov 13, 2021
65a8c8c
Add example functions for QueryHeaderService to example_script
dastier Nov 13, 2021
368689d
Fix example_script
dastier Nov 13, 2021
88d78ba
Refactor example_script
dastier Nov 15, 2021
1181acb
Fix ConnectionCriteria dto
dastier Nov 15, 2021
ab4f189
Add __str__ and __repr__ methods for SoftwareOnboardingResponse
dastier Nov 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 46 additions & 38 deletions agrirouter/auth/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,6 @@
from agrirouter.messaging.exceptions import WrongFieldError


class AuthorizationResultUrl:
def __init__(self,
*,
state: str = None,
signature: str = None,
token: str = None,
error: str = None
):
self.state = state
self.signature = signature
self.token = token
self.error = error

def get_state(self) -> str:
return self.state

def set_state(self, state: str) -> None:
self.state = state

def get_signature(self) -> str:
return self.signature

def set_signature(self, signature: str) -> None:
self.signature = signature

def get_token(self) -> str:
return self.token

def set_token(self, token: str) -> None:
self.token = token

def get_error(self) -> str:
return self.error

def set_error(self, error: str) -> None:
self.error = error


class AuthorizationToken:
ACCOUNT = 'account'
REGISTRATION_CODE = 'regcode'
Expand Down Expand Up @@ -88,6 +50,52 @@ def set_expires(self, expires: str) -> None:
self.expires = expires


class AuthorizationResultUrl:
def __init__(self,
*,
state: str = None,
signature: str = None,
token: str = None,
decoded_token: AuthorizationToken = None,
error: str = None
):
self.state = state
self.signature = signature
self.token = token
self.decoded_token = decoded_token
self.error = error

def get_state(self) -> str:
return self.state

def set_state(self, state: str) -> None:
self.state = state

def get_signature(self) -> str:
return self.signature

def set_signature(self, signature: str) -> None:
self.signature = signature

def get_token(self) -> str:
return self.token

def set_token(self, token: str) -> None:
self.token = token

def get_error(self) -> str:
return self.error

def set_error(self, error: str) -> None:
self.error = error

def get_decoded_token(self) -> AuthorizationToken:
return self.decoded_token

def set_decoded_token(self, decoded_token: AuthorizationToken) -> None:
self.decoded_token = decoded_token


class AuthorizationResult:
def __init__(self,
*,
Expand Down
20 changes: 10 additions & 10 deletions agrirouter/auth/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from cryptography.exceptions import InvalidSignature

from agrirouter.auth.dto import AuthorizationToken
from agrirouter.auth.dto import AuthorizationToken, AuthorizationResultUrl
from agrirouter.onboarding.signature import verify_signature


Expand Down Expand Up @@ -69,16 +69,16 @@ def decode_token(token: Union[str, bytes]) -> AuthorizationToken:
auth_token.json_deserialize(json.loads(decoded_token))
return auth_token

def get_auth_result(self) -> dict:
if not self.is_successful:
return {self.ERROR_KEY: self.error}
def get_auth_result(self) -> AuthorizationResultUrl:
decoded_token = self.decode_token(self.token)
return {
self.SIGNATURE_KEY: self.signature,
self.STATE_KEY: self.state,
self.TOKEN_KEY: self.token,
self.CRED_KEY: decoded_token
}

return AuthorizationResultUrl(
signature=self.signature,
state=self.state,
token=self.token,
decoded_token=decoded_token,
error=self.error
)

def get_signature(self):
return self.signature
Expand Down
2 changes: 2 additions & 0 deletions agrirouter/messaging/clients/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ASYNC = "ASYNC"
SYNC = "SYNC"
94 changes: 57 additions & 37 deletions agrirouter/messaging/clients/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import time
import ssl
from typing import Any, List, Tuple

from paho.mqtt import client as mqtt_client
import paho.mqtt.client as mqtt_client
from paho.mqtt.client import MQTTv31, MQTTMessageInfo

from agrirouter.messaging.certification import create_certificate_file_from_pen
from agrirouter.messaging.clients.constants import SYNC, ASYNC


class MqttClient:

def __init__(self,
client_id,
onboard_response,
client_id: str,
on_message_callback: callable = None,
userdata: Any = None,
clean_session: bool = True
clean_session: bool = False
):
# TODO: Implement on_message_callback parameter validation:
# must take params as described at https://pypi.org/project/paho-mqtt/#callbacks
Expand All @@ -22,26 +28,54 @@ def __init__(self,
protocol=MQTTv31,
transport="tcp"
)

self.mqtt_client.on_message = on_message_callback if on_message_callback else self._get_on_message_callback()
self.mqtt_client.on_connect = self._get_on_connect_callback()
self.mqtt_client.on_connect = self._get_on_connect_callback(onboard_response)
self.mqtt_client.on_disconnect = self._get_on_disconnect_callback()
self.mqtt_client.on_subscribe = self._get_on_subscribe_callback()
self.mqtt_client.on_unsubscribe = self._get_on_unsubscribe_callback()

certificate_file_path = create_certificate_file_from_pen(onboard_response)
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.load_cert_chain(
certfile=certificate_file_path,
keyfile=certificate_file_path,
password=onboard_response.get_authentication().get_secret(),
)
self.mqtt_client.tls_set_context(context)

self._mode = None

def connect(self, host: str, port: str) -> None:
self.mqtt_client.connect(
host=host,
port=int(port)
)
self.mqtt_client.loop()

self._mode = SYNC

def connect_async(self, host: str, port: str):
self.mqtt_client.connect_async(
host=host,
port=port
port=int(port)
)
self.mqtt_client.loop_start()

self._mode = ASYNC

while self.mqtt_client._state == 0:
time.sleep(1)

def disconnect(self):
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()

def publish(self, topic, payload, qos=0) -> MQTTMessageInfo:
"""
def receive_outbox_messages(self):
self.mqtt_client.loop()

def publish(self, topic, payload, qos=2) -> MQTTMessageInfo:
"""
:param topic: str representing unique name of the topic that the message should be published on
:param payload: The actual message to send
:param qos: int representing the quality of service level to use. May be [0, 1, 2]
Expand All @@ -52,6 +86,10 @@ def publish(self, topic, payload, qos=0) -> MQTTMessageInfo:
payload=payload,
qos=qos
)
if self._mode == SYNC:
self.mqtt_client.loop()
time.sleep(3)
self.mqtt_client.loop()
return message_info

def subscribe(self, topics: List[Tuple[str, int]]) -> tuple:
Expand All @@ -67,7 +105,7 @@ def subscribe(self, topics: List[Tuple[str, int]]) -> tuple:

:return: tuple
"""
result, mid = self.mqtt_client.subscribe(topics)
result, mid = self.mqtt_client.subscribe(topics, qos=2)
return result, mid

def unsubscribe(self, topics: List[str]) -> tuple:
Expand All @@ -85,61 +123,43 @@ def unsubscribe(self, topics: List[str]) -> tuple:
return result, mid

@staticmethod
def _get_on_connect_callback() -> callable:

def on_connect(client, userdata, flags, rc, properties=None):
print("Connection started")
with open("connection.txt", "w") as file:
file.write("Connection started")
if rc == 0:
file.write("Connected!!")
else:
file.write("Do not Connected!!")
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code: {rc}")
def _get_on_connect_callback(onboard_response) -> callable:

return client, userdata, flags, rc, properties
def on_connect(client: mqtt_client.Client, userdata, flags, rc, properties=None):
if rc == 0:
client.subscribe(topic=onboard_response.connection_criteria.commands)
time.sleep(3)

return on_connect

@staticmethod
def _get_on_message_callback() -> callable:

def on_message(client, userdata, msg):
# print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

return client, userdata, msg

return on_message

@staticmethod
def _get_on_subscribe_callback() -> callable:

def on_subscribe(client, userdata, mid, granted_qos, properties=None):
# print(f"Subscribed {userdata} to `{properties}`")

return client, userdata, mid, granted_qos, properties
def on_subscribe(*args, **kwargs):
return args, kwargs

return on_subscribe

@staticmethod
def _get_on_disconnect_callback() -> callable:

def on_disconnect(client, userdata, rc):
# print(f"Disconnected from from `{properties}`")

return client, userdata, rc
def on_disconnect(*args, **kwargs):
return args, kwargs

return on_disconnect

@staticmethod
def _get_on_unsubscribe_callback() -> callable:

def on_unsubscribe(client, userdata, mid):
# print(f"Unsubscribed `{userdata}` from `{properties}`")

return client, userdata, mid
def on_unsubscribe(*args, **kwargs):
return args, kwargs

return on_unsubscribe
31 changes: 17 additions & 14 deletions agrirouter/messaging/services/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from agrirouter.messaging.request import MessageRequest
from agrirouter.messaging.result import MessagingResult
from agrirouter.onboarding.exceptions import BadMessagingResult
from agrirouter.onboarding.response import SoftwareOnboardingResponse


class AbstractMessagingClient(ABC):
Expand Down Expand Up @@ -50,33 +51,35 @@ def send(self, parameters) -> MessagingResult:
class MqttMessagingService(AbstractMessagingClient):

def __init__(self,
client_id,
onboarding_response,
onboarding_response: SoftwareOnboardingResponse,
on_message_callback: callable = None,
client_async: bool = True
):

self.onboarding_response = onboarding_response
self.client = MqttClient(
client_id=client_id,
onboard_response=onboarding_response,
client_id=onboarding_response.get_connection_criteria().get_client_id(),
on_message_callback=on_message_callback,
)
self.client.connect(
self.onboarding_response.get_connection_criteria().get_host(),
self.onboarding_response.get_connection_criteria().get_port()
)
if client_async:
self.client.connect_async(
self.onboarding_response.get_connection_criteria().get_host(),
self.onboarding_response.get_connection_criteria().get_port()
)
else:
self.client.connect(
self.onboarding_response.get_connection_criteria().get_host(),
self.onboarding_response.get_connection_criteria().get_port()
)

def send(self, parameters, qos: int = 0) -> MessagingResult:
message_request = self.create_message_request(parameters)
mqtt_payload = message_request.json_serialize()
self.client.publish(
self.onboarding_response.get_connection_criteria().get_measures(), json.dumps(mqtt_payload),
topic=self.onboarding_response.get_connection_criteria().get_measures(),
payload=json.dumps(mqtt_payload),
qos=qos
)
result = MessagingResult([parameters.get_application_message_id()])
return result

def subscribe(self):
pass

def unsubscribe(self):
pass
Loading