Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Multi Mosaic in DynamoDB Table #127

Merged
merged 18 commits into from
Oct 22, 2020
Merged
142 changes: 109 additions & 33 deletions cogeo_mosaic/backends/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import itertools
import json
import os
import re
import sys
import warnings
from decimal import Decimal
Expand All @@ -20,6 +21,7 @@
from cogeo_mosaic.backends.utils import find_quadkeys
from cogeo_mosaic.cache import lru_cache
from cogeo_mosaic.errors import _HTTP_EXCEPTIONS, MosaicError, MosaicExists
from cogeo_mosaic.logger import logger
from cogeo_mosaic.mosaic import MosaicJSON
from cogeo_mosaic.utils import bbox_union

Expand All @@ -31,14 +33,37 @@ class DynamoDBBackend(BaseBackend):
client: Any = attr.ib(default=None)
region: str = attr.ib(default=os.getenv("AWS_REGION", "us-east-1"))
table_name: str = attr.ib(init=False)
mosaic_name: str = attr.ib(init=False)
table: Any = attr.ib(init=False)

_backend_name = "AWS DynamoDB"

def __attrs_post_init__(self):
"""Post Init: parse path, create client and connect to Table."""
"""Post Init: parse path, create client and connect to Table.

A path looks like

dynamodb://{region}/{table_name}:{mosaic_name}
dynamodb:///{table_name}:{mosaic_name}

"""
logger.debug(f"Using DynamoDB backend: {self.path}")

if not re.match(
r"dynamodb://([a-z]{2}\-[a-z]+\-[0-9])?\/[a-zA-Z0-9\_\-\.]+\:[a-zA-Z0-9\_\-\.]+$",
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
self.path,
):
raise ValueError(f"Invalid DynamoDB path: {self.path}")

parsed = urlparse(self.path)
self.table_name = parsed.path.strip("/")

mosaic_info = parsed.path.lstrip("/").split(":")
self.table_name = mosaic_info[0]
self.mosaic_name = mosaic_info[1]

logger.debug(f"Table: {self.table_name}")
logger.debug(f"Mosaic: {self.mosaic_name}")

self.region = parsed.netloc or self.region
self.client = self.client or boto3.resource("dynamodb", region_name=self.region)
self.table = self.client.Table(self.table_name)
Expand All @@ -53,46 +78,55 @@ def assets_for_point(self, lng: float, lat: float) -> List[str]:
tile = mercantile.tile(lng, lat, self.quadkey_zoom)
return self.get_assets(tile.x, tile.y, tile.z)

def info(self, quadkeys: bool = False):
    """Return a summary of the mosaic.

    The summary holds the bounds, center, min/max zoom and name read from
    ``self.mosaic_def``; the name falls back to ``"mosaic"`` when it is
    empty.  The ``quadkeys`` entry is an empty list unless ``quadkeys`` is
    True, in which case it is the value of ``self._quadkeys``.
    """
    # NOTE(review): per the PR discussion this signature is meant to become
    # the same for all the backends once #126 is done -- confirm.
    mosaic = self.mosaic_def
    quadkey_list = self._quadkeys if quadkeys else []

    return {
        "bounds": mosaic.bounds,
        "center": mosaic.center,
        "maxzoom": mosaic.maxzoom,
        "minzoom": mosaic.minzoom,
        "name": mosaic.name or "mosaic",
        "quadkeys": quadkey_list,
    }

@property
def _quadkeys(self) -> List[str]:
    """Return the list of quadkey tiles stored for this mosaic.

    Queries the items whose ``mosaicId`` equals ``self.mosaic_name`` and
    returns their ``quadkey`` values, leaving out ``"-1"`` (the item that
    holds the mosaic metadata).  Every result page is read, following
    ``LastEvaluatedKey`` until the query is exhausted.
    """
    query_args = {
        "KeyConditionExpression": "#mosaicId = :mosaicId",
        # Placeholders allow DynamoDB reserved keywords as field names.
        # The attribute is written as lowercase "quadkey" by this backend.
        "ExpressionAttributeNames": {"#mosaicId": "mosaicId", "#quadkey": "quadkey"},
        # `self.table` is (by default) a boto3 *resource* Table, which
        # serializes plain Python values itself: no {"S": ...} wrapper.
        "ExpressionAttributeValues": {":mosaicId": self.mosaic_name},
        "ProjectionExpression": "#quadkey",
    }

    quadkeys: List[str] = []
    while True:
        resp = self.table.query(**query_args)
        quadkeys.extend(
            item["quadkey"] for item in resp["Items"] if item["quadkey"] != "-1"
        )

        last_key = resp.get("LastEvaluatedKey")
        if not last_key:
            return quadkeys
        # More pages to read: resume right after the last evaluated item.
        query_args["ExclusiveStartKey"] = last_key

def write(self, overwrite: bool = False, **kwargs: Any):
    """Write the mosaicjson document to AWS DynamoDB.

    The Table is created first when it does not exist yet; ``kwargs`` are
    forwarded to ``_create_table`` in that case.  When the mosaic is already
    present in the Table it is cleaned before writing if ``overwrite`` is
    True, otherwise ``MosaicExists`` is raised.
    """
    if not self._table_exists():
        self._create_table(**kwargs)

    if self._mosaic_exists():
        if overwrite:
            # Remove the items of the previous version before writing.
            self.clean()
        else:
            raise MosaicExists(
                f"Mosaic already exists in {self.table_name}, use `overwrite=True`."
            )

    self._write_items(self._create_items())

def _update_quadkey(self, quadkey: str, dataset: List[str]):
    """Store the asset list of a single quadkey of this mosaic in DynamoDB.

    The item is put under the (``mosaicId``, ``quadkey``) pair built from
    ``self.mosaic_name`` and ``quadkey``, with ``dataset`` as its ``assets``.
    """
    item = {"mosaicId": self.mosaic_name, "quadkey": quadkey, "assets": dataset}
    self.table.put_item(Item=item)

def _update_metadata(self):
    """Write the mosaic metadata item to the Table.

    ``self.metadata`` goes through a JSON round-trip so that floats are
    parsed as ``Decimal`` (presumably because DynamoDB numbers are handled
    as Decimal -- confirm).  The item is stored under quadkey ``"-1"`` for
    this mosaic.
    """
    item = {
        **json.loads(json.dumps(self.metadata), parse_float=Decimal),
        "quadkey": "-1",
        "mosaicId": self.mosaic_name,
    }
    self.table.put_item(Item=item)

def update(
Expand All @@ -103,6 +137,8 @@ def update(
**kwargs,
):
"""Update existing MosaicJSON on backend."""
logger.debug(f"Updating {self.mosaic_name}...")

new_mosaic = self.mosaic_def.from_features(
features,
self.mosaic_def.minzoom,
Expand Down Expand Up @@ -136,21 +172,20 @@ def update(

self._update_metadata()

return

def _create_table(
self, overwrite: bool = False, billing_mode: str = "PAY_PER_REQUEST"
):
if self._table_exist():
if not overwrite:
raise MosaicExists("Table already exist, use `overwrite=True`.")
self.table.delete()
self.table.wait_until_not_exists()
def _create_table(self, billing_mode: str = "PAY_PER_REQUEST", **kwargs: Any):
"""Create DynamoDB Table."""
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Creating {self.table_name} Table.")

# Define schema for primary key
# Non-keys don't need a schema
attr_defs = [{"AttributeName": "quadkey", "AttributeType": "S"}]
key_schema = [{"AttributeName": "quadkey", "KeyType": "HASH"}]
attr_defs = [
{"AttributeName": "mosaicId", "AttributeType": "S"},
{"AttributeName": "quadkey", "AttributeType": "S"},
]
key_schema = [
{"AttributeName": "mosaicId", "KeyType": "HASH"},
{"AttributeName": "quadkey", "KeyType": "RANGE"},
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
]
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved

# Note: errors if table already exists
try:
Expand All @@ -159,6 +194,7 @@ def _create_table(
TableName=self.table.table_name,
KeySchema=key_schema,
BillingMode=billing_mode,
**kwargs,
)

# If outside try/except block, could wait forever if unable to
Expand All @@ -177,10 +213,11 @@ def _create_items(self) -> List[Dict]:

# NOTE: quadkey is a string type
meta["quadkey"] = "-1"
meta["mosaicId"] = self.mosaic_name
items.append(meta)

for quadkey, assets in self.mosaic_def.tiles.items():
item = {"quadkey": quadkey, "assets": assets}
item = {"mosaicId": self.mosaic_name, "quadkey": quadkey, "assets": assets}
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
items.append(item)

return items
Expand All @@ -199,12 +236,12 @@ def _read(self) -> MosaicJSON: # type: ignore
meta = self._fetch_dynamodb("-1")

# Numeric values are loaded from DynamoDB as Decimal types
# Convert maxzoom, minzoom, quadkey_zoom to float/int
# Convert maxzoom, minzoom, quadkey_zoom to int
for key in ["minzoom", "maxzoom", "quadkey_zoom"]:
if meta.get(key):
meta[key] = int(meta[key])

# Convert bounds, center to float/int
# Convert bounds, center to float
for key in ["bounds", "center"]:
if meta.get(key):
meta[key] = list(map(float, meta[key]))
Expand All @@ -231,16 +268,55 @@ def get_assets(self, x: int, y: int, z: int) -> List[str]:

def _fetch_dynamodb(self, quadkey: str) -> Dict:
    """Fetch the item stored under ``quadkey`` for this mosaic.

    Returns the item, or an empty dict when the response has no ``Item``.

    A ``ClientError`` is re-raised as the exception class registered in
    ``_HTTP_EXCEPTIONS`` for its HTTP status code (``MosaicError`` when the
    status has no entry), carrying the service error message and chained to
    the original error.
    """
    item_key = {"mosaicId": self.mosaic_name, "quadkey": quadkey}
    try:
        response = self.table.get_item(Key=item_key)
    except ClientError as e:
        http_status = e.response["ResponseMetadata"]["HTTPStatusCode"]
        error_cls = _HTTP_EXCEPTIONS.get(http_status, MosaicError)
        raise error_cls(e.response["Error"]["Message"]) from e

    return response.get("Item", {})

def _table_exists(self) -> bool:
    """Tell whether the DynamoDB Table already exists.

    Reads ``self.table.table_status``; a ``ResourceNotFoundException`` raised
    by that access means the Table is missing.
    """
    try:
        # Only the side effect of the attribute access matters here.
        _ = self.table.table_status
    except self.table.meta.client.exceptions.ResourceNotFoundException:
        exists = False
    else:
        exists = True
    return exists

def _mosaic_exists(self) -> bool:
    """Tell whether this mosaic is already stored in the Table.

    Looks up the item keyed by ``self.mosaic_name`` and quadkey ``"-1"``
    (the metadata item) and reports whether a non-empty item came back.
    """
    metadata_key = {"mosaicId": self.mosaic_name, "quadkey": "-1"}
    response = self.table.get_item(Key=metadata_key)
    return bool(response.get("Item", {}))
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved

def clean(self):
    """Delete every item of this mosaic from the DynamoDB Table.

    The quadkeys stored under ``self.mosaic_name`` are first collected with
    a paginated query (including the ``"-1"`` metadata item), then deleted
    through the Table's batch writer, which groups the deletes into batch
    requests instead of sending one request per item.
    """
    logger.debug(f"Deleting items for mosaic {self.mosaic_name}...")

    # get items
    query_args = {
        "KeyConditionExpression": "#mosaicId = :mosaicId",
        # The sort key is written as lowercase "quadkey" by this backend.
        "ExpressionAttributeNames": {"#mosaicId": "mosaicId", "#quadkey": "quadkey"},
        # `self.table` is (by default) a boto3 *resource* Table, which
        # serializes plain Python values itself: no {"S": ...} wrapper.
        "ExpressionAttributeValues": {":mosaicId": self.mosaic_name},
        "ProjectionExpression": "#quadkey",
    }
    quadkeys = []
    while True:
        resp = self.table.query(**query_args)
        quadkeys.extend(item["quadkey"] for item in resp["Items"])

        last_key = resp.get("LastEvaluatedKey")
        if not last_key:
            break
        # More pages to read: resume right after the last evaluated item.
        query_args["ExclusiveStartKey"] = last_key

    # delete items
    with self.table.batch_writer() as batch:
        for quadkey in quadkeys:
            # Only `quadkey` is projected, so the partition key comes from
            # the backend itself, not from the returned item.
            batch.delete_item(
                Key={"mosaicId": self.mosaic_name, "quadkey": quadkey}
            )