This repository was archived by the owner on Jun 18, 2024. It is now read-only.
Merged
6 changes: 6 additions & 0 deletions slingshot/__init__.py
@@ -6,3 +6,9 @@
PUBLIC_WORKSPACE = "public"
RESTRICTED_WORKSPACE = "secure"
DATASTORE = "pg"


class state:
PENDING = 0
FAILED = 1
PUBLISHED = 2
48 changes: 41 additions & 7 deletions slingshot/app.py
@@ -1,4 +1,5 @@
import base64
from datetime import datetime
import os
import threading
import uuid
@@ -8,6 +9,8 @@
import requests

from slingshot import PUBLIC_WORKSPACE, RESTRICTED_WORKSPACE, DATASTORE
from slingshot.db import load_layer
from slingshot.layer import create_layer
from slingshot.parsers import FGDCParser, parse
from slingshot.record import Record
from slingshot.s3 import S3IO, session
@@ -115,10 +118,11 @@ def put(self, url, **kwargs):


class GeoServer(HttpMethodMixin):
def __init__(self, url, client, auth=None):
self.url = "{}/rest".format(url.rstrip("/"))
def __init__(self, url, client, auth=None, s3_alias="s3"):
self.url = url.rstrip("/")
self.client = client
self.auth = auth
self.s3_alias = s3_alias

def request(self, method, path, **kwargs):
"""Make a request.
@@ -129,12 +133,12 @@ def request(self, method, path, **kwargs):
very large.
"""
kwargs = {"stream": False, "auth": self.auth, **kwargs}
url = "{}/{}".format(self.url, path.lstrip("/"))
url = "{}/rest/{}".format(self.url, path.lstrip("/"))
r = self.client.request(method, url, **kwargs)
r.raise_for_status()
return r

def add(self, layer, s3_alias="s3"):
def add(self, layer):
"""Add the layer to GeoServer.

In the case of a Shapefile, the layer should already exist in the
@@ -144,19 +148,20 @@ def _add_coverage(self, layer, s3_alias):
if layer.format == 'Shapefile':
self._add_feature(layer)
elif layer.format == 'GeoTiff':
self._add_coverage(layer, s3_alias)
self._add_coverage(layer)
else:
raise Exception("Unknown format")

def _add_coverage(self, layer, s3_alias):
def _add_coverage(self, layer):
workspace = PUBLIC_WORKSPACE if layer.is_public() else \
RESTRICTED_WORKSPACE
data = {
"coverageStore": {
"name": layer.name,
"type": "S3GeoTiff",
"enabled": True,
"url": "{}://{}/{}".format(s3_alias, layer.bucket, layer.tif),
"url": "{}://{}/{}".format(self.s3_alias, layer.bucket,
layer.tif),
"workspace": {"name": workspace},
}
}
@@ -212,3 +217,32 @@ def delete(self, query='dct_provenance_s:MIT'):

def commit(self):
self.post('update', json={'commit': {}})


def publish_layer(bucket, key, geoserver, solr, destination, s3_url):
unpacked = unpack_zip(bucket, key, destination, s3_url)
layer = create_layer(*unpacked, s3_url)
layer.record = create_record(layer, geoserver.url)
if layer.format == "Shapefile":
load_layer(layer)
geoserver.add(layer)
solr.add(layer.record.as_dict())
return layer.name


def publishable_layers(bucket, dynamo):
s3 = session().resource("s3")
db = session().resource("dynamodb")
uploads = s3.Bucket(bucket)
table = db.Table(dynamo)
for page in uploads.objects.pages():
for obj in page:
name = os.path.splitext(obj.key)[0]
res = table.get_item(Key={"LayerName": name})
layer = res.get("Item")
if layer:
l_mod = datetime.fromisoformat(layer['LastMod'])
if l_mod > obj.last_modified.replace(tzinfo=None):
# published layer is newer than uploaded layer
continue
yield obj.key
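
For clarity on the comparison above: the CLI below writes LastMod with datetime.utcnow().isoformat(timespec="seconds"), so the stored value parses to a naive UTC datetime, while obj.last_modified from S3 is timezone-aware; stripping tzinfo makes the two comparable. A minimal sketch of that check, with hypothetical timestamps:

    from datetime import datetime, timezone

    # Hypothetical values for illustration only.
    last_mod = datetime.fromisoformat("2020-01-02T03:04:05")      # naive UTC, as stored in DynamoDB
    uploaded = datetime(2020, 6, 1, 12, 0, tzinfo=timezone.utc)   # aware UTC, as reported by S3

    # Strip tzinfo so both sides are naive UTC before comparing.
    if last_mod > uploaded.replace(tzinfo=None):
        print("published copy is newer; skip")
    else:
        print("upload is newer; publish it")  # this branch runs here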
90 changes: 58 additions & 32 deletions slingshot/cli.py
@@ -1,12 +1,16 @@
from concurrent.futures import as_completed, ThreadPoolExecutor
from datetime import datetime
import os.path

import click

from slingshot import PUBLIC_WORKSPACE, RESTRICTED_WORKSPACE, DATASTORE
from slingshot.app import (create_record, GeoServer, HttpSession, make_slug,
Solr, unpack_zip,)
from slingshot.db import engine, load_layer
from slingshot.layer import create_layer
from slingshot import state, PUBLIC_WORKSPACE, RESTRICTED_WORKSPACE, DATASTORE
from slingshot.app import (GeoServer, HttpSession, make_slug, publish_layer,
publishable_layers, Solr)
from slingshot.db import engine
from slingshot.marc import MarcParser
from slingshot.record import Record
from slingshot.s3 import session


@click.group()
@@ -70,15 +74,17 @@ def initialize(geoserver, geoserver_user, geoserver_password, db_host, db_port,


@main.command()
@click.argument('bucket')
@click.argument('key')
@click.argument('dest')
@click.argument('layers', nargs=-1)
@click.option('--publish-all', is_flag=True,
help="Publish all layers in the upload bucket. If the layer "
"has already been published it will be skipped unless the "
"uploaded layer is newer than the published layer.")
@click.option('--db-uri', envvar='PG_DATABASE',
help="SQLAlchemy PostGIS URL "
"Ex: postgresql://user:password@host:5432/dbname")
@click.option('--db-schema', envvar='PG_SCHEMA', default='public',
help="PostGres schema name. Default value: public")
@click.option('--geoserver', envvar='GEOSERVER', help="Base Geoserver ULR")
@click.option('--geoserver', envvar='GEOSERVER', help="Base Geoserver URL")
@click.option('--geoserver-user', envvar='GEOSERVER_USER',
help="GeoServer user")
@click.option('--geoserver-password', envvar='GEOSERVER_PASSWORD',
@@ -95,35 +101,55 @@ def initialize(geoserver, geoserver_user, geoserver_password, db_host, db_port,
"appears as the protocol) for alternative S3 services, for "
"example: minio://bucket/key. See https://docs.geoserver.org/latest/en/user/community/s3-geotiff/index.html " # noqa: E501
"for more information.")
def publish(bucket, key, dest, db_uri, db_schema, geoserver, geoserver_user,
geoserver_password, solr, solr_user, solr_password, s3_endpoint,
s3_alias):
"""Publish layer at s3://BUCKET/KEY

This will publish the uploaded zipfile layer named KEY in the S3 bucket
named BUCKET. The unpacked, processed layer will be stored in a new
directory named after the layer in the DEST bucket.

The initial zipfile should contain the necessary data files and an
fgdc.xml file. The unpacked zipfile will be flattened (any subdirectories
removed) and a geoblacklight.json file containing the GeoBlacklight
record will be added.
"""
@click.option('--dynamo-table',
help="Name of DynamoDB table for tracking state of layer")
@click.option('--upload-bucket', help="Name of S3 bucket for uploaded layers")
@click.option('--storage-bucket', help="Name of S3 bucket for stored layers")
@click.option('--num-workers', default=1,
help="Number of worker threads to use. There is likely not much "
"point in setting this higher than the database connection "
"pool size which is 5 by default. Defaults to 1.")
def publish(layers, db_uri, db_schema, geoserver, geoserver_user,
geoserver_password, solr, solr_user, solr_password,
s3_endpoint, s3_alias, dynamo_table, upload_bucket,
storage_bucket, num_workers, publish_all):
if not any((layers, publish_all)) or all((layers, publish_all)):
raise click.ClickException(
"You must specify either one or more uploaded layer package names "
"or use the --publish-all switch.")
geo_auth = (geoserver_user, geoserver_password) if geoserver_user and \
geoserver_password else None
solr_auth = (solr_user, solr_password) if solr_user and solr_password \
else None
engine.configure(db_uri, db_schema)
geo_svc = GeoServer(geoserver, HttpSession(), auth=geo_auth)
geo_svc = GeoServer(geoserver, HttpSession(), auth=geo_auth,
s3_alias=s3_alias)
solr_svc = Solr(solr, HttpSession(), auth=solr_auth)
layer = create_layer(*(unpack_zip(bucket, key, dest, s3_endpoint)),
s3_endpoint)
layer.record = create_record(layer, geoserver)
if layer.format == "Shapefile":
load_layer(layer)
geo_svc.add(layer, s3_alias)
solr_svc.add(layer.record.as_dict())
click.echo("Published {}".format(layer.name))
dynamodb = session().resource("dynamodb").Table(dynamo_table)
if publish_all:
work = publishable_layers(upload_bucket, dynamo_table)
else:
work = layers
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = {executor.submit(publish_layer, upload_bucket, layer,
geo_svc, solr_svc, storage_bucket,
s3_endpoint): layer for layer in work}
for future in as_completed(futures):
layer = futures[future]
try:
res = future.result()
except Exception as e:
click.echo("Could not publish {}: {}".format(layer, e))
dynamodb.put_item(Item={
"LayerName": os.path.splitext(layer)[0],
"LastMod": datetime.utcnow().isoformat(timespec="seconds"),
"State": state.FAILED})
else:
click.echo("Published {}".format(res))
dynamodb.put_item(Item={
"LayerName": os.path.splitext(layer)[0],
"LastMod": datetime.utcnow().isoformat(timespec="seconds"),
"State": state.PUBLISHED})


@main.command()
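For reference, a hedged sketch of driving the reworked publish command in batch mode with --publish-all, using click's CliRunner the same way the tests below do. The bucket names, table name, and service URLs here are placeholders rather than values from this PR:

    from click.testing import CliRunner

    from slingshot.cli import main

    runner = CliRunner()
    # Placeholder buckets, table, and URLs; a real run also needs AWS
    # credentials and reachable GeoServer, Solr, and PostGIS instances.
    result = runner.invoke(main, [
        "publish",
        "--publish-all",
        "--upload-bucket", "upload",
        "--storage-bucket", "store",
        "--dynamo-table", "slingshot",
        "--db-uri", "postgresql://user:password@localhost:5432/dbname",
        "--geoserver", "http://localhost:8080/geoserver",
        "--solr", "http://localhost:8983/solr/geoblacklight",
        "--num-workers", "4",
    ])
    print(result.output)
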
17 changes: 16 additions & 1 deletion tests/conftest.py
@@ -1,7 +1,7 @@
import os

import boto3
from moto import mock_s3
from moto import mock_s3, mock_dynamodb2
import pytest

from slingshot.layer import Shapefile
@@ -16,6 +16,21 @@ def s3():
yield conn


@pytest.fixture
def dynamo_table():
with mock_dynamodb2():
db = boto3.resource('dynamodb')
table = db.create_table(
TableName="slingshot",
KeySchema=[{"AttributeName": "LayerName", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "LayerName", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
ProvisionedThroughput={"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1})
yield table


@pytest.fixture
def shapefile():
return _data_file('fixtures/bermuda.zip')
27 changes: 27 additions & 0 deletions tests/test_app.py
@@ -1,3 +1,4 @@
from datetime import datetime
import uuid

import requests_mock
@@ -8,6 +9,7 @@
HttpSession,
make_slug,
make_uuid,
publishable_layers,
Solr,
unpack_zip,
)
@@ -72,3 +74,28 @@ def test_make_uuid_creates_uuid_string():

def test_make_slug_creates_slug():
assert make_slug('bermuda') == 'mit-34clfhaokfmkq'


def test_publishable_layers_includes_new_layer(s3, dynamo_table):
s3.Bucket("upload").put_object(Key="foo.zip", Body="Some data")
layers = list(publishable_layers("upload", "slingshot"))
assert layers.pop() == "foo.zip"


def test_publishable_layers_includes_updated_layer(s3, dynamo_table):
awhile_ago = datetime(1980, 1, 1).isoformat()
dynamo_table.put_item(Item={"LayerName": "foo",
"LastMod": awhile_ago})
s3.Bucket("upload").put_object(Key="foo.zip", Body="Some data")
layers = list(publishable_layers("upload", "slingshot"))
assert layers.pop() == "foo.zip"


def test_publishable_layers_skips_old_layer(s3, dynamo_table):
the_future = datetime(2080, 1, 1).isoformat()
# This test will fail in the year 2080. Probably ok. I'll be dead anyways.
Review comment (Member):
While I don't specifically think this is going to cause real world problems, I'd suggest using a dynamic date such as today + one year to avoid the arbitrary future failure. And yes, I won't have to fix it either so 🤷‍♀
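
One way to implement that suggestion, sketched here rather than taken from the PR, is to derive the timestamp from the current date:

    from datetime import datetime, timedelta

    # Suggested alternative (not in this PR): one year from now, so the test
    # never starts failing at a fixed calendar date.
    the_future = (datetime.utcnow() + timedelta(days=365)).isoformat()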

dynamo_table.put_item(Item={"LayerName": "foo",
"LastMod": the_future})
s3.Bucket("upload").put_object(Key="foo.zip", Body="Some data")
layers = list(publishable_layers("upload", "slingshot"))
assert not layers
40 changes: 27 additions & 13 deletions tests/test_cli.py
@@ -4,6 +4,7 @@
import pytest
import requests_mock

from slingshot import state
from slingshot.cli import main
from slingshot.db import engine, metadata

@@ -31,7 +32,7 @@ def db():


@pytest.mark.integration
def test_publishes_shapefile(db, runner, shapefile, s3):
def test_publishes_shapefile(db, runner, shapefile, s3, dynamo_table):
bucket = s3.Bucket("upload")
bucket.upload_file(shapefile, "bermuda.zip")
uri = db().url
@@ -40,21 +41,28 @@ def test_publishes_shapefile(db, runner, shapefile, s3):
m.post("mock://example.com/geoserver/rest/workspaces/public/"
"datastores/pg/featuretypes")
m.post("mock://example.com/solr/update/json/docs")
res = runner.invoke(main, ['publish', 'upload', 'bermuda.zip',
'store', '--db-uri', uri, '--db-schema',
schema, '--geoserver',
'mock://example.com/geoserver/',
'--solr', 'mock://example.com/solr'])
res = runner.invoke(main,
['publish',
'--upload-bucket', 'upload',
'--storage-bucket', 'store',
'--db-uri', uri,
'--db-schema', schema,
'--geoserver', 'mock://example.com/geoserver/',
'--solr', 'mock://example.com/solr',
'--dynamo-table', dynamo_table.name,
'bermuda.zip'])
assert res.exit_code == 0
assert "Published bermuda" in res.output
with db().connect() as conn:
r = conn.execute('SELECT COUNT(*) FROM {}.bermuda'.format(schema)) \
.scalar()
assert r == 713
item = dynamo_table.get_item(Key={"LayerName": "bermuda"}).get("Item")
assert item["State"] == state.PUBLISHED


@pytest.mark.integration
def test_publishes_geotiff(runner, geotiff, s3):
def test_publishes_geotiff(runner, geotiff, s3, dynamo_table):
bucket = s3.Bucket("upload")
bucket.upload_file(geotiff, "france.zip")
with requests_mock.Mocker() as m:
@@ -63,12 +71,18 @@ def test_publishes_geotiff(runner, geotiff, s3):
m.post('mock://example.com/geoserver/rest/workspaces/secure'
'/coveragestores/france/coverages')
m.post('mock://example.com/solr/update/json/docs')
res = runner.invoke(main, ['publish', 'upload', 'france.zip',
'store', '--geoserver',
'mock://example.com/geoserver/',
'--solr', 'mock://example.com/solr'])
assert res.exit_code == 0
assert 'Published france' in res.output
res = runner.invoke(main,
['publish',
'--upload-bucket', 'upload',
'--storage-bucket', 'store',
'--geoserver', 'mock://example.com/geoserver/',
'--solr', 'mock://example.com/solr',
'--dynamo-table', dynamo_table.name,
'france.zip'])
assert res.exit_code == 0
assert 'Published france' in res.output
item = dynamo_table.get_item(Key={"LayerName": "france"}).get("Item")
assert item["State"] == state.PUBLISHED


@pytest.mark.integration