-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
destination-mysql: bump CDK and introduce generation_id #40572
base: graphite-base/40572
Are you sure you want to change the base?
destination-mysql: bump CDK and introduce generation_id #40572
Conversation
The latest updates on your projects. Learn more about Vercel for Git βοΈ 1 Ignored Deployment
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @stephane-airbyte and the rest of your teammates on |
f1c264a
to
72ca729
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this will work as expected - it includes #38067, i.e. the logic to fail if we don't receive a stream status message (and the only way for us to receive stream status messages is to set supportsRefreshes: true
in the metadata, which we can't do without switching to the new interfaces)
so if you want to bump the CDK version directly to latest, then this PR also needs to include that change
@@ -35,7 +35,11 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource | |||
|
|||
@Throws(SQLException::class) | |||
override fun execute(sql: String?) { | |||
execute { connection: Connection -> connection.createStatement().execute(sql) } | |||
execute { connection: Connection -> | |||
LOGGER.info("executing SQL $sql") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add a logStatements
param just in case of more shenanigans
@@ -44,6 +48,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource | |||
connection.autoCommit = false | |||
for (s in queries) { | |||
LOGGER.info("executing query within transaction: $s") | |||
LOGGER.info(Thread.currentThread().stackTrace.joinToString("\n ")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: combine this log statement with the previous one, to avoid mixing up concurrent log messages
isAirbyteMetaColumnMatch(existingTable)) || | ||
!(existingTable.columns.containsKey( | ||
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID | ||
) && isAirbyteMetaColumnMatch(existingTable)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably copypasta error, I think you want a new isAirbyteGenerationColumnMatch
method?
@@ -58,6 +61,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina | |||
DSL.table(tableName), | |||
columnNames.map { columnName: String -> DSL.field(DSL.quotedName(columnName)) } | |||
) | |||
LOGGER.info("SGX columnNames=$columnNames") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is sgx?
import java.sql.SQLException | ||
import java.util.* | ||
import org.jooq.* | ||
import org.jooq.conf.ParamType | ||
import org.jooq.impl.DSL | ||
import org.jooq.impl.SQLDataType | ||
|
||
val LOGGER = KotlinLogging.logger {} | ||
|
||
abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestinationState> : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need to add generation_id in createRawTable and insertFinalTableRecords
@@ -83,6 +85,7 @@ class MysqlSqlGeneratorIntegrationTest : | |||
.column(COLUMN_NAME_AB_EXTRACTED_AT, SQLDataType.TIMESTAMP(6).nullable(false)) | |||
.column(COLUMN_NAME_AB_LOADED_AT, SQLDataType.TIMESTAMP(6)) | |||
.column(COLUMN_NAME_AB_META, structType.nullable(true)) | |||
.column(COLUMN_NAME_AB_GENERATION_ID, structType.nullable(true)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
structType -> SQLDataType.BIGINT?
What
How
Review guide
User Impact
Can this PR be safely reverted and rolled back?