DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values
- `SQLOperation` bundles data about an SQL operation, including
  statement and parameters.

- Also, refactor the DynamoDB full-load translator into `commons-codec`, as `DynamoDBFullLoadTranslator`.
amotl committed Aug 26, 2024
1 parent 3876829 commit ebe0165
Showing 7 changed files with 163 additions and 47 deletions.
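The `SQLOperation` container mentioned in the commit message is, conceptually, a small value object that carries a statement together with its parameters. A minimal sketch of that shape, assuming a dataclass-style definition; the field names follow the usages visible in the diffs below and are not the exact upstream definition in `commons-codec`:

from dataclasses import dataclass
import typing as t


@dataclass
class SQLOperation:
    """Bundle an SQL statement together with the parameters to bind on execution."""
    statement: str
    parameters: t.Union[t.Dict[str, t.Any], t.List[t.Dict[str, t.Any]], None] = None


# Consumers hand both parts to the database driver, so values are bound
# as parameters instead of being interpolated into the SQL string.
operation = SQLOperation(
    statement="INSERT INTO demo (data) VALUES (:record)",
    parameters={"record": {"temperature": 42.42}},
)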
1 change: 1 addition & 0 deletions CHANGES.md
@@ -3,6 +3,7 @@

## Unreleased
- MongoDB: Fix and verify Zyp transformations
- DMS/DynamoDB/MongoDB I/O: Use SQL with parameters instead of inlining values

## 2024/08/21 v0.0.18
- Dependencies: Unpin commons-codec, to always use the latest version
44 changes: 10 additions & 34 deletions cratedb_toolkit/io/dynamodb/copy.py
@@ -1,9 +1,8 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from tqdm import tqdm
from yarl import URL

@@ -34,7 +33,7 @@ def __init__(
self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table)
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.progress = progress

@@ -53,45 +52,22 @@ def start(self):
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for sql in self.items_to_sql(result["Items"]):
if sql:
try:
connection.execute(sa.text(sql))
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
for operation in self.items_to_operations(result["Items"]):
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
logger.warning("No data has been copied")

def items_to_sql(self, items):
def items_to_operations(self, items):
"""
Convert data for record items to INSERT statements.
"""
for item in items:
yield self.translator.to_sql(item)


class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB):
@property
def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
values_clause = self.image_to_values(record)
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"
return sql

@staticmethod
def quote_table_name(name: str):
# TODO @ Upstream: Quoting table names should be the responsibility of the caller.
return name
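To illustrate the new write path in `copy.py`: the translator emits an `SQLOperation`, and its statement and parameters are handed to SQLAlchemy separately. A hedged usage sketch, assuming the `crate://` SQLAlchemy dialect; the connection URL, table name, and item payload are illustrative and not taken from the commit:

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator

translator = DynamoDBFullLoadTranslator(table_name='"doc"."demo"')
engine = sa.create_engine("crate://localhost:4200")

# A single DynamoDB item in its native wire format.
item = {
    "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
    "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}},
}

with engine.connect() as connection:
    operation = translator.to_sql(item)
    # Values travel as bound parameters, not as inlined SQL literals.
    connection.execute(sa.text(operation.statement), operation.parameters)
    connection.commit()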
6 changes: 3 additions & 3 deletions cratedb_toolkit/io/mongodb/cdc.py
@@ -44,9 +44,9 @@ def start(self):
# FIXME: Note that the function does not perform any sensible error handling yet.
with self.cratedb_adapter.engine.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
for operation in self.cdc_to_sql():
if operation:
connection.execute(sa.text(operation.statement), operation.parameters)

def cdc_to_sql(self):
"""
17 changes: 11 additions & 6 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
@@ -25,7 +25,7 @@
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "commons-codec",
# "commons-codec>=0.0.12",
# "sqlalchemy-cratedb==0.38.0",
# ]
# ///
@@ -40,7 +40,7 @@
from commons_codec.exception import UnknownOperationError
from commons_codec.model import ColumnTypeMapStore
from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
from sqlalchemy.util import asbool

LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
@@ -78,11 +78,11 @@

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB]
cdc: t.Union[DMSTranslatorCrateDB, DynamoDBCDCTranslator]
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE)
cdc = DynamoDBCDCTranslator(table_name=CRATEDB_TABLE)

# Create the database connection outside the handler to allow
# connections to be re-used by subsequent function invocations.
@@ -123,8 +123,13 @@ def handler(event, context):
logger.debug(f"Record Data: {record_data}")

# Process record.
sql = cdc.to_sql(record_data)
connection.execute(sa.text(sql))
operation = cdc.to_sql(record_data)
connection.execute(sa.text(operation.statement), parameters=operation.parameters)

# Processing alternating CDC events requires write synchronization.
# FIXME: Needs proper table name quoting.
connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}"))

connection.commit()

# Bookkeeping.
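The added `REFRESH TABLE` call matters because CrateDB makes new writes visible to subsequent statements only after a refresh: an UPDATE translated from a MODIFY event would otherwise not find the row written by the immediately preceding INSERT event. A minimal sketch of that interplay over a plain SQLAlchemy connection, assuming `demo` is an illustrative table where `id` is a regular (non-primary-key) column:

import sqlalchemy as sa

engine = sa.create_engine("crate://localhost:4200")

with engine.connect() as connection:
    connection.execute(
        sa.text("INSERT INTO demo (id, temperature) VALUES (:id, :temperature)"),
        {"id": 1, "temperature": 42.42},
    )
    # Without this refresh, the UPDATE below operates on the previous table
    # snapshot and matches zero rows (non-primary-key filters are not served
    # in real time).
    connection.execute(sa.text("REFRESH TABLE demo"))
    connection.execute(
        sa.text("UPDATE demo SET temperature = :temperature WHERE id = :id"),
        {"id": 1, "temperature": 84.84},
    )
    connection.commit()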
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -139,7 +139,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec",
"commons-codec>=0.0.12",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
@@ -155,10 +155,11 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"commons-codec>=0.0.12",
"lorrystream[carabas]",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.4",
"commons-codec[mongodb,zyp]>=0.0.12",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
1 change: 1 addition & 0 deletions tests/io/mongodb/test_transformation.py
@@ -8,6 +8,7 @@
pytestmark = pytest.mark.mongodb

pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed")
pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed")
pytest.importorskip("rich", reason="Skipping tests because rich is not installed")

from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402
136 changes: 134 additions & 2 deletions tests/io/test_processor.py
@@ -1,3 +1,5 @@
import base64
import json
import os
import sys

@@ -7,6 +9,80 @@

pytest.importorskip("commons_codec", reason="Only works with commons-codec installed")

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402

DYNAMODB_CDC_INSERT_NESTED = {
"awsRegion": "us-east-1",
"eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740",
"eventName": "INSERT",
"userIdentity": None,
"recordFormat": "application/json",
"tableName": "table-testdrive-nested",
"dynamodb": {
"ApproximateCreationDateTime": 1720800199717446,
"Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
"NewImage": {
"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
"data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}},
"meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
"somemap": {
"M": {
"test": {"N": 1},
"test2": {"N": 2},
}
},
},
"SizeBytes": 156,
"ApproximateCreationDateTimePrecision": "MICROSECOND",
},
"eventSource": "aws:dynamodb",
}

DYNAMODB_CDC_MODIFY_NESTED = {
"awsRegion": "us-east-1",
"eventID": "24757579-ebfd-480a-956d-a1287d2ef707",
"eventName": "MODIFY",
"userIdentity": None,
"recordFormat": "application/json",
"tableName": "foo",
"dynamodb": {
"ApproximateCreationDateTime": 1720742302233719,
"Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
"NewImage": {
"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
"device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}},
"tags": {"L": [{"S": "foo"}, {"S": "bar"}]},
"empty_map": {"M": {}},
"empty_list": {"L": []},
"timestamp": {"S": "2024-07-12T01:17:42"},
"string_set": {"SS": ["location_1"]},
"number_set": {"NS": [1, 2, 3, 0.34]},
"binary_set": {"BS": ["U3Vubnk="]},
"somemap": {
"M": {
"test": {"N": 1},
"test2": {"N": 2},
}
},
"list_of_objects": {"L": [{"M": {"foo": {"S": "bar"}}}, {"M": {"baz": {"S": "qux"}}}]},
},
"OldImage": {
"NOTE": "This event does not match the INSERT record",
"humidity": {"N": "84.84"},
"temperature": {"N": "42.42"},
"location": {"S": "Sydney"},
"timestamp": {"S": "2024-07-12T01:17:42"},
"device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}},
},
"SizeBytes": 161,
"ApproximateCreationDateTimePrecision": "MICROSECOND",
},
"eventSource": "aws:dynamodb",
}


@pytest.fixture
def reset_handler():
@@ -16,9 +92,9 @@ def reset_handler():
pass


def test_processor_invoke_no_records(reset_handler, mocker, caplog):
def test_processor_kinesis_dms_no_records(reset_handler, mocker, caplog):
"""
Roughly verify that the unified Lambda handler works.
Roughly verify that the unified Lambda handler works with AWS DMS.
"""

# Configure environment variables.
@@ -33,3 +109,59 @@ def test_processor_invoke_no_records(reset_handler, mocker, caplog):
handler(event, None)

assert "Successfully processed 0 records" in caplog.messages


def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker, caplog):
"""
Roughly verify that the unified Lambda handler works with AWS DynamoDB.
"""

# Define target table name.
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)

# Configure Lambda processor per environment variables.
handler_environment = {
"CRATEDB_SQLALCHEMY_URL": cratedb.get_connection_url(),
"MESSAGE_FORMAT": "dynamodb",
"CRATEDB_TABLE": table_name,
}
mocker.patch.dict(os.environ, handler_environment)

from cratedb_toolkit.io.processor.kinesis_lambda import handler

# Define two CDC events: INSERT and UPDATE.
# They have to be conveyed separately because CrateDB needs a
# `REFRESH TABLE` operation between them.
event = {
"Records": [
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED),
]
}

# Run transfer command.
handler(event, None)

# Verify outcome of processor, per validating log output.
assert "Successfully processed 2 records" in caplog.messages

# Verify data in target database.
assert cratedb.database.count_records(table_name) == 1
results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608
assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]


def wrap_kinesis(data):
"""
Wrap a CDC event into a Kinesis message, to satisfy the interface of the Lambda processor.
"""
return {
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"kinesis": {
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
},
}
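As a usage note for `wrap_kinesis`: the Lambda processor is expected to reverse this encoding by base64-decoding the `data` field and parsing it as JSON. That decoding step is an assumption about the handler's internals, consistent with how Kinesis delivers record payloads:

import base64
import json

# Round-trip check: wrap a payload, then undo the encoding the same way
# the processor presumably does.
message = wrap_kinesis({"eventName": "INSERT"})
decoded = json.loads(base64.b64decode(message["kinesis"]["data"]))
assert decoded == {"eventName": "INSERT"}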
