Skip to content
Permalink
Browse files

Support Transacter#replicaRead on MySQL

This makes it API compatible with the equivalent functionality for Vitess.
  • Loading branch information...
tirsen committed Oct 9, 2019
1 parent 323ebeb commit 4a39cc5e7d9d5ddd2b18e5c8cbb956473ad2f38d
@@ -10,15 +10,16 @@ import misk.inject.asSingleton
import misk.inject.keyOf
import misk.inject.setOfType
import misk.inject.toKey
import misk.jdbc.DataSourceClusterConfig
import misk.jdbc.DataSourceConfig
import misk.jdbc.DataSourceConnector
import misk.jdbc.DataSourceDecorator
import misk.jdbc.DataSourceService
import misk.jdbc.DataSourceType
import misk.jdbc.DatabasePool
import misk.jdbc.SpanInjector
import misk.jdbc.PingDatabaseService
import misk.jdbc.RealDatabasePool
import misk.jdbc.SpanInjector
import misk.metrics.Metrics
import misk.resources.ResourceLoader
import misk.vitess.StartVitessService
@@ -52,20 +53,36 @@ private const val ROW_COUNT_WARNING_LIMIT = 2000
class HibernateModule(
private val qualifier: KClass<out Annotation>,
config: DataSourceConfig,
private val readerQualifier: KClass<out Annotation>?,
readerConfig: DataSourceConfig?,
val databasePool: DatabasePool = RealDatabasePool
) : KAbstractModule() {
val config = config.withDefaults()
val readerConfig = readerConfig?.withDefaults()

override fun configure() {
val sessionFactoryProvider = getProvider(keyOf<SessionFactory>(qualifier))
val environmentProvider: Provider<Environment> = getProvider(keyOf<Environment>())
constructor(
qualifier: KClass<out Annotation>,
config: DataSourceConfig,
databasePool: DatabasePool = RealDatabasePool
) : this(qualifier, config, null, null, databasePool)

newMultibinder<DataSourceDecorator>(qualifier)
constructor(
qualifier: KClass<out Annotation>,
readerQualifier: KClass<out Annotation>,
cluster: DataSourceClusterConfig,
databasePool: DatabasePool = RealDatabasePool
) : this(qualifier, cluster.writer, readerQualifier, cluster.reader, databasePool)

override fun configure() {
if (readerQualifier != null) {
check(readerConfig != null) {
"Reader not configured for datasource $readerQualifier"
}
}

bind<Query.Factory>().to<ReflectionQuery.Factory>()
bind<QueryLimitsConfig>()
.toInstance(QueryLimitsConfig(MAX_MAX_ROWS, ROW_COUNT_ERROR_LIMIT, ROW_COUNT_WARNING_LIMIT))
bind(keyOf<DataSourceConfig>(qualifier)).toInstance(config)

// Bind StartVitessService.
install(ServiceModule<StartVitessService>(qualifier))
@@ -76,44 +93,21 @@ class HibernateModule(
}
}).asSingleton()

// Bind PingDatabaseService.
bind(keyOf<PingDatabaseService>(qualifier)).toProvider(Provider {
PingDatabaseService(config, environmentProvider.get())
}).asSingleton()
// TODO(rhall): depending on Vitess is a hack to simulate Vitess has already been started in the
// env. This is to remove flakiness in tests that are not waiting until Vitess is ready.
// This should be replaced with an ExternalDependency that manages vitess.
install(ServiceModule<PingDatabaseService>(qualifier)
.dependsOn<StartVitessService>(qualifier))
bindDataSource(qualifier, config, true)
if (readerQualifier != null && readerConfig != null) {
bindDataSource(readerQualifier, readerConfig, false)
}

// Bind DataSourceService.
val dataSourceDecoratorsKey = setOfType(DataSourceDecorator::class).toKey(qualifier)
val dataSourceDecoratorsProvider = getProvider(dataSourceDecoratorsKey)
bind(keyOf<DataSource>(qualifier))
.toProvider(keyOf<DataSourceService>(qualifier))
.asSingleton()
bind(keyOf<DataSourceService>(qualifier)).toProvider(object : Provider<DataSourceService> {
@com.google.inject.Inject(optional = true) var metrics: Metrics? = null
override fun get() = DataSourceService(
qualifier = qualifier,
baseConfig = config,
environment = environmentProvider.get(),
dataSourceDecorators = dataSourceDecoratorsProvider.get(),
databasePool = databasePool,
metrics = metrics
)
}).asSingleton()
val dataSourceServiceProvider = getProvider(keyOf<DataSourceService>(qualifier))
bind(keyOf<DataSourceConnector>(qualifier)).toProvider(dataSourceServiceProvider)
val connectorProvider = getProvider(keyOf<DataSourceConnector>(qualifier))
install(ServiceModule<DataSourceService>(qualifier)
.dependsOn<PingDatabaseService>(qualifier))
newMultibinder<DataSourceDecorator>(qualifier)

// Bind SchemaMigratorService.
val transacterKey = Transacter::class.toKey(qualifier)
val transacterProvider = getProvider(transacterKey)
val schemaMigratorKey = SchemaMigrator::class.toKey(qualifier)
val schemaMigratorProvider = getProvider(schemaMigratorKey)
val transacterKey = Transacter::class.toKey(qualifier)
val transacterProvider = getProvider(transacterKey)
val connectorProvider = getProvider(keyOf<DataSourceConnector>(qualifier))
val sessionFactoryProvider = getProvider(keyOf<SessionFactory>(qualifier))
val readerSessionFactoryProvider =
if (readerQualifier != null) getProvider(keyOf<SessionFactory>(readerQualifier)) else null

bind(schemaMigratorKey).toProvider(object : Provider<SchemaMigrator> {
@Inject lateinit var resourceLoader: ResourceLoader
@@ -124,17 +118,6 @@ class HibernateModule(
connector = connectorProvider.get()
)
}).asSingleton()
bind(transacterKey).toProvider(object : Provider<Transacter> {
@com.google.inject.Inject(optional = true) val tracer: Tracer? = null
@Inject lateinit var queryTracingListener: QueryTracingListener
override fun get(): RealTransacter = RealTransacter(
qualifier = qualifier,
sessionFactoryProvider = sessionFactoryProvider,
config = config,
queryTracingListener = queryTracingListener,
tracer = tracer
)
}).asSingleton()

val schemaMigratorServiceKey = keyOf<SchemaMigratorService>(qualifier)
bind(schemaMigratorServiceKey)
@@ -147,16 +130,101 @@ class HibernateModule(
connectorProvider = connectorProvider
)
}).asSingleton()
multibind<HealthCheck>().to(schemaMigratorServiceKey)

install(ServiceModule<SchemaMigratorService>(qualifier)
.dependsOn<DataSourceService>(qualifier))

multibind<HealthCheck>().to(schemaMigratorServiceKey)

bind(transacterKey).toProvider(object : Provider<Transacter> {
@com.google.inject.Inject(optional = true) val tracer: Tracer? = null
@Inject lateinit var queryTracingListener: QueryTracingListener
override fun get(): RealTransacter = RealTransacter(
qualifier = qualifier,
sessionFactoryProvider = sessionFactoryProvider,
readerSessionFactoryProvider = readerSessionFactoryProvider,
config = config,
queryTracingListener = queryTracingListener,
tracer = tracer
)
}).asSingleton()

// Install other modules.
install(object : HibernateEntityModule(qualifier) {
override fun configureHibernate() {
bindListener(EventType.PRE_INSERT).to<TimestampListener>()
bindListener(EventType.PRE_UPDATE).to<TimestampListener>()
bindListener(EventType.PRE_INSERT).to<QueryTracingListener>()
bindListener(EventType.POST_INSERT).to<QueryTracingListener>()
bindListener(EventType.PRE_UPDATE).to<QueryTracingListener>()
bindListener(EventType.POST_UPDATE).to<QueryTracingListener>()
bindListener(EventType.PRE_DELETE).to<QueryTracingListener>()
bindListener(EventType.POST_DELETE).to<QueryTracingListener>()
}
})

install(ExceptionMapperModule
.create<RetryTransactionException, RetryTransactionExceptionMapper>())
install(ExceptionMapperModule
.create<ConstraintViolationException, ConstraintViolationExceptionMapper>())
install(ExceptionMapperModule
.create<OptimisticLockException, OptimisticLockExceptionMapper>())
}

private fun bindDataSource(
qualifier: KClass<out Annotation>,
config: DataSourceConfig,
isWriter: Boolean
) {

// These items are configured on the writer qualifier only
val entitiesProvider = getProvider(setOfType(HibernateEntity::class).toKey(this.qualifier))
val dataSourceDecoratorsKey = setOfType(DataSourceDecorator::class).toKey(this.qualifier)
val eventListenersProvider = getProvider(setOfType(ListenerRegistration::class).toKey(this.qualifier))

val environmentProvider: Provider<Environment> = getProvider(keyOf<Environment>())
val sessionFactoryProvider = getProvider(keyOf<SessionFactory>(qualifier))

bind(keyOf<DataSourceConfig>(qualifier)).toInstance(config)

// Bind PingDatabaseService.
bind(keyOf<PingDatabaseService>(qualifier)).toProvider(Provider {
PingDatabaseService(config, environmentProvider.get())
}).asSingleton()
// TODO(rhall): depending on Vitess is a hack to simulate Vitess has already been started in the
// env. This is to remove flakiness in tests that are not waiting until Vitess is ready.
// This should be replaced with an ExternalDependency that manages vitess.
// TODO(jontirsen): I don't think this is needed anymore...
install(ServiceModule<PingDatabaseService>(qualifier)
.dependsOn<StartVitessService>(this.qualifier))

// Bind DataSourceService.
val dataSourceDecoratorsProvider = getProvider(dataSourceDecoratorsKey)
bind(keyOf<DataSource>(qualifier))
.toProvider(keyOf<DataSourceService>(qualifier))
.asSingleton()
bind(keyOf<DataSourceService>(qualifier)).toProvider(object : Provider<DataSourceService> {
@com.google.inject.Inject(optional = true) var metrics: Metrics? = null
override fun get(): DataSourceService {
return DataSourceService(
qualifier = qualifier,
baseConfig = config,
environment = environmentProvider.get(),
dataSourceDecorators = dataSourceDecoratorsProvider.get(),
databasePool = databasePool,
// TODO provide metrics to the reader pool but need a different metric key prefix
metrics = if (isWriter) metrics else null
)
}
}).asSingleton()
val dataSourceServiceProvider = getProvider(keyOf<DataSourceService>(qualifier))
bind(keyOf<DataSourceConnector>(qualifier)).toProvider(dataSourceServiceProvider)
install(ServiceModule<DataSourceService>(qualifier)
.dependsOn<PingDatabaseService>(qualifier))

val sessionFactoryServiceProvider = getProvider(keyOf<SessionFactoryService>(qualifier))

// Bind SessionFactoryService as implementation of TransacterService.
val entitiesProvider = getProvider(setOfType(HibernateEntity::class).toKey(qualifier))
val eventListenersProvider =
getProvider(setOfType(ListenerRegistration::class).toKey(qualifier))
val hibernateInjectorAccessProvider = getProvider(HibernateInjectorAccess::class.java)
val dataSourceProvider = getProvider(keyOf<DataSource>(qualifier))

@@ -174,36 +242,28 @@ class HibernateModule(
listenerRegistrations = eventListenersProvider.get()
)
}).asSingleton()
install(ServiceModule<TransacterService>(qualifier)
.enhancedBy<SchemaMigratorService>(qualifier)
.dependsOn<DataSourceService>(qualifier))

if (isWriter) {
install(ServiceModule<TransacterService>(qualifier)
.enhancedBy<SchemaMigratorService>(qualifier)
.dependsOn<DataSourceService>(qualifier))
} else {
install(ServiceModule<TransacterService>(qualifier)
.dependsOn<DataSourceService>(qualifier))
}

if (config.type == DataSourceType.VITESS_MYSQL) {
val jaegerSpanInjectorDecoratorKey = SpanInjector::class.toKey(qualifier)
bind(jaegerSpanInjectorDecoratorKey)
.toProvider(object : Provider<SpanInjector> {
@com.google.inject.Inject(optional = true)
var tracer: Tracer? = null
.toProvider(object : Provider<SpanInjector> {
@com.google.inject.Inject(optional = true)
var tracer: Tracer? = null

override fun get(): SpanInjector =
SpanInjector(tracer, config)
}).asSingleton()
override fun get(): SpanInjector =
SpanInjector(tracer, config)
}).asSingleton()
}

// Install other modules.
install(object : HibernateEntityModule(qualifier) {
override fun configureHibernate() {
bindListener(EventType.PRE_INSERT).to<TimestampListener>()
bindListener(EventType.PRE_UPDATE).to<TimestampListener>()
bindListener(EventType.PRE_INSERT).to<QueryTracingListener>()
bindListener(EventType.POST_INSERT).to<QueryTracingListener>()
bindListener(EventType.PRE_UPDATE).to<QueryTracingListener>()
bindListener(EventType.POST_UPDATE).to<QueryTracingListener>()
bindListener(EventType.PRE_DELETE).to<QueryTracingListener>()
bindListener(EventType.POST_DELETE).to<QueryTracingListener>()
}
})

val healthCheckKey = keyOf<HealthCheck>(qualifier)
bind(healthCheckKey)
.toProvider(object : Provider<HibernateHealthCheck> {
@@ -214,12 +274,5 @@ class HibernateModule(
})
.asSingleton()
multibind<HealthCheck>().to(healthCheckKey)

install(ExceptionMapperModule
.create<RetryTransactionException, RetryTransactionExceptionMapper>())
install(ExceptionMapperModule
.create<ConstraintViolationException, ConstraintViolationExceptionMapper>())
install(ExceptionMapperModule
.create<OptimisticLockException, OptimisticLockExceptionMapper>())
}
}

0 comments on commit 4a39cc5

Please sign in to comment.
You can’t perform that action at this time.