Skip to content
Permalink
Browse files

Access Vitess using the MySQL driver (#1144)

Adds a new VITESS_MYSQL data source type which accesses Vitess using the
mysql protocol and the MySQL JDBC driver
  • Loading branch information...
tirsen committed Aug 13, 2019
1 parent dd8ed62 commit 4034251dd478983ebeefbb30f6a0c962d11a8454
@@ -9,10 +9,10 @@ import misk.inject.toKey
import misk.jdbc.DataSourceConfig
import misk.jdbc.DataSourceConnector
import misk.jdbc.DataSourceDecorator
import misk.jdbc.DataSourceService
import misk.jdbc.DataSourceType
import misk.jdbc.TruncateTablesService
import misk.jdbc.VitessScaleSafetyChecks
import misk.time.ForceUtcTimeZoneService
import misk.vitess.StartVitessService
import okhttp3.OkHttpClient
import javax.inject.Provider
@@ -36,12 +36,14 @@ class HibernateTestingModule(
private val shutDownStatements: List<String> = listOf()
) : KAbstractModule() {
override fun configure() {
install(ServiceModule<ForceUtcTimeZoneService>())

val truncateTablesServiceKey = TruncateTablesService::class.toKey(qualifier)

val transacterKey = Transacter::class.toKey(qualifier)
val transacterProvider = getProvider(transacterKey)

if ((config == null || config.type == DataSourceType.VITESS)) {
if (config == null || config.type == DataSourceType.VITESS || config.type == DataSourceType.VITESS_MYSQL) {
bindVitessChecks(transacterProvider)
}

@@ -53,7 +53,7 @@ class TruncateTablesService(
DataSourceType.HSQLDB -> {
"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES WHERE TABLE_TYPE='TABLE'"
}
DataSourceType.VITESS -> {
DataSourceType.VITESS, DataSourceType.VITESS_MYSQL -> {
"SHOW VSCHEMA TABLES"
}
}
@@ -369,7 +369,7 @@ class VitessScaleSafetyChecks(
private var vtgate: Connection? = null

override fun decorate(dataSource: DataSource): DataSource {
if (config.type != DataSourceType.VITESS) return dataSource
if (config.type != DataSourceType.VITESS && config.type != DataSourceType.VITESS_MYSQL) return dataSource

val proxy = ProxyDataSource(dataSource)
proxy.proxyConfig = ProxyConfig.Builder()
@@ -451,6 +451,9 @@ fun Connection.isVitess(): Boolean {
if (metaData.driverName.startsWith("Vitess")) {
return true
}
if (metaData.databaseProductVersion.endsWith("Vitess")) {
return true
}
// If we're using the MySQL Driver we can check if the underlying connection
// uses some Vitess specific syntax
try {
@@ -22,7 +22,8 @@ class SchemaMigratorService internal constructor(
val schemaMigrator = schemaMigratorProvider.get()
val connector = connectorProvider.get()
if (environment == Environment.TESTING || environment == Environment.DEVELOPMENT) {
if (connector.config().type != DataSourceType.VITESS) {
val type = connector.config().type
if (type != DataSourceType.VITESS && type != DataSourceType.VITESS_MYSQL) {
val appliedMigrations = schemaMigrator.initialize()
migrationState = schemaMigrator.applyAll("SchemaMigratorService", appliedMigrations)
} else {
@@ -18,11 +18,13 @@ enum class DataSourceType(
hibernateDialect = "org.hibernate.dialect.H2Dialect"
),
VITESS(
// TODO: Switch back to mysql protocol when this issue is fixed: https://github.com/vitessio/vitess/issues/4100
// Find the correct buildJdbcUrl and port in the git history
driverClassName = "io.vitess.jdbc.VitessDriver",
hibernateDialect = "misk.hibernate.VitessDialect"
),
VITESS_MYSQL(
driverClassName = MYSQL.driverClassName,
hibernateDialect = "misk.hibernate.VitessDialect"
),
}

/** Configuration element for an individual datasource */
@@ -63,6 +65,13 @@ data class DataSourceConfig(
database = database ?: ""
)
}
DataSourceType.VITESS_MYSQL -> {
copy(
port = port ?: 27003,
host = host ?: "127.0.0.1",
database = database ?: ""
)
}
DataSourceType.HSQLDB -> {
this
}
@@ -88,12 +97,19 @@ data class DataSourceConfig(
}

return when (type) {
DataSourceType.MYSQL -> {
DataSourceType.MYSQL, DataSourceType.VITESS_MYSQL -> {
var queryParams = "?useLegacyDatetimeCode=false"
if (env == Environment.TESTING || env == Environment.DEVELOPMENT) {
queryParams += "&createDatabaseIfNotExist=true"
}

if (type == DataSourceType.VITESS_MYSQL) {
// TODO(jontirsen): Try turning on server side prepared statements again when this issue
// has been fixed: https://github.com/vitessio/vitess/issues/5075
queryParams += "&useServerPrepStmts=false"
queryParams += "&useUnicode=true"
}

var trustStoreUrl: String? = null
if (!config.trust_certificate_key_store_path.isNullOrBlank()) {
trustStoreUrl = "file://${config.trust_certificate_key_store_path}"
@@ -63,7 +63,7 @@ class DataSourceService(
hikariConfig.connectionTimeout = config.connection_timeout.toMillis()
hikariConfig.maxLifetime = config.connection_max_lifetime.toMillis()

if (config.type == DataSourceType.MYSQL || config.type == DataSourceType.VITESS) {
if (config.type == DataSourceType.MYSQL || config.type == DataSourceType.VITESS || config.type == DataSourceType.VITESS_MYSQL) {
hikariConfig.minimumIdle = 5
if (config.type == DataSourceType.MYSQL) {
hikariConfig.connectionInitSql = "SET time_zone = '+00:00'"
@@ -73,7 +73,11 @@ class DataSourceService(
hikariConfig.dataSourceProperties["cachePrepStmts"] = "true"
hikariConfig.dataSourceProperties["prepStmtCacheSize"] = "250"
hikariConfig.dataSourceProperties["prepStmtCacheSqlLimit"] = "2048"
hikariConfig.dataSourceProperties["useServerPrepStmts"] = "true"
if (config.type == DataSourceType.MYSQL) {
// TODO(jontirsen): Try turning on server side prepared statements again when this issue
// has been fixed: https://github.com/vitessio/vitess/issues/5075
hikariConfig.dataSourceProperties["useServerPrepStmts"] = "true"
}
hikariConfig.dataSourceProperties["useLocalSessionState"] = "true"
hikariConfig.dataSourceProperties["rewriteBatchedStatements"] = "true"
hikariConfig.dataSourceProperties["cacheResultSetMetadata"] = "true"
@@ -28,21 +28,15 @@ class PingDatabaseService @Inject constructor(
val dataSource = DriverDataSource(
jdbcUrl, config.type.driverClassName, Properties(), config.username, config.password)
retry(10, ExponentialBackoff(Duration.ofMillis(20), Duration.ofMillis(1000))) {
val connection = try {
dataSource.connect()
} catch (e: Exception) {
logger.error(e) { "failed to get a data source connection" }
throw RuntimeException("failed to get a data source connection $jdbcUrl", e)
}
try {
connection.use { c ->
dataSource.connect().use { c ->
check(c.createStatement().use { s ->
s.executeQuery("SELECT 1 FROM dual").uniqueInt()
} == 1)
// During cluster start up we sometimes have an empty list of shards so lets also
// wait until the shards are loaded (this is generally only an issue during tests)
if (connection.isVitess()) {
check(connection.createStatement().use { s ->
if (c.isVitess()) {
check(c.createStatement().use { s ->
s.executeQuery("SHOW VITESS_SHARDS").map { rs -> rs.getString(1) }
}.isNotEmpty())
}
@@ -58,6 +58,7 @@ class VitessCluster(
val moshi: Moshi = Moshi.Builder().build()
) {
val schemaDir: Path
val configDir: Path

init {
if (config.vitess_schema_dir != null) {
@@ -85,6 +86,15 @@ class VitessCluster(
schemaDir.toFile().deleteRecursively()
})
}

// Copy out all the resources from the current package
// We use the my.cnf configuration file to configure MySQL (e.g. the default time zone)
configDir = Paths.get("/tmp/vitess_conf_${System.currentTimeMillis()}")
Files.createDirectories(configDir)
resourceLoader.copyTo("classpath:/misk/vitess", configDir)
Runtime.getRuntime().addShutdownHook(thread(start = false) {
configDir.toFile().deleteRecursively()
})
}

val keyspaceAdapter = moshi.adapter<Keyspace>()
@@ -107,7 +117,7 @@ class VitessCluster(
fun openMysqlConnection() = mysqlDataSource().connection

private fun dataSource(): DriverDataSource {
val jdbcUrl = config.buildJdbcUrl(TESTING)
val jdbcUrl = config.withDefaults().buildJdbcUrl(TESTING)
return DriverDataSource(
jdbcUrl, config.type.driverClassName, Properties(), config.username, config.password)
}
@@ -154,10 +164,19 @@ class DockerVitessCluster(
val shardCounts = keyspaces.values.map { it.shardCount() }.joinToString(",")

val schemaVolume = Volume("/vt/src/vitess.io/vitess/schema")
val confVolume = Volume("/vt/src/vitess.io/vitess/config/miskcnf")
val httpPort = ExposedPort.tcp(cluster.httpPort)
if (cluster.config.port != null && cluster.config.port != cluster.grpcPort) {
throw RuntimeException(
"Config port ${cluster.config.port} has to match Vitess Docker container: ${cluster.grpcPort}")
if (cluster.config.type == DataSourceType.VITESS) {
if (cluster.config.port != null && cluster.config.port != cluster.grpcPort) {
throw RuntimeException(
"Config port ${cluster.config.port} has to match Vitess Docker container: ${cluster.grpcPort}")
}
}
if (cluster.config.type == DataSourceType.VITESS_MYSQL) {
if (cluster.config.port != null && cluster.config.port != cluster.vtgateMysqlPort) {
throw RuntimeException(
"Config port ${cluster.config.port} has to match Vitess Docker container: ${cluster.grpcPort}")
}
}
val grpcPort = ExposedPort.tcp(cluster.grpcPort)
val mysqlPort = ExposedPort.tcp(cluster.mysqlPort)
@@ -180,7 +199,11 @@ class DockerVitessCluster(
// Increase the transaction timeout so you can have a breakpoint
// inside a transaction without it timing out
"-queryserver-config-transaction-timeout=${Duration.ofHours(24).toMillis()}",
"-extra_my_cnf=/vt/src/vitess.io/vitess/config/mycnf/rbr.cnf",
"-extra_my_cnf=" +
listOf(
"/vt/src/vitess.io/vitess/config/mycnf/rbr.cnf",
"/vt/src/vitess.io/vitess/config/miskcnf/misk.cnf"
).joinToString(":"),
"-keyspaces=$keyspacesArg",
"-num_shards=$shardCounts"
)
@@ -209,8 +232,11 @@ class DockerVitessCluster(
"Starting Vitess cluster with command: ${cmd.joinToString(" ")}")
containerId = docker.createContainerCmd(VITESS_IMAGE)
.withCmd(cmd.toList())
.withVolumes(schemaVolume)
.withBinds(Bind(cluster.schemaDir.toAbsolutePath().toString(), schemaVolume))
.withVolumes(schemaVolume, confVolume)
.withBinds(
Bind(cluster.schemaDir.toAbsolutePath().toString(), schemaVolume),
Bind(cluster.configDir.toAbsolutePath().toString(), confVolume)
)
.withExposedPorts(httpPort, grpcPort, mysqlPort, vtgateMysqlPort)
.withPortBindings(ports)
.withTty(true)
@@ -236,7 +262,7 @@ class DockerVitessCluster(
}

private fun waitUntilHealthy() {
retry(10, ExponentialBackoff(Duration.ofMillis(20), Duration.ofMillis(1000))) {
retry(10, ExponentialBackoff(Duration.ofMillis(20), Duration.ofMillis(5000))) {
cluster.openVtgateConnection().use { c ->
try {
val result =
@@ -351,7 +377,9 @@ class StartVitessService(

fun cluster() = cluster?.cluster

fun shouldRunVitess() = config.type == DataSourceType.VITESS && (environment == TESTING || environment == DEVELOPMENT)
fun shouldRunVitess() =
(config.type == DataSourceType.VITESS || config.type == DataSourceType.VITESS_MYSQL) &&
(environment == TESTING || environment == DEVELOPMENT)

override fun shutDown() {
}
@@ -8,13 +8,14 @@ import misk.environment.Environment
import misk.environment.EnvironmentModule
import misk.inject.KAbstractModule
import misk.jdbc.DataSourceConfig
import misk.jdbc.DataSourceType
import misk.logging.LogCollectorModule
import misk.testing.MockTracingBackendModule
import misk.time.FakeClockModule

/** This module creates movies, actors, and characters tables for several Hibernate tests. */
class MoviesTestModule(
private val useVitess: Boolean = true
private val type: DataSourceType = DataSourceType.VITESS_MYSQL
) : KAbstractModule() {
override fun configure() {
install(LogCollectorModule())
@@ -33,14 +34,17 @@ class MoviesTestModule(
}

private fun selectDataSourceConfig(config: MoviesConfig): DataSourceConfig {
return if (useVitess)
config.data_source
else
config.mysql_data_source
return when (type) {
DataSourceType.VITESS -> config.vitess_data_source
DataSourceType.VITESS_MYSQL -> config.vitess_mysql_data_source
DataSourceType.MYSQL -> config.mysql_data_source
DataSourceType.HSQLDB -> throw RuntimeException("Not supported (yet?)")
}
}

data class MoviesConfig(
val data_source: DataSourceConfig,
val vitess_data_source: DataSourceConfig,
val vitess_mysql_data_source: DataSourceConfig,
val mysql_data_source: DataSourceConfig
) : Config
}
@@ -1,20 +1,20 @@
package misk.hibernate

import misk.jdbc.DataSourceType
import misk.testing.MiskTest
import misk.testing.MiskTestModule
import misk.time.FakeClock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.util.concurrent.TimeUnit
import javax.inject.Inject

@MiskTest(startService = true)
class TimestampListenerTest {
@MiskTestModule
val module = MoviesTestModule()

abstract class TimestampListenerTest {
@Inject @Movies lateinit var transacter: Transacter
@Inject lateinit var queryFactory: Query.Factory
@Inject lateinit var clock: FakeClock
@@ -67,11 +67,29 @@ class TimestampListenerTest {
s.setLong(1, movie.id.id)
s.executeQuery().use { rs ->
rs.next()
assertThat(rs.getTimestamp(1).toLocalDateTime().toInstant(ZoneOffset.UTC))
.isEqualTo(clock.instant())
assertThat(rs.getTimestamp(1).time)
.isEqualTo(clock.instant().toEpochMilli())
}
}
}
}
}
}

@MiskTest(startService = true)
class MySQLTimestampListenerTest : TimestampListenerTest() {
@MiskTestModule
val module = MoviesTestModule(DataSourceType.MYSQL)
}

@MiskTest(startService = true)
class VitessMySQLTimestampListenerTest : TimestampListenerTest() {
@MiskTestModule
val module = MoviesTestModule(DataSourceType.VITESS_MYSQL)
}

@MiskTest(startService = true)
class VitessTimestampListenerTest : TimestampListenerTest() {
@MiskTestModule
val module = MoviesTestModule(DataSourceType.VITESS)
}
@@ -9,6 +9,7 @@ import misk.hibernate.RealTransacter.Companion.DB_COMMIT_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.DB_ROLLBACK_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.DB_TRANSACTION_SPAN_NAME
import misk.hibernate.RealTransacter.Companion.TRANSACTER_SPAN_TAG
import misk.jdbc.DataSourceType
import misk.logging.LogCollector
import misk.testing.MiskTest
import misk.testing.MiskTestModule
@@ -25,7 +26,7 @@ import kotlin.test.assertFailsWith
@MiskTest(startService = true)
class TransacterTest {
@MiskTestModule
val module = MoviesTestModule()
val module = MoviesTestModule(DataSourceType.VITESS_MYSQL)

@Inject @Movies lateinit var transacter: Transacter
@Inject lateinit var queryFactory: Query.Factory
@@ -12,6 +12,7 @@ import misk.hibernate.Transacter
import misk.hibernate.createInSameShard
import misk.hibernate.shard
import misk.hibernate.transaction
import misk.jdbc.DataSourceType
import misk.testing.MiskTest
import misk.testing.MiskTestModule
import org.assertj.core.api.Assertions.assertThat
@@ -23,7 +24,7 @@ import javax.inject.Inject
@MiskTest(startService = true)
class BulkShardMigratorMySqlTest {
@MiskTestModule
val module = MoviesTestModule(useVitess = false)
val module = MoviesTestModule(DataSourceType.MYSQL)

@Inject @Movies private lateinit var transacter: Transacter
@Inject @Movies lateinit var sessionFactory: SessionFactory

0 comments on commit 4034251

Please sign in to comment.
You can’t perform that action at this time.