Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.web

import io.reactivex.rxjava3.disposables.Disposable
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.concurrent.atomic.AtomicInteger

class SubscriptionManagerSpec extends AnyFlatSpec with Matchers {

// Minimal concrete subject — SubscriptionManager is a trait, and the
// production mixins (SessionState, WorkflowService, ...) drag in heavy
// dependencies that aren't part of what this spec is exercising.
private class TestManager extends SubscriptionManager

// A Disposable that counts dispose() invocations so the spec can tell
// "disposed exactly once" from "disposed twice" without resorting to
// Mockito.
private class CountingDisposable extends Disposable {
private val disposed = new AtomicInteger(0)
override def dispose(): Unit = disposed.incrementAndGet()
override def isDisposed: Boolean = disposed.get() > 0
def disposeCount: Int = disposed.get()
}

"unsubscribeAll" should "dispose every added subscription in insertion order" in {
val mgr = new TestManager
val order = scala.collection.mutable.ListBuffer.empty[String]
val a = Disposable.fromAction(() => order += "a")
val b = Disposable.fromAction(() => order += "b")
val c = Disposable.fromAction(() => order += "c")
mgr.addSubscription(a)
mgr.addSubscription(b)
mgr.addSubscription(c)
mgr.unsubscribeAll()
order.toList shouldBe List("a", "b", "c")
}

it should "clear the internal buffer so a second call is a no-op" in {
val mgr = new TestManager
val d = new CountingDisposable
mgr.addSubscription(d)
mgr.unsubscribeAll()
mgr.unsubscribeAll()
d.disposeCount shouldBe 1
}

it should "do nothing when no subscriptions have been added" in {
val mgr = new TestManager
noException should be thrownBy mgr.unsubscribeAll()
}

it should "let new subscriptions accumulate after a previous unsubscribeAll" in {
val mgr = new TestManager
val first = new CountingDisposable
val second = new CountingDisposable
mgr.addSubscription(first)
mgr.unsubscribeAll()
mgr.addSubscription(second)
mgr.unsubscribeAll()
first.disposeCount shouldBe 1
second.disposeCount shouldBe 1
}
}
116 changes: 116 additions & 0 deletions amber/src/test/scala/org/apache/texera/web/WebsocketInputSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.web

import org.apache.texera.web.model.websocket.request.{
HeartBeatRequest,
TexeraWebSocketRequest,
WorkflowKillRequest
}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

class WebsocketInputSpec extends AnyFlatSpec with Matchers {

// Each test gets a fresh input + error sink. Throwables routed through
// `errorHandler` land in `errors` for assertion.
private def newInputWithErrorSink(): (WebsocketInput, mutable.ListBuffer[Throwable]) = {
val errors = mutable.ListBuffer.empty[Throwable]
(new WebsocketInput(errors += _), errors)
}

"subscribe" should "deliver requests whose runtime class matches T" in {
val (input, errors) = newInputWithErrorSink()
val received = mutable.ListBuffer.empty[(HeartBeatRequest, Option[Integer])]
val sub = input.subscribe[HeartBeatRequest]((req, uid) => received += ((req, uid)))
try {
val req = HeartBeatRequest()
input.onNext(req, Some(7))
received.toList shouldBe List((req, Some(7)))
errors shouldBe empty
} finally sub.dispose()
}

it should "silently drop requests that do not match T" in {
val (input, errors) = newInputWithErrorSink()
val received = mutable.ListBuffer.empty[TexeraWebSocketRequest]
val sub = input.subscribe[HeartBeatRequest]((req, _) => received += req)
try {
input.onNext(WorkflowKillRequest(), None)
input.onNext(WorkflowKillRequest(), Some(1))
received shouldBe empty
errors shouldBe empty
} finally sub.dispose()
}

it should "pass through uidOpt verbatim, including None" in {
val (input, _) = newInputWithErrorSink()
val seenUids = mutable.ListBuffer.empty[Option[Integer]]
val sub = input.subscribe[HeartBeatRequest]((_, uid) => seenUids += uid)
try {
input.onNext(HeartBeatRequest(), None)
input.onNext(HeartBeatRequest(), Some(42))
// Integer is a Java boxed type; compare via .map(_.intValue) to
// avoid hinging on Integer identity.
seenUids.map(_.map(_.intValue)).toList shouldBe List(None, Some(42))
} finally sub.dispose()
}

"subscribe" should "route exceptions thrown inside the callback to errorHandler" in {
val (input, errors) = newInputWithErrorSink()
val boom = new RuntimeException("boom")
val sub = input.subscribe[HeartBeatRequest]((_, _) => throw boom)
try {
input.onNext(HeartBeatRequest(), None)
errors.toList shouldBe List(boom)
} finally sub.dispose()
}

it should "keep delivering events to other subscribers after one callback throws" in {
val (input, errors) = newInputWithErrorSink()
val survivorCount = new AtomicReference[Int](0)
val throwingSub =
input.subscribe[HeartBeatRequest]((_, _) => throw new IllegalStateException("nope"))
val survivorSub = input.subscribe[HeartBeatRequest]((_, _) => survivorCount.updateAndGet(_ + 1))
try {
input.onNext(HeartBeatRequest(), None)
input.onNext(HeartBeatRequest(), None)
survivorCount.get() shouldBe 2
errors should have size 2
} finally {
throwingSub.dispose()
survivorSub.dispose()
}
}

it should "stop delivering events after the returned Disposable is disposed" in {
val (input, _) = newInputWithErrorSink()
val count = new AtomicReference[Int](0)
val sub = input.subscribe[HeartBeatRequest]((_, _) => count.updateAndGet(_ + 1))
input.onNext(HeartBeatRequest(), None)
sub.dispose()
input.onNext(HeartBeatRequest(), None)
input.onNext(HeartBeatRequest(), None)
count.get() shouldBe 1
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.web.auth

import org.apache.texera.auth.SessionUser
import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum
import org.apache.texera.dao.jooq.generated.tables.pojos.User
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class UserRoleAuthorizerSpec extends AnyFlatSpec with Matchers {

// The dropwizard Authorizer contract hands the authenticator a
// SessionUser; reproduce that with the jooq-generated POJO so the test
// exercises the same isRoleOf path the production filter does.
private def sessionFor(role: UserRoleEnum): SessionUser = {
val u = new User()
u.setRole(role)
new SessionUser(u)
}

"authorize" should "return true when the role string matches the user's role" in {
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "ADMIN") shouldBe true
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.REGULAR), "REGULAR") shouldBe true
}

it should "return false when the user's role differs from the requested one" in {
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.REGULAR), "ADMIN") shouldBe false
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.INACTIVE), "REGULAR") shouldBe false
}

it should "throw IllegalArgumentException when the requested role is not a UserRoleEnum value" in {
// Bubbled up from UserRoleEnum.valueOf. Documenting the behavior so a
// future @RolesAllowed typo can't silently downgrade to "always deny".
an[IllegalArgumentException] should be thrownBy
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "SUPER_ADMIN")
}

it should "treat the role string as case-sensitive (enum names are uppercase)" in {
an[IllegalArgumentException] should be thrownBy
UserRoleAuthorizer.authorize(sessionFor(UserRoleEnum.ADMIN), "admin")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.web.storage

import org.apache.texera.amber.engine.common.executionruntimestate.{
ExecutionBreakpointStore,
ExecutionConsoleStore,
ExecutionMetadataStore,
ExecutionStatsStore
}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ExecutionStateStoreSpec extends AnyFlatSpec with Matchers {

"ExecutionStateStore" should "initialise each child store with the empty proto default" in {
val s = new ExecutionStateStore
s.statsStore.getState shouldBe ExecutionStatsStore()
s.metadataStore.getState shouldBe ExecutionMetadataStore()
s.consoleStore.getState shouldBe ExecutionConsoleStore()
s.breakpointStore.getState shouldBe ExecutionBreakpointStore()
s.reconfigurationStore.getState shouldBe ExecutionReconfigurationStore()
}

// Order is part of the contract: WorkflowLifecycleManager subscribes
// to metadataStore by position-independent name, but ExecutionResultService
// iterates getAllStores when wiring diff handlers in bulk, so a re-order
// would shuffle which handler runs against which state.
"getAllStores" should "return all five stores in stats/console/breakpoint/metadata/reconfiguration order" in {
val s = new ExecutionStateStore
val stores = s.getAllStores.toList
stores should have size 5
stores(0) should be theSameInstanceAs s.statsStore
stores(1) should be theSameInstanceAs s.consoleStore
stores(2) should be theSameInstanceAs s.breakpointStore
stores(3) should be theSameInstanceAs s.metadataStore
stores(4) should be theSameInstanceAs s.reconfigurationStore
}
}
Loading
Loading