Skip to content

Commit

Permalink
Track whether the connection changed on retries
Browse files Browse the repository at this point in the history
We've had bugs where MySQL appears to be returning stale data. The
goal here is to see if this problem is isolated to reused connections.
  • Loading branch information
swankjesse committed Jul 26, 2019
1 parent 8ed880c commit 128b16b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 54 deletions.
72 changes: 57 additions & 15 deletions misk-hibernate/src/main/kotlin/misk/hibernate/RealTransacter.kt
Expand Up @@ -29,7 +29,7 @@ internal class RealTransacter private constructor(
private val qualifier: KClass<out Annotation>,
private val sessionFactoryProvider: Provider<SessionFactory>,
private val config: DataSourceConfig,
private val threadLocalSession: ThreadLocal<RealSession>,
private val threadLatestSession: ThreadLocal<RealSession>,
private val options: TransacterOptions,
private val queryTracingListener: QueryTracingListener,
private val tracer: Tracer?
Expand All @@ -55,11 +55,11 @@ internal class RealTransacter private constructor(
get() = sessionFactoryProvider.get()

override val inTransaction: Boolean
get() = threadLocalSession.get() != null
get() = threadLatestSession.get()?.inTransaction ?: false

override fun isCheckEnabled(check: Check): Boolean {
val session = threadLocalSession.get()
return session == null || !session.disabledChecks.contains(check)
val session = threadLatestSession.get()
return session == null || !session.inTransaction || !session.disabledChecks.contains(check)
}

override fun <T> transaction(block: (session: Session) -> T): T {
Expand All @@ -81,22 +81,30 @@ internal class RealTransacter private constructor(
while (true) {
try {
attempt++
return transactionInternal(block)
val result = transactionInternal(block)

if (attempt > 0) {
logger.info {
"retried ${qualifier.simpleName} transaction succeeded (${attemptNote(attempt)})"
}
}

return result
} catch (e: Exception) {
if (!isRetryable(e)) throw e

if (attempt >= options.maxAttempts) {
logger.info {
"${qualifier.simpleName} recoverable transaction exception " +
"(attempt: $attempt), no more attempts"
"(${attemptNote(attempt)}), no more attempts"
}
throw e
}

val sleepDuration = backoff.nextRetry()
logger.info(e) {
"${qualifier.simpleName} recoverable transaction exception " +
"(attempt: $attempt), will retry after a $sleepDuration delay"
"(${attemptNote(attempt)}), will retry after a $sleepDuration delay"
}

if (!sleepDuration.isZero) {
Expand All @@ -106,6 +114,17 @@ internal class RealTransacter private constructor(
}
}

/**
* Returns a string describing the most recent attempt. This includes whether the attempt used the
* same connection which might help in diagnosing stale data problems.
*/
private fun attemptNote(attempt: Int): String {
if (attempt == 1) return "attempt 1"
val latestSession = threadLatestSession.get()!!
if (latestSession.sameConnection) return "attempt $attempt, same connection"
return "attempt $attempt, different connection"
}

private fun <T> transactionInternal(block: (session: Session) -> T): T {
return maybeWithTracing(DB_TRANSACTION_SPAN_NAME) { transactionInternalSession(block) }
}
Expand Down Expand Up @@ -161,7 +180,7 @@ internal class RealTransacter private constructor(
qualifier,
sessionFactoryProvider,
config,
threadLocalSession,
threadLatestSession,
options,
queryTracingListener,
tracer
Expand All @@ -173,14 +192,15 @@ internal class RealTransacter private constructor(
hibernateSession = hibernateSession,
config = config,
readOnly = options.readOnly,
disabledChecks = options.disabledChecks
disabledChecks = options.disabledChecks,
predecessor = threadLatestSession.get()
)

// Note that the RealSession is closed last so that close hooks run after the thread locals and
// Hibernate Session have been released. This way close hooks can start their own transactions.
realSession.use {
hibernateSession.use {
threadLocalSession.withValue(realSession) {
useSession(realSession) {
return block(realSession)
}
}
Expand Down Expand Up @@ -221,11 +241,15 @@ internal class RealTransacter private constructor(
override val hibernateSession: org.hibernate.Session,
val config: DataSourceConfig,
private val readOnly: Boolean,
var disabledChecks: EnumSet<Check>
var disabledChecks: EnumSet<Check>,
predecessor: RealSession?
) : Session, Closeable {
private val preCommitHooks = mutableListOf<() -> Unit>()
private val postCommitHooks = mutableListOf<() -> Unit>()
private val sessionCloseHooks = mutableListOf<() -> Unit>()
private val rootConnection = hibernateSession.rootConnection
internal val sameConnection = predecessor?.rootConnection == rootConnection
internal var inTransaction = false

init {
if (readOnly) {
Expand Down Expand Up @@ -373,6 +397,22 @@ internal class RealTransacter private constructor(
disabledChecks = previous
}
}

/**
* Returns the physical JDBC connection of this session. Hibernate creates one-time-use wrappers
* around the physical connections that talk to the database. This unwraps those so we can
* tell when a connection is involved in a stale data problem.
*/
private val org.hibernate.Session.rootConnection: Connection
get() {
var result: Connection = doReturningWork { connection -> connection }
while (result.isWrapperFor(Connection::class.java)) {
val unwrapped = result.unwrap(Connection::class.java)
if (unwrapped == result) break
result = unwrapped
}
return result
}
}

private fun <T> maybeWithTracing(spanName: String, block: () -> T): T {
Expand All @@ -384,14 +424,16 @@ internal class RealTransacter private constructor(
}
}

private inline fun <T, R> ThreadLocal<T>.withValue(value: T, block: () -> R): R {
check(get() == null) { "Attempted to start a nested session" }
private inline fun <R> useSession(session: RealSession, block: () -> R): R {
val previous = threadLatestSession.get()
check(previous == null || !previous.inTransaction) { "Attempted to start a nested session" }

set(value)
threadLatestSession.set(session)
session.inTransaction = true
try {
return block()
} finally {
remove()
session.inTransaction = false
}
}
}
51 changes: 13 additions & 38 deletions misk-hibernate/src/test/kotlin/misk/hibernate/TransacterTest.kt
Expand Up @@ -9,14 +9,14 @@ import misk.hibernate.RealTransacter.Companion.DB_COMMIT_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.DB_ROLLBACK_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.DB_TRANSACTION_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.TRANSACTER_SPAN_TAG
import misk.logging.LogCollector
import misk.testing.MiskTest
import misk.testing.MiskTestModule
import org.assertj.core.api.Assertions.assertThat
import org.hibernate.exception.ConstraintViolationException
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.sql.Connection
import java.time.LocalDate
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Inject
Expand All @@ -30,6 +30,7 @@ class TransacterTest {
@Inject @Movies lateinit var transacter: Transacter
@Inject lateinit var queryFactory: Query.Factory
@Inject lateinit var tracer: MockTracer
@Inject lateinit var logCollector: LogCollector

@Test
fun happyPath() {
Expand Down Expand Up @@ -484,49 +485,23 @@ class TransacterTest {
assertThat(logs).containsExactly("hook invoked")
}

@Disabled("TODO(jwilson): not sure if this is possible, but it would be nice behavior")
@Test
fun retriesRotateConnections() {
val logs = mutableListOf<String>()
val connectionsList = mutableListOf<Connection>()
fun retriesIncludeConnectionReuse() {
logCollector.takeMessages()

val callCount = AtomicInteger()
transacter.retries(3).transaction { session ->
session.useConnection { connection ->

val rootConnection = connection.rootConnection()
var index = connectionsList.indexOf(rootConnection)
if (index == -1) {
index = connectionsList.size
connectionsList += rootConnection
}

logs += "transaction on connectionsList[$index]"
}

transacter.retries(3).transaction {
if (callCount.getAndIncrement() < 2) throw RetryTransactionException()
}

assertThat(callCount.get()).isEqualTo(3)
assertThat(logs).containsExactly(
"transaction on connectionsList[0]",
"transaction on connectionsList[1]",
"transaction on connectionsList[2]"
)
}

/**
* The JDBC connections we use are one-time-use wrappers around the real connections that talk to
* the database. This gets at those connections so we can differentiate them.
*/
private fun Connection.rootConnection(): Connection {
var result = this
while (result.isWrapperFor(Connection::class.java)) {
val unwrapped = result.unwrap(Connection::class.java)
if (unwrapped == result) break
result = unwrapped
}
return result
val logs = logCollector.takeMessages(RealTransacter::class)
assertThat(logs).hasSize(3)
assertThat(logs[0]).matches("Movies recoverable transaction exception " +
"\\(attempt 1\\), will retry after a PT.*S delay")
assertThat(logs[1]).matches("Movies recoverable transaction exception " +
"\\(attempt 2, same connection\\), will retry after a PT.*S delay")
assertThat(logs[2]).matches(
"retried Movies transaction succeeded \\(attempt 3, same connection\\)")
}

private fun tracingAssertions(committed: Boolean) {
Expand Down
Expand Up @@ -61,7 +61,6 @@ internal class ClientMetricsInterceptorTest {
assertThat(client.ping(AppRequest(404)).execute().code()).isEqualTo(404)
assertThat(client.ping(AppRequest(503)).execute().code()).isEqualTo(503)


SoftAssertions.assertSoftly { softly ->
softly.assertThat(requestDuration.count("pinger.ping", "all")).isEqualTo(6)
softly.assertThat(requestDuration.count("pinger.ping", "202")).isEqualTo(1)
Expand Down

0 comments on commit 128b16b

Please sign in to comment.