Skip to content
This repository has been archived by the owner on Mar 12, 2020. It is now read-only.

Commit

Permalink
Checkpoint #2
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Graves committed Jul 7, 2016
1 parent e812e31 commit 6d6e7cf
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 263 deletions.
3 changes: 2 additions & 1 deletion kepler/app.py
Expand Up @@ -7,7 +7,7 @@
import yaml
import rq_dashboard

from kepler.extensions import db, solr, geoserver, dspace, req
from kepler.extensions import db, solr, geoserver, dspace, req, s3
from kepler.job import job_blueprint
from kepler.item import item_blueprint
from kepler.layer import layer_blueprint
Expand Down Expand Up @@ -35,6 +35,7 @@ def register_extensions(app):
geoserver.init_app(app)
dspace.init_app(app)
req.init_app(app)
s3.init_app(app)
app.logger.info('Extensions registered')


Expand Down
9 changes: 9 additions & 0 deletions kepler/extensions.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import boto3
from flask.ext.sqlalchemy import SQLAlchemy
import redis
import requests
Expand Down Expand Up @@ -70,8 +71,16 @@ def init_app(self, app):
self.q = Queue(connection=conn, async=async)


class S3(BaseExtension):
    """Flask extension wrapping a boto3 S3 client.

    After ``init_app`` the client is available as ``s3.client``.
    """

    def init_app(self, app):
        # Credentials come from the Flask config; ``.get`` yields None for
        # missing keys, in which case boto3 presumably falls back to its own
        # credential resolution chain — confirm before relying on that.
        access_key = app.config.get('S3_ACCESS_KEY_ID')
        secret_key = app.config.get('S3_SECRET_ACCESS_KEY')
        self.client = boto3.client('s3',
                                   aws_access_key_id=access_key,
                                   aws_secret_access_key=secret_key)


# Module-level singleton extension instances. Each one is later bound to the
# application via its init_app(app) hook; importing modules use these names
# directly (e.g. ``from kepler.extensions import db, s3``).
db = SQLAlchemy()
solr = Solr()
geoserver = GeoServer()
dspace = DSpace()
req = RQ()
s3 = S3()
51 changes: 3 additions & 48 deletions kepler/geoserver.py
Expand Up @@ -35,7 +35,7 @@ def __init__(self, session, url, workspace, datastore):

@property
def service_url(self):
    """Base URL of the GeoServer importer REST endpoint.

    Strips any trailing slash from ``self.url`` before appending
    ``/rest/imports/`` so the result never contains a double slash.
    """
    # NOTE(review): the original contained two consecutive assignments to
    # ``url``; the first (re-appending '/') was a dead store and is removed.
    url = self.url.rstrip('/')
    return '{}/rest/imports/'.format(url)

@property
Expand All @@ -56,8 +56,9 @@ def put(self, data, ftype):
self._create_upload_task('{}/tasks'.format(import_url.rstrip('/')),
data)
self._run_import(import_url)
return import_url

def _create_import(self):
def _create_import(self, data):
r = self.session.post(self.service_url, json=data)
r.raise_for_status()
return r.headers.get('location')
Expand All @@ -70,49 +71,3 @@ def _create_upload_task(self, url, data):
with io.open(data, 'rb') as fp:
r = self.session.post(url, files={'filedata': fp})
r.raise_for_status()


def put(self, id, data, mimetype):
    """Upload the file at path ``data`` to GeoServer.

    :param id: identifier used to build the upload URL
    :param data: filesystem path of the payload to stream
    :param mimetype: content type of the payload ('application/zip' or
        'image/tiff' per ``_put_url``)
    :returns: the workspace-qualified layer id for the uploaded store
    """
    target = self._put_url(id, mimetype)
    with io.open(data, 'rb') as stream:
        response = self.session.put(target,
                                    data=stream,
                                    headers={'Content-type': mimetype})
        response.raise_for_status()
    # The store URL is the upload URL minus its trailing '/file.*' segment.
    store_url = target.rsplit('/', 1)[0]
    return self.layer_id(store_url, mimetype)

def layer_id(self, url, mimetype):
    """Return the workspace-qualified layer name for an uploaded store.

    :param url: REST URL of the data store / coverage store
    :param mimetype: 'application/zip' (Shapefile) or 'image/tiff' (GeoTIFF)
    :raises ValueError: for any other mimetype (the original fell through
        and raised an obscure NameError because ``name`` was never bound)
    """
    if mimetype == 'application/zip':
        name = self.feature_type_name(self.feature_types(url))
    elif mimetype == 'image/tiff':
        name = self.coverage_name(self.coverages(url))
    else:
        raise ValueError('Unsupported mimetype: {}'.format(mimetype))
    return '%s:%s' % (self.workspace, name)

def feature_types(self, url):
    """Return the ``featureTypes`` entry of the data store at ``url``."""
    # Local renamed from ``json`` to avoid shadowing the stdlib module name.
    payload = self._get_json(url)
    return payload['dataStore']['featureTypes']

def feature_type_name(self, url):
    """Return the name of the first feature type listed at ``url``."""
    payload = self._get_json(url)
    return payload['featureTypes']['featureType'][0]['name']

def coverages(self, url):
    """Return the ``coverages`` entry of the coverage store at ``url``."""
    payload = self._get_json(url)
    return payload['coverageStore']['coverages']

def coverage_name(self, url):
    """Return the name of the first coverage listed at ``url``."""
    payload = self._get_json(url)
    return payload['coverages']['coverage'][0]['name']

def _get_json(self, url):
r = self.session.get(url, headers={'Accept': 'application/json'})
print(r.content)
r.raise_for_status()
return r.json()

def _put_url(self, id, mimetype):
if mimetype == 'application/zip':
return '%sdatastores/%s/file.shp' % (self.service_url,
self.datastore)
elif mimetype == 'image/tiff':
return '%scoveragestores/%s/file.geotiff' % (self.service_url,
id)
81 changes: 38 additions & 43 deletions kepler/jobs.py
@@ -1,61 +1,56 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
import shutil
import tempfile
import traceback

from blinker import signal
from flask import current_app

from kepler.extensions import db
from kepler.bag import unpack, get_datatype
from kepler.extensions import db, s3
from kepler.models import Job, Item, get_or_create
from kepler.tasks import (index_shapefile, upload_shapefile, index_geotiff,
upload_geotiff, submit_to_dspace)


job_failed = signal('job-failed')
job_completed = signal('job-completed')
def fetch_bag(bucket, key):
    """Download the S3 object ``key`` from ``bucket`` into a temp file.

    :returns: filesystem path of the downloaded file; the caller is
        responsible for removing it when done.
    """
    # mkstemp + close instead of NamedTemporaryFile(delete=False): the
    # original never closed the handle, leaking a file descriptor per call
    # (and an open handle would also block re-opening the file on Windows).
    fd, path = tempfile.mkstemp()
    os.close(fd)
    s3.client.download_file(bucket, key, path)
    return path


# Blinker subscriber: invoked whenever the job-failed signal is sent.
@job_failed.connect
def failure(sender, **kwargs):
    """Mark the job FAILED and clean up the sender's working data."""
    job = kwargs['job']
    # Best-effort removal of the temporary data directory carried on the
    # sender; errors during cleanup are deliberately ignored.
    shutil.rmtree(sender.data, ignore_errors=True)
    job.status = u'FAILED'
    db.session.commit()


@job_completed.connect
def completed(sender, **kwargs):
job = kwargs['job']
shutil.rmtree(sender.data, ignore_errors=True)
job.status = u'COMPLETED'
def create_job(uri):
    """Create and persist a new Job in the CREATED state.

    :param uri: identifier of the item the job belongs to; the Item row is
        created on first use via ``get_or_create``
    :returns: the committed Job instance
    """
    owning_item = get_or_create(Item, uri=uri)
    new_job = Job(item=owning_item, status=u'CREATED')
    db.session.add(new_job)
    db.session.commit()
    return new_job


def create_job(uri, data, task_list, access):
item = get_or_create(Item, uri=uri, access=access)
job = Job(item=item, status=u'PENDING')
db.session.add(job)
db.session.commit()
def run_job(id):
job = Job.query.get(id)
bucket = current_app.config['S3_BUCKET']
key = job.item.uri
data = fetch_bag(bucket, key)
tmpdir = tempfile.mkdtemp()
try:
return JobRunner(job.id, data, task_list)
except Exception:
bag = unpack(data, tmpdir)
datatype = get_datatype(bag)
if datatype == 'shapefile':
tasks = [upload_shapefile, index_shapefile, ]
elif datatype == 'geotiff':
tasks = [upload_geotiff, submit_to_dspace, index_geotiff, ]
else:
raise UnsupportedFormat(datatype)
for task in tasks:
task(job, bag)
job.status = u'PENDING'
except:
job.status = u'FAILED'
db.session.commit()
raise


class JobRunner(object):
def __init__(self, job_id, data, task_list):
self.tasks = task_list
self.job_id = job_id
self.data = data

def __call__(self):
job = Job.query.get(self.job_id)
try:
for task in self.tasks:
task(job, self.data)
job_completed.send(self, job=job)
except Exception:
current_app.logger.warn(traceback.format_exc())
job_failed.send(self, job=job)
raise
current_app.logger.warn(traceback.format_exc())
finally:
db.session.commit()
shutil.rmtree(tmpdir, ignore_errors=True)
os.remove(data)
29 changes: 6 additions & 23 deletions kepler/layer/views.py
Expand Up @@ -12,40 +12,23 @@
from kepler.bag import unpack, get_datatype, get_access
from kepler.exceptions import UnsupportedFormat
from kepler.extensions import req
from kepler.jobs import create_job
from kepler.jobs import create_job, run_job
from kepler.tasks import (upload_shapefile, index_shapefile, upload_geotiff,
index_geotiff, submit_to_dspace,)


class LayerView(View):
    """Flask view handling layer ingest requests."""

    def dispatch_request(self, *args, **kwargs):
        # NOTE(review): a stale duplicate condition checking for POST (with
        # no body — a syntax error as written) was removed; only PUT is
        # handled, matching the PUT-only route registered in register().
        # Other methods implicitly return None.
        if request.method == 'PUT':
            return self.create(*args, **kwargs)

def create(self, *args, **kwargs):
data = request.files['file']
uid = os.path.splitext(os.path.basename(data.filename))[0]
uri = uuid.UUID(uid).urn
tmpdir = tempfile.mkdtemp()
try:
bag = unpack(data, tmpdir)
datatype = get_datatype(bag)
if datatype == 'shapefile':
tasks = [upload_shapefile, index_shapefile, ]
elif datatype == 'geotiff':
tasks = [upload_geotiff, submit_to_dspace, index_geotiff, ]
else:
raise UnsupportedFormat(datatype)
access = get_access(bag)
job = create_job(uri, bag, tasks, access)
except:
shutil.rmtree(tmpdir, ignore_errors=True)
raise
req.q.enqueue(job)
def create(self, id):
    """Create a job for ``id`` and enqueue it for background processing.

    :returns: an empty body with HTTP 201 Created
    """
    queued_job = create_job(id)
    req.q.enqueue(run_job, queued_job.id)
    return '', 201

@classmethod
def register(cls, app, endpoint, url):
    """Register this view on ``app`` under ``url`` for PUT requests.

    The route appends ``<path:id>`` to ``url`` so everything after the
    prefix (slashes included) is captured and passed to the view as ``id``.
    """
    view_func = client_auth_required(cls.as_view(endpoint))
    # NOTE(review): a stale duplicate add_url_rule line (POST, no id
    # segment, syntactically incomplete) was removed; the PUT route below
    # matches the create(self, id) handler.
    app.add_url_rule('{}<path:id>'.format(url), 'resource', methods=['PUT'],
                     view_func=view_func)
4 changes: 2 additions & 2 deletions kepler/models.py
Expand Up @@ -16,8 +16,8 @@ def get_or_create(Model, **kwargs):

class Job(db.Model):
    """A processing job for a single Item.

    Status values: CREATED, PENDING, COMPLETED, FAILED; new rows default
    to CREATED.
    """
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): a stale duplicate status column definition (lacking the
    # CREATED state, with default PENDING) was removed; the superset that
    # includes CREATED is kept, matching the CREATED default below.
    status = db.Column(db.Enum(u'CREATED', u'PENDING', u'COMPLETED', u'FAILED',
                               name='status'), default=u'CREATED')
    time = db.Column(db.DateTime(timezone=True), default=datetime.now)
    item_id = db.Column(db.Integer, db.ForeignKey('item.id'))

Expand Down
3 changes: 3 additions & 0 deletions kepler/settings.py
Expand Up @@ -54,3 +54,6 @@ class TestConfig(DefaultConfig):
GEOSERVER_AUTH_USER = 'username'
GEOSERVER_AUTH_PASS = 'password'
REDISTOGO_URL = 'redis://localhost:6379'
S3_BUCKET = 'test_bucket'
S3_ACCESS_KEY_ID = 'test_access_key_id'
S3_SECRET_ACCESS_KEY = 'test_secret_access_key'
19 changes: 13 additions & 6 deletions kepler/tasks.py
Expand Up @@ -21,7 +21,7 @@
from lxml import etree
import pysolr

from kepler.bag import (get_fgdc, get_shapefile, get_geotiff,
from kepler.bag import (get_fgdc, get_shapefile, get_geotiff, get_access,
get_shapefile_name)
from kepler.records import create_record, MitRecord
from kepler import sword
Expand All @@ -43,7 +43,8 @@ def index_shapefile(job, data):
:param bag: absolute path to bag containing Shapefile
"""

gs = _get_geoserver(job.item.access)
access = get_access(data)
gs = _get_geoserver(access)
refs = {
'http://www.opengis.net/def/serviceType/ogc/wms': gs.wms_url,
'http://www.opengis.net/def/serviceType/ogc/wfs': gs.wfs_url,
Expand All @@ -64,7 +65,8 @@ def index_geotiff(job, data):
:param bag: absolute path to bag containing GeoTIFF
"""

gs = _get_geoserver(job.item.access)
access = get_access(data)
gs = _get_geoserver(access)
refs = {
'http://www.opengis.net/def/serviceType/ogc/wms': gs.wms_url,
'http://schema.org/downloadUrl': job.item.tiff_url
Expand Down Expand Up @@ -131,7 +133,9 @@ def upload_shapefile(job, data):

shp = get_shapefile(data)
access = get_access(data)
_upload_to_geoserver(shp, 'shapefile', access)
import_url = _upload_to_geoserver(shp, 'shapefile', access)
job.import_url = import_url
db.session.commit()


def upload_geotiff(job, data):
Expand All @@ -146,7 +150,9 @@ def upload_geotiff(job, data):
with tempfile.NamedTemporaryFile(suffix='.tif') as fp:
compress(tiff, fp.name)
pyramid(fp.name)
_upload_to_geoserver(fp.name, 'geotiff', access)
import_url = _upload_to_geoserver(fp.name, 'geotiff', access)
job.import_url = import_url
db.session.commit()


def index_marc_records(job, data):
Expand Down Expand Up @@ -182,7 +188,8 @@ def _upload_to_geoserver(data, filetype, access):
"""

gs = _get_geoserver(access)
gs.put(data, filetype)
import_url = gs.put(data, filetype)
return import_url


def _index_records(records):
Expand Down
5 changes: 5 additions & 0 deletions requirements.txt
@@ -1,17 +1,22 @@
alembic==0.8.3
arrow==0.7.0
blinker==1.4
boto3==1.3.1
botocore==1.4.35
click==6.6
docutils==0.12
Flask==0.10.1
Flask-Migrate==1.6.0
Flask-Script==2.0.5
Flask-SQLAlchemy==2.0
futures==3.0.5
GDAL==1.11.1
gitdb==0.6.4
GitPython==1.0.1
gunicorn==19.4.5
itsdangerous==0.24
Jinja2==2.8
jmespath==0.9.0
lxml==3.5.0
Mako==1.0.3
MarkupSafe==0.23
Expand Down

0 comments on commit 6d6e7cf

Please sign in to comment.