Skip to content

Commit

Permalink
Merge pull request #19 from MITLibraries/timx-142-handle-deleted-records
Browse files Browse the repository at this point in the history
Timx 142 handle deleted records
  • Loading branch information
hakbailey committed Jan 25, 2023
2 parents 62d4bbe + d958139 commit beae6de
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 265 deletions.
14 changes: 14 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "daily"
reviewers:
- "MITLibraries/dataeng"
- package-ecosystem: "docker"
directory: "/"
schedule:
interval: "weekly"
reviewers:
- "MITLibraries/dataeng"
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.15
3.9.16
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ black = "*"
coverage = "*"
freezegun = "*"
moto = "*"
mypy = "*"
pylama = {extras = ["all"], version = "*"}
pytest = "*"

Expand Down
595 changes: 346 additions & 249 deletions Pipfile.lock

Large diffs are not rendered by default.

32 changes: 24 additions & 8 deletions lambdas/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging

from lambdas import config, helpers

logger = logging.getLogger(__name__)


def generate_extract_command(
input_data: dict, run_date: str, timdex_bucket: str, verbose: bool
Expand Down Expand Up @@ -75,11 +79,7 @@ def generate_transform_commands(
if verbose:
transform_command.append("--verbose")

if load_type == "index":
files_to_transform.append({"transform-command": transform_command})
elif load_type == "delete":
# Not yet implemented
pass
files_to_transform.append({"transform-command": transform_command})

return {"files-to-transform": files_to_transform}

Expand All @@ -90,6 +90,7 @@ def generate_load_commands(
"""Generate task run command for loading records into OpenSearch."""
if run_type == "daily":
files_to_index = []
files_to_delete = []

for transform_output_file in transform_output_files:
load_type, _ = helpers.get_load_type_and_sequence_from_timdex_filename(
Expand All @@ -104,16 +105,31 @@ def generate_load_commands(
]
files_to_index.append({"load-command": load_command})
elif load_type == "delete":
# Not yet implemented
pass
load_command = [
"bulk-delete",
"--source",
source,
f"s3://{timdex_bucket}/{transform_output_file}",
]
files_to_delete.append({"load-command": load_command})

return {"files-to-index": files_to_index}
return {"files-to-index": files_to_index, "files-to-delete": files_to_delete}

if run_type == "full":
new_index_name = helpers.generate_index_name(source)

files_to_index = []
for transform_output_file in transform_output_files:
load_type, _ = helpers.get_load_type_and_sequence_from_timdex_filename(
transform_output_file
)
if load_type == "delete":
logger.error(
"%s full ingest had a deleted records file: %s",
source,
transform_output_file,
)
continue
load_command = [
"bulk-index",
"--index",
Expand Down
7 changes: 6 additions & 1 deletion lambdas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ def generate_step_output_filename(
load), and optional file sequence number, generate a full file name.
"""
sequence_suffix = f"_{sequence}" if sequence else ""
file_type = "xml" if step == "extract" else "json"
if step == "extract":
file_type = "xml"
elif load_type == "delete":
file_type = "txt"
else:
file_type = "json"
return f"{prefix}-to-{load_type}{sequence_suffix}.{file_type}"


Expand Down
45 changes: 41 additions & 4 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_generate_transform_commands_required_input_fields():
}


def test_generate_transform_input_all_input_fields():
def test_generate_transform_commands_all_input_fields():
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
Expand Down Expand Up @@ -117,6 +117,16 @@ def test_generate_transform_input_all_input_fields():
"--verbose",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
"--source=testsource",
"--verbose",
]
},
]
}

Expand All @@ -125,7 +135,7 @@ def test_generate_load_commands_daily():
transform_output_files = [
"testsource/testsource-2022-01-02-daily-transformed-records-to-index_01.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-index_02.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-delete.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
assert commands.generate_load_commands(
transform_output_files, "daily", "testsource", "test-timdex-bucket"
Expand All @@ -149,7 +159,18 @@ def test_generate_load_commands_daily():
"testsource-2022-01-02-daily-transformed-records-to-index_02.json",
]
},
]
],
"files-to-delete": [
{
"load-command": [
"bulk-delete",
"--source",
"testsource",
"s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
}
],
}


Expand Down Expand Up @@ -184,7 +205,7 @@ def test_generate_load_commands_full_not_aliased():


@freeze_time("2022-01-02 12:13:14")
def test_generate_load_input_full_aliased():
def test_generate_load_commands_full_aliased():
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
]
Expand Down Expand Up @@ -215,6 +236,22 @@ def test_generate_load_input_full_aliased():
}


@freeze_time("2022-01-02 12:13:14")
def test_generate_load_commands_full_with_deletes_logs_error(caplog):
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-delete.txt"
]
commands.generate_load_commands(
transform_output_files, "full", "alma", "test-timdex-bucket"
)
assert (
"lambdas.commands",
40,
"alma full ingest had a deleted records file: "
"alma/alma-2022-01-02-full-transformed-records-to-delete.txt",
) in caplog.record_tuples


def test_generate_load_command_unexpected_input():
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
Expand Down
27 changes: 26 additions & 1 deletion tests/test_format_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ def test_lambda_handler_with_next_step_transform_alma_files_present():
"next-step": "load",
"transform": {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/alma/"
"alma-2022-09-12-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/alma/"
"alma-2022-09-12-daily-transformed-records-to-delete.txt",
"--source=alma",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/alma/"
Expand Down Expand Up @@ -161,6 +170,11 @@ def test_lambda_handler_with_next_step_load_files_present(s3_client):
Key="testsource/testsource-2022-01-02-daily-transformed-records-to-index.json",
Body="I am a file",
)
s3_client.put_object(
Bucket="test-timdex-bucket",
Key="testsource/testsource-2022-01-02-daily-transformed-records-to-delete.txt",
Body="record-id",
)
event = {
"run-date": "2022-01-02T12:13:14Z",
"run-type": "daily",
Expand All @@ -183,7 +197,18 @@ def test_lambda_handler_with_next_step_load_files_present(s3_client):
"testsource-2022-01-02-daily-transformed-records-to-index.json",
]
},
]
],
"files-to-delete": [
{
"load-command": [
"bulk-delete",
"--source",
"testsource",
"s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
},
],
},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_generate_step_output_filename_with_sequence():
def test_generate_step_output_filename_without_sequence():
assert (
helpers.generate_step_output_filename("delete", "prefix", "transform")
== "prefix-to-delete.json"
== "prefix-to-delete.txt"
)


Expand Down

0 comments on commit beae6de

Please sign in to comment.