Skip to content

Commit

Permalink
Adapt source-mssql to latest Kotlin converted CDK (#36772)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed Apr 4, 2024
1 parent deb47ab commit 814d95c
Show file tree
Hide file tree
Showing 31 changed files with 115 additions and 120 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.29.1 | 2024-04-03 | [\#36772](https://github.com/airbytehq/airbyte/pull/36772) | Changes to make source-mssql compileable |
| 0.29.0 | 2024-04-02 | [\#36759](https://github.com/airbytehq/airbyte/pull/36759) | Build artifact publication changes and fixes. |
| 0.28.21 | 2024-04-02 | [\#36673](https://github.com/airbytehq/airbyte/pull/36673) | Change the destination message parsing to use standard java/kotlin classes. Adds logging to catch empty lines. |
| 0.28.20 | 2024-04-01 | [\#36584](https://github.com/airbytehq/airbyte/pull/36584) | Changes to make source-postgres compileable |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object DataTypeUtils {
return dateFormat.format(date)
}

@JvmStatic
fun toISOTimeString(dateTime: LocalDateTime): String {
return DateTimeFormatter.ISO_TIME.format(dateTime.toLocalTime())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putBinary(
protected open fun putBinary(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
* close the returned stream to release the database connection. Otherwise, there will be a
* connection leak.
*
* @param statementCreator create a [PreparedStatement] from a [Connection].
* @paramstatementCreator create a [PreparedStatement] from a [Connection].
* @param recordTransform transform each record of that result set into the desired type. do NOT
* just pass the [ResultSet] through. it is a stateful object will not be accessible if returned
* from recordTransform.
Expand Down Expand Up @@ -195,10 +195,10 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
}

@Throws(SQLException::class)
fun queryMetadata(sql: String, vararg params: String): ResultSetMetaData {
fun queryMetadata(sql: String, vararg params: String): ResultSetMetaData? {
unsafeQuery(
{ c: Connection -> getPreparedStatement(sql, params, c) },
{ obj: ResultSet -> obj.metaData }
{ obj: ResultSet -> obj.metaData },
)
.use { q ->
return q.findFirst().orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ open class JdbcSourceOperations :
}

@Throws(SQLException::class)
protected fun setTimestampWithTimezone(
protected open fun setTimestampWithTimezone(
preparedStatement: PreparedStatement,
parameterIndex: Int,
value: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
val POSTGRES_CONNECT_TIMEOUT_DEFAULT_DURATION: Duration = Duration.ofSeconds(10)

const val CONNECT_TIMEOUT_KEY: String = "connectTimeout"
val CONNECT_TIMEOUT_DEFAULT: Duration = Duration.ofSeconds(60)
@JvmField val CONNECT_TIMEOUT_DEFAULT: Duration = Duration.ofSeconds(60)

/**
* Retrieves connectionTimeout value from connection properties in millis, default minimum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.*

object SshHelpers {
@get:Throws(IOException::class)
@JvmStatic
val specAndInjectSsh: ConnectorSpecification?
get() = getSpecAndInjectSsh(Optional.empty())

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.0
version=0.29.1
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ protected constructor(val container: C) : AutoCloseable {

@Volatile private lateinit var dslContext: DSLContext

protected val databaseId: Int = nextDatabaseId.getAndIncrement()
@JvmField protected val databaseId: Int = nextDatabaseId.getAndIncrement()
@JvmField
protected val containerId: Int =
containerUidToId!!.computeIfAbsent(container.containerId) { k: String? ->
containerUidToId!!.computeIfAbsent(container.containerId) { _: String? ->
nextContainerId!!.getAndIncrement()
}!!
private val dateFormat: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
Expand All @@ -68,6 +69,7 @@ protected constructor(val container: C) : AutoCloseable {
return retVal
}

@Suppress("UNCHECKED_CAST")
protected fun self(): T {
return this as T
}
Expand All @@ -77,19 +79,19 @@ protected constructor(val container: C) : AutoCloseable {
if (this.isInitialized) {
throw RuntimeException("TestDatabase instance is already initialized")
}
connectionProperties!![key] = value
connectionProperties[key] = value
return self()
}

/** Enqueues a SQL statement to be executed when this object is closed. */
fun onClose(fmtSql: String, vararg fmtArgs: Any?): T {
cleanupSQL!!.add(String.format(fmtSql!!, *fmtArgs))
cleanupSQL.add(String.format(fmtSql, *fmtArgs))
return self()
}

/** Executes a SQL statement after calling String.format on the arguments. */
fun with(fmtSql: String, vararg fmtArgs: Any?): T {
execSQL(Stream.of(String.format(fmtSql!!, *fmtArgs)))
execSQL(Stream.of(String.format(fmtSql, *fmtArgs)))
return self()
}

Expand All @@ -98,8 +100,8 @@ protected constructor(val container: C) : AutoCloseable {
* object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes
* the [DataSource] and [DSLContext] owned by this object.
*/
fun initialized(): T? {
inContainerBootstrapCmd()!!.forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
open fun initialized(): T? {
inContainerBootstrapCmd().forEach { cmds: Stream<String> -> this.execInContainer(cmds) }
this.dataSource =
DataSourceFactory.create(
userName,
Expand All @@ -117,7 +119,7 @@ protected constructor(val container: C) : AutoCloseable {
}

val isInitialized: Boolean
get() = dslContext != null
get() = ::dslContext.isInitialized

protected abstract fun inContainerBootstrapCmd(): Stream<Stream<String>>

Expand All @@ -137,7 +139,7 @@ protected constructor(val container: C) : AutoCloseable {
val userName: String
get() = withNamespace("user")

val password: String
open val password: String?
get() = "password"

fun getDataSource(): DataSource? {
Expand All @@ -154,11 +156,11 @@ protected constructor(val container: C) : AutoCloseable {
return dslContext
}

val jdbcUrl: String?
open val jdbcUrl: String?
get() =
String.format(
databaseDriver!!.urlFormatString,
container!!.host,
container.host,
container.firstMappedPort,
databaseName
)
Expand All @@ -169,7 +171,7 @@ protected constructor(val container: C) : AutoCloseable {
protected fun execSQL(sql: Stream<String>) {
try {
database!!.query<Any?> { ctx: DSLContext? ->
sql!!.forEach { statement: String? ->
sql.forEach { statement: String? ->
LOGGER!!.info("executing SQL statement {}", statement)
ctx!!.execute(statement)
}
Expand All @@ -194,7 +196,7 @@ protected constructor(val container: C) : AutoCloseable {
)
)
)
val exec = container!!.execInContainer(*cmd.toTypedArray<String?>())
val exec = container.execInContainer(*cmd.toTypedArray<String?>())
if (exec!!.exitCode == 0) {
LOGGER.info(
formatLogLine(
Expand Down Expand Up @@ -248,7 +250,7 @@ protected constructor(val container: C) : AutoCloseable {
}

override fun close() {
execSQL(cleanupSQL!!.stream())
execSQL(cleanupSQL.stream())
execInContainer(inContainerUndoBootstrapCmd())
LOGGER!!.info("closing database databaseId=$databaseId")
}
Expand All @@ -259,46 +261,47 @@ protected constructor(val container: C) : AutoCloseable {
protected val builder: ImmutableMap.Builder<Any, Any> = ImmutableMap.builder()

fun build(): JsonNode {
return Jsons.jsonNode(builder!!.build())
return Jsons.jsonNode(builder.build())
}

@Suppress("UNCHECKED_CAST")
fun self(): B {
return this as B
}

fun with(key: Any, value: Any): B {
builder!!.put(key, value)
builder.put(key, value)
return self()
}

fun withDatabase(): B {
return this.with(JdbcUtils.DATABASE_KEY, testDatabase!!.databaseName)
return this.with(JdbcUtils.DATABASE_KEY, testDatabase.databaseName)
}

fun withCredentials(): B {
return this.with(JdbcUtils.USERNAME_KEY, testDatabase!!.userName)
.with(JdbcUtils.PASSWORD_KEY, testDatabase.password)
return this.with(JdbcUtils.USERNAME_KEY, testDatabase.userName)
.with(JdbcUtils.PASSWORD_KEY, testDatabase.password!!)
}

fun withResolvedHostAndPort(): B {
return this.with(
JdbcUtils.HOST_KEY,
HostPortResolver.resolveHost(testDatabase!!.container)
HostPortResolver.resolveHost(testDatabase.container)
)
.with(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(testDatabase.container))
}

fun withHostAndPort(): B {
return this.with(JdbcUtils.HOST_KEY, testDatabase!!.container!!.host)
.with(JdbcUtils.PORT_KEY, testDatabase.container!!.firstMappedPort)
return this.with(JdbcUtils.HOST_KEY, testDatabase.container.host)
.with(JdbcUtils.PORT_KEY, testDatabase.container.firstMappedPort)
}

fun withoutSsl(): B {
open fun withoutSsl(): B {
return with(JdbcUtils.SSL_KEY, false)
}

fun withSsl(sslMode: MutableMap<Any, Any>): B {
return with(JdbcUtils.SSL_KEY, true)!!.with(JdbcUtils.SSL_MODE_KEY, sslMode)
open fun withSsl(sslMode: MutableMap<Any?, Any?>): B {
return with(JdbcUtils.SSL_KEY, true).with(JdbcUtils.SSL_MODE_KEY, sslMode)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class AirbyteFileOffsetBackingStore(
return offsetManager
}

@JvmStatic
fun initializeDummyStateForSnapshotPurpose(): AirbyteFileOffsetBackingStore {
val cdcWorkingDir: Path
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,7 @@ class AirbyteSchemaHistoryStorage(
private val reader: DocumentReader = DocumentReader.defaultReader()
private val writer: DocumentWriter = DocumentWriter.defaultWriter()

class SchemaHistory<T>(schema: T, isCompressed: Boolean) {
val schema: T
val isCompressed: Boolean

init {
this.schema = schema
this.isCompressed = isCompressed
}
}
data class SchemaHistory<T>(val schema: T, val isCompressed: Boolean)

fun read(): SchemaHistory<String> {
val fileSizeMB = path.toFile().length().toDouble() / (ONE_MB)
Expand Down Expand Up @@ -240,6 +232,7 @@ class AirbyteSchemaHistoryStorage(
return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB)
}

@JvmStatic
fun initializeDBHistory(
schemaHistory: SchemaHistory<Optional<JsonNode>>?,
compressSchemaHistoryForState: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object RecordWaitTimeUtil {
}
}

@JvmStatic
fun getFirstRecordWaitTime(config: JsonNode): Duration {
val isTest = config.has("is_test") && config["is_test"].asBoolean()
var firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME
Expand All @@ -67,6 +68,7 @@ object RecordWaitTimeUtil {
return firstRecordWaitTime
}

@JvmStatic
fun getSubsequentRecordWaitTime(config: JsonNode): Duration {
var subsequentRecordWaitTime = DEFAULT_SUBSEQUENT_RECORD_WAIT_TIME
val isTest = config.has("is_test") && config["is_test"].asBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ abstract class AbstractJdbcSource<Datatype>(

/** Some databases need special column names in the query. */
@Throws(SQLException::class)
protected fun getWrappedColumnNames(
protected open fun getWrappedColumnNames(
database: JdbcDatabase?,
connection: Connection?,
columnNames: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory
object RelationalDbQueryUtils {
private val LOGGER: Logger = LoggerFactory.getLogger(RelationalDbQueryUtils::class.java)

@JvmStatic
fun getIdentifierWithQuoting(identifier: String, quoteString: String): String {
// double-quoted values within a database name or column name should be wrapped with extra
// quoteString
Expand Down Expand Up @@ -58,10 +59,12 @@ object RelationalDbQueryUtils {
}

/** @return the input identifier with quotes. */
@JvmStatic
fun enquoteIdentifier(identifier: String?, quoteString: String?): String {
return quoteString + identifier + quoteString
}

@JvmStatic
fun <Database : SqlDatabase?> queryTable(
database: Database,
sqlQuery: String?,
Expand All @@ -87,6 +90,7 @@ object RelationalDbQueryUtils {
)
}

@JvmStatic
fun logStreamSyncStatus(streams: List<ConfiguredAirbyteStream>, syncType: String?) {
if (streams.isEmpty()) {
LOGGER.info("No Streams will be synced via {}.", syncType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object RelationalDbReadUtil {
.collect(Collectors.toList())
}

@JvmStatic
fun identifyStreamsForCursorBased(
catalog: ConfiguredAirbyteCatalog,
streamsForInitialLoad: List<ConfiguredAirbyteStream>
Expand All @@ -54,6 +55,7 @@ object RelationalDbReadUtil {
.collect(Collectors.toList())
}

@JvmStatic
fun convertNameNamespacePairFromV0(
v1NameNamespacePair: io.airbyte.protocol.models.AirbyteStreamNameNamespacePair
): AirbyteStreamNameNamespacePair {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ object StateGeneratorUtils {
* @param airbyteStateMessage A [AirbyteStateType.LEGACY] state message.
* @return A [AirbyteStateType.GLOBAL] state message.
*/
@JvmStatic
fun convertLegacyStateToGlobalState(
airbyteStateMessage: AirbyteStateMessage
): AirbyteStateMessage {
Expand Down
Loading

0 comments on commit 814d95c

Please sign in to comment.