diff --git a/amber/src/test/scala/org/apache/texera/web/SubscriptionManagerSpec.scala b/amber/src/test/scala/org/apache/texera/web/SubscriptionManagerSpec.scala new file mode 100644 index 00000000000..c590216c6a7 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/SubscriptionManagerSpec.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web + +import io.reactivex.rxjava3.disposables.Disposable +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.atomic.AtomicInteger + +class SubscriptionManagerSpec extends AnyFlatSpec with Matchers { + + // Minimal concrete subject — SubscriptionManager is a trait, and the + // production mixins (SessionState, WorkflowService, ...) drag in heavy + // dependencies that aren't part of what this spec is exercising. + private class TestManager extends SubscriptionManager + + // A Disposable that counts dispose() invocations so the spec can tell + // "disposed exactly once" from "disposed twice" without resorting to + // Mockito. + private class CountingDisposable extends Disposable { + private val disposed = new AtomicInteger(0) + override def dispose(): Unit = disposed.incrementAndGet() + override def isDisposed: Boolean = disposed.get() > 0 + def disposeCount: Int = disposed.get() + } + + "unsubscribeAll" should "dispose every added subscription in insertion order" in { + val mgr = new TestManager + val order = scala.collection.mutable.ListBuffer.empty[String] + val a = Disposable.fromAction(() => order += "a") + val b = Disposable.fromAction(() => order += "b") + val c = Disposable.fromAction(() => order += "c") + mgr.addSubscription(a) + mgr.addSubscription(b) + mgr.addSubscription(c) + mgr.unsubscribeAll() + order.toList shouldBe List("a", "b", "c") + } + + it should "clear the internal buffer so a second call is a no-op" in { + val mgr = new TestManager + val d = new CountingDisposable + mgr.addSubscription(d) + mgr.unsubscribeAll() + mgr.unsubscribeAll() + d.disposeCount shouldBe 1 + } + + it should "do nothing when no subscriptions have been added" in { + val mgr = new TestManager + noException should be thrownBy mgr.unsubscribeAll() + } + + it should "let new subscriptions accumulate after a previous unsubscribeAll" in { + val mgr = new TestManager + val first = new CountingDisposable + val second = new CountingDisposable + mgr.addSubscription(first) + mgr.unsubscribeAll() + mgr.addSubscription(second) + mgr.unsubscribeAll() + first.disposeCount shouldBe 1 + second.disposeCount shouldBe 1 + } +} diff --git a/amber/src/test/scala/org/apache/texera/web/WebsocketInputSpec.scala b/amber/src/test/scala/org/apache/texera/web/WebsocketInputSpec.scala new file mode 100644 index 00000000000..f111b5ff5e2 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/WebsocketInputSpec.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web + +import org.apache.texera.web.model.websocket.request.{ + HeartBeatRequest, + TexeraWebSocketRequest, + WorkflowKillRequest +} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.atomic.AtomicReference +import scala.collection.mutable + +class WebsocketInputSpec extends AnyFlatSpec with Matchers { + + // Each test gets a fresh input + error sink. Throwables routed through + // `errorHandler` land in `errors` for assertion. + private def newInputWithErrorSink(): (WebsocketInput, mutable.ListBuffer[Throwable]) = { + val errors = mutable.ListBuffer.empty[Throwable] + (new WebsocketInput(errors += _), errors) + } + + "subscribe" should "deliver requests whose runtime class matches T" in { + val (input, errors) = newInputWithErrorSink() + val received = mutable.ListBuffer.empty[(HeartBeatRequest, Option[Integer])] + val sub = input.subscribe[HeartBeatRequest]((req, uid) => received += ((req, uid))) + try { + val req = HeartBeatRequest() + input.onNext(req, Some(7)) + received.toList shouldBe List((req, Some(7))) + errors shouldBe empty + } finally sub.dispose() + } + + it should "silently drop requests that do not match T" in { + val (input, errors) = newInputWithErrorSink() + val received = mutable.ListBuffer.empty[TexeraWebSocketRequest] + val sub = input.subscribe[HeartBeatRequest]((req, _) => received += req) + try { + input.onNext(WorkflowKillRequest(), None) + input.onNext(WorkflowKillRequest(), Some(1)) + received shouldBe empty + errors shouldBe empty + } finally sub.dispose() + } + + it should "pass through uidOpt verbatim, including None" in { + val (input, _) = newInputWithErrorSink() + val seenUids = mutable.ListBuffer.empty[Option[Integer]] + val sub = input.subscribe[HeartBeatRequest]((_, uid) => seenUids += uid) + try { + input.onNext(HeartBeatRequest(), None) + input.onNext(HeartBeatRequest(), Some(42)) + // Integer is a Java boxed type; compare via .map(_.intValue) to + // avoid hinging on Integer identity. + seenUids.map(_.map(_.intValue)).toList shouldBe List(None, Some(42)) + } finally sub.dispose() + } + + "subscribe" should "route exceptions thrown inside the callback to errorHandler" in { + val (input, errors) = newInputWithErrorSink() + val boom = new RuntimeException("boom") + val sub = input.subscribe[HeartBeatRequest]((_, _) => throw boom) + try { + input.onNext(HeartBeatRequest(), None) + errors.toList shouldBe List(boom) + } finally sub.dispose() + } + + it should "keep delivering events to other subscribers after one callback throws" in { + val (input, errors) = newInputWithErrorSink() + val survivorCount = new AtomicReference[Int](0) + val throwingSub = + input.subscribe[HeartBeatRequest]((_, _) => throw new IllegalStateException("nope")) + val survivorSub = input.subscribe[HeartBeatRequest]((_, _) => survivorCount.updateAndGet(_ + 1)) + try { + input.onNext(HeartBeatRequest(), None) + input.onNext(HeartBeatRequest(), None) + survivorCount.get() shouldBe 2 + errors should have size 2 + } finally { + throwingSub.dispose() + survivorSub.dispose() + } + } + + it should "stop delivering events after the returned Disposable is disposed" in { + val (input, _) = newInputWithErrorSink() + val count = new AtomicReference[Int](0) + val sub = input.subscribe[HeartBeatRequest]((_, _) => count.updateAndGet(_ + 1)) + input.onNext(HeartBeatRequest(), None) + sub.dispose() + input.onNext(HeartBeatRequest(), None) + input.onNext(HeartBeatRequest(), None) + count.get() shouldBe 1 + } +} diff --git a/amber/src/test/scala/org/apache/texera/web/auth/UserRoleAuthorizerSpec.scala b/amber/src/test/scala/org/apache/texera/web/auth/UserRoleAuthorizerSpec.scala new file mode 100644 index 00000000000..d83e36244ba --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/auth/UserRoleAuthorizerSpec.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.auth + +import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum +import org.apache.texera.dao.jooq.generated.tables.pojos.User +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class UserRoleAuthorizerSpec extends AnyFlatSpec with Matchers { + + // The dropwizard Authorizer contract hands the authenticator a + // SessionUser; reproduce that with the jooq-generated POJO so the test + // exercises the same isRoleOf path the production filter does. + private def sessionFor(role: UserRoleEnum): SessionUser = { + val u = new User() + u.setRole(role) + new SessionUser(u) + } + + "authorize" should "return true when the role string matches the user's role" in { + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "ADMIN") shouldBe true + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.REGULAR), "REGULAR") shouldBe true + } + + it should "return false when the user's role differs from the requested one" in { + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.REGULAR), "ADMIN") shouldBe false + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.INACTIVE), "REGULAR") shouldBe false + } + + it should "throw IllegalArgumentException when the requested role is not a UserRoleEnum value" in { + // Bubbled up from UserRoleEnum.valueOf. Documenting the behavior so a + // future @RolesAllowed typo can't silently downgrade to "always deny". + an[IllegalArgumentException] should be thrownBy + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "SUPER_ADMIN") + } + + it should "treat the role string as case-sensitive (enum names are uppercase)" in { + an[IllegalArgumentException] should be thrownBy + UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "admin") + } +} diff --git a/amber/src/test/scala/org/apache/texera/web/storage/ExecutionStateStoreSpec.scala b/amber/src/test/scala/org/apache/texera/web/storage/ExecutionStateStoreSpec.scala new file mode 100644 index 00000000000..1319e57000e --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/storage/ExecutionStateStoreSpec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.storage + +import org.apache.texera.amber.engine.common.executionruntimestate.{ + ExecutionBreakpointStore, + ExecutionConsoleStore, + ExecutionMetadataStore, + ExecutionStatsStore +} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ExecutionStateStoreSpec extends AnyFlatSpec with Matchers { + + "ExecutionStateStore" should "initialise each child store with the empty proto default" in { + val s = new ExecutionStateStore + s.statsStore.getState shouldBe ExecutionStatsStore() + s.metadataStore.getState shouldBe ExecutionMetadataStore() + s.consoleStore.getState shouldBe ExecutionConsoleStore() + s.breakpointStore.getState shouldBe ExecutionBreakpointStore() + s.reconfigurationStore.getState shouldBe ExecutionReconfigurationStore() + } + + // Order is part of the contract: WorkflowLifecycleManager subscribes + // to metadataStore by position-independent name, but ExecutionResultService + // iterates getAllStores when wiring diff handlers in bulk, so a re-order + // would shuffle which handler runs against which state. + "getAllStores" should "return all five stores in stats/console/breakpoint/metadata/reconfiguration order" in { + val s = new ExecutionStateStore + val stores = s.getAllStores.toList + stores should have size 5 + stores(0) should be theSameInstanceAs s.statsStore + stores(1) should be theSameInstanceAs s.consoleStore + stores(2) should be theSameInstanceAs s.breakpointStore + stores(3) should be theSameInstanceAs s.metadataStore + stores(4) should be theSameInstanceAs s.reconfigurationStore + } +} diff --git a/amber/src/test/scala/org/apache/texera/web/storage/StateStoreSpec.scala b/amber/src/test/scala/org/apache/texera/web/storage/StateStoreSpec.scala new file mode 100644 index 00000000000..ead54d8e8e2 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/storage/StateStoreSpec.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.storage + +import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent +import org.apache.texera.web.model.websocket.response.HeartBeatResponse +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.collection.mutable + +class StateStoreSpec extends AnyFlatSpec with Matchers { + + // A dummy event so diff handlers can emit something concrete that the + // websocket-event observable will carry through to subscribers. + private case class TaggedEvent(label: String) extends TexeraWebSocketEvent + + "getState" should "return the default state before any updates" in { + val store = new StateStore[Int](7) + store.getState shouldBe 7 + } + + "updateState" should "replace state with the function's result" in { + val store = new StateStore[Int](0) + store.updateState(_ + 1) + store.updateState(_ + 10) + store.getState shouldBe 11 + } + + it should "publish each new value on the state observable" in { + val store = new StateStore[String]("init") + val seen = mutable.ListBuffer.empty[String] + val sub = store.getStateObservable.subscribe(v => seen += v) + try { + store.updateState(_ => "a") + store.updateState(_ => "b") + // BehaviorSubject replays the current ("init") on subscribe, then + // delivers the two updates. + seen.toList shouldBe List("init", "a", "b") + } finally sub.dispose() + } + + "getWebsocketEventObservable" should "skip emissions when updateState returns an equal value" in { + val store = new StateStore[Int](0) + val fired = new java.util.concurrent.atomic.AtomicInteger(0) + store.registerDiffHandler { (_, _) => + fired.incrementAndGet() + Iterable.empty + } + val sub = store.getWebsocketEventObservable.subscribe(_ => ()) + try { + store.updateState(_ => 0) // no-op: filter drops, handler should not fire + store.updateState(_ => 1) // changes: handler fires once + store.updateState(_ => 1) // no-op again + fired.get() shouldBe 1 + } finally sub.dispose() + } + + it should "pass (oldState, newState) into every registered diff handler" in { + val store = new StateStore[Int](0) + val pairs = mutable.ListBuffer.empty[(Int, Int)] + store.registerDiffHandler { (oldS, newS) => + pairs += ((oldS, newS)) + Iterable.empty + } + val sub = store.getWebsocketEventObservable.subscribe(_ => ()) + try { + store.updateState(_ => 1) + store.updateState(_ + 4) + pairs.toList shouldBe List((0, 1), (1, 5)) + } finally sub.dispose() + } + + it should "flatten events from multiple diff handlers in registration order" in { + val store = new StateStore[Int](0) + store.registerDiffHandler((_, _) => Iterable(TaggedEvent("h1-a"), TaggedEvent("h1-b"))) + store.registerDiffHandler((_, _) => Iterable(TaggedEvent("h2"))) + val emitted = mutable.ListBuffer.empty[String] + val sub = store.getWebsocketEventObservable.subscribe { evts => + evts.foreach { case TaggedEvent(label) => emitted += label; case _ => () } + } + try { + store.updateState(_ + 1) + emitted.toList shouldBe List("h1-a", "h1-b", "h2") + } finally sub.dispose() + } + + "registerDiffHandler" should "return a Disposable that stops the handler from firing" in { + val store = new StateStore[Int](0) + val fired = new java.util.concurrent.atomic.AtomicInteger(0) + val handler = store.registerDiffHandler { (_, _) => + fired.incrementAndGet() + Iterable.empty + } + val sub = store.getWebsocketEventObservable.subscribe(_ => ()) + try { + store.updateState(_ + 1) + fired.get() shouldBe 1 + handler.dispose() + store.updateState(_ + 1) + fired.get() shouldBe 1 // unchanged after dispose + } finally sub.dispose() + } + + it should "tolerate double dispose without removing other handlers" in { + val store = new StateStore[Int](0) + val countA = new java.util.concurrent.atomic.AtomicInteger(0) + val countB = new java.util.concurrent.atomic.AtomicInteger(0) + val handlerA = store.registerDiffHandler { (_, _) => + countA.incrementAndGet() + Iterable.empty + } + store.registerDiffHandler { (_, _) => + countB.incrementAndGet() + Iterable.empty + } + val sub = store.getWebsocketEventObservable.subscribe(_ => ()) + try { + handlerA.dispose() + handlerA.dispose() // no-op + store.updateState(_ + 1) + countA.get() shouldBe 0 + countB.get() shouldBe 1 + } finally sub.dispose() + } + + "getWebsocketEventObservable" should "deliver events to multiple subscribers" in { + val store = new StateStore[Int](0) + store.registerDiffHandler((_, _) => Iterable(HeartBeatResponse())) + val countA = new java.util.concurrent.atomic.AtomicInteger(0) + val countB = new java.util.concurrent.atomic.AtomicInteger(0) + val subA = store.getWebsocketEventObservable.subscribe(_ => countA.incrementAndGet()) + val subB = store.getWebsocketEventObservable.subscribe(_ => countB.incrementAndGet()) + try { + store.updateState(_ + 1) + store.updateState(_ + 1) + countA.get() shouldBe 2 + countB.get() shouldBe 2 + } finally { + subA.dispose() + subB.dispose() + } + } +} diff --git a/amber/src/test/scala/org/apache/texera/web/storage/WorkflowStateStoreSpec.scala b/amber/src/test/scala/org/apache/texera/web/storage/WorkflowStateStoreSpec.scala new file mode 100644 index 00000000000..78d73ba7848 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/storage/WorkflowStateStoreSpec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.storage + +import org.apache.texera.amber.core.storage.result.WorkflowResultStore +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class WorkflowStateStoreSpec extends AnyFlatSpec with Matchers { + + "WorkflowStateStore" should "initialise resultStore with an empty WorkflowResultStore" in { + val s = new WorkflowStateStore + s.resultStore.getState shouldBe WorkflowResultStore() + s.resultStore.getState.resultInfo shouldBe empty + } + + "getAllStores" should "expose resultStore and only resultStore" in { + val s = new WorkflowStateStore + val stores = s.getAllStores.toList + stores should have size 1 + stores.head should be theSameInstanceAs s.resultStore + } +}