From 30e1e6a38ac966fec0a26b1bc03f662500367625 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 15 May 2024 23:43:40 -0700 Subject: [PATCH] bq-delete-obsolote --- .../destination-bigquery/build.gradle | 5 +- .../destination-bigquery/metadata.yaml | 2 +- .../bigquery/BigQueryAsyncFlush.java | 105 --------- .../bigquery/BigQueryAsyncStandardFlush.java | 59 ----- .../bigquery/BigQueryGcsOperations.java | 195 ---------------- .../BigQueryRecordStandardConsumer.java | 211 ------------------ .../BigQueryStagingConsumerFactory.java | 158 ------------- .../destination/bigquery/BigQueryUtils.java | 25 +-- .../formatter/BigQueryRecordFormatter.java | 10 +- .../BigQueryDestinationHandler.java | 23 +- .../BigQueryV2TableMigrator.java | 81 ------- .../uploader/BigQueryDirectUploader.java | 81 ------- .../uploader/BigQueryUploaderFactory.java | 117 ---------- .../uploader/config/UploaderConfig.java | 24 -- .../bigquery/writer/BigQueryTableWriter.java | 38 ---- .../bigquery/BigQueryDestination.kt | 34 +-- .../operation/BigQueryStorageOperation.kt | 29 +-- .../bigquery/BigQueryDestinationTest.java | 6 - .../AbstractBigQueryTypingDedupingTest.java | 1 - docs/integrations/destinations/bigquery.md | 1 + 20 files changed, 50 insertions(+), 1155 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 1d6e880bf7da6..c23be7d3cc4b0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.35.0' + cdkVersionRequired = '0.35.7' features = [ 'db-destinations', 'datastore-bigquery', @@ -11,7 +11,7 @@ airbyteJavaConnector { 'gcs-destinations', 'core', ] - useLocalCdk = true + useLocalCdk = false } java { @@ -37,6 
+37,5 @@ application { } dependencies { - implementation 'com.codepoetics:protonpack:1.13' implementation 'org.apache.commons:commons-text:1.10.0' } diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 16f3ecb5fcb8e..9ce4d0628bc24 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.4.20 + dockerImageTag: 2.5.0 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java deleted file mode 100644 index 142a307aa64f7..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; -import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer; -import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.StreamId; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.Map; -import java.util.stream.Stream; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; - -/** - * Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy. 
- */ -@Slf4j -class BigQueryAsyncFlush implements DestinationFlushFunction { - - private final Map streamConfigMap; - private final BigQueryGcsOperations stagingOperations; - private final ConfiguredAirbyteCatalog catalog; - - public BigQueryAsyncFlush( - final Map streamConfigMap, - final BigQueryGcsOperations stagingOperations, - final ConfiguredAirbyteCatalog catalog) { - this.streamConfigMap = streamConfigMap; - this.stagingOperations = stagingOperations; - this.catalog = catalog; - } - - @Override - public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { - final SerializableBuffer writer; - try { - writer = new CsvSerializedBuffer( - new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX), - new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META), - true); - - stream.forEach(record -> { - try { - writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - }); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - writer.flush(); - log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); - if (!streamConfigMap.containsKey(decs)) { - throw new IllegalArgumentException( - String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog))); - } - - final StreamId streamId = streamConfigMap.get(decs).getId(); - try { - final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer); - - stagingOperations.copyIntoTableFromStage( - streamId.getRawNamespace(), - streamId.getOriginalName(), - TableId.of(streamId.getRawNamespace(), streamId.getRawName()), - BigQueryRecordFormatter.SCHEMA_V2, - stagedFileName); - } catch (final Exception e) { - log.error("Failed to flush and commit buffer data into destination's raw table", e); - throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e); - } - - writer.close(); - } - - @Override - public long getOptimalBatchSizeBytes() { - // Chosen arbitrarily (mostly to match legacy behavior). We have no reason to believe a larger - // number would be worse. - // This was previously set to 25MB, which ran into rate-limiting issues: - // https://cloud.google.com/bigquery/quotas#standard_tables - // > Your project can make up to 1,500 table modifications per table per day - return 200 * 1024 * 1024; - } - - @Override - public long getQueueFlushThresholdBytes() { - return 200 * 1024 * 1024; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java deleted file mode 100644 index 9cb5ba5792ba9..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
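A rough back-of-the-envelope check of the quota reasoning behind that batch size (my arithmetic, for illustration only): each flush issues one load job against the stream's raw table, and BigQuery allows about 1,500 table modifications per table per day. At the old 25 MB batch size, a single raw table receiving roughly 37 GB in one day would already hit that limit (37 GB / 25 MB ≈ 1,500 load jobs), whereas the 200 MiB batch size keeps the same volume under 200 load jobs.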
- */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.common.util.concurrent.RateLimiter; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Stream; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BigQueryAsyncStandardFlush implements DestinationFlushFunction { - - // TODO remove this once the async framework supports rate-limiting/backpressuring - private static final RateLimiter rateLimiter = RateLimiter.create(0.07); - private final Supplier> uploaderMap; - - public BigQueryAsyncStandardFlush(final Supplier> uploaderMap) { - this.uploaderMap = uploaderMap; - } - - @Override - public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { - rateLimiter.acquire(); - final ConcurrentMap uploaderMapSupplied = uploaderMap.get(); - final AtomicInteger recordCount = new AtomicInteger(); - stream.forEach(aibyteMessage -> { - try { - final AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(aibyteMessage.getRecord().getStream(), - aibyteMessage.getRecord().getNamespace()); - uploaderMapSupplied.get(sd).upload(aibyteMessage); - recordCount.getAndIncrement(); - } catch (final Exception e) { - log.error("An error happened while trying to flush a record to big query", e); - throw e; - } - }); - uploaderMapSupplied.values().forEach(test -> test.closeAfterPush()); - } - - @Override - public long getOptimalBatchSizeBytes() { - // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake. - // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of - // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering - // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be - // freed earlier similar to a sliding window effect - return Double.valueOf(Runtime.getRuntime().maxMemory() * 0.2).longValue(); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java deleted file mode 100644 index 9b2bfd087bf0e..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobInfo.WriteDisposition; -import com.google.cloud.bigquery.LoadJobConfiguration; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations; -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; -import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; -import io.airbyte.commons.exceptions.ConfigErrorException; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryGcsOperations { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class); - - private final BigQuery bigQuery; - private final StandardNameTransformer gcsNameTransformer; - private final GcsDestinationConfig gcsConfig; - private final GcsStorageOperations gcsStorageOperations; - - private final String datasetLocation; - private final UUID randomStagingId; - private final DateTime syncDatetime; - private final boolean keepStagingFiles; - private final Set existingSchemas = new HashSet<>(); - - public BigQueryGcsOperations(final BigQuery bigQuery, - final StandardNameTransformer gcsNameTransformer, - final GcsDestinationConfig gcsConfig, - final GcsStorageOperations gcsStorageOperations, - final String datasetLocation, // TODO: Is this information same as GcsConfig.bucketRegion? 
- final UUID randomStagingId, - final DateTime syncDatetime, - final boolean keepStagingFiles) { - this.bigQuery = bigQuery; - this.gcsNameTransformer = gcsNameTransformer; - this.gcsConfig = gcsConfig; - this.gcsStorageOperations = gcsStorageOperations; - this.datasetLocation = datasetLocation; - this.randomStagingId = randomStagingId; - this.syncDatetime = syncDatetime; - this.keepStagingFiles = keepStagingFiles; - } - - /** - * @return {@code /_} - */ - private String getStagingRootPath(final String datasetId, final String stream) { - return gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s", - gcsConfig.getBucketPath(), - gcsNameTransformer.convertStreamName(datasetId), - gcsNameTransformer.convertStreamName(stream))); - } - - /** - * @return {@code /_//////} - */ - public String getStagingFullPath(final String datasetId, final String stream) { - return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/", - getStagingRootPath(datasetId, stream), - syncDatetime.year().get(), - syncDatetime.monthOfYear().get(), - syncDatetime.dayOfMonth().get(), - syncDatetime.hourOfDay().get(), - randomStagingId)); - } - - public void createSchemaIfNotExists(final String datasetId) { - if (!existingSchemas.contains(datasetId)) { - LOGGER.info("Creating dataset {}", datasetId); - try { - BigQueryUtils.getOrCreateDataset(bigQuery, datasetId, datasetLocation); - } catch (final BigQueryException e) { - if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) { - throw new ConfigErrorException(e.getMessage(), e); - } else { - throw e; - } - } - existingSchemas.add(datasetId); - } - } - - public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) { - LOGGER.info("Creating target table {}", tableId); - BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema); - } - - public void createStageIfNotExists(final String datasetId, final String stream) { - final String objectPath = getStagingFullPath(datasetId, stream); - LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath); - gcsStorageOperations.createBucketIfNotExists(); - } - - public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) { - final String objectPath = getStagingFullPath(datasetId, stream); - LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath); - return gcsStorageOperations.uploadRecordsToBucket(writer, datasetId, objectPath); - } - - /** - * Similar to COPY INTO within - * {@link io.airbyte.cdk.integrations.destination.staging.StagingOperations} which loads the data - * stored in the stage area into a target table in the destination - * - * Reference - * https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html - */ - public void copyIntoTableFromStage(final String datasetId, - final String stream, - final TableId tableId, - final Schema tableSchema, - final String stagedFileName) { - LOGGER.info("Uploading records from staging files to target table {} (dataset {}): {}", - tableId, datasetId, stagedFileName); - - final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName); - LOGGER.info("Uploading staged file: {}", fullFilePath); - final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath) - 
.setFormatOptions(FormatOptions.csv()) - .setSchema(tableSchema) - .setWriteDisposition(WriteDisposition.WRITE_APPEND) - .setJobTimeoutMs(600000L) // 10 min - .build(); - - final Job loadJob = this.bigQuery.create(JobInfo.of(configuration)); - LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(), - tableId, datasetId, loadJob); - - try { - BigQueryUtils.waitForJobFinish(loadJob); - LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(), - tableId, datasetId); - } catch (final BigQueryException | InterruptedException e) { - throw new RuntimeException( - String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(), - tableId, datasetId), - e); - } - } - - public void dropTableIfExists(final String datasetId, final TableId tableId) { - LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId); - bigQuery.delete(tableId); - } - - public void dropStageIfExists(final String datasetId, final String stream) { - if (keepStagingFiles) { - return; - } - - final String stagingDatasetPath = getStagingRootPath(datasetId, stream); - LOGGER.info("Cleaning up staging path for stream {} (dataset {}): {}", stream, datasetId, stagingDatasetPath); - gcsStorageOperations.dropBucketObject(stagingDatasetPath); - } - - /** - * "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where the - * table's partition filter must be turned off to truncate. Since deleting a table is a free - * operation this option re-uses functions that already exist - * - *
- * <p>
- * See: https://cloud.google.com/bigquery/pricing#free
- * </p>
- * - * @param datasetId equivalent to schema name - * @param tableId table name - * @param schema schema of the table to be deleted/created - */ - public void truncateTableIfExists(final String datasetId, - final TableId tableId, - final Schema schema) { - LOGGER.info("Truncating target table {} (dataset {})", tableId, datasetId); - dropTableIfExists(datasetId, tableId); - createTableIfNotExists(tableId, schema); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java deleted file mode 100644 index 86de7c49ae31d..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; -import io.airbyte.cdk.integrations.destination.async.state.FlushFailure; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; -import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper; -import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; -import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.function.Consumer; -import java.util.function.Supplier; -import 
lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Slf4j -@SuppressWarnings("try") -public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordStandardConsumer.class); - - public BigQueryRecordStandardConsumer(Consumer outputRecordCollector, - OnStartFunction onStart, - OnCloseFunction onClose, - ConfiguredAirbyteCatalog catalog, - String defaultNamespace, - Supplier> uploaderMap) { - super(outputRecordCollector, - onStart, - onClose, - new BigQueryAsyncStandardFlush(uploaderMap), - catalog, - new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)), - Optional.ofNullable(defaultNamespace), - new FlushFailure(), - Executors.newFixedThreadPool(2)); - } - - public static SerializedAirbyteMessageConsumer createStandardConsumer(final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog, - final Consumer outputRecordCollector, - final BigQuerySqlGenerator sqlGenerator, - final BigQueryDestinationHandler destinationHandler, - final boolean disableTypeDedupe) - throws Exception { - // Code related to initializing standard insert consumer isolated in this class file. - final TyperDeduper typerDeduper = - buildTyperDeduper(sqlGenerator, parsedCatalog, destinationHandler, bigquery, disableTypeDedupe); - return getStandardRecordConsumer(bigquery, config, catalog, parsedCatalog, outputRecordCollector, typerDeduper); - - } - - private static SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog, - final Consumer outputRecordCollector, - final TyperDeduper typerDeduper) - throws Exception { - final Supplier> writeConfigs = getUploaderMap( - bigquery, - config, - catalog, - parsedCatalog); - - final String bqNamespace = BigQueryUtils.getDatasetId(config); - - return new BigQueryRecordStandardConsumer( - outputRecordCollector, - () -> { - typerDeduper.prepareSchemasAndRunMigrations(); - - // Set up our raw tables - writeConfigs.get().forEach((streamId, uploader) -> { - final StreamConfig stream = parsedCatalog.getStream(streamId); - if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { - // For streams in overwrite mode, truncate the raw table. - // non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in - // 1s1t mode. 
- final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName()); - LOGGER.info("Deleting Raw table {}", rawTableId); - if (!bigquery.delete(rawTableId)) { - LOGGER.info("Raw table {} not found, continuing with creation", rawTableId); - } - LOGGER.info("Creating table {}", rawTableId); - BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2); - } else { - uploader.createRawTable(); - } - }); - - typerDeduper.prepareFinalTables(); - }, - (hasFailed, streamSyncSummaries) -> { - try { - Thread.sleep(30 * 1000); - typerDeduper.typeAndDedupe(streamSyncSummaries); - typerDeduper.commitFinalTables(); - typerDeduper.cleanup(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - }, - catalog, - bqNamespace, - writeConfigs); - } - - protected static Supplier> getUploaderMap( - final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog) - throws IOException { - return () -> { - final ConcurrentMap uploaderMap = new ConcurrentHashMap<>(); - for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { - final AirbyteStream stream = configStream.getStream(); - final StreamConfig parsedStream; - - final String targetTableName; - - parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); - targetTableName = parsedStream.getId().getRawName(); - - final UploaderConfig uploaderConfig = UploaderConfig - .builder() - .bigQuery(bigquery) - .parsedStream(parsedStream) - .bigQueryClientChunkSize(BigQueryUtils.getBigQueryClientChunkSize(config)) - .datasetLocation(BigQueryUtils.getDatasetLocation(config)) - .formatter(new BigQueryRecordFormatter(new BigQuerySQLNameTransformer())) - .targetTableName(targetTableName) - .build(); - - try { - putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - return uploaderMap; - }; - } - - protected static void putStreamIntoUploaderMap(final AirbyteStream stream, - final UploaderConfig uploaderConfig, - final Map uploaderMap) - throws IOException { - uploaderMap.put( - AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), - BigQueryUploaderFactory.getUploader(uploaderConfig)); - } - - private static TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, - final ParsedCatalog parsedCatalog, - final BigQueryDestinationHandler destinationHandler, - final BigQuery bigquery, - final boolean disableTypeDedupe) { - final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, new BigQuerySQLNameTransformer()); - final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); - - if (disableTypeDedupe) { - return new NoOpTyperDeduperWithV1V2Migrations<>( - sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of()); - } - - return new DefaultTyperDeduper<>( - sqlGenerator, - destinationHandler, - parsedCatalog, - migrator, - v2RawTableMigrator, - List.of()); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java deleted file mode 100644 index 72f6362d61c0c..0000000000000 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; - -import com.google.cloud.bigquery.TableId; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class mimics the same functionality as - * {@link io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory} which likely - * should be placed into a commons package to be utilized across all ConsumerFactories - */ -public class BigQueryStagingConsumerFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class); - - public SerializedAirbyteMessageConsumer createAsync( - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector, - final BigQueryGcsOperations bigQueryGcsOperations, - final TyperDeduper typerDeduper, - final ParsedCatalog parsedCatalog, - final String defaultNamespace) { - final Map streamConfigMap = createWriteConfigs( - catalog, - parsedCatalog); - - final DestinationFlushFunction flusher = new BigQueryAsyncFlush(streamConfigMap, bigQueryGcsOperations, catalog); - return new AsyncStreamConsumer( - outputRecordCollector, - onStartFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), - onCloseFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), - flusher, - catalog, - new BufferManager(getBigQueryBufferMemoryLimit()), - Optional.ofNullable(defaultNamespace)); - } - - /** - * Out BigQuery's uploader threads use a fair amount of memory. We believe this is largely due to - * the sdk client we use. - * - * @return number of bytes to make available for message buffering. 
- */ - private long getBigQueryBufferMemoryLimit() { - return (long) (Runtime.getRuntime().maxMemory() * 0.4); - } - - private Map createWriteConfigs(final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog) { - return catalog.getStreams().stream() - .map(configuredStream -> { - Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); - - final AirbyteStream stream = configuredStream.getStream(); - return parsedCatalog.getStream(stream.getNamespace(), stream.getName()); - }) - .collect(Collectors.toMap( - c -> new StreamDescriptor().withName(c.getId().getOriginalName()).withNamespace(c.getId().getOriginalNamespace()), - Functions.identity())); - } - - /** - * @param bigQueryGcsOperations collection of Google Cloud Storage Operations - * @param streamConfigs configuration settings used to describe how to write data and where it - * exists - */ - private OnStartFunction onStartFunction(final BigQueryGcsOperations bigQueryGcsOperations, - final List streamConfigs, - final TyperDeduper typerDeduper) { - return () -> { - LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", streamConfigs.size()); - typerDeduper.prepareSchemasAndRunMigrations(); - - for (final StreamConfig streamConfig : streamConfigs) { - final var tableId = TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName()); - LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", - BigQueryRecordFormatter.SCHEMA_V2, streamConfig.getId().getOriginalName(), - tableId, streamConfig.getId().getOriginalName()); - // In Destinations V2, we will always use the 'airbyte_internal' schema/originalNamespace for raw - // tables - final String rawDatasetId = DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; - // Regardless, ensure the schema the customer wants to write to exists - bigQueryGcsOperations.createSchemaIfNotExists(streamConfig.getId().getRawNamespace()); - // Schema used for raw and airbyte internal tables - bigQueryGcsOperations.createSchemaIfNotExists(rawDatasetId); - // Customer's destination schema - // With checkpointing, we will be creating the target table earlier in the setup such that - // the data can be immediately loaded from the staging area - bigQueryGcsOperations.createTableIfNotExists(tableId, BigQueryRecordFormatter.SCHEMA_V2); - bigQueryGcsOperations.createStageIfNotExists(rawDatasetId, streamConfig.getId().getOriginalName()); - // When OVERWRITE mode, truncate the destination's raw table prior to syncing data - if (streamConfig.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { - // TODO: this might need special handling during the migration - bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, tableId, BigQueryRecordFormatter.SCHEMA_V2); - } - } - - typerDeduper.prepareFinalTables(); - LOGGER.info("Preparing tables in destination completed."); - }; - } - - /** - * Tear down process, will attempt to clean out any staging area - * - * @param bigQueryGcsOperations collection of staging operations - * @param streamConfigs configuration settings used to describe how to write data and where it - * exists - */ - private OnCloseFunction onCloseFunction(final BigQueryGcsOperations bigQueryGcsOperations, - final List streamConfigs, - final TyperDeduper typerDeduper) { - return (hasFailed, streamSyncSummaries) -> { - /* - * Previously the hasFailed value was used to commit any remaining staged files into destination, - * however, with the changes to 
checkpointing this will no longer be necessary since despite partial - * successes, we'll be committing the target table (aka airbyte_raw) table throughout the sync - */ - typerDeduper.typeAndDedupe(streamSyncSummaries); - LOGGER.info("Cleaning up destination started for {} streams", streamConfigs.size()); - for (final StreamConfig streamConfig : streamConfigs) { - bigQueryGcsOperations.dropStageIfExists(streamConfig.getId().getRawNamespace(), streamConfig.getId().getOriginalName()); - } - typerDeduper.commitFinalTables(); - typerDeduper.cleanup(); - LOGGER.info("Cleaning up destination completed."); - }; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 7c3cad404e25e..be378b795891e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -36,12 +36,9 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -89,16 +86,6 @@ static Job waitForQuery(final Job queryJob) { } } - public static void createSchemaAndTableIfNeeded(final BigQuery bigquery, - final Set existingSchemas, - final String schemaName, - final String datasetLocation) { - if (!existingSchemas.contains(schemaName)) { - getOrCreateDataset(bigquery, schemaName, datasetLocation); - existingSchemas.add(schemaName); - } - } - public static Dataset getOrCreateDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) { Dataset dataset = bigquery.getDataset(datasetId); if (dataset == null || !dataset.exists()) { @@ -218,7 +205,8 @@ public static void createPartitionedTableIfNotExists(final BigQuery bigquery, fi } } catch (final BigQueryException e) { - LOGGER.error("Partitioned table was not created: " + tableId, e); + LOGGER.error("Partitioned table was not created: {}", tableId, e); + throw e; } } @@ -366,13 +354,4 @@ private static String getConnectorNameOrDefault() { .orElse("destination-bigquery"); } - public static void printHeapMemoryConsumption() { - final int mb = 1024 * 1024; - final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); - final long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb; - final long xms = memoryBean.getHeapMemoryUsage().getInit() / mb; - LOGGER.info("Initial Memory (xms) mb = {}", xms); - LOGGER.info("Max Memory (xmx) : mb = {}", xmx); - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index ccb5d513b0482..1372e810d6e8a 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -9,15 +9,12 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage; import io.airbyte.commons.json.Jsons; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a @@ -30,13 +27,8 @@ public class BigQueryRecordFormatter { Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class); - protected final StandardNameTransformer namingResolver; - - public BigQueryRecordFormatter(final StandardNameTransformer namingResolver) { - this.namingResolver = namingResolver; - } + public BigQueryRecordFormatter() {} public String formatRecord(PartialAirbyteMessage recordMessage) { // Map.of has a @NonNull requirement, so creating a new Hash map diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 056a620fd89f2..46ba1c8c886cd 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -33,6 +33,8 @@ import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; @@ -42,6 +44,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; +import io.airbyte.integrations.destination.bigquery.BigQueryUtils; import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState; import java.math.BigInteger; import java.util.ArrayList; @@ -57,10 +60,10 @@ import java.util.stream.Collectors; import 
java.util.stream.Stream; import org.apache.commons.text.StringSubstitutor; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO this stuff almost definitely exists somewhere else in our codebase. public class BigQueryDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class); @@ -320,4 +323,22 @@ private static Set getPks(final StreamConfig stream) { : Collections.emptySet(); } + @Override + public void createNamespaces(@NotNull Set schemas) { + schemas.forEach(this::createDataset); + } + + private void createDataset(final String dataset) { + LOGGER.info("Creating dataset if not present {}", dataset); + try { + BigQueryUtils.getOrCreateDataset(bq, dataset, datasetLocation); + } catch (BigQueryException e) { + if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) { + throw new ConfigErrorException(e.getMessage(), e); + } else { + throw e; + } + } + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java deleted file mode 100644 index 61bd4602042b1..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.typing_deduping; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.FieldList; -import com.google.cloud.bigquery.LegacySQLTypeName; -import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator; -import java.util.Map; -import org.apache.commons.text.StringSubstitutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryV2TableMigrator implements V2TableMigrator { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class); - - private final BigQuery bq; - - public BigQueryV2TableMigrator(final BigQuery bq) { - this.bq = bq; - } - - @Override - public void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException { - final Table rawTable = bq.getTable(TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName())); - if (rawTable != null && rawTable.exists()) { - final Schema existingRawSchema = rawTable.getDefinition().getSchema(); - final FieldList fields = existingRawSchema.getFields(); - if (fields.stream().noneMatch(f -> JavaBaseConstants.COLUMN_NAME_DATA.equals(f.getName()))) { - throw new IllegalStateException( - "Table does not have a column named _airbyte_data. 
We are likely colliding with a completely different table."); - } - final Field dataColumn = fields.get(JavaBaseConstants.COLUMN_NAME_DATA); - if (dataColumn.getType() == LegacySQLTypeName.JSON) { - LOGGER.info("Raw table has _airbyte_data of type JSON. Migrating to STRING."); - final String tmpRawTableId = BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawNamespace() + BigQuerySqlGenerator.QUOTE + "." - + BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + "_airbyte_tmp" + BigQuerySqlGenerator.QUOTE; - bq.query(QueryJobConfiguration.of( - new StringSubstitutor(Map.of( - "raw_table", streamConfig.getId().rawTableId(BigQuerySqlGenerator.QUOTE), - "tmp_raw_table", tmpRawTableId, - "real_raw_table", BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + BigQuerySqlGenerator.QUOTE)).replace( - // In full refresh / append mode, standard inserts is creating a non-partitioned raw table. - // (possibly also in overwrite mode?). - // We can't just CREATE OR REPLACE the table because bigquery will complain that we're trying to - // change the partitioning scheme. - // Do an explicit CREATE tmp + DROP + RENAME, similar to how we overwrite the final tables in - // OVERWRITE mode. - """ - CREATE TABLE ${tmp_raw_table} - PARTITION BY DATE(_airbyte_extracted_at) - CLUSTER BY _airbyte_extracted_at - AS ( - SELECT - _airbyte_raw_id, - _airbyte_extracted_at, - _airbyte_loaded_at, - to_json_string(_airbyte_data) as _airbyte_data - FROM ${raw_table} - ); - DROP TABLE IF EXISTS ${raw_table}; - ALTER TABLE ${tmp_raw_table} RENAME TO ${real_raw_table}; - """))); - LOGGER.info("Completed Data column Migration for stream {}", streamConfig.getId().getRawName()); - } else { - LOGGER.info("No Data column Migration Required for stream {}", streamConfig.getId().getRawName()); - } - } - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java deleted file mode 100644 index 2142a8edd6e6f..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.uploader; - -import static io.airbyte.integrations.destination.bigquery.BigQueryUtils.printHeapMemoryConsumption; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryDirectUploader { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class); - - protected final TableId table; - protected final BigQueryTableWriter writer; - protected final BigQuery bigQuery; - protected final BigQueryRecordFormatter recordFormatter; - - BigQueryDirectUploader(final TableId table, - final BigQueryTableWriter writer, - final BigQuery bigQuery, - final BigQueryRecordFormatter recordFormatter) { - this.table = table; - this.writer = writer; - this.bigQuery = bigQuery; - this.recordFormatter = recordFormatter; - } - - public void upload(final PartialAirbyteMessage airbyteMessage) { - try { - writer.write(recordFormatter.formatRecord(airbyteMessage)); - } catch (final IOException | RuntimeException e) { - LOGGER.error("Got an error while writing message: {}", e.getMessage(), e); - LOGGER.error(String.format( - "Failed to process a message for job: %s", - writer.toString())); - printHeapMemoryConsumption(); - throw new RuntimeException(e); - } - } - - public void closeAfterPush() { - try { - this.writer.close(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - public void createRawTable() { - // Ensure that this table exists. - final Table rawTable = bigQuery.getTable(table); - if (rawTable == null) { - LOGGER.info("Creating raw table {}.", table); - bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(BigQueryRecordFormatter.SCHEMA_V2)).build()); - } else { - LOGGER.info("Found raw table {}.", rawTable.getTableId()); - } - } - - @Override - public String toString() { - return "BigQueryDirectUploader{" + - "table=" + table.getTable() + - ", writer=" + writer.getClass() + - ", recordFormatter=" + recordFormatter.getClass() + - '}'; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java deleted file mode 100644 index 4089a17b7b02a..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.uploader; - -import static io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.*; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.JobId; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import io.airbyte.commons.exceptions.ConfigErrorException; -import io.airbyte.integrations.destination.bigquery.BigQueryUtils; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; -import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryUploaderFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUploaderFactory.class); - - private static final int HTTP_STATUS_CODE_FORBIDDEN = 403; - private static final int HTTP_STATUS_CODE_NOT_FOUND = 404; - - private static final String CONFIG_ERROR_MSG = """ - Failed to write to destination schema. - - 1. Make sure you have all required permissions for writing to the schema. - - 2. Make sure that the actual destination schema's location corresponds to location provided - in connector's config. - - 3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) tp the - "Destination Default" option. - - More details: - """; - - public static BigQueryDirectUploader getUploader(final UploaderConfig uploaderConfig) - throws IOException { - final String dataset = uploaderConfig.getParsedStream().getId().getRawNamespace(); - final String datasetLocation = uploaderConfig.getDatasetLocation(); - final Set existingDatasets = new HashSet<>(); - - final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter(); - - final TableId targetTable = TableId.of(dataset, uploaderConfig.getTargetTableName()); - - BigQueryUtils.createSchemaAndTableIfNeeded( - uploaderConfig.getBigQuery(), - existingDatasets, - dataset, - datasetLocation); - - return getBigQueryDirectUploader( - uploaderConfig.getBigQueryClientChunkSize(), - targetTable, - uploaderConfig.getBigQuery(), - datasetLocation, - recordFormatter); - } - - private static BigQueryDirectUploader getBigQueryDirectUploader(final Integer bigQueryClientChunkSize, - final TableId targetTable, - final BigQuery bigQuery, - final String datasetLocation, - final BigQueryRecordFormatter formatter) { - // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source - LOGGER.info("Will write raw data to {} with schema {}", targetTable, SCHEMA_V2); - final WriteChannelConfiguration writeChannelConfiguration = - WriteChannelConfiguration.newBuilder(targetTable) - .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) - .setSchema(SCHEMA_V2) - .setFormatOptions(FormatOptions.json()) - .build(); // new-line delimited json. 
- - final JobId job = JobId.newBuilder() - .setRandomJob() - .setLocation(datasetLocation) - .setProject(bigQuery.getOptions().getProjectId()) - .build(); - - final TableDataWriteChannel writer; - - try { - writer = bigQuery.writer(job, writeChannelConfiguration); - } catch (final BigQueryException e) { - if (e.getCode() == HTTP_STATUS_CODE_FORBIDDEN || e.getCode() == HTTP_STATUS_CODE_NOT_FOUND) { - throw new ConfigErrorException(CONFIG_ERROR_MSG + e); - } else { - throw new BigQueryException(e.getCode(), e.getMessage()); - } - } - - // this this optional value. If not set - use default client's value (15MiG) - if (bigQueryClientChunkSize != null) { - writer.setChunkSize(bigQueryClientChunkSize); - } - - return new BigQueryDirectUploader( - targetTable, - new BigQueryTableWriter(writer), - bigQuery, - formatter); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java deleted file mode 100644 index d94b2145f0a7b..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.uploader.config; - -import com.google.cloud.bigquery.BigQuery; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import lombok.Builder; -import lombok.Getter; - -@Builder -@Getter -public class UploaderConfig { - - private Integer bigQueryClientChunkSize; - private String datasetLocation; - private StreamConfig parsedStream; - private String targetTableName; - private BigQuery bigQuery; - private BigQueryRecordFormatter formatter; - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java deleted file mode 100644 index 50c2608ae1714..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.writer; - -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.common.base.Charsets; -import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; -import java.io.IOException; -import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public record BigQueryTableWriter(TableDataWriteChannel writeChannel) { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class); - - public void write(final String formattedData) throws IOException { - writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8))); - } - - public void close() throws IOException { - this.writeChannel.close(); - try { - final Job job = writeChannel.getJob(); - if (job != null && job.getStatus().getError() != null) { - AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag()); - throw new RuntimeException("Fail to complete a load job in big query, Job id: " + writeChannel.getJob().getJobId() + - ", with error: " + writeChannel.getJob().getStatus().getError()); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt index 11f1328506941..91e520d9e8483 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt @@ -3,14 +3,13 @@ */ package io.airbyte.integrations.destination.bigquery -import com.codepoetics.protonpack.Indexed -import com.codepoetics.protonpack.StreamUtils import com.fasterxml.jackson.databind.JsonNode import com.google.auth.oauth2.GoogleCredentials import com.google.cloud.bigquery.BigQuery import com.google.cloud.bigquery.BigQueryException import com.google.cloud.bigquery.BigQueryOptions import com.google.cloud.bigquery.QueryJobConfiguration +import com.google.cloud.storage.Storage import com.google.cloud.storage.StorageOptions import com.google.common.base.Charsets import io.airbyte.cdk.integrations.BaseConnector @@ -120,20 +119,24 @@ class BigQueryDestination : BaseConnector(), Destination { try { val credentials = getServiceAccountCredentials(config) - val storage = + val storage: Storage = StorageOptions.newBuilder() .setProjectId(config[bqConstants.CONFIG_PROJECT_ID].asText()) .setCredentials(credentials) .setHeaderProvider(getHeaderProvider()) .build() .service - val permissionsCheckStatusList = + val permissionsCheckStatusList: List = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS) + // testIamPermissions returns a list of booleans + // in the same order of the presented permissions list missingPermissions.addAll( - StreamUtils.zipWithIndex(permissionsCheckStatusList.stream()) - .filter { i: Indexed -> !i.value } - .map { i: Indexed -> REQUIRED_PERMISSIONS[Math.toIntExact(i.index)] } + permissionsCheckStatusList + .asSequence() + .withIndex() + .filter { !it.value } + .map { REQUIRED_PERMISSIONS[it.index] } .toList(), ) @@ -227,12 +230,11 @@ class BigQueryDestination : BaseConnector(), Destination { if (uploadingMethod == 
UploadingMethod.STANDARD) { val bigQueryClientChunkSize = getBigQueryClientChunkSize(config) - val bigQueryFormatter = BigQueryRecordFormatter(BigQuerySQLNameTransformer()) val bigQueryLoadingStorageOperation = BigQueryDirectLoadingStorageOperation( bigquery, bigQueryClientChunkSize, - bigQueryFormatter, + BigQueryRecordFormatter(), sqlGenerator, destinationHandler, datasetLocation, @@ -242,15 +244,16 @@ class BigQueryDestination : BaseConnector(), Destination { parsedCatalog, destinationHandler, defaultNamespace, - { destinationInitialStatus: DestinationInitialStatus + { initialStatus: DestinationInitialStatus, disableTD -> StandardStreamOperation( bigQueryLoadingStorageOperation, - destinationInitialStatus, - disableTypeDedupe, + initialStatus, + disableTD ) }, migrations, + disableTypeDedupe, ) return createDirectUploadConsumer( outputRecordCollector, @@ -281,16 +284,17 @@ class BigQueryDestination : BaseConnector(), Destination { parsedCatalog, destinationHandler, defaultNamespace, - { destinationInitialStatus: DestinationInitialStatus -> + { initialStatus: DestinationInitialStatus, disableTD -> StagingStreamOperations( bigQueryGcsStorageOperations, - destinationInitialStatus, + initialStatus, FileUploadFormat.CSV, V2_WITHOUT_META, - disableTypeDedupe + disableTD ) }, migrations, + disableTypeDedupe, ) return createStagingConsumer( outputRecordCollector, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt index de4f828f29b4f..85f79885856e0 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt @@ -5,10 +5,7 @@ package io.airbyte.integrations.destination.bigquery.operation import com.google.cloud.bigquery.BigQuery -import com.google.cloud.bigquery.BigQueryException import com.google.cloud.bigquery.TableId -import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil -import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.integrations.base.destination.operation.StorageOperation import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.StreamId @@ -21,6 +18,7 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.time.Instant import java.util.* +import java.util.concurrent.ConcurrentHashMap private val log = KotlinLogging.logger {} @@ -30,10 +28,8 @@ abstract class BigQueryStorageOperation( private val destinationHandler: BigQueryDestinationHandler, protected val datasetLocation: String ) : StorageOperation { - private val existingSchemas = HashSet() + private val existingSchemas = ConcurrentHashMap.newKeySet() override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { - // Prepare staging schema - createStagingDataset(streamId) // Prepare staging table. For overwrite, it does drop-create so we can skip explicit create. 
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) { truncateStagingTable(streamId) @@ -42,23 +38,6 @@ abstract class BigQueryStorageOperation( } } - private fun createStagingDataset(streamId: StreamId) { - // create raw schema - if (!existingSchemas.contains(streamId.rawNamespace)) { - log.info { "Creating raw namespace ${streamId.rawNamespace}" } - try { - BigQueryUtils.getOrCreateDataset(bigquery, streamId.rawNamespace, datasetLocation) - } catch (e: BigQueryException) { - if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.code)) { - throw ConfigErrorException(e.message!!, e) - } else { - throw e - } - } - existingSchemas.add(streamId.rawNamespace) - } - } - private fun createStagingTable(streamId: StreamId) { val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) BigQueryUtils.createPartitionedTableIfNotExists( @@ -91,10 +70,6 @@ abstract class BigQueryStorageOperation( abstract override fun writeToStage(streamId: StreamId, data: Data) - override fun createFinalNamespace(streamId: StreamId) { - destinationHandler.execute(sqlGenerator.createSchema(streamId.finalNamespace)) - } - override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) { destinationHandler.execute(sqlGenerator.createTable(streamConfig, suffix, replace)) } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index e522b901bd9cf..9e0b2eebad84a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -135,12 +135,6 @@ class BigQueryDestinationTest { private AmazonS3 s3Client; - /* - * TODO: Migrate all BigQuery Destination configs (GCS, Denormalized, Normalized) to no longer use - * #partitionIfUnpartitioned then recombine Base Provider. 
The reason for breaking this method into - * a base class is because #testWritePartitionOverUnpartitioned is no longer used only in GCS - * Staging - */ private Stream successTestConfigProviderBase() { return Stream.of( Arguments.of("config"), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index 5ba14f988f351..7574a7d8146ff 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest { diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 1cc376d9d75f3..3bab2ebd0c7fe 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -220,6 +220,7 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.5.0 | 2024-05-17 | [38132](https://github.com/airbytehq/airbyte/pull/38132) | Major rewrite of existing code; Adapt to CDK changes introduced in [38107](https://github.com/airbytehq/airbyte/pull/38107) | | 2.4.20 | 2024-05-13 | [38131](https://github.com/airbytehq/airbyte/pull/38131) | Cleanup `BigQueryWriteConfig` and reuse `StreamConfig`; Adapt to `StreamConfig` signature changes | | 2.4.19 | 2024-05-10 | [38125](https://github.com/airbytehq/airbyte/pull/38125) | adopt latest CDK code | | 2.4.18 | 2024-05-10 | [38111](https://github.com/airbytehq/airbyte/pull/38111) | No functional changes, deleting unused code |
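For context on the permission-check refactor in BigQueryDestination.kt above (dropping protonpack's `StreamUtils.zipWithIndex` in favor of the Kotlin standard library), the sketch below is illustrative only (not connector code; the helper names and sample data are hypothetical). It shows how the `List<Boolean>` returned by an IAM permission check, ordered the same way as the requested permissions, can be mapped back to the names of the missing permissions:

```kotlin
// Minimal sketch, assuming only the Kotlin standard library.
// Permission checks of this style return one Boolean per requested permission,
// in the same order as the request, so pairing each result with its index
// recovers the names of the permissions that are missing.
fun missingPermissions(required: List<String>, checkResults: List<Boolean>): List<String> =
    checkResults
        .withIndex()
        .filter { !it.value }       // keep only the failed checks
        .map { required[it.index] } // map each failed check back to its permission name

fun main() {
    // Hypothetical sample data; in the connector the results would come from the
    // GCS client's testIamPermissions call against the staging bucket.
    val required = listOf("storage.objects.create", "storage.objects.delete", "storage.objects.get")
    val results = listOf(true, false, true)
    println(missingPermissions(required, results)) // prints [storage.objects.delete]
}
```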