diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala new file mode 100644 index 00000000000..f0f583ecc47 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState +import org.apache.texera.amber.engine.common.Utils.maptoStatusCode +import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore +import org.apache.texera.dao.MockTexeraDB +import org.apache.texera.dao.jooq.generated.Tables._ +import org.apache.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowComputingUnitDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import org.apache.texera.dao.jooq.generated.tables.pojos.{ + User, + Workflow, + WorkflowComputingUnit, + WorkflowExecutions, + WorkflowVersion +} +import org.apache.texera.web.storage.ExecutionStateStore +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.sql.Timestamp +import java.util.UUID + +class ExecutionsMetadataPersistServiceSpec + extends AnyFlatSpec + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { + + // Randomise the seeded wid/uid so a parallel run of unrelated specs that + // happen to seed the same ids wouldn't collide on the embedded postgres. + private val testWid = 7000 + scala.util.Random.nextInt(1000) + private val testUid = 7000 + scala.util.Random.nextInt(1000) + + private var workflowDao: WorkflowDao = _ + private var workflowVersionDao: WorkflowVersionDao = _ + private var workflowExecutionsDao: WorkflowExecutionsDao = _ + private var userDao: UserDao = _ + private var computingUnitDao: WorkflowComputingUnitDao = _ + private var seededVid: Integer = _ + private var seededCuid: Integer = _ + + override protected def beforeAll(): Unit = { + initializeDBAndReplaceDSLContext() + } + + override protected def afterAll(): Unit = { + shutdownDB() + } + + override protected def beforeEach(): Unit = { + val cfg = getDSLContext.configuration() + workflowDao = new WorkflowDao(cfg) + workflowVersionDao = new WorkflowVersionDao(cfg) + workflowExecutionsDao = new WorkflowExecutionsDao(cfg) + userDao = new UserDao(cfg) + computingUnitDao = new WorkflowComputingUnitDao(cfg) + cleanup() + + val user = new User + user.setUid(testUid) + user.setName("metadata_persist_spec_user") + user.setEmail(s"user_${UUID.randomUUID()}@example.com") + user.setPassword("password") + userDao.insert(user) + + val workflow = new Workflow + workflow.setWid(testWid) + workflow.setName(s"wf_${UUID.randomUUID().toString.substring(0, 8)}") + workflow.setContent("{}") + workflow.setDescription("") + workflow.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis())) + workflowDao.insert(workflow) + + // Seed one version explicitly so insertNewExecution's getLatestVersion + // takes the happy "max(existing)" branch instead of falling into the + // back-compat "insert one for you" branch (which would drag in + // WorkflowVersionResource.insertNewVersion's diff/aggregate logic). + val version = new WorkflowVersion + version.setWid(testWid) + version.setContent("{}") + version.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflowVersionDao.insert(version) + seededVid = version.getVid + + // workflow_executions.cuid has a FK to workflow_computing_unit; seed one + // so insertNewExecution's setCuid(...) call doesn't trip the constraint. + val cu = new WorkflowComputingUnit + cu.setUid(testUid) + cu.setName("test-cu") + computingUnitDao.insert(cu) + seededCuid = cu.getCuid + } + + override protected def afterEach(): Unit = cleanup() + + private def cleanup(): Unit = { + getDSLContext + .deleteFrom(WORKFLOW_EXECUTIONS) + .where( + WORKFLOW_EXECUTIONS.VID.in( + getDSLContext + .select(WORKFLOW_VERSION.VID) + .from(WORKFLOW_VERSION) + .where(WORKFLOW_VERSION.WID.eq(testWid)) + ) + ) + .execute() + getDSLContext + .deleteFrom(WORKFLOW_VERSION) + .where(WORKFLOW_VERSION.WID.eq(testWid)) + .execute() + getDSLContext.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute() + getDSLContext + .deleteFrom(WORKFLOW_COMPUTING_UNIT) + .where(WORKFLOW_COMPUTING_UNIT.UID.eq(testUid)) + .execute() + getDSLContext.deleteFrom(USER).where(USER.UID.eq(testUid)).execute() + } + + // Helper: insert an execution row tied to the seeded version and return its eid. + private def seedExecution(status: Byte = 0): Integer = { + val exec = new WorkflowExecutions + exec.setVid(seededVid) + exec.setUid(testUid) + exec.setStatus(status) + exec.setResult("") + exec.setStartingTime(new Timestamp(System.currentTimeMillis())) + exec.setBookmarked(false) + exec.setName("seeded execution") + exec.setEnvironmentVersion("test-env") + workflowExecutionsDao.insert(exec) + exec.getEid + } + + // -- insertNewExecution ----------------------------------------------------- + + "insertNewExecution" should "insert a row tied to the latest workflow version" in { + val id = ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + Some(testUid), + executionName = "named-execution", + environmentVersion = "env-1", + computingUnitId = seededCuid + ) + id should not be ExecutionIdentity(0L) + + val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt) + stored should not be null + stored.getVid shouldBe seededVid + stored.getUid shouldBe testUid + stored.getName shouldBe "named-execution" + stored.getEnvironmentVersion shouldBe "env-1" + stored.getCuid shouldBe seededCuid + } + + it should "skip setName when executionName is the empty string" in { + val id = ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + Some(testUid), + executionName = "", + environmentVersion = "env-2", + computingUnitId = seededCuid + ) + val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt) + // The DDL default for workflow_executions.name is 'Untitled Execution' + // (sql/texera_ddl.sql). The production code path explicitly does not + // call setName for an empty string, so the row should fall back to the + // DDL default rather than persisting "". + stored.getName shouldBe "Untitled Execution" + } + + it should "throw a DB constraint violation when uid is None" in { + // The method signature accepts Option[Integer] for uid and calls + // `newExecution.setUid(uid.orNull)`, but workflow_executions.uid is + // NOT NULL per texera_ddl.sql, so passing None propagates a jOOQ + // DataAccessException. Pinning the current behavior so a future fix — + // either tightening the signature to a required Integer or making the + // column nullable — breaks the spec deliberately. See follow-up bug. + val ex = intercept[org.jooq.exception.DataAccessException] { + ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + None, + executionName = "anonymous", + environmentVersion = "env-3", + computingUnitId = seededCuid + ) + } + ex.getMessage should include("uid") + } + + // -- tryGetExistingExecution ------------------------------------------------ + + "tryGetExistingExecution" should "return Some(row) for a known eid" in { + val eid = seedExecution() + val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution( + ExecutionIdentity(eid.longValue()) + ) + fetched shouldBe defined + fetched.get.getEid shouldBe eid + } + + it should "currently return Some(null) for an unknown eid (latent bug)" in { + // Pin actual behavior: `fetchOneByEid` returns null for a miss (no throw), + // and the production code wraps the result in `Some(...)` before the + // try/catch can convert it to None. So the Option contract is broken for + // misses — callers that destructure with `getOrElse` get the null through. + // The catch only fires on hard errors (e.g. a closed connection), which + // is why the method name says "tryGet". Filed as a follow-up bug. + val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution( + ExecutionIdentity(-1L) + ) + fetched shouldBe Some(null) + } + + it should "(intended contract) return None for an unknown eid" in { + // xfail-strict equivalent in ScalaTest: invert via intercept. When the + // bug above is fixed (e.g. `Option(workflowExecutionsDao.fetchOneByEid(...))` + // instead of `Some(...)`), the inner assertion will pass, intercept will + // not catch a TestFailedException, and this test will flip red — forcing + // the pinned-behavior test above to be updated in the same PR. + intercept[org.scalatest.exceptions.TestFailedException] { + val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution( + ExecutionIdentity(-1L) + ) + fetched shouldBe None + } + } + + // -- tryUpdateExistingExecution --------------------------------------------- + + "tryUpdateExistingExecution" should "apply the update function to the stored row" in { + val eid = seedExecution(status = 0) + ExecutionsMetadataPersistService.tryUpdateExistingExecution( + ExecutionIdentity(eid.longValue()) + ) { exec => + exec.setStatus(2.toByte) // PAUSED + exec.setName("renamed via update") + } + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe 2.toByte + after.getName shouldBe "renamed via update" + } + + it should "swallow the update error for an unknown eid and leave existing rows untouched" in { + // Pin the silent-failure contract: fetchOneByEid returns null, the + // update closure NPEs on it, the catch logs "Unable to update execution" + // and continues. The seeded row stays untouched. + val eid = seedExecution(status = 1) + noException should be thrownBy + ExecutionsMetadataPersistService.tryUpdateExistingExecution( + ExecutionIdentity(-1L) + ) { exec => exec.setStatus(9.toByte) } + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe 1.toByte + } + + // -- ExecutionStateStore.updateWorkflowState -------------------------------- + // + // updateWorkflowState wraps tryUpdateExistingExecution to also bump the + // in-memory ExecutionMetadataStore's `state`. Exercising it here keeps the + // DB-backed setup (workflow/version/execution rows) in one place; the + // pure-logic ExecutionStateStoreSpec sibling has no DB dependency. + + "ExecutionStateStore.updateWorkflowState" should "set status via maptoStatusCode and return the metadata store with new state" in { + val eid = seedExecution(status = 0) + val before = workflowExecutionsDao.fetchOneByEid(eid) + val beforeTs = before.getLastUpdateTime + val store = ExecutionMetadataStore( + state = WorkflowAggregatedState.UNINITIALIZED, + executionId = ExecutionIdentity(eid.longValue()) + ) + val updated = + ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.COMPLETED, store) + updated.state shouldBe WorkflowAggregatedState.COMPLETED + + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe maptoStatusCode(WorkflowAggregatedState.COMPLETED) + // lastUpdateTime is set unconditionally to System.currentTimeMillis(). + // Asserting it advanced past the seeded null/older value catches a + // regression that drops the setLastUpdateTime call. + Option(beforeTs) match { + case Some(t) => after.getLastUpdateTime.getTime should be >= t.getTime + case None => after.getLastUpdateTime should not be null + } + } + + it should "still return a metadataStore with the new state when the eid is unknown" in { + // updateWorkflowState first calls tryUpdateExistingExecution (which + // silently swallows the unknown-eid error) and then unconditionally + // returns metadataStore.withState(state). Document this so a future + // refactor that makes the failure surface (e.g. via Try / Either) has + // a spec to migrate. + val store = ExecutionMetadataStore( + state = WorkflowAggregatedState.UNINITIALIZED, + executionId = ExecutionIdentity(-1L) + ) + val updated = + ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.FAILED, store) + updated.state shouldBe WorkflowAggregatedState.FAILED + } +}