Skip to content

Commit

Permalink
convert #36432 to kotlin 2/2
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 26, 2024
1 parent 877d632 commit 8b8455b
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 104 deletions.
Empty file.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
import java.sql.*


interface JdbcCompatibleSourceOperations<SourceType> : SourceOperations<ResultSet, SourceType> {
@Throws(SQLException::class)
fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet?): AirbyteRecordData?
fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet): AirbyteRecordData
/**
* Read from a result set, and copy the value of the column at colIndex to the Json object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.math.BigDecimal
import java.sql.*
import java.sql.Date
Expand All @@ -21,20 +19,23 @@ import java.time.*
import java.time.chrono.IsoEra
import java.time.format.DateTimeParseException
import java.util.*

import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** Source operation skeleton for JDBC compatible databases. */
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
JdbcCompatibleSourceOperations<Datatype> {

private val LOGGER: Logger = LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations::class.java)
private val LOGGER: Logger =
LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations::class.java)

@Throws(SQLException::class)
fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet): AirbyteRecordData {
override fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet): AirbyteRecordData {
// the first call communicates with the database. after that the result is cached.
val columnCount = queryContext.metaData.columnCount
val jsonNode = Jsons.jsonNode(emptyMap<Any, Any>()) as ObjectNode
val metaChanges: MutableList<AirbyteRecordMessageMetaChange> = ArrayList<AirbyteRecordMessageMetaChange>()
val metaChanges: MutableList<AirbyteRecordMessageMetaChange> =
ArrayList<AirbyteRecordMessageMetaChange>()

for (i in 1..columnCount) {
val columnName = queryContext.metaData.getColumnName(i)
Expand All @@ -44,10 +45,13 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
} catch (e: java.lang.Exception) {
LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.message)
metaChanges.add(
AirbyteRecordMessageMetaChange()
.withField(columnName)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR))
AirbyteRecordMessageMetaChange()
.withField(columnName)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(
AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ interface CdcMetadataInjector<T> {
throw RuntimeException("Not Supported")
}

fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode?) {
throw java.lang.RuntimeException("Not Supported")
}

/**
* As part of Airbyte record we need to add the namespace (schema name)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.cdk.db.SqlDatabase
import io.airbyte.cdk.db.factory.DataSourceFactory.close
import io.airbyte.cdk.db.factory.DataSourceFactory.create
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_SIZE
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE
Expand Down Expand Up @@ -42,6 +43,7 @@ import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.functional.CheckedConsumer
Expand Down Expand Up @@ -100,50 +102,73 @@ abstract class AbstractJdbcSource<Datatype>(
tableName: String,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<JsonNode>? {
LOGGER.info("Queueing query for table: {}", tableName)
val quoteString = this.quoteString!!
// This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the
// records
// matters
// as intermediate state messages are emitted (if the connector emits intermediate state).
if (syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0) {
val quotedCursorField =
RelationalDbQueryUtils.enquoteIdentifier(cursorField.get(), quoteString)
return RelationalDbQueryUtils.queryTable(
database,
String.format(
"SELECT %s FROM %s ORDER BY %s ASC",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
),
quotedCursorField
),
tableName,
schemaName
)
} else {
// If we are in FULL_REFRESH mode, state messages are never emitted, so we don't care
// about ordering
// of the records.
return RelationalDbQueryUtils.queryTable(
database,
String.format(
"SELECT %s FROM %s",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
): AutoCloseableIterator<AirbyteRecordData> {
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
return AutoCloseableIterators.lazyIterator<AirbyteRecordData>(
Supplier<AutoCloseableIterator<AirbyteRecordData>> {
try {
val stream =
database.unsafeQuery(
{ connection: Connection ->
AbstractDbSource.LOGGER.info(
"Preparing query for table: {}",
tableName
)
val fullTableName: String =
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString!!
)

val wrappedColumnNames =
getWrappedColumnNames(
database,
connection,
columnNames,
schemaName,
tableName
)
val sql =
java.lang.StringBuilder(
String.format(
"SELECT %s FROM %s",
wrappedColumnNames,
fullTableName
)
)
// if the connector emits intermediate states, the incremental query
// must be sorted by the cursor
// field
if (
syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0
) {
val quotedCursorField: String =
enquoteIdentifier(cursorField.get(), quoteString)
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
}

val preparedStatement = connection.prepareStatement(sql.toString())
AbstractDbSource.LOGGER.info(
"Executing query for table {}: {}",
tableName,
preparedStatement
)
preparedStatement
},
sourceOperations::convertDatabaseRowToAirbyteRecordData
)
return@Supplier AutoCloseableIterators.fromStream<AirbyteRecordData>(
stream,
airbyteStream
)
),
tableName,
schemaName
)
}
} catch (e: SQLException) {
throw java.lang.RuntimeException(e)
}
},
airbyteStream
)
}

/**
Expand Down Expand Up @@ -433,37 +458,34 @@ abstract class AbstractJdbcSource<Datatype>(
return sourceOperations.isCursorType(type)
}

public override fun queryTableIncremental(
override fun queryTableIncremental(
database: JdbcDatabase,
columnNames: List<String>,
schemaName: String?,
tableName: String,
cursorInfo: CursorInfo,
cursorFieldType: Datatype
): AutoCloseableIterator<JsonNode>? {
LOGGER.info("Queueing query for table: {}", tableName)
): AutoCloseableIterator<AirbyteRecordData> {
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
return AutoCloseableIterators.lazyIterator(
{
val quoteString = this.quoteString!!
try {
val stream =
database.unsafeQuery(
CheckedFunction<Connection, PreparedStatement, SQLException?> {
connection: Connection ->
LOGGER.info("Preparing query for table: {}", tableName)
val fullTableName =
{ connection: Connection ->
AbstractDbSource.LOGGER.info(
"Preparing query for table: {}",
tableName
)
val fullTableName: String =
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
)
val quotedCursorField =
RelationalDbQueryUtils.enquoteIdentifier(
cursorInfo.cursorField,
quoteString
quoteString!!
)

val quotedCursorField: String =
enquoteIdentifier(cursorInfo.cursorField, quoteString)
val operator: String
if (cursorInfo.cursorRecordCount <= 0L) {
operator = ">"
Expand All @@ -476,7 +498,7 @@ abstract class AbstractJdbcSource<Datatype>(
cursorFieldType,
cursorInfo.cursor
)
LOGGER.info(
AbstractDbSource.LOGGER.info(
"Table {} cursor count: expected {}, actual {}",
tableName,
cursorInfo.cursorRecordCount,
Expand All @@ -489,7 +511,6 @@ abstract class AbstractJdbcSource<Datatype>(
">="
}
}

val wrappedColumnNames =
getWrappedColumnNames(
database,
Expand All @@ -514,9 +535,8 @@ abstract class AbstractJdbcSource<Datatype>(
if (stateEmissionFrequency > 0) {
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
}

val preparedStatement = connection.prepareStatement(sql.toString())
LOGGER.info(
AbstractDbSource.LOGGER.info(
"Executing query for table {}: {}",
tableName,
preparedStatement
Expand All @@ -529,12 +549,9 @@ abstract class AbstractJdbcSource<Datatype>(
)
preparedStatement
},
CheckedFunction<ResultSet, JsonNode, SQLException?> {
queryResult: ResultSet? ->
sourceOperations.rowToJson(queryResult!!)
}
sourceOperations::convertDatabaseRowToAirbyteRecordData
)
return@lazyIterator AutoCloseableIterators.fromStream<JsonNode>(
return@lazyIterator AutoCloseableIterators.fromStream<AirbyteRecordData>(
stream,
airbyteStream
)
Expand All @@ -546,6 +563,10 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

protected fun getCountColumnName(): String {
return "record_count"
}

/** Some databases need special column names in the query. */
@Throws(SQLException::class)
protected fun getWrappedColumnNames(
Expand All @@ -558,9 +579,6 @@ abstract class AbstractJdbcSource<Datatype>(
return RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString!!)
}

protected val countColumnName: String
get() = "record_count"

@Throws(SQLException::class)
protected fun getActualCursorRecordCount(
connection: Connection,
Expand All @@ -569,7 +587,7 @@ abstract class AbstractJdbcSource<Datatype>(
cursorFieldType: Datatype,
cursor: String?
): Long {
val columnName = countColumnName
val columnName = getCountColumnName()
val cursorRecordStatement: PreparedStatement
if (cursor == null) {
val cursorRecordQuery =
Expand Down
Loading

0 comments on commit 8b8455b

Please sign in to comment.