Skip to content
Permalink
Browse files

Merge pull request #901 from square/sye/support-multiple-transactions

VitessScaleSafetyChecks: Support multiple independent transactions at the same time
  • Loading branch information...
Syeberman committed Jun 11, 2019
2 parents 615cc7b + 77e0490 commit f57457ae5a07f3954d06ccd947f1dc328fdffef6
@@ -28,10 +28,13 @@ import java.sql.ResultSet
import java.sql.SQLException
import java.sql.SQLSyntaxErrorException
import java.sql.Timestamp
import java.util.ArrayDeque
import java.util.Locale
import javax.inject.Singleton
import javax.sql.DataSource

private val vtgateKeyspaceIdRegex = "vtgate:: keyspace_id:([^ ]+)".toRegex()

internal data class Instruction(
val Opcode: String,
val Input: Instruction?
@@ -236,7 +239,7 @@ internal class Explanation {
}
}

open class ExtendedQueryExectionListener : QueryExecutionListener, MethodExecutionListener {
open class ExtendedQueryExecutionListener : QueryExecutionListener, MethodExecutionListener {
override fun beforeMethod(executionContext: MethodExecutionContext) {
if (isStartTransaction(executionContext)) {
beforeStartTransaction()
@@ -380,8 +383,8 @@ class VitessScaleSafetyChecks(
return proxy
}

inner class FullScatterDetector : ExtendedQueryExectionListener() {
val count = ThreadLocal.withInitial { 0 }
inner class FullScatterDetector : ExtendedQueryExecutionListener() {
private val count: ThreadLocal<Int> = ThreadLocal.withInitial { 0 }

override fun beforeQuery(query: String) {
if (!transacter.isCheckEnabled(Check.FULL_SCATTER)) return
@@ -402,7 +405,7 @@ class VitessScaleSafetyChecks(
}
}

inner class TableScanDetector : ExtendedQueryExectionListener() {
inner class TableScanDetector : ExtendedQueryExecutionListener() {
private val mysqlTimeBeforeQuery: ThreadLocal<Timestamp?> =
ThreadLocal.withInitial { null }

@@ -473,18 +476,17 @@ class VitessScaleSafetyChecks(
}
}

inner class CowriteDetector : ExtendedQueryExectionListener() {
private val keyspaceIdsThisTransaction: ThreadLocal<LinkedHashSet<String>> =
ThreadLocal.withInitial { LinkedHashSet<String>() }
inner class CowriteDetector : ExtendedQueryExecutionListener() {

private val transactionDeque: ThreadLocal<ArrayDeque<LinkedHashSet<String>>> =
ThreadLocal.withInitial { ArrayDeque<LinkedHashSet<String>>() }

override fun beforeStartTransaction() {
// Connect before the query because connecting spits out a bunch of crap in the general_log
// that makes it harder for us to get to the thread id
connect()

check(keyspaceIdsThisTransaction.get().isEmpty()) {
"Transaction state has not been cleaned up, beforeEndTransaction was never executed"
}
transactionDeque.get().push(LinkedHashSet())
}

override fun afterQuery(query: String) {
@@ -493,10 +495,10 @@ class VitessScaleSafetyChecks(

val queryInDatabase = extractLastDmlQuery() ?: return

val m = "vtgate:: keyspace_id:([^ ]+)".toRegex().find(queryInDatabase) ?: return
val m = vtgateKeyspaceIdRegex.find(queryInDatabase) ?: return
val keyspaceId = m.groupValues[1]

val keyspaceIds = keyspaceIdsThisTransaction.get()
val keyspaceIds = transactionDeque.get().peek()
keyspaceIds.add(keyspaceId)

if (keyspaceIds.size > 1) {
@@ -508,12 +510,12 @@ class VitessScaleSafetyChecks(
}

override fun beforeEndTransaction() {
keyspaceIdsThisTransaction.get().clear()
transactionDeque.get().pop()
}
}

val COMMENT_PATTERN = "/\\*+[^*]*\\*+(?:[^/*][^*]*\\*+)*/".toRegex()
val DML = setOf("insert", "delete", "update")
private val COMMENT_PATTERN = "/\\*+[^*]*\\*+(?:[^/*][^*]*\\*+)*/".toRegex()
private val DML = setOf("insert", "delete", "update")

private fun isDml(query: String): Boolean {
val first = query
@@ -7,7 +7,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue

class VitessScaleSafetyChecksTest {
class ParseQueryPlansTest {
val queryPlans = """Length: 2
"select dbmovie0_.id as id1_2_, dbmovie0_.created_at as created_2_2_, dbmovie0_.name as name3_2_, dbmovie0_.release_date as release_4_2_, dbmovie0_.updated_at as updated_5_2_ from movies as dbmovie0_ where dbmovie0_.release_date < :v1 limit :v2"
{
@@ -63,4 +63,4 @@ class VitessScaleSafetyChecksTest {
assertTrue(plans[0].isScatter)
assertFalse(plans[1].isScatter)
}
}
}
@@ -3,4 +3,4 @@ package misk.hibernate
/**
* Marker interface for persistent entities that have a bounded growth and do not require sharding.
*/
interface DbUnsharded<T : DbEntity<T>> : DbEntity<T>
interface DbUnsharded<T : DbUnsharded<T>> : DbEntity<T>
@@ -57,7 +57,7 @@ internal class RealTransacter private constructor(

override fun isCheckEnabled(check : Check): Boolean {
val session = threadLocalSession.get()
return session != null && !session.disabledChecks.contains(check)
return session == null || !session.disabledChecks.contains(check)
}

override fun <T> transaction(lambda: (session: Session) -> T): T {
@@ -26,8 +26,8 @@ class DbActor() : DbRoot<DbActor>, DbTimestampedEntity {
@Column
var birth_date: LocalDate? = null

constructor(name: String, birthDate: LocalDate?) : this() {
constructor(name: String, birthDate: LocalDate? = null) : this() {
this.name = name
this.birth_date = birthDate
}
}
}
@@ -19,21 +19,34 @@ class RawHibernateApiTest {
@Test
fun happyPath() {
// Insert some movies in a transaction.
sessionFactory.openSession().use { session ->
val transaction = session.beginTransaction()
session.save(DbMovie("Jurassic Park", LocalDate.of(1993, 6, 9)))
val jpId = sessionFactory.openSession().use { session ->
var transaction = session.beginTransaction()
val jp = DbMovie("Jurassic Park", LocalDate.of(1993, 6, 9))
session.save(jp)
val jg = DbActor("Jeff Goldblum")
session.save(jg)
session.save(DbCharacter("Ian Malcolm", jp, jg))
transaction.commit()

transaction = session.beginTransaction()
session.save(DbMovie("Star Wars", LocalDate.of(1977, 5, 25)))
transaction.commit()

return@use jp.id
}

// Query those movies without a transaction.
sessionFactory.openSession().use { session ->
val criteriaBuilder = session.entityManagerFactory.criteriaBuilder
val criteria = criteriaBuilder.createQuery(DbMovie::class.java)
val queryRoot = criteria.from(DbMovie::class.java)
criteria.where(criteriaBuilder.notEqual(queryRoot.get<String>("name"), "Star Wars"))
val criteria = criteriaBuilder.createQuery(DbCharacter::class.java)
val queryRoot = criteria.from(DbCharacter::class.java)
criteria.where(
criteriaBuilder.equal(queryRoot.get<Id<DbMovie>>("movie_id"), jpId),
criteriaBuilder.notEqual(queryRoot.get<String>("name"), "Leia Organa")
)

val resultList = session.createQuery(criteria).resultList
assertThat(resultList.map { it.name }).containsExactly("Jurassic Park")
assertThat(resultList.map { it.name }).containsExactly("Ian Malcolm")
}
}
}
@@ -9,6 +9,8 @@ import misk.jdbc.TableScanException
import misk.jdbc.uniqueLong
import misk.testing.MiskTest
import misk.testing.MiskTestModule
import org.hibernate.SessionFactory
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.time.LocalDate
@@ -22,15 +24,15 @@ class ScaleSafetyTest {
@MiskTestModule
val module = MoviesTestModule()

@Inject @Movies lateinit var sessionFactory: SessionFactory
@Inject @Movies lateinit var transacter: Transacter
@Inject lateinit var queryFactory: Query.Factory

@Test
fun crossShardTransactions() {
val jg = transacter.save(
DbActor("Jeff Goldblum", LocalDate.of(1952, 10, 22)))
val cf = transacter.save(
DbActor("Carrie Fisher", null))
val cf = transacter.save(DbActor("Carrie Fisher"))

val jp = transacter.save(
DbMovie("Jurassic Park", LocalDate.of(1993, 6, 9)))
@@ -62,8 +64,7 @@ class ScaleSafetyTest {
fun transactionsSpanningEntityGroupsInTheSameShard() {
val jg = transacter.save(
DbActor("Jeff Goldblum", LocalDate.of(1952, 10, 22)))
val cf = transacter.save(
DbActor("Carrie Fisher", null))
val cf = transacter.save(DbActor("Carrie Fisher"))

val jp = transacter.save(
DbMovie("Jurassic Park", LocalDate.of(1993, 6, 9)))
@@ -87,6 +88,66 @@ class ScaleSafetyTest {
}
}

@Test
fun concurrentIndependentTransactions() {
// misk.hibernate.Transacter.transaction prevents nested transactions, and with good reason.
// However, in some very specific, Misk-internal cases, org.hibernate.SessionFactory can be used
// to spin up a new, independent transaction while another is ongoing.
val jg = transacter.save(
DbActor("Jeff Goldblum", LocalDate.of(1952, 10, 22)))
val cf = transacter.save(DbActor("Carrie Fisher"))
val ld = transacter.save(DbActor("Laura Dern"))
val sn = transacter.save(DbActor("Sam Neill"))

val jp = transacter.save(
DbMovie("Jurassic Park", LocalDate.of(1993, 6, 9)))
val sw = transacter.save(
DbMovie("Star Wars: The Last Jedi", LocalDate.of(2017, 12, 15)))

transacter.transaction { session ->
session.save(DbCharacter("Dr. Ian Malcolm", session.load(jp), session.load(jg)))

// The Hibernate session can operate on a different entity group. (This would normally be
// done in some sort of library call, where the Misk session is intentionally omitted.)
hibernateTransaction { hSession ->
hSession.save(DbCharacter("Leia Organa",
hSession.load(DbMovie::class.java, sw), hSession.load(DbActor::class.java, cf)))
}
}
assertEquals(setOf("Dr. Ian Malcolm"), movieCharacterNames(jp))
assertEquals(setOf("Leia Organa"), movieCharacterNames(sw))

// Regardless, one Hibernate session still cannot operate on two entity groups.
assertThrows<CowriteException> {
hibernateTransaction { hSession ->
hSession.save(DbCharacter("Dr. Ellie Sattler",
hSession.load(DbMovie::class.java, jp), hSession.load(DbActor::class.java, ld)))
hSession.save(DbCharacter("Vice Admiral Holdo",
hSession.load(DbMovie::class.java, sw), hSession.load(DbActor::class.java, ld)))
}
}
assertEquals(setOf("Dr. Ian Malcolm"), movieCharacterNames(jp))
assertEquals(setOf("Leia Organa"), movieCharacterNames(sw))

// Further, we should still detect cowrites in a Misk session even if a Hibernate session
// is used in the middle of it, and that Hibernate session is completely independent of any
// other transaction.
assertThrows<CowriteException> {
transacter.transaction { session ->
session.save(DbCharacter("Dr. Ellie Sattler", session.load(jp), session.load(ld)))

hibernateTransaction { hSession ->
hSession.save(DbCharacter("Dr. Alan Grant",
hSession.load(DbMovie::class.java, jp), hSession.load(DbActor::class.java, sn)))
}

session.save(DbCharacter("Vice Admiral Holdo", session.load(sw), session.load(ld)))
}
}
assertEquals(setOf("Dr. Ian Malcolm", "Dr. Alan Grant"), movieCharacterNames(jp))
assertEquals(setOf("Leia Organa"), movieCharacterNames(sw))
}

@Test
fun crossShardQueriesAreDetected() {
assertThrows<FullScatterException> {
@@ -139,6 +200,27 @@ class ScaleSafetyTest {
}
}
}

private fun movieCharacterNames(sw: Id<DbMovie>) = transacter.transaction { session ->
queryFactory.newQuery<CharacterQuery>().movieId(sw).list(session).map { it.name }.toSet()
}

private fun <R> hibernateTransaction(block: (hSession: org.hibernate.Session) -> R): R =
sessionFactory.openSession().use { hSession ->
val transaction = hSession.beginTransaction()
try {
val result = block(hSession)
transaction.commit()
return result
} catch (e: Throwable) {
try {
if (transaction.isActive) transaction.rollback()
} catch (suppressed: Exception) {
e.addSuppressed(suppressed)
}
throw e
}
}
}

private fun <T : DbEntity<T>> Transacter.save(entity: T): Id<T> = transaction { it.save(entity) }

0 comments on commit f57457a

Please sign in to comment.
You can’t perform that action at this time.