From 8c1b516cf2fae17d600536af2da9f149d574b6bc Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 26 May 2026 21:21:19 -0700 Subject: [PATCH 1/3] close stream in upload-error test to prevent finalizer leak across tests --- .../storage/test_large_binary_output_stream.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py index ce9fc4e5d12..17725d9c66a 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py @@ -215,13 +215,19 @@ def test_write_after_upload_error_raises_error(self, large_binary): mock_s3.upload_fileobj.side_effect = Exception("Upload failed") stream = LargeBinaryOutputStream(large_binary) - stream.write(b"test data") + try: + stream.write(b"test data") - # Wait a bit for the error to be set - time.sleep(0.1) + # Wait a bit for the error to be set + time.sleep(0.1) - with pytest.raises(IOError, match="Background upload failed"): - stream.write(b"more data") + with pytest.raises(IOError, match="Background upload failed"): + stream.write(b"more data") + finally: + # Close inside the patch scope so finalizer-driven cleanup + # doesn't fire against a later test's mocks. + with pytest.raises(IOError): + stream.close() def test_multiple_close_calls(self, large_binary): """Test that multiple close() calls are safe.""" From 3a7062b0b5fee5360deb7582a45f6584a5a5325b Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 27 May 2026 03:20:49 -0700 Subject: [PATCH 2/3] Move S3 cleanup call into the upload thread --- .../storage/large_binary_output_stream.py | 20 +++++++++---------- .../test_large_binary_output_stream.py | 18 ++++++----------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py index 0cdf8a3679f..cb5c6288c62 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -153,12 +153,21 @@ def write(self, b: Union[bytes, bytearray]) -> int: if self._upload_thread is None: def upload_worker(): + # Reuse this s3 for cleanup so finalizers never re-resolve it. + s3 = None try: large_binary_manager._ensure_bucket_exists(self._bucket_name) s3 = large_binary_manager._get_s3_client() reader = _QueueReader(self._queue) s3.upload_fileobj(reader, self._bucket_name, self._object_key) except Exception as e: + if s3 is not None: + try: + s3.delete_object( + Bucket=self._bucket_name, Key=self._object_key + ) + except Exception: + pass # original upload error takes precedence with self._lock: self._upload_exception = e finally: @@ -214,12 +223,10 @@ def close(self) -> None: self._upload_thread.join() self._upload_complete.wait() - # Check for errors and cleanup if needed with self._lock: exception = self._upload_exception if exception is not None: - self._cleanup_failed_upload() raise IOError( f"Failed to complete upload: {exception}" ) from exception @@ -228,15 +235,6 @@ def close(self) -> None: # the second close() call on Python 3.13+. super().close() - def _cleanup_failed_upload(self): - """Clean up a failed upload by deleting the S3 object.""" - try: - s3 = large_binary_manager._get_s3_client() - s3.delete_object(Bucket=self._bucket_name, Key=self._object_key) - except Exception: - # Ignore cleanup errors - we're already handling an upload failure - pass - def __enter__(self): """Context manager entry.""" return self diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py index 17725d9c66a..6a7aea720fc 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py @@ -215,19 +215,13 @@ def test_write_after_upload_error_raises_error(self, large_binary): mock_s3.upload_fileobj.side_effect = Exception("Upload failed") stream = LargeBinaryOutputStream(large_binary) - try: - stream.write(b"test data") + stream.write(b"test data") - # Wait a bit for the error to be set - time.sleep(0.1) + # Wait a bit for the error to be set + time.sleep(0.1) - with pytest.raises(IOError, match="Background upload failed"): - stream.write(b"more data") - finally: - # Close inside the patch scope so finalizer-driven cleanup - # doesn't fire against a later test's mocks. - with pytest.raises(IOError): - stream.close() + with pytest.raises(IOError, match="Background upload failed"): + stream.write(b"more data") def test_multiple_close_calls(self, large_binary): """Test that multiple close() calls are safe.""" @@ -249,7 +243,7 @@ def test_multiple_close_calls(self, large_binary): class TestCleanupFailedUpload: - """Direct unit tests for _cleanup_failed_upload's silent-swallow path.""" + """Direct unit tests for the upload worker's silent-swallow cleanup.""" @pytest.fixture def large_binary(self): From 726b676fbcc122694a6225404d2e5b375b03e55b Mon Sep 17 00:00:00 2001 From: "Matthew B." Date: Wed, 27 May 2026 15:46:43 -0700 Subject: [PATCH 3/3] Clean up upload worker code by removing comment Removed comment about reusing S3 for cleanup in upload worker. Signed-off-by: Matthew B. --- .../main/python/pytexera/storage/large_binary_output_stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py index cb5c6288c62..0a47a853cb3 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -153,7 +153,6 @@ def write(self, b: Union[bytes, bytearray]) -> int: if self._upload_thread is None: def upload_worker(): - # Reuse this s3 for cleanup so finalizers never re-resolve it. s3 = None try: large_binary_manager._ensure_bucket_exists(self._bucket_name) @@ -167,7 +166,7 @@ def upload_worker(): Bucket=self._bucket_name, Key=self._object_key ) except Exception: - pass # original upload error takes precedence + pass with self._lock: self._upload_exception = e finally: