Skip to content

Commit

Permalink
convert #36432 to kotlin 2/2
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 27, 2024
1 parent 6d91dbc commit 0972a11
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** Source operation skeleton for JDBC compatible databases. */
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
JdbcCompatibleSourceOperations<Datatype> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ interface CdcMetadataInjector<T> {
throw RuntimeException("Not Supported")
}

/**
 * Default implementation that does not support this operation: it unconditionally throws a
 * [RuntimeException] with the message "Not Supported".
 *
 * Because this is an interface method with a body, implementers can override it.
 * NOTE(review): judging by the name, overrides presumably add CDC metadata to rows that were
 * read without going through Debezium — confirm against the implementations.
 *
 * @param record the row to enrich; never read by this default implementation
 * @throws RuntimeException always
 */
fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode?) {
    // kotlin.RuntimeException is a typealias for java.lang.RuntimeException on the JVM.
    throw RuntimeException("Not Supported")
}

/**
* As part of Airbyte record we need to add the namespace (schema name)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.cdk.db.SqlDatabase
import io.airbyte.cdk.db.factory.DataSourceFactory.close
import io.airbyte.cdk.db.factory.DataSourceFactory.create
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_SIZE
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE
Expand Down Expand Up @@ -42,6 +43,7 @@ import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
import io.airbyte.commons.functional.CheckedConsumer
Expand Down Expand Up @@ -100,50 +102,73 @@ abstract class AbstractJdbcSource<Datatype>(
tableName: String,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<JsonNode>? {
LOGGER.info("Queueing query for table: {}", tableName)
val quoteString = this.quoteString!!
// This corresponds to the initial sync in INCREMENTAL mode, where the ordering of the
// records
// matters
// as intermediate state messages are emitted (if the connector emits intermediate state).
if (syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0) {
val quotedCursorField =
RelationalDbQueryUtils.enquoteIdentifier(cursorField.get(), quoteString)
return RelationalDbQueryUtils.queryTable(
database,
String.format(
"SELECT %s FROM %s ORDER BY %s ASC",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
),
quotedCursorField
),
tableName,
schemaName
)
} else {
// If we are in FULL_REFRESH mode, state messages are never emitted, so we don't care
// about ordering
// of the records.
return RelationalDbQueryUtils.queryTable(
database,
String.format(
"SELECT %s FROM %s",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
): AutoCloseableIterator<AirbyteRecordData> {
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
return AutoCloseableIterators.lazyIterator<AirbyteRecordData>(
Supplier<AutoCloseableIterator<AirbyteRecordData>> {
try {
val stream =
database.unsafeQuery(
{ connection: Connection ->
AbstractDbSource.LOGGER.info(
"Preparing query for table: {}",
tableName
)
val fullTableName: String =
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString!!
)

val wrappedColumnNames =
getWrappedColumnNames(
database,
connection,
columnNames,
schemaName,
tableName
)
val sql =
java.lang.StringBuilder(
String.format(
"SELECT %s FROM %s",
wrappedColumnNames,
fullTableName
)
)
// if the connector emits intermediate states, the incremental query
// must be sorted by the cursor
// field
if (
syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0
) {
val quotedCursorField: String =
enquoteIdentifier(cursorField.get(), quoteString)
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
}

val preparedStatement = connection.prepareStatement(sql.toString())
AbstractDbSource.LOGGER.info(
"Executing query for table {}: {}",
tableName,
preparedStatement
)
preparedStatement
},
sourceOperations::convertDatabaseRowToAirbyteRecordData
)
return@Supplier AutoCloseableIterators.fromStream<AirbyteRecordData>(
stream,
airbyteStream
)
),
tableName,
schemaName
)
}
} catch (e: SQLException) {
throw java.lang.RuntimeException(e)
}
},
airbyteStream
)
}

/**
Expand Down Expand Up @@ -433,37 +458,34 @@ abstract class AbstractJdbcSource<Datatype>(
return sourceOperations.isCursorType(type)
}

public override fun queryTableIncremental(
override fun queryTableIncremental(
database: JdbcDatabase,
columnNames: List<String>,
schemaName: String?,
tableName: String,
cursorInfo: CursorInfo,
cursorFieldType: Datatype
): AutoCloseableIterator<JsonNode>? {
LOGGER.info("Queueing query for table: {}", tableName)
): AutoCloseableIterator<AirbyteRecordData> {
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
return AutoCloseableIterators.lazyIterator(
{
val quoteString = this.quoteString!!
try {
val stream =
database.unsafeQuery(
CheckedFunction<Connection, PreparedStatement, SQLException?> {
connection: Connection ->
LOGGER.info("Preparing query for table: {}", tableName)
val fullTableName =
{ connection: Connection ->
AbstractDbSource.LOGGER.info(
"Preparing query for table: {}",
tableName
)
val fullTableName: String =
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
schemaName,
tableName,
quoteString
)
val quotedCursorField =
RelationalDbQueryUtils.enquoteIdentifier(
cursorInfo.cursorField,
quoteString
quoteString!!
)

val quotedCursorField: String =
enquoteIdentifier(cursorInfo.cursorField, quoteString)
val operator: String
if (cursorInfo.cursorRecordCount <= 0L) {
operator = ">"
Expand All @@ -476,7 +498,7 @@ abstract class AbstractJdbcSource<Datatype>(
cursorFieldType,
cursorInfo.cursor
)
LOGGER.info(
AbstractDbSource.LOGGER.info(
"Table {} cursor count: expected {}, actual {}",
tableName,
cursorInfo.cursorRecordCount,
Expand All @@ -489,7 +511,6 @@ abstract class AbstractJdbcSource<Datatype>(
">="
}
}

val wrappedColumnNames =
getWrappedColumnNames(
database,
Expand All @@ -514,9 +535,8 @@ abstract class AbstractJdbcSource<Datatype>(
if (stateEmissionFrequency > 0) {
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
}

val preparedStatement = connection.prepareStatement(sql.toString())
LOGGER.info(
AbstractDbSource.LOGGER.info(
"Executing query for table {}: {}",
tableName,
preparedStatement
Expand All @@ -529,12 +549,9 @@ abstract class AbstractJdbcSource<Datatype>(
)
preparedStatement
},
CheckedFunction<ResultSet, JsonNode, SQLException?> {
queryResult: ResultSet? ->
sourceOperations.rowToJson(queryResult!!)
}
sourceOperations::convertDatabaseRowToAirbyteRecordData
)
return@lazyIterator AutoCloseableIterators.fromStream<JsonNode>(
return@lazyIterator AutoCloseableIterators.fromStream<AirbyteRecordData>(
stream,
airbyteStream
)
Expand All @@ -546,6 +563,10 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

/**
 * Name of the column that holds a record count.
 *
 * @return the constant string `"record_count"`
 */
protected fun getCountColumnName(): String = "record_count"

/** Some databases need special column names in the query. */
@Throws(SQLException::class)
protected fun getWrappedColumnNames(
Expand All @@ -558,9 +579,6 @@ abstract class AbstractJdbcSource<Datatype>(
return RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString!!)
}

protected val countColumnName: String
get() = "record_count"

@Throws(SQLException::class)
protected fun getActualCursorRecordCount(
connection: Connection,
Expand All @@ -569,7 +587,7 @@ abstract class AbstractJdbcSource<Datatype>(
cursorFieldType: Datatype,
cursor: String?
): Long {
val columnName = countColumnName
val columnName = getCountColumnName()
val cursorRecordStatement: PreparedStatement
if (cursor == null) {
val cursorRecordQuery =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.cdk.db.AbstractDatabase
import io.airbyte.cdk.db.IncrementalUtils.getCursorField
import io.airbyte.cdk.db.IncrementalUtils.getCursorFieldOptional
import io.airbyte.cdk.db.IncrementalUtils.getCursorType
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.JdbcConnector
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
Expand Down Expand Up @@ -416,14 +417,14 @@ protected constructor(driverClassName: String) :
.filter { o: String -> selectedFieldsInCatalog.contains(o) }
.collect(Collectors.toList())

val iterator: AutoCloseableIterator<AirbyteMessage?>
val iterator: AutoCloseableIterator<AirbyteMessage>
// checks for which sync mode we're using based on the configured airbytestream
// this is where the bifurcation between full refresh and incremental
if (airbyteStream.syncMode == SyncMode.INCREMENTAL) {
val cursorField = getCursorField(airbyteStream)
val cursorInfo = stateManager!!.getCursorInfo(pair)

val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage?>
val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage>
if (cursorInfo!!.map { it.cursor }.isPresent) {
airbyteMessageIterator =
getIncrementalStream(
Expand Down Expand Up @@ -525,7 +526,7 @@ protected constructor(driverClassName: String) :
table: TableInfo<CommonField<DataType>>,
cursorInfo: CursorInfo,
emittedAt: Instant
): AutoCloseableIterator<AirbyteMessage?> {
): AutoCloseableIterator<AirbyteMessage> {
val streamName = airbyteStream.stream.name
val namespace = airbyteStream.stream.namespace
val cursorField = getCursorField(airbyteStream)
Expand Down Expand Up @@ -577,7 +578,7 @@ protected constructor(driverClassName: String) :
emittedAt: Instant,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<AirbyteMessage?> {
): AutoCloseableIterator<AirbyteMessage> {
val queryStream =
queryTableFullRefresh(
database,
Expand Down Expand Up @@ -745,7 +746,7 @@ protected constructor(driverClassName: String) :
tableName: String,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<JsonNode>?
): AutoCloseableIterator<AirbyteRecordData>

/**
* Read incremental data from a table. Incremental read should return only records where cursor
Expand All @@ -762,7 +763,7 @@ protected constructor(driverClassName: String) :
tableName: String,
cursorInfo: CursorInfo,
cursorFieldType: DataType
): AutoCloseableIterator<JsonNode>?
): AutoCloseableIterator<AirbyteRecordData>

protected val stateEmissionFrequency: Int
/**
Expand Down Expand Up @@ -794,28 +795,37 @@ protected constructor(driverClassName: String) :
const val DISCOVER_TRACE_OPERATION_NAME: String = "discover-operation"
const val READ_TRACE_OPERATION_NAME: String = "read-operation"

private val LOGGER: Logger = LoggerFactory.getLogger(AbstractDbSource::class.java)
@JvmStatic
protected val LOGGER: Logger = LoggerFactory.getLogger(AbstractDbSource::class.java)

private fun getMessageIterator(
recordIterator: AutoCloseableIterator<JsonNode>?,
recordIterator: AutoCloseableIterator<AirbyteRecordData>,
streamName: String,
namespace: String,
emittedAt: Long
): AutoCloseableIterator<AirbyteMessage?> {
): AutoCloseableIterator<AirbyteMessage> {
return AutoCloseableIterators.transform(
recordIterator,
AirbyteStreamNameNamespacePair(streamName, namespace)
) { r: JsonNode? ->
) { airbyteRecordData ->
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(
AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)
.withData(airbyteRecordData.rawRowData)
.withMeta(
if (isMetaChangesEmptyOrNull(airbyteRecordData.meta)) null
else airbyteRecordData.meta
)
)
}
}

/**
 * Tells whether [meta] carries no change entries.
 *
 * @param meta record metadata, possibly `null`
 * @return `true` when [meta] is `null`, or its `changes` collection is `null` or empty;
 *   `false` otherwise
 */
private fun isMetaChangesEmptyOrNull(meta: AirbyteRecordMessageMeta?): Boolean =
    // The safe call collapses the "meta is null" and "changes is null" cases into a single null.
    meta?.changes.isNullOrEmpty()
}
}

0 comments on commit 0972a11

Please sign in to comment.