Merged
35 changes: 15 additions & 20 deletions abdiff/core/collate_ab_transforms.py
@@ -11,7 +11,7 @@
import pyarrow as pa

from abdiff.core.exceptions import OutputValidationError
-from abdiff.core.utils import write_to_dataset
+from abdiff.core.utils import parse_timdex_filename, write_to_dataset

logger = logging.getLogger(__name__)

@@ -75,8 +75,8 @@ def collate_ab_transforms(


def get_transformed_records_iter(
-    transformed_file: str | Path,
-) -> Generator[dict[str, str | bytes]]:
+    transformed_file: str,
+) -> Generator[dict[str, str | bytes | None]]:
"""Yields data for every TIMDEX record in a transformed file.

This function uses ijson to yield records from a JSON stream
@@ -91,17 +91,16 @@ def get_transformed_records_iter(
transformed file.
    * transformed_file_name: The name of the transformed file, including file extension.
"""
-    version, transformed_file_name = parse_parquet_details_from_transformed_file(
-        str(transformed_file)
-    )
+    version = get_transform_version(transformed_file)
+    filename_details = parse_timdex_filename(transformed_file)
with open(transformed_file, "rb") as file:
for record in ijson.items(file, "item"):
yield {
"timdex_record_id": record["timdex_record_id"],
"source": record["source"],
"source": filename_details["source"],
"record": json.dumps(record).encode(),
"version": version,
"transformed_file_name": transformed_file_name,
"transformed_file_name": transformed_file.split("/")[-1],
}
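
A note on the streaming pattern here: ijson pulls one record at a time out of the top-level JSON array, so a large transformed file never has to be loaded into memory at once. A minimal, self-contained sketch of the same pattern (the sample records below are illustrative, not from this repository):

import io
import json

import ijson

# A transformed file is a JSON array of record objects; this stands in for one.
sample_file = io.BytesIO(
    json.dumps(
        [
            {"timdex_record_id": "alma:1"},
            {"timdex_record_id": "alma:2"},
        ]
    ).encode()
)

# The "item" prefix selects each element of the top-level array in turn,
# keeping memory usage flat regardless of file size.
for record in ijson.items(sample_file, "item"):
    print(record["timdex_record_id"])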


@@ -124,7 +123,7 @@ def get_transformed_batches_iter(
for transformed_files in ab_transformed_file_lists:
for transformed_file in transformed_files:
record_iter = get_transformed_records_iter(
-                transformed_file=Path(run_directory) / transformed_file
+                transformed_file=str(Path(run_directory) / transformed_file)
)
for record_batch in itertools.batched(record_iter, READ_BATCH_SIZE):
yield pa.RecordBatch.from_pylist(list(record_batch))
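
A note on batching: itertools.batched (Python 3.12+) chunks the lazy record iterator into fixed-size groups, so at most READ_BATCH_SIZE rows are materialized before each group becomes an Arrow batch. A small sketch of the same pattern with toy rows:

import itertools

import pyarrow as pa

rows = ({"id": str(i), "version": "a"} for i in range(10))

# Group the stream into batches of 4, then build an Arrow RecordBatch from each.
for batch in itertools.batched(rows, 4):
    record_batch = pa.RecordBatch.from_pylist(list(batch))
    print(record_batch.num_rows)  # prints 4, 4, 2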
@@ -255,19 +254,15 @@ def validate_output(dataset_path: str) -> None:
)


-def parse_parquet_details_from_transformed_file(transformed_file: str) -> tuple[str, ...]:
-    """Parse parquet details from the absolute path of a transformed file.
-
-    This will retrieve the transmogrifier image version ('a' or 'b') and
-    the transformed filename.
-    """
+def get_transform_version(transformed_filepath: str) -> str:
+    """Get A/B transform version, either 'a' or 'b'."""
    match_result = re.match(
-        r".*transformed\/(.*)\/(.*).json",
-        transformed_file,
+        r".*transformed\/(.*)\/.*.json",
+        transformed_filepath,
    )
    if not match_result:
        raise ValueError(  # noqa: TRY003
-            f"Transformed filename is invalid: {transformed_file}."
+            f"Transformed filepath is invalid: {transformed_filepath}."
        )
-    version, filename = match_result.groups()
-    return version, filename
+    return match_result.groups()[0]
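
Example call, assuming the run layout this regex expects, where transformed files sit under a transformed/a/ or transformed/b/ subdirectory (the path below is illustrative):

version = get_transform_version(
    "output/run-1/transformed/a/alma-2024-08-29-daily-transformed-records-to-index.json"
)
assert version == "a"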
49 changes: 24 additions & 25 deletions abdiff/core/run_ab_transforms.py
@@ -3,7 +3,6 @@
import glob
import logging
import os
-import re
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import timedelta
@@ -19,7 +18,11 @@
DockerContainerTimeoutError,
OutputValidationError,
)
-from abdiff.core.utils import create_subdirectories, update_or_create_run_json
+from abdiff.core.utils import (
+    create_subdirectories,
+    parse_timdex_filename,
+    update_or_create_run_json,
+)

CONFIG = Config()

@@ -139,16 +142,14 @@ def run_all_docker_containers(

with ThreadPoolExecutor(max_workers=CONFIG.transmogrifier_max_workers) as executor:
for input_file in input_files:
-            source, output_file = parse_transform_details_from_extract_filename(
-                input_file
-            )
+            filename_details = parse_timdex_filename(input_file)
for docker_image, transformed_directory in run_configs:
args = (
docker_image,
transformed_directory,
-                    source,
+                    str(filename_details["source"]),
input_file,
-                    output_file,
+                    get_transformed_filename(filename_details),
docker_client,
)
tasks.append(executor.submit(run_docker_container, *args))
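
A note on the fan-out: this is the standard submit-then-collect ThreadPoolExecutor pattern, one task per (input file, docker image) pair. A stripped-down sketch, with run_task standing in for run_docker_container:

from concurrent.futures import Future, ThreadPoolExecutor, as_completed


def run_task(docker_image: str, input_file: str) -> str:
    # Stand-in for run_docker_container; returns a label for demonstration.
    return f"{docker_image} <- {input_file}"


tasks: list[Future] = []
with ThreadPoolExecutor(max_workers=4) as executor:
    for input_file in ["alma-1.xml", "alma-2.xml"]:
        for docker_image in ["transmogrifier:a", "transmogrifier:b"]:
            tasks.append(executor.submit(run_task, docker_image, input_file))

# One result per (file, image) pair, in completion order.
results = [future.result() for future in as_completed(tasks)]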
@@ -310,22 +311,20 @@ def validate_output(
)


-def parse_transform_details_from_extract_filename(input_file: str) -> tuple[str, ...]:
-    """Parse transform details from extract filename.
-
-    Namely, the source and the output filename are parsed from the extract
-    filename. These variables are required by the transform command.
-    """
-    extract_filename = input_file.split("/")[-1]
-    match_result = re.match(
-        r"^([\w\-]+?)-(\d{4}-\d{2}-\d{2})-(\w+)-extracted-records-to-index(?:_(\d+))?\.\w+$",
-        extract_filename,
-    )
-    if not match_result:
-        raise ValueError(  # noqa: TRY003
-            f"Extract filename is invalid: {extract_filename}."
-        )
-    source, date, cadence, sequence = match_result.groups()
-    sequence_suffix = f"_{sequence}" if sequence else ""
-    output_filename = f"{source}-{date}-{cadence}-transformed-records-to-index{sequence_suffix}.json"  # noqa: E501
-    return source, output_filename
+def get_transformed_filename(filename_details: dict) -> str:
+    """Get transformed filename using extract filename details."""
+    filename_details.update(
+        stage="transformed",
+        index=f"_{sequence}" if (sequence := filename_details["index"]) else "",
+    )
+    output_filename = (
+        "{source}-{run_date}-{run_type}-{stage}-records-to-index{index}.{file_type}"
+    )
+    return output_filename.format(
+        source=filename_details["source"],
+        run_date=filename_details["run-date"],
+        run_type=filename_details["run-type"],
+        stage=filename_details["stage"],
+        index=filename_details["index"],
+        file_type=filename_details["file_type"],
+    )

Reviewer comment (Contributor): I like this update to use the parsed components for the output filename.
24 changes: 24 additions & 0 deletions abdiff/core/utils.py
@@ -2,6 +2,7 @@

import json
import os
import re
from collections.abc import Iterable
from pathlib import Path

@@ -78,6 +79,29 @@ def load_dataset(base_dir: str | Path) -> ds.Dataset:
return ds.dataset(base_dir, partitioning="hive")


def parse_timdex_filename(s3_uri_or_filename: str) -> dict[str, str | None]:
"""Parse details from filename."""
filename = s3_uri_or_filename.split("/")[-1]

match_result = re.match(
r"^([\w\-]+?)-(\d{4}-\d{2}-\d{2})-(\w+)-(\w+)-records-to-(.+?)(?:_(\d+))?\.(\w+)$",
filename,
)

keys = ["source", "run-date", "run-type", "stage", "action", "index", "file_type"]
if not match_result:
raise ValueError( # noqa: TRY003
f"Provided S3 URI and filename is invalid: {filename}."
)

try:
return dict(zip(keys, match_result.groups(), strict=True))
except ValueError as exception:
raise ValueError( # noqa: TRY003
f"Provided S3 URI and filename is invalid: {filename}."
) from exception
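
A worked example of the parse: the seven regex capture groups map positionally onto the keys list, and the optional sequence group comes back as None for unsequenced files:

parse_timdex_filename(
    "s3://timdex-extract-dev/alma/alma-2024-08-29-daily-extracted-records-to-index.xml"
)
# -> {"source": "alma", "run-date": "2024-08-29", "run-type": "daily",
#     "stage": "extracted", "action": "index", "index": None, "file_type": "xml"}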


def write_to_dataset(
data: (
ds.Dataset
35 changes: 14 additions & 21 deletions tests/test_collate_ab_transforms.py
@@ -14,9 +14,9 @@
TRANSFORMED_DATASET_SCHEMA,
collate_ab_transforms,
get_joined_batches_iter,
+    get_transform_version,
    get_transformed_batches_iter,
    get_transformed_records_iter,
-    parse_parquet_details_from_transformed_file,
validate_output,
)
from abdiff.core.exceptions import OutputValidationError
@@ -55,19 +55,21 @@ def test_collate_ab_transforms_success(
str(Path(collated_dataset_path) / "records-0.parquet")
]
assert len(collated_df) == 10
-    assert set(collated_df["source"].unique()) == {"MIT Alma", "DSpace@MIT"}
+    assert set(collated_df["source"].unique()) == {"alma", "dspace"}

# assert result of full outer join
missing_in_b = collated_df[collated_df["record_b"].isna()]
assert len(missing_in_b) == 1
-    assert missing_in_b["source"].to_list() == ["MIT Alma"]
+    assert missing_in_b["source"].to_list() == ["alma"]


def test_get_transformed_records_iter_success(example_transformed_directory):
"""Validates the structure of the yielded TIMDEX record dictionaries."""
records_iter = get_transformed_records_iter(
transformed_file=Path(example_transformed_directory)
/ "a/alma-2024-08-29-daily-transformed-records-to-index.json"
transformed_file=str(
Path(example_transformed_directory)
/ "a/alma-2024-08-29-daily-transformed-records-to-index.json"
)
)
timdex_record_dict = next(records_iter)

@@ -82,7 +84,7 @@ def test_get_transformed_records_iter_success(example_transformed_directory):
assert timdex_record_dict["version"] == "a"
assert (
timdex_record_dict["transformed_file_name"]
== "alma-2024-08-29-daily-transformed-records-to-index"
== "alma-2024-08-29-daily-transformed-records-to-index.json"
)


@@ -169,24 +171,15 @@ def test_validate_output_raises_error_if_missing_record_column(run_directory):
validate_output(dataset_path=missing_record_cols_dataset_path)


-def test_parse_parquet_details_from_transformed_file_success(
-    transformed_directories, output_filename
-):
+def test_get_transform_version_success(transformed_directories, output_filename):
transformed_directory_a, transformed_directory_b = transformed_directories
transformed_file_a = str(Path(transformed_directory_a) / output_filename)
transformed_file_b = str(Path(transformed_directory_b) / output_filename)
-    transformed_filename = output_filename.replace(".json", "")  # remove file .ext

-    assert parse_parquet_details_from_transformed_file(transformed_file_a) == (
-        "a",
-        transformed_filename,
-    )
-    assert parse_parquet_details_from_transformed_file(transformed_file_b) == (
-        "b",
-        transformed_filename,
-    )
+    assert get_transform_version(transformed_file_a) == "a"
+    assert get_transform_version(transformed_file_b) == "b"


-def test_parse_parquet_details_from_transformed_file_raise_error(output_filename):
-    with pytest.raises(ValueError, match="Transformed filename is invalid"):
-        parse_parquet_details_from_transformed_file(output_filename)
+def test_get_transform_version_raise_error():
+    with pytest.raises(ValueError, match="Transformed filepath is invalid."):
+        get_transform_version("invalid")
62 changes: 38 additions & 24 deletions tests/test_run_ab_transforms.py
@@ -12,12 +12,13 @@
from abdiff.core.run_ab_transforms import (
aggregate_logs,
collect_container_results,
+    get_transformed_filename,
    get_transformed_files,
-    parse_transform_details_from_extract_filename,
run_ab_transforms,
run_docker_container,
validate_output,
)
from abdiff.core.utils import parse_timdex_filename
from tests.conftest import MockedContainerRun, MockedFutureSuccess


@@ -113,13 +114,13 @@ def test_run_docker_container_success(
)

input_file = "s3://timdex-extract-dev/source/source-2024-01-01-daily-extracted-records-to-index.xml"
-    source, output_file = parse_transform_details_from_extract_filename(input_file)
-    container, exception = run_docker_container(
+    filename_details = parse_timdex_filename(input_file)
+    container, _ = run_docker_container(
docker_image=image_name,
transformed_directory=transformed_directory,
-        source=source,
+        source=filename_details["source"],
input_file=input_file,
-        output_file=output_file,
+        output_file=get_transformed_filename(filename_details),
docker_client=mocked_docker_client,
)
assert container.id == mocked_docker_container.id
@@ -180,34 +181,30 @@ def test_validate_output_error():
validate_output(ab_transformed_file_lists=([], []), input_files_count=1)


-def test_parse_transform_details_from_extract_filename_success(input_file):
-    source, output_file = parse_transform_details_from_extract_filename(input_file)
-    assert source == "source"
-    assert output_file == "source-2024-01-01-full-transformed-records-to-index.json"
-
-
-def test_parse_transform_details_from_extract_filename_if_sequenced_success():
-    input_file = "s3://timdex-extract-dev/source/source-2024-01-01-full-extracted-records-to-index_01.xml"
-    source, output_filename = parse_transform_details_from_extract_filename(input_file)
-    assert source == "source"
-    assert (
-        output_filename == "source-2024-01-01-full-transformed-records-to-index_01.json"
-    )
+def test_get_output_filename_success():
+    assert (
+        get_transformed_filename(
+            {
+                "source": "source",
+                "run-date": "2024-01-01",
+                "run-type": "full",
+                "stage": "extracted",
+                "action": "index",
+                "index": None,
+                "file_type": "xml",
+            }
+        )
+        == "source-2024-01-01-full-transformed-records-to-index.xml"
+    )


-def test_parse_transform_details_from_extract_filename_raise_error():
-    input_file = "s3://timdex-extract-dev/source/-2024-01-01-full-extracted-records-to-index_01.xml"
-    with pytest.raises(ValueError, match="Extract filename is invalid"):
-        parse_transform_details_from_extract_filename(input_file)


def test_run_docker_container_timeout_triggered(
mocked_docker_client, mocked_docker_container_a
):
mocked_docker_client.containers.run.return_value = mocked_docker_container_a
mocked_docker_container_a.run_duration = 2
timeout = 1
-    container, exception = run_docker_container(
+    _, exception = run_docker_container(
"abc123",
"abc",
"alma",
@@ -225,7 +222,7 @@ def test_run_docker_container_timeout_not_triggered(
mocked_docker_client.containers.run.return_value = mocked_docker_container_a
mocked_docker_container_a.run_duration = 2
timeout = 3
container, exception = run_docker_container(
container, _ = run_docker_container(
"abc123",
"abc",
"alma",
@@ -288,3 +285,20 @@ def test_collect_containers_return_containers_and_exceptions(
assert len(containers) == 2
assert len(exceptions) == 1
assert exceptions[0] == mocked_exception


def test_get_output_filename_indexed_success():
assert (
get_transformed_filename(
{
"source": "source",
"run-date": "2024-01-01",
"run-type": "full",
"stage": "extracted",
"action": "index",
"index": "01",
"file_type": "xml",
}
)
== "source-2024-01-01-full-transformed-records-to-index_01.xml"
)