Skip to content

Commit

Permalink
Merge 27dc625 into 36afd3b
Browse files Browse the repository at this point in the history
  • Loading branch information
aarranz committed Jul 16, 2018
2 parents 36afd3b + 27dc625 commit d012396
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 84 deletions.
165 changes: 81 additions & 84 deletions ckanext/ngsiview/controller.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U
# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.
#
# This file is part of ckanext-ngsipreview.
#
Expand All @@ -16,105 +17,101 @@
# You should have received a copy of the GNU Affero General Public License
# along with Orion Context Broker. If not, see http://www.gnu.org/licenses/.

import json
from logging import getLogger
import urlparse
from pylons import config
import requests
import json

import ckan.logic as logic
import ckan.lib.base as base
import ckan.plugins as p
from ckan.plugins import toolkit
from pylons import config
import requests

log = getLogger(__name__)

CHUNK_SIZE = 512


def proxy_ngsi_resource(context, data_dict):
# Chunked proxy for ngsi resources.
resource_id = data_dict['resource_id']
log.info('Proxify resource {id}'.format(id=resource_id))
resource = logic.get_action('resource_show')(context, {'id': resource_id})
verify = config.get('ckan.ngsi.verify_requests', True)

try:

headers = {
'Accept': 'application/json'
}
class ProxyNGSIController(base.BaseController):

if 'oauth_req' in resource and resource['oauth_req'] == 'true':
token = p.toolkit.c.usertoken['access_token']
headers['X-Auth-Token'] = token
def proxy_ngsi_resource(self, resource_id):
# Chunked proxy for ngsi resources.
context = {'model': base.model, 'session': base.model.Session, 'user': base.c.user or base.c.author}

if resource.get('tenant', '') != '':
headers['FIWARE-Service'] = resource['tenant']
if resource.get('service_path', '') != '':
headers['FIWARE-ServicePath'] = resource['service_path']
log.info('Proxify resource {id}'.format(id=resource_id))
resource = logic.get_action('resource_show')(context, {'id': resource_id})
verify = config.get('ckan.ngsi.verify_requests', True)

url = resource['url']
try:
parsedurl = urlparse.urlsplit(url)
except:
base.abort(409, detail='Invalid URL.')

if not parsedurl.scheme or not parsedurl.netloc:
base.abort(409, detail='Invalid URL.')
headers = {
'Accept': 'application/json'
}

if parsedurl.path.find('/v1/queryContext') != -1:
if resource.get("payload", "").strip() == "":
details = 'Please add a payload to complete the query.'
base.abort(409, detail=details)
if resource.get('oauth_req', 'false') == 'true':
token = toolkit.c.usertoken['access_token']
headers['X-Auth-Token'] = token

try:
json.loads(resource['payload'])
except:
details = "Payload field doesn't contain valid JSON data."
base.abort(409, detail=details)

headers['Content-Type'] = "application/json"
r = requests.post(url, headers=headers, data=resource["payload"], stream=True, verify=verify)

else:
r = requests.get(url, headers=headers, stream=True, verify=verify)

if r.status_code == 401:
if 'oauth_req' in resource and resource['oauth_req'] == 'true':
details = 'ERROR 401 token expired. Retrieving new token, reload please.'
log.info(details)
base.abort(409, detail=details)
p.toolkit.c.usertoken_refresh()

elif 'oauth_req' not in resource or resource['oauth_req'] == 'false':
details = 'This query may need Oauth-token, please check if the token field on resource_edit is correct.'
log.info(details)
base.abort(409, detail=details)

else:
r.raise_for_status()
base.response.content_type = r.headers['content-type']
base.response.charset = r.encoding

for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
base.response.body_file.write(chunk)

except ValueError:
details = ''
base.abort(409, detail=details)
except requests.HTTPError:
details = 'Could not proxy ngsi_resource. We are working to resolve this issue as quickly as possible'
base.abort(409, detail=details)
except requests.ConnectionError:
details = 'Could not proxy ngsi_resource because a connection error occurred.'
base.abort(502, detail=details)
except requests.Timeout:
details = 'Could not proxy ngsi_resource because the connection timed out.'
base.abort(504, detail=details)
if resource.get('tenant', '') != '':
headers['FIWARE-Service'] = resource['tenant']
if resource.get('service_path', '') != '':
headers['FIWARE-ServicePath'] = resource['service_path']


class ProxyNGSIController(base.BaseController):

def proxy_ngsi_resource(self, resource_id):
data_dict = {'resource_id': resource_id}
context = {'model': base.model, 'session': base.model.Session, 'user': base.c.user or base.c.author}
return proxy_ngsi_resource(context, data_dict)
url = resource['url']
try:
parsedurl = urlparse.urlsplit(url)
except ValueError:
base.abort(409, detail='Invalid URL.')

if parsedurl.scheme not in ("http", "https") or not parsedurl.netloc:
base.abort(409, detail='Invalid URL.')

if parsedurl.path.find('/v1/queryContext') != -1:
if resource.get("payload", "").strip() == "":
details = 'Please add a payload to complete the query.'
base.abort(409, detail=details)

try:
json.loads(resource['payload'])
except json.JSONDecodeError:
details = "Payload field doesn't contain valid JSON data."
base.abort(409, detail=details)

headers['Content-Type'] = "application/json"
r = requests.post(url, headers=headers, data=resource["payload"], stream=True, verify=verify)

else:
r = requests.get(url, headers=headers, stream=True, verify=verify)

if r.status_code == 401:
if 'oauth_req' in resource and resource['oauth_req'] == 'true':
details = 'ERROR 401 token expired. Retrieving new token, reload please.'
log.info(details)
toolkit.c.usertoken_refresh()
base.abort(409, detail=details)

elif 'oauth_req' not in resource or resource['oauth_req'] == 'false':
details = 'Authentication requested by server, please check resourece configuration.'
log.info(details)
base.abort(409, detail=details)

else:
r.raise_for_status()
base.response.content_type = r.headers['content-type']
base.response.charset = r.encoding

for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
base.response.body_file.write(chunk)

except ValueError:
details = ''
base.abort(409, detail=details)
except requests.HTTPError:
details = 'Could not proxy ngsi_resource. We are working to resolve this issue as quickly as possible'
base.abort(409, detail=details)
except requests.ConnectionError:
details = 'Could not proxy ngsi_resource because a connection error occurred.'
base.abort(502, detail=details)
except requests.Timeout:
details = 'Could not proxy ngsi_resource because the connection timed out.'
base.abort(504, detail=details)
110 changes: 110 additions & 0 deletions ckanext/ngsiview/tests/test_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-

# Copyright (c) 2018 Future Internet Consulting and Development Solutions S.L.

# This file is part of ckanext-ngsiview.
#
# Ckanext-ngsiview is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# Ckanext-ngsiview is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with ckanext-ngsiview. If not, see http://www.gnu.org/licenses/.

import unittest

from mock import ANY, DEFAULT, patch
from parameterized import parameterized

from ckanext.ngsiview.controller import ProxyNGSIController


class NgsiViewControllerTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
super(NgsiViewControllerTestCase, cls).setUpClass()
cls.controller = ProxyNGSIController()

@parameterized.expand([
({}, {}),
({"oauth_req": "true"}, {"X-Auth-Token": "valid-access-token"}),
({"tenant": "a"}, {"FIWARE-Service": "a"}),
({"service_path": "/a"}, {"FIWARE-ServicePath": "/a"}),
({"tenant": "a", "service_path": "/a,/b"}, {"FIWARE-Service": "a", "FIWARE-ServicePath": "/a,/b"}),
])
@patch.multiple("ckanext.ngsiview.controller", base=DEFAULT, logic=DEFAULT, requests=DEFAULT, toolkit=DEFAULT)
def test_basic_request(self, resource, headers, base, logic, requests, toolkit):
body = '{"json": "body"}'
resource['url'] = "http://cb.example.org/v2/entites"
logic.get_action('resource_show').return_value = resource
response = requests.get()
response.status_code = 200
response.headers['content-type'] = "application/json"
response.encoding = "UTF-8"
response.iter_content.return_value = (body,)
toolkit.c.usertoken = {
'access_token': "valid-access-token",
}

expected_headers = {
"Accept": "application/json",
}
expected_headers.update(headers)

self.controller.proxy_ngsi_resource("resource_id")

requests.get.assert_called_with(resource['url'], headers=expected_headers, stream=True, verify=True)
base.response.body_file.write.assert_called_with(body)

@parameterized.expand([
("relative/url",),
("http://#a",),
("ftp://example.com",),
("tatata:///da",),
])
@patch.multiple("ckanext.ngsiview.controller", base=DEFAULT, logic=DEFAULT, requests=DEFAULT, toolkit=DEFAULT)
def test_invalid_url_request(self, url, base, logic, requests, toolkit):
resource = {
'url': url,
}
logic.get_action('resource_show').return_value = resource
base.abort.side_effect = TypeError

with self.assertRaises(TypeError):
self.controller.proxy_ngsi_resource("resource_id")

base.abort.assert_called_with(409, detail=ANY)
requests.get.assert_not_called()

@parameterized.expand([
(True,),
(False,),
])
@patch.multiple("ckanext.ngsiview.controller", base=DEFAULT, logic=DEFAULT, requests=DEFAULT, toolkit=DEFAULT)
def test_auth_required_request(self, auth_configured, base, logic, requests, toolkit):
resource = {
'url': "http://cb.example.org/v2/entites",
'oauth_req': 'true' if auth_configured else 'false'
}
logic.get_action('resource_show').return_value = resource
response = requests.get()
response.status_code = 401
requests.get.reset_mock()
base.abort.side_effect = TypeError

with self.assertRaises(TypeError):
self.controller.proxy_ngsi_resource("resource_id")

base.abort.assert_called_once_with(409, detail=ANY)
requests.get.assert_called_once_with(resource['url'], headers=ANY, stream=True, verify=True)
if auth_configured:
toolkit.c.usertoken_refresh.assert_called_with()
else:
toolkit.c.usertoken_refresh.assert_not_called()

0 comments on commit d012396

Please sign in to comment.