Add S3 and async geoserver support
This changes the way jobs are created and run in kepler. Adding layers
to GeoServer must be done asynchronously due to timeout problems. Jobs
are now created without any data and queued; when a job is run later,
its data is fetched from S3.
Mike Graves committed Jul 13, 2016
1 parent 2d40a5d commit e65780e
Showing 20 changed files with 285 additions and 279 deletions.
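
In outline, the new flow works like this. A minimal sketch assembled from the diffs below (create_job and run_job live in kepler/jobs.py, req is the RQ queue extension); this is an illustration, not code taken verbatim from the commit:

# A PUT request creates an empty job and queues it; an RQ worker later
# fetches the bag from S3 and runs the upload/index tasks.
from kepler.extensions import req
from kepler.jobs import create_job, run_job

def submit_layer(uri):
    job = create_job(uri)           # row created with status CREATED, no data
    req.q.enqueue(run_job, job.id)  # worker pulls the bag from S3 when it runs
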
13 changes: 8 additions & 5 deletions .travis.yml
@@ -8,11 +8,14 @@ env:
- C_INCLUDE_PATH=$HOME/gdal/include
- LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/gdal/lib
- PATH=$PATH:$HOME/gdal/bin
matrix:
- TOX_ENV=py27
- TOX_ENV=py33
- TOX_ENV=py34
- TOX_ENV=coveralls
matrix:
include:
- env: TOX_ENV=py27
- python: 3.5
env: TOX_ENV=py35
- env: TOX_ENV=coveralls
allow_failures:
- env: TOX_ENV=py35
cache:
directories:
- $HOME/gdal
3 changes: 2 additions & 1 deletion kepler/app.py
@@ -7,7 +7,7 @@
import yaml
import rq_dashboard

from kepler.extensions import db, solr, geoserver, dspace, req
from kepler.extensions import db, solr, geoserver, dspace, req, s3
from kepler.job import job_blueprint
from kepler.item import item_blueprint
from kepler.layer import layer_blueprint
@@ -35,6 +35,7 @@ def register_extensions(app):
geoserver.init_app(app)
dspace.init_app(app)
req.init_app(app)
s3.init_app(app)
app.logger.info('Extensions registered')


5 changes: 5 additions & 0 deletions kepler/bag.py
@@ -26,6 +26,11 @@ def get_shapefile_name(bag):
return os.path.splitext(os.path.basename(f))[0]


def get_geotiff_name(bag):
gt = get_geotiff(bag)
return os.path.splitext(os.path.basename(gt))[0]


def get_geotiff(bag):
return _extract_data(bag, '.tif')

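
A quick illustration of the new helper; the bag path and payload name below are placeholders:

# get_geotiff returns the path to the .tif inside the bag, so for a
# payload like '<bag>/data/shapefile.tif' the new helper yields 'shapefile'.
from kepler.bag import get_geotiff_name

layer_name = get_geotiff_name('/path/to/bag')  # -> 'shapefile'
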
18 changes: 18 additions & 0 deletions kepler/extensions.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import boto3
from flask.ext.sqlalchemy import SQLAlchemy
import redis
import requests
@@ -70,8 +71,25 @@ def init_app(self, app):
self.q = Queue(connection=conn, async=async)


class S3(BaseExtension):
def init_app(self, app):
s3_url = app.config.get('S3_TEST_URL')
kwargs = {}
if s3_url:
kwargs = {
'endpoint_url': s3_url,
'config': boto3.session.Config(s3={'addressing_style': 'virtual'})
}
self.client = boto3.client('s3',
aws_access_key_id=app.config.get('S3_ACCESS_KEY_ID'),
aws_secret_access_key=app.config.get('S3_SECRET_ACCESS_KEY'),
**kwargs
)


db = SQLAlchemy()
solr = Solr()
geoserver = GeoServer()
dspace = DSpace()
req = RQ()
s3 = S3()
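
A minimal sketch of using the new S3 extension once init_app has run, assuming a configured Flask app; the bucket and key are placeholders (fetch_bag in kepler/jobs.py below does essentially this):

# Download an object to a temp file via the extension's boto3 client.
import tempfile
from kepler.extensions import s3

tmp = tempfile.NamedTemporaryFile(delete=False)
s3.client.download_file('example-bucket', 'example-key', tmp.name)
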
101 changes: 60 additions & 41 deletions kepler/geoserver.py
@@ -9,11 +9,34 @@ def __init__(self, session, url, workspace, datastore):
self.url = url
self.workspace = workspace
self.datastore = datastore
self.shapefile_import = {
'import': {
'targetStore': {
'dataStore': {
'name': self.datastore
}
},
'targetWorkspace': {
'workspace': {
'name': self.workspace
}
}
}
}
self.geotiff_import = {
'import': {
'targetWorkspace': {
'workspace': {
'name': self.workspace
}
}
}
}

@property
def service_url(self):
url = self.url.rstrip('/') + '/'
return '%srest/workspaces/%s/' % (url, self.workspace)
url = self.url.rstrip('/')
return '{}/rest/imports/'.format(url)

@property
def wms_url(self):
@@ -23,47 +46,43 @@ def wms_url(self):
def wfs_url(self):
return '%s/wfs' % self.url.rstrip('/')

def put(self, id, data, mimetype):
url = self._put_url(id, mimetype)
headers = {'Content-type': mimetype}
with io.open(data, 'rb') as fp:
r = self.session.put(url, data=fp, headers=headers)
r.raise_for_status()
return self.layer_id(url.rsplit('/', 1)[0], mimetype)

def layer_id(self, url, mimetype):
if mimetype == 'application/zip':
name = self.feature_type_name(self.feature_types(url))
elif mimetype == 'image/tiff':
name = self.coverage_name(self.coverages(url))
return '%s:%s' % (self.workspace, name)

def feature_types(self, url):
json = self._get_json(url)
return json['dataStore']['featureTypes']
def put(self, data, ftype, name):
if ftype == 'shapefile':
import_url = self._create_import(self.shapefile_import)
elif ftype == 'geotiff':
import_url = self._create_import(self.geotiff_import)
else:
raise Exception('unknown format')
task_url = self._create_upload_task(
'{}/tasks'.format(import_url.rstrip('/')), data)
self._set_task_method(task_url, name, ftype)
self._run_import(import_url)
return import_url

def feature_type_name(self, url):
json = self._get_json(url)
return json['featureTypes']['featureType'][0]['name']

def coverages(self, url):
json = self._get_json(url)
return json['coverageStore']['coverages']
def _create_import(self, data):
r = self.session.post(self.service_url, json=data)
r.raise_for_status()
return r.headers.get('location')

def coverage_name(self, url):
json = self._get_json(url)
return json['coverages']['coverage'][0]['name']
def _run_import(self, url):
r = self.session.post('{}?async=true'.format(url))
r.raise_for_status()

def _get_json(self, url):
r = self.session.get(url, headers={'Accept': 'application/json'})
print(r.content)
def _create_upload_task(self, url, data):
with io.open(data, 'rb') as fp:
r = self.session.post(url, files={'filedata': fp})
r.raise_for_status()
return r.json()
return r.headers.get('location')

def _put_url(self, id, mimetype):
if mimetype == 'application/zip':
return '%sdatastores/%s/file.shp' % (self.service_url,
self.datastore)
elif mimetype == 'image/tiff':
return '%scoveragestores/%s/file.geotiff' % (self.service_url,
id)
def _set_task_method(self, url, name, ftype):
shape = '{}/rest/workspaces/{}/datastores/{}/featuretypes/{}'
tiff = '{}/rest/workspaces/{}/coveragestores/{}/coverages/{}'
if ftype == 'shapefile':
r = self.session.get(shape.format(self.url.rstrip('/'),
self.workspace, self.datastore,
name))
elif ftype == 'geotiff':
r = self.session.get(tiff.format(self.url.rstrip('/'),
self.workspace, name, name))
if r.status_code == 200:
self.session.put(url, json={'task': {'updateMode': 'REPLACE'}})
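
The role of put() changes entirely: instead of PUTing file data at a datastore/coveragestore URL and returning a layer id, it now creates an import, uploads the file as an import task, marks the task REPLACE when the layer already exists, kicks the import off with ?async=true, and returns the import URL. A rough usage sketch; the wrapper class's name is not shown in this hunk, so GeoServerClient and the credentials below are placeholders, but put(data, ftype, name) is the signature from the diff:

# Hypothetical usage; GeoServerClient stands in for the class defined above.
import requests

session = requests.Session()
session.auth = ('admin', 'geoserver')  # placeholder credentials
gs = GeoServerClient(session, 'http://localhost:8080/geoserver/',
                     workspace='mit', datastore='data')
import_url = gs.put('/tmp/layer.zip', 'shapefile', 'example_layer')
# The import runs asynchronously inside GeoServer; import_url is
# presumably what gets stored in the import_url column added to
# kepler/models.py below.
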
81 changes: 38 additions & 43 deletions kepler/jobs.py
@@ -1,61 +1,56 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
import shutil
import tempfile
import traceback

from blinker import signal
from flask import current_app

from kepler.extensions import db
from kepler.bag import unpack, get_datatype
from kepler.exceptions import UnsupportedFormat
from kepler.extensions import db, s3
from kepler.models import Job, Item, get_or_create
from kepler.tasks import (index_shapefile, upload_shapefile, index_geotiff,
upload_geotiff, submit_to_dspace)


job_failed = signal('job-failed')
job_completed = signal('job-completed')
def fetch_bag(bucket, key):
tmp = tempfile.NamedTemporaryFile(delete=False)
s3.client.download_file(bucket, key, tmp.name)
return tmp.name


@job_failed.connect
def failure(sender, **kwargs):
job = kwargs['job']
shutil.rmtree(sender.data, ignore_errors=True)
job.status = u'FAILED'
db.session.commit()


@job_completed.connect
def completed(sender, **kwargs):
job = kwargs['job']
shutil.rmtree(sender.data, ignore_errors=True)
job.status = u'COMPLETED'
def create_job(uri):
item = get_or_create(Item, uri=uri)
job = Job(item=item, status=u'CREATED')
db.session.add(job)
db.session.commit()
return job


def create_job(uri, data, task_list, access):
item = get_or_create(Item, uri=uri, access=access)
job = Job(item=item, status=u'PENDING')
db.session.add(job)
db.session.commit()
def run_job(id):
job = Job.query.get(id)
bucket = current_app.config['S3_BUCKET']
key = job.item.uri
data = fetch_bag(bucket, key)
tmpdir = tempfile.mkdtemp()
try:
return JobRunner(job.id, data, task_list)
except Exception:
bag = unpack(data, tmpdir)
datatype = get_datatype(bag)
if datatype == 'shapefile':
tasks = [upload_shapefile, index_shapefile, ]
elif datatype == 'geotiff':
tasks = [upload_geotiff, submit_to_dspace, index_geotiff, ]
else:
raise UnsupportedFormat(datatype)
for task in tasks:
task(job, bag)
job.status = u'PENDING'
except:
job.status = u'FAILED'
db.session.commit()
raise


class JobRunner(object):
def __init__(self, job_id, data, task_list):
self.tasks = task_list
self.job_id = job_id
self.data = data

def __call__(self):
job = Job.query.get(self.job_id)
try:
for task in self.tasks:
task(job, self.data)
job_completed.send(self, job=job)
except Exception:
current_app.logger.warn(traceback.format_exc())
job_failed.send(self, job=job)
raise
current_app.logger.warn(traceback.format_exc())
finally:
db.session.commit()
shutil.rmtree(tmpdir, ignore_errors=True)
os.remove(data)
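
Note that a successful run_job leaves the job PENDING rather than COMPLETED: the GeoServer import itself still runs asynchronously (_run_import posts with ?async=true), so completion is presumably confirmed later via the import URL that put() returns. A sketch of how run_job gets executed, using rq's public API; the Redis connection below is a placeholder for whatever the RQ extension configures:

# An RQ worker picks the call off the queue and invokes run_job(job_id).
from redis import Redis
from rq import Queue

from kepler.jobs import run_job

q = Queue(connection=Redis())
q.enqueue(run_job, 42)  # 42 is a placeholder job id
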
29 changes: 6 additions & 23 deletions kepler/layer/views.py
@@ -12,40 +12,23 @@
from kepler.bag import unpack, get_datatype, get_access
from kepler.exceptions import UnsupportedFormat
from kepler.extensions import req
from kepler.jobs import create_job
from kepler.jobs import create_job, run_job
from kepler.tasks import (upload_shapefile, index_shapefile, upload_geotiff,
index_geotiff, submit_to_dspace,)


class LayerView(View):
def dispatch_request(self, *args, **kwargs):
if request.method == 'POST':
if request.method == 'PUT':
return self.create(*args, **kwargs)

def create(self, *args, **kwargs):
data = request.files['file']
uid = os.path.splitext(os.path.basename(data.filename))[0]
uri = uuid.UUID(uid).urn
tmpdir = tempfile.mkdtemp()
try:
bag = unpack(data, tmpdir)
datatype = get_datatype(bag)
if datatype == 'shapefile':
tasks = [upload_shapefile, index_shapefile, ]
elif datatype == 'geotiff':
tasks = [upload_geotiff, submit_to_dspace, index_geotiff, ]
else:
raise UnsupportedFormat(datatype)
access = get_access(bag)
job = create_job(uri, bag, tasks, access)
except:
shutil.rmtree(tmpdir, ignore_errors=True)
raise
req.q.enqueue(job)
def create(self, id):
job = create_job(id)
req.q.enqueue(run_job, job.id)
return '', 201

@classmethod
def register(cls, app, endpoint, url):
view_func = client_auth_required(cls.as_view(endpoint))
app.add_url_rule(url, 'resource', methods=['POST'],
app.add_url_rule('{}<path:id>'.format(url), 'resource', methods=['PUT'],
view_func=view_func)
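
On the client side, the endpoint changes from a multipart POST to a bodiless PUT keyed by the layer id, and since run_job uses job.item.uri as the S3 key, the id in the URL must match the bag's key in the configured bucket. A sketch, assuming the blueprint is mounted at /layers/ and that client_auth_required accepts HTTP Basic credentials (neither is shown in this diff):

# Hypothetical client call; host, URL prefix, id, and credentials are
# all placeholders.
import requests

resp = requests.put(
    'http://localhost:5000/layers/urn:uuid:674a69a3-0000-0000-0000-000000000000',
    auth=('client', 'secret'))
assert resp.status_code == 201  # job created and queued; no file uploaded
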
5 changes: 3 additions & 2 deletions kepler/models.py
@@ -16,8 +16,9 @@ def get_or_create(Model, **kwargs):

class Job(db.Model):
id = db.Column(db.Integer, primary_key=True)
status = db.Column(db.Enum(u'PENDING', u'COMPLETED', u'FAILED',
name='status'), default=u'PENDING')
status = db.Column(db.Enum(u'CREATED', u'PENDING', u'COMPLETED', u'FAILED',
name='status'), default=u'CREATED')
import_url = db.Column(db.String())
time = db.Column(db.DateTime(timezone=True), default=datetime.now)
item_id = db.Column(db.Integer, db.ForeignKey('item.id'))

6 changes: 6 additions & 0 deletions kepler/settings.py
@@ -34,6 +34,9 @@ def __init__(self):
self.REDISTOGO_URL = os.environ['REDISTOGO_URL']
self.REDIS_URL = os.environ['REDISTOGO_URL']
self.RQ_POLL_INTERVAL = 2500
self.S3_BUCKET = os.environ['S3_BUCKET']
self.S3_ACCESS_KEY_ID = os.environ['S3_ACCESS_KEY_ID']
self.S3_SECRET_ACCESS_KEY = os.environ['S3_SECRET_ACCESS_KEY']


class TestConfig(DefaultConfig):
@@ -54,3 +57,6 @@ class TestConfig(DefaultConfig):
GEOSERVER_AUTH_USER = 'username'
GEOSERVER_AUTH_PASS = 'password'
REDISTOGO_URL = 'redis://localhost:6379'
S3_BUCKET = 'test_bucket'
S3_ACCESS_KEY_ID = 'test_access_key_id'
S3_SECRET_ACCESS_KEY = 'test_secret_access_key'
