
Commit: bigquery-refactor-new-intf

gisripa committed May 22, 2024
1 parent 0766bd0 commit 07a9762
Showing 23 changed files with 1,173 additions and 651 deletions.
@@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
useLocalCdk = false
useLocalCdk = true
}

java {
@@ -22,7 +22,7 @@ java {
}

application {
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestination'
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
'-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions',
'-XX:GCLockerRetryAllocationCount=100',

This file was deleted.

This file was deleted.

@@ -20,7 +20,6 @@
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -46,7 +45,7 @@ public BigQueryGcsOperations(final BigQuery bigQuery,
final StandardNameTransformer gcsNameTransformer,
final GcsDestinationConfig gcsConfig,
final GcsStorageOperations gcsStorageOperations,
final String datasetLocation,
final String datasetLocation, // TODO: Is this information the same as GcsConfig.bucketRegion?
final UUID randomStagingId,
final DateTime syncDatetime,
final boolean keepStagingFiles) {
@@ -157,16 +156,6 @@ public void copyIntoTableFromStage(final String datasetId,
}
}

@Deprecated
public void cleanUpStage(final String datasetId, final String stream, final List<String> stagedFiles) {
if (keepStagingFiles) {
return;
}

LOGGER.info("Deleting staging files for stream {} (dataset {}): {}", stream, datasetId, stagedFiles);
gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles);
}

public void dropTableIfExists(final String datasetId, final TableId tableId) {
LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId);
bigQuery.delete(tableId);
@@ -4,26 +4,53 @@

package io.airbyte.integrations.destination.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
@SuppressWarnings("try")
public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordStandardConsumer.class);

public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordCollector,
OnStartFunction onStart,
OnCloseFunction onClose,
@@ -41,4 +68,144 @@ public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordColle
Executors.newFixedThreadPool(2));
}

public static SerializedAirbyteMessageConsumer createStandardConsumer(final BigQuery bigquery,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final BigQuerySqlGenerator sqlGenerator,
final BigQueryDestinationHandler destinationHandler,
final boolean disableTypeDedupe)
throws Exception {
// Code related to initializing the standard-insert consumer is isolated in this class file.
final TyperDeduper typerDeduper =
buildTyperDeduper(sqlGenerator, parsedCatalog, destinationHandler, bigquery, disableTypeDedupe);
return getStandardRecordConsumer(bigquery, config, catalog, parsedCatalog, outputRecordCollector, typerDeduper);
}

private static SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final TyperDeduper typerDeduper)
throws Exception {
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> writeConfigs = getUploaderMap(
bigquery,
config,
catalog,
parsedCatalog);

final String bqNamespace = BigQueryUtils.getDatasetId(config);

return new BigQueryRecordStandardConsumer(
outputRecordCollector,
() -> {
typerDeduper.prepareSchemasAndRunMigrations();

// Set up our raw tables
writeConfigs.get().forEach((streamId, uploader) -> {
final StreamConfig stream = parsedCatalog.getStream(streamId);
if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) {
// For streams in overwrite mode, truncate the raw table.
// non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in
// 1s1t mode.
final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName());
LOGGER.info("Deleting Raw table {}", rawTableId);
if (!bigquery.delete(rawTableId)) {
LOGGER.info("Raw table {} not found, continuing with creation", rawTableId);
}
LOGGER.info("Creating table {}", rawTableId);
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2);
} else {
uploader.createRawTable();
}
});

typerDeduper.prepareFinalTables();
},
(hasFailed, streamSyncSummaries) -> {
try {
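// Assumption (editorial comment, not in the commit): this fixed pause gives recently
// written data time to settle before typing and deduping starts.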
Thread.sleep(30 * 1000);
typerDeduper.typeAndDedupe(streamSyncSummaries);
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
} catch (final Exception e) {
throw new RuntimeException(e);
}
},
catalog,
bqNamespace,
writeConfigs);
}

protected static Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> getUploaderMap(
final BigQuery bigquery,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog)
throws IOException {
return () -> {
final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap = new ConcurrentHashMap<>();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final AirbyteStream stream = configStream.getStream();
final StreamConfig parsedStream;

final String targetTableName;

parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
targetTableName = parsedStream.getId().getRawName();

final UploaderConfig uploaderConfig = UploaderConfig
.builder()
.bigQuery(bigquery)
.parsedStream(parsedStream)
.bigQueryClientChunkSize(BigQueryUtils.getBigQueryClientChunkSize(config))
.datasetLocation(BigQueryUtils.getDatasetLocation(config))
.formatter(new BigQueryRecordFormatter(new BigQuerySQLNameTransformer()))
.targetTableName(targetTableName)
.build();

try {
putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
return uploaderMap;
};
}

protected static void putStreamIntoUploaderMap(final AirbyteStream stream,
final UploaderConfig uploaderConfig,
final Map<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap)
throws IOException {
uploaderMap.put(
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
BigQueryUploaderFactory.getUploader(uploaderConfig));
}

private static TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
final ParsedCatalog parsedCatalog,
final BigQueryDestinationHandler destinationHandler,
final BigQuery bigquery,
final boolean disableTypeDedupe) {
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, new BigQuerySQLNameTransformer());
final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);

if (disableTypeDedupe) {
return new NoOpTyperDeduperWithV1V2Migrations<>(
sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of());
}

return new DefaultTyperDeduper<>(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2RawTableMigrator,
List.of());
}

}
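
For orientation, here is a minimal wiring sketch showing how the new createStandardConsumer factory might be invoked from the destination's standard-inserts path. It is not part of this commit: the helper class and method are illustrative, and the collaborators (sqlGenerator, destinationHandler, disableTypeDedupe) are assumed to be constructed elsewhere; only the factory signature is taken from the diff above.

// Sketch only, not in this commit. Assumes the caller has already built the
// typing/deduping collaborators; only createStandardConsumer's signature comes
// from the diff above.
package io.airbyte.integrations.destination.bigquery;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

final class StandardInsertWiringSketch {

  private StandardInsertWiringSketch() {}

  // On start, the returned consumer runs schema migrations, (re)creates raw tables
  // (truncating them for OVERWRITE streams), and prepares final tables; on close it
  // types and dedupes, commits final tables, and cleans up, per the callbacks above.
  static SerializedAirbyteMessageConsumer build(final BigQuery bigquery,
                                                final JsonNode config,
                                                final ConfiguredAirbyteCatalog catalog,
                                                final ParsedCatalog parsedCatalog,
                                                final Consumer<AirbyteMessage> outputRecordCollector,
                                                final BigQuerySqlGenerator sqlGenerator,
                                                final BigQueryDestinationHandler destinationHandler,
                                                final boolean disableTypeDedupe)
      throws Exception {
    return BigQueryRecordStandardConsumer.createStandardConsumer(
        bigquery, config, catalog, parsedCatalog, outputRecordCollector,
        sqlGenerator, destinationHandler, disableTypeDedupe);
  }

}
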
@@ -4,10 +4,9 @@

package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
@@ -37,15 +36,20 @@
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

public class BigQueryUtils {

@@ -185,7 +189,7 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam
* @return Table BigQuery table object to be referenced for deleting, otherwise empty meaning table
* was not successfully created
*/
static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
public static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
final var chunkingColumn = JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
@@ -316,17 +320,41 @@ public static void waitForJobFinish(final Job job) throws InterruptedException {
if (job != null) {
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
try {
LOGGER.info("Waiting for job finish {}. Status: {}", job, job.getStatus());
job.waitFor();
LOGGER.info("Job finish {} with status {}", job, job.getStatus());
LOGGER.info("Waiting for Job {} to finish. Status: {}", job.getJobId(), job.getStatus());
// Default totalTimeout is 12 Hours, 30 minutes seems reasonable
final Job completedJob = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30)));
if (completedJob == null) {
// job no longer exists
LOGGER.warn("Job {} No longer exists", job.getJobId());
} else if (completedJob.getStatus().getError() != null) {
// job failed, handle error
LOGGER.error("Job {} failed with errors {}", completedJob.getJobId(), completedJob.getStatus().getError().toString());
throw new RuntimeException(
"Fail to complete a load job in big query, Job id: " + completedJob.getJobId() +
", with error: " + completedJob.getStatus().getError());
} else {
// job completed successfully
LOGGER.info("Job {} completed successfully, job info {}", completedJob.getJobId(), completedJob);
}
} catch (final BigQueryException e) {
final String errorMessage = getJobErrorMessage(e.getErrors(), job);
LOGGER.error(errorMessage);
throw new BigQueryException(e.getCode(), errorMessage, e);
}
} else {
LOGGER.warn("Received null value for Job, nothing to waitFor");
}
}

private static String getJobErrorMessage(List<BigQueryError> errors, Job job) {
if (errors == null || errors.isEmpty()) {
return StringUtils.EMPTY;
}
return String.format("An error occurred during execution of job: %s, \n For more details see Big Query Error collection: %s:", job,
errors.stream().map(BigQueryError::toString).collect(Collectors.joining(",\n ")));
}

public static HeaderProvider getHeaderProvider() {
final String connectorName = getConnectorNameOrDefault();
return () -> ImmutableMap.of("user-agent", String.format(USER_AGENT_FORMAT, connectorName));
@@ -338,4 +366,13 @@ private static String getConnectorNameOrDefault() {
.orElse("destination-bigquery");
}

public static void printHeapMemoryConsumption() {
final int mb = 1024 * 1024;
final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
final long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb;
final long xms = memoryBean.getHeapMemoryUsage().getInit() / mb;
LOGGER.info("Initial Memory (xms) mb = {}", xms);
LOGGER.info("Max Memory (xmx) : mb = {}", xmx);
}

}
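
For reference, a compact, self-contained sketch of the bounded-wait pattern the new waitForJobFinish uses above. It is not part of this commit; the class and method names and the timeout parameter are illustrative. In google-cloud-bigquery, Job.waitFor(RetryOption...) returns null when the job no longer exists and otherwise returns the completed Job, whose status must still be checked for errors, which is exactly what the logic above does.

// Sketch only, not in this commit: mirrors the bounded waitFor(...) logic added above.
package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.Job;
import org.threeten.bp.Duration;

final class BoundedJobWaitSketch {

  private BoundedJobWaitSketch() {}

  // Waits for a BigQuery job, but gives up after timeoutMinutes rather than the
  // client library's much longer default total timeout.
  static Job awaitOrThrow(final Job job, final long timeoutMinutes) throws InterruptedException {
    final Job completed = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(timeoutMinutes)));
    if (completed == null) {
      // waitFor returns null when the job no longer exists server-side.
      throw new IllegalStateException("Job " + job.getJobId() + " no longer exists");
    }
    if (completed.getStatus().getError() != null) {
      // The job finished but reported an error; surface it to the caller.
      throw new IllegalStateException(
          "Job " + completed.getJobId() + " failed: " + completed.getStatus().getError());
    }
    return completed;
  }

}
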
(Diffs for the remaining changed files are not shown.)
