bigquery handles airbyte_meta/sync_id/generation_id

edgao committed May 23, 2024
1 parent b7dae03 commit 0f92bd0
Showing 41 changed files with 396 additions and 162 deletions.
@@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
useLocalCdk = false
useLocalCdk = true
}

java {
@@ -21,14 +21,14 @@
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
@@ -178,26 +178,34 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam
*/
public static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
final var chunkingColumn = JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(chunkingColumn)
// Partition by generation ID. This will be useful for when we want to build
// hybrid refreshes.
final RangePartitioning partitioning = RangePartitioning.newBuilder()
.setField(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
.setRange(RangePartitioning.Range.newBuilder()
.setStart(0L)
// Bigquery allows a table to have up to 10_000 partitions.
.setEnd(10_000L)
// Somewhat conservative estimate. This should avoid issues with
// users running many merge refreshes.
.setInterval(5L)
.build())
.build();

final Clustering clustering = Clustering.newBuilder()
.setFields(ImmutableList.of(chunkingColumn))
.setFields(ImmutableList.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT))
.build();

final StandardTableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
.setSchema(schema)
.setTimePartitioning(partitioning)
.setRangePartitioning(partitioning)
.setClustering(clustering)
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

final Table table = bigquery.getTable(tableInfo.getTableId());
if (table != null && table.exists()) {
// TODO: Handle migration from v1 -> v2
LOGGER.info("Partitioned table ALREADY EXISTS: {}", tableId);
} else {
bigquery.create(tableInfo);
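For context, a rough standalone sketch of how the integer range partitioning configured in this hunk buckets generation IDs (not connector code; the class and method names are made up): with start 0, end 10,000 and interval 5, each partition covers 5 consecutive generation IDs, so the table tops out at 2,000 generation-ID partitions, comfortably under BigQuery's 10,000-partition limit.

// Rough sketch only: mimics BigQuery integer range partitioning with
// start=0, end=10_000, interval=5. Values outside [0, 10_000) fall into
// BigQuery's __UNPARTITIONED__ partition.
public class GenerationPartitionSketch {

  static String partitionFor(final long generationId) {
    if (generationId < 0 || generationId >= 10_000L) {
      return "__UNPARTITIONED__";
    }
    final long bucketStart = (generationId / 5) * 5;
    return "[" + bucketStart + ", " + (bucketStart + 5) + ")";
  }

  public static void main(final String[] args) {
    System.out.println(partitionFor(0));      // [0, 5)
    System.out.println(partitionFor(7));      // [5, 10)
    System.out.println(partitionFor(9_999));  // [9995, 10000)
    System.out.println(partitionFor(12_345)); // __UNPARTITIONED__
  }
}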
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.bigquery.formatter;

import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
@@ -12,7 +14,6 @@
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage;
import io.airbyte.commons.json.Jsons;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@@ -26,21 +27,24 @@ public class BigQueryRecordFormatter {
Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_META, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, StandardSQLTypeName.INT64));

public BigQueryRecordFormatter() {}

public String formatRecord(PartialAirbyteMessage recordMessage) {
// Map.of has a @NonNull requirement, so creating a new Hash map
final HashMap<String, Object> destinationV2record = new HashMap<>();
destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage.getRecord()));
destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, null);
destinationV2record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getSerialized());
return Jsons.serialize(destinationV2record);
public String formatRecord(PartialAirbyteMessage recordMessage, long generationId) {
final ObjectNode record = (ObjectNode) Jsons.emptyObject();
record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage.getRecord()));
record.set(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, NullNode.instance);
record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getSerialized());
record.put(JavaBaseConstants.COLUMN_NAME_AB_META, Jsons.serialize(recordMessage.getRecord().getMeta()));
record.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId);
return Jsons.serialize(record);
}

private Object getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) {
private String getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) {
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
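For illustration, a rough standalone sketch of the raw-record JSON shape the reworked formatRecord emits, now carrying _airbyte_meta and _airbyte_generation_id alongside the existing columns (plain Jackson; the class name and field values are made up):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.UUID;

public class RawRecordShapeSketch {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final ObjectNode record = mapper.createObjectNode();
    record.put("_airbyte_raw_id", UUID.randomUUID().toString());
    // Hypothetical BigQuery TIMESTAMP string; the formatter derives this from the record's emitted_at.
    record.put("_airbyte_extracted_at", "2024-05-23 12:00:00.000000+00:00");
    record.set("_airbyte_loaded_at", NullNode.getInstance());
    // The serialized source record and serialized record meta are stored as strings.
    record.put("_airbyte_data", "{\"id\": 1}");
    record.put("_airbyte_meta", "{\"sync_id\": 123, \"changes\": []}");
    record.put("_airbyte_generation_id", 42L);
    System.out.println(mapper.writeValueAsString(record));
  }
}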
@@ -215,6 +215,7 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo
_airbyte_raw_id STRING NOT NULL,
_airbyte_extracted_at TIMESTAMP NOT NULL,
_airbyte_meta JSON NOT NULL,
_airbyte_generation_id INTEGER,
${column_declarations}
)
PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY))
@@ -310,7 +311,8 @@ private String insertNewRecords(final StreamConfig stream,
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_generation_id
)
${extractNewRawRecords};""");
}
@@ -397,17 +399,20 @@ private String upsertNewRecords(final StreamConfig stream,
${columnAssignments}
_airbyte_meta = new_record._airbyte_meta,
_airbyte_raw_id = new_record._airbyte_raw_id,
_airbyte_extracted_at = new_record._airbyte_extracted_at
_airbyte_extracted_at = new_record._airbyte_extracted_at,
_airbyte_generation_id = new_record._airbyte_generation_id
WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT (
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_generation_id
) VALUES (
${newRecordColumnList}
new_record._airbyte_meta,
new_record._airbyte_raw_id,
new_record._airbyte_extracted_at
new_record._airbyte_extracted_at,
new_record._airbyte_generation_id
);""");
}

@@ -438,7 +443,7 @@ private String extractNewRawRecords(final StreamConfig stream,
WHEN (JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"') IS NOT NULL)
AND (JSON_TYPE(JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"')) != 'null')
AND (${json_extract} IS NULL)
THEN 'Problem with `${raw_col_name}`'
THEN JSON '{"field":"${raw_col_name}","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}'
ELSE NULL
END"""))
.collect(joining(",\n")) + "]";
@@ -488,7 +493,9 @@ WITH intermediate_data AS (
${column_casts}
${column_errors} AS column_errors,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_meta,
_airbyte_generation_id
FROM ${project_id}.${raw_table_id}
WHERE (
_airbyte_loaded_at IS NULL
@@ -497,17 +504,26 @@ WITH intermediate_data AS (
), new_records AS (
SELECT
${column_list}
to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
to_json(json_set(
coalesce(parse_json(_airbyte_meta), JSON'{}'),
'$.changes',
json_array_append(
coalesce(json_query(parse_json(_airbyte_meta), '$.changes'), JSON'[]'),
'$',
COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), [])
)
)) as _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_generation_id
FROM intermediate_data
), numbered_rows AS (
SELECT *, row_number() OVER (
PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} `_airbyte_extracted_at` DESC
) AS row_number
FROM new_records
)
SELECT ${column_list} _airbyte_meta, _airbyte_raw_id, _airbyte_extracted_at
SELECT ${column_list} _airbyte_meta, _airbyte_raw_id, _airbyte_extracted_at, _airbyte_generation_id
FROM numbered_rows
WHERE row_number = 1""");
} else {
@@ -527,17 +543,28 @@ WITH intermediate_data AS (
${column_casts}
${column_errors} AS column_errors,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_meta,
_airbyte_generation_id
FROM ${project_id}.${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
${extractedAtCondition}
)
SELECT
${column_list}
to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
to_json(json_set(
coalesce(parse_json(_airbyte_meta), JSON'{}'),
'$.changes',
json_array_append(
coalesce(json_query(parse_json(_airbyte_meta), '$.changes'), JSON'[]'),
'$',
COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), [])
)
)) as _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
_airbyte_extracted_at,
_airbyte_generation_id
FROM intermediate_data""");
}
}
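For illustration, a rough standalone sketch of the merge the new meta-handling SQL performs: the _airbyte_meta object already present in the raw table is preserved, and destination typecast errors are appended to its changes array instead of overwriting the whole object (plain Jackson; the class name and values are made up, with BigQuery's json_set/json_array_append doing the equivalent work in-warehouse):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MetaMergeSketch {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Meta as it arrived in the raw table (hypothetical value).
    final ObjectNode meta = (ObjectNode) mapper.readTree("{\"sync_id\": 123, \"changes\": []}");
    // One destination-side typecast failure, mirroring the JSON literal used in the SQL above.
    final ObjectNode change = mapper.createObjectNode();
    change.put("field", "age");
    change.put("change", "NULLED");
    change.put("reason", "DESTINATION_TYPECAST_ERROR");
    // Append to the existing changes array rather than replacing _airbyte_meta wholesale.
    final ArrayNode changes = meta.withArray("changes");
    changes.add(change);
    // -> {"sync_id":123,"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}
    System.out.println(mapper.writeValueAsString(meta));
  }
}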
@@ -599,7 +626,9 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi
_airbyte_raw_id STRING,
_airbyte_data STRING,
_airbyte_extracted_at TIMESTAMP,
_airbyte_loaded_at TIMESTAMP
_airbyte_loaded_at TIMESTAMP,
_airbyte_meta STRING,
_airbyte_generation_id INTEGER
)
PARTITION BY DATE(_airbyte_extracted_at)
CLUSTER BY _airbyte_extracted_at
@@ -608,7 +637,9 @@ PARTITION BY DATE(_airbyte_extracted_at)
_airbyte_ab_id AS _airbyte_raw_id,
_airbyte_data AS _airbyte_data,
_airbyte_emitted_at AS _airbyte_extracted_at,
CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at
CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at,
'{"sync_id": 0, "changes": []}' AS _airbyte_meta,
0 as _airbyte_generation_id
FROM ${project_id}.${v1_raw_table}
);
"""));
@@ -626,8 +657,8 @@ private String escapeColumnNameForJsonPath(final String stringContents) {
return stringContents
// Consider the JSON blob {"foo\\bar": 42}.
// This is an object with key foo\bar.
// The JSONPath for this is (something like...?) $."foo\\bar" (i.e. 2 backslashes).
// TODO is that jsonpath correct?
// The JSONPath for this is $."foo\\bar" (i.e. 2 backslashes to represent the single
// backslash in the key).
// When we represent that path as a SQL string, the backslashes are doubled (to 4): '$."foo\\\\bar"'
// And we're writing that in a Java string, so we have to type out 8 backslashes:
// "'$.\"foo\\\\\\\\bar\"'"
@@ -637,7 +668,7 @@ private String escapeColumnNameForJsonPath(final String stringContents) {
// which is \\" in a SQL string: '$."foo\\"bar"'
// The backslashes become \\\\ in java, and the quote becomes \": "'$.\"foo\\\\\"bar\"'"
.replace("\"", "\\\\\"")
// Here we're escaping a SQL string, so we only need a single backslash (which is 2, beacuse Java).
// Here we're escaping a SQL string, so we only need a single backslash (which is 2, because Java).
.replace("'", "\\'");
}

@@ -43,6 +43,7 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils.*
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDV2Migration
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
import io.airbyte.integrations.destination.bigquery.migrators.BigqueryAirbyteMetaAndGenerationIdMigration
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation
import io.airbyte.integrations.destination.bigquery.operation.BigQueryGcsStorageOperation
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
@@ -226,7 +227,11 @@ class BigQueryDestination : BaseConnector(), Destination {
)
val destinationHandler = BigQueryDestinationHandler(bigquery, datasetLocation)

val migrations = listOf(BigQueryDV2Migration(sqlGenerator, bigquery))
val migrations =
listOf(
BigQueryDV2Migration(sqlGenerator, bigquery),
BigqueryAirbyteMetaAndGenerationIdMigration(bigquery),
)

if (uploadingMethod == UploadingMethod.STANDARD) {
val bigQueryClientChunkSize = getBigQueryClientChunkSize(config)
@@ -289,7 +294,7 @@ class BigQueryDestination : BaseConnector(), Destination {
bigQueryGcsStorageOperations,
initialStatus,
FileUploadFormat.CSV,
V2_WITHOUT_META,
V2_WITH_GENERATION,
disableTD
)
},