From dd8883b40d5796aea1575cf3fa19bbc647cfb882 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 10:56:22 -0700 Subject: [PATCH 01/18] feat: eid-scoped large binary create + per-execution delete (#4123) --- .../service/util/LargeBinaryManager.scala | 57 ++++++++-- .../service/util/LargeBinaryManagerSpec.scala | 102 +++++++++++++----- 2 files changed, 122 insertions(+), 37 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 44db3929f27..f5e63ae3ced 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -33,23 +33,62 @@ object LargeBinaryManager extends LazyLogging { val DEFAULT_BUCKET: String = "texera-large-binaries" /** - * Creates a new LargeBinary reference. + * Worker-scoped execution context. It is set on the data-processing thread when an + * executor is initialized, so that create() can stamp each object key with its owning + * execution id without threading the id through every operator. A thread-local keeps + * concurrent executions in the same JVM isolated, because each worker runs on its own + * data-processing thread. + */ + private val currentExecutionId: ThreadLocal[Option[Long]] = + ThreadLocal.withInitial(() => Option.empty[Long]) + + /** Sets the execution id for large binaries created on the current thread. */ + def setCurrentExecutionId(executionId: Long): Unit = + currentExecutionId.set(Some(executionId)) + + /** + * Creates a new LargeBinary reference scoped to the current execution. * The actual data upload happens separately via LargeBinaryOutputStream. * - * @return S3 URI string for the new LargeBinary (format: s3://bucket/key) + * @return S3 URI string for the new LargeBinary (format: s3://bucket/objects/{eid}/{uuid}) */ def create(): String = { - val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}" - val uri = s"s3://$DEFAULT_BUCKET/$objectKey" - - uri + val eid = currentExecutionId + .get() + .getOrElse( + throw new IllegalStateException( + "LargeBinaryManager.create() requires an execution context, " + + "but none was set on the current thread." + ) + ) + val objectKey = s"objects/$eid/${UUID.randomUUID()}" + s"s3://$DEFAULT_BUCKET/$objectKey" } /** - * Deletes all large binaries from the bucket. + * Deletes all large binaries belonging to a single execution. * - * @throws java.lang.Exception if the deletion fails - * @return Unit + * @param executionId the execution whose large binaries should be removed + */ + def deleteByExecution(executionId: Long): Unit = { + try { + S3StorageClient.deleteDirectory(DEFAULT_BUCKET, s"objects/$executionId") + logger.info( + s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET" + ) + } catch { + case e: Exception => + logger.warn( + s"Failed to delete large binaries for execution $executionId " + + s"from bucket: $DEFAULT_BUCKET", + e + ) + } + } + + /** + * Deletes all large binaries from the bucket. Destructive maintenance use only. + * Removed in a later task once no caller remains. */ def deleteAllObjects(): Unit = { try { diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala index 77d142efeeb..0a15f0832fa 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala @@ -21,8 +21,18 @@ package org.apache.texera.service.util import org.apache.texera.amber.core.tuple.LargeBinary import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.BeforeAndAfterEach -class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { +class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with BeforeAndAfterEach { + + /** Execution id used by the bulk of the tests. */ + private val TestExecutionId: Long = 9999L + + /** Each test creates large binaries; they need an execution context on the thread. */ + override def beforeEach(): Unit = { + super.beforeEach() + LargeBinaryManager.setCurrentExecutionId(TestExecutionId) + } /** Creates a large binary from string data and returns it. */ private def createLargeBinary(data: String): LargeBinary = { @@ -54,7 +64,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.readAllBytes().sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read exact number of bytes") { @@ -67,7 +77,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements("0123456789".getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle reading more bytes than available") { @@ -81,7 +91,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should support standard single-byte read") { @@ -94,7 +104,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) // EOF stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should return -1 at EOF") { @@ -105,7 +115,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should throw exception when reading from closed stream") { @@ -117,7 +127,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](stream.read()) assertThrows[java.io.IOException](stream.readAllBytes()) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle multiple close calls") { @@ -127,7 +137,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() stream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read large data correctly") { @@ -145,7 +155,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(largeData)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -200,18 +210,18 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { out2.close() } - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle delete with no objects gracefully") { - LargeBinaryManager.deleteAllObjects() // Should not throw exception + LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw exception } test("LargeBinaryManager should delete all objects") { val pointer1 = createLargeBinary("Test data") val pointer2 = createLargeBinary("Test data") - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should create bucket if it doesn't exist") { @@ -219,7 +229,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(pointer) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle large objects correctly") { @@ -237,7 +247,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should generate unique URIs for different objects") { @@ -261,7 +271,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(pointer1.getUri != pointer2.getUri) assert(pointer1.getObjectKey != pointer2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle multiple reads from the same large binary") { @@ -279,7 +289,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData1.sameElements(data.getBytes)) assert(readData2.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should properly parse bucket name and object key from large binary") { @@ -289,7 +299,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary.getObjectKey.nonEmpty) assert(!largeBinary.getObjectKey.startsWith("/")) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -309,7 +319,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream constructor should read large binary contents") { @@ -322,7 +332,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream and LargeBinaryInputStream should work together end-to-end") { @@ -344,7 +354,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -368,7 +378,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should create large binary") { @@ -381,7 +391,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle large data correctly") { @@ -399,7 +409,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle multiple writes") { @@ -416,7 +426,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements("Hello World!".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should throw exception when writing to closed stream") { @@ -427,7 +437,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](outStream.write("more".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle close() being called multiple times") { @@ -437,7 +447,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { outStream.close() outStream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("New LargeBinary() constructor should create unique URIs") { @@ -447,7 +457,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary1.getUri != largeBinary2.getUri) assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with input") { @@ -466,6 +476,42 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) + } + + test("create() stamps the object key with the current execution id") { + LargeBinaryManager.setCurrentExecutionId(123L) + val uri = LargeBinaryManager.create() + assert(uri.startsWith("s3://texera-large-binaries/objects/123/")) + } + + test("deleteByExecution removes only the target execution's binaries") { + // Create one binary under execution 1001 and another under 1002. + LargeBinaryManager.setCurrentExecutionId(1001L) + createLargeBinary("data for 1001") + LargeBinaryManager.setCurrentExecutionId(1002L) + createLargeBinary("data for 1002") + + // Delete only execution 1001's binaries. + LargeBinaryManager.deleteByExecution(1001L) + + try { + assert(!S3StorageClient.directoryExists("texera-large-binaries", "objects/1001")) + assert(S3StorageClient.directoryExists("texera-large-binaries", "objects/1002")) + } finally { + LargeBinaryManager.deleteByExecution(1002L) + } + } + + test("create() throws when no execution context is set on the thread") { + // Run on a fresh thread, where the thread-local defaults to None. + @volatile var caught: Option[Throwable] = None + val t = new Thread(() => { + try LargeBinaryManager.create() + catch { case e: Throwable => caught = Some(e) } + }) + t.start() + t.join() + assert(caught.exists(_.isInstanceOf[IllegalStateException])) } } From d0d6fd92f124c3234adbfe9ae9fc5aec8813f4f7 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:03:13 -0700 Subject: [PATCH 02/18] feat: add executionId to InitializeExecutorRequest (#4123) Also update existing call site in RegionExecutionCoordinator to pass None for the new field (required because ScalaPB no_default_values_in_constructor is true). --- .../texera/amber/engine/architecture/rpc/controlcommands.proto | 1 + .../architecture/scheduling/RegionExecutionCoordinator.scala | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index 1f55927e4ae..016571fd10d 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -252,6 +252,7 @@ message InitializeExecutorRequest { int32 totalWorkerCount = 1; core.OpExecInitInfo opExecInitInfo = 2; bool isSource = 3; + core.ExecutionIdentity executionId = 4; } message UpdateExecutorRequest { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5a9df11b589..81e10a89eba 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -404,7 +404,8 @@ class RegionExecutionCoordinator( InitializeExecutorRequest( workerConfigs.length, physicalOp.opExecInitInfo, - physicalOp.isSourceOperator + physicalOp.isSourceOperator, + None ), asyncRPCClient.mkContext(workerId) ) From add9b95fc49a77d5725b44a0dc729df451b1fdb1 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:06:22 -0700 Subject: [PATCH 03/18] feat: send executionId to workers on executor init (#4123) --- .../architecture/scheduling/RegionExecutionCoordinator.scala | 2 +- .../texera/amber/engine/architecture/worker/WorkerSpec.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 81e10a89eba..91191af139f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -405,7 +405,7 @@ class RegionExecutionCoordinator( workerConfigs.length, physicalOp.opExecInitInfo, physicalOp.isSourceOperator, - None + Some(physicalOp.executionId) ), asyncRPCClient.mkContext(workerId) ) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala index df9cb086a60..5494b763e24 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala @@ -200,7 +200,8 @@ class WorkerSpec OpExecWithClassName( "org.apache.texera.amber.engine.architecture.worker.DummyOperatorExecutor" ), - isSource = false + isSource = false, + executionId = None ), AsyncRPCContext(CONTROLLER, identifier1), 4 From decab59a211c9a71e1683dfdd3ee046dbb196ab5 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:07:28 -0700 Subject: [PATCH 04/18] feat: set large-binary execution context on JVM worker init (#4123) --- .../worker/promisehandlers/InitializeExecutorHandler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 969b466a1b2..8a5c71747bc 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -27,6 +27,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import org.apache.texera.amber.util.VirtualIdentityUtils +import org.apache.texera.service.util.LargeBinaryManager trait InitializeExecutorHandler { this: DataProcessorRPCHandlerInitializer => @@ -44,6 +45,7 @@ trait InitializeExecutorHandler { ) ) cachedTotalWorkerCount = req.totalWorkerCount + req.executionId.foreach(eid => LargeBinaryManager.setCurrentExecutionId(eid.id)) setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount) EmptyReturn() } From d23650421d7afb300cc35b4dd70dc9076b6d317f Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:08:34 -0700 Subject: [PATCH 05/18] fix: scope execution cleanup to the execution's large binaries (#4123) --- .../org/apache/texera/web/service/WorkflowService.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 809faf6a520..3f6a1db9f6e 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -311,7 +311,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics - * 5. Deletes large binaries from MinIO + * 5. Deletes this execution's large binaries from MinIO * * @param eid The execution identity to clean up resources for */ @@ -348,7 +348,7 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } - // Delete large binaries - LargeBinaryManager.deleteAllObjects() + // Delete this execution's large binaries + LargeBinaryManager.deleteByExecution(eid.id) } } From 51e49d1b8dfd4c9835913917c94d734e1ca75ac5 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:11:13 -0700 Subject: [PATCH 06/18] fix: scope deleteWorkflow large-binary cleanup to its executions (#4123) --- .../resource/dashboard/user/workflow/WorkflowResource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index cb910d11c3c..6d6e68a2b5f 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -611,7 +611,8 @@ class WorkflowResource extends LazyLogging { .asScala .toList - LargeBinaryManager.deleteAllObjects() + // Delete large binaries for each execution belonging to the workflows being removed + eids.foreach(eid => LargeBinaryManager.deleteByExecution(eid.longValue())) // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => From 511461001fd54be8928e9364aea0a2cc111ca218 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:12:22 -0700 Subject: [PATCH 07/18] refactor: remove unused bucket-wide deleteAllObjects (#4123) --- .../texera/service/util/LargeBinaryManager.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index f5e63ae3ced..5a7342b00e0 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -86,18 +86,4 @@ object LargeBinaryManager extends LazyLogging { } } - /** - * Deletes all large binaries from the bucket. Destructive maintenance use only. - * Removed in a later task once no caller remains. - */ - def deleteAllObjects(): Unit = { - try { - S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects") - logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET") - } catch { - case e: Exception => - logger.warn(s"Failed to delete large binaries from bucket: $DEFAULT_BUCKET", e) - } - } - } From 98453602dfec33d60eeee9b9ce7166702d31ea4c Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:16:07 -0700 Subject: [PATCH 08/18] feat: eid-scoped large binary create on Python worker (#4123) --- .../control/initialize_executor_handler.py | 4 ++ .../python/core/storage/storage_config.py | 3 ++ .../pytexera/storage/large_binary_manager.py | 18 ++++++--- .../storage/test_large_binary_manager.py | 39 +++++++++++++++++-- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index 2c2dc1ad3c7..9918fd8dc05 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -16,6 +16,7 @@ # under the License. from core.architecture.handlers.control.control_handler_base import ControlHandler +from core.storage.storage_config import StorageConfig from core.util import get_one_of from proto.org.apache.texera.amber.core import OpExecWithCode from proto.org.apache.texera.amber.engine.architecture.rpc import ( @@ -27,6 +28,9 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) + StorageConfig.EXECUTION_ID = ( + req.execution_id.id if req.execution_id is not None else None + ) self.context.executor_manager.initialize_executor( op_exec_with_code.code, req.is_source, op_exec_with_code.language ) diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index 82335909874..07fbae58232 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -42,6 +42,9 @@ class StorageConfig: S3_AUTH_USERNAME = None S3_AUTH_PASSWORD = None + # Execution context (set per worker at executor init, used to scope large binaries) + EXECUTION_ID = None + @classmethod def initialize( cls, diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index e061eac6228..8883348b4dc 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -22,7 +22,6 @@ and LargeBinaryInputStream/LargeBinaryOutputStream instead. """ -import time import uuid from loguru import logger from core.storage.storage_config import StorageConfig @@ -66,13 +65,22 @@ def _ensure_bucket_exists(bucket: str): def create() -> str: """ - Creates a new largebinary reference with a unique S3 URI. + Creates a new largebinary reference with a unique, execution-scoped S3 URI. + + The object key is namespaced by the current execution id so cleanup can delete + only this execution's objects. The execution id is injected by the system (set on + StorageConfig when the worker is initialized); callers never pass it. Returns: - S3 URI string (format: s3://bucket/key) + S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) """ _ensure_bucket_exists(DEFAULT_BUCKET) - timestamp_ms = int(time.time() * 1000) + execution_id = StorageConfig.EXECUTION_ID + if execution_id is None: + raise RuntimeError( + "largebinary() requires an execution context, but " + "StorageConfig.EXECUTION_ID is not set." + ) unique_id = uuid.uuid4() - object_key = f"objects/{timestamp_ms}/{unique_id}" + object_key = f"objects/{execution_id}/{unique_id}" return f"s3://{DEFAULT_BUCKET}/{object_key}" diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 1942e91f8bc..48e9dfe9511 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -42,6 +42,11 @@ def setup_storage_config(self): s3_auth_username="minioadmin", s3_auth_password="minioadmin", ) + # Provide a default execution id so create() doesn't raise. + original_eid = StorageConfig.EXECUTION_ID + StorageConfig.EXECUTION_ID = 1 + yield + StorageConfig.EXECUTION_ID = original_eid def test_get_s3_client_initializes_once(self): """Test that S3 client is initialized and cached.""" @@ -119,7 +124,7 @@ def test_ensure_bucket_exists_creates_bucket_when_missing(self): mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket") def test_create_generates_unique_uri(self): - """Test that create() generates a unique S3 URI.""" + """Test that create() generates a unique execution-scoped S3 URI.""" large_binary_manager._s3_client = None with patch("boto3.client") as mock_boto3_client: @@ -130,10 +135,10 @@ def test_create_generates_unique_uri(self): uri = large_binary_manager.create() - # Check URI format + # Check URI format: s3://bucket/objects/{eid}/{uuid} assert uri.startswith("s3://") assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") - assert "objects/" in uri + assert f"objects/{StorageConfig.EXECUTION_ID}/" in uri # Verify bucket was checked/created mock_client.head_bucket.assert_called_once_with( @@ -152,3 +157,31 @@ def test_create_uses_default_bucket(self): uri = large_binary_manager.create() assert large_binary_manager.DEFAULT_BUCKET in uri + assert f"objects/{StorageConfig.EXECUTION_ID}/" in uri + + +import re + + +def test_create_stamps_execution_id(monkeypatch): + # Avoid touching real S3 while testing key generation. + monkeypatch.setattr( + large_binary_manager, "_ensure_bucket_exists", lambda bucket: None + ) + monkeypatch.setattr(StorageConfig, "EXECUTION_ID", 42, raising=False) + + uri = large_binary_manager.create() + + assert re.fullmatch( + r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri + ) + + +def test_create_without_execution_context_raises(monkeypatch): + monkeypatch.setattr( + large_binary_manager, "_ensure_bucket_exists", lambda bucket: None + ) + monkeypatch.setattr(StorageConfig, "EXECUTION_ID", None, raising=False) + + with pytest.raises(RuntimeError): + large_binary_manager.create() From 94c280478db02eec11633e2ae8109e240de3b3f1 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 11:32:10 -0700 Subject: [PATCH 09/18] fix: make Python large-binary execution-context guard fail-fast (#4123) betterproto returns an empty (falsy) ExecutionIdentity for an unset executionId field rather than None, so the previous `is not None` check never triggered and an unset id would silently produce objects/0/... Use truthiness so unset -> None -> create() raises, matching the JVM invariant. Also moves a stray mid-file `import re` to the top. --- .../handlers/control/initialize_executor_handler.py | 4 +++- .../python/pytexera/storage/test_large_binary_manager.py | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index 9918fd8dc05..cd6f1c0cfdb 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -28,8 +28,10 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) + # betterproto returns an empty (falsy) ExecutionIdentity when the field is + # unset, not None, so use truthiness to detect a real id. StorageConfig.EXECUTION_ID = ( - req.execution_id.id if req.execution_id is not None else None + req.execution_id.id if req.execution_id else None ) self.context.executor_manager.initialize_executor( op_exec_with_code.code, req.is_source, op_exec_with_code.language diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 48e9dfe9511..972fc8a2cb0 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +import re + import pytest from unittest.mock import patch, MagicMock from pytexera.storage import large_binary_manager @@ -160,9 +162,6 @@ def test_create_uses_default_bucket(self): assert f"objects/{StorageConfig.EXECUTION_ID}/" in uri -import re - - def test_create_stamps_execution_id(monkeypatch): # Avoid touching real S3 while testing key generation. monkeypatch.setattr( From 8be15ca946f6e1a1a74ff0cb929317bbdf6b00fc Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 13:29:53 -0700 Subject: [PATCH 10/18] refactor: hold large-binary execution id in a dedicated worker holder (#4123) Move the per-execution id out of StorageConfig (which holds only static system configuration sourced from storage.conf) into a dedicated module-level holder in large_binary_manager (set_current_execution_id), mirroring the JVM LargeBinaryManager. The Python init handler sets it via that API. --- .../control/initialize_executor_handler.py | 5 +++-- .../python/core/storage/storage_config.py | 3 --- .../pytexera/storage/large_binary_manager.py | 22 ++++++++++++++----- .../storage/test_large_binary_manager.py | 14 ++++++------ 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index cd6f1c0cfdb..254f12a1acb 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -16,7 +16,6 @@ # under the License. from core.architecture.handlers.control.control_handler_base import ControlHandler -from core.storage.storage_config import StorageConfig from core.util import get_one_of from proto.org.apache.texera.amber.core import OpExecWithCode from proto.org.apache.texera.amber.engine.architecture.rpc import ( @@ -27,10 +26,12 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: + from pytexera.storage import large_binary_manager + op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) # betterproto returns an empty (falsy) ExecutionIdentity when the field is # unset, not None, so use truthiness to detect a real id. - StorageConfig.EXECUTION_ID = ( + large_binary_manager.set_current_execution_id( req.execution_id.id if req.execution_id else None ) self.context.executor_manager.initialize_executor( diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index 07fbae58232..82335909874 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -42,9 +42,6 @@ class StorageConfig: S3_AUTH_USERNAME = None S3_AUTH_PASSWORD = None - # Execution context (set per worker at executor init, used to scope large binaries) - EXECUTION_ID = None - @classmethod def initialize( cls, diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index 8883348b4dc..c77e996bad6 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -30,6 +30,18 @@ _s3_client = None DEFAULT_BUCKET = "texera-large-binaries" +# Per-worker execution context. A Python worker is a single process serving one +# execution, so a module-level value is sufficient (no thread-local needed). It is +# set at executor init and read by create() so the user-facing largebinary() API +# stays execution-id-free. +_current_execution_id = None + + +def set_current_execution_id(execution_id): + """Sets the execution id used to scope large binaries created by this worker.""" + global _current_execution_id + _current_execution_id = execution_id + def _get_s3_client(): """Get or initialize S3 client (lazy initialization, cached).""" @@ -68,18 +80,18 @@ def create() -> str: Creates a new largebinary reference with a unique, execution-scoped S3 URI. The object key is namespaced by the current execution id so cleanup can delete - only this execution's objects. The execution id is injected by the system (set on - StorageConfig when the worker is initialized); callers never pass it. + only this execution's objects. The execution id is injected by the system (set via + set_current_execution_id() when the worker is initialized); callers never pass it. Returns: S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) """ _ensure_bucket_exists(DEFAULT_BUCKET) - execution_id = StorageConfig.EXECUTION_ID + execution_id = _current_execution_id if execution_id is None: raise RuntimeError( - "largebinary() requires an execution context, but " - "StorageConfig.EXECUTION_ID is not set." + "largebinary() requires an execution context, but no execution id " + "has been set for this worker." ) unique_id = uuid.uuid4() object_key = f"objects/{execution_id}/{unique_id}" diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 972fc8a2cb0..285130751c2 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -45,10 +45,10 @@ def setup_storage_config(self): s3_auth_password="minioadmin", ) # Provide a default execution id so create() doesn't raise. - original_eid = StorageConfig.EXECUTION_ID - StorageConfig.EXECUTION_ID = 1 + original_eid = large_binary_manager._current_execution_id + large_binary_manager.set_current_execution_id(1) yield - StorageConfig.EXECUTION_ID = original_eid + large_binary_manager.set_current_execution_id(original_eid) def test_get_s3_client_initializes_once(self): """Test that S3 client is initialized and cached.""" @@ -140,7 +140,7 @@ def test_create_generates_unique_uri(self): # Check URI format: s3://bucket/objects/{eid}/{uuid} assert uri.startswith("s3://") assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") - assert f"objects/{StorageConfig.EXECUTION_ID}/" in uri + assert f"objects/{large_binary_manager._current_execution_id}/" in uri # Verify bucket was checked/created mock_client.head_bucket.assert_called_once_with( @@ -159,7 +159,7 @@ def test_create_uses_default_bucket(self): uri = large_binary_manager.create() assert large_binary_manager.DEFAULT_BUCKET in uri - assert f"objects/{StorageConfig.EXECUTION_ID}/" in uri + assert f"objects/{large_binary_manager._current_execution_id}/" in uri def test_create_stamps_execution_id(monkeypatch): @@ -167,7 +167,7 @@ def test_create_stamps_execution_id(monkeypatch): monkeypatch.setattr( large_binary_manager, "_ensure_bucket_exists", lambda bucket: None ) - monkeypatch.setattr(StorageConfig, "EXECUTION_ID", 42, raising=False) + monkeypatch.setattr(large_binary_manager, "_current_execution_id", 42) uri = large_binary_manager.create() @@ -180,7 +180,7 @@ def test_create_without_execution_context_raises(monkeypatch): monkeypatch.setattr( large_binary_manager, "_ensure_bucket_exists", lambda bucket: None ) - monkeypatch.setattr(StorageConfig, "EXECUTION_ID", None, raising=False) + monkeypatch.setattr(large_binary_manager, "_current_execution_id", None) with pytest.raises(RuntimeError): large_binary_manager.create() From f05117613146c0d9441d0e867c59d31d0bf4394d Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 13:38:00 -0700 Subject: [PATCH 11/18] refactor: read large-binary execution id through a getter (#4123) Add get_current_execution_id() and route create() and the tests through it instead of reading the module-level _current_execution_id directly, keeping the holder's access encapsulated. --- .../main/python/pytexera/storage/large_binary_manager.py | 7 ++++++- .../python/pytexera/storage/test_large_binary_manager.py | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index c77e996bad6..06acd1a5215 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -43,6 +43,11 @@ def set_current_execution_id(execution_id): _current_execution_id = execution_id +def get_current_execution_id(): + """Returns the execution id set for this worker, or None if unset.""" + return _current_execution_id + + def _get_s3_client(): """Get or initialize S3 client (lazy initialization, cached).""" global _s3_client @@ -87,7 +92,7 @@ def create() -> str: S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) """ _ensure_bucket_exists(DEFAULT_BUCKET) - execution_id = _current_execution_id + execution_id = get_current_execution_id() if execution_id is None: raise RuntimeError( "largebinary() requires an execution context, but no execution id " diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 285130751c2..6182f072efd 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -45,7 +45,7 @@ def setup_storage_config(self): s3_auth_password="minioadmin", ) # Provide a default execution id so create() doesn't raise. - original_eid = large_binary_manager._current_execution_id + original_eid = large_binary_manager.get_current_execution_id() large_binary_manager.set_current_execution_id(1) yield large_binary_manager.set_current_execution_id(original_eid) @@ -140,7 +140,7 @@ def test_create_generates_unique_uri(self): # Check URI format: s3://bucket/objects/{eid}/{uuid} assert uri.startswith("s3://") assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") - assert f"objects/{large_binary_manager._current_execution_id}/" in uri + assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri # Verify bucket was checked/created mock_client.head_bucket.assert_called_once_with( @@ -159,7 +159,7 @@ def test_create_uses_default_bucket(self): uri = large_binary_manager.create() assert large_binary_manager.DEFAULT_BUCKET in uri - assert f"objects/{large_binary_manager._current_execution_id}/" in uri + assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri def test_create_stamps_execution_id(monkeypatch): From d885c2bdc0303d4cb2c5c9cd2cd334b7118eacae Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 14:49:41 -0700 Subject: [PATCH 12/18] Format and refactoring --- .../control/initialize_executor_handler.py | 2 - .../pytexera/storage/large_binary_manager.py | 10 ++--- .../storage/test_large_binary_manager.py | 4 +- .../service/util/LargeBinaryManager.scala | 15 ++++++- .../util/LargeBinaryManagerUnitSpec.scala | 45 +++++++++++++++++++ 5 files changed, 62 insertions(+), 14 deletions(-) create mode 100644 common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index 254f12a1acb..46d851f2419 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -29,8 +29,6 @@ async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyRetu from pytexera.storage import large_binary_manager op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) - # betterproto returns an empty (falsy) ExecutionIdentity when the field is - # unset, not None, so use truthiness to detect a real id. large_binary_manager.set_current_execution_id( req.execution_id.id if req.execution_id else None ) diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index 06acd1a5215..35ebd5312c2 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -30,10 +30,7 @@ _s3_client = None DEFAULT_BUCKET = "texera-large-binaries" -# Per-worker execution context. A Python worker is a single process serving one -# execution, so a module-level value is sufficient (no thread-local needed). It is -# set at executor init and read by create() so the user-facing largebinary() API -# stays execution-id-free. +# Set at executor init and read by create() _current_execution_id = None @@ -82,11 +79,10 @@ def _ensure_bucket_exists(bucket: str): def create() -> str: """ - Creates a new largebinary reference with a unique, execution-scoped S3 URI. + Creates a new largebinary reference with a unique S3 URI. The object key is namespaced by the current execution id so cleanup can delete - only this execution's objects. The execution id is injected by the system (set via - set_current_execution_id() when the worker is initialized); callers never pass it. + only this execution's objects. Returns: S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 6182f072efd..7b9b6f3ce11 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -171,9 +171,7 @@ def test_create_stamps_execution_id(monkeypatch): uri = large_binary_manager.create() - assert re.fullmatch( - r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri - ) + assert re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri) def test_create_without_execution_context_raises(monkeypatch): diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 5a7342b00e0..069141f5723 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -70,9 +70,20 @@ object LargeBinaryManager extends LazyLogging { * * @param executionId the execution whose large binaries should be removed */ - def deleteByExecution(executionId: Long): Unit = { + def deleteByExecution(executionId: Long): Unit = + deleteByExecution(executionId, S3StorageClient.deleteDirectory) + + /** + * Overload that takes the directory-delete operation as a parameter. Visible for + * testing so specs can exercise the swallow-and-log error path without a live + * S3/MinIO endpoint. + */ + private[util] def deleteByExecution( + executionId: Long, + deleteDir: (String, String) => Unit + ): Unit = { try { - S3StorageClient.deleteDirectory(DEFAULT_BUCKET, s"objects/$executionId") + deleteDir(DEFAULT_BUCKET, s"objects/$executionId") logger.info( s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET" ) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala new file mode 100644 index 00000000000..196a9ca7827 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import org.scalatest.funsuite.AnyFunSuite + +/** + * Unit tests for [[LargeBinaryManager.deleteByExecution]] that do not require a live + * S3/MinIO endpoint. The directory-delete operation is injected so both the success + * and the swallow-and-log error path can be exercised deterministically. + */ +class LargeBinaryManagerUnitSpec extends AnyFunSuite { + + test("deleteByExecution issues a delete scoped to the execution's object prefix") { + var captured: Option[(String, String)] = None + LargeBinaryManager.deleteByExecution( + 42L, + (bucket, prefix) => captured = Some((bucket, prefix)) + ) + assert(captured.contains((LargeBinaryManager.DEFAULT_BUCKET, "objects/42"))) + } + + test("deleteByExecution swallows exceptions raised by the underlying delete") { + // The error path logs and returns; it must not propagate the failure to callers. + LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom")) + succeed + } +} From 3330bf538db291b1beb02f2ced6e9d114c4108a6 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Thu, 28 May 2026 17:08:33 -0700 Subject: [PATCH 13/18] Polish comments --- .../apache/texera/service/util/LargeBinaryManager.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 069141f5723..467a3670213 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -34,10 +34,7 @@ object LargeBinaryManager extends LazyLogging { /** * Worker-scoped execution context. It is set on the data-processing thread when an - * executor is initialized, so that create() can stamp each object key with its owning - * execution id without threading the id through every operator. A thread-local keeps - * concurrent executions in the same JVM isolated, because each worker runs on its own - * data-processing thread. + * executor is initialized. */ private val currentExecutionId: ThreadLocal[Option[Long]] = ThreadLocal.withInitial(() => Option.empty[Long]) @@ -75,8 +72,7 @@ object LargeBinaryManager extends LazyLogging { /** * Overload that takes the directory-delete operation as a parameter. Visible for - * testing so specs can exercise the swallow-and-log error path without a live - * S3/MinIO endpoint. + * testing */ private[util] def deleteByExecution( executionId: Long, From 116291d1d6337e8f02776b77fe7489465392262a Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 1 Jun 2026 12:06:19 -0700 Subject: [PATCH 14/18] refactor: encapsulate large-binary state in a LargeBinaryManager class (#4123) Address review feedback: replace the module-level globals (_s3_client, DEFAULT_BUCKET, _current_execution_id) and free functions with a LargeBinaryManager class holding state as instance attributes, exposed as a single shared per-worker singleton. No more `global` statements; mirrors the JVM `object LargeBinaryManager`. Consumers import the singleton, so call sites are unchanged. Update the stream/type tests to patch the singleton instance. --- .../control/initialize_executor_handler.py | 2 +- .../python/core/models/type/large_binary.py | 2 +- .../storage/large_binary_input_stream.py | 2 +- .../pytexera/storage/large_binary_manager.py | 146 ++++++++++-------- .../storage/large_binary_output_stream.py | 2 +- .../core/models/type/test_large_binary.py | 3 +- .../storage/test_large_binary_input_stream.py | 2 +- .../storage/test_large_binary_manager.py | 2 +- .../test_large_binary_output_stream.py | 2 +- 9 files changed, 88 insertions(+), 75 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index 46d851f2419..bf2de3f325c 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -26,7 +26,7 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: - from pytexera.storage import large_binary_manager + from pytexera.storage.large_binary_manager import large_binary_manager op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) large_binary_manager.set_current_execution_id( diff --git a/amber/src/main/python/core/models/type/large_binary.py b/amber/src/main/python/core/models/type/large_binary.py index 581a688912b..6a833c81461 100644 --- a/amber/src/main/python/core/models/type/large_binary.py +++ b/amber/src/main/python/core/models/type/large_binary.py @@ -63,7 +63,7 @@ def __init__(self, uri: Optional[str] = None): """ if uri is None: # Lazy import to avoid circular dependencies - from pytexera.storage import large_binary_manager + from pytexera.storage.large_binary_manager import large_binary_manager uri = large_binary_manager.create() diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py index 68368c5c12d..f7fdfc4282e 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py @@ -60,7 +60,7 @@ def __init__(self, large_binary: largebinary): def _lazy_init(self): """Download from S3 on first read operation.""" - from pytexera.storage import large_binary_manager + from pytexera.storage.large_binary_manager import large_binary_manager s3 = large_binary_manager._get_s3_client() response = s3.get_object( diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index 35ebd5312c2..d2a4d68bf66 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -26,74 +26,86 @@ from loguru import logger from core.storage.storage_config import StorageConfig -# Module-level state -_s3_client = None -DEFAULT_BUCKET = "texera-large-binaries" -# Set at executor init and read by create() -_current_execution_id = None +class LargeBinaryManager: + """Manages execution-scoped large binaries in S3 for a worker process. - -def set_current_execution_id(execution_id): - """Sets the execution id used to scope large binaries created by this worker.""" - global _current_execution_id - _current_execution_id = execution_id - - -def get_current_execution_id(): - """Returns the execution id set for this worker, or None if unset.""" - return _current_execution_id - - -def _get_s3_client(): - """Get or initialize S3 client (lazy initialization, cached).""" - global _s3_client - if _s3_client is None: - try: - import boto3 - from botocore.config import Config - except ImportError as e: - raise RuntimeError("boto3 required. Install with: pip install boto3") from e - - _s3_client = boto3.client( - "s3", - endpoint_url=StorageConfig.S3_ENDPOINT, - aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, - aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, - region_name=StorageConfig.S3_REGION, - config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), - ) - return _s3_client - - -def _ensure_bucket_exists(bucket: str): - """Ensure S3 bucket exists, creating it if necessary.""" - s3 = _get_s3_client() - try: - s3.head_bucket(Bucket=bucket) - except s3.exceptions.NoSuchBucket: - logger.debug(f"Bucket {bucket} not found, creating it") - s3.create_bucket(Bucket=bucket) - logger.info(f"Created bucket: {bucket}") - - -def create() -> str: + A Python worker is a single process serving one execution, so a single shared + instance (the module-level ``large_binary_manager``) holds the cached S3 client + and the current execution id. Mirrors the JVM ``LargeBinaryManager``. """ - Creates a new largebinary reference with a unique S3 URI. - - The object key is namespaced by the current execution id so cleanup can delete - only this execution's objects. - Returns: - S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) - """ - _ensure_bucket_exists(DEFAULT_BUCKET) - execution_id = get_current_execution_id() - if execution_id is None: - raise RuntimeError( - "largebinary() requires an execution context, but no execution id " - "has been set for this worker." - ) - unique_id = uuid.uuid4() - object_key = f"objects/{execution_id}/{unique_id}" - return f"s3://{DEFAULT_BUCKET}/{object_key}" + DEFAULT_BUCKET = "texera-large-binaries" + + def __init__(self): + self._s3_client = None + # Execution context: set at executor init and read by create() so the + # user-facing largebinary() API stays execution-id-free. + self._current_execution_id = None + + def set_current_execution_id(self, execution_id): + """Sets the execution id used to scope large binaries created by this worker.""" + self._current_execution_id = execution_id + + def get_current_execution_id(self): + """Returns the execution id set for this worker, or None if unset.""" + return self._current_execution_id + + def _get_s3_client(self): + """Get or initialize the S3 client (lazy initialization, cached).""" + if self._s3_client is None: + try: + import boto3 + from botocore.config import Config + except ImportError as e: + raise RuntimeError( + "boto3 required. Install with: pip install boto3" + ) from e + + self._s3_client = boto3.client( + "s3", + endpoint_url=StorageConfig.S3_ENDPOINT, + aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, + aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, + region_name=StorageConfig.S3_REGION, + config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), + ) + return self._s3_client + + def _ensure_bucket_exists(self, bucket: str): + """Ensure the S3 bucket exists, creating it if necessary.""" + s3 = self._get_s3_client() + try: + s3.head_bucket(Bucket=bucket) + except s3.exceptions.NoSuchBucket: + logger.debug(f"Bucket {bucket} not found, creating it") + s3.create_bucket(Bucket=bucket) + logger.info(f"Created bucket: {bucket}") + + def create(self) -> str: + """ + Creates a new largebinary reference with a unique, execution-scoped S3 URI. + + The object key is namespaced by the current execution id so cleanup can delete + only this execution's objects. The execution id is injected by the system (set + via set_current_execution_id() when the worker is initialized); callers never + pass it. + + Returns: + S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) + """ + self._ensure_bucket_exists(self.DEFAULT_BUCKET) + execution_id = self.get_current_execution_id() + if execution_id is None: + raise RuntimeError( + "largebinary() requires an execution context, but no execution id " + "has been set for this worker." + ) + unique_id = uuid.uuid4() + object_key = f"objects/{execution_id}/{unique_id}" + return f"s3://{self.DEFAULT_BUCKET}/{object_key}" + + +# Shared singleton for the worker process. Consumers import this instance: +# from pytexera.storage.large_binary_manager import large_binary_manager +large_binary_manager = LargeBinaryManager() diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py index 0cdf8a3679f..a5e71cf1771 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -29,7 +29,7 @@ from typing import Optional, Union from io import IOBase from core.models.type.large_binary import largebinary -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import large_binary_manager import threading import queue diff --git a/amber/src/test/python/core/models/type/test_large_binary.py b/amber/src/test/python/core/models/type/test_large_binary.py index 36310e1dd53..e5224a78d94 100644 --- a/amber/src/test/python/core/models/type/test_large_binary.py +++ b/amber/src/test/python/core/models/type/test_large_binary.py @@ -18,6 +18,7 @@ import pytest from unittest.mock import patch from core.models.type.large_binary import largebinary +from pytexera.storage.large_binary_manager import large_binary_manager class TestLargeBinary: @@ -31,7 +32,7 @@ def test_create_with_uri(self): def test_create_without_uri(self): """Test creating largebinary without URI (calls large_binary_manager.create).""" - with patch("pytexera.storage.large_binary_manager.create") as mock_create: + with patch.object(large_binary_manager, "create") as mock_create: mock_create.return_value = "s3://bucket/objects/123/uuid" large_binary = largebinary() assert large_binary.uri == "s3://bucket/objects/123/uuid" diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py index 35bc5bc634d..a6863e726f8 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py @@ -20,7 +20,7 @@ from io import BytesIO from core.models.type.large_binary import largebinary from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import large_binary_manager class TestLargeBinaryInputStream: diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 7b9b6f3ce11..c6d7461ae40 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -19,7 +19,7 @@ import pytest from unittest.mock import patch, MagicMock -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import large_binary_manager from core.storage.storage_config import StorageConfig diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py index 17725d9c66a..129662e72f0 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py @@ -24,7 +24,7 @@ LargeBinaryOutputStream, _QueueReader, ) -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import large_binary_manager class TestLargeBinaryOutputStream: From ec952b2979872d6e6f538f2062a655989f0ffd81 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 1 Jun 2026 12:37:39 -0700 Subject: [PATCH 15/18] test: cover create() in the MinIO-free LargeBinaryManager unit spec (#4123) The pure create() logic (execution-scoped key + fail-fast when no context is set) was only exercised by the MinIO-backed LargeBinaryManagerSpec. Move those two assertions into LargeBinaryManagerUnitSpec so they run without Docker and count toward coverage; the MinIO spec keeps the isolation test that genuinely needs a live S3 endpoint. deleteByExecution's success and swallow branches were already covered by the unit spec. --- .../service/util/LargeBinaryManagerSpec.scala | 18 ------------ .../util/LargeBinaryManagerUnitSpec.scala | 28 +++++++++++++++++++ 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala index 0a15f0832fa..c2eaff02f82 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala @@ -479,12 +479,6 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with Bef LargeBinaryManager.deleteByExecution(TestExecutionId) } - test("create() stamps the object key with the current execution id") { - LargeBinaryManager.setCurrentExecutionId(123L) - val uri = LargeBinaryManager.create() - assert(uri.startsWith("s3://texera-large-binaries/objects/123/")) - } - test("deleteByExecution removes only the target execution's binaries") { // Create one binary under execution 1001 and another under 1002. LargeBinaryManager.setCurrentExecutionId(1001L) @@ -502,16 +496,4 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with Bef LargeBinaryManager.deleteByExecution(1002L) } } - - test("create() throws when no execution context is set on the thread") { - // Run on a fresh thread, where the thread-local defaults to None. - @volatile var caught: Option[Throwable] = None - val t = new Thread(() => { - try LargeBinaryManager.create() - catch { case e: Throwable => caught = Some(e) } - }) - t.start() - t.join() - assert(caught.exists(_.isInstanceOf[IllegalStateException])) - } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala index 196a9ca7827..9cffe239ee8 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala @@ -42,4 +42,32 @@ class LargeBinaryManagerUnitSpec extends AnyFunSuite { LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom")) succeed } + + test("create returns a URI scoped to the current thread's execution id") { + // create() reads a thread-local; run on a dedicated thread so the execution + // context is isolated and does not leak into other tests. + @volatile var uri: String = "" + val thread = new Thread(() => { + LargeBinaryManager.setCurrentExecutionId(555L) + uri = LargeBinaryManager.create() + }) + thread.start() + thread.join() + val prefix = s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/555/" + assert(uri.startsWith(prefix)) + // a unique (UUID) suffix follows the execution-scoped prefix + assert(uri.stripPrefix(prefix).nonEmpty) + } + + test("create throws when no execution context is set on the thread") { + // A fresh thread starts with no execution id, so create() must fail fast. + @volatile var caught: Option[Throwable] = None + val thread = new Thread(() => { + try LargeBinaryManager.create() + catch { case e: Throwable => caught = Some(e) } + }) + thread.start() + thread.join() + assert(caught.exists(_.isInstanceOf[IllegalStateException])) + } } From 9e78542ebb670b993ef1632240dd1039c091dda9 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 1 Jun 2026 12:39:12 -0700 Subject: [PATCH 16/18] Format --- .../src/main/python/pytexera/storage/large_binary_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index d2a4d68bf66..6f93791dd92 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -68,7 +68,9 @@ def _get_s3_client(self): aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, region_name=StorageConfig.S3_REGION, - config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), + config=Config( + signature_version="s3v4", s3={"addressing_style": "path"} + ), ) return self._s3_client From 6d51024c787ed3a5d9ade5831ea34449fbf2d870 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 1 Jun 2026 17:19:34 -0700 Subject: [PATCH 17/18] refactor: make Python LargeBinaryManager a __new__-guarded singleton (#4123) Address review feedback: remove the module-level singleton instance (a module global) and instead guard single-instance creation in the class via __new__, so LargeBinaryManager() always returns the same shared instance. Callers import and use the class directly; no module-level instance is exposed. --- .../control/initialize_executor_handler.py | 4 +-- .../python/core/models/type/large_binary.py | 4 +-- .../storage/large_binary_input_stream.py | 4 +-- .../pytexera/storage/large_binary_manager.py | 28 ++++++++++--------- .../storage/large_binary_output_stream.py | 9 +++--- .../core/models/type/test_large_binary.py | 5 +++- .../storage/test_large_binary_input_stream.py | 5 +++- .../storage/test_large_binary_manager.py | 5 +++- .../test_large_binary_output_stream.py | 5 +++- 9 files changed, 42 insertions(+), 27 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index bf2de3f325c..4d0bd817a43 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -26,10 +26,10 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: - from pytexera.storage.large_binary_manager import large_binary_manager + from pytexera.storage.large_binary_manager import LargeBinaryManager op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) - large_binary_manager.set_current_execution_id( + LargeBinaryManager().set_current_execution_id( req.execution_id.id if req.execution_id else None ) self.context.executor_manager.initialize_executor( diff --git a/amber/src/main/python/core/models/type/large_binary.py b/amber/src/main/python/core/models/type/large_binary.py index 6a833c81461..34110f374e5 100644 --- a/amber/src/main/python/core/models/type/large_binary.py +++ b/amber/src/main/python/core/models/type/large_binary.py @@ -63,9 +63,9 @@ def __init__(self, uri: Optional[str] = None): """ if uri is None: # Lazy import to avoid circular dependencies - from pytexera.storage.large_binary_manager import large_binary_manager + from pytexera.storage.large_binary_manager import LargeBinaryManager - uri = large_binary_manager.create() + uri = LargeBinaryManager().create() if not uri.startswith("s3://"): raise ValueError(f"largebinary URI must start with 's3://', got: {uri}") diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py index f7fdfc4282e..f17373c449e 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py @@ -60,9 +60,9 @@ def __init__(self, large_binary: largebinary): def _lazy_init(self): """Download from S3 on first read operation.""" - from pytexera.storage.large_binary_manager import large_binary_manager + from pytexera.storage.large_binary_manager import LargeBinaryManager - s3 = large_binary_manager._get_s3_client() + s3 = LargeBinaryManager()._get_s3_client() response = s3.get_object( Bucket=self._large_binary.get_bucket_name(), Key=self._large_binary.get_object_key(), diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index 6f93791dd92..bc21f1dc646 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -30,18 +30,25 @@ class LargeBinaryManager: """Manages execution-scoped large binaries in S3 for a worker process. - A Python worker is a single process serving one execution, so a single shared - instance (the module-level ``large_binary_manager``) holds the cached S3 client - and the current execution id. Mirrors the JVM ``LargeBinaryManager``. + Implemented as a singleton: ``LargeBinaryManager()`` always returns the same + instance, so the cached S3 client and the current execution id are shared across + all callers in the worker process. A Python worker is a single process serving one + execution. Mirrors the JVM ``object LargeBinaryManager``. """ DEFAULT_BUCKET = "texera-large-binaries" - def __init__(self): - self._s3_client = None - # Execution context: set at executor init and read by create() so the - # user-facing largebinary() API stays execution-id-free. - self._current_execution_id = None + _instance = None + + def __new__(cls): + if cls._instance is None: + instance = super().__new__(cls) + instance._s3_client = None + # Execution context: set at executor init and read by create() so the + # user-facing largebinary() API stays execution-id-free. + instance._current_execution_id = None + cls._instance = instance + return cls._instance def set_current_execution_id(self, execution_id): """Sets the execution id used to scope large binaries created by this worker.""" @@ -106,8 +113,3 @@ def create(self) -> str: unique_id = uuid.uuid4() object_key = f"objects/{execution_id}/{unique_id}" return f"s3://{self.DEFAULT_BUCKET}/{object_key}" - - -# Shared singleton for the worker process. Consumers import this instance: -# from pytexera.storage.large_binary_manager import large_binary_manager -large_binary_manager = LargeBinaryManager() diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py index a5e71cf1771..2e86dacea3e 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -29,7 +29,7 @@ from typing import Optional, Union from io import IOBase from core.models.type.large_binary import largebinary -from pytexera.storage.large_binary_manager import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager import threading import queue @@ -154,8 +154,9 @@ def write(self, b: Union[bytes, bytearray]) -> int: def upload_worker(): try: - large_binary_manager._ensure_bucket_exists(self._bucket_name) - s3 = large_binary_manager._get_s3_client() + manager = LargeBinaryManager() + manager._ensure_bucket_exists(self._bucket_name) + s3 = manager._get_s3_client() reader = _QueueReader(self._queue) s3.upload_fileobj(reader, self._bucket_name, self._object_key) except Exception as e: @@ -231,7 +232,7 @@ def close(self) -> None: def _cleanup_failed_upload(self): """Clean up a failed upload by deleting the S3 object.""" try: - s3 = large_binary_manager._get_s3_client() + s3 = LargeBinaryManager()._get_s3_client() s3.delete_object(Bucket=self._bucket_name, Key=self._object_key) except Exception: # Ignore cleanup errors - we're already handling an upload failure diff --git a/amber/src/test/python/core/models/type/test_large_binary.py b/amber/src/test/python/core/models/type/test_large_binary.py index e5224a78d94..56348f42659 100644 --- a/amber/src/test/python/core/models/type/test_large_binary.py +++ b/amber/src/test/python/core/models/type/test_large_binary.py @@ -18,7 +18,10 @@ import pytest from unittest.mock import patch from core.models.type.large_binary import largebinary -from pytexera.storage.large_binary_manager import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() class TestLargeBinary: diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py index a6863e726f8..7e6a401aef7 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py @@ -20,7 +20,10 @@ from io import BytesIO from core.models.type.large_binary import largebinary from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream -from pytexera.storage.large_binary_manager import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() class TestLargeBinaryInputStream: diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index c6d7461ae40..f33f2798584 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -19,9 +19,12 @@ import pytest from unittest.mock import patch, MagicMock -from pytexera.storage.large_binary_manager import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager from core.storage.storage_config import StorageConfig +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() + class TestLargeBinaryManager: @pytest.fixture(autouse=True) diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py index 129662e72f0..654f0ba4e7b 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py @@ -24,7 +24,10 @@ LargeBinaryOutputStream, _QueueReader, ) -from pytexera.storage.large_binary_manager import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() class TestLargeBinaryOutputStream: From aff0ab4daf46e9c505756b2ef1715aa6ec6dc9fb Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 1 Jun 2026 19:24:09 -0700 Subject: [PATCH 18/18] test: guard the LargeBinaryManager singleton invariant (#4123) Per review, add a simple test asserting LargeBinaryManager() always returns the same shared instance and that state set through one handle is visible through another. --- .../python/pytexera/storage/test_large_binary_manager.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index f33f2798584..ccfbdc09a1c 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -185,3 +185,12 @@ def test_create_without_execution_context_raises(monkeypatch): with pytest.raises(RuntimeError): large_binary_manager.create() + + +def test_largebinarymanager_is_a_singleton(monkeypatch): + # Constructing the manager always returns the same shared instance. + assert LargeBinaryManager() is LargeBinaryManager() + + # State set through one handle is visible through another (shared instance). + monkeypatch.setattr(LargeBinaryManager(), "_current_execution_id", 314) + assert LargeBinaryManager().get_current_execution_id() == 314