Skip to content
Closed
24 changes: 24 additions & 0 deletions .github/workflows/python_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# CI workflow: runs the Python test suite through tox on every push.
name: Run Python Tests

on:
  push:

jobs:
  test:
    env:
      AWS_DEFAULT_REGION: us-west-2
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quoted so YAML keeps the version as a string; an unquoted 3.10
        # would be parsed as the float 3.1.
        python: ["3.9"]
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python }}
      - name: Install Tox and any other packages
        run: pip install tox
      - name: Run Tox
        # Run tox using the version of Python in `PATH`
        run: tox -e py
23 changes: 22 additions & 1 deletion lib/ingestor-api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class StacIngestor extends Construct {
env,
dataAccessRole: props.dataAccessRole,
stage: props.stage,
dbSecret: props.stacDbSecret,
dbVpc: props.vpc,
dbSecurityGroup: props.stacDbSecurityGroup,
subnetSelection: props.subnetSelection,
});

this.buildApiEndpoint({
Expand Down Expand Up @@ -84,6 +88,10 @@ export class StacIngestor extends Construct {
env: Record<string, string>;
dataAccessRole: iam.IRole;
stage: string;
dbSecret: secretsmanager.ISecret;
dbVpc: ec2.IVpc;
dbSecurityGroup: ec2.ISecurityGroup;
subnetSelection: ec2.SubnetSelection;
}): PythonFunction {
const handler_role = new iam.Role(this, "execution-role", {
description:
Expand All @@ -101,12 +109,25 @@ export class StacIngestor extends Construct {
entry: `${__dirname}/runtime`,
index: "src/handler.py",
runtime: lambda.Runtime.PYTHON_3_9,
environment: props.env,
timeout: Duration.seconds(30),
environment: { DB_SECRET_ARN: props.dbSecret.secretArn, ...props.env },
vpc: props.dbVpc,
vpcSubnets: props.subnetSelection,
allowPublicSubnet: true,
role: handler_role,
memorySize: 2048,
});

// Allow handler to read DB secret
props.dbSecret.grantRead(handler);

// Allow handler to connect to DB
props.dbSecurityGroup.addIngressRule(
handler.connections.securityGroups[0],
ec2.Port.tcp(5432),
"Allow connections from STAC Ingestor"
);

props.table.grantReadWriteData(handler);
props.dataAccessRole.grant(handler.grantPrincipal, "sts:AssumeRole");

Expand Down
2 changes: 2 additions & 0 deletions lib/ingestor-api/runtime/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
httpx
moto[dynamodb, ssm]>=4.0.9
6 changes: 2 additions & 4 deletions lib/ingestor-api/runtime/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ orjson>=3.6.8
psycopg[binary,pool]>=3.0.15
pydantic_ssm_settings>=0.2.0
pydantic>=1.9.0
# Waiting for https://github.com/stac-utils/pgstac/pull/135
# pypgstac==0.6.6
pypgstac @ git+https://github.com/stac-utils/pgstac.git@main#egg=pygstac&subdirectory=pypgstac
requests>=2.27.1
pypgstac==0.6.11
requests==2.27.0
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116
stac-pydantic @ git+https://github.com/alukach/stac-pydantic.git@patch-1
36 changes: 36 additions & 0 deletions lib/ingestor-api/runtime/src/collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

from pypgstac.db import PgstacDB

from .schemas import StacCollection
from .utils import (
get_db_credentials,
convert_decimals_to_float,
load_into_pgstac,
IngestionType,
)
from .vedaloader import VEDALoader


def ingest(collection: StacCollection):
    """
    Publish a single collection to PgSTAC.

    The model is dumped to a dict (by alias, unset fields excluded), run
    through ``convert_decimals_to_float`` and handed to ``load_into_pgstac``
    with ``table=IngestionType.collections``. Database credentials are looked
    up from the secret named by the ``DB_SECRET_ARN`` environment variable.
    """
    secret_arn = os.environ["DB_SECRET_ARN"]
    db_creds = get_db_credentials(secret_arn)

    raw_collection = collection.dict(by_alias=True, exclude_unset=True)
    # load_into_pgstac receives a list, so wrap the single record.
    records = [convert_decimals_to_float(raw_collection)]

    with PgstacDB(dsn=db_creds.dsn_string, debug=True) as pgstac_db:
        load_into_pgstac(
            db=pgstac_db,
            ingestions=records,
            table=IngestionType.collections,
        )


def delete(collection_id: str):
    """
    Remove the collection identified by ``collection_id`` from the database.

    Credentials are fetched from the secret named by the ``DB_SECRET_ARN``
    environment variable; the deletion itself is delegated to
    ``VEDALoader.delete_collection``.
    """
    db_creds = get_db_credentials(os.environ["DB_SECRET_ARN"])
    with PgstacDB(dsn=db_creds.dsn_string, debug=True) as pgstac_db:
        VEDALoader(db=pgstac_db).delete_collection(collection_id)
104 changes: 21 additions & 83 deletions lib/ingestor-api/runtime/src/ingestor.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from datetime import datetime
import os
import decimal
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence

import boto3
from boto3.dynamodb.types import TypeDeserializer
import orjson
import pydantic
from pypgstac.load import Methods
from pypgstac.db import PgstacDB

from .dependencies import get_settings, get_table
from .schemas import Ingestion, Status
from .vedaloader import VEDALoader
from .utils import (
IngestionType,
get_db_credentials,
convert_decimals_to_float,
load_into_pgstac,
)

if TYPE_CHECKING:
from aws_lambda_typing import context as context_, events
Expand All @@ -32,78 +32,6 @@ def get_queued_ingestions(records: List["DynamodbRecord"]) -> Iterator[Ingestion
yield ingestion


class DbCreds(pydantic.BaseModel):
    """
    Database connection parameters.

    All fields are declared without defaults. ``dsn_string`` assembles them
    into a connection URI of the form
    ``engine://username:password@host:port/dbname``.
    """

    username: str
    password: str
    host: str
    port: int
    dbname: str
    engine: str

    @property
    def dsn_string(self) -> str:
        """
        Return the connection URI built from this model's fields.

        The username and password are percent-encoded so that reserved URI
        characters in the credentials (``@``, ``:``, ``/``, ``?``, ``#``,
        ``%`` ...) cannot corrupt the URI structure. Credentials made only of
        unreserved characters are emitted unchanged.
        """
        from urllib.parse import quote

        user = quote(self.username, safe="")
        secret = quote(self.password, safe="")
        return f"{self.engine}://{user}:{secret}@{self.host}:{self.port}/{self.dbname}"  # noqa


def get_db_credentials(secret_arn: str) -> DbCreds:
    """
    Fetch the pgSTAC database credentials stored in AWS Secrets Manager.

    The fourth colon-separated field of ``secret_arn`` is used as the region
    for the boto3 session. The secret's ``SecretString`` is parsed into a
    ``DbCreds`` instance.
    """
    print("Fetching DB credentials...")
    region = secret_arn.split(":")[3]
    secrets_client = boto3.session.Session(region_name=region).client(
        service_name="secretsmanager"
    )
    secret = secrets_client.get_secret_value(SecretId=secret_arn)
    return DbCreds.parse_raw(secret["SecretString"])


def convert_decimals_to_float(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Round-trip ``item`` through orjson, turning every Decimal into a float.

    DynamoDB hands floats back as Decimals; converting them before the record
    reaches pgSTAC avoids problems when pgSTAC serializes the record to JSON.
    Any other value orjson cannot serialize natively results in a TypeError
    being raised from the fallback hook.
    """

    def _coerce(value):
        # Fallback hook: orjson calls this for types it cannot encode itself.
        if not isinstance(value, decimal.Decimal):
            raise TypeError
        return float(value)

    serialized = orjson.dumps(item, default=_coerce)
    return orjson.loads(serialized)


def load_into_pgstac(creds: DbCreds, ingestions: Sequence[Ingestion]):
    """
    Bulk insert STAC records into pgSTAC.

    Each ingestion's ``item`` is converted with ``convert_decimals_to_float``
    and the resulting dicts are upserted through ``VEDALoader.load_items``.
    Afterwards ``update_collection_summaries`` is called once for every
    distinct collection referenced by the loaded items.

    Returns whatever ``VEDALoader.load_items`` returned.
    """
    with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
        loader = VEDALoader(db=db)

        items = [
            # NOTE: Important to deserialize values to convert decimals to floats
            convert_decimals_to_float(i.item)
            for i in ingestions
        ]

        print(f"Ingesting {len(items)} items")
        loading_result = loader.load_items(
            file=items,
            # use insert_ignore to avoid overwriting existing items or upsert to replace
            insert_mode=Methods.upsert,
        )

        # Trigger update on summaries and extents.
        # The items are plain dicts at this point, so the collection id must
        # be read by key; attribute access would raise AttributeError.
        collections = {item["collection"] for item in items}
        for collection in collections:
            loader.update_collection_summaries(collection)

        return loading_result


def update_dynamodb(
ingestions: Sequence[Ingestion],
status: Status,
Expand Down Expand Up @@ -135,14 +63,24 @@ def handler(event: "events.DynamoDBStreamEvent", context: "context_.Context"):
print("No queued ingestions to process")
return

items = [
# NOTE: Important to deserialize values to convert decimals to floats
convert_decimals_to_float(ingestion.item)
for ingestion in ingestions
]

creds = get_db_credentials(os.environ["DB_SECRET_ARN"])

# Insert into PgSTAC DB
outcome = Status.succeeded
message = None
try:
load_into_pgstac(
creds=get_db_credentials(os.environ["DB_SECRET_ARN"]),
ingestions=ingestions,
)
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
load_into_pgstac(
db=db,
ingestions=items,
table=IngestionType.items,
)
except Exception as e:
print(f"Encountered failure loading items into pgSTAC: {e}")
outcome = Status.failed
Expand Down
40 changes: 39 additions & 1 deletion lib/ingestor-api/runtime/src/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from fastapi import Depends, FastAPI, HTTPException

from . import config, dependencies, schemas, services
from . import (
config,
dependencies,
schemas,
services,
collection as collection_loader,
)

app = FastAPI(
root_path=config.settings.root_path,
Expand Down Expand Up @@ -84,6 +90,38 @@ def cancel_ingestion(
return ingestion.cancel(db)


@app.post(
    "/collections",
    tags=["Collection"],
    status_code=201,
    dependencies=[Depends(dependencies.get_username)],
)
def publish_collection(collection: schemas.StacCollection):
    """
    Publish a collection by handing it to ``collection_loader.ingest``.

    Any exception raised while ingesting is reported to the caller as an
    HTTP 400 whose detail carries the exception text.
    """
    try:
        collection_loader.ingest(collection)
        # NOTE: braces around a single f-string build a set literal, not a
        # dict; the shape is kept as-is so the response is unchanged.
        confirmation = {f"Successfully published: {collection.id}"}
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Unable to publish collection: {e}",
        )
    return confirmation


@app.delete(
    "/collections/{collection_id}",
    tags=["Collection"],
    dependencies=[Depends(dependencies.get_username)],
)
def delete_collection(collection_id: str):
    """
    Delete a collection by id via ``collection_loader.delete``.

    Any exception raised during deletion is printed and then reported to the
    caller as an HTTP 400 whose detail is the exception text.
    """
    try:
        collection_loader.delete(collection_id=collection_id)
    except Exception as e:
        print(e)
        raise HTTPException(status_code=400, detail=f"{e}")
    # NOTE: braces around a single f-string build a set literal, not a dict;
    # the shape is kept as-is so the response is unchanged.
    return {f"Successfully deleted: {collection_id}"}


@app.get("/auth/me")
def who_am_i(username=Depends(dependencies.get_username)):
"""
Expand Down
8 changes: 7 additions & 1 deletion lib/ingestor-api/runtime/src/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel, PositiveInt, dataclasses, error_wrappers, validator
from stac_pydantic import Item, shared
from stac_pydantic import Item, Collection, shared

from . import validators

Expand Down Expand Up @@ -43,7 +43,13 @@ def exists(cls, collection):
return collection


class StacCollection(Collection):
    """
    ``stac_pydantic`` Collection specialised for this API.

    Declares ``id`` and ``item_assets`` as fields without defaults on top of
    whatever the base ``Collection`` model already defines.
    """

    # Collection identifier.
    id: str
    # Free-form mapping; no key or value types are constrained here.
    item_assets: Dict


class Status(str, enum.Enum):
started = "started"
queued = "queued"
failed = "failed"
succeeded = "succeeded"
Expand Down
Loading