Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,32 @@ object Http4sLiftWebBridge extends MdcLoggable {
private lazy val continuationTimeoutMs: Long =
APIUtil.getPropsAsLongValue("http4s.continuation.timeout.ms", 60000L)

// Root path segment shared by all API routes (props key "apiPathZero", default "obp").
private lazy val apiPathZero: String = APIUtil.getPropsValue("apiPathZero", "obp")
// Path prefix matched by rewriteIfV700, e.g. "/obp/v7.0.0".
private lazy val v700Path: String = s"/$apiPathZero/v7.0.0"
// Version that the bridge falls back to for unmigrated v7.0.0 paths.
private lazy val fallbackVersion: String = APIUtil.getPropsValue("http4s.v700.fallback.version", "v6.0.0")
// Rewritten path prefix built from the fallback version, e.g. "/obp/v6.0.0".
private lazy val fallbackPath: String = s"/$apiPathZero/$fallbackVersion"

/**
 * Transparently redirects unhandled v7.0.0 requests to the configured fallback
 * version (default v6.0.0) so Lift can serve them.
 *
 * Any v7.0.0 path that reaches this bridge was, by construction, not claimed by
 * an http4s handler (otherwise it would never arrive here).
 *
 * @return the request (URI rewritten when the path matched) paired with the
 *         version string to report via X-OBP-Version-Served, or None when no
 *         rewrite was performed.
 */
private def rewriteIfV700(req: Request[IO]): (Request[IO], Option[String]) = {
  val requestPath = req.uri.path.renderString
  val matchesV700 = requestPath == v700Path || requestPath.startsWith(v700Path + "/")
  if (!matchesV700) (req, None)
  else {
    // Swap only the version prefix; everything after "/obp/v7.0.0" is preserved.
    val redirectedPath = fallbackPath + requestPath.drop(v700Path.length)
    logger.info(s"[BRIDGE] v7.0.0 fallback: $requestPath → $redirectedPath (served by $fallbackVersion)")
    val rewritten = req.withUri(req.uri.withPath(Uri.Path.unsafeFromString(redirectedPath)))
    (rewritten, Some(fallbackVersion))
  }
}

/** Catch-all http4s route set: every request that reaches this bridge is forwarded to dispatch. */
def routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
case req => dispatch(req)
}
Expand All @@ -38,12 +64,13 @@ object Http4sLiftWebBridge extends MdcLoggable {
}

def dispatch(req: Request[IO]): IO[Response[IO]] = {
val uri = req.uri.renderString
val (effectiveReq, servedVersion) = rewriteIfV700(req)
val uri = req.uri.renderString
val method = req.method.name
logger.debug(s"Http4sLiftBridge dispatching: $method $uri, S.inStatefulScope_? = ${S.inStatefulScope_?}")
val result = for {
bodyBytes <- req.body.compile.to(Array)
liftReq = buildLiftReq(req, bodyBytes)
bodyBytes <- effectiveReq.body.compile.to(Array)
liftReq = buildLiftReq(effectiveReq, bodyBytes)
liftResp <- IO {
val session = LiftRules.statelessSession.vend.apply(liftReq)
S.init(Full(liftReq), session) {
Expand Down Expand Up @@ -71,7 +98,8 @@ object Http4sLiftWebBridge extends MdcLoggable {
} yield {
logger.debug(s"[BRIDGE] Http4sLiftBridge completed: $method $uri -> ${http4sResponse.status.code}")
logger.debug(s"Http4sLiftBridge completed: $method $uri -> ${http4sResponse.status.code}")
ensureStandardHeaders(req, http4sResponse)
val baseResp = ensureStandardHeaders(req, http4sResponse)
servedVersion.fold(baseResp)(v => baseResp.putHeaders(Header.Raw(CIString("X-OBP-Version-Served"), v)))
}
result.handleErrorWith { e =>
logger.error(s"[BRIDGE] Uncaught exception in dispatch: $method $uri - ${e.getMessage}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,28 @@ import scala.concurrent.Future
* worker thread — so the Future body sees the same proxy as the IO fiber.
*
* FLOW per request (ResourceDocMiddleware):
* 1. Borrow a real Connection from HikariCP.
* 2. Wrap it in a non-closing proxy (commit/rollback/close are no-ops).
* 3. Store the proxy in requestProxyLocal (IOLocal) only — currentProxy (TTL) is
* NOT set here to avoid leaving compute threads dirty.
* 4. Run validateOnly (auth, roles, entity lookups) — outside the transaction, on
* auto-commit vendor connections. On Left: return error response, no transaction
* opened. On Right (GET/HEAD): run routes.run directly on auto-commit connections.
* On Right (POST/PUT/DELETE/PATCH): open the transaction and run routes.run inside it.
* 5. Each IO.fromFuture call site uses RequestScopeConnection.fromFuture, which in
* a single synchronous IO.defer block on compute thread T:
* a. Sets currentProxy (TTL) on T.
* b. Evaluates `fut` — the Future is submitted; TtlRunnable captures T's TTL.
* c. Removes currentProxy from T immediately after submission (T is clean).
* d. Awaits the already-submitted future asynchronously.
* 6. Inside the Future, Lift Mapper calls DB.use(DefaultConnectionIdentifier).
* RequestAwareConnectionManager.newConnection reads currentProxy (TTL — set by
* TtlRunnable on the worker thread) and returns the proxy → all mapper calls
* share one underlying Connection.
* 7. The proxy's no-op commit/close prevents Lift from committing or releasing
* the connection at the end of each individual DB.use scope.
* 8. At request end: commit (or rollback on exception) and close the real connection.
* 1. Set requestLazyAcquire to a once-only acquisition IO — no connection borrowed yet.
* 2. Run validateOnly (auth, roles, entity lookups) — outside any transaction, on
* auto-commit vendor connections. On Left: return error response, clean up IOLocals.
* On Right (GET/HEAD): run routes.run directly on auto-commit connections.
* On Right (POST/PUT/DELETE/PATCH): run routes.run inside the lazy transaction scope.
* 3. On the FIRST fromFuture call that touches DB:
* a. ensureProxy reads requestProxyLocal — fast path if already cached in this fiber.
* b. Falls through to requestLazyAcquire: invokes the once-only IO which borrows
* a real Connection, wraps it in a non-closing proxy, and completes the request-
* scoped Deferred. Concurrent callers that lose the race discard their connection
* and wait for the Deferred — all fibers end up with the same proxy.
* c. Caches the proxy in requestProxyLocal (fiber-local) for subsequent calls.
* d. Sets currentProxy (TTL) on compute thread T so TtlRunnable carries it to the
* Future worker thread.
* 4. Inside the Future, Lift Mapper calls DB.use(DefaultConnectionIdentifier).
* RequestAwareConnectionManager.newConnection reads currentProxy (TTL) and returns
* the proxy → all mapper calls share one underlying Connection.
* 5. The proxy's no-op commit/close prevents Lift from committing or releasing the
* connection at the end of each individual DB.use scope.
* 6. At request end: deferred.tryGet (held in withBusinessDBTransaction's closure):
* - None → endpoint made zero DB calls; pool unaffected, nothing to commit or close.
* - Some(realConn, _) → commit (or rollback on error/cancel), then close realConn.
*
* METRIC WRITES: recordMetric runs in IO.blocking (blocking pool, no TTL from compute
* thread). currentProxy.get() returns null there, so RequestAwareConnectionManager
Expand All @@ -69,12 +70,25 @@ import scala.concurrent.Future
object RequestScopeConnection extends MdcLoggable {

/**
* Fiber-local proxy reference. Readable from any IO step in the request fiber
* regardless of which compute thread runs it. This is the source of truth.
* Fiber-local proxy reference. Set lazily on the first fromFuture call that
* needs a DB connection. Used as a fast-path cache on subsequent calls in the
* same fiber so the IOLocal / Deferred lookup is skipped.
*/
// Allocated eagerly at object initialisation; default None = "no request-scoped
// proxy for this fiber yet". NOTE(review): unsafeRunSync on IORuntime.global is
// presumably safe here because IOLocal construction is pure allocation — confirm
// the runtime is initialised before this object is first touched.
val requestProxyLocal: IOLocal[Option[Connection]] =
IOLocal[Option[Connection]](None).unsafeRunSync()(IORuntime.global)

/**
* Fiber-local handle to the once-only acquisition IO installed by
* withBusinessDBTransaction. None outside a transaction scope (GET/HEAD, or before
* the first POST/PUT/DELETE request scope is set up).
*
* The IO[Connection] value is the same object reference across all fibers that
* inherit it (IOLocal copy-on-fork copies the reference, not the IO's internal
* Deferred state), so concurrent callers safely serialise through the Deferred.
*/
// NOTE(review): like requestProxyLocal above, this is allocated eagerly via
// unsafeRunSync at object init — confirm IORuntime.global is ready at that point.
val requestLazyAcquire: IOLocal[Option[IO[Connection]]] =
IOLocal[Option[IO[Connection]]](None).unsafeRunSync()(IORuntime.global)

/**
* Thread-local proxy reference, propagated to Future workers via TtlRunnable.
* Set from requestProxyLocal immediately before each IO(Future { }) submission.
Expand Down Expand Up @@ -119,8 +133,9 @@ object RequestScopeConnection extends MdcLoggable {
/**
* Drop-in replacement for IO.fromFuture(IO(fut)).
*
* Reads the request proxy from the IOLocal (reliable across IO thread switches),
* then — in a single synchronous IO.defer block on the current compute thread T:
* Ensures a proxy connection is available (acquiring lazily on first call if
* withBusinessDBTransaction set up a lazy acquisition scope), then — in a single
* synchronous IO.defer block on the current compute thread T:
* 1. Sets TTL on T so TtlRunnable captures it at Future-submission time.
* 2. Evaluates `fut`, which submits the Future to the OBP EC; the TtlRunnable
* wraps the submitted task and carries the proxy to the Future's worker thread.
Expand All @@ -132,14 +147,35 @@ object RequestScopeConnection extends MdcLoggable {
* proxy via the TtlRunnable captured in step 2.
*/
def fromFuture[A](fut: => Future[A]): IO[A] =
requestProxyLocal.get.flatMap { proxyOpt =>
ensureProxy.flatMap { proxyOpt =>
IO.defer {
proxyOpt.foreach(currentProxy.set) // (1) set TTL on current thread T
val f = fut // (2) submit Future; TtlRunnable captures proxy from T
currentProxy.remove() // (3) clear TTL on T — T is clean after this point
IO.fromFuture(IO.pure(f)) // await the already-submitted future
}
}

/**
 * Yields the proxy connection for the current fiber, acquiring it lazily on the
 * first call that needs one.
 *
 * Fast path: requestProxyLocal already holds the proxy (same fiber, later call).
 * Slow path: requestLazyAcquire holds an acquisition IO — run it once and memoise
 * the resulting proxy in requestProxyLocal for this fiber's subsequent calls.
 * Neither set (GET/HEAD, or no transaction scope): None — the TTL stays clear.
 */
private def ensureProxy: IO[Option[Connection]] =
  requestProxyLocal.get.flatMap { cached =>
    if (cached.isDefined) IO.pure(cached) // fast path — O(1), no Deferred lookup
    else
      requestLazyAcquire.get.flatMap {
        _.fold(IO.pure(Option.empty[Connection])) { acquire =>
          // First DB touch in this fiber: run the once-only acquisition and cache it.
          acquire.flatMap { proxy =>
            requestProxyLocal.set(Some(proxy)).as(Some(proxy))
          }
        }
      }
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package code.api.util.http4s

import cats.data.{EitherT, Kleisli, OptionT}
import cats.effect._
import code.api.Constant
import code.api.v7_0_0.Http4s700
import code.api.APIFailureNewStyle
import code.api.util.APIUtil.ResourceDoc
Expand All @@ -16,7 +17,9 @@ import net.liftweb.common.{Box, Empty, Full}
import org.http4s._
import org.http4s.headers.`Content-Type`

import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.util.control.NonFatal

/**
Expand All @@ -40,6 +43,10 @@ object ResourceDocMiddleware extends MdcLoggable {
/** Type alias for http4s OptionT route effect */
type HttpF[A] = OptionT[IO, A]

/**
 * Hard deadline for a single endpoint invocation, in milliseconds.
 * Same prop as FutureUtil.defaultTimeout — one setting controls both v6 and v7.
 * Default 55 s. Override with long_endpoint_timeout in props.
 */
private def endpointTimeoutMs: Long = Constant.longEndpointTimeoutInMillis

/** Type alias for validation effect using EitherT */
type Validation[A] = EitherT[IO, Response[IO], A]

Expand Down Expand Up @@ -103,7 +110,7 @@ object ResourceDocMiddleware extends MdcLoggable {
// GET/HEAD are safe methods — no writes, no transaction needed; they run on
// auto-commit vendor connections (same as validation). All other methods
// (POST/PUT/DELETE/PATCH) wrap routes.run in withBusinessDBTransaction.
OptionT(
val work: IO[Option[Response[IO]]] =
validateOnly(req, resourceDoc, pathParams, ccWithDoc).flatMap {
case Left(errorResponse) =>
IO.pure(Option(errorResponse))
Expand All @@ -117,7 +124,7 @@ object ResourceDocMiddleware extends MdcLoggable {
else withBusinessDBTransaction(routeIO)
executed.map(Option(_))
}
)
OptionT(work.timeoutTo(endpointTimeoutMs.millis, endpointTimeoutResponse(req)))

case None =>
// No matching ResourceDoc: fallback to original route (NO transaction scope opened).
Expand All @@ -131,52 +138,71 @@ object ResourceDocMiddleware extends MdcLoggable {
}
}

/**
 * Builds the 504 Gateway Timeout response used by timeoutTo when the handler
 * fails to complete within endpointTimeoutMs. Logs a warning identifying the
 * slow request, and wraps the JSON error body with the standard content type.
 */
private def endpointTimeoutResponse(req: Request[IO]): IO[Option[Response[IO]]] = IO {
  logger.warn(
    s"[ResourceDocMiddleware] Endpoint timeout after ${endpointTimeoutMs}ms: " +
      s"${req.method.name} ${req.uri.renderString}"
  )
  val body = s"""{"message":"Request timeout: backend service did not respond within ${endpointTimeoutMs}ms."}"""
  val timeoutResponse = Response[IO](org.http4s.Status.GatewayTimeout).withEntity(body.getBytes("UTF-8"))
  Some(ensureJsonContentType(timeoutResponse))
}

/**
* Wraps the business-logic IO in a request-scoped DB transaction.
* Activates a lazy request-scoped DB transaction for mutating methods
* (POST/PUT/DELETE/PATCH). GET/HEAD bypass this entirely.
*
* Called only for mutating methods (POST/PUT/DELETE/PATCH) after validateOnly succeeds.
* GET/HEAD bypass this entirely and run on auto-commit vendor connections, avoiding
* a pool borrow + empty-commit overhead on every read request.
* NO connection is borrowed upfront. Instead, a once-only acquisition IO is
* installed in requestLazyAcquire. The first fromFuture call that actually needs
* a DB connection triggers the acquisition; endpoints that only call external REST
* or SOAP connectors never touch the pool at all.
*
* Borrows a Connection from HikariCP via Resource.make so close() is guaranteed
* even if commit/rollback throws or the fiber is cancelled. The proxy prevents
* Lift's internal DB.use lifecycle from committing or returning the connection
* prematurely.
* Concurrent acquisition (rare — most handlers are sequential for-comprehensions):
* the inner Deferred serialises callers. The first fiber to complete it wins;
* any concurrent loser closes its own connection immediately and shares the winner's
* proxy. All fibers use one underlying Connection and one transaction.
*
* currentProxy (TTL) is NOT set here. Every DB call goes through
* RequestScopeConnection.fromFuture, which atomically sets + submits + clears the
* TTL within a single IO.defer block on the compute thread, so the thread is never
* left dirty after the fromFuture call returns.
*
* On success: commits, then Resource finalizer closes.
* On error/cancellation: rolls back (errors swallowed to preserve original cause),
* then Resource finalizer closes.
* TTL within a single IO.defer block on the compute thread.
*
* Metric writes (IO.blocking in recordMetric) run on the blocking pool where
* currentProxy is not set — they get their own pool connection and commit
* independently, matching v6 behaviour.
* On success (connection was acquired): commit, then close.
* On error/cancel (connection was acquired): rollback (errors swallowed), then close.
* If no DB call was made: deferred is never completed → nothing to commit or close.
*/
private def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] =
Resource.make(
IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection())
)(conn =>
IO.blocking { try { conn.close() } catch { case _: Exception => () } }
).use { realConn =>
val proxy = RequestScopeConnection.makeProxy(realConn)
for {
_ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy))
// Note: currentProxy (TTL) is NOT set here. Every DB call goes through
// RequestScopeConnection.fromFuture, which atomically sets + submits + clears
// the TTL within a single IO.defer block on the compute thread.
result <- io.guaranteeCase {
case Outcome.Succeeded(_) =>
RequestScopeConnection.requestProxyLocal.set(None) *>
IO.blocking { realConn.commit() }
case _ =>
RequestScopeConnection.requestProxyLocal.set(None) *>
IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } }
}
} yield result
Deferred[IO, (Connection, Connection)].flatMap { deferred =>
// acquireOnce: idempotent across concurrent callers via the Deferred.
// The loser of the complete() race discards its own connection and awaits
// the winner's proxy so all fibers share one transaction.
val acquireOnce: IO[Connection] = for {
realConn <- IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection())
_ <- IO.blocking { realConn.setAutoCommit(false) }
proxy = RequestScopeConnection.makeProxy(realConn)
ok <- deferred.complete((realConn, proxy))
_ <- if (!ok) IO.blocking { try { realConn.close() } catch { case _: Exception => () } }
else IO.unit
p <- deferred.get.map(_._2)
} yield p

RequestScopeConnection.requestLazyAcquire.set(Some(acquireOnce)).bracket(_ =>
io.guaranteeCase { outcome =>
deferred.tryGet.flatMap {
case None => IO.unit // no DB calls — pool unaffected
case Some((realConn, _)) =>
RequestScopeConnection.requestProxyLocal.set(None) *>
(outcome match {
case Outcome.Succeeded(_) =>
IO.blocking { realConn.commit() }
case _ =>
IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } }
}) *>
IO.blocking { try { realConn.close() } catch { case _: Exception => () } }
}
}
)(_ => RequestScopeConnection.requestLazyAcquire.set(None))
}

/**
Expand Down
Loading
Loading