🎉 BigQuery destinations with partitioned/clustered keys (#7240)
* [ #5959 ][ #2579 ] Add support for partitioning tables by the _airbyte_emitted_at field (#7141)

Co-authored-by: Andrés Bravo <andresbravog@gmail.com>
ChristopheDuong and andresbravog committed Oct 25, 2021
1 parent 9a0b8da commit 27df558
Showing 17 changed files with 249 additions and 41 deletions.
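
The heart of this change is that the destination's raw _airbyte_raw_* tables are now created day-partitioned and clustered on _airbyte_emitted_at. Below is a minimal, self-contained sketch of that table definition with the google-cloud-bigquery client; the dataset and table names (my_dataset, _airbyte_raw_users) are placeholders rather than values taken from this commit.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;

public class PartitionedRawTableSketch {

  public static void main(final String[] args) {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Raw-table schema used by the destination; _airbyte_emitted_at is now a real TIMESTAMP.
    final Schema schema = Schema.of(
        Field.of("_airbyte_ab_id", StandardSQLTypeName.STRING),
        Field.of("_airbyte_emitted_at", StandardSQLTypeName.TIMESTAMP),
        Field.of("_airbyte_data", StandardSQLTypeName.STRING));

    // Partition by day and cluster on the emission timestamp.
    final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
        .setField("_airbyte_emitted_at")
        .build();
    final Clustering clustering = Clustering.newBuilder()
        .setFields(ImmutableList.of("_airbyte_emitted_at"))
        .build();

    final StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(schema)
        .setTimePartitioning(partitioning)
        .setClustering(clustering)
        .build();

    // Placeholder dataset/table names.
    bigquery.create(TableInfo.of(TableId.of("my_dataset", "_airbyte_raw_users"), tableDefinition));
  }
}

Clustering on the same timestamp column is cheap and simply mirrors what BigQueryUtils.createPartitionedTable sets up further down in this diff.
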
@@ -22,12 +22,12 @@
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.4.1
dockerImageTag: 0.5.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
@@ -60,6 +60,12 @@ class {{pascalCase name}}JdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTe
return {{pascalCase name}}Source.DRIVER_CLASS;
}

@Override
public AbstractJdbcSource getJdbcSource() {
// TODO
return null;
}

@AfterAll
static void cleanUp() {
// TODO close the container. Ex: "container.close();"
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -90,8 +90,7 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) {
.collect(Collectors.toList()));

// "Array of Array of" (nested arrays) are not permitted by BigQuery ("Array of Record of Array of"
// is)
// Turn all "Array of" into "Array of Record of" instead
// is). Turn all "Array of" into "Array of Record of" instead
return Jsons.jsonNode(ImmutableMap.of(BigQueryDenormalizedDestination.NESTED_ARRAY_FIELD, items));
} else {
return root;
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.1
LABEL io.airbyte.version=0.5.0
LABEL io.airbyte.name=airbyte/destination-bigquery
@@ -13,6 +13,9 @@ dependencies {
implementation 'com.google.cloud:google-cloud-bigquery:1.122.2'
implementation 'org.apache.commons:commons-lang3:3.11'

// csv
implementation 'org.apache.commons:commons-csv:1.4'

implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
@@ -60,9 +60,7 @@ public class BigQueryDestination extends BaseConnector implements Destination {

private static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of(
Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
// GCS works with only date\datetime formats, so need to have it a string for a while
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));

private final BigQuerySQLNameTransformer namingResolver;
@@ -318,10 +316,10 @@ private boolean isKeepFilesInGcs(final JsonNode config) {
if (loadingMethod != null && loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES) != null
&& BigQueryConsts.KEEP_GCS_FILES_VAL
.equals(loadingMethod.get(BigQueryConsts.KEEP_GCS_FILES).asText())) {
LOGGER.info("All tmp files GCS will be kept in bucket when migration is finished");
LOGGER.info("All tmp files GCS will be kept in bucket when replication is finished");
return true;
} else {
LOGGER.info("All tmp files will be removed from GCS when migration is finished");
LOGGER.info("All tmp files will be removed from GCS when replication is finished");
return false;
}
}
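
Because _airbyte_emitted_at is now a TIMESTAMP column (instead of the earlier STRING workaround), records staged through GCS as CSV have to carry a timestamp string that BigQuery can load directly. A rough sketch of the conversion the consumer performs, from epoch milliseconds to microseconds to a formatted string, using an assumed example value:

import com.google.cloud.bigquery.QueryParameterValue;
import java.util.concurrent.TimeUnit;

public class EmittedAtFormattingSketch {

  public static void main(final String[] args) {
    // Hypothetical AirbyteRecordMessage.getEmittedAt() value, in epoch milliseconds.
    final long emittedAtMillis = 1_635_163_200_123L;

    // BigQuery TIMESTAMP is microsecond-precision, so convert before formatting.
    final long emittedAtMicros = TimeUnit.MICROSECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);

    // Prints a BigQuery-loadable timestamp string, e.g. "2021-10-25 12:00:00.123000+00:00".
    System.out.println(QueryParameterValue.timestamp(emittedAtMicros).getValue());
  }
}
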
@@ -14,11 +14,13 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
@@ -27,6 +29,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
@@ -119,7 +122,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException {
} else {
// GCS uploading way, this data will be moved to bigquery in close method
final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter();
gcsCsvWriter.write(UUID.randomUUID(), recordMessage);
writeRecordToCsv(gcsCsvWriter, recordMessage);
}
} else {
LOGGER.warn("Unexpected message: " + message.getType());
@@ -138,6 +141,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt));
}

protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRecordMessage recordMessage) {
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData());
try {
gcsCsvWriter.getCsvPrinter().printRecord(
UUID.randomUUID().toString(),
formattedEmittedAt,
Jsons.serialize(formattedData));
} catch (IOException e) {
e.printStackTrace();
LOGGER.warn("An error occurred writing CSV file.");
}
}

@Override
public void close(final boolean hasFailed) {
LOGGER.info("Started closing all connections");
@@ -181,7 +201,7 @@ private void closeGcsStreamsAndCopyDataToBigQuery(final boolean hasFailed) {
try {
loadCsvFromGcsTruncate(pair);
} catch (final Exception e) {
LOGGER.error("Failed to load data from GCS CSV file to BibQuery tmp table with reason: " + e.getMessage());
LOGGER.error("Failed to load data from GCS CSV file to BigQuery tmp table with reason: " + e.getMessage());
throw new RuntimeException(e);
}
});
@@ -198,7 +218,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfi

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests.
LOGGER.info(String.format("Started coping data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
LOGGER.info(String.format("Started copying data from %s GCS csv file to %s tmp BigQuery table with schema: \n %s",
csvFile, tmpTable, schema));

final CsvOptions csvOptions = CsvOptions.newBuilder().setEncoding(UTF8).setSkipLeadingRows(1).build();
@@ -215,7 +235,7 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfi
// Load the table
final Job loadJob = bigquery.create(JobInfo.of(configuration));

LOGGER.info("Crated a new job GCS csv file to tmp BigQuery table: " + loadJob);
LOGGER.info("Created a new job GCS csv file to tmp BigQuery table: " + loadJob);
LOGGER.info("Waiting for job to complete...");

// Load data from a GCS parquet file into the table
@@ -272,15 +292,20 @@ private void closeNormalBigqueryStreams(final boolean hasFailed) {
}));

if (!hasFailed) {
LOGGER.info("Migration finished with no explicit errors. Copying data from tmp tables to permanent");
LOGGER.info("Replication finished with no explicit errors. Copying data from tmp tables to permanent");
writeConfigs.values()
.forEach(
bigQueryWriteConfig -> copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
bigQueryWriteConfig.getSyncMode()));
bigQueryWriteConfig -> {
if (bigQueryWriteConfig.getSyncMode().equals(WriteDisposition.WRITE_APPEND)) {
partitionIfUnpartitioned(bigQueryWriteConfig, bigquery, bigQueryWriteConfig.getTable());
}
copyTable(bigquery, bigQueryWriteConfig.getTmpTable(), bigQueryWriteConfig.getTable(),
bigQueryWriteConfig.getSyncMode());
});
// BQ is still all or nothing if a failure happens in the destination.
outputRecordCollector.accept(lastStateMessage);
} else {
LOGGER.warn("Had errors while migrations");
LOGGER.warn("Had errors while replicating");
}
} finally {
// clean up tmp tables;
Expand Down Expand Up @@ -324,7 +349,6 @@ private static void copyTable(
final TableId sourceTableId,
final TableId destinationTableId,
final WriteDisposition syncMode) {

final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId)
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(syncMode)
Expand All @@ -336,7 +360,67 @@ private static void copyTable(
LOGGER.error("Failed on copy tables with error:" + job.getStatus());
throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError());
}
LOGGER.info("successfully copied tmp table: {} to final table: {}", sourceTableId, destinationTableId);
LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId);
}

private void partitionIfUnpartitioned(final BigQueryWriteConfig bigQueryWriteConfig,
final BigQuery bigquery,
final TableId destinationTableId) {
try {
final QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(
String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';",
bigquery.getOptions().getProjectId(),
destinationTableId.getDataset(),
destinationTableId.getTable()))
.setUseLegacySql(false)
.build();
final ImmutablePair<Job, String> result = BigQueryUtils.executeQuery(bigquery, queryConfig);
result.getLeft().getQueryResults().getValues().forEach(row -> {
if (!row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("NO")) {
LOGGER.info("Partitioning existing destination table {}", destinationTableId);
final String tmpPartitionTable = Strings.addRandomSuffix("_airbyte_partitioned_table", "_", 5);
final TableId tmpPartitionTableId = TableId.of(destinationTableId.getDataset(), tmpPartitionTable);
// make sure tmpPartitionTable does not already exist
bigquery.delete(tmpPartitionTableId);
// Use BigQuery SQL to copy because java api copy jobs does not support creating a table from a
// select query, see:
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result
final QueryJobConfiguration partitionQuery = QueryJobConfiguration
.newBuilder(
getCreatePartitionedTableFromSelectQuery(bigQueryWriteConfig.getSchema(), bigquery.getOptions().getProjectId(), destinationTableId,
tmpPartitionTable))
.setUseLegacySql(false)
.build();
BigQueryUtils.executeQuery(bigquery, partitionQuery);
// Copying data from a partitioned tmp table into an existing non-partitioned table does not make it
// partitioned... thus, we force re-create from scratch by completely deleting and creating new
// table.
bigquery.delete(destinationTableId);
copyTable(bigquery, tmpPartitionTableId, destinationTableId, WriteDisposition.WRITE_EMPTY);
bigquery.delete(tmpPartitionTableId);
}
});
} catch (final InterruptedException e) {
LOGGER.warn("Had errors while partitioning: ", e);
}
}

protected String getCreatePartitionedTableFromSelectQuery(final Schema schema,
final String projectId,
final TableId destinationTableId,
final String tmpPartitionTable) {
return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable)
+ schema.getFields().stream()
.map(field -> String.format("%s %s", field.getName(), field.getType()))
.collect(Collectors.joining(", "))
+ ") partition by date("
+ JavaBaseConstants.COLUMN_NAME_EMITTED_AT
+ ") as select "
+ schema.getFields().stream()
.map(Field::getName)
.collect(Collectors.joining(", "))
+ String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable());
}

private void printHeapMemoryConsumption() {
@@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
@@ -18,8 +19,11 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -69,7 +73,7 @@ static void createSchemaAndTableIfNeeded(final BigQuery bigquery,
createSchemaTable(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, schema);
BigQueryUtils.createPartitionedTable(bigquery, schemaName, tmpTableName, schema);
}

static void createSchemaTable(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
@@ -80,18 +84,32 @@ static void createSchemaTable(final BigQuery bigquery, final String datasetId, f
}
}

// https://cloud.google.com/bigquery/docs/tables#create-table
static void createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
static void createPartitionedTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
try {

final TableId tableId = TableId.of(datasetName, tableName);
final TableDefinition tableDefinition = StandardTableDefinition.of(schema);

final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
.setField(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.build();

final Clustering clustering = Clustering.newBuilder()
.setFields(ImmutableList.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.build();

final StandardTableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
.setSchema(schema)
.setTimePartitioning(partitioning)
.setClustering(clustering)
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

bigquery.create(tableInfo);
LOGGER.info("Table: {} created successfully", tableId);
} catch (final BigQueryException e) {
LOGGER.info("Table was not created. \n", e);
LOGGER.info("Partitioned Table: {} created successfully", tableId);
} catch (BigQueryException e) {
LOGGER.info("Partitioned table was not created. \n" + e);
}
}

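The practical payoff of partitioning by _airbyte_emitted_at is partition pruning: queries that filter on that column only scan the matching daily partitions of the raw table rather than the whole table. A small sketch with placeholder dataset and table names:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class PartitionPruningSketch {

  public static void main(final String[] args) throws InterruptedException {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Only the 2021-10-25 partition of the raw table is scanned by this filter.
    final TableResult result = bigquery.query(QueryJobConfiguration
        .newBuilder("SELECT COUNT(*) AS cnt FROM `my_dataset._airbyte_raw_users` "
            + "WHERE DATE(_airbyte_emitted_at) = '2021-10-25'")
        .setUseLegacySql(false)
        .build());
    result.iterateAll().forEach(row -> System.out.println(row.get("cnt").getLongValue()));
  }
}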
