From 15fb4a0a6f04455694c251312f66c9d5a568d2df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 21 Apr 2026 10:45:15 +0200 Subject: [PATCH 1/3] feature: fall back unmigrated v7.0.0 paths to v6.0.0 via Lift bridge with X-OBP-Version-Served header --- .../api/util/http4s/Http4sLiftWebBridge.scala | 36 ++++++++++++++++--- .../Http4sServerIntegrationTest.scala | 32 +++++++++++++++-- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/obp-api/src/main/scala/code/api/util/http4s/Http4sLiftWebBridge.scala b/obp-api/src/main/scala/code/api/util/http4s/Http4sLiftWebBridge.scala index 3b9c41a82a..989ff3b57b 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/Http4sLiftWebBridge.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/Http4sLiftWebBridge.scala @@ -27,6 +27,32 @@ object Http4sLiftWebBridge extends MdcLoggable { private lazy val continuationTimeoutMs: Long = APIUtil.getPropsAsLongValue("http4s.continuation.timeout.ms", 60000L) + private lazy val apiPathZero: String = APIUtil.getPropsValue("apiPathZero", "obp") + private lazy val v700Path: String = s"/$apiPathZero/v7.0.0" + // Version that the bridge falls back to for unmigrated v7.0.0 paths. + private lazy val fallbackVersion: String = APIUtil.getPropsValue("http4s.v700.fallback.version", "v6.0.0") + private lazy val fallbackPath: String = s"/$apiPathZero/$fallbackVersion" + + /** + * If the request targets a v7.0.0 path that was not claimed by any http4s handler + * (otherwise it would never reach this bridge), rewrite the URI to the configured + * fallback version (default v6.0.0) so Lift can serve it transparently. + * + * Returns the (possibly rewritten) request and the served version string to attach + * as X-OBP-Version-Served, or None when no rewrite was needed. + */ + private def rewriteIfV700(req: Request[IO]): (Request[IO], Option[String]) = { + val pathStr = req.uri.path.renderString + if (pathStr == v700Path || pathStr.startsWith(v700Path + "/")) { + val rewrittenPath = fallbackPath + pathStr.drop(v700Path.length) + logger.info(s"[BRIDGE] v7.0.0 fallback: $pathStr → $rewrittenPath (served by $fallbackVersion)") + val newUri = req.uri.withPath(Uri.Path.unsafeFromString(rewrittenPath)) + (req.withUri(newUri), Some(fallbackVersion)) + } else { + (req, None) + } + } + def routes: HttpRoutes[IO] = HttpRoutes.of[IO] { case req => dispatch(req) } @@ -38,12 +64,13 @@ object Http4sLiftWebBridge extends MdcLoggable { } def dispatch(req: Request[IO]): IO[Response[IO]] = { - val uri = req.uri.renderString + val (effectiveReq, servedVersion) = rewriteIfV700(req) + val uri = req.uri.renderString val method = req.method.name logger.debug(s"Http4sLiftBridge dispatching: $method $uri, S.inStatefulScope_? = ${S.inStatefulScope_?}") val result = for { - bodyBytes <- req.body.compile.to(Array) - liftReq = buildLiftReq(req, bodyBytes) + bodyBytes <- effectiveReq.body.compile.to(Array) + liftReq = buildLiftReq(effectiveReq, bodyBytes) liftResp <- IO { val session = LiftRules.statelessSession.vend.apply(liftReq) S.init(Full(liftReq), session) { @@ -71,7 +98,8 @@ object Http4sLiftWebBridge extends MdcLoggable { } yield { logger.debug(s"[BRIDGE] Http4sLiftBridge completed: $method $uri -> ${http4sResponse.status.code}") logger.debug(s"Http4sLiftBridge completed: $method $uri -> ${http4sResponse.status.code}") - ensureStandardHeaders(req, http4sResponse) + val baseResp = ensureStandardHeaders(req, http4sResponse) + servedVersion.fold(baseResp)(v => baseResp.putHeaders(Header.Raw(CIString("X-OBP-Version-Served"), v))) } result.handleErrorWith { e => logger.error(s"[BRIDGE] Uncaught exception in dispatch: $method $uri - ${e.getMessage}", e) diff --git a/obp-api/src/test/scala/code/api/http4sbridge/Http4sServerIntegrationTest.scala b/obp-api/src/test/scala/code/api/http4sbridge/Http4sServerIntegrationTest.scala index 50dd4ce621..bd2e4ef402 100644 --- a/obp-api/src/test/scala/code/api/http4sbridge/Http4sServerIntegrationTest.scala +++ b/obp-api/src/test/scala/code/api/http4sbridge/Http4sServerIntegrationTest.scala @@ -42,6 +42,17 @@ class Http4sServerIntegrationTest extends ServerSetup with DefaultUsers with Ser AccountAccess.bulkDelete_!!() } + private def makeHttp4sGetRequestFull(path: String, reqHeaders: Map[String, String] = Map.empty): (Int, String, Option[String]) = { + val request = url(s"$baseUrl$path") + val requestWithHeaders = reqHeaders.foldLeft(request) { case (req, (key, value)) => + req.addHeader(key, value) + } + val response = Http.default(requestWithHeaders.setHeader("Accept", "*/*") > as.Response(p => + (p.getStatusCode, p.getResponseBody, Option(p.getHeader("X-OBP-Version-Served")).filter(_.nonEmpty)) + )) + Await.result(response, 10.seconds) + } + private def makeHttp4sGetRequest(path: String, headers: Map[String, String] = Map.empty): (Int, String) = { val request = url(s"$baseUrl$path") val requestWithHeaders = headers.foldLeft(request) { case (req, (key, value)) => @@ -234,14 +245,31 @@ class Http4sServerIntegrationTest extends ServerSetup with DefaultUsers with Ser scenario("GET /obp/v7.0.0/resource-docs/v7.0.0/obp returns resource docs", Http4sServerIntegrationTag) { When("We request resource documentation") val (status, body) = makeHttp4sGetRequest("/obp/v7.0.0/resource-docs/v7.0.0/obp") - + Then("We should get a 200 response") status should equal(200) - + And("Response should contain resource docs array") val json = parse(body) json \ "resource_docs" should not equal JObject(Nil) } + + scenario("v7.0.0 unmigrated path falls back to v6.0.0 via Lift bridge", Http4sServerIntegrationTag) { + When("We request an unmigrated v7.0.0 endpoint (/consumers/current exists in v6 but not v7)") + val (status, body, versionServed) = makeHttp4sGetRequestFull("/obp/v7.0.0/consumers/current") + + Then("We get a proper OBP error response, not a version-not-found 404") + status should (equal(401) or equal(200) or equal(403)) + + And("X-OBP-Version-Served header indicates the fallback version") + versionServed should equal(Some("v6.0.0")) + + When("We request a native v7.0.0 endpoint (/banks is migrated)") + val (_, _, nativeVersionServed) = makeHttp4sGetRequestFull("/obp/v7.0.0/banks") + + Then("Native v7 endpoints do not set X-OBP-Version-Served") + nativeVersionServed should equal(None) + } } feature("HTTP4S v5.0.0 Native Endpoints") { From 05a7b71bdaa1f5196f52ae08d3607b01c35509bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 21 Apr 2026 11:02:24 +0200 Subject: [PATCH 2/3] =?UTF-8?q?performance:=20defer=20Hikari=20connection?= =?UTF-8?q?=20acquisition=20until=20first=20DB=20call=20=E2=80=94=20REST/S?= =?UTF-8?q?OAP-only=20endpoints=20never=20touch=20the=20pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../util/http4s/RequestScopeConnection.scala | 88 +++++++++++++------ .../util/http4s/ResourceDocMiddleware.scala | 82 +++++++++-------- .../http4s/RequestScopeConnectionTest.scala | 46 +++++++++- .../api/v7_0_0/Http4s700TransactionTest.scala | 5 +- 4 files changed, 155 insertions(+), 66 deletions(-) diff --git a/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala b/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala index b42b99ce82..d7b7a58023 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala @@ -35,27 +35,28 @@ import scala.concurrent.Future * worker thread — so the Future body sees the same proxy as the IO fiber. * * FLOW per request (ResourceDocMiddleware): - * 1. Borrow a real Connection from HikariCP. - * 2. Wrap it in a non-closing proxy (commit/rollback/close are no-ops). - * 3. Store the proxy in requestProxyLocal (IOLocal) only — currentProxy (TTL) is - * NOT set here to avoid leaving compute threads dirty. - * 4. Run validateOnly (auth, roles, entity lookups) — outside the transaction, on - * auto-commit vendor connections. On Left: return error response, no transaction - * opened. On Right (GET/HEAD): run routes.run directly on auto-commit connections. - * On Right (POST/PUT/DELETE/PATCH): open the transaction and run routes.run inside it. - * 5. Each IO.fromFuture call site uses RequestScopeConnection.fromFuture, which in - * a single synchronous IO.defer block on compute thread T: - * a. Sets currentProxy (TTL) on T. - * b. Evaluates `fut` — the Future is submitted; TtlRunnable captures T's TTL. - * c. Removes currentProxy from T immediately after submission (T is clean). - * d. Awaits the already-submitted future asynchronously. - * 6. Inside the Future, Lift Mapper calls DB.use(DefaultConnectionIdentifier). - * RequestAwareConnectionManager.newConnection reads currentProxy (TTL — set by - * TtlRunnable on the worker thread) and returns the proxy → all mapper calls - * share one underlying Connection. - * 7. The proxy's no-op commit/close prevents Lift from committing or releasing - * the connection at the end of each individual DB.use scope. - * 8. At request end: commit (or rollback on exception) and close the real connection. + * 1. Set requestLazyAcquire to a once-only acquisition IO — no connection borrowed yet. + * 2. Run validateOnly (auth, roles, entity lookups) — outside any transaction, on + * auto-commit vendor connections. On Left: return error response, clean up IOLocals. + * On Right (GET/HEAD): run routes.run directly on auto-commit connections. + * On Right (POST/PUT/DELETE/PATCH): run routes.run inside the lazy transaction scope. + * 3. On the FIRST fromFuture call that touches DB: + * a. ensureProxy reads requestProxyLocal — fast path if already cached in this fiber. + * b. Falls through to requestLazyAcquire: invokes the once-only IO which borrows + * a real Connection, wraps it in a non-closing proxy, and completes the request- + * scoped Deferred. Concurrent callers that lose the race discard their connection + * and wait for the Deferred — all fibers end up with the same proxy. + * c. Caches the proxy in requestProxyLocal (fiber-local) for subsequent calls. + * d. Sets currentProxy (TTL) on compute thread T so TtlRunnable carries it to the + * Future worker thread. + * 4. Inside the Future, Lift Mapper calls DB.use(DefaultConnectionIdentifier). + * RequestAwareConnectionManager.newConnection reads currentProxy (TTL) and returns + * the proxy → all mapper calls share one underlying Connection. + * 5. The proxy's no-op commit/close prevents Lift from committing or releasing the + * connection at the end of each individual DB.use scope. + * 6. At request end: deferred.tryGet (held in withBusinessDBTransaction's closure): + * - None → endpoint made zero DB calls; pool unaffected, nothing to commit or close. + * - Some(realConn, _) → commit (or rollback on error/cancel), then close realConn. * * METRIC WRITES: recordMetric runs in IO.blocking (blocking pool, no TTL from compute * thread). currentProxy.get() returns null there, so RequestAwareConnectionManager @@ -69,12 +70,25 @@ import scala.concurrent.Future object RequestScopeConnection extends MdcLoggable { /** - * Fiber-local proxy reference. Readable from any IO step in the request fiber - * regardless of which compute thread runs it. This is the source of truth. + * Fiber-local proxy reference. Set lazily on the first fromFuture call that + * needs a DB connection. Used as a fast-path cache on subsequent calls in the + * same fiber so the IOLocal / Deferred lookup is skipped. */ val requestProxyLocal: IOLocal[Option[Connection]] = IOLocal[Option[Connection]](None).unsafeRunSync()(IORuntime.global) + /** + * Fiber-local handle to the once-only acquisition IO installed by + * withBusinessDBTransaction. None outside a transaction scope (GET/HEAD, or before + * the first POST/PUT/DELETE request scope is set up). + * + * The IO[Connection] value is the same object reference across all fibers that + * inherit it (IOLocal copy-on-fork copies the reference, not the IO's internal + * Deferred state), so concurrent callers safely serialise through the Deferred. + */ + val requestLazyAcquire: IOLocal[Option[IO[Connection]]] = + IOLocal[Option[IO[Connection]]](None).unsafeRunSync()(IORuntime.global) + /** * Thread-local proxy reference, propagated to Future workers via TtlRunnable. * Set from requestProxyLocal immediately before each IO(Future { }) submission. @@ -119,8 +133,9 @@ object RequestScopeConnection extends MdcLoggable { /** * Drop-in replacement for IO.fromFuture(IO(fut)). * - * Reads the request proxy from the IOLocal (reliable across IO thread switches), - * then — in a single synchronous IO.defer block on the current compute thread T: + * Ensures a proxy connection is available (acquiring lazily on first call if + * withBusinessDBTransaction set up a lazy acquisition scope), then — in a single + * synchronous IO.defer block on the current compute thread T: * 1. Sets TTL on T so TtlRunnable captures it at Future-submission time. * 2. Evaluates `fut`, which submits the Future to the OBP EC; the TtlRunnable * wraps the submitted task and carries the proxy to the Future's worker thread. @@ -132,7 +147,7 @@ object RequestScopeConnection extends MdcLoggable { * proxy via the TtlRunnable captured in step 2. */ def fromFuture[A](fut: => Future[A]): IO[A] = - requestProxyLocal.get.flatMap { proxyOpt => + ensureProxy.flatMap { proxyOpt => IO.defer { proxyOpt.foreach(currentProxy.set) // (1) set TTL on current thread T val f = fut // (2) submit Future; TtlRunnable captures proxy from T @@ -140,6 +155,27 @@ object RequestScopeConnection extends MdcLoggable { IO.fromFuture(IO.pure(f)) // await the already-submitted future } } + + /** + * Returns the proxy for the current fiber, acquiring it lazily on first call. + * + * Fast path: requestProxyLocal is already set (same fiber, subsequent call) → O(1). + * Slow path: requestLazyAcquire holds an acquisition IO → invoke it, cache proxy in + * requestProxyLocal for this fiber's future calls. + * No-op: neither is set (GET/HEAD, or no transaction scope) → None, TTL stays clear. + */ + private def ensureProxy: IO[Option[Connection]] = + requestProxyLocal.get.flatMap { + case some @ Some(_) => IO.pure(some) + case None => + requestLazyAcquire.get.flatMap { + case None => IO.pure(None) + case Some(acquire) => + acquire.flatMap { proxy => + requestProxyLocal.set(Some(proxy)).as(Some(proxy)) + } + } + } } /** diff --git a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala index 9bafe7db75..8097cfc0e6 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala @@ -16,6 +16,7 @@ import net.liftweb.common.{Box, Empty, Full} import org.http4s._ import org.http4s.headers.`Content-Type` +import java.sql.Connection import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -132,51 +133,58 @@ object ResourceDocMiddleware extends MdcLoggable { } /** - * Wraps the business-logic IO in a request-scoped DB transaction. + * Activates a lazy request-scoped DB transaction for mutating methods + * (POST/PUT/DELETE/PATCH). GET/HEAD bypass this entirely. * - * Called only for mutating methods (POST/PUT/DELETE/PATCH) after validateOnly succeeds. - * GET/HEAD bypass this entirely and run on auto-commit vendor connections, avoiding - * a pool borrow + empty-commit overhead on every read request. + * NO connection is borrowed upfront. Instead, a once-only acquisition IO is + * installed in requestLazyAcquire. The first fromFuture call that actually needs + * a DB connection triggers the acquisition; endpoints that only call external REST + * or SOAP connectors never touch the pool at all. * - * Borrows a Connection from HikariCP via Resource.make so close() is guaranteed - * even if commit/rollback throws or the fiber is cancelled. The proxy prevents - * Lift's internal DB.use lifecycle from committing or returning the connection - * prematurely. + * Concurrent acquisition (rare — most handlers are sequential for-comprehensions): + * the inner Deferred serialises callers. The first fiber to complete it wins; + * any concurrent loser closes its own connection immediately and shares the winner's + * proxy. All fibers use one underlying Connection and one transaction. * * currentProxy (TTL) is NOT set here. Every DB call goes through * RequestScopeConnection.fromFuture, which atomically sets + submits + clears the - * TTL within a single IO.defer block on the compute thread, so the thread is never - * left dirty after the fromFuture call returns. + * TTL within a single IO.defer block on the compute thread. * - * On success: commits, then Resource finalizer closes. - * On error/cancellation: rolls back (errors swallowed to preserve original cause), - * then Resource finalizer closes. - * - * Metric writes (IO.blocking in recordMetric) run on the blocking pool where - * currentProxy is not set — they get their own pool connection and commit - * independently, matching v6 behaviour. + * On success (connection was acquired): commit, then close. + * On error/cancel (connection was acquired): rollback (errors swallowed), then close. + * If no DB call was made: deferred is never completed → nothing to commit or close. */ private def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] = - Resource.make( - IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection()) - )(conn => - IO.blocking { try { conn.close() } catch { case _: Exception => () } } - ).use { realConn => - val proxy = RequestScopeConnection.makeProxy(realConn) - for { - _ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy)) - // Note: currentProxy (TTL) is NOT set here. Every DB call goes through - // RequestScopeConnection.fromFuture, which atomically sets + submits + clears - // the TTL within a single IO.defer block on the compute thread. - result <- io.guaranteeCase { - case Outcome.Succeeded(_) => - RequestScopeConnection.requestProxyLocal.set(None) *> - IO.blocking { realConn.commit() } - case _ => - RequestScopeConnection.requestProxyLocal.set(None) *> - IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } } - } - } yield result + Deferred[IO, (Connection, Connection)].flatMap { deferred => + // acquireOnce: idempotent across concurrent callers via the Deferred. + // The loser of the complete() race discards its own connection and awaits + // the winner's proxy so all fibers share one transaction. + val acquireOnce: IO[Connection] = for { + realConn <- IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection()) + _ <- IO.blocking { realConn.setAutoCommit(false) } + proxy = RequestScopeConnection.makeProxy(realConn) + ok <- deferred.complete((realConn, proxy)) + _ <- if (!ok) IO.blocking { try { realConn.close() } catch { case _: Exception => () } } + else IO.unit + p <- deferred.get.map(_._2) + } yield p + + RequestScopeConnection.requestLazyAcquire.set(Some(acquireOnce)).bracket(_ => + io.guaranteeCase { outcome => + deferred.tryGet.flatMap { + case None => IO.unit // no DB calls — pool unaffected + case Some((realConn, _)) => + RequestScopeConnection.requestProxyLocal.set(None) *> + (outcome match { + case Outcome.Succeeded(_) => + IO.blocking { realConn.commit() } + case _ => + IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } } + }) *> + IO.blocking { try { realConn.close() } catch { case _: Exception => () } } + } + } + )(_ => RequestScopeConnection.requestLazyAcquire.set(None)) } /** diff --git a/obp-api/src/test/scala/code/api/util/http4s/RequestScopeConnectionTest.scala b/obp-api/src/test/scala/code/api/util/http4s/RequestScopeConnectionTest.scala index 9a92aa51d4..9ea91ce33c 100644 --- a/obp-api/src/test/scala/code/api/util/http4s/RequestScopeConnectionTest.scala +++ b/obp-api/src/test/scala/code/api/util/http4s/RequestScopeConnectionTest.scala @@ -1,5 +1,6 @@ package code.api.util.http4s +import cats.effect.{IO, IOLocal} import cats.effect.unsafe.IORuntime import net.liftweb.common.{Box, Empty, Full} import net.liftweb.db.ConnectionManager @@ -28,8 +29,10 @@ class RequestScopeConnectionTest extends FeatureSpec with Matchers with GivenWhe implicit val runtime: IORuntime = IORuntime.global after { - // Reset global TTL state so tests are independent. + // Reset global TTL state and fiber-locals so tests are independent. RequestScopeConnection.currentProxy.set(null) + RequestScopeConnection.requestProxyLocal.set(None).unsafeRunSync() + RequestScopeConnection.requestLazyAcquire.set(None).unsafeRunSync() } // ─── helpers ───────────────────────────────────────────────────────────────── @@ -242,6 +245,47 @@ class RequestScopeConnectionTest extends FeatureSpec with Matchers with GivenWhe seen shouldBe null } + scenario("Future observes the proxy via TTL when acquired lazily through requestLazyAcquire") { + Given("requestProxyLocal is None but requestLazyAcquire holds an acquisition IO") + val proxy = RequestScopeConnection.makeProxy(trackingConn(new ConnectionTracker)) + val acquireIO: IO[Connection] = IO.pure(proxy) + + val program = for { + _ <- RequestScopeConnection.requestLazyAcquire.set(Some(acquireIO)) + seen <- RequestScopeConnection.fromFuture { + Future { RequestScopeConnection.currentProxy.get() } + } + } yield seen + + When("fromFuture is executed with only requestLazyAcquire populated") + val seen = program.unsafeRunSync() + + Then("The Future observed the proxy acquired lazily through the TransmittableThreadLocal") + seen should be theSameInstanceAs proxy + } + + scenario("Lazy acquisition is skipped when requestProxyLocal already holds a proxy") { + Given("requestProxyLocal already holds a cached proxy") + val cachedProxy = RequestScopeConnection.makeProxy(trackingConn(new ConnectionTracker)) + var acquireCalled = false + val acquireIO: IO[Connection] = IO { acquireCalled = true; cachedProxy } + + val program = for { + _ <- RequestScopeConnection.requestProxyLocal.set(Some(cachedProxy)) + _ <- RequestScopeConnection.requestLazyAcquire.set(Some(acquireIO)) + seen <- RequestScopeConnection.fromFuture { + Future { RequestScopeConnection.currentProxy.get() } + } + } yield seen + + When("fromFuture is executed") + val seen = program.unsafeRunSync() + + Then("The cached proxy is used and the acquisition IO is never called") + seen should be theSameInstanceAs cachedProxy + acquireCalled shouldBe false + } + scenario("fromFuture returns the value produced by the Future") { Given("A simple Future that returns a known value") val program = for { diff --git a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala index f20b44eb96..5687b05ae6 100644 --- a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala +++ b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala @@ -18,9 +18,10 @@ import scala.concurrent.duration._ /** * Integration tests for the v7 request-scoped transaction feature. * - * Each HTTP request handled by the http4s stack runs inside + * Each mutating HTTP request handled by the http4s stack runs inside * `ResourceDocMiddleware.withBusinessDBTransaction`, which: - * - Borrows one real JDBC connection from HikariCP + * - Lazily borrows one real JDBC connection from HikariCP on the first DB call + * (endpoints that only call external REST/SOAP connectors never touch the pool) * - Wraps it in a non-closing proxy so Lift Mapper cannot commit early * - Commits on Outcome.Succeeded (HTTP 2xx or error response) * - Rolls back on Outcome.Errored / Outcome.Canceled (uncaught exception) From 4a3e1a501895de695bb80ae4e5d28ba763956950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 21 Apr 2026 11:32:18 +0200 Subject: [PATCH 3/3] feature: add 504 timeout to v7 endpoints reusing long_endpoint_timeout prop (default 55 s, same as v6) --- .../util/http4s/ResourceDocMiddleware.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala index 8097cfc0e6..2bffe0e0e4 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala @@ -2,6 +2,7 @@ package code.api.util.http4s import cats.data.{EitherT, Kleisli, OptionT} import cats.effect._ +import code.api.Constant import code.api.v7_0_0.Http4s700 import code.api.APIFailureNewStyle import code.api.util.APIUtil.ResourceDoc @@ -18,6 +19,7 @@ import org.http4s.headers.`Content-Type` import java.sql.Connection import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ import scala.util.control.NonFatal /** @@ -41,6 +43,10 @@ object ResourceDocMiddleware extends MdcLoggable { /** Type alias for http4s OptionT route effect */ type HttpF[A] = OptionT[IO, A] + // Same prop as FutureUtil.defaultTimeout — one setting controls both v6 and v7. + // Default 55 s. Override with long_endpoint_timeout in props. + private def endpointTimeoutMs: Long = Constant.longEndpointTimeoutInMillis + /** Type alias for validation effect using EitherT */ type Validation[A] = EitherT[IO, Response[IO], A] @@ -104,7 +110,7 @@ object ResourceDocMiddleware extends MdcLoggable { // GET/HEAD are safe methods — no writes, no transaction needed; they run on // auto-commit vendor connections (same as validation). All other methods // (POST/PUT/DELETE/PATCH) wrap routes.run in withBusinessDBTransaction. - OptionT( + val work: IO[Option[Response[IO]]] = validateOnly(req, resourceDoc, pathParams, ccWithDoc).flatMap { case Left(errorResponse) => IO.pure(Option(errorResponse)) @@ -118,7 +124,7 @@ object ResourceDocMiddleware extends MdcLoggable { else withBusinessDBTransaction(routeIO) executed.map(Option(_)) } - ) + OptionT(work.timeoutTo(endpointTimeoutMs.millis, endpointTimeoutResponse(req))) case None => // No matching ResourceDoc: fallback to original route (NO transaction scope opened). @@ -132,6 +138,18 @@ object ResourceDocMiddleware extends MdcLoggable { } } + /** 504 response emitted when endpointTimeoutMs elapses before the handler completes. */ + private def endpointTimeoutResponse(req: Request[IO]): IO[Option[Response[IO]]] = IO { + logger.warn( + s"[ResourceDocMiddleware] Endpoint timeout after ${endpointTimeoutMs}ms: " + + s"${req.method.name} ${req.uri.renderString}" + ) + val body = s"""{"message":"Request timeout: backend service did not respond within ${endpointTimeoutMs}ms."}""" + Some(ensureJsonContentType( + Response[IO](org.http4s.Status.GatewayTimeout).withEntity(body.getBytes("UTF-8")) + )) + } + /** * Activates a lazy request-scoped DB transaction for mutating methods * (POST/PUT/DELETE/PATCH). GET/HEAD bypass this entirely.