# Web-page residue from the GitHub file view (navigation text dropped).
# The page listed this file as: branch master, 325 lines (276 sloc), 10.7 KB.
import requests
from owslib.wfs import WebFeatureService
from openapi_spec_validator import openapi_v3_spec_validator
from GeoHealthCheck.probe import Probe
from GeoHealthCheck.result import Result, push_result
class WFS3Drilldown(Probe):
    """
    Probe for WFS3 endpoint "drilldown": starting
    with top endpoint: get Collections and do
    GetItems on them etc. Using OWSLib.WebFeatureService.

    The probe works in two stages. Stage 1 always runs and checks the
    top-level endpoints (conformance, /api, collections). Stage 2 runs
    only when the ``drilldown_level`` parameter is not ``'basic'``; it
    walks every collection and fetches one item from each.

    TODO: needs finalization.
    """

    NAME = 'WFS3 Drilldown'
    DESCRIPTION = 'Traverses a OGC WFS3 (REST) API endpoint by drilling down'
    RESOURCE_TYPE = 'OGC:WFS3'

    REQUEST_METHOD = 'GET'

    PARAM_DEFS = {
        'drilldown_level': {
            'type': 'string',
            # Implicit concatenation instead of backslash continuations
            # inside the literal, so the text does not depend on how the
            # source happens to be indented.
            'description': ('How heavy the drilldown should be. '
                            'basic: test presence endpoints, '
                            'full: go through collections, fetch items'),
            'default': 'basic',
            'required': True,
            'range': ['basic', 'full']
        }
    }
    """Param defs"""

    def __init__(self):
        Probe.__init__(self)

    def perform_request(self):
        """
        Perform the drilldown.
        See https://github.com/geopython/OWSLib/blob/
        master/tests/doctests/wfs3_GeoServerCapabilities.txt

        Nothing is returned. Every finding is recorded as a ``Result``
        that is added to ``self.result``, either directly or through
        ``push_result`` (whose return value becomes the current result).
        Exceptions raised while talking to the service are caught and
        turned into failed results rather than propagated.
        """
        wfs3 = None
        collections = None

        # 1. Test top endpoints existence
        result = Result(True, 'Test Top Endpoints')
        result.start()
        try:
            wfs3 = WebFeatureService(self._resource.url, version='3.0')
            wfs3.conformance()

            # TODO: OWSLib 0.17.1 has no call to '/api yet.
            url = wfs3._build_url('api')
            api = requests.get(url).json()
            for attr in ['components', 'paths', 'openapi']:
                if api.get(attr, None) is None:
                    msg = '/api: missing attr: %s' % attr
                    result = push_result(
                        self, result, False, msg, 'Test Collection')

            collections = wfs3.collections()
        except Exception as err:
            # Any failure in this stage marks the current result as failed.
            result.set(False, str(err))

        result.stop()
        self.result.add_result(result)

        if self._parameters['drilldown_level'] == 'basic':
            return

        # ASSERTION: will do full drilldown from here

        # 2. Test layers
        # TODO: use parameters to work on less/more drilling
        # "full" could be all layers.
        result = Result(True, 'Test Collections')
        result.start()
        coll_name = ''
        try:
            # If stage 1 failed, `collections` is still None here; the
            # resulting error is reported by the outer handler below.
            for collection in collections:
                coll_name = collection['name']
                # NOTE(review): on Python 3 str.encode() returns bytes,
                # which then flow into the messages and OWSLib calls
                # below -- confirm the interpreter this plugin targets.
                coll_name = coll_name.encode('utf-8')

                # 2a. Collection metadata
                try:
                    coll = wfs3.collection(coll_name)

                    # TODO: Maybe also add crs
                    for attr in ['name', 'links']:
                        if coll.get(attr, None) is None:
                            msg = '%s: missing attr: %s' \
                                  % (coll_name, attr)
                            result = push_result(
                                self, result, False, msg, 'Test Collection')
                except Exception as e:
                    # Collection name first, error second, matching the
                    # order of the placeholders in the message.
                    msg = 'GetCollection %s: OWSLib err: %s ' \
                          % (coll_name, str(e))
                    result = push_result(
                        self, result, False, msg, 'Test GetCollection')
                    continue

                # 2b. Items of the collection (at most one is requested)
                try:
                    items = wfs3.collection_items(coll_name, limit=1)
                except Exception as e:
                    msg = 'GetItems %s: OWSLib err: %s ' % (coll_name, str(e))
                    result = push_result(
                        self, result, False, msg, 'Test GetItems')
                    continue

                features = items.get('features', None)
                if features is None:
                    msg = 'GetItems %s: No features attr' % coll_name
                    result = push_result(
                        self, result, False, msg, 'Test GetItems')
                    continue

                items_type = items.get('type', '')
                if items_type != 'FeatureCollection':
                    # Report the type actually received.
                    msg = '%s: type not FeatureCollection: %s' \
                          % (coll_name, items_type)
                    result = push_result(
                        self, result, False, msg, 'Test GetItems')
                    continue

                # 2c. Single item, fetched by the id of the first feature
                if len(features) > 0:
                    fid = features[0]['id']
                    try:
                        item = wfs3.collection_item(coll_name, fid)
                    except Exception as e:
                        msg = 'GetItem %s: OWSLib err: %s' \
                              % (coll_name, str(e))
                        result = push_result(
                            self, result, False, msg, 'Test GetItem')
                        continue

                    for attr in \
                            ['id', 'links', 'properties', 'geometry', 'type']:
                        val = item.get(attr, None)
                        if val is None:
                            msg = '%s:%s missing attr: %s' \
                                  % (coll_name, str(fid), attr)
                            result = push_result(
                                self, result, False, msg, 'Test GetItem')
                            # Skip the type check for a missing attribute
                            # so it is not reported twice.
                            continue

                        if attr == 'type' and val != 'Feature':
                            msg = '%s:%s type not Feature: %s' \
                                  % (coll_name, str(fid), val)
                            result = push_result(
                                self, result, False, msg, 'Test GetItem')

        except Exception as err:
            # Anything not handled per step above ends the collection
            # loop and fails the current result.
            result.set(False, 'Collection err: %s : e=%s'
                       % (coll_name, str(err)))

        result.stop()

        # Add to overall Probe result
        self.result.add_result(result)
class WFS3OpenAPIValidator(Probe):
    """
    Probe that validates the OpenAPI document served at the WFS3
    /api endpoint.
    Validation is delegated to
    https://pypi.org/project/openapi-spec-validator/.
    """

    NAME = 'WFS3 OpenAPI Validator'
    DESCRIPTION = 'Validates WFS3 /api endpoint for OpenAPI compliance'
    RESOURCE_TYPE = 'OGC:WFS3'

    REQUEST_METHOD = 'GET'

    def __init__(self):
        Probe.__init__(self)

    def perform_request(self):
        """
        Run the validation in two steps.

        Step 1 downloads the /api document and checks that the keys
        'components', 'paths' and 'openapi' are present. Step 2 is only
        reached when step 1 succeeded; it feeds the document (a dict) to
        https://github.com/p1c2u/openapi-spec-validator and records each
        reported error as its own result.
        """
        # Step 1: fetch the document and do the basic sanity check
        sanity = Result(True, 'OpenAPI Validation Test')
        sanity.start()
        spec = None
        try:
            service = WebFeatureService(self._resource.url, version='3.0')

            # TODO: OWSLib 0.17.1 has no call to '/api yet.
            # The built endpoint URL may carry f=json etc.; the document
            # is fetched once and reused in step 2.
            spec = requests.get(service._build_url('api')).json()

            # Lazily find the first required key that is absent
            absent = next(
                (key for key in ('components', 'paths', 'openapi')
                 if spec.get(key, None) is None),
                None)
            if absent is not None:
                sanity.set(False, '/api: missing attr: %s' % absent)
        except Exception as exc:
            sanity.set(False, str(exc))

        sanity.stop()
        self.result.add_result(sanity)

        # Pointless to continue without a document or after a failed check
        if spec is None:
            return
        if sanity.success is False:
            return

        # ASSERTION: /api exists, next OpenAPI Validation

        # Step 2: detailed OpenAPI compliance test
        compliance = Result(True, 'Validate OpenAPI Compliance')
        compliance.start()
        try:
            # One separate Result per error yielded by the validator
            for violation in openapi_v3_spec_validator.iter_errors(spec):
                compliance = push_result(
                    self, compliance, False,
                    str(violation), 'OpenAPI Compliance Result')
        except Exception as exc:
            compliance.set(False, 'OpenAPI Validation err: e=%s' % str(exc))

        compliance.stop()

        # Add to overall Probe result
        self.result.add_result(compliance)
# class WFS3Caps(Probe):
# """Probe for OGC WFS3 API main endpoint url"""
#
# NAME = 'OGC WFS3 API Capabilities'
# DESCRIPTION = 'Perform OGC WFS3 API Capabilities
# Operation and check validity'
# RESOURCE_TYPE = 'OGC:WFS3'
#
# REQUEST_METHOD = 'GET'
#
# # e.g. https://demo.pygeoapi.io/master/collections?f=json
# REQUEST_TEMPLATE = '/{endpoint}?f=json'
#
# def __init__(self):
# Probe.__init__(self)
#
# PARAM_DEFS = {
# 'endpoint': {
# 'type': 'string',
# 'description': 'The OGC WFS3 API service endpoint type',
# 'default': '/collections',
# 'required': True,
# 'range': ['collections', 'conformance', 'api']
# }
# }
# """Param defs"""
#
# CHECKS_AVAIL = {
# 'GeoHealthCheck.plugins.check.checks.HttpStatusNoError': {
# 'default': True
# },
# 'GeoHealthCheck.plugins.check.checks.JsonParse': {
# 'default': True
# }
# }
# """Check for OGC WFS3 API service endpoint
# availability"""
#
#
# class WFS3Collection(Probe):
# """Probe for OGC WFS3 API main endpoint url"""
#
# NAME = 'OGC WFS3 API Capabilities'
# DESCRIPTION = 'Perform OGC WFS3 API Capabilities Operation and
# check validity'
# RESOURCE_TYPE = 'OGC:WFS3'
#
# REQUEST_METHOD = 'GET'
#
# def __init__(self):
# Probe.__init__(self)
#
# CHECKS_AVAIL = {
# 'GeoHealthCheck.plugins.check.checks.HttpStatusNoError': {
# 'default': True
# },
# 'GeoHealthCheck.plugins.check.checks.JsonParse': {
# 'default': True
# },
# 'GeoHealthCheck.plugins.check.checks.ContainsStrings': {
# 'default': True,
# 'set_params': {
# 'strings': {
# 'name': 'Must contain links to at least WFS3 Collections,
# Conformance and OpenAPI endpoint',
# 'value': ['links', 'href', '/collections',
# '/conformance', '/api']
# }
# }
# },
# }
# """
# Checks avail for all specific Caps checks.
# Optionally override Check.PARAM_DEFS using set_params
# e.g. with specific `value` or even `name`.
# """
# (GitHub page footer, not part of the module: "You can't perform that action at this time.")