Skip to content

Commit

Permalink
Merge pull request #3 from conwetlab/feature/ngsi-registry
Browse files Browse the repository at this point in the history
Feature/ngsi registry
  • Loading branch information
aarranz committed Jul 17, 2018
2 parents e7bfb72 + 3792e67 commit a810f25
Show file tree
Hide file tree
Showing 5 changed files with 454 additions and 38 deletions.
110 changes: 91 additions & 19 deletions ckanext/ngsiview/controller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U
# Copyright 2018 CoNWeT Lab. Univerisdad Politécnica de Madrid
# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.
#
# This file is part of ckanext-ngsipreview.
Expand Down Expand Up @@ -27,13 +30,89 @@
from pylons import config
import requests

from .plugin import NGSI_REG_FORMAT

log = getLogger(__name__)

CHUNK_SIZE = 512


class ProxyNGSIController(base.BaseController):

def _proxy_query_resource(self, resource, parsed_url, headers):
verify = config.get('ckan.ngsi.verify_requests', True)

if parsed_url.path.find('/v1/queryContext') != -1:
if resource.get("payload", "").strip() == "":
details = 'Please add a payload to complete the query.'
base.abort(409, detail=details)

try:
json.loads(resource['payload'])
except:
details = "Payload field doesn't contain valid JSON data."
base.abort(409, detail=details)

headers['Content-Type'] = "application/json"
r = requests.post(resource['url'], headers=headers, data=resource["payload"], stream=True, verify=verify)

else:
r = requests.get(resource['url'], headers=headers, stream=True, verify=verify)

return r

def _proxy_registration_resource(self, resource, parsed_url, headers):
verify = config.get('ckan.ngsi.verify_requests', True)
path = parsed_url.path

if path.endswith('/'):
path = path[:-1]

path = path + '/v2/op/query'
attrs = []

if 'attrs_str' in resource and len(resource['attrs_str']):
attrs = resource['attrs_str'].split(',')
body = {
'entities': [],
'attrs': attrs
}

# Include entity information
for entity in resource['entity']:
query_entity = {
'type': entity['value']
}
if 'isPattern' in entity and entity['isPattern'] == 'on':
query_entity['idPattern'] = entity['id']
else:
query_entity['id'] = entity['id']

body['entities'].append(query_entity)

# Parse expression to include georel information
if 'expression' in resource and len(resource['expression']):
# Separate expresion query strings
supported_expressions = ['georel', 'geometry', 'coords']
parsed_expression = resource['expression'].split('&')

expression = {}
for exp in parsed_expression:
parsed_exp = exp.split('=')

if len(parsed_exp) != 2 or not parsed_exp[0] in supported_expressions:
base.abort(422, detail='The expression is not a valid one for NGSI Registration, only georel, geometry, and coords is supported')
else:
expression[parsed_exp[0]] = parsed_exp[1]

body['expression'] = expression

headers['Content-Type'] = 'application/json'
url = urlparse.urljoin(parsed_url.scheme + '://' + parsed_url.netloc, path)
response = requests.post(url, headers=headers, json=body, stream=True, verify=verify)

return response

def proxy_ngsi_resource(self, resource_id):
# Chunked proxy for ngsi resources.
context = {'model': base.model, 'session': base.model.Session, 'user': base.c.user or base.c.author}
Expand All @@ -46,7 +125,7 @@ def proxy_ngsi_resource(self, resource_id):
'Accept': 'application/json'
}

if resource.get('oauth_req', 'false') == 'true':
if 'oauth_req' in resource and resource['oauth_req'] == 'true':
token = toolkit.c.usertoken['access_token']
headers['X-Auth-Token'] = token

Expand All @@ -56,28 +135,16 @@ def proxy_ngsi_resource(self, resource_id):
headers['FIWARE-ServicePath'] = resource['service_path']

url = resource['url']
parsedurl = urlparse.urlsplit(url)
parsed_url = urlparse.urlsplit(url)

if parsedurl.scheme not in ("http", "https") or not parsedurl.netloc:
if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
base.abort(409, detail='Invalid URL.')

try:
if parsedurl.path.find('/v1/queryContext') != -1:
if resource.get("payload", "").strip() == "":
details = 'Please add a payload to complete the query.'
base.abort(409, detail=details)

try:
json.loads(resource['payload'])
except json.JSONDecodeError:
details = "Payload field doesn't contain valid JSON data."
base.abort(409, detail=details)

headers['Content-Type'] = "application/json"
r = requests.post(url, headers=headers, data=resource["payload"], stream=True, verify=verify)

if resource['format'].lower() == NGSI_REG_FORMAT:
r = self._proxy_registration_resource(resource, parsed_url, headers)
else:
r = requests.get(url, headers=headers, stream=True, verify=verify)
r = self._proxy_query_resource(resource, parsed_url, headers)
except requests.HTTPError:
details = 'Could not proxy ngsi_resource. We are working to resolve this issue as quickly as possible'
base.abort(409, detail=details)
Expand All @@ -88,7 +155,6 @@ def proxy_ngsi_resource(self, resource_id):
details = 'Could not proxy ngsi_resource because the connection timed out.'
base.abort(504, detail=details)


if r.status_code == 401:
if 'oauth_req' in resource and resource['oauth_req'] == 'true':
details = 'ERROR 401 token expired. Retrieving new token, reload please.'
Expand All @@ -101,6 +167,12 @@ def proxy_ngsi_resource(self, resource_id):
log.info(details)
base.abort(409, detail=details)

elif r.status_code == 400:
response = r.json()
details = response['description']
log.info(details)
base.abort(422, detail=details)

else:
r.raise_for_status()
base.response.content_type = r.headers['content-type']
Expand Down
94 changes: 91 additions & 3 deletions ckanext/ngsiview/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Orion Context Broker. If not, see http://www.gnu.org/licenses/.
# along with CKAN NGSI View extension. If not, see http://www.gnu.org/licenses/.

import logging

Expand All @@ -34,6 +34,7 @@


NGSI_FORMAT = 'fiware-ngsi'
NGSI_REG_FORMAT = 'fiware-ngsi-registry'


def check_query(resource):
Expand All @@ -47,6 +48,7 @@ class NgsiView(p.SingletonPlugin):
p.implements(p.IConfigurer, inherit=True)
p.implements(p.IConfigurable, inherit=True)
p.implements(p.IResourceView, inherit=True)
p.implements(p.IResourceController, inherit=True)

def before_map(self, m):
m.connect(
Expand Down Expand Up @@ -93,7 +95,7 @@ def can_view(self, data_dict):
proxy_enabled = p.plugin_loaded('resource_proxy')
same_domain = datapreview.on_same_domain(data_dict)

if format_lower == NGSI_FORMAT and check_query(resource):
if (format_lower == NGSI_FORMAT and check_query(resource)) or format_lower == NGSI_REG_FORMAT:
return same_domain or proxy_enabled
else:
return False
Expand All @@ -103,14 +105,15 @@ def setup_template_variables(self, context, data_dict):
proxy_enabled = p.plugin_loaded('resource_proxy')
oauth2_enabled = p.plugin_loaded('oauth2')
same_domain = datapreview.on_same_domain(data_dict)
format_lower = resource.get('format', '').lower()

if 'oauth_req' not in resource:
oauth_req = 'false'
else:
oauth_req = resource['oauth_req']

url = resource['url']
if not check_query(resource):
if format_lower == NGSI_FORMAT and not check_query(resource):
details = "</br></br>This is not a ContextBroker query, please check <a href='https://forge.fiware.org/plugins/mediawiki/wiki/fiware/index.php/Publish/Subscribe_Broker_-_Orion_Context_Broker_-_User_and_Programmers_Guide'>Orion Context Broker documentation</a></br></br></br>"
f_details = "This is not a ContextBroker query, please check Orion Context Broker documentation."
h.flash_error(f_details, allow_html=False)
Expand Down Expand Up @@ -149,3 +152,88 @@ def setup_template_variables(self, context, data_dict):

def view_template(self, context, data_dict):
return 'ngsi.html'

def _iterate_serialized(self, resource, handler):
pending_entities = True
index = 0
while pending_entities:
prefix = 'entity__' + str(index) + '__'

if prefix + 'id' in resource:
handler(prefix)
index = index + 1
else:
pending_entities = False

def _serialize_resource(self, resource):
# Check if NGSI resource is being created
serialized_resource = resource
if resource['format'] == NGSI_REG_FORMAT:

if 'entity' not in resource or not len(resource['entity']):
# Raise an error, al least one entity must be provided
raise p.toolkit.ValidationError({'NGSI Data': ['At least one NGSI entity must be provided']})

# Remove all serialized entries from the resource
def remove_serialized(prefix):
del resource[prefix + 'id']
del resource[prefix + 'value']

if prefix + 'isPattern' in resource:
del resource[prefix + 'isPattern']

self._iterate_serialized(resource, remove_serialized)

index = 0
# Serialize entity information to support custom field saving
entities = {}
for entity in resource['entity']:
if 'delete' in entity and entity['delete'] == 'on':
continue

prefix = 'entity__' + str(index) + '__'
entities[prefix + 'id'] = entity['id']
entities[prefix + 'value'] = entity['value']

# Check if there is an isPattern field
if 'isPattern' in entity and entity['isPattern'] == 'on':
entities[prefix + 'isPattern'] = entity['isPattern']
index = index + 1

del serialized_resource['entity']
serialized_resource.update(entities)

return serialized_resource

def before_create(self, context, resource):
return self._serialize_resource(resource)

def after_create(self, context, resource):
# Create entry in the NGSI registry
pass

def before_update(self, context, current, resource):
return self._serialize_resource(resource)

def after_update(self, context, resource):
pass

def before_show(self, resource):
# Deserialize resource information
entities = []
def deserilize_handler(prefix):
entity = {
'id': resource[prefix + 'id'],
'value': resource[prefix + 'value'],
}

if prefix + 'isPattern' in resource:
entity['isPattern'] = resource[prefix + 'isPattern']

entities.append(entity)

self._iterate_serialized(resource, deserilize_handler)

resource['entity'] = entities

return resource

0 comments on commit a810f25

Please sign in to comment.