Skip to content

Commit

Permalink
Merge pull request #199 from rodrigoaasm/DOJOT-95
Browse files Browse the repository at this point in the history
[DOJOT-95] Update: device-manager configure topic
  • Loading branch information
rodrigoaasm committed Jul 30, 2022
2 parents 2817639 + 0c88a09 commit 29ce388
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 33 deletions.
3 changes: 2 additions & 1 deletion DeviceManager/BackendHandler.py
Expand Up @@ -7,6 +7,7 @@
import requests
from DeviceManager.utils import HTTPRequestError
from DeviceManager.KafkaNotifier import KafkaNotifier, DeviceEvent
from DeviceManager.conf import CONFIG
import logging
from datetime import datetime
import time
Expand Down Expand Up @@ -88,7 +89,7 @@ def configure(self, device, meta):
Publishes event to kafka broker, notifying device configuration
"""
LOGGER.info(f" Publishing configure event to Kafka")
self.kafkaNotifier.send_notification(DeviceEvent.CONFIGURE, device, meta)
self.kafkaNotifier.send_notification(DeviceEvent.CONFIGURE, device, meta, CONFIG.device_actuation_subject)

class KafkaInstanceHandler:

Expand Down
35 changes: 8 additions & 27 deletions DeviceManager/KafkaNotifier.py
Expand Up @@ -49,37 +49,18 @@ def __init__(self):
# Maps services to their managed topics
self.topic_map = {}

def get_topic(self, service, subject):
if service in self.topic_map.keys():
if subject in self.topic_map[service].keys():
return self.topic_map[service][subject]

target = "{}/topic/{}".format(CONFIG.data_broker, subject)
userinfo = {
"username": "device-manager",
"service": service
}

jwt = "{}.{}.{}".format(base64.b64encode("model".encode()).decode(),
base64.b64encode(json.dumps(
userinfo).encode()).decode(),
base64.b64encode("signature".encode()).decode())

response = requests.get(target, headers={"authorization": jwt})
if 200 <= response.status_code < 300:
payload = response.json()
if self.topic_map.get(service, None) is None:
self.topic_map[service] = {}
self.topic_map[service][subject] = payload['topic']
return payload['topic']
return None

def send_notification(self, event, device, meta):
def get_topic(self, service, subject, suffix = ""):
topic = "{}.{}".format(service, subject)
if( suffix != "" ):
topic = "{}.{}".format(topic, suffix)
return topic

def send_notification(self, event, device, meta, suffix = ""):
# TODO What if Kafka is not yet up?

full_msg = NotificationMessage(event, device, meta)
try:
topic = self.get_topic(meta['service'], CONFIG.subject)
topic = self.get_topic(meta['service'], CONFIG.subject, suffix)
LOGGER.debug(f" topic for {CONFIG.subject} is {topic}")
if topic is None:
LOGGER.error(f" Failed to retrieve named topic to publish to")
Expand Down
3 changes: 3 additions & 0 deletions DeviceManager/conf.py
Expand Up @@ -16,9 +16,11 @@ def __init__(self,
broker="http://data-broker",
subject="dojot.device-manager.device",
device_subject="device-data",
device_actuation_subject="actuation",
status_timeout="5",
create_db=True,
log_level="INFO"):

# Postgres configuration data
self.dbname = os.environ.get('DBNAME', db)
self.dbhost = os.environ.get('DBHOST', dbhost)
Expand All @@ -39,6 +41,7 @@ def __init__(self,

# Which subject to publish new device information to
self.subject = os.environ.get('SUBJECT', subject)
self.device_actuation_subject = os.environ.get('DEVICE_ACTUATION_SUBJECT', device_actuation_subject)
self.device_subject = os.environ.get('DEVICE_SUBJECT', device_subject)
self.status_timeout = int(os.environ.get('STATUS_TIMEOUT', status_timeout))

Expand Down
22 changes: 22 additions & 0 deletions dev-Dockerfile
@@ -0,0 +1,22 @@
FROM python:3.6-alpine as basis

RUN apk update && apk --no-cache add postgresql-dev gcc musl-dev

RUN pip install cython

RUN mkdir -p /usr/src/app/requirements
WORKDIR /usr/src/app

RUN python3 -m venv /usr/src/venv
ENV VIRTUAL_ENV="/usr/src/venv"
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

ADD . /usr/src/app
RUN pip install -r requirements/requirements.txt
RUN apk update && apk --no-cache add libpq

ENV PYTHONPATH="/usr/src/app"

EXPOSE 5000

CMD ["tail", "-f", "/dev/null"]
16 changes: 11 additions & 5 deletions tests/test_kafka_notifier.py
Expand Up @@ -23,11 +23,17 @@ def test_get_topic(self):
with patch.object(KafkaNotifier, "__init__", lambda x: None):
KafkaNotifier.topic_map = {}

with patch("requests.get") as request_mock:
request_mock.return_value = Mock(status_code=200, json=Mock(return_value={'topic': '83a257de-c421-4529-b42d-5976def7b526'}))
result = KafkaNotifier().get_topic('admin', 'dojot.device-manager.device')
self.assertIsNotNone(result)
self.assertEqual(result, '83a257de-c421-4529-b42d-5976def7b526')
result = KafkaNotifier().get_topic('admin', 'dojot.device-manager.device')
self.assertIsNotNone(result)
self.assertEqual(result, 'admin.dojot.device-manager.device')

def test_get_topic_with_suffix(self):
with patch.object(KafkaNotifier, "__init__", lambda x: None):
KafkaNotifier.topic_map = {}

result = KafkaNotifier().get_topic('admin', 'dojot.device-manager.device', "configure")
self.assertIsNotNone(result)
self.assertEqual(result, 'admin.dojot.device-manager.device.configure')

def test_send_notification(self):
data = {'label': 'test_device', 'id': 'test_device_id', 'templates': [1], 'attrs': {}}
Expand Down

0 comments on commit 29ce388

Please sign in to comment.