Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Latest work on post-processing
Browse files Browse the repository at this point in the history
  • Loading branch information
David Read committed Apr 4, 2016
1 parent 4505c11 commit e1839fd
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 57 deletions.
5 changes: 3 additions & 2 deletions ckanext/dgu/bin/common.py
Expand Up @@ -4,7 +4,7 @@ class ScriptError(Exception):
pass


def get_ckanapi(config_ini_or_ckan_url):
def get_ckanapi(config_ini_or_ckan_url, **kwargs):
'''Given a config.ini filepath or a remote CKAN URL, returns a ckanapi
instance that you can use to call action commands
'''
Expand All @@ -30,7 +30,8 @@ def get_ckanapi(config_ini_or_ckan_url):
sys.exit(1)
ckan = ckanapi.RemoteCKAN(ckan_url,
apikey=apikey,
user_agent='dgu script')
user_agent='dgu script',
**kwargs)
else:
# must be a config.ini filepath
load_config(config_ini_or_ckan_url)
Expand Down
4 changes: 3 additions & 1 deletion ckanext/dgu/celery_import.py
@@ -1,2 +1,4 @@
def task_imports():
return ['ckanext.dgu.tasks']
return ['ckanext.dgu.tasks',
'ckanext.dgu.gemini_postprocess_tasks',
]
146 changes: 108 additions & 38 deletions ckanext/dgu/gemini_postprocess.py
Expand Up @@ -5,10 +5,15 @@
import httplib
from lxml import etree
import traceback
import urlparse
import urllib
import json

from owslib import wms as owslib_wms

from ckan.common import OrderedDict
import ckan.plugins as p
from ckan import logic

log = logging.getLogger(__name__)

Expand All @@ -19,17 +24,56 @@ def is_id(id_string):
return bool(re.match(reg_ex, id_string))


def process_resource(ckan_ini_filepath, resource_id, queue):
def hash_a_dict(dict_):
    '''Returns a string fingerprint of a dict: its JSON serialization with
    the keys sorted, so that equal dicts always give the same string.
    '''
    fingerprint = json.dumps(dict_, sort_keys=True)
    return fingerprint


def process_package_(package_id):
    '''Runs process_resource() on each of a dataset's resources and writes
    the dataset back with package_update if any resource dict was changed.

    The resources processed are those found in the package dict under the
    keys 'individual_resources', 'timeseries_resources' and
    'additional_resources' (a missing key is treated as an empty list).
    Change detection is done by comparing hash_a_dict() of each resource
    before and after processing.

    :param package_id: value passed to package_show as the dataset 'id'
    :returns: None
    '''
    from ckan import model

    # Using default CKAN schema instead of DGU, because we will write it back
    # in the same way in a moment. However it changes formats to lowercase.
    # NOTE(review): the explicit schema override is commented out below, so
    # whichever schema package_show applies by default is what is used -
    # confirm this is intended.
    show_context = {'model': model, 'ignore_auth': True,
                    'session': model.Session,
                    #'schema': logic.schema.default_show_package_schema()
                    }
    package = p.toolkit.get_action('package_show')(
        show_context, {'id': package_id})

    resources = (package.get('individual_resources', []) +
                 package.get('timeseries_resources', []) +
                 package.get('additional_resources', []))

    package_changed = False
    for resource in resources:
        log.info('Processing package=%s resource=%s',
                 package['name'], resource['id'][:4])
        resource_hash_before = hash_a_dict(resource)
        process_resource(resource)
        # Once one change has been detected the dataset will be written
        # anyway, so there is no need to compare the remaining resources.
        if not package_changed and \
                hash_a_dict(resource) != resource_hash_before:
            package_changed = True

    if not package_changed:
        # Previously this branch logged 'Writing dataset changes' even though
        # nothing is written, which made the log misleading.
        log.info('No dataset changes to write')
        return

    log.info('Writing dataset changes')
    # use default schema so that format can be missing
    # NOTE(review): as above, the schema override itself is commented out.
    user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {})
    update_context = {'model': model,
                      'session': model.Session,
                      'ignore_auth': True,
                      'user': user['name'],
                      #'schema': logic.schema.default_update_package_schema()
                      }
    p.toolkit.get_action('package_update')(update_context, package)


def process_resource(resource):
'''
Edits resource in-place.
'''
#log = process_resource.get_logger()
#load_config(ckan_ini_filepath)
#register_translator()

from ckan import model
from pylons import config

assert is_id(resource_id), resource_id
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
resource = p.toolkit.get_action('resource_show')(context_, {'id': resource_id})
url = resource['url']

# Check if the service is a view service
is_wms = _is_wms(url)
Expand All @@ -38,7 +82,7 @@ def process_resource(ckan_ini_filepath, resource_id, queue):
#resource['verified_date'] = datetime.now().isoformat()
base_urls = _wms_base_urls(url)
resource['wms_base_urls'] = ' '.join(base_urls)
resource_format = 'WMS'
resource['format'] = 'WMS'


def _is_wms(url):
Expand All @@ -47,39 +91,66 @@ def _is_wms(url):
'''
# Try WMS 1.3 as that is what INSPIRE expects
is_wms = _try_wms_url(url, version='1.3')
# First try using WMS 1.1.1 as that is very common
if not is_wms:
# is_wms None means socket timeout, so don't bother trying again
if is_wms is False:
# Try using WMS 1.1.1 as that is very common
is_wms = _try_wms_url(url, version='1.1.1')
log.debug('WMS check result: %s', is_wms)
return is_wms


# Like owslib_wms.WMSCapabilitiesReader(version=version).capabilities_url only
# it deals with uppercase param keys too!
def wms_capabilities_url(url):
qs = []
if service_url.find('?') != -1:
qs = cgi.parse_qsl(service_url.split('?')[1])
def strip_session_id(url):
    '''Blanks out the value of any ";jsessionid=..." parameter in a URL.

    The ";jsessionid=" marker itself is kept; only its value (every character
    up to the next "/" or "?", or the end of the string) is removed, so URLs
    that differ only by session id come out identical.

    :param url: URL string
    :returns: the URL with each jsessionid value removed
    '''
    # Raw string so the pattern reaches the regex engine untouched - the
    # previous non-raw '\?' was an invalid string escape sequence, and "?"
    # needs no escaping inside a character class anyway.
    return re.sub(r';jsessionid=[^/?]+', ';jsessionid=', url)


def get_wms_base_url(url):
    '''Returns the part of the URL before the first "?" (i.e. without the
    query string), passed through strip_session_id().
    '''
    without_query = url.partition('?')[0]
    return strip_session_id(without_query)


# Like owslib_wms.WMSCapabilitiesReader(version=version).capabilities_url only:
# * it deals with uppercase param keys too!
# * version is configurable or can be not included at all
def wms_capabilities_url(url, version=None):
'''Given what is assumed to be a WMS base URL, adds any missing parameters
to cajole it to work ('service' & 'request'). The 'version' parameter is
man-handled to be what you specify or removed if necessary.
'''
if url.find('?') != -1:
param_list = urlparse.parse_qsl(url.split('?')[1])
params = OrderedDict(param_list)
else:
params = OrderedDict()
params_lower = (param.lower() for param in params)

params = [x[0] for x in qs]
if 'service' not in params_lower:
params['service'] = 'WMS'
if 'request' not in params_lower:
params['request'] = 'GetCapabilities'

if 'service' not in params:
qs.append(('service', 'WMS'))
if 'request' not in params:
qs.append(('request', 'GetCapabilities'))
if 'version' not in params:
qs.append(('version', self.version))
if 'version' in params:
del params['version']
if 'VERSION' in params:
del params['VERSION']
if version:
params['version'] = version

urlqs = urlencode(tuple(qs))
return service_url.split('?')[0] + '?' + urlqs
urlqs = urllib.urlencode(params)
return url.split('?')[0] + '?' + urlqs


def _try_wms_url(url, version='1.3'):
# Here's a neat way to run this manually:
# python -c "import logging; logging.basicConfig(level=logging.INFO); from ckanext.dgu.gemini_postprocess import _try_wms_url; print _try_wms_url('http://soilbio.nerc.ac.uk/datadiscovery/WebPage5.aspx')"
import pdb; pdb.set_trace()
'''Does a GetCapabilities request and returns whether it responded ok.
Returns:
True - got a WMS response that isn't a ServiceException
False - got a different response, or got HTTP/WMS error
None - socket timeout - host is simply not responding, and is so slow communicating there is no point trying it again
'''

try:
capabilities_url = wms_capabilities_url(url)
capabilities_url = wms_capabilities_url(url, version)
log.debug('WMS check url: %s', capabilities_url)
try:
res = urllib2.urlopen(capabilities_url, None, 10)
Expand All @@ -93,7 +164,7 @@ def _try_wms_url(url, version='1.3'):
return False
except socket.timeout, e:
log.info('WMS check for %s failed due to HTTP connection timeout error "%s".', capabilities_url, e)
return False
return None
except socket.error, e:
log.info('WMS check for %s failed due to HTTP socket connection error "%s".', capabilities_url, e)
return False
Expand Down Expand Up @@ -151,19 +222,18 @@ def _try_wms_url(url, version='1.3'):


def _wms_base_urls(url):
'''Given a WMS URL this method returns the base URLs it uses. It does
it by making basic WMS requests.
'''Given a WMS URL this method returns the base URLs it uses (so that they
can be proxied when previewing it). It does it by making basic WMS
requests.
'''
# Here's a neat way to test this manually:
# python -c "import logging; logging.basicConfig(level=logging.INFO); from ckanext.spatial.harvesters.gemini import GeminiSpatialHarvester; print GeminiSpatialHarvester._wms_base_urls('http://www.ordnancesurvey.co.uk/oswebsite/xml/atom/')"
try:
capabilities_url = wms_capabilities_url(url)
# Get rid of the "version=1.1.1" param that OWSLIB adds, because
# the OS WMS previewer doesn't specify a version, so may receive
# later versions by default. And versions like 1.3 may have
# different base URLs. It does mean that we can't use OWSLIB to parse
# the result though.
capabilities_url = re.sub('&version=[^&]+', '', capabilities_url)
capabilities_url = wms_capabilities_url(url, version=None)
# We don't want a "version" param, because the OS WMS previewer doesn't
# specify a version, so may receive later versions by default. And
# versions like 1.3 may have different base URLs. It does mean that we
# can't use OWSLIB to parse the result though.
try:
log.debug('WMS base url check: %s', capabilities_url)
res = urllib2.urlopen(capabilities_url, None, 10)
Expand Down Expand Up @@ -201,7 +271,7 @@ def _wms_base_urls(url):
urls = xml_tree.xpath(xpath, namespaces=namespaces)
for url in urls:
if url:
base_url = url.split('?')[0]
base_url = get_wms_base_url(url)
base_urls.add(base_url)
log.info('Extra WMS base urls: %r', base_urls)
return base_urls
Expand Down
16 changes: 2 additions & 14 deletions ckanext/dgu/gemini_postprocess_tasks.py
@@ -1,10 +1,9 @@
import os


from ckan.lib.celery_app import celery
import ckan.plugins as p

from ckanext.dgu.gemini_postprocess import process_resource
from ckanext.dgu.gemini_postprocess import process_package_


def create_package_task(package, queue):
Expand All @@ -25,8 +24,6 @@ def process_package(ckan_ini_filepath, package_id, queue='bulk'):
'''
Archive a package.
'''
from ckan import model

load_config(ckan_ini_filepath)
register_translator()

Expand All @@ -37,12 +34,7 @@ def process_package(ckan_ini_filepath, package_id, queue='bulk'):
# Also put try/except around it, because it is easier to monitor ckan's log
# than celery's task status.
try:
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
package = p.toolkit.get_action('package_show')(context_, {'id': package_id})

for resource in package['resources']:
resource_id = resource['id']
process_resource(ckan_ini_filepath, resource_id, queue)
process_package_(package_id)
except Exception, e:
if os.environ.get('DEBUG'):
raise
Expand All @@ -51,10 +43,6 @@ def process_package(ckan_ini_filepath, package_id, queue='bulk'):
e, package_id, package['name'] if 'package' in dir() else '')
raise

# Refresh the index for this dataset, so that it contains the latest
# archive info.
_update_search_index(package_id, log)


def load_config(ckan_ini_filepath):
import paste.deploy
Expand Down
4 changes: 2 additions & 2 deletions ckanext/dgu/plugin.py
Expand Up @@ -581,7 +581,7 @@ def get_auth_functions(self):
}


class DguSpatialPlugin(p.SingletonPlugin, p.toolkit.DefaultDatasetForm):
class DguSpatialPlugin(p.SingletonPlugin):
"""
DGU-specific bits to do with spatial that are not in ckanext-spatial
"""
Expand All @@ -599,6 +599,6 @@ def notify(self, entity, operation=None):

# TODO avoid infinite loop

log.debug('Notified of UKLP package event: %s %s', entity.id, operation)
log.debug('Notified of UKLP package event: %s %s', pkg.name, operation)

gemini_postprocess_tasks.create_package_task(entity, 'priority')

0 comments on commit e1839fd

Please sign in to comment.