Skip to content

Commit

Permalink
Mysql to emit stream initial loader (#36932)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
  • Loading branch information
xiaohansong and aaronsteers committed May 3, 2024
1 parent 7c0a6c5 commit b9dc205
Show file tree
Hide file tree
Showing 20 changed files with 1,029 additions and 276 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.31.8 | 2024-05-03 | [\#36932](https://github.com/airbytehq/airbyte/pull/36932) | CDK changes on resumable full refresh |
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
| 0.31.6 | 2024-05-02 | [\#37746](https://github.com/airbytehq/airbyte/pull/37746) | debuggability improvements. |
| 0.31.5 | 2024-04-30 | [\#37758](https://github.com/airbytehq/airbyte/pull/37758) | Set debezium max retries to zero |
Expand Down
@@ -1 +1 @@
version=0.31.7
version=0.31.8
Expand Up @@ -42,6 +42,7 @@ import io.airbyte.cdk.integrations.base.Source
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
Expand All @@ -54,6 +55,7 @@ import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand All @@ -62,6 +64,7 @@ import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Instant
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
Expand All @@ -84,7 +87,7 @@ import org.slf4j.LoggerFactory
abstract class AbstractJdbcSource<Datatype>(
driverClass: String,
@JvmField val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
sourceOperations: JdbcCompatibleSourceOperations<Datatype>
sourceOperations: JdbcCompatibleSourceOperations<Datatype>,
) : AbstractDbSource<Datatype, JdbcDatabase>(driverClass), Source {
@JvmField val sourceOperations: JdbcCompatibleSourceOperations<Datatype>

Expand All @@ -95,6 +98,61 @@ abstract class AbstractJdbcSource<Datatype>(
this.sourceOperations = sourceOperations
}

open fun supportResumableFullRefresh(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream
): Boolean {
return false
}

open fun getInitialLoadHandler(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?
): InitialLoadHandler<Datatype>? {
return null
}

override fun getFullRefreshStream(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?,
namespace: String,
selectedDatabaseFields: List<String>,
table: TableInfo<CommonField<Datatype>>,
emittedAt: Instant,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<AirbyteMessage> {
if (
supportResumableFullRefresh(database, airbyteStream) &&
syncMode == SyncMode.FULL_REFRESH
) {
val initialLoadHandler =
getInitialLoadHandler(database, airbyteStream, catalog, stateManager)
?: throw IllegalStateException(
"Must provide initialLoadHandler for resumable full refresh."
)
return initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
}

// If flag is off, fall back to legacy non-resumable refresh
return super.getFullRefreshStream(
database,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
emittedAt,
syncMode,
cursorField,
)
}

override fun queryTableFullRefresh(
database: JdbcDatabase,
columnNames: List<String>,
Expand Down
Expand Up @@ -155,6 +155,8 @@ protected constructor(driverClassName: String) :
this.getAirbyteType(columnType)
}

initializeForStateManager(database, catalog, fullyQualifiedTableNameToInfo, stateManager)

val incrementalIterators =
getIncrementalIterators(
database,
Expand Down Expand Up @@ -188,6 +190,15 @@ protected constructor(driverClassName: String) :
}
}

// Optional - perform any initialization logic before read. For example, source connector
// can choose to load up state manager here.
protected open fun initializeForStateManager(
database: Database,
catalog: ConfiguredAirbyteCatalog,
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
stateManager: StateManager
) {}

@Throws(SQLException::class)
protected fun validateCursorFieldForIncrementalTables(
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
Expand Down Expand Up @@ -380,7 +391,14 @@ protected constructor(driverClassName: String) :

val table = tableNameToTable[fullyQualifiedTableName]!!
val tableReadIterator =
createReadIterator(database, airbyteStream, table, stateManager, emittedAt)
createReadIterator(
database,
airbyteStream,
catalog,
table,
stateManager,
emittedAt
)
iteratorList.add(tableReadIterator)
}
}
Expand All @@ -401,6 +419,7 @@ protected constructor(driverClassName: String) :
private fun createReadIterator(
database: Database,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
table: TableInfo<CommonField<DataType>>,
stateManager: StateManager?,
emittedAt: Instant
Expand Down Expand Up @@ -442,7 +461,9 @@ protected constructor(driverClassName: String) :
airbyteMessageIterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -475,7 +496,9 @@ protected constructor(driverClassName: String) :
iterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -560,18 +583,22 @@ protected constructor(driverClassName: String) :
* Creates a AirbyteMessageIterator that contains all records for a database source connection
*
* @param database Source Database
* @param streamName name of an individual stream in which a stream represents a source (e.g.
* @param airbyteStream name of an individual stream in which a stream represents a source (e.g.
* API endpoint or database table)
* @param catalog List of streams (e.g. database tables or API endpoints) with settings on sync
* @param stateManager tracking the state from previous sync; used for resumable full refresh.
* @param namespace Namespace of the database (e.g. public)
* @param selectedDatabaseFields List of all interested database column names
* @param table information in tabular format
* @param emittedAt Time when data was emitted from the Source database
* @param syncMode The sync mode that this full refresh stream should be associated with.
* @return AirbyteMessageIterator with all records for a database source
*/
private fun getFullRefreshStream(
protected open fun getFullRefreshStream(
database: Database,
streamName: String,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?,
namespace: String,
selectedDatabaseFields: List<String>,
table: TableInfo<CommonField<DataType>>,
Expand All @@ -588,7 +615,12 @@ protected constructor(driverClassName: String) :
syncMode,
cursorField
)
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli())
return getMessageIterator(
queryStream,
airbyteStream.stream.name,
namespace,
emittedAt.toEpochMilli()
)
}

/**
Expand Down
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb

import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.time.Instant

interface InitialLoadHandler<T> {
fun getIteratorForStream(
airbyteStream: ConfiguredAirbyteStream,
table: TableInfo<CommonField<T>>,
emittedAt: Instant
): AutoCloseableIterator<AirbyteMessage>
}

0 comments on commit b9dc205

Please sign in to comment.