Skip to content

Commit

Permalink
🎉 BigQuery destination: use serialized buffer for gcs staging (#11776)
Browse files Browse the repository at this point in the history
* Rebase bigquery changes to master

* Add comments

* Uncomment test code

* Format code

* Bump versions

* Fix denormalized destination target table name

* Fix avro schema for denormalized destination

* Remove unnecessary params from consumer factory

* Add back previous version

* Add warning about standard mode

* auto-bump connector version

* Bump version for bigquery in seed

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
tuliren and octavia-squidington-iii authored Apr 7, 2022
1 parent cb40ebc commit 8bd2d9b
Show file tree
Hide file tree
Showing 29 changed files with 830 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.0.2
dockerImageTag: 1.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand All @@ -36,7 +36,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.2.15
dockerImageTag: 0.3.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
72 changes: 43 additions & 29 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.0.2"
- dockerImage: "airbyte/destination-bigquery:1.1.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down Expand Up @@ -314,7 +314,7 @@
project_id:
type: "string"
description: "The GCP project ID for the project containing the target BigQuery\
\ dataset. Read more <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-accounts#creating\"\
\ dataset. Read more <a href=\"https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects\"\
>here</a>."
title: "Project ID"
dataset_id:
Expand Down Expand Up @@ -369,7 +369,7 @@
\ <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\"\
>docs</a> if you need help generating this key. Default credentials will\
\ be used if this field is left empty."
title: "Credentials JSON (Optional)"
title: "Service Account Key JSON (Optional)"
airbyte_secret: true
transformation_priority:
type: "string"
Expand Down Expand Up @@ -495,7 +495,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.15"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.3.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand All @@ -508,14 +508,13 @@
additionalProperties: true
properties:
big_query_client_buffer_size_mb:
title: "Google BigQuery client chunk size"
description: "Google BigQuery client's chunk (buffer) size (MIN = 1, MAX\
\ = 15) for each table. The size that will be written by a single RPC.\
\ Written data will be buffered and only flushed upon reaching this size\
\ or closing the channel. It defaults to 15MiB. Smaller chunk size means\
\ less memory consumption, and is recommended for big data sets. For more\
\ details refer to the documentation <a href=\"https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html\"\
>here</a>"
title: "Google BigQuery Client Chunk Size (Optional)"
description: "Google BigQuery client's chunk (buffer) size (MIN=1, MAX =\
\ 15) for each table. The size that will be written by a single RPC. Written\
\ data will be buffered and only flushed upon reaching this size or closing\
\ the channel. The default 15MB value is used if not set explicitly. Read\
\ more <a href=\"https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html\"\
>here</a>."
type: "integer"
minimum: 1
maximum: 15
Expand All @@ -525,18 +524,22 @@
project_id:
type: "string"
description: "The GCP project ID for the project containing the target BigQuery\
\ dataset."
\ dataset. Read more <a href=\"https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects\"\
>here</a>."
title: "Project ID"
dataset_id:
type: "string"
description: "Default BigQuery Dataset ID tables are replicated to if the\
\ source does not specify a namespace."
description: "The default BigQuery Dataset ID that tables are replicated\
\ to if the source does not specify a namespace. Read more <a href=\"\
https://cloud.google.com/bigquery/docs/datasets#create-dataset\">here</a>."
title: "Default Dataset ID"
dataset_location:
type: "string"
description: "The location of the dataset. Warning: Changes made after creation\
\ will not be applied."
title: "Dataset Location"
\ will not be applied. The default \"US\" value is used if not set explicitly.\
\ Read more <a href=\"https://cloud.google.com/bigquery/docs/locations\"\
>here</a>."
title: "Dataset Location (Optional)"
default: "US"
enum:
- "US"
Expand Down Expand Up @@ -573,19 +576,26 @@
credentials_json:
type: "string"
description: "The contents of the JSON service account key. Check out the\
\ <a href=\"https://docs.airbyte.io/integrations/destinations/bigquery\"\
\ <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\"\
>docs</a> if you need help generating this key. Default credentials will\
\ be used if this field is left empty."
title: "Credentials JSON"
title: "Service Account Key JSON (Optional)"
airbyte_secret: true
loading_method:
type: "object"
title: "Loading Method"
description: "Select the way that data will be uploaded to BigQuery."
title: "Loading Method *"
description: "Loading method used to send select the way data will be uploaded\
\ to BigQuery. <br/><b>Standard Inserts</b> - Direct uploading using SQL\
\ INSERT statements. This method is extremely inefficient and provided\
\ only for quick testing. In almost all cases, you should use staging.\
\ <br/><b>GCS Staging</b> - Writes large batches of records to a file,\
\ uploads the file to GCS, then uses <b>COPY INTO table</b> to upload\
\ the file. Recommended for most workloads for better speed and scalability.\
\ Read more about GCS Staging <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#gcs-staging\"\
>here</a>."
oneOf:
- title: "Standard Inserts"
additionalProperties: false
description: "Direct uploading using streams."
required:
- "method"
properties:
Expand All @@ -594,9 +604,6 @@
const: "Standard"
- title: "GCS Staging"
additionalProperties: false
description: "Writes large batches of records to a file, uploads the file\
\ to GCS, then uses <pre>COPY INTO table</pre> to upload the file. Recommended\
\ for large production workloads for better speed and scalability."
required:
- "method"
- "gcs_bucket_name"
Expand All @@ -609,16 +616,18 @@
gcs_bucket_name:
title: "GCS Bucket Name"
type: "string"
description: "The name of the GCS bucket."
description: "The name of the GCS bucket. Read more <a href=\"https://cloud.google.com/storage/docs/naming-buckets\"\
>here</a>."
examples:
- "airbyte_sync"
gcs_bucket_path:
title: "GCS Bucket Path"
description: "Directory under the GCS bucket where data will be written."
type: "string"
examples:
- "data_sync/test"
part_size_mb:
title: "Block Size (MB) for GCS multipart upload"
title: "Block Size (MB) for GCS Multipart Upload (Optional)"
description: "This is the size of a \"Part\" being buffered in memory.\
\ It limits the memory usage when writing. Larger values will allow\
\ to upload a bigger files and improve the speed, but consumes more\
Expand All @@ -633,14 +642,19 @@
type: "string"
description: "This upload method is supposed to temporary store records\
\ in GCS bucket. What do you want to do with data in GCS bucket\
\ when migration has finished?"
title: "GCS tmp files afterward processing"
\ when migration has finished? The default \"Delete all tmp files\
\ from GCS\" value is used if not set explicitly."
title: "GCS Tmp Files Afterward Processing (Optional)"
default: "Delete all tmp files from GCS"
enum:
- "Delete all tmp files from GCS"
- "Keep all tmp files in GCS"
credential:
title: "Credential"
description: "An HMAC key is a type of credential and can be associated\
\ with a service account or a user account in Cloud Storage. Read\
\ more <a href=\"https://cloud.google.com/storage/docs/authentication/hmackeys\"\
>here</a>."
type: "object"
oneOf:
- title: "HMAC key"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ public long getByteCount() {
@Override
public void close() throws Exception {
if (!isClosed) {
inputStream.close();
// inputStream can be null if the accept method encounters
// an error before inputStream is initialized
if (inputStream != null) {
inputStream.close();
}
bufferStorage.deleteFile();
isClosed = true;
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/builds.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
|:---------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) |
| BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
| BigQuery Denormalized | [![destination-bigquery-denormalized](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery-denormalized%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery-denormalized) |
| ClickHouse | [![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) |
| Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) |
| Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.15
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-s3')
implementation project(':airbyte-integrations:connectors:destination-gcs')
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,32 @@
package io.airbyte.integrations.destination.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.avro.Schema;

public class BigQueryDenormalizedDestination extends BigQueryDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);

@Override
protected String getTargetTableName(final String streamName) {
// This BigQuery destination does not write to a staging "raw" table but directly to a normalized
// table
return getNamingResolver().getIdentifier(streamName);
return namingResolver.getIdentifier(streamName);
}

@Override
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(final JsonNode jsonSchema) {
return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver()),
UploaderType.AVRO, new GcsBigQueryDenormalizedRecordFormatter(jsonSchema, getNamingResolver()));
return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, namingResolver),
UploaderType.AVRO, new GcsBigQueryDenormalizedRecordFormatter(jsonSchema, namingResolver));
}

/**
Expand All @@ -45,6 +46,27 @@ protected boolean isDefaultAirbyteTmpTableSchema() {
return false;
}

@Override
protected BiFunction<BigQueryRecordFormatter, AirbyteStreamNameNamespacePair, Schema> getAvroSchemaCreator() {
// the json schema needs to be processed by the record former to denormalize
return (formatter, pair) -> new JsonToAvroSchemaConverter().getAvroSchema(formatter.getJsonSchema(), pair.getName(),
pair.getNamespace(), true, false, false, true);
}

@Override
protected Function<JsonNode, BigQueryRecordFormatter> getRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
return streamSchema -> new GcsBigQueryDenormalizedRecordFormatter(streamSchema, namingResolver);
}

/**
* This BigQuery destination does not write to a staging "raw" table but directly to a normalized
* table.
*/
@Override
protected Function<String, String> getTargetTableNameTransformer(final BigQuerySQLNameTransformer namingResolver) {
return namingResolver::getIdentifier;
}

public static void main(final String[] args) throws Exception {
final Destination destination = new BigQueryDenormalizedDestination();
new IntegrationRunner(destination).run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ void testNestedWrite(final JsonNode schema, final AirbyteMessage message) throws
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.start();
consumer.accept(message);
consumer.close();

Expand All @@ -257,6 +258,7 @@ void testWriteWithFormat() throws Exception {
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.start();
consumer.accept(MESSAGE_USERS3);
consumer.close();

Expand Down Expand Up @@ -293,6 +295,7 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception {
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.start();
consumer.accept(MESSAGE_USERS4);
consumer.close();

Expand All @@ -317,6 +320,7 @@ void testJsonReferenceDefinition() throws Exception {
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.start();
consumer.accept(MESSAGE_USERS5);
consumer.accept(MESSAGE_USERS6);
consumer.accept(EMPTY_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.2
LABEL io.airbyte.version=1.1.0
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ application {
dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

// csv
implementation 'org.apache.commons:commons-csv:1.4'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'

implementation group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.4.5'
implementation group: 'com.codepoetics', name: 'protonpack', version: '1.13'
Expand Down
Loading

0 comments on commit 8bd2d9b

Please sign in to comment.