From 1061afc7031de630d8f1b69a23897bb727384f88 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 27 Jan 2025 10:31:44 +0100 Subject: [PATCH 1/2] Fix for loading content --- ingestify/application/dataset_store.py | 2 +- ingestify/domain/models/dataset/file.py | 2 +- .../domain/models/ingestion/ingestion_job_summary.py | 2 +- ingestify/tests/test_engine.py | 8 ++++++++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 470eb0f..c2be1ae 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -295,7 +295,7 @@ def load_files( def get_stream(file_): return reader( self.file_repository.load_content( - bucket=self.bucket, storage_path=file_.storage_path + storage_path=file_.storage_path ) ) diff --git a/ingestify/domain/models/dataset/file.py b/ingestify/domain/models/dataset/file.py index 3670e1b..b983e2e 100644 --- a/ingestify/domain/models/dataset/file.py +++ b/ingestify/domain/models/dataset/file.py @@ -116,7 +116,7 @@ class LoadedFile(BaseModel): data_serialization_format: Optional[str] # Example: 'json' storage_compression_method: Optional[str] # Example: 'gzip' storage_path: Path - _stream: Union[BinaryIO, Callable[[], Awaitable[BinaryIO]]] + _stream: Union[BinaryIO, BytesIO, Callable[[], Awaitable[Union[BinaryIO, BytesIO]]]] revision_id: Optional[int] = None # This can be used when a Revision is squashed def load_stream(self): diff --git a/ingestify/domain/models/ingestion/ingestion_job_summary.py b/ingestify/domain/models/ingestion/ingestion_job_summary.py index ec6baf4..b18345a 100644 --- a/ingestify/domain/models/ingestion/ingestion_job_summary.py +++ b/ingestify/domain/models/ingestion/ingestion_job_summary.py @@ -112,7 +112,7 @@ def output_report(self): print(f" - Failed tasks: {self.failed_tasks}") print(f" - Successful tasks: {self.successful_tasks}") - print(f" - Successful ignored tasks: {self.successful_tasks}") + print(f" - Successful ignored tasks: {self.ignored_successful_tasks}") print(f" - Skipped datasets: {self.skipped_datasets}") print("--------------------") diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py index 506aa01..eb17c5a 100644 --- a/ingestify/tests/test_engine.py +++ b/ingestify/tests/test_engine.py @@ -245,6 +245,12 @@ def test_engine(config_file): items = list(engine.store.dataset_repository.session.query(IngestionJobSummary)) print(items) + # Make sure we can load the files + files = engine.store.load_files(datasets.first(), lazy=True) + assert files.get_file("file1").stream.read() == b'content1' + + files = engine.store.load_files(datasets.first(), lazy=False) + assert files.get_file("file1").stream.read() == b'content1' def test_iterator_source(config_file): """Test when a Source returns a Iterator to do Batch processing. @@ -312,3 +318,5 @@ def test_change_partition_key_transformer(): This probably means we need to use the storage_path for reading. """ + + From 3bca29e98d1dca026d9d953aa79d56a741ee7db0 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Mon, 27 Jan 2025 10:34:12 +0100 Subject: [PATCH 2/2] Code formatting --- ingestify/application/dataset_store.py | 4 +--- ingestify/tests/test_engine.py | 7 +++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index c2be1ae..f9b01cf 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -294,9 +294,7 @@ def load_files( def get_stream(file_): return reader( - self.file_repository.load_content( - storage_path=file_.storage_path - ) + self.file_repository.load_content(storage_path=file_.storage_path) ) loaded_file = LoadedFile( diff --git a/ingestify/tests/test_engine.py b/ingestify/tests/test_engine.py index eb17c5a..56087a1 100644 --- a/ingestify/tests/test_engine.py +++ b/ingestify/tests/test_engine.py @@ -247,10 +247,11 @@ def test_engine(config_file): # Make sure we can load the files files = engine.store.load_files(datasets.first(), lazy=True) - assert files.get_file("file1").stream.read() == b'content1' + assert files.get_file("file1").stream.read() == b"content1" files = engine.store.load_files(datasets.first(), lazy=False) - assert files.get_file("file1").stream.read() == b'content1' + assert files.get_file("file1").stream.read() == b"content1" + def test_iterator_source(config_file): """Test when a Source returns a Iterator to do Batch processing. @@ -318,5 +319,3 @@ def test_change_partition_key_transformer(): This probably means we need to use the storage_path for reading. """ - -