Skip to content

Commit

Permalink
Merge pull request #141 from kevinuehara/refactory
Browse files Browse the repository at this point in the history
Remove dependencies not used in device-manager
  • Loading branch information
kevinuehara committed Aug 27, 2019
2 parents 55672af + 0429979 commit d17b78b
Show file tree
Hide file tree
Showing 8 changed files with 5 additions and 323 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cache:
script:
- 'docker build -t dojot/device-manager .'
- 'docker build -t dredd/test -f tests/Dockerfile .'
- 'docker-compose -p test -f tests/docker-compose.yaml up -d kafka data-broker postgres device-manager device-manager-redis postgres-users'
- 'docker-compose -p test -f tests/docker-compose.yaml up -d kafka data-broker postgres device-manager postgres-users'
- 'docker-compose -p test -f tests/docker-compose.yaml run --rm test-runner'
after_success:
- travis/publish.sh
Expand Down
266 changes: 0 additions & 266 deletions DeviceManager/BackendHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,73 +53,6 @@ def update(self, device):
"""
raise NotImplementedError('Abstract method called')


# KafkaHandler is the preferred handler
class OrionHandler(BackendHandler):

def __init__(self, service='devm', base_url='http://orion:1026/v2/entities'):
self.baseUrl = base_url
self.service = service
self._noBodyHeaders = {
'Fiware-service': service,
'Fiware-servicePath': '/',
'cache-control': 'no-cache'
}
self._headers = self._noBodyHeaders
self._headers['Content-Type'] = 'application/json'

@staticmethod
def parse_device(device, generated_id=False):
body = {}
type_descr = "template"
for dev_type in device['attrs'].keys():
type_descr += "_" + str(dev_type)
if generated_id:
body = {
"type": type_descr,
"id": device['id']
}
for tpl in device['attrs']:
for attr in device['attrs'][tpl]:
body[attr['label']] = {"type": attr['value_type']}
if (attr['value_type'] == 'geo:point'):
body[attr['label']]['value'] = '0,0'

return body

def create_update_device(self, device, type_descr, is_update=True):
target_url = "%s/%s/attrs?type=%s&options=keyValues" % (self.baseUrl, device['id'], type_descr)
body = json.dumps(OrionHandler.parse_device(device, not is_update))
if not is_update:
target_url = self.baseUrl

try:
LOGGER.info("about to create device in ctx broker")
LOGGER.debug("%s", body)
response = requests.post(target_url, headers=self._headers, data=body)
if 200 <= response.status_code < 300:
LOGGER.debug("Broker update successful")
else:
LOGGER.info("Failed to update ctx broker: %d", response.status_code)
try:
LOGGER.debug("%s", response.json())
except Exception as e:
LOGGER.error(e)
except requests.ConnectionError:
raise HTTPRequestError(500, "Broker is not reachable")

def create(self, device, type_descr):
self.create_update_device(device, type_descr, False)

def remove(self, device_id):
# removal is ignored, thus leaving removed device data lingering in the system
# (this allows easier recovery/rollback of data by the user)
pass

def update(self, device, type_descr):
self.create_update_device(device, type_descr)


class KafkaHandler:

def __init__(self):
Expand Down Expand Up @@ -155,202 +88,3 @@ def configure(self, device, meta):
"""
LOGGER.info(f" Publishing configure event to Kafka")
send_notification(DeviceEvent.CONFIGURE, device, meta)


# deprecated
class IotaHandler(BackendHandler):
""" Abstracts interaction with iotagent-json for MQTT device management """
# TODO: this should be configurable (via file or environment variable)
def __init__(self, base_url='http://iotagent:4041/iot',
orion_url='http://orion:1026/v1/contextEntities',
service='devm'):
self.baseUrl = base_url
self.orionUrl = orion_url
self.service = service
self._headers = {
'Fiware-service': service,
'Fiware-servicePath': '/',
'Content-Type': 'application/json',
'cache-control': 'no-cache'
}
self._noBodyHeaders = {
'Fiware-service': service,
'Fiware-servicePath': '/',
'cache-control': 'no-cache'
}

def __get_topic(self, device):

LOGGER.info(' About to get the topic from the device: {}'.format( device))

if device.topic:
topic = device.topic
else:
topic = "/%s/%s/attrs" % (self.service, device.device_id)

LOGGER.info(' Obtained topic: {}'.format( topic))

return topic

def __get_config(self, device):

base_config = {
# this is actually consumed by iotagent
'device_id': device.device_id,
# becomes entity type for context broker
'entity_type': 'device',
# becomes entity id for context broker
'entity_name': device.device_id,
'attributes': [],
# this is actually consumed by iotagent
'internal_attributes': {
"attributes" : [],
"timeout": {"periodicity": device.frequency, "waitMultiplier": 3}
},
'static_attributes': []
}

for attr in device.template.attrs:
if attr.type == 'dynamic':
base_config['attributes'].append({
'name': attr.label,
'type': attr.value_type
})
elif attr.type == 'static':
base_config['static_attributes'].append({
'name': attr.label,
'type': attr.value_type,
'value': attr.static_value
})
elif (attr.type == 'meta') and (attr.label == 'mqtt_topic'):
# @BUG however nice, this doesn't seem to work with iotagent-json
base_config['internal_attributes']['attributes'].append({
{"topic": "tcp:mqtt:%s" % attr.static_value},
})

LOGGER.info(f" The config of the device {device} is {base_config}")
return base_config

def create(self, device):
""" Returns boolean indicating device creation success. """

try:
svc = json.dumps({
"services": [{
"resource": "devm",
"apikey": self.service,
"entity_type": 'device'
}]
})
response = requests.post(self.baseUrl + '/services', headers=self._headers, data=svc)
if not (response.status_code == 409 or
(200 <= response.status_code < 300)):
error = "Failed to configure ingestion subsystem: service creation failed"
LOGGER.error(f" {error}")
raise HTTPRequestError(500, error)
except requests.ConnectionError:
raise HTTPRequestError(500, "Cannot reach ingestion subsystem (service)")

try:
response = requests.post(self.baseUrl + '/devices', headers=self._headers,
data=json.dumps({'devices':[self.__get_config(device)]}))

if not (200 <= response.status_code < 300):
error = "Failed to configure ingestion subsystem: device creation failed"
LOGGER.error(f" {error}")
raise HTTPRequestError(500, error)

LOGGER.info(" Device {device} created with success")

except requests.ConnectionError:
raise HTTPRequestError(500, "Cannot reach ingestion subsystem (device)")

def remove(self, deviceid):
""" Returns boolean indicating device removal success. """

try:
response = requests.delete(self.baseUrl + '/devices/' + deviceid,
headers=self._noBodyHeaders)
if 200 <= response.status_code < 300:
response = requests.delete('%s/%s' % (self.orionUrl, deviceid),
headers=self._noBodyHeaders)
if not (200 <= response.status_code < 300):
error = "Failed to configure ingestion subsystem: device removal failed"
LOGGER.error(f" {error}")
raise HTTPRequestError(500, error)

LOGGER.info(f" Device {deviceid} removed with success")

except requests.ConnectionError:
raise HTTPRequestError(500, "Cannot reach ingestion subsystem")

def update(self, device):
""" Returns boolean indicating device update success. """

self.remove(device.device_id)
return self.create(device)


# Temporarily create a subscription to persist device data
# TODO this must be revisited in favor of a orchestrator-based solution
class PersistenceHandler(object):
"""
Abstracts the configuration of subscriptions targeting the default
history backend (STH)
"""
# TODO: this should be configurable (via file or environment variable)
def __init__(self, service='devm',
base_url='http://orion:1026/v1/contextSubscriptions',
target_url="http://sth:8666/notify"):
self.baseUrl = base_url
self.targetUrl = target_url
self.service = service
self._headers = {
'Fiware-service': service,
'Fiware-servicePath': '/',
'Content-Type': 'application/json',
'cache-control': 'no-cache'
}
self._noBodyHeaders = {
'Fiware-service': service,
'Fiware-servicePath': '/',
'cache-control': 'no-cache'
}

def create(self, device_id, device_type='device'):
""" Returns subscription id on success. """

try:
svc = json.dumps({
"entities": [{
"type": device_type,
"isPattern": "false",
"id": device_id
}],
"reference" : self.targetUrl,
"duration": "P10Y",
"notifyConditions": [{"type": "ONCHANGE"}]
})
response = requests.post(self.baseUrl, headers=self._headers, data=svc)
if not (response.status_code == 409 or
(200 <= response.status_code < 300)):
raise HTTPRequestError(500, "Failed to create subscription")

# return the newly created subs
reply = response.json()
return reply['subscribeResponse']['subscriptionId']
except ValueError:
LOGGER.error('Failed to create subscription')
raise HTTPRequestError(500, "Failed to create subscription")
except requests.ConnectionError:
raise HTTPRequestError(500, "Broker is not reachable")

def remove(self, subsId):
""" Returns boolean indicating subscription removal success. """

try:
response = requests.delete(self.baseUrl + '/' + subsId, headers=self._noBodyHeaders)
if not (200 <= response.status_code < 300):
raise HTTPRequestError(500, "Failed to remove subscription")
except requests.ConnectionError:
raise HTTPRequestError(500, "Broker is not reachable")
36 changes: 1 addition & 35 deletions DeviceManager/DeviceHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from DeviceManager.utils import create_id, get_pagination, format_response
from DeviceManager.utils import HTTPRequestError
from DeviceManager.conf import CONFIG
from DeviceManager.BackendHandler import OrionHandler, KafkaHandler, PersistenceHandler
from DeviceManager.BackendHandler import KafkaHandler

from DeviceManager.DatabaseHandler import db
from DeviceManager.DatabaseModels import assert_device_exists, assert_template_exists
Expand Down Expand Up @@ -418,12 +418,6 @@ def create_device(req):

# Handlers
kafka_handler = KafkaHandler()
if CONFIG.orion:
ctx_broker_handler = OrionHandler(service=tenant)
subs_handler = PersistenceHandler(service=tenant)
else:
ctx_broker_handler = None
subs_handler = None

full_device = None
orm_devices = []
Expand Down Expand Up @@ -459,16 +453,6 @@ def create_device(req):

# Updating handlers
kafka_handler.create(full_device, meta={"service": tenant})
if CONFIG.orion:
# Generating 'device type' field for history
type_descr = "template"
for dev_type in full_device['attrs'].keys():
type_descr += "_" + str(dev_type)
# TODO remove this in favor of kafka as data broker....
ctx_broker_handler.create(full_device, type_descr)
sub_id = subs_handler.create(full_device['id'], type_descr)
orm_device.persistence = sub_id


if verbose:
result = {
Expand Down Expand Up @@ -503,9 +487,6 @@ def delete_device(req, device_id):

kafka_handler = KafkaHandler()
kafka_handler.remove(data, meta={"service": tenant})
if CONFIG.orion:
subscription_handler = PersistenceHandler(service=tenant)
subscription_handler.remove(orm_device.persistence)

db.session.delete(orm_device)
db.session.commit()
Expand Down Expand Up @@ -581,21 +562,6 @@ def update_device(req, device_id):

full_device = serialize_full_device(updated_orm_device, tenant)

if CONFIG.orion:
# Create subscription pointing to history service
# (STH, logstash based persister)
subs_handler = PersistenceHandler(service=tenant)
subs_handler.remove(old_orm_device.persistence)
# Generating 'device type' field for subscription request
type_descr = "template"
for dev_type in full_device['attrs'].keys():
type_descr += "_" + str(dev_type)
updated_orm_device.persistence = subs_handler.create(
device_id, type_descr)

ctx_broker_handler = OrionHandler(service=tenant)
ctx_broker_handler.update(serialize_full_device(old_orm_device, tenant), type_descr)

kafka_handler = KafkaHandler()
kafka_handler.update(full_device, meta={"service": tenant})

Expand Down
6 changes: 1 addition & 5 deletions DeviceManager/ImportHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from DeviceManager.Logger import Log
from DeviceManager.utils import format_response, HTTPRequestError
from DeviceManager.conf import CONFIG
from DeviceManager.BackendHandler import OrionHandler, KafkaHandler, PersistenceHandler
from DeviceManager.BackendHandler import KafkaHandler

from DeviceManager.DatabaseHandler import db
from DeviceManager.DatabaseModels import DeviceTemplate, Device, DeviceAttr, DeviceOverride
Expand Down Expand Up @@ -64,10 +64,6 @@ def notifies_deletion_to_kafka(device, tenant):
data = serialize_full_device(device, tenant)
kafka_handler = KafkaHandler()
kafka_handler.remove(data, meta={"service": tenant})
if CONFIG.orion:
subscription_handler = PersistenceHandler(service=tenant)
subscription_handler.remove(device.persistence)


def delete_records(tenant):
overrides = db.session.query(DeviceOverride)
Expand Down
Loading

0 comments on commit d17b78b

Please sign in to comment.