From 2080c42e9dfbf92250ab06cde1a9f7c17061ff8e Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 3 May 2026 19:46:17 -0700 Subject: [PATCH 1/4] refactor(auth): move USER_LAST_ACTIVE_TIME write out of JwtAuthFilter JwtAuthFilter previously did a synchronous USER_LAST_ACTIVE_TIME upsert on every authenticated request, coupling JWT verification to a per- request DB round-trip and mixing user-management concerns into the auth pipeline. Strip the DB write out of the filter. The filter is now pure: extract token, verify, set SecurityContext. Add a UserActivityTracker in common/auth that throttles per-uid DB writes by an in-memory cooldown (default 5 minutes) and runs the upsert on a single-thread daemon executor so request threads never wait on DB latency. Tracker is in common/auth so the throttle/CAS logic and its tests live with the rest of auth-adjacent code. Add a UserActivityEventListener (Jersey ApplicationEventListener) in access-control-service. The listener observes RESOURCE_METHOD_FINISHED at Jersey's monitoring layer (no ContainerRequestFilter semantics, cannot reject or transform requests) and forwards uid to the tracker. Listener lives only in access-control-service: USER_LAST_ACTIVE_TIME is a user-management concern, and authenticated client sessions necessarily contact this service often (UI navigation, permission checks, LiteLLM proxy) so other services do not need to mirror this listener. Other services keep their JwtAuthFilter registration but no longer incidentally write to USER_LAST_ACTIVE_TIME. Tests: 5-case spec for tracker cooldown/CAS in common/auth, 5-case spec for listener trigger gating in access-control-service. Both run synchronously without DB or Jersey runtime via injected upsert function and Mockito-based RequestEvent stubs. Closes #4887 --- .../texera/service/AccessControlService.scala | 7 ++ .../activity/UserActivityEventListener.scala | 69 +++++++++++ .../UserActivityEventListenerSpec.scala | 112 ++++++++++++++++++ common/auth/build.sbt | 3 +- .../apache/texera/auth/JwtAuthFilter.scala | 21 +--- .../texera/auth/UserActivityTracker.scala | 105 ++++++++++++++++ .../texera/auth/UserActivityTrackerSpec.scala | 110 +++++++++++++++++ 7 files changed, 407 insertions(+), 20 deletions(-) create mode 100644 access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala create mode 100644 access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala create mode 100644 common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala create mode 100644 common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala diff --git a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala index 0ab9f0fbfee..21d367e2bb1 100644 --- a/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala +++ b/access-control-service/src/main/scala/org/apache/texera/service/AccessControlService.scala @@ -26,6 +26,7 @@ import io.dropwizard.core.setup.{Bootstrap, Environment} import org.apache.texera.amber.config.StorageConfig import org.apache.texera.auth.{JwtAuthFilter, RequestLoggingFilter, SessionUser} import org.apache.texera.dao.SqlServer +import org.apache.texera.service.activity.UserActivityEventListener import org.apache.texera.service.resource.{ AccessControlResource, HealthCheckResource, @@ -77,6 +78,12 @@ class AccessControlService extends Application[AccessControlServiceConfiguration new io.dropwizard.auth.AuthValueFactoryProvider.Binder(classOf[SessionUser]) ) + // Record USER_LAST_ACTIVE_TIME on every matched, completed request. + // Lives only in this service because authenticated client sessions + // contact access-control-service often enough to capture activity + // with high recall. + environment.jersey.register(new UserActivityEventListener()) + // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL RequestLoggingFilter.register(environment.getApplicationContext) } diff --git a/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala new file mode 100644 index 00000000000..c2ede9f2376 --- /dev/null +++ b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.activity + +import jakarta.ws.rs.ext.Provider +import org.apache.texera.auth.{SessionUser, UserActivityTracker} +import org.glassfish.jersey.server.monitoring.{ + ApplicationEvent, + ApplicationEventListener, + RequestEvent, + RequestEventListener +} + +/** Records user activity (USER_LAST_ACTIVE_TIME) once per matched, completed + * request. Intentionally NOT a ContainerRequestFilter: + * + * - It cannot reject or transform a request — it only observes. + * - It runs at Jersey's monitoring layer, not the auth pipeline, so + * activity tracking is decoupled from authentication concerns. + * - It listens for RESOURCE_METHOD_FINISHED only, so requests that + * fail before reaching a handler (no auth, 404, 4xx in earlier + * filters) do not count as user activity. + * + * The DB write itself is throttled per-uid by [[UserActivityTracker]]. + * + * Lives in access-control-service because USER_LAST_ACTIVE_TIME is a + * user-management concern; the assumption is that any authenticated + * client session contacts this service often enough (UI navigation, + * permission checks, LiteLLM proxy) to capture activity with high + * recall, so other services do not need to mirror this listener. + */ +@Provider +class UserActivityEventListener(track: Integer => Unit = UserActivityTracker.markActive) + extends ApplicationEventListener { + + override def onEvent(event: ApplicationEvent): Unit = () + + override def onRequest(requestEvent: RequestEvent): RequestEventListener = + new RequestEventListener { + override def onEvent(event: RequestEvent): Unit = { + if (event.getType == RequestEvent.Type.RESOURCE_METHOD_FINISHED) { + val sc = event.getContainerRequest.getSecurityContext + if (sc != null) { + sc.getUserPrincipal match { + case u: SessionUser if u.getUid != null => track(u.getUid) + case _ => + } + } + } + } + } +} diff --git a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala new file mode 100644 index 00000000000..75dbd9dabb7 --- /dev/null +++ b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.activity + +import jakarta.ws.rs.core.SecurityContext +import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum +import org.apache.texera.dao.jooq.generated.tables.pojos.User +import org.glassfish.jersey.server.ContainerRequest +import org.glassfish.jersey.server.monitoring.RequestEvent +import org.mockito.Mockito.{mock, when} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.security.Principal +import java.util.concurrent.ConcurrentLinkedQueue + +class UserActivityEventListenerSpec extends AnyFlatSpec with Matchers { + + private def sessionUser(uid: Integer): SessionUser = { + val u = new User(uid, "u", null, null, null, null, UserRoleEnum.REGULAR, null, null, null, null) + new SessionUser(u) + } + + private def buildEvent(eventType: RequestEvent.Type, sc: SecurityContext): RequestEvent = { + val req = mock(classOf[ContainerRequest]) + when(req.getSecurityContext).thenReturn(sc) + val event = mock(classOf[RequestEvent]) + when(event.getType).thenReturn(eventType) + when(event.getContainerRequest).thenReturn(req) + event + } + + private def buildSecurityContext(principal: Principal): SecurityContext = { + val sc = mock(classOf[SecurityContext]) + when(sc.getUserPrincipal).thenReturn(principal) + sc + } + + private def setup() = { + val recorded = new ConcurrentLinkedQueue[Integer]() + val listener = new UserActivityEventListener(uid => { recorded.add(uid); () }) + val rel = listener.onRequest(mock(classOf[RequestEvent])) + (rel, recorded) + } + + "UserActivityEventListener" should "invoke the tracker on RESOURCE_METHOD_FINISHED with a SessionUser principal" in { + val (rel, recorded) = setup() + rel.onEvent( + buildEvent( + RequestEvent.Type.RESOURCE_METHOD_FINISHED, + buildSecurityContext(sessionUser(42)) + ) + ) + recorded.size shouldBe 1 + recorded.peek() shouldBe 42 + } + + it should "ignore RequestEvent types other than RESOURCE_METHOD_FINISHED" in { + val (rel, recorded) = setup() + val sc = buildSecurityContext(sessionUser(42)) + rel.onEvent(buildEvent(RequestEvent.Type.START, sc)) + rel.onEvent(buildEvent(RequestEvent.Type.RESOURCE_METHOD_START, sc)) + rel.onEvent(buildEvent(RequestEvent.Type.FINISHED, sc)) + recorded.isEmpty shouldBe true + } + + it should "ignore non-SessionUser principals" in { + val (rel, recorded) = setup() + val anon: Principal = new Principal { + override def getName: String = "anon" + } + rel.onEvent( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(anon)) + ) + recorded.isEmpty shouldBe true + } + + it should "ignore SessionUser with null uid" in { + val (rel, recorded) = setup() + rel.onEvent( + buildEvent( + RequestEvent.Type.RESOURCE_METHOD_FINISHED, + buildSecurityContext(sessionUser(null)) + ) + ) + recorded.isEmpty shouldBe true + } + + it should "ignore null SecurityContext" in { + val (rel, recorded) = setup() + rel.onEvent(buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null)) + recorded.isEmpty shouldBe true + } +} diff --git a/common/auth/build.sbt b/common/auth/build.sbt index ce8d2243a1a..a33da64fea5 100644 --- a/common/auth/build.sbt +++ b/common/auth/build.sbt @@ -58,5 +58,6 @@ libraryDependencies ++= Seq( "org.bitbucket.b_c" % "jose4j" % "0.9.6", // for jwt parser "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.0.0", // for JwtAuthFilter "jakarta.servlet" % "jakarta.servlet-api" % "5.0.0" % "provided", // for RequestLoggingFilter - "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided" // for FilterHolder + "org.eclipse.jetty" % "jetty-servlet" % "11.0.24" % "provided", // for FilterHolder + "org.scalatest" %% "scalatest" % "3.2.17" % Test ) \ No newline at end of file diff --git a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala index ed9615b451a..56985156302 100644 --- a/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala +++ b/common/auth/src/main/scala/org/apache/texera/auth/JwtAuthFilter.scala @@ -20,23 +20,16 @@ package org.apache.texera.auth import com.typesafe.scalalogging.LazyLogging -import jakarta.ws.rs.container.{ContainerRequestContext, ContainerRequestFilter, ResourceInfo} -import jakarta.ws.rs.core.{Context, HttpHeaders, SecurityContext} +import jakarta.ws.rs.container.{ContainerRequestContext, ContainerRequestFilter} +import jakarta.ws.rs.core.{HttpHeaders, SecurityContext} import jakarta.ws.rs.ext.Provider -import org.apache.texera.dao.SqlServer -import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum import java.security.Principal -import java.time.OffsetDateTime @Provider class JwtAuthFilter extends ContainerRequestFilter with LazyLogging { - @Context - private var resourceInfo: ResourceInfo = _ - private def ctx = SqlServer.getInstance().createDSLContext() - override def filter(requestContext: ContainerRequestContext): Unit = { val authHeader = requestContext.getHeaderString(HttpHeaders.AUTHORIZATION) @@ -46,16 +39,6 @@ class JwtAuthFilter extends ContainerRequestFilter with LazyLogging { if (userOpt.isPresent) { val user = userOpt.get() - - ctx - .insertInto(USER_LAST_ACTIVE_TIME) - .set(USER_LAST_ACTIVE_TIME.UID, user.getUid) - .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now()) - .onConflict(USER_LAST_ACTIVE_TIME.UID) // conflict on primary key uid - .doUpdate() - .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, OffsetDateTime.now()) - .execute() - requestContext.setSecurityContext(new SecurityContext { override def getUserPrincipal: Principal = user override def isUserInRole(role: String): Boolean = diff --git a/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala new file mode 100644 index 00000000000..d53697b679e --- /dev/null +++ b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.auth + +import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME + +import java.time.{Duration, Instant, OffsetDateTime, ZoneOffset} +import java.util.concurrent.{ConcurrentHashMap, Executor, Executors} + +/** Per-uid activity timestamp recorder. The actual DB upsert is throttled + * by a per-uid in-memory cooldown so that a user hitting the API at high + * RPS produces at most one USER_LAST_ACTIVE_TIME write per + * `writeInterval`. The upsert itself runs on the supplied `executor` so + * request threads never wait on DB latency. + * + * Class form (with injectable upsert / executor / clock) exists so the + * cooldown/CAS logic can be unit-tested without a DB. The companion + * object [[UserActivityTracker]] is the production singleton. + */ +class UserActivityTracker( + writeInterval: Duration, + upsertFn: (Integer, Instant) => Unit, + executor: Executor, + clock: () => Instant +) { + private val lastClaimed = new ConcurrentHashMap[Integer, Instant]() + + /** Record the user as active. Lock-free; performs at most one upsert per + * uid per `writeInterval`. Safe to call from any thread. + */ + def markActive(uid: Integer): Unit = { + if (uid == null) return + val now = clock() + val prev = lastClaimed.get(uid) + if (prev != null && Duration.between(prev, now).compareTo(writeInterval) < 0) return + + // CAS to claim the write slot for this uid. If another thread won the + // race, drop this call. + val claimed = + if (prev == null) lastClaimed.putIfAbsent(uid, now) == null + else lastClaimed.replace(uid, prev, now) + if (!claimed) return + + executor.execute(() => upsertFn(uid, now)) + } +} + +object UserActivityTracker extends LazyLogging { + + private val WRITE_INTERVAL: Duration = Duration.ofMinutes(5) + + private val writer: Executor = Executors.newSingleThreadExecutor((r: Runnable) => { + val t = new Thread(r, "user-activity-writer") + t.setDaemon(true) + t + }) + + private val instance = new UserActivityTracker( + WRITE_INTERVAL, + defaultUpsert, + writer, + () => Instant.now() + ) + + /** Production entry point. Delegates to the singleton tracker. */ + def markActive(uid: Integer): Unit = instance.markActive(uid) + + private def defaultUpsert(uid: Integer, ts: Instant): Unit = { + try { + val ctx = SqlServer.getInstance().createDSLContext() + val odt = OffsetDateTime.ofInstant(ts, ZoneOffset.UTC) + ctx + .insertInto(USER_LAST_ACTIVE_TIME) + .set(USER_LAST_ACTIVE_TIME.UID, uid) + .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) + .onConflict(USER_LAST_ACTIVE_TIME.UID) + .doUpdate() + .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) + .execute() + } catch { + case e: Throwable => + // Tracking is best-effort; never propagate failures. + logger.warn(s"USER_LAST_ACTIVE_TIME upsert for uid=$uid failed: ${e.getMessage}") + } + } +} diff --git a/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala new file mode 100644 index 00000000000..039b8a3d172 --- /dev/null +++ b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.auth + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.time.{Duration, Instant} +import java.util.concurrent.{ConcurrentLinkedQueue, Executor} +import java.util.concurrent.atomic.AtomicReference + +class UserActivityTrackerSpec extends AnyFlatSpec with Matchers { + + // Synchronous executor: runnable runs on the calling thread, so the + // test can observe upsert invocations deterministically. + private val sameThread: Executor = (cmd: Runnable) => cmd.run() + + private class Recorder { + val calls = new ConcurrentLinkedQueue[(Integer, Instant)]() + def upsert(uid: Integer, ts: Instant): Unit = { calls.add((uid, ts)); () } + } + + private def makeTracker( + writeInterval: Duration, + recorder: Recorder, + clock: AtomicReference[Instant] + ) = + new UserActivityTracker(writeInterval, recorder.upsert, sameThread, () => clock.get()) + + "UserActivityTracker" should "trigger an upsert on the first call for a uid" in { + val recorder = new Recorder + val now = Instant.parse("2026-01-01T00:00:00Z") + val clock = new AtomicReference[Instant](now) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(42) + + recorder.calls.size shouldBe 1 + val (uid, ts) = recorder.calls.peek() + uid shouldBe 42 + ts shouldBe now + } + + it should "skip upserts within the cooldown window" in { + val recorder = new Recorder + val t0 = Instant.parse("2026-01-01T00:00:00Z") + val clock = new AtomicReference[Instant](t0) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(42) + clock.set(t0.plus(Duration.ofMinutes(2))) + tracker.markActive(42) + clock.set(t0.plus(Duration.ofMinutes(4).plusSeconds(59))) + tracker.markActive(42) + + recorder.calls.size shouldBe 1 + } + + it should "fire another upsert once the cooldown elapses" in { + val recorder = new Recorder + val t0 = Instant.parse("2026-01-01T00:00:00Z") + val clock = new AtomicReference[Instant](t0) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(42) + clock.set(t0.plus(Duration.ofMinutes(5))) + tracker.markActive(42) + + recorder.calls.size shouldBe 2 + } + + it should "track different uids independently" in { + val recorder = new Recorder + val clock = new AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z")) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(1) + tracker.markActive(2) + tracker.markActive(3) + + recorder.calls.size shouldBe 3 + } + + it should "treat null uid as a no-op" in { + val recorder = new Recorder + val clock = new AtomicReference[Instant](Instant.parse("2026-01-01T00:00:00Z")) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(null) + + recorder.calls.size shouldBe 0 + } +} From b07feef42328752898cdeb017d6db6723bd685c3 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 3 May 2026 20:05:23 -0700 Subject: [PATCH 2/4] fixup: add coverage for ApplicationEvent no-op and default-arg ctor --- .../UserActivityEventListenerSpec.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala index 75dbd9dabb7..f0f38d4bd59 100644 --- a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala +++ b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala @@ -24,7 +24,7 @@ import org.apache.texera.auth.SessionUser import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum import org.apache.texera.dao.jooq.generated.tables.pojos.User import org.glassfish.jersey.server.ContainerRequest -import org.glassfish.jersey.server.monitoring.RequestEvent +import org.glassfish.jersey.server.monitoring.{ApplicationEvent, RequestEvent} import org.mockito.Mockito.{mock, when} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -109,4 +109,19 @@ class UserActivityEventListenerSpec extends AnyFlatSpec with Matchers { rel.onEvent(buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null)) recorded.isEmpty shouldBe true } + + it should "no-op on ApplicationEvent (lifecycle hook unused)" in { + val recorded = new ConcurrentLinkedQueue[Integer]() + val listener = new UserActivityEventListener(uid => { recorded.add(uid); () }) + val appEvent = mock(classOf[ApplicationEvent]) + listener.onEvent(appEvent) + recorded.isEmpty shouldBe true + } + + it should "construct with the default tracker without invoking it" in { + // Default-arg path: new UserActivityEventListener() resolves track to + // UserActivityTracker.markActive but does not call it (the listener + // only calls track when a matched SessionUser arrives). + new UserActivityEventListener() should not be null + } } From d8dae22d7e0573419a0a9cb14a82fa1890395e36 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 3 May 2026 20:09:02 -0700 Subject: [PATCH 3/4] fixup: address review comments on UserActivityTracker - Wrap upsertFn invocation inside the executor task in try/NonFatal so injected implementations cannot leak exceptions, with full stack trace logged. - Replace catch Throwable with NonFatal in defaultUpsert so OOME and the like still propagate, and pass the exception to logger to keep the stack trace. - Add evictStale() with TTL cleanup at 2 * writeInterval; schedule a daemon ScheduledExecutorService in the production singleton to run it once per WRITE_INTERVAL. Bounds lastClaimed for long-lived processes with many distinct uids. - Replace newSingleThreadExecutor (unbounded queue) with a bounded ThreadPoolExecutor (capacity 256) and DiscardOldestPolicy. Under DB stalls or write storms, oldest pending tasks are dropped; the next request from the same uid will re-claim and re-write once cooldown elapses. Adds two tests: evictStale eviction window, and exception swallowing on a throwing upsertFn. --- .../texera/auth/UserActivityTracker.scala | 136 +++++++++++++----- .../texera/auth/UserActivityTrackerSpec.scala | 33 +++++ 2 files changed, 133 insertions(+), 36 deletions(-) diff --git a/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala index d53697b679e..d3979322893 100644 --- a/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala +++ b/common/auth/src/main/scala/org/apache/texera/auth/UserActivityTracker.scala @@ -24,7 +24,17 @@ import org.apache.texera.dao.SqlServer import org.apache.texera.dao.jooq.generated.Tables.USER_LAST_ACTIVE_TIME import java.time.{Duration, Instant, OffsetDateTime, ZoneOffset} -import java.util.concurrent.{ConcurrentHashMap, Executor, Executors} +import java.util.concurrent.{ + ArrayBlockingQueue, + ConcurrentHashMap, + Executor, + Executors, + ScheduledExecutorService, + ThreadFactory, + ThreadPoolExecutor, + TimeUnit +} +import scala.util.control.NonFatal /** Per-uid activity timestamp recorder. The actual DB upsert is throttled * by a per-uid in-memory cooldown so that a user hitting the API at high @@ -41,38 +51,81 @@ class UserActivityTracker( upsertFn: (Integer, Instant) => Unit, executor: Executor, clock: () => Instant -) { +) extends LazyLogging { + private val lastClaimed = new ConcurrentHashMap[Integer, Instant]() + // Eviction window: an entry is stale once 2*writeInterval has passed + // since its last claim. The factor keeps in-cooldown entries safe from + // eviction while still bounding `lastClaimed` for users who have gone + // away. + private val staleAfter: Duration = writeInterval.multipliedBy(2) + /** Record the user as active. Lock-free; performs at most one upsert per - * uid per `writeInterval`. Safe to call from any thread. + * uid per `writeInterval`. Never propagates failures to the caller. */ def markActive(uid: Integer): Unit = { if (uid == null) return - val now = clock() - val prev = lastClaimed.get(uid) - if (prev != null && Duration.between(prev, now).compareTo(writeInterval) < 0) return - - // CAS to claim the write slot for this uid. If another thread won the - // race, drop this call. - val claimed = - if (prev == null) lastClaimed.putIfAbsent(uid, now) == null - else lastClaimed.replace(uid, prev, now) - if (!claimed) return - - executor.execute(() => upsertFn(uid, now)) + try { + val now = clock() + val prev = lastClaimed.get(uid) + if (prev != null && Duration.between(prev, now).compareTo(writeInterval) < 0) return + + // CAS to claim the write slot for this uid. If another thread won + // the race, drop this call. + val claimed = + if (prev == null) lastClaimed.putIfAbsent(uid, now) == null + else lastClaimed.replace(uid, prev, now) + if (!claimed) return + + executor.execute(() => + try upsertFn(uid, now) + catch { + case NonFatal(e) => + logger.warn(s"User activity upsert failed (uid=$uid)", e) + } + ) + } catch { + case NonFatal(e) => + logger.warn(s"markActive failed (uid=$uid)", e) + } } + + /** Drop entries whose last-claimed time is older than `2 * writeInterval`. + * Bounds `lastClaimed` for long-lived processes with many distinct uids. + * Safe to call concurrently with [[markActive]]. + */ + def evictStale(): Unit = { + try { + val cutoff = clock().minus(staleAfter) + lastClaimed.entrySet().removeIf(e => e.getValue.isBefore(cutoff)) + } catch { + case NonFatal(e) => logger.warn("evictStale failed", e) + } + } + + /** Visible for tests. */ + private[auth] def cooldownSize: Int = lastClaimed.size() } object UserActivityTracker extends LazyLogging { private val WRITE_INTERVAL: Duration = Duration.ofMinutes(5) + // Bounded queue: under DB stalls or write storms, oldest pending tasks + // are dropped (DiscardOldest). The next request from the same uid will + // re-claim and re-write once cooldown elapses, so dropping a stale + // pending write does not lose the activity signal long-term. + private val WRITER_QUEUE_CAPACITY = 256 - private val writer: Executor = Executors.newSingleThreadExecutor((r: Runnable) => { - val t = new Thread(r, "user-activity-writer") - t.setDaemon(true) - t - }) + private val writer: Executor = new ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue[Runnable](WRITER_QUEUE_CAPACITY), + daemonThreadFactory("user-activity-writer"), + new ThreadPoolExecutor.DiscardOldestPolicy + ) private val instance = new UserActivityTracker( WRITE_INTERVAL, @@ -81,25 +134,36 @@ object UserActivityTracker extends LazyLogging { () => Instant.now() ) + // Periodic eviction of stale uid entries, running once per WRITE_INTERVAL. + private val cleanup: ScheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(daemonThreadFactory("user-activity-cleanup")) + cleanup.scheduleAtFixedRate( + () => instance.evictStale(), + WRITE_INTERVAL.toMillis, + WRITE_INTERVAL.toMillis, + TimeUnit.MILLISECONDS + ) + /** Production entry point. Delegates to the singleton tracker. */ def markActive(uid: Integer): Unit = instance.markActive(uid) private def defaultUpsert(uid: Integer, ts: Instant): Unit = { - try { - val ctx = SqlServer.getInstance().createDSLContext() - val odt = OffsetDateTime.ofInstant(ts, ZoneOffset.UTC) - ctx - .insertInto(USER_LAST_ACTIVE_TIME) - .set(USER_LAST_ACTIVE_TIME.UID, uid) - .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) - .onConflict(USER_LAST_ACTIVE_TIME.UID) - .doUpdate() - .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) - .execute() - } catch { - case e: Throwable => - // Tracking is best-effort; never propagate failures. - logger.warn(s"USER_LAST_ACTIVE_TIME upsert for uid=$uid failed: ${e.getMessage}") - } + val ctx = SqlServer.getInstance().createDSLContext() + val odt = OffsetDateTime.ofInstant(ts, ZoneOffset.UTC) + ctx + .insertInto(USER_LAST_ACTIVE_TIME) + .set(USER_LAST_ACTIVE_TIME.UID, uid) + .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) + .onConflict(USER_LAST_ACTIVE_TIME.UID) + .doUpdate() + .set(USER_LAST_ACTIVE_TIME.LAST_ACTIVE_TIME, odt) + .execute() } + + private def daemonThreadFactory(name: String): ThreadFactory = + (r: Runnable) => { + val t = new Thread(r, name) + t.setDaemon(true) + t + } } diff --git a/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala index 039b8a3d172..c8ad6063014 100644 --- a/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala +++ b/common/auth/src/test/scala/org/apache/texera/auth/UserActivityTrackerSpec.scala @@ -107,4 +107,37 @@ class UserActivityTrackerSpec extends AnyFlatSpec with Matchers { recorder.calls.size shouldBe 0 } + + it should "evict cooldown entries older than 2 * writeInterval" in { + val recorder = new Recorder + val t0 = Instant.parse("2026-01-01T00:00:00Z") + val clock = new AtomicReference[Instant](t0) + val tracker = makeTracker(Duration.ofMinutes(5), recorder, clock) + + tracker.markActive(1) + tracker.markActive(2) + tracker.cooldownSize shouldBe 2 + + // 9 minutes — under 2 * writeInterval (10), nothing evicted + clock.set(t0.plus(Duration.ofMinutes(9))) + tracker.evictStale() + tracker.cooldownSize shouldBe 2 + + // 11 minutes — past 2 * writeInterval, both entries evicted + clock.set(t0.plus(Duration.ofMinutes(11))) + tracker.evictStale() + tracker.cooldownSize shouldBe 0 + } + + it should "swallow upsertFn exceptions instead of propagating to the caller" in { + val t0 = Instant.parse("2026-01-01T00:00:00Z") + val clock = new AtomicReference[Instant](t0) + val throwing: (Integer, Instant) => Unit = + (_, _) => throw new RuntimeException("simulated DB outage") + val tracker = + new UserActivityTracker(Duration.ofMinutes(5), throwing, sameThread, () => clock.get()) + + // Must not throw — the wrapper catches NonFatal from upsertFn. + noException should be thrownBy tracker.markActive(42) + } } From 561d462462fee6850da2600a4f3e6a7944cf6ddc Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 3 May 2026 20:34:48 -0700 Subject: [PATCH 4/4] fixup: lift listener handling into a top-level method, add run() spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor UserActivityEventListener: - Inner anonymous RequestEventListener becomes a SAM-converted lambda, removing the anonymous-class boilerplate that jacoco was reporting branch-partials on. - Per-event branching moves to a top-level companion method `handle`, visible to tests so the per-event logic is exercised without going through the SAM dispatch. - Replace `event.getType == ...` with `event.getType eq ...`. The Type is a Java enum (singletons), so reference equality is semantically correct and compiles to a single if_acmpne, avoiding Scala's BoxesRunTime.equals branch fan-out (the partial-coverage noise on the comparison line). Tests: - UserActivityEventListenerSpec rewritten to drive `handle` directly for the per-event cases (5), plus 3 listener-level smoke tests for the SAM dispatch / lifecycle / default-arg ctor paths. - New AccessControlServiceRunSpec verifies UserActivityEventListener is registered when run() is invoked. Mocks Environment, so this also incidentally covers the rest of run() — no longer at 0%. --- .../activity/UserActivityEventListener.scala | 34 +++++--- .../service/AccessControlServiceRunSpec.scala | 49 ++++++++++++ .../UserActivityEventListenerSpec.scala | 79 +++++++++++-------- 3 files changed, 115 insertions(+), 47 deletions(-) create mode 100644 access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala diff --git a/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala index c2ede9f2376..e5fa785f53e 100644 --- a/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala +++ b/access-control-service/src/main/scala/org/apache/texera/service/activity/UserActivityEventListener.scala @@ -52,18 +52,28 @@ class UserActivityEventListener(track: Integer => Unit = UserActivityTracker.mar override def onEvent(event: ApplicationEvent): Unit = () + // SAM-converted lambda: avoids an inner anonymous class so coverage + // tooling sees a flat method body. Logic lives in the companion's + // `handle` so tests can drive it directly. override def onRequest(requestEvent: RequestEvent): RequestEventListener = - new RequestEventListener { - override def onEvent(event: RequestEvent): Unit = { - if (event.getType == RequestEvent.Type.RESOURCE_METHOD_FINISHED) { - val sc = event.getContainerRequest.getSecurityContext - if (sc != null) { - sc.getUserPrincipal match { - case u: SessionUser if u.getUid != null => track(u.getUid) - case _ => - } - } - } - } + (event: RequestEvent) => UserActivityEventListener.handle(event, track) +} + +object UserActivityEventListener { + + /** Process a single Jersey request event. Public-package for tests so the + * per-request branching is exercised without a Jersey runtime. + */ + private[activity] def handle(event: RequestEvent, track: Integer => Unit): Unit = { + // `eq` (reference equality) is correct here because Type is a Java enum + // — its constants are singletons. It also compiles to a single + // `if_acmpne`, sidestepping Scala's BoxesRunTime.equals branch fan-out. + if (!(event.getType eq RequestEvent.Type.RESOURCE_METHOD_FINISHED)) return + val sc = event.getContainerRequest.getSecurityContext + if (sc == null) return + sc.getUserPrincipal match { + case u: SessionUser if u.getUid != null => track(u.getUid) + case _ => } + } } diff --git a/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala new file mode 100644 index 00000000000..89979ee816f --- /dev/null +++ b/access-control-service/src/test/scala/org/apache/texera/service/AccessControlServiceRunSpec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service + +import io.dropwizard.core.setup.Environment +import io.dropwizard.jersey.setup.JerseyEnvironment +import io.dropwizard.jetty.MutableServletContextHandler +import io.dropwizard.jetty.setup.ServletEnvironment +import org.apache.texera.service.activity.UserActivityEventListener +import org.mockito.ArgumentMatchers.isA +import org.mockito.Mockito.{mock, verify, when} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class AccessControlServiceRunSpec extends AnyFlatSpec with Matchers { + + "AccessControlService.run" should "register UserActivityEventListener on the Jersey environment" in { + val jersey = mock(classOf[JerseyEnvironment]) + val servlets = mock(classOf[ServletEnvironment]) + val context = mock(classOf[MutableServletContextHandler]) + val env = mock(classOf[Environment]) + when(env.jersey).thenReturn(jersey) + when(env.servlets).thenReturn(servlets) + when(env.getApplicationContext).thenReturn(context) + + val service = new AccessControlService + service.run(mock(classOf[AccessControlServiceConfiguration]), env) + + verify(jersey).register(isA(classOf[UserActivityEventListener])) + verify(jersey).setUrlPattern("/api/*") + } +} diff --git a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala index f0f38d4bd59..3d99f4e7fb2 100644 --- a/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala +++ b/access-control-service/src/test/scala/org/apache/texera/service/activity/UserActivityEventListenerSpec.scala @@ -54,74 +54,83 @@ class UserActivityEventListenerSpec extends AnyFlatSpec with Matchers { sc } - private def setup() = { - val recorded = new ConcurrentLinkedQueue[Integer]() - val listener = new UserActivityEventListener(uid => { recorded.add(uid); () }) - val rel = listener.onRequest(mock(classOf[RequestEvent])) - (rel, recorded) - } + private def newRecorder(): ConcurrentLinkedQueue[Integer] = new ConcurrentLinkedQueue[Integer]() + private def trackTo(q: ConcurrentLinkedQueue[Integer]): Integer => Unit = + uid => { q.add(uid); () } - "UserActivityEventListener" should "invoke the tracker on RESOURCE_METHOD_FINISHED with a SessionUser principal" in { - val (rel, recorded) = setup() - rel.onEvent( - buildEvent( - RequestEvent.Type.RESOURCE_METHOD_FINISHED, - buildSecurityContext(sessionUser(42)) - ) + "UserActivityEventListener.handle" should "invoke the tracker on RESOURCE_METHOD_FINISHED with a SessionUser principal" in { + val recorded = newRecorder() + UserActivityEventListener.handle( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(sessionUser(42))), + trackTo(recorded) ) recorded.size shouldBe 1 recorded.peek() shouldBe 42 } it should "ignore RequestEvent types other than RESOURCE_METHOD_FINISHED" in { - val (rel, recorded) = setup() + val recorded = newRecorder() val sc = buildSecurityContext(sessionUser(42)) - rel.onEvent(buildEvent(RequestEvent.Type.START, sc)) - rel.onEvent(buildEvent(RequestEvent.Type.RESOURCE_METHOD_START, sc)) - rel.onEvent(buildEvent(RequestEvent.Type.FINISHED, sc)) + UserActivityEventListener.handle(buildEvent(RequestEvent.Type.START, sc), trackTo(recorded)) + UserActivityEventListener.handle( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_START, sc), + trackTo(recorded) + ) + UserActivityEventListener.handle(buildEvent(RequestEvent.Type.FINISHED, sc), trackTo(recorded)) recorded.isEmpty shouldBe true } it should "ignore non-SessionUser principals" in { - val (rel, recorded) = setup() - val anon: Principal = new Principal { - override def getName: String = "anon" - } - rel.onEvent( - buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(anon)) + val recorded = newRecorder() + val anon: Principal = new Principal { override def getName: String = "anon" } + UserActivityEventListener.handle( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(anon)), + trackTo(recorded) ) recorded.isEmpty shouldBe true } it should "ignore SessionUser with null uid" in { - val (rel, recorded) = setup() - rel.onEvent( + val recorded = newRecorder() + UserActivityEventListener.handle( buildEvent( RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(sessionUser(null)) - ) + ), + trackTo(recorded) ) recorded.isEmpty shouldBe true } it should "ignore null SecurityContext" in { - val (rel, recorded) = setup() - rel.onEvent(buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null)) + val recorded = newRecorder() + UserActivityEventListener.handle( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, null), + trackTo(recorded) + ) recorded.isEmpty shouldBe true } + // Listener-level smoke tests: verify the SAM lambda + dispatch glue, + // not the per-event branching (which lives in `handle`). + "UserActivityEventListener" should "dispatch RequestEvent to the handle function" in { + val recorded = newRecorder() + val listener = new UserActivityEventListener(trackTo(recorded)) + val rel = listener.onRequest(mock(classOf[RequestEvent])) + rel.onEvent( + buildEvent(RequestEvent.Type.RESOURCE_METHOD_FINISHED, buildSecurityContext(sessionUser(7))) + ) + recorded.peek() shouldBe 7 + } + it should "no-op on ApplicationEvent (lifecycle hook unused)" in { - val recorded = new ConcurrentLinkedQueue[Integer]() - val listener = new UserActivityEventListener(uid => { recorded.add(uid); () }) - val appEvent = mock(classOf[ApplicationEvent]) - listener.onEvent(appEvent) + val recorded = newRecorder() + val listener = new UserActivityEventListener(trackTo(recorded)) + listener.onEvent(mock(classOf[ApplicationEvent])) recorded.isEmpty shouldBe true } it should "construct with the default tracker without invoking it" in { - // Default-arg path: new UserActivityEventListener() resolves track to - // UserActivityTracker.markActive but does not call it (the listener - // only calls track when a matched SessionUser arrives). new UserActivityEventListener() should not be null } }