Skip to content

Commit

Permalink
Handle deleted records
Browse files Browse the repository at this point in the history
Why these changes are being introduced:
We had previously skipped deleted records during command generation, but
we're now ready to include them so they can be handled by other
applications in the pipeline.

How this addresses that need:
* Adds deleted record commands to the generate_load_commands function.
* Logs an error if deleted record transform output files are found while
  generating load commands, as this indicates there is an issue with the
  full export from source.
* Removes deleted record command placeholder from the
  generate_transform_commands function, as the input command to
  transmogrifier is identical for all records.
* Updates the helpers generate_step_output_filename function to
  generate the correct filename (.txt extension) for deleted records.
* Updates relevant tests.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-142
  • Loading branch information
hakbailey committed Jan 24, 2023
1 parent ad1b095 commit d958139
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 15 deletions.
32 changes: 24 additions & 8 deletions lambdas/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging

from lambdas import config, helpers

logger = logging.getLogger(__name__)


def generate_extract_command(
input_data: dict, run_date: str, timdex_bucket: str, verbose: bool
Expand Down Expand Up @@ -75,11 +79,7 @@ def generate_transform_commands(
if verbose:
transform_command.append("--verbose")

if load_type == "index":
files_to_transform.append({"transform-command": transform_command})
elif load_type == "delete":
# Not yet implemented
pass
files_to_transform.append({"transform-command": transform_command})

return {"files-to-transform": files_to_transform}

Expand All @@ -90,6 +90,7 @@ def generate_load_commands(
"""Generate task run command for loading records into OpenSearch."""
if run_type == "daily":
files_to_index = []
files_to_delete = []

for transform_output_file in transform_output_files:
load_type, _ = helpers.get_load_type_and_sequence_from_timdex_filename(
Expand All @@ -104,16 +105,31 @@ def generate_load_commands(
]
files_to_index.append({"load-command": load_command})
elif load_type == "delete":
# Not yet implemented
pass
load_command = [
"bulk-delete",
"--source",
source,
f"s3://{timdex_bucket}/{transform_output_file}",
]
files_to_delete.append({"load-command": load_command})

return {"files-to-index": files_to_index}
return {"files-to-index": files_to_index, "files-to-delete": files_to_delete}

if run_type == "full":
new_index_name = helpers.generate_index_name(source)

files_to_index = []
for transform_output_file in transform_output_files:
load_type, _ = helpers.get_load_type_and_sequence_from_timdex_filename(
transform_output_file
)
if load_type == "delete":
logger.error(
"%s full ingest had a deleted records file: %s",
source,
transform_output_file,
)
continue
load_command = [
"bulk-index",
"--index",
Expand Down
7 changes: 6 additions & 1 deletion lambdas/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ def generate_step_output_filename(
load), and optional file sequence number, generate a full file name.
"""
sequence_suffix = f"_{sequence}" if sequence else ""
file_type = "xml" if step == "extract" else "json"
if step == "extract":
file_type = "xml"
elif load_type == "delete":
file_type = "txt"
else:
file_type = "json"
return f"{prefix}-to-{load_type}{sequence_suffix}.{file_type}"


Expand Down
45 changes: 41 additions & 4 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_generate_transform_commands_required_input_fields():
}


def test_generate_transform_input_all_input_fields():
def test_generate_transform_commands_all_input_fields():
input_data = {
"next-step": "transform",
"run-date": "2022-01-02T12:13:14Z",
Expand Down Expand Up @@ -117,6 +117,16 @@ def test_generate_transform_input_all_input_fields():
"--verbose",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
"--source=testsource",
"--verbose",
]
},
]
}

Expand All @@ -125,7 +135,7 @@ def test_generate_load_commands_daily():
transform_output_files = [
"testsource/testsource-2022-01-02-daily-transformed-records-to-index_01.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-index_02.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-delete.json",
"testsource/testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
assert commands.generate_load_commands(
transform_output_files, "daily", "testsource", "test-timdex-bucket"
Expand All @@ -149,7 +159,18 @@ def test_generate_load_commands_daily():
"testsource-2022-01-02-daily-transformed-records-to-index_02.json",
]
},
]
],
"files-to-delete": [
{
"load-command": [
"bulk-delete",
"--source",
"testsource",
"s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
}
],
}


Expand Down Expand Up @@ -184,7 +205,7 @@ def test_generate_load_commands_full_not_aliased():


@freeze_time("2022-01-02 12:13:14")
def test_generate_load_input_full_aliased():
def test_generate_load_commands_full_aliased():
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
]
Expand Down Expand Up @@ -215,6 +236,22 @@ def test_generate_load_input_full_aliased():
}


@freeze_time("2022-01-02 12:13:14")
def test_generate_load_commands_full_with_deletes_logs_error(caplog):
    """A deleted-records file in a full run is reported via an ERROR log.

    A full export from source should never yield deleted records, so the
    command generator logs an error for the offending file instead of
    producing a delete command for it.
    """
    transform_output_files = [
        "alma/alma-2022-01-02-full-transformed-records-to-delete.txt"
    ]
    commands.generate_load_commands(
        transform_output_files, "full", "alma", "test-timdex-bucket"
    )
    # caplog.record_tuples entries are (logger_name, level, message);
    # 40 == logging.ERROR.
    assert (
        "lambdas.commands",
        40,
        "alma full ingest had a deleted records file: "
        "alma/alma-2022-01-02-full-transformed-records-to-delete.txt",
    ) in caplog.record_tuples


def test_generate_load_command_unexpected_input():
transform_output_files = [
"alma/alma-2022-01-02-full-transformed-records-to-index.json"
Expand Down
27 changes: 26 additions & 1 deletion tests/test_format_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ def test_lambda_handler_with_next_step_transform_alma_files_present():
"next-step": "load",
"transform": {
"files-to-transform": [
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/alma/"
"alma-2022-09-12-daily-extracted-records-to-delete.xml",
"--output-file=s3://test-timdex-bucket/alma/"
"alma-2022-09-12-daily-transformed-records-to-delete.txt",
"--source=alma",
]
},
{
"transform-command": [
"--input-file=s3://test-timdex-bucket/alma/"
Expand Down Expand Up @@ -161,6 +170,11 @@ def test_lambda_handler_with_next_step_load_files_present(s3_client):
Key="testsource/testsource-2022-01-02-daily-transformed-records-to-index.json",
Body="I am a file",
)
s3_client.put_object(
Bucket="test-timdex-bucket",
Key="testsource/testsource-2022-01-02-daily-transformed-records-to-delete.txt",
Body="record-id",
)
event = {
"run-date": "2022-01-02T12:13:14Z",
"run-type": "daily",
Expand All @@ -183,7 +197,18 @@ def test_lambda_handler_with_next_step_load_files_present(s3_client):
"testsource-2022-01-02-daily-transformed-records-to-index.json",
]
},
]
],
"files-to-delete": [
{
"load-command": [
"bulk-delete",
"--source",
"testsource",
"s3://test-timdex-bucket/testsource/"
"testsource-2022-01-02-daily-transformed-records-to-delete.txt",
]
},
],
},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_generate_step_output_filename_with_sequence():
def test_generate_step_output_filename_without_sequence():
    # Deleted-record transform output files use the .txt extension,
    # and no sequence suffix is appended when sequence is omitted.
    filename = helpers.generate_step_output_filename("delete", "prefix", "transform")
    assert filename == "prefix-to-delete.txt"


Expand Down

0 comments on commit d958139

Please sign in to comment.