Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Bigquery+Snowflake: Fix resuming truncate refresh #41041

Merged
merged 7 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private val isTruncateSync: Boolean
private val rawTableSuffix: String
private val finalTmpTableSuffix: String
// null for truncate syncs, where we don't care at all about the initial status.
private val initialRawTableStatus: InitialRawTableStatus?
/**
* The status of the raw table that "matters" for this sync. Specifically:
* * For normal syncs / merge refreshes, this is the status of the real raw table
* * For truncate refreshes, this is the status of the temp raw table (because we never even
* look at the real raw table)
*/
private val initialRawTableStatus: InitialRawTableStatus
johnny-schmidt marked this conversation as resolved.
Show resolved Hide resolved

/**
* After running any sync setup code, we may update the destination state. This field holds that
Expand All @@ -52,7 +57,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
if (isTruncateSync) {
prepareStageForTruncate(destinationInitialStatus, stream)
rawTableSuffix = TMP_TABLE_SUFFIX
initialRawTableStatus = null
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
} else {
rawTableSuffix = NO_SUFFIX
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
Expand Down Expand Up @@ -278,25 +283,32 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
if (
!isTruncateSync &&
syncSummary.recordsWritten == 0L &&
!initialRawTableStatus!!.hasUnprocessedRecords
!initialRawTableStatus.hasUnprocessedRecords
) {
log.info {
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
"because it had no records during this sync and no unprocessed records from a previous sync."
}
} else if (isTruncateSync && (!streamSuccessful || syncSummary.recordsWritten == 0L)) {
} else if (
isTruncateSync &&
(!streamSuccessful ||
(syncSummary.recordsWritten == 0L &&
!(initialRawTableStatus.rawTableExists &&
initialRawTableStatus.hasUnprocessedRecords)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I think I understand: before if we crashed and started again but had no more records, we'd incorrectly interpret that as there being no intermediate state. This clarifies: we need both 0 new records AND the tmp table not to exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. tried to capture that in the comment :/ but this is definitely an ugly conditional

) {
// But truncate syncs should only T+D if the sync was successful, since we're T+Ding
// into a temp final table anyway. And we only need to check if _this_ sync emitted
// records, since we've nuked the old raw data.
// into a temp final table anyway.
// We only run T+D if the current sync had some records, or a previous attempt wrote
// some records to the temp raw table.
log.info {
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}"
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} running as truncate sync. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}; temp raw table already existed: ${initialRawTableStatus.rawTableExists}; temp raw table had records: ${initialRawTableStatus.hasUnprocessedRecords}"
}
} else {
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (!isTruncateSync) {
initialRawTableStatus!!.maxProcessedTimestamp
initialRawTableStatus.maxProcessedTimestamp
} else {
Optional.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
confirmVerified(storageOperation)

Expand Down Expand Up @@ -238,7 +238,7 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
confirmVerified(storageOperation)

Expand Down Expand Up @@ -318,7 +318,7 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
confirmVerified(storageOperation)

Expand All @@ -341,6 +341,68 @@ class AbstractStreamOperationTest {
)
}

/**
 * Scenario under test:
 * 1. A previous truncate refresh attempt created a non-empty temp raw table, then failed.
 * 2. Our current attempt is successful, but we emitted no new records
 * 3. We should still run T+D for this table, because we need to T+D the previous attempt's
 * records.
 */
@Test
fun existingNonEmptyTempRawTableNoNewRecords() {
    // Mock the destination's initial state: the *temp* raw table already exists and
    // holds unprocessed records left behind by the failed previous attempt.
    val initialState =
        mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
            every { streamConfig } returns this@Truncate.streamConfig
            // The real raw table's status is irrelevant for a truncate sync, so a bare
            // mock is enough — nothing should ever read it.
            every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
            every { initialTempRawTableStatus.rawTableExists } returns true
            every { initialTempRawTableStatus.hasUnprocessedRecords } returns true
            every { initialTempRawTableStatus.maxProcessedTimestamp } returns
                maxProcessedTimestamp
            // This doesn't matter too much, but thematically - there's a preexisting
            // final table.
            every { isFinalTablePresent } returns true
            every { isFinalTableEmpty } returns false
            every {
                destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
            } returns destinationState
        }
    // Arbitrary nonzero generation id for the existing temp stage.
    every { storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX) } returns 21

    val streamOperations = TestStreamOperation(storageOperation, initialState)

    // Setup phase: the operation should inspect the stage generation, prepare the temp
    // stage, and (re)create the temp final table — in exactly this order.
    verifySequence {
        storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX)
        storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
        storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
    }
    confirmVerified(storageOperation)

    // Reset recorded calls so the finalize phase is verified independently of setup.
    clearMocks(storageOperation)
    // Current attempt succeeded (COMPLETE) but wrote zero records.
    streamOperations.finalizeTable(
        streamConfig,
        StreamSyncSummary(0, AirbyteStreamStatus.COMPLETE)
    )

    // Finalize phase: despite 0 records in this attempt, T+D must still run because the
    // temp raw table carries the previous attempt's unprocessed records.
    verifySequence {
        storageOperation.cleanupStage(streamId)
        storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX)
        storageOperation.typeAndDedupe(
            streamConfig,
            // In a truncate refresh, we know we need to T+D the entire temp raw table
            // so even though the initial temp raw table status has a real timestamp,
            // we just ignore that and pass Optional.empty()
            maxProcessedTimestamp = Optional.empty(),
            EXPECTED_SUFFIX,
        )
        storageOperation.overwriteFinalTable(streamConfig, EXPECTED_SUFFIX)
    }
    confirmVerified(storageOperation)
    // Confirm no stubbed interactions went unused (e.g. the real raw table status mock).
    checkUnnecessaryStub(
        initialState,
        initialState.initialRawTableStatus,
        initialState.destinationState
    )
}

@ParameterizedTest
@MethodSource(
"io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#generationIds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// We keep the records from the first sync
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
// And append the records from the second sync
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
{"_airbyte_extracted_at": "1970-01-01T00:00:02Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.40.9'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Loading