13 changes: 7 additions & 6 deletions Makefile
@@ -63,9 +63,10 @@ ruff-apply: # Resolve 'fixable errors' with 'ruff'
######################
minio-start:
docker run \
- -p 9000:9000 \
- -p 9001:9001 \
- -v $(MINIO_DATA):/data \
- -e "MINIO_ROOT_USER=$(MINIO_USERNAME)" \
- -e "MINIO_ROOT_PASSWORD=$(MINIO_PASSWORD)" \
- quay.io/minio/minio server /data --console-address ":9001"
+ -d \
+ -p 9000:9000 \
+ -p 9001:9001 \
+ -v $(MINIO_DATA):/data \
+ -e "MINIO_ROOT_USER=$(MINIO_USERNAME)" \
+ -e "MINIO_ROOT_PASSWORD=$(MINIO_PASSWORD)" \
+ quay.io/minio/minio server /data --console-address ":9001"
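Once the container is up, the S3Client added in this PR (see timdex_dataset_api/utils.py below) will target it whenever the MinIO env vars are set. A minimal sketch, assuming the endpoint and credentials match what is passed to docker run above:

import os

from timdex_dataset_api.utils import S3Client

# Assumed local values; MINIO_USERNAME / MINIO_PASSWORD must match the Make variables.
os.environ["MINIO_S3_ENDPOINT_URL"] = "http://localhost:9000"
os.environ["MINIO_USERNAME"] = "minioadmin"
os.environ["MINIO_PASSWORD"] = "minioadmin"

client = S3Client()  # logs "MinIO env vars detected, using for S3Client."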
2 changes: 1 addition & 1 deletion Pipfile
@@ -12,7 +12,7 @@ pyarrow = "*"

[dev-packages]
black = "*"
- boto3-stubs = {version = "*", extras = ["s3"]}
+ boto3-stubs = {extras = ["essential"], version = "*"}
coveralls = "*"
ipython = "*"
moto = "*"
1,151 changes: 625 additions & 526 deletions Pipfile.lock

Large diffs are not rendered by default.

33 changes: 15 additions & 18 deletions tests/conftest.py
@@ -1,9 +1,9 @@
"""tests/conftest.py"""

# ruff: noqa: D205, D209

import os

+ import boto3
+ import moto
import pytest

from tests.utils import (
@@ -62,7 +62,6 @@ def fixed_local_dataset(tmp_path) -> TIMDEXDataset:
timdex_dataset.write(
generate_sample_records(
num_records=1_000,
- timdex_record_id_prefix=source,
source=source,
run_date="2024-12-01",
run_id=run_id,
@@ -82,19 +81,6 @@ def _records_iter(num_records):
return _records_iter


- @pytest.fixture
- def sample_records_iter_without_partitions():
-     """Simulates an iterator of X number of DatasetRecord instances WITHOUT partition
-     values included."""
-
-     def _records_iter(num_records):
-         return generate_sample_records(
-             num_records, run_date="invalid run-date", year=None, month=None, day=None
-         )
-
-     return _records_iter


@pytest.fixture
def dataset_with_runs_location(tmp_path) -> str:
"""Fixture to simulate a dataset with multiple full and daily ETL runs."""
@@ -139,7 +125,6 @@ def dataset_with_runs_location(tmp_path) -> str:
num_records, source, run_date, run_type, action, run_id = params
records = generate_sample_records(
num_records,
- timdex_record_id_prefix=source,
source=source,
run_date=run_date,
run_type=run_type,
@@ -195,7 +180,6 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
num_records, source, run_date, run_type, action, run_id, run_timestamp = params
records = generate_sample_records(
num_records,
- timdex_record_id_prefix=source,
source=source,
run_date=run_date,
run_type=run_type,
@@ -214,3 +198,16 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
@pytest.fixture
def timdex_dataset_metadata(dataset_with_same_day_runs):
return TIMDEXDatasetMetadata(timdex_dataset=dataset_with_same_day_runs)


+ @pytest.fixture
+ def timdex_bucket():
+     return "timdex"
+
+
+ @pytest.fixture
+ def mock_s3_resource(timdex_bucket):
+     with moto.mock_aws():
+         conn = boto3.resource("s3", region_name="us-east-1")
+         conn.create_bucket(Bucket=timdex_bucket)
+         yield conn
110 changes: 110 additions & 0 deletions tests/test_s3client.py
@@ -0,0 +1,110 @@
"""tests/test_s3client.py"""

# ruff: noqa: PLR2004, SLF001

import pytest

from timdex_dataset_api.utils import S3Client


def test_s3client_init():
"""Test S3Client initialization."""
client = S3Client()
assert client.resource is not None


def test_s3client_init_with_minio_env(caplog, monkeypatch):
"""Test S3Client initialization with MinIO environment variables."""
caplog.set_level("DEBUG")

monkeypatch.setenv("MINIO_S3_ENDPOINT_URL", "http://localhost:9000")
monkeypatch.setenv("MINIO_USERNAME", "minioadmin")
monkeypatch.setenv("MINIO_PASSWORD", "minioadmin")
monkeypatch.setenv("MINIO_REGION", "us-east-1")

client = S3Client()
assert client.resource is not None
assert "MinIO env vars detected, using for S3Client" in caplog.text


def test_split_s3_uri():
"""Test _split_s3_uri method."""
client = S3Client()
bucket, key = client._split_s3_uri("s3://timdex/path/to/file.txt")
assert bucket == "timdex"
assert key == "path/to/file.txt"


def test_split_s3_uri_invalid():
"""Test _split_s3_uri method with invalid URI."""
client = S3Client()
with pytest.raises(ValueError, match="Invalid S3 URI"):
client._split_s3_uri("timdex/path/to/file.txt")


def test_upload_download_file(mock_s3_resource, tmp_path):
"""Test upload_file and download_file methods."""
client = S3Client()

# Create a test file
test_file = tmp_path / "test.txt"
test_file.write_text("test content")

# Upload the file
s3_uri = "s3://timdex/test.txt"
client.upload_file(test_file, s3_uri)

# Download the file to a different location
download_path = tmp_path / "downloaded.txt"
client.download_file(s3_uri, download_path)

# Verify the content
assert download_path.read_text() == "test content"


def test_delete_file(mock_s3_resource, tmp_path):
"""Test delete_file method."""
client = S3Client()

# Create and upload a test file
test_file = tmp_path / "test.txt"
test_file.write_text("test content")
s3_uri = "s3://timdex/test.txt"
client.upload_file(test_file, s3_uri)

# Delete the file
client.delete_file(s3_uri)

# Verify the file is deleted
bucket = mock_s3_resource.Bucket("timdex")
objects = list(bucket.objects.all())
assert len(objects) == 0


def test_delete_folder(mock_s3_resource, tmp_path):
"""Test delete_folder method."""
client = S3Client()

# Create and upload test files
for i in range(3):
test_file = tmp_path / f"test{i}.txt"
test_file.write_text(f"test content {i}")
s3_uri = f"s3://timdex/folder/test{i}.txt"
client.upload_file(test_file, s3_uri)

# Upload a file outside the folder
other_file = tmp_path / "other.txt"
other_file.write_text("other content")
client.upload_file(other_file, "s3://timdex/other.txt")

# Delete the folder
deleted_keys = client.delete_folder("s3://timdex/folder/")

# Verify only folder contents are deleted
assert len(deleted_keys) == 3
assert all(key.startswith("folder/") for key in deleted_keys)

bucket = mock_s3_resource.Bucket("timdex")
objects = list(bucket.objects.all())
assert len(objects) == 1
assert objects[0].key == "other.txt"
4 changes: 1 addition & 3 deletions tests/test_write.py
@@ -98,9 +98,7 @@ def test_dataset_write_partition_for_multiple_sources(

# perform write for source="libguides" and run_date="2024-12-01"
written_files_source_b = new_local_dataset.write(
-     generate_sample_records(
-         num_records=7, timdex_record_id_prefix="libguides", source="libguides"
-     )
+     generate_sample_records(num_records=7, source="libguides")
)
new_local_dataset.load()

Expand Down
4 changes: 1 addition & 3 deletions tests/utils.py
@@ -11,7 +11,6 @@

def generate_sample_records(
num_records: int,
- timdex_record_id_prefix: str = "alma",
source: str | None = "alma",
run_date: str | None = "2024-12-01",
run_type: str | None = "daily",
@@ -25,7 +24,7 @@ def generate_sample_records(

for x in range(num_records):
yield DatasetRecord(
timdex_record_id=f"{timdex_record_id_prefix}:{x}",
timdex_record_id=f"{source}:{x}",
source_record=b"<record><title>Hello World.</title></record>",
transformed_record=b"""{"title":["Hello World."]}""",
source=source,
@@ -53,7 +52,6 @@ def generate_sample_records_with_simulated_partitions(
source = random.choice(sources)
yield from generate_sample_records(
num_records=batch_size,
- timdex_record_id_prefix=source,
source=source,
run_date=random.choice(run_dates),
run_type=random.choice(run_types),
7 changes: 5 additions & 2 deletions timdex_dataset_api/config.py
@@ -33,5 +33,8 @@ def configure_logger(

def configure_dev_logger() -> logging.Logger:
"""Invoke to setup DEBUG level console logging for development work."""
-     logging.basicConfig(level=logging.DEBUG)
-     return configure_logger(__name__)
+     if not logging.getLogger().handlers:
+         logging.basicConfig(level=logging.WARNING)
Contributor:
Why does line 37 set the logging level to logging.WARNING, only for it to be set to logging.DEBUG a few lines later? 🤔

Contributor (author):
Line 37 sets WARNING via logging.basicConfig(), which is effectively global. But then we set the actual timdex_dataset_api logger instance to DEBUG.

I'll admit, there are probably other ways to do this... but it works nicely locally for dev work, quite literally from something I was just working on:

import os

from timdex_dataset_api import TIMDEXDatasetMetadata
from timdex_dataset_api.config import configure_dev_logger

configure_dev_logger()  # <---------------

tdm = TIMDEXDatasetMetadata(os.environ["TIMDEX_DATASET_LOCATION"])

This ensures DEBUG logging from this library, but everything else is pretty quiet.

Contributor:
Oh! Got it. There's a part of me that thinks logger -> tda_logger but 'tis a non-blocking suggestion!

+     logger = logging.getLogger("timdex_dataset_api")
+     logger.setLevel(logging.DEBUG)
+     return logger
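
Concretely, the suggested rename would look something like this (a sketch of the non-blocking suggestion, not part of the merged diff):

import logging


def configure_dev_logger() -> logging.Logger:
    """Invoke to setup DEBUG level console logging for development work."""
    # Quiet the global default, then opt this library's logger into DEBUG.
    if not logging.getLogger().handlers:
        logging.basicConfig(level=logging.WARNING)
    tda_logger = logging.getLogger("timdex_dataset_api")
    tda_logger.setLevel(logging.DEBUG)
    return tda_logger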
76 changes: 76 additions & 0 deletions timdex_dataset_api/utils.py
@@ -0,0 +1,76 @@
"""timdex_dataset_api/utils.py"""

import logging
import os
import pathlib
from urllib.parse import urlparse

import boto3
from mypy_boto3_s3.service_resource import S3ServiceResource

logger = logging.getLogger(__name__)


class S3Client:
def __init__(
self,
) -> None:
self.resource = self._create_resource()

def _create_resource(self) -> S3ServiceResource:
"""Instantiate a boto3 S3 resource.
If env var MINIO_S3_ENDPOINT_URL is set, assume using local set of MinIO env vars.
"""
endpoint_url = os.getenv("MINIO_S3_ENDPOINT_URL")
if endpoint_url:
logger.debug("MinIO env vars detected, using for S3Client.")
return boto3.resource(
"s3",
endpoint_url=endpoint_url,
aws_access_key_id=os.getenv("MINIO_USERNAME"),
aws_secret_access_key=os.getenv("MINIO_PASSWORD"),
region_name=os.getenv("MINIO_REGION", "us-east-1"),
)
return boto3.resource("s3")

def download_file(self, s3_uri: str, local_path: str | pathlib.Path) -> None:
bucket, key = self._split_s3_uri(s3_uri)
local_path = pathlib.Path(local_path)
local_path.parent.mkdir(parents=True, exist_ok=True)
self.resource.Bucket(bucket).download_file(key, str(local_path))
logger.info(f"Downloaded {s3_uri} to {local_path}")

def upload_file(self, local_path: str | pathlib.Path, s3_uri: str) -> None:
bucket, key = self._split_s3_uri(s3_uri)
local_path = pathlib.Path(local_path)
self.resource.Bucket(bucket).upload_file(str(local_path), key)
logger.info(f"Uploaded {local_path} to {s3_uri}")

def delete_file(self, s3_uri: str) -> None:
bucket, key = self._split_s3_uri(s3_uri)
self.resource.Object(bucket, key).delete()
logger.info(f"Deleted {s3_uri}")

def delete_folder(self, s3_uri: str) -> list[str]:
"""Delete all objects whose keys start with the given prefix."""
bucket, prefix = self._split_s3_uri(s3_uri)
bucket_obj = self.resource.Bucket(bucket)
receipt = bucket_obj.objects.filter(Prefix=prefix).delete()

deleted_keys = []
for request in receipt:
deleted_keys.extend([item["Key"] for item in request["Deleted"]])
logger.info(f"Deleted {deleted_keys}")
return deleted_keys

@staticmethod
def _split_s3_uri(s3_uri: str) -> tuple[str, str]:
"""Validate and split an S3 URI into (bucket, key)."""
parsed = urlparse(s3_uri)
if parsed.scheme != "s3" or not parsed.netloc or not parsed.path:
raise ValueError(f"Invalid S3 URI: {s3_uri!r}")

bucket = parsed.netloc
key = parsed.path.lstrip("/") # strip leading slash from /key
return bucket, key
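
For orientation, a minimal usage sketch of the new client; the bucket and paths here are hypothetical, but the calls mirror the methods above:

import pathlib

from timdex_dataset_api.utils import S3Client

client = S3Client()  # default boto3 credentials when no MinIO env vars are set

# Hypothetical bucket/paths for illustration only.
client.upload_file(pathlib.Path("out/records.parquet"), "s3://timdex/dataset/records.parquet")
client.download_file("s3://timdex/dataset/records.parquet", "tmp/records.parquet")
deleted = client.delete_folder("s3://timdex/dataset/")  # returns deleted keys, e.g. ["dataset/records.parquet"]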