This repository has been archived by the owner on Mar 12, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tasks.py
240 lines (180 loc) · 6.94 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# -*- coding: utf-8 -*-
"""
kepler.tasks
------------
This module provides a set of tasks that send data to different Web
services. Each task has the same signature::
task(job, data)
In most cases, the data will be an absolute path to a
`bag <https://tools.ietf.org/html/draft-kunze-bagit-10>`_.
"""
from __future__ import absolute_import
import tempfile
import uuid
from flask import current_app
from ogre.xml import FGDCParser
from lxml import etree
import pysolr
from kepler.bag import (get_fgdc, get_shapefile, get_geotiff, get_access,
get_shapefile_name)
from kepler.records import create_record, MitRecord
from kepler import sword
from kepler.utils import make_uuid
from kepler.extensions import db, solr as solr_session, geoserver, dspace
from kepler.parsers import MarcParser
from kepler.geo import compress, pyramid
try:
from itertools import imap as map
except ImportError:
pass
def index_shapefile(job, data):
    """Index an uploaded Shapefile in Solr.

    Builds WMS/WFS service references for the appropriate GeoServer
    instance, records the layer id on the job's item, then indexes the
    bag's FGDC metadata.

    :param job: :class:`~kepler.models.Job`
    :param data: absolute path to bag containing Shapefile
    """
    geo = _get_geoserver(get_access(data))
    references = {
        'http://www.opengis.net/def/serviceType/ogc/wms': geo.wms_url,
        'http://www.opengis.net/def/serviceType/ogc/wfs': geo.wfs_url,
    }
    item_uuid = uuid.UUID(job.item.uri)
    layer = "{0}:{1}".format(geo.workspace, get_shapefile_name(data))
    job.item.layer_id = layer
    db.session.commit()
    _index_from_fgdc(job, bag=data, dct_references_s=references,
                     uuid=str(item_uuid), layer_id_s=layer)
def index_geotiff(job, data):
    """Index an uploaded GeoTIFF file in Solr.

    Builds WMS and download references for the appropriate GeoServer
    instance, records the layer id on the job's item, then indexes the
    bag's FGDC metadata.

    :param job: :class:`~kepler.models.Job`
    :param data: absolute path to bag containing GeoTIFF
    """
    geo = _get_geoserver(get_access(data))
    references = {
        'http://www.opengis.net/def/serviceType/ogc/wms': geo.wms_url,
        'http://schema.org/downloadUrl': job.item.tiff_url,
    }
    item_uuid = uuid.UUID(job.item.uri)
    layer = "{0}:{1}".format(geo.workspace, item_uuid)
    job.item.layer_id = layer
    db.session.commit()
    _index_from_fgdc(job, bag=data, dct_references_s=references,
                     uuid=str(item_uuid), layer_id_s=layer)
def submit_to_dspace(job, data):
    """Upload GeoTIFF to DSpace.

    .. note:: only runs if `Item.handle` has not previously been set.

    :param job: :class:`~kepler.models.Job`
    :param data: absolute path to bag containing GeoTIFF
    """
    if job.item.handle:
        # Already deposited; nothing to do.
        return
    package = sword.SWORDPackage(uuid=job.item.uri)
    package.datafiles.append(get_geotiff(data))
    package.metadata = _fgdc_to_mods(get_fgdc(data))
    # The SWORD package is written to a temp zip that exists only for
    # the duration of the submit call.
    with tempfile.NamedTemporaryFile(suffix='.zip') as zipped:
        package.write(zipped)
        handle = sword.submit(current_app.config['SWORD_SERVICE_URL'],
                              zipped.name)
    job.item.handle = handle
    db.session.commit()
def get_geotiff_url_from_dspace(job):
    """Retrieve the GeoTIFF URL from a DSpace Handle.

    Fetches the item's OAI-ORE document and stores the single ``.tif``
    resource URL on the job's item.

    .. note:: assumes the OAI-ORE only contains a single TIFF.

    :param job: :class:`~kepler.models.Job`
    :raises Exception: when the ORE document does not contain exactly
        one ``.tif`` URL
    """
    handle = job.item.handle.replace('http://hdl.handle.net/', '')
    ore_url = current_app.config['OAI_ORE_URL'] + handle + '/ore.xml'
    response = dspace.session.get(ore_url)
    response.raise_for_status()
    tree = etree.fromstring(response.content)
    urls = tree.xpath(
        '//rdf:Description/@rdf:about[contains(., ".tif")]',
        namespaces={'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'})
    if len(urls) != 1:
        raise Exception('Expected 1 tiff, found ' + str(len(urls)))
    job.item.tiff_url = urls[0]
    db.session.commit()
def upload_shapefile(job, data):
    """Upload Shapefile to GeoServer.

    :param job: :class:`~kepler.models.Job`
    :param data: absolute path to bag containing Shapefile
    """
    shapefile = get_shapefile(data)
    job.import_url = _upload_to_geoserver(shapefile, 'shapefile',
                                          get_access(data))
    db.session.commit()
def upload_geotiff(job, data):
    """Upload GeoTIFF to GeoServer.

    The TIFF is compressed and pyramided into a temp file before the
    upload; the temp file is removed when the ``with`` block exits.

    :param job: :class:`~kepler.models.Job`
    :param data: absolute path to bag containing GeoTIFF
    """
    source = get_geotiff(data)
    access = get_access(data)
    with tempfile.NamedTemporaryFile(suffix='.tif') as processed:
        compress(source, processed.name)
        pyramid(processed.name)
        import_url = _upload_to_geoserver(processed.name, 'geotiff', access)
    job.import_url = import_url
    db.session.commit()
def index_marc_records(job, data):
    """Index all records from a MARC file in Solr.

    :param job: :class:`~kepler.models.Job` (unused by this task)
    :param data: path to MARC file
    """
    records = _load_marc_records(data)
    _index_records(records)
def _index_from_fgdc(job, bag, **kwargs):
    r"""Index a GeoServer-bound layer from the attached FGDC metadata.

    Pulls the FGDC document out of the bag, merges in any extra fields,
    and adds the resulting record to Solr.

    :param job: :class:`~kepler.models.Job`
    :param bag: absolute path to bag containing FGDC metadata
    :param \**kwargs: additional fields to add to the record
    """
    record = create_record(get_fgdc(bag), FGDCParser, **kwargs)
    _index_records([record.as_dict()])
def _upload_to_geoserver(data, filetype, access):
    """Upload a Shapefile or GeoTIFF to GeoServer.

    This is a generic upload used by the type-specific tasks
    :func:`~upload_shapefile` and :func:`~upload_geotiff`; prefer
    those over calling this directly.

    :param data: absolute path to either Shapefile or GeoTIFF
    :param filetype: one of ``'shapefile'`` or ``'geotiff'``
    :param access: access level; ``'restricted'`` selects the secure
        GeoServer instance, anything else the public one
    :returns: URL of the created GeoServer import
    """
    gs = _get_geoserver(access)
    import_url = gs.put(data, filetype)
    return import_url
def _index_records(records):
    """Add records to Solr, normalizing each one for pysolr first.

    :param records: iterator of dictionaries to be added to Solr
    """
    client = pysolr.Solr(current_app.config['SOLR_URL'])
    # Reuse the app-managed HTTP session instead of pysolr's default.
    client.session = solr_session.session
    client.add(map(_prep_solr_record, records))
def _load_marc_records(data):
    """Yield Solr-ready dicts for each record in a MARC file.

    Each parsed record is given a deterministic UUID derived from its
    ``_marc_id`` before conversion through
    :class:`~kepler.records.MitRecord`.

    :param data: path to MARC file
    """
    namespace = current_app.config['UUID_NAMESPACE']
    for rec in MarcParser(data):
        rec['uuid'] = make_uuid(rec['_marc_id'], namespace)
        yield MitRecord(**rec).as_dict()
def _fgdc_to_mods(fgdc):
    """Transform an FGDC XML document to MODS.

    :param fgdc: FGDC XML source accepted by :func:`lxml.etree.parse`
        (path or file-like object)
    :returns: MODS XML serialized as a unicode string
    """
    stylesheet = etree.parse(current_app.config['FGDC_MODS_XSLT'])
    transform = etree.XSLT(stylesheet)
    mods = transform(etree.parse(fgdc))
    return etree.tostring(mods, encoding="unicode")
def _prep_solr_record(record):
"""Normalize field values for pysolr.
`pysolr` does not handle sets, so these need to be converted to
lists before being added to solr.
"""
return {k: _normalize_sets(v) for k, v in record.items()}
def _normalize_sets(value):
if isinstance(value, set):
return list(value)
return value
def _get_geoserver(access):
    """Select the GeoServer instance for an access level.

    :param access: access string; ``'restricted'`` (case-insensitive)
        maps to the secure instance, anything else to the public one
    """
    restricted = access.lower() == 'restricted'
    return geoserver.secure if restricted else geoserver.public