From b76e0106c1fabe59bd54a4a4db38e01f1d4451da Mon Sep 17 00:00:00 2001
From: jonavellecuerdo
Date: Thu, 24 Oct 2024 14:06:38 -0400
Subject: [PATCH 1/2] Add utility function to parse details from S3 URIs and
 filenames

Why these changes are being introduced:

* Several core functions rely on information about the TIMDEX "run"
details, which appear in the names of files produced by the "extract"
and "transform" steps of the TIMDEX pipeline. This new util function
can be used by core functions as needed, reducing duplicated code.

How this addresses that need:

* Add util function parse_timdex_filename
* Update run_ab_transforms and collate_ab_transforms to use the util
* Use the TIMDEX source slug in collated_dataset

Side effects of this change:

* None

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-380
* https://mitlibraries.atlassian.net/browse/TIMX-365
---
 abdiff/core/collate_ab_transforms.py | 39 +++++++----------
 abdiff/core/run_ab_transforms.py     | 42 ++++++++-----------
 abdiff/core/utils.py                 | 24 +++++++++++
 tests/test_collate_ab_transforms.py  | 35 +++++++---------
 tests/test_run_ab_transforms.py      | 62 +++++++++++++++++-----------
 tests/test_utils.py                  | 48 +++++++++++++++++++++
 6 files changed, 158 insertions(+), 92 deletions(-)

diff --git a/abdiff/core/collate_ab_transforms.py b/abdiff/core/collate_ab_transforms.py
index 9906da3..a69765b 100644
--- a/abdiff/core/collate_ab_transforms.py
+++ b/abdiff/core/collate_ab_transforms.py
@@ -11,7 +11,7 @@
 import pyarrow as pa
 
 from abdiff.core.exceptions import OutputValidationError
-from abdiff.core.utils import write_to_dataset
+from abdiff.core.utils import parse_timdex_filename, write_to_dataset
 
 logger = logging.getLogger(__name__)
 
@@ -75,8 +75,8 @@ def collate_ab_transforms(
 
 def get_transformed_records_iter(
-    transformed_file: str | Path,
-) -> Generator[dict[str, str | bytes]]:
+    transformed_file: str,
+) -> Generator[dict[str, str | bytes | None]]:
     """Yields data for every TIMDEX record in a transformed file.
 
     This function uses ijson to yield records from a JSON stream
@@ -91,17 +91,16 @@ def get_transformed_records_iter(
         transformed file.
-        * transformed_file_name: The name of the transformed file, excluding
-        file extension.
+        * transformed_file_name: The name of the transformed file, including
+        the file extension.
     """
-    version, transformed_file_name = parse_parquet_details_from_transformed_file(
-        str(transformed_file)
-    )
+    version = get_transform_version(transformed_file)
+    filename_details = parse_timdex_filename(transformed_file)
     with open(transformed_file, "rb") as file:
         for record in ijson.items(file, "item"):
             yield {
                 "timdex_record_id": record["timdex_record_id"],
-                "source": record["source"],
+                "source": filename_details["source"],
                 "record": json.dumps(record).encode(),
                 "version": version,
-                "transformed_file_name": transformed_file_name,
+                "transformed_file_name": transformed_file.split("/")[-1],
             }
 
 
@@ -124,7 +123,7 @@ def get_transformed_batches_iter(
     for transformed_files in ab_transformed_file_lists:
         for transformed_file in transformed_files:
             record_iter = get_transformed_records_iter(
-                transformed_file=Path(run_directory) / transformed_file
+                transformed_file=str(Path(run_directory) / transformed_file)
             )
             for record_batch in itertools.batched(record_iter, READ_BATCH_SIZE):
                 yield pa.RecordBatch.from_pylist(list(record_batch))
 
 
@@ -255,19 +254,15 @@ def validate_output(dataset_path: str) -> None:
     )
 
 
-def parse_parquet_details_from_transformed_file(transformed_file: str) -> tuple[str, ...]:
-    """Parse parquet details from the absolute path of a transformed file.
-
-    This will retrieve the transmogrifier image version ('a' or 'b') and
-    the transformed filename.
-    """
+def get_transform_version(transformed_filepath: str) -> str:
+    """Get A/B transform version, either 'a' or 'b'."""
     match_result = re.match(
-        r".*transformed\/(.*)\/(.*).json",
-        transformed_file,
+        r".*transformed\/(.*)\/.*\.json",
+        transformed_filepath,
     )
     if not match_result:
         raise ValueError(  # noqa: TRY003
-            f"Transformed filename is invalid: {transformed_file}."
+            f"Transformed filepath is invalid: {transformed_filepath}."
         )
-    version, filename = match_result.groups()
-    return version, filename
+
+    return match_result.groups()[0]
diff --git a/abdiff/core/run_ab_transforms.py b/abdiff/core/run_ab_transforms.py
index 6654e8b..7bd7eab 100644
--- a/abdiff/core/run_ab_transforms.py
+++ b/abdiff/core/run_ab_transforms.py
@@ -3,7 +3,6 @@
 import glob
 import logging
 import os
-import re
 import time
 from concurrent.futures import Future, ThreadPoolExecutor
 from datetime import timedelta
@@ -19,7 +18,11 @@
     DockerContainerTimeoutError,
     OutputValidationError,
 )
-from abdiff.core.utils import create_subdirectories, update_or_create_run_json
+from abdiff.core.utils import (
+    create_subdirectories,
+    parse_timdex_filename,
+    update_or_create_run_json,
+)
 
 CONFIG = Config()
 
@@ -139,16 +142,14 @@ def run_all_docker_containers(
     with ThreadPoolExecutor(max_workers=CONFIG.transmogrifier_max_workers) as executor:
         for input_file in input_files:
-            source, output_file = parse_transform_details_from_extract_filename(
-                input_file
-            )
+            filename_details = parse_timdex_filename(input_file)
             for docker_image, transformed_directory in run_configs:
                 args = (
                     docker_image,
                     transformed_directory,
-                    source,
+                    str(filename_details["source"]),
                     input_file,
-                    output_file,
+                    get_transformed_filename(filename_details),
                     docker_client,
                 )
                 tasks.append(executor.submit(run_docker_container, *args))
@@ -310,22 +311,13 @@ def validate_output(
     )
 
 
-def parse_transform_details_from_extract_filename(input_file: str) -> tuple[str, ...]:
-    """Parse transform details from extract filename.
-
-    Namely, the source and the output filename are parsed from the extract
-    filename. These variables are required by the transform command.
-    """
-    extract_filename = input_file.split("/")[-1]
-    match_result = re.match(
-        r"^([\w\-]+?)-(\d{4}-\d{2}-\d{2})-(\w+)-extracted-records-to-index(?:_(\d+))?\.\w+$",
-        extract_filename,
-    )
-    if not match_result:
-        raise ValueError(  # noqa: TRY003
-            f"Extract filename is invalid: {extract_filename}."
-        )
-    source, date, cadence, sequence = match_result.groups()
-    sequence_suffix = f"_{sequence}" if sequence else ""
-    output_filename = f"{source}-{date}-{cadence}-transformed-records-to-index{sequence_suffix}.json"  # noqa: E501
-    return source, output_filename
+def get_transformed_filename(filename_details: dict) -> str:
+    """Get transformed filename using extract filename details."""
+    filename_details.update(
+        stage="transformed",
+        index=f"_{sequence}" if (sequence := filename_details["index"]) else "",
+    )
+    output_filename = (
+        "{source}-{date}-{cadence}-{stage}-records-to-index{index}.json"
+    )
+    return output_filename.format(**filename_details)
diff --git a/abdiff/core/utils.py b/abdiff/core/utils.py
index 25cf4d5..6f60f40 100644
--- a/abdiff/core/utils.py
+++ b/abdiff/core/utils.py
@@ -2,6 +2,7 @@
 
 import json
 import os
+import re
 from collections.abc import Iterable
 from pathlib import Path
 
@@ -78,6 +79,29 @@ def load_dataset(base_dir: str | Path) -> ds.Dataset:
     return ds.dataset(base_dir, partitioning="hive")
 
 
+def parse_timdex_filename(s3_uri_or_filename: str) -> dict[str, str | None]:
+    """Parse details from a TIMDEX S3 URI or filename."""
+    filename = s3_uri_or_filename.split("/")[-1]
+
+    match_result = re.match(
+        r"^([\w\-]+?)-(\d{4}-\d{2}-\d{2})-(\w+)-(\w+)-records-to-(.+?)(?:_(\d+))?\.(\w+)$",
+        filename,
+    )
+
+    keys = ["source", "date", "cadence", "stage", "action", "index", "file_type"]
+    if not match_result:
+        raise ValueError(  # noqa: TRY003
+            f"Provided S3 URI or filename is invalid: {filename}."
+        )
+
+    try:
+        return dict(zip(keys, match_result.groups(), strict=True))
+    except ValueError as exception:
+        raise ValueError(  # noqa: TRY003
+            f"Provided S3 URI or filename is invalid: {filename}."
+        ) from exception
+
+
 def write_to_dataset(
     data: (
         ds.Dataset
diff --git a/tests/test_collate_ab_transforms.py b/tests/test_collate_ab_transforms.py
index 3d3c68d..80f16ac 100644
--- a/tests/test_collate_ab_transforms.py
+++ b/tests/test_collate_ab_transforms.py
@@ -14,9 +14,9 @@
     TRANSFORMED_DATASET_SCHEMA,
     collate_ab_transforms,
     get_joined_batches_iter,
+    get_transform_version,
     get_transformed_batches_iter,
     get_transformed_records_iter,
-    parse_parquet_details_from_transformed_file,
     validate_output,
 )
 from abdiff.core.exceptions import OutputValidationError
@@ -55,19 +55,21 @@ def test_collate_ab_transforms_success(
         str(Path(collated_dataset_path) / "records-0.parquet")
     ]
     assert len(collated_df) == 10
-    assert set(collated_df["source"].unique()) == {"MIT Alma", "DSpace@MIT"}
+    assert set(collated_df["source"].unique()) == {"alma", "dspace"}
 
     # assert result of full outer join
     missing_in_b = collated_df[collated_df["record_b"].isna()]
     assert len(missing_in_b) == 1
-    assert missing_in_b["source"].to_list() == ["MIT Alma"]
+    assert missing_in_b["source"].to_list() == ["alma"]
 
 
 def test_get_transformed_records_iter_success(example_transformed_directory):
     """Validates the structure of the yielded TIMDEX record dictionaries."""
     records_iter = get_transformed_records_iter(
-        transformed_file=Path(example_transformed_directory)
-        / "a/alma-2024-08-29-daily-transformed-records-to-index.json"
+        transformed_file=str(
+            Path(example_transformed_directory)
+            / "a/alma-2024-08-29-daily-transformed-records-to-index.json"
+        )
     )
 
     timdex_record_dict = next(records_iter)
@@ -82,7 +84,7 @@ def test_get_transformed_records_iter_success(example_transformed_directory):
     assert timdex_record_dict["version"] == "a"
     assert (
         timdex_record_dict["transformed_file_name"]
-        == "alma-2024-08-29-daily-transformed-records-to-index"
+        == "alma-2024-08-29-daily-transformed-records-to-index.json"
     )
 
 
@@ -169,24 +171,15 @@ def test_validate_output_raises_error_if_missing_record_column(run_directory):
     validate_output(dataset_path=missing_record_cols_dataset_path)
 
 
-def test_parse_parquet_details_from_transformed_file_success(
-    transformed_directories, output_filename
-):
+def test_get_transform_version_success(transformed_directories, output_filename):
     transformed_directory_a, transformed_directory_b = transformed_directories
     transformed_file_a = str(Path(transformed_directory_a) / output_filename)
     transformed_file_b = str(Path(transformed_directory_b) / output_filename)
-    transformed_filename = output_filename.replace(".json", "")  # remove file .ext
 
-    assert parse_parquet_details_from_transformed_file(transformed_file_a) == (
-        "a",
-        transformed_filename,
-    )
-    assert parse_parquet_details_from_transformed_file(transformed_file_b) == (
-        "b",
-        transformed_filename,
-    )
+    assert get_transform_version(transformed_file_a) == "a"
+    assert get_transform_version(transformed_file_b) == "b"
 
 
-def test_parse_parquet_details_from_transformed_file_raise_error(output_filename):
-    with pytest.raises(ValueError, match="Transformed filename is invalid"):
-        parse_parquet_details_from_transformed_file(output_filename)
+def test_get_transform_version_raise_error():
+    with pytest.raises(ValueError, match="Transformed filepath is invalid."):
+        get_transform_version("invalid")
diff --git a/tests/test_run_ab_transforms.py b/tests/test_run_ab_transforms.py
index d7e2caf..965619e 100644
--- a/tests/test_run_ab_transforms.py
+++ b/tests/test_run_ab_transforms.py
@@ -12,12 +12,13 @@
 from abdiff.core.run_ab_transforms import (
     aggregate_logs,
     collect_container_results,
+    get_transformed_filename,
     get_transformed_files,
-    parse_transform_details_from_extract_filename,
     run_ab_transforms,
     run_docker_container,
     validate_output,
 )
+from abdiff.core.utils import parse_timdex_filename
 from tests.conftest import MockedContainerRun, MockedFutureSuccess
 
 
@@ -113,13 +114,13 @@ def test_run_docker_container_success(
     )
     input_file = "s3://timdex-extract-dev/source/source-2024-01-01-daily-extracted-records-to-index.xml"
-    source, output_file = parse_transform_details_from_extract_filename(input_file)
-    container, exception = run_docker_container(
+    filename_details = parse_timdex_filename(input_file)
+    container, _ = run_docker_container(
         docker_image=image_name,
         transformed_directory=transformed_directory,
-        source=source,
+        source=filename_details["source"],
         input_file=input_file,
-        output_file=output_file,
+        output_file=get_transformed_filename(filename_details),
         docker_client=mocked_docker_client,
     )
     assert container.id == mocked_docker_container.id
@@ -180,34 +181,30 @@
 def test_validate_output_error():
     with pytest.raises(OutputValidationError):
         validate_output(ab_transformed_file_lists=([], []), input_files_count=1)
 
 
-def test_parse_transform_details_from_extract_filename_success(input_file):
-    source, output_file = parse_transform_details_from_extract_filename(input_file)
-    assert source == "source"
-    assert output_file == "source-2024-01-01-full-transformed-records-to-index.json"
-
-
-def test_parse_transform_details_from_extract_filename_if_sequenced_success():
-    input_file = "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index_01.xml"
-    source, output_filename = parse_transform_details_from_extract_filename(input_file)
-    assert source == "source"
+def test_get_output_filename_success():
     assert (
-        output_filename == "source-2024-01-01-full-transformed-records-to-index_01.json"
+        get_transformed_filename(
+            {
+                "source": "source",
+                "date": "2024-01-01",
+                "cadence": "full",
+                "stage": "extracted",
+                "action": "index",
+                "index": None,
+                "file_type": "xml",
+            }
+        )
+        == "source-2024-01-01-full-transformed-records-to-index.json"
"source-2024-01-01-full-transformed-records-to-index.xml" ) -def test_parse_transform_details_from_extract_filename_raise_error(): - input_file = "s3://timdex-extract-dev/source/-2024-01-01-full-extracted-records-to-index_01.xml" - with pytest.raises(ValueError, match="Extract filename is invalid"): - parse_transform_details_from_extract_filename(input_file) - - def test_run_docker_container_timeout_triggered( mocked_docker_client, mocked_docker_container_a ): mocked_docker_client.containers.run.return_value = mocked_docker_container_a mocked_docker_container_a.run_duration = 2 timeout = 1 - container, exception = run_docker_container( + _, exception = run_docker_container( "abc123", "abc", "alma", @@ -225,7 +222,7 @@ def test_run_docker_container_timeout_not_triggered( mocked_docker_client.containers.run.return_value = mocked_docker_container_a mocked_docker_container_a.run_duration = 2 timeout = 3 - container, exception = run_docker_container( + container, _ = run_docker_container( "abc123", "abc", "alma", @@ -288,3 +285,20 @@ def test_collect_containers_return_containers_and_exceptions( assert len(containers) == 2 assert len(exceptions) == 1 assert exceptions[0] == mocked_exception + + +def test_get_output_filename_indexed_success(): + assert ( + get_transformed_filename( + { + "source": "source", + "date": "2024-01-01", + "cadence": "full", + "stage": "extracted", + "action": "index", + "index": "01", + "file_type": "xml", + } + ) + == "source-2024-01-01-full-transformed-records-to-index_01.xml" + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 34957a8..bf36318 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -8,6 +8,7 @@ from abdiff.core import init_job from abdiff.core.utils import ( create_subdirectories, + parse_timdex_filename, read_job_json, update_or_create_job_json, write_to_dataset, @@ -56,6 +57,53 @@ def test_create_sub_directories_success(tmp_path): assert os.path.exists(subdirectory_b) +def test_parse_timdex_filename_s3_uri_success(): + assert parse_timdex_filename( + "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index.xml" + ) == { + "source": "source", + "date": "2024-01-01", + "cadence": "full", + "stage": "extracted", + "action": "index", + "index": None, + "file_type": "xml", + } + + +def test_parse_timdex_filename_indexed_s3_uri_success(): + assert parse_timdex_filename( + "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index_01.xml" + ) == { + "source": "source", + "date": "2024-01-01", + "cadence": "full", + "stage": "extracted", + "action": "index", + "index": "01", + "file_type": "xml", + } + + +def test_parse_timdex_filename_filename_success(): + assert parse_timdex_filename( + "source-2024-01-01-full-extracted-records-to-index.xml" + ) == { + "source": "source", + "date": "2024-01-01", + "cadence": "full", + "stage": "extracted", + "action": "index", + "index": None, + "file_type": "xml", + } + + +def test_parse_timdex_filename_raise_error_if_invalid_filename(): + with pytest.raises(ValueError, match="Provided S3 URI and filename is invalid."): + parse_timdex_filename(s3_uri_or_filename="invalid") + + def test_write_to_dataset_success(tmp_path): record_batch = pa.RecordBatch.from_pylist( [{"fruit": "apple", "color": "red"}, {"fruit": "banana", "color": "yellow"}] From 9425ce3b4a3c44267944687387623b73bfc3af67 Mon Sep 17 00:00:00 2001 From: jonavellecuerdo Date: Tue, 29 Oct 2024 12:41:00 -0400 Subject: [PATCH 2/2] Address comments in PR * Update modules to use 'run-date' and 
---
 abdiff/core/run_ab_transforms.py | 10 ++++++++--
 abdiff/core/utils.py             |  2 +-
 tests/test_run_ab_transforms.py  |  8 ++++----
 tests/test_utils.py              | 12 ++++++------
 4 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/abdiff/core/run_ab_transforms.py b/abdiff/core/run_ab_transforms.py
index 7bd7eab..28a447b 100644
--- a/abdiff/core/run_ab_transforms.py
+++ b/abdiff/core/run_ab_transforms.py
@@ -318,6 +318,12 @@ def get_transformed_filename(filename_details: dict) -> str:
         index=f"_{sequence}" if (sequence := filename_details["index"]) else "",
     )
     output_filename = (
-        "{source}-{date}-{cadence}-{stage}-records-to-index{index}.json"
+        "{source}-{run_date}-{run_type}-{stage}-records-to-index{index}.json"
+    )
+    return output_filename.format(
+        source=filename_details["source"],
+        run_date=filename_details["run-date"],
+        run_type=filename_details["run-type"],
+        stage=filename_details["stage"],
+        index=filename_details["index"],
     )
-    return output_filename.format(**filename_details)
diff --git a/abdiff/core/utils.py b/abdiff/core/utils.py
index 6f60f40..501c2d1 100644
--- a/abdiff/core/utils.py
+++ b/abdiff/core/utils.py
@@ -88,7 +88,7 @@ def parse_timdex_filename(s3_uri_or_filename: str) -> dict[str, str | None]:
         filename,
     )
 
-    keys = ["source", "date", "cadence", "stage", "action", "index", "file_type"]
+    keys = ["source", "run-date", "run-type", "stage", "action", "index", "file_type"]
     if not match_result:
         raise ValueError(  # noqa: TRY003
             f"Provided S3 URI or filename is invalid: {filename}."
diff --git a/tests/test_run_ab_transforms.py b/tests/test_run_ab_transforms.py
index 965619e..fa82e88 100644
--- a/tests/test_run_ab_transforms.py
+++ b/tests/test_run_ab_transforms.py
@@ -186,8 +186,8 @@ def test_get_output_filename_success():
         get_transformed_filename(
             {
                 "source": "source",
-                "date": "2024-01-01",
-                "cadence": "full",
+                "run-date": "2024-01-01",
+                "run-type": "full",
                 "stage": "extracted",
                 "action": "index",
                 "index": None,
@@ -292,8 +292,8 @@ def test_get_output_filename_indexed_success():
         get_transformed_filename(
             {
                 "source": "source",
-                "date": "2024-01-01",
-                "cadence": "full",
+                "run-date": "2024-01-01",
+                "run-type": "full",
                 "stage": "extracted",
                 "action": "index",
                 "index": "01",
diff --git a/tests/test_utils.py b/tests/test_utils.py
index bf36318..c2f2c65 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -62,8 +62,8 @@ def test_parse_timdex_filename_s3_uri_success():
         "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index.xml"
     ) == {
         "source": "source",
-        "date": "2024-01-01",
-        "cadence": "full",
+        "run-date": "2024-01-01",
+        "run-type": "full",
         "stage": "extracted",
         "action": "index",
         "index": None,
@@ -76,8 +76,8 @@ def test_parse_timdex_filename_indexed_s3_uri_success():
         "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index_01.xml"
     ) == {
         "source": "source",
-        "date": "2024-01-01",
-        "cadence": "full",
+        "run-date": "2024-01-01",
+        "run-type": "full",
         "stage": "extracted",
         "action": "index",
         "index": "01",
@@ -90,8 +90,8 @@ def test_parse_timdex_filename_filename_success():
         "source-2024-01-01-full-extracted-records-to-index.xml"
     ) == {
         "source": "source",
-        "date": "2024-01-01",
-        "cadence": "full",
+        "run-date": "2024-01-01",
+        "run-type": "full",
         "stage": "extracted",
         "action": "index",
         "index": None,
         "file_type": "xml",
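
A minimal usage sketch of the new parser, assuming both patches above are
applied (the S3 URI is illustrative, modeled on the fixtures in
tests/test_utils.py; key names follow PATCH 2/2):

    from abdiff.core.utils import parse_timdex_filename

    # Only the final path segment (the filename) is matched against the
    # regex, so full S3 URIs and bare filenames behave identically.
    details = parse_timdex_filename(
        "s3://timdex-extract-dev/alma/alma-2024-08-29-daily-extracted-records-to-index_01.xml"
    )
    assert details == {
        "source": "alma",
        "run-date": "2024-08-29",
        "run-type": "daily",
        "stage": "extracted",
        "action": "index",
        "index": "01",  # None when the filename has no "_NN" sequence suffix
        "file_type": "xml",
    }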
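
And a sketch of how run_ab_transforms derives a transformed filename from
those parsed details. Note that get_transformed_filename mutates the dict it
is given, and that the explicit keyword arguments introduced in PATCH 2/2 map
the hyphenated dict keys ('run-date', 'run-type') onto the underscore-named
fields of the format template:

    from abdiff.core.run_ab_transforms import get_transformed_filename
    from abdiff.core.utils import parse_timdex_filename

    details = parse_timdex_filename(
        "s3://timdex-extract-dev/alma/alma-2024-08-29-daily-extracted-records-to-index_01.xml"
    )

    # "stage" becomes "transformed" and "index" is rewritten in place as a
    # "_NN" suffix (or ""); the output extension is always .json, matching
    # what collate_ab_transforms expects to read.
    assert (
        get_transformed_filename(details)
        == "alma-2024-08-29-daily-transformed-records-to-index_01.json"
    )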