Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578
this is a copy of the existing Sql class
this is similar to the existing DestinationHandler interface, but trimmed down. The "initial status" stuff has been moved to DestinationInitialStatusGatherer.
What a name
Oh, this refers to the Destination not the Destination. I was confused
This is really a DatabaseDestinationHandler?
also FYI the other db stuff has db in the namespace (cdk.load.db.orchestration)
Or StructuredDataStoreHandler if you want to get really pedantic, since it will also cover warehouses.
I'll go with "database", I feel like context is sufficiently clear that it also refers to warehouses :P
renamed in (as yet unpushed) bafeb80
this is mostly identical to the old DestinationInitialStatus class, but cleaned up for Kotlin style.
frifriSF59
left a comment
Some nits / tweaks but I think this looks good
    object TypingDedupingUtil {
        // copied wholesale from old CDK's StreamId
        fun concatenateRawTableName(namespace: String, name: String): String {
            val plainConcat = namespace + name
Thoughts about this:
    val plain = namespace + name
    // Find all underscore-runs, get the max length (or 1 if none)
    val longestRun = Regex("_+")
        .findAll(plain)
        .map { it.value.length }
        .maxOrNull()
        .let { max(it ?: 0, 1) }
    // Build: namespace + "_raw" + "_" * (longestRun + 1) + "stream_" + name
    val underscores = "_".repeat(longestRun + 1)
    return buildString {
        append(namespace)
        append("_raw")
        append(underscores)
        append("stream_")
        append(name)
    }
done in 7477f85 (I used a string template instead of buildString, IMO it's easier to read)
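For readers following the thread, here is a minimal sketch of how the suggestion above could look with a string template in place of buildString. It is an illustration only, not the contents of commit 7477f85: the function is written as a standalone top-level function rather than a member of TypingDedupingUtil, and the local names longestUnderscoreRun and underscores are assumptions.

```kotlin
import kotlin.math.max

// Hypothetical standalone version of the helper discussed in the review.
// The PR places it inside TypingDedupingUtil; the committed code may differ.
fun concatenateRawTableName(namespace: String, name: String): String {
    val plainConcat = namespace + name
    // Length of the longest run of underscores in namespace + name (0 if there are none).
    val longestUnderscoreRun =
        Regex("_+").findAll(plainConcat).maxOfOrNull { it.value.length } ?: 0
    // Separator is one underscore longer than the longest run, and at least two long.
    val underscores = "_".repeat(max(longestUnderscoreRun, 1) + 1)
    return "${namespace}_raw${underscores}stream_$name"
}

fun main() {
    println(concatenateRawTableName("public", "users"))
    println(concatenateRawTableName("my_ns", "a__b"))
}
```

Because the separator is always longer than any underscore run in the inputs, it cannot be confused with underscores that already appear in the namespace or stream name.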
     *
     * (I have no explanation for the method names.)
     */
    class BigQuerySQLNameTransformer {
Should we mark this as deprecated?
mm. I think what we should do is actually rename it + the methods for clarity. Not as part of this PR though, the diff would be really annoying
    """{
        "format_type": "CSV",
        "flattening": "No flattening"
    }"""
Should we have a .trimIndent() here?
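As a small aside on this question, the sketch below shows how trimIndent() behaves on a raw string shaped like the one in the diff. The function names inlineConfig and separateConfig are hypothetical and the surrounding test code from the PR is not reproduced. When the opening brace sits on the same line as the opening delimiter, that first line has zero indentation, so the common indent is zero and trimIndent() removes nothing; the literal would also need to start on its own line for the call to have any effect.

```kotlin
// Hypothetical helpers illustrating trimIndent() on a raw JSON string.
fun inlineConfig(): String =
    """{
        "format_type": "CSV",
        "flattening": "No flattening"
    }""".trimIndent() // first line "{" has indent 0, so nothing is stripped

fun separateConfig(): String =
    """
        {
          "format_type": "CSV",
          "flattening": "No flattening"
        }
    """.trimIndent() // blank first/last lines dropped, common indent removed

fun main() {
    println(inlineConfig())
    println(separateConfig())
}
```

Since the value is JSON, the extra whitespace is harmless to a parser either way; the difference only matters for readability or for exact string comparisons.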
# Conflicts:
#   airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt
#   airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt
#   airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt
#   airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt
#   airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt
#   airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
in very broad strokes:
toolkits/load-db - the actual core logic
BigqueryWriter.