Commit

Merge branch 'development' into update_docs

eduardogmisiuk committed Apr 28, 2020
2 parents a843c0a + 316602c commit 340ce4c
Showing 31 changed files with 1,807 additions and 567 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -4,4 +4,6 @@
.vscode/
.idea/
docs/build/*
*egg-info
*egg-info
.coverage
htmlcov/
15 changes: 12 additions & 3 deletions .travis.yml
@@ -6,13 +6,22 @@ python:
cache:
directories:
- node_modules
before_install:
- npm install aglio
install:
- pip install -r requirements/requirements.txt
- pip install -r tests/requirements.txt
- pip install codecov
env:
global:
- DEV_MNGR_CRYPTO_PASS="kamehameHA"
- DEV_MNGR_CRYPTO_IV=1234567890123456
- DEV_MNGR_CRYPTO_SALT="shuriken"
script:
- 'docker build -t dojot/device-manager .'
- 'docker build -t dredd/test -f tests/Dockerfile .'
- 'docker-compose -p test -f tests/docker-compose.yaml up -d kafka data-broker postgres device-manager device-manager-redis postgres-users'
- 'docker-compose -p test -f tests/docker-compose.yaml up -d kafka data-broker postgres device-manager postgres-users'
- 'docker-compose -p test -f tests/docker-compose.yaml run --rm test-runner'
- python3 -m pytest --cov-report=html --cov=DeviceManager tests/
after_success:
- codecov
- travis/publish.sh
- travis/deploy-gh-pages.sh
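The CI changes above install the test requirements and codecov, export the DEV_MNGR_CRYPTO_PASS/IV/SALT variables, run the docker-compose test-runner and then the unit tests under pytest with coverage, and upload the results to codecov; the .coverage file and htmlcov/ directory added to .gitignore above are the artifacts of that coverage run. The diff does not show how device-manager consumes the DEV_MNGR_CRYPTO_* variables, but, purely as an illustrative sketch (the CRYPTO_CONFIG name below is hypothetical), reading them from the environment could look like this:

import os

# Illustrative only: the variable names mirror the .travis.yml entries above;
# the service's real configuration module is not part of this diff.
CRYPTO_CONFIG = {
    "password": os.environ.get("DEV_MNGR_CRYPTO_PASS", ""),
    "iv": os.environ.get("DEV_MNGR_CRYPTO_IV", ""),
    "salt": os.environ.get("DEV_MNGR_CRYPTO_SALT", ""),
}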
289 changes: 23 additions & 266 deletions DeviceManager/BackendHandler.py
@@ -6,7 +6,7 @@
import traceback
import requests
from DeviceManager.utils import HTTPRequestError
from DeviceManager.KafkaNotifier import send_notification, DeviceEvent
from DeviceManager.KafkaNotifier import KafkaNotifier, DeviceEvent
import logging
from datetime import datetime
import time
@@ -54,303 +54,60 @@ def update(self, device):
        raise NotImplementedError('Abstract method called')


# KafkaHandler is the preferred handler
class OrionHandler(BackendHandler):

    def __init__(self, service='devm', base_url='http://orion:1026/v2/entities'):
        self.baseUrl = base_url
        self.service = service
        self._noBodyHeaders = {
            'Fiware-service': service,
            'Fiware-servicePath': '/',
            'cache-control': 'no-cache'
        }
        self._headers = self._noBodyHeaders
        self._headers['Content-Type'] = 'application/json'

    @staticmethod
    def parse_device(device, generated_id=False):
        body = {}
        type_descr = "template"
        for dev_type in device['attrs'].keys():
            type_descr += "_" + str(dev_type)
        if generated_id:
            body = {
                "type": type_descr,
                "id": device['id']
            }
        for tpl in device['attrs']:
            for attr in device['attrs'][tpl]:
                body[attr['label']] = {"type": attr['value_type']}
                if (attr['value_type'] == 'geo:point'):
                    body[attr['label']]['value'] = '0,0'

        return body

    def create_update_device(self, device, type_descr, is_update=True):
        target_url = "%s/%s/attrs?type=%s&options=keyValues" % (self.baseUrl, device['id'], type_descr)
        body = json.dumps(OrionHandler.parse_device(device, not is_update))
        if not is_update:
            target_url = self.baseUrl

        try:
            LOGGER.info("about to create device in ctx broker")
            LOGGER.debug("%s", body)
            response = requests.post(target_url, headers=self._headers, data=body)
            if 200 <= response.status_code < 300:
                LOGGER.debug("Broker update successful")
            else:
                LOGGER.info("Failed to update ctx broker: %d", response.status_code)
                try:
                    LOGGER.debug("%s", response.json())
                except Exception as e:
                    LOGGER.error(e)
        except requests.ConnectionError:
            raise HTTPRequestError(500, "Broker is not reachable")

    def create(self, device, type_descr):
        self.create_update_device(device, type_descr, False)

    def remove(self, device_id):
        # removal is ignored, thus leaving removed device data lingering in the system
        # (this allows easier recovery/rollback of data by the user)
        pass

    def update(self, device, type_descr):
        self.create_update_device(device, type_descr)

class KafkaHandler:

    def __init__(self):
        pass
        self.kafkaNotifier = KafkaNotifier()

    def create(self, device, meta):
        """
        Publishes event to kafka broker, notifying device creation
        """

        LOGGER.info(f" Publishing create event to Kafka")
        send_notification(DeviceEvent.CREATE, device, meta)
        self.kafkaNotifier.send_notification(DeviceEvent.CREATE, device, meta)

    def remove(self, device, meta):
        """
        Publishes event to kafka broker, notifying device removal
        """

        LOGGER.info(f" Publishing remove event to Kafka")
        send_notification(DeviceEvent.REMOVE, device, meta)
        self.kafkaNotifier.send_notification(DeviceEvent.REMOVE, device, meta)

    def update(self, device, meta):
        """
        Publishes event to kafka broker, notifying device update
        """

        LOGGER.info(f" Publishing create update to Kafka")
        send_notification(DeviceEvent.UPDATE, device, meta)
        self.kafkaNotifier.send_notification(DeviceEvent.UPDATE, device, meta)

    def configure(self, device, meta):
        """
        Publishes event to kafka broker, notifying device configuration
        """
        LOGGER.info(f" Publishing configure event to Kafka")
        send_notification(DeviceEvent.CONFIGURE, device, meta)
        self.kafkaNotifier.send_notification(DeviceEvent.CONFIGURE, device, meta)


# deprecated
class IotaHandler(BackendHandler):
    """ Abstracts interaction with iotagent-json for MQTT device management """
    # TODO: this should be configurable (via file or environment variable)
    def __init__(self, base_url='http://iotagent:4041/iot',
                 orion_url='http://orion:1026/v1/contextEntities',
                 service='devm'):
        self.baseUrl = base_url
        self.orionUrl = orion_url
        self.service = service
        self._headers = {
            'Fiware-service': service,
            'Fiware-servicePath': '/',
            'Content-Type': 'application/json',
            'cache-control': 'no-cache'
        }
        self._noBodyHeaders = {
            'Fiware-service': service,
            'Fiware-servicePath': '/',
            'cache-control': 'no-cache'
        }

    def __get_topic(self, device):

        LOGGER.info(' About to get the topic from the device: {}'.format( device))

        if device.topic:
            topic = device.topic
        else:
            topic = "/%s/%s/attrs" % (self.service, device.device_id)

        LOGGER.info(' Obtained topic: {}'.format( topic))

        return topic

    def __get_config(self, device):

        base_config = {
            # this is actually consumed by iotagent
            'device_id': device.device_id,
            # becomes entity type for context broker
            'entity_type': 'device',
            # becomes entity id for context broker
            'entity_name': device.device_id,
            'attributes': [],
            # this is actually consumed by iotagent
            'internal_attributes': {
                "attributes" : [],
                "timeout": {"periodicity": device.frequency, "waitMultiplier": 3}
            },
            'static_attributes': []
        }

        for attr in device.template.attrs:
            if attr.type == 'dynamic':
                base_config['attributes'].append({
                    'name': attr.label,
                    'type': attr.value_type
                })
            elif attr.type == 'static':
                base_config['static_attributes'].append({
                    'name': attr.label,
                    'type': attr.value_type,
                    'value': attr.static_value
                })
            elif (attr.type == 'meta') and (attr.label == 'mqtt_topic'):
                # @BUG however nice, this doesn't seem to work with iotagent-json
                base_config['internal_attributes']['attributes'].append({
                    {"topic": "tcp:mqtt:%s" % attr.static_value},
                })

        LOGGER.info(f" The config of the device {device} is {base_config}")
        return base_config

    def create(self, device):
        """ Returns boolean indicating device creation success. """

        try:
            svc = json.dumps({
                "services": [{
                    "resource": "devm",
                    "apikey": self.service,
                    "entity_type": 'device'
                }]
            })
            response = requests.post(self.baseUrl + '/services', headers=self._headers, data=svc)
            if not (response.status_code == 409 or
                    (200 <= response.status_code < 300)):
                error = "Failed to configure ingestion subsystem: service creation failed"
                LOGGER.error(f" {error}")
                raise HTTPRequestError(500, error)
        except requests.ConnectionError:
            raise HTTPRequestError(500, "Cannot reach ingestion subsystem (service)")

        try:
            response = requests.post(self.baseUrl + '/devices', headers=self._headers,
                                     data=json.dumps({'devices':[self.__get_config(device)]}))

            if not (200 <= response.status_code < 300):
                error = "Failed to configure ingestion subsystem: device creation failed"
                LOGGER.error(f" {error}")
                raise HTTPRequestError(500, error)

            LOGGER.info(" Device {device} created with success")

        except requests.ConnectionError:
            raise HTTPRequestError(500, "Cannot reach ingestion subsystem (device)")

    def remove(self, deviceid):
        """ Returns boolean indicating device removal success. """

        try:
            response = requests.delete(self.baseUrl + '/devices/' + deviceid,
                                       headers=self._noBodyHeaders)
            if 200 <= response.status_code < 300:
                response = requests.delete('%s/%s' % (self.orionUrl, deviceid),
                                           headers=self._noBodyHeaders)
                if not (200 <= response.status_code < 300):
                    error = "Failed to configure ingestion subsystem: device removal failed"
                    LOGGER.error(f" {error}")
                    raise HTTPRequestError(500, error)

            LOGGER.info(f" Device {deviceid} removed with success")

        except requests.ConnectionError:
            raise HTTPRequestError(500, "Cannot reach ingestion subsystem")

    def update(self, device):
        """ Returns boolean indicating device update success. """

        self.remove(device.device_id)
        return self.create(device)


# Temporarily create a subscription to persist device data
# TODO this must be revisited in favor of a orchestrator-based solution
class PersistenceHandler(object):
    """
    Abstracts the configuration of subscriptions targeting the default
    history backend (STH)
    """
    # TODO: this should be configurable (via file or environment variable)
    def __init__(self, service='devm',
                 base_url='http://orion:1026/v1/contextSubscriptions',
                 target_url="http://sth:8666/notify"):
        self.baseUrl = base_url
        self.targetUrl = target_url
        self.service = service
        self._headers = {
            'Fiware-service': service,
            'Fiware-servicePath': '/',
            'Content-Type': 'application/json',
            'cache-control': 'no-cache'
        }
        self._noBodyHeaders = {
            'Fiware-service': service,
            'Fiware-servicePath': '/',
            'cache-control': 'no-cache'
        }

    def create(self, device_id, device_type='device'):
        """ Returns subscription id on success. """

        try:
            svc = json.dumps({
                "entities": [{
                    "type": device_type,
                    "isPattern": "false",
                    "id": device_id
                }],
                "reference" : self.targetUrl,
                "duration": "P10Y",
                "notifyConditions": [{"type": "ONCHANGE"}]
            })
            response = requests.post(self.baseUrl, headers=self._headers, data=svc)
            if not (response.status_code == 409 or
                    (200 <= response.status_code < 300)):
                raise HTTPRequestError(500, "Failed to create subscription")

            # return the newly created subs
            reply = response.json()
            return reply['subscribeResponse']['subscriptionId']
        except ValueError:
            LOGGER.error('Failed to create subscription')
            raise HTTPRequestError(500, "Failed to create subscription")
        except requests.ConnectionError:
            raise HTTPRequestError(500, "Broker is not reachable")

    def remove(self, subsId):
        """ Returns boolean indicating subscription removal success. """

        try:
            response = requests.delete(self.baseUrl + '/' + subsId, headers=self._noBodyHeaders)
            if not (200 <= response.status_code < 300):
                raise HTTPRequestError(500, "Failed to remove subscription")
        except requests.ConnectionError:
            raise HTTPRequestError(500, "Broker is not reachable")


class KafkaInstanceHandler:

    kafkaNotifier = None

    def __init__(self):
        pass

    def getInstance(self, kafka_instance):
        """
        Instantiates a connection with Kafka, was created because
        previously the connection was being created in KafkaNotifier
        once time every import.

        :param kafka_instance: An instance of KafkaHandler.
        :return An instance of KafkaHandler used to notify
        """

        if kafka_instance is None:
            self.kafkaNotifier = KafkaHandler()

        return self.kafkaNotifier
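Taken together, this file's changes remove the OrionHandler, the deprecated IotaHandler and the PersistenceHandler classes, switch KafkaHandler from the module-level send_notification function to a KafkaNotifier instance, and add KafkaInstanceHandler, which creates the handler lazily on first use instead of opening a Kafka connection at import time. As a rough usage sketch, not part of the diff (the caller-side names are hypothetical), a request handler might obtain and reuse the shared instance like this:

from DeviceManager.BackendHandler import KafkaInstanceHandler

# Hypothetical caller: a single KafkaInstanceHandler shared by the handler code.
kafka = KafkaInstanceHandler()

def notify_device_created(device, meta):
    # kafkaNotifier starts as None, so the first call builds a KafkaHandler;
    # later calls pass the cached instance back in and simply reuse it.
    handler = kafka.getInstance(kafka.kafkaNotifier)
    handler.create(device, meta)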