Merged
11 changes: 11 additions & 0 deletions Makefile
@@ -1,5 +1,6 @@
SHELL=/bin/bash
DATETIME:=$(shell date -u +%Y%m%dT%H%M%SZ)
MINIO_COMPOSE_FILE=abdiff/extras/minio/docker-compose.yaml

help: # Preview Makefile commands
@awk 'BEGIN { FS = ":.*#"; print "Usage: make <target>\n\nTargets:" } \
@@ -54,3 +55,13 @@ black-apply: # Apply changes with 'black'

ruff-apply: # Resolve 'fixable errors' with 'ruff'
pipenv run ruff check --fix .

####################################
# MinIO local S3 commands
####################################

start-minio-server:
docker compose --env-file .env -f $(MINIO_COMPOSE_FILE) up -d

stop-minio-server:
docker compose --env-file .env -f $(MINIO_COMPOSE_FILE) stop
1 change: 1 addition & 0 deletions Pipfile
@@ -19,6 +19,7 @@ deepdiff = "*"

[dev-packages]
black = "*"
boto3-stubs = {version = "*", extras = ["s3"]}
coveralls = "*"
freezegun = "*"
ipython = "*"
402 changes: 224 additions & 178 deletions Pipfile.lock

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions README.md
@@ -15,6 +15,43 @@ Compare transformed TIMDEX records from two versions (A,B) of Transmogrifier.
- To lint the repo: `make lint`
- To run the app: `pipenv run abdiff --help`

### Running a Local MinIO Server

TIMDEX extract files from S3 (i.e., input files to use in transformations) can be downloaded to a local MinIO server hosted via a Docker container. [MinIO is an object storage solution that provides an Amazon Web Services S3-compatible API and supports all core S3 features](https://min.io/docs/minio/kubernetes/upstream/). The MinIO server acts as a "local S3 file system", allowing the app to access data on disk through an S3 interface. Since the MinIO server runs in a Docker container, it can be easily started when needed and stopped when not in use. Any data stored in the MinIO server will persist as long as the files exist in the directory specified for `MINIO_S3_LOCAL_STORAGE`.

Downloading extract files improves the runtime of a diff by reducing the number of requests sent to S3 and avoids AWS credentials timing out. Once an extract file is stored in the local MinIO server, the app can access the data from MinIO for all future runs that include the extract file, avoiding repeated downloads of data used across multiple runs.


1. Configure your `.env` file. In addition to the [required environment variables](#required), the following environment variables must also be set:

```text
MINIO_S3_LOCAL_STORAGE=# full file system path to the directory where MinIO stores its object data on the local disk
MINIO_ROOT_USER=# username for root user account for MinIO server
MINIO_ROOT_PASSWORD=# password for root user account for MinIO server
TIMDEX_BUCKET=# required by the CLI command 'timdex-sources-csv' to determine which TIMDEX bucket to use
```

Note: There are additional variables required by the local MinIO server (see variables prefixed with "MINIO" in [optional environment variables](#optional)). Defaults for these variables are provided in [abdiff.config](abdiff/config.py).

2. Create an AWS profile `minio`. When prompted for an "AWS Access Key ID" and "AWS Secret Access Key", enter the values set for the `MINIO_ROOT_USER` and `MINIO_ROOT_PASSWORD` environment variables, respectively.
```shell
aws configure --profile minio
```

3. Launch a local MinIO server via Docker container by running the Makefile command:
```shell
make start-minio-server
```

The API is accessible at: http://127.0.0.1:9000.
The WebUI is accessible at: http://127.0.0.1:9001.

4. In your browser, navigate to the WebUI and sign in to the local MinIO server. Create a bucket named after the S3 bucket containing the TIMDEX extract files that will be used in the A/B Diff.

5. Proceed with A/B Diff CLI commands as needed!

Once a diff run is complete, you can stop the local MinIO server using the Makefile command: `make stop-minio-server`. If you're planning to run another diff using the same files, all you have to do is restart the local MinIO server. Your data will persist as long as the files exist in the directory you specified for `MINIO_S3_LOCAL_STORAGE`.
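As a quick sanity check that the local MinIO server is reachable through its S3 API, you can list its buckets with the AWS CLI (this command is a suggested check, not part of the documented setup; it assumes the `minio` profile from step 2 and the default API endpoint):

```shell
# List buckets on the local MinIO server; once the server is running,
# this should show the bucket created in step 4.
aws s3 ls --endpoint-url http://localhost:9000 --profile minio
```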

## Concepts

A **Job** in `abdiff` represents the A/B test for comparing the results from two versions of Transmogrifier. When a job is first created, a working directory and a JSON file `job.json` with an initial set of configurations is created.
@@ -90,6 +127,11 @@ AWS_SESSION_TOKEN=# passed to Transmogrifier containers for use
### Optional

```text
MINIO_S3_LOCAL_STORAGE=# full file system path to the directory where MinIO stores its object data on the local disk
MINIO_S3_URL=# endpoint for MinIO server API; default is "http://localhost:9000/"
MINIO_S3_CONTAINER_URL=# endpoint for the MinIO server when accessed from inside a Docker container; default is "http://host.docker.internal:9000/"
MINIO_ROOT_USER=# username for root user account for MinIO server
MINIO_ROOT_PASSWORD=# password for root user account for MinIO server
WEBAPP_HOST=# host for flask webapp
WEBAPP_PORT=# port for flask webapp
TRANSMOGRIFIER_MAX_WORKERS=# max number of Transmogrifier containers to run in parallel; default is 6
17 changes: 16 additions & 1 deletion abdiff/cli.py
@@ -14,6 +14,7 @@
calc_ab_diffs,
calc_ab_metrics,
collate_ab_transforms,
download_input_files,
init_run,
run_ab_transforms,
)
@@ -148,7 +149,17 @@ def init_job(
help="Message to describe Run.",
default="Not provided.",
)
def run_diff(job_directory: str, input_files: str, message: str) -> None:
@click.option(
"--download-files",
is_flag=True,
help=(
"Pass to download input files from AWS S3 to a local MinIO S3 server "
"for Transmogrifier to use."
),
)
def run_diff(
job_directory: str, input_files: str, message: str, *, download_files: bool
) -> None:

job_data = read_job_json(job_directory)
run_directory = init_run(job_directory, message=message)
@@ -160,11 +171,15 @@ def run_diff(job_directory: str, input_files: str, message: str) -> None:
else:
input_files_list = [filepath.strip() for filepath in input_files.split(",")]

if download_files:
download_input_files(input_files_list)

ab_transformed_file_lists = run_ab_transforms(
run_directory=run_directory,
image_tag_a=job_data["image_tag_a"],
image_tag_b=job_data["image_tag_b"],
input_files=input_files_list,
use_local_s3=download_files,
)
collated_dataset_path = collate_ab_transforms(
run_directory=run_directory,
23 changes: 23 additions & 0 deletions abdiff/config.py
@@ -11,6 +11,11 @@ class Config:
"WORKSPACE",
)
OPTIONAL_ENV_VARS = (
"MINIO_S3_LOCAL_STORAGE",
"MINIO_S3_URL",
"MINIO_S3_CONTAINER_URL",
"MINIO_ROOT_USER",
"MINIO_ROOT_PASSWORD",
"WEBAPP_HOST",
"WEBAPP_PORT",
"TRANSMOGRIFIER_MAX_WORKERS",
@@ -25,6 +30,24 @@ def __getattr__(self, name: str) -> Any:  # noqa: ANN401
message = f"'{name}' not a valid configuration variable"
raise AttributeError(message)

@property
def minio_s3_url(self) -> str:
"""Host for ABDiff context (host machine) to connect to MinIO."""
return self.MINIO_S3_URL or "http://localhost:9000/"

@property
def minio_s3_container_url(self) -> str:
"""Host for Transmogrifier Docker containers to connect to MinIO."""
return self.MINIO_S3_CONTAINER_URL or "http://host.docker.internal:9000/"

@property
def minio_root_user(self) -> str:
return self.MINIO_ROOT_USER or "minioadmin"

@property
def minio_root_password(self) -> str:
return self.MINIO_ROOT_PASSWORD or "minioadmin"

@property
def webapp_host(self) -> str:
return self.WEBAPP_HOST or "localhost"
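The `Config` properties above all follow the same fallback pattern: an optional environment variable wins when set, otherwise a hard-coded default is returned. A minimal standalone sketch of that pattern (the function name and use of `os.environ.get` are illustrative; the real code uses `@property` methods that read attributes via `Config.__getattr__`):

```python
import os

# Illustrative sketch of the Config property fallback pattern:
# the optional environment variable wins when set (and non-empty);
# otherwise a hard-coded default is returned.
def minio_s3_url() -> str:
    return os.environ.get("MINIO_S3_URL") or "http://localhost:9000/"

os.environ.pop("MINIO_S3_URL", None)
print(minio_s3_url())  # -> http://localhost:9000/

os.environ["MINIO_S3_URL"] = "http://minio.example:9000/"
print(minio_s3_url())  # -> http://minio.example:9000/
```

Using `or` rather than a plain default means an empty-string variable also falls back to the default, which matches how unset optional vars behave here.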
2 changes: 2 additions & 0 deletions abdiff/core/__init__.py
@@ -10,11 +10,13 @@
from abdiff.core.init_job import init_job
from abdiff.core.init_run import init_run
from abdiff.core.run_ab_transforms import run_ab_transforms
from abdiff.extras.minio.download_input_files import download_input_files

__all__ = [
"init_job",
"init_run",
"build_ab_images",
"download_input_files",
"run_ab_transforms",
"collate_ab_transforms",
"calc_ab_diffs",
39 changes: 32 additions & 7 deletions abdiff/core/run_ab_transforms.py
@@ -35,6 +35,8 @@ def run_ab_transforms(
image_tag_b: str,
input_files: list[str],
docker_client: docker.client.DockerClient | None = None,
*,
use_local_s3: bool = False,
) -> tuple[list[str], ...]:
"""Run Docker containers with versioned images of Transmogrifier.
@@ -59,6 +61,10 @@
URIs for input files on S3 are accepted.
docker_client (docker.client.DockerClient | None, optional): Docker client.
Defaults to None.
use_local_s3 (bool): Boolean indicating whether the container should
access input files from a local MinIO server (i.e., "local S3 bucket")
or from AWS S3. This flag determines the appropriate environment variables
to set for the Docker containers. Default is False.
Returns:
tuple[list[str], ...]: A tuple containing two lists, where each list contains
@@ -95,7 +101,9 @@
]

# run containers and collect results
futures = run_all_docker_containers(docker_client, input_files, run_configs)
futures = run_all_docker_containers(
docker_client, input_files, run_configs, use_local_s3=use_local_s3
)
containers, exceptions = collect_container_results(futures)
logger.info(
f"Successful containers: {len(containers)}, failed containers: {len(exceptions)}"
@@ -129,6 +137,8 @@
docker_client: docker.client.DockerClient,
input_files: list[str],
run_configs: list[tuple],
*,
use_local_s3: bool = False,
) -> list[Future]:
"""Invoke Docker containers to run in parallel via threads.
@@ -152,7 +162,11 @@
get_transformed_filename(filename_details),
docker_client,
)
tasks.append(executor.submit(run_docker_container, *args))
tasks.append(
executor.submit(
run_docker_container, *args, use_local_s3=use_local_s3
)
)

logger.info(f"All {len(tasks)} containers have exited.")
return tasks
@@ -166,12 +180,27 @@ def run_docker_container(
output_file: str,
docker_client: docker.client.DockerClient,
timeout: int = CONFIG.transmogrifier_timeout,
*,
use_local_s3: bool = False,
) -> tuple[Container, Exception | None]:
"""Run Transmogrifier via Docker container to transform input file.
The container is run in a detached state to capture a container handle for later use
but this function waits for the container to exit before returning.
"""
if use_local_s3:
environment_variables = {
"AWS_ENDPOINT_URL": CONFIG.minio_s3_container_url,
"AWS_ACCESS_KEY_ID": CONFIG.minio_root_user,
"AWS_SECRET_ACCESS_KEY": CONFIG.minio_root_password,
}
else:
environment_variables = {
"AWS_ACCESS_KEY_ID": CONFIG.AWS_ACCESS_KEY_ID,
"AWS_SECRET_ACCESS_KEY": CONFIG.AWS_SECRET_ACCESS_KEY,
"AWS_SESSION_TOKEN": CONFIG.AWS_SESSION_TOKEN,
}

container = docker_client.containers.run(
docker_image,
command=[
@@ -180,11 +209,7 @@
f"--source={source}",
],
detach=True,
environment={
"AWS_ACCESS_KEY_ID": CONFIG.AWS_ACCESS_KEY_ID,
"AWS_SECRET_ACCESS_KEY": CONFIG.AWS_SECRET_ACCESS_KEY,
"AWS_SESSION_TOKEN": CONFIG.AWS_SESSION_TOKEN,
},
environment=environment_variables,
labels={
"docker_image": docker_image,
"source": source,
Empty file added abdiff/extras/minio/__init__.py
17 changes: 17 additions & 0 deletions abdiff/extras/minio/docker-compose.yaml
@@ -0,0 +1,17 @@
services:
minio:
image: quay.io/minio/minio:latest
command: server --console-address ":9001" /mnt/data
ports:
- "9000:9000" # API port
- "9001:9001" # Console port
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 5s
timeout: 5s
retries: 5
volumes:
- ${MINIO_S3_LOCAL_STORAGE}:/mnt/data
101 changes: 101 additions & 0 deletions abdiff/extras/minio/download_input_files.py
@@ -0,0 +1,101 @@
import logging
import subprocess

import boto3
from botocore.exceptions import ClientError
from mypy_boto3_s3.client import S3Client

from abdiff.config import Config

logger = logging.getLogger(__name__)

CONFIG = Config()


def download_input_files(input_files: list[str]) -> None:
"""Download extract files from S3 to a local MinIO server.

For each file download, two AWS CLI commands are run by subprocess.
The output from the first command is piped to the second command.
These commands are further explained below:

1. Copy the contents from the input file and direct to stdout.
```
aws s3 cp <input_file> -
```

2. Given the stdout from the previous command as input, copy the contents
to a similarly named file on the local MinIO server.
```
aws s3 cp --endpoint-url <minio_s3_url> --profile minio - <input_file>
```

Note: An S3 client connected to the local MinIO server will check whether the
file exists prior to any download.
"""
s3_client = boto3.client(
"s3",
endpoint_url=CONFIG.minio_s3_url,
aws_access_key_id=CONFIG.minio_root_user,
aws_secret_access_key=CONFIG.minio_root_password,
)

success_count = 0
fail_count = 0
for i, input_file in enumerate(input_files):
try:
download_input_file(input_file, s3_client)
success_count += 1
logger.info(
f"Input file: {i + 1} / {len(input_files)}: '{input_file}' "
"available locally for transformation."
)
except subprocess.CalledProcessError:
fail_count += 1
logger.info(
f"Input file: {i + 1} / {len(input_files)}: '{input_file}' "
"failed to download."
)
logger.info(
f"Available input files: {success_count}, missing input files: {fail_count}."
)

if fail_count > 0:
raise RuntimeError( # noqa: TRY003
f"{fail_count} input file(s) failed to download."
)
Comment on lines +50 to +66 (Contributor): This looks great! I like the consistent logging structure, the final tally, and the exception raised if any failures. This will be helpful for debugging large amounts of files to download, if anything goes wrong.



def download_input_file(input_file: str, s3_client: S3Client) -> None:
if check_object_exists(CONFIG.TIMDEX_BUCKET, input_file, s3_client):
return
copy_command = ["aws", "s3", "cp", input_file, "-"]
upload_command = [
"aws",
"s3",
"cp",
"--endpoint-url",
CONFIG.minio_s3_url,
"--profile",
"minio",
"-",
input_file,
]
copy_process = subprocess.run(args=copy_command, check=True, capture_output=True)
subprocess.run(
args=upload_command,
check=True,
input=copy_process.stdout,
)


def check_object_exists(bucket: str, input_file: str, s3_client: S3Client) -> bool:
key = input_file.replace(f"s3://{bucket}/", "")
try:
s3_client.head_object(Bucket=bucket, Key=key)
except ClientError as exception:
if exception.response["Error"]["Code"] == "404":
return False
raise  # non-404 errors (e.g., 403) are unexpected; re-raise instead of treating the object as missing
else:
return True
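`check_object_exists` derives the object key by stripping the `s3://<bucket>/` prefix from the full input-file URI. A hypothetical standalone helper (not part of this PR) that makes the bucket/key split explicit, using `urllib.parse` instead of `str.replace`:

```python
from urllib.parse import urlparse

# Hypothetical helper: split a full S3 URI into (bucket, key).
# urlparse puts the bucket in netloc and the key in path for s3:// URIs.
def split_s3_uri(uri: str) -> tuple[str, str]:
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")

bucket, key = split_s3_uri("s3://timdex-extract/source/file.xml")
print(bucket, key)  # -> timdex-extract source/file.xml
```

Parsing the URI this way also fails loudly on non-S3 inputs, whereas the prefix-replace approach silently passes them through.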