✨ Destinations bigquery+snowflake: Release DV2 (#29783)
Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: evantahler <evantahler@users.noreply.github.com>
4 people committed Aug 29, 2023
1 parent e95d704 commit 44f840d
Showing 39 changed files with 263 additions and 449 deletions.
16 changes: 0 additions & 16 deletions airbyte-ci/connectors/pipelines/pipelines/actions/environments.py
@@ -678,14 +678,6 @@ def with_integration_base_java(context: PipelineContext, build_platform: Platfor


BASE_DESTINATION_NORMALIZATION_BUILD_CONFIGURATION = {
"destination-bigquery": {
"dockerfile": "Dockerfile",
"dbt_adapter": "dbt-bigquery==1.0.0",
"integration_name": "bigquery",
"normalization_image": "airbyte/normalization:0.4.3",
"supports_in_connector_normalization": True,
"yum_packages": [],
},
"destination-clickhouse": {
"dockerfile": "clickhouse.Dockerfile",
"dbt_adapter": "dbt-clickhouse>=1.4.0",
Expand Down Expand Up @@ -742,14 +734,6 @@ def with_integration_base_java(context: PipelineContext, build_platform: Platfor
"supports_in_connector_normalization": True,
"yum_packages": [],
},
"destination-snowflake": {
"dockerfile": "snowflake.Dockerfile",
"dbt_adapter": "dbt-snowflake==1.0.0",
"integration_name": "snowflake",
"normalization_image": "airbyte/normalization-snowflake:0.4.3",
"supports_in_connector_normalization": True,
"yum_packages": ["gcc-c++"],
},
"destination-tidb": {
"dockerfile": "tidb.Dockerfile",
"dbt_adapter": "dbt-tidb==1.0.1",
AvroRecordFactory.java
@@ -11,7 +11,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.UUID;
import org.apache.avro.Schema;
@@ -33,21 +32,15 @@ public AvroRecordFactory(final Schema schema, final JsonAvroConverter converter)

public GenericData.Record getAvroRecord(final UUID id, final AirbyteRecordMessage recordMessage) throws JsonProcessingException {
final ObjectNode jsonRecord = MAPPER.createObjectNode();
if (TypingAndDedupingFlag.isDestinationV2()) {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.getEmittedAt());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, (Long) null);
} else {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
}
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
jsonRecord.setAll((ObjectNode) recordMessage.getData());

return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
}

public GenericData.Record getAvroRecord(JsonNode formattedData) throws JsonProcessingException {
var bytes = WRITER.writeValueAsBytes(formattedData);
public GenericData.Record getAvroRecord(final JsonNode formattedData) throws JsonProcessingException {
final var bytes = WRITER.writeValueAsBytes(formattedData);
return converter.convertToGenericDataRecord(bytes, schema);
}

StagingDatabaseCsvSheetGenerator.java
@@ -7,7 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.sql.Timestamp;
import java.time.Instant;
@@ -30,12 +29,16 @@
*/
public class StagingDatabaseCsvSheetGenerator implements CsvSheetGenerator {

private final boolean use1s1t;
private final boolean useDestinationsV2Columns;
private final List<String> header;

public StagingDatabaseCsvSheetGenerator() {
use1s1t = TypingAndDedupingFlag.isDestinationV2();
this.header = use1s1t ? JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES : JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
this(false);
}

public StagingDatabaseCsvSheetGenerator(final boolean useDestinationsV2Columns) {
this.useDestinationsV2Columns = useDestinationsV2Columns;
this.header = this.useDestinationsV2Columns ? JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES : JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
}

// TODO is this even used anywhere?
Expand All @@ -56,7 +59,7 @@ public List<Object> getDataRow(final JsonNode formattedData) {

@Override
public List<Object> getDataRow(final UUID id, final String formattedString, final long emittedAt) {
if (use1s1t) {
if (useDestinationsV2Columns) {
return List.of(
id,
Timestamp.from(Instant.ofEpochMilli(emittedAt)),
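The change above replaces the process-wide TypingAndDedupingFlag.isDestinationV2() lookup with an explicit useDestinationsV2Columns constructor argument, so each caller decides which raw-table column layout the CSV sheet generator emits; the no-arg constructor delegates to this(false), keeping the legacy layout for existing call sites. A minimal, self-contained sketch of the pattern follows; the class name and column lists are illustrative stand-ins for the real JavaBaseConstants values, not the connector's actual API.

import java.util.List;

// Simplified mirror of the constructor change: the column layout is chosen by an explicit
// flag supplied by the caller rather than by a global environment lookup.
public class RawTableCsvHeaderSketch {

  // Stand-ins for JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS / V2_RAW_TABLE_COLUMN_NAMES.
  private static final List<String> LEGACY_COLUMNS =
      List.of("_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_data");
  private static final List<String> V2_COLUMNS =
      List.of("_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_loaded_at", "_airbyte_data");

  private final List<String> header;

  public RawTableCsvHeaderSketch(final boolean useDestinationsV2Columns) {
    this.header = useDestinationsV2Columns ? V2_COLUMNS : LEGACY_COLUMNS;
  }

  public List<String> getHeader() {
    return header;
  }

  public static void main(final String[] args) {
    System.out.println(new RawTableCsvHeaderSketch(false).getHeader()); // legacy raw-table columns
    System.out.println(new RawTableCsvHeaderSketch(true).getHeader());  // Destinations V2 columns
  }
}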
AsyncFlush.java
@@ -35,14 +35,16 @@ class AsyncFlush implements DestinationFlushFunction {
private final TypeAndDedupeOperationValve typerDeduperValve;
private final TyperDeduper typerDeduper;
private final long optimalBatchSizeBytes;
private final boolean useDestinationsV2Columns;

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper) {
this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024);
final TyperDeduper typerDeduper,
final boolean useDestinationsV2Columns) {
this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024, useDestinationsV2Columns);
}

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
@@ -56,14 +58,16 @@ public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConf
// resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
// the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
// freed earlier similar to a sliding window effect
long optimalBatchSizeBytes) {
final long optimalBatchSizeBytes,
final boolean useDestinationsV2Columns) {
this.streamDescToWriteConfig = streamDescToWriteConfig;
this.stagingOperations = stagingOperations;
this.database = database;
this.catalog = catalog;
this.typerDeduperValve = typerDeduperValve;
this.typerDeduper = typerDeduper;
this.optimalBatchSizeBytes = optimalBatchSizeBytes;
this.useDestinationsV2Columns = useDestinationsV2Columns;
}

@Override
Expand All @@ -72,7 +76,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag
try {
writer = new CsvSerializedBuffer(
new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
new StagingDatabaseCsvSheetGenerator(),
new StagingDatabaseCsvSheetGenerator(useDestinationsV2Columns),
true);

// reassign as lambdas require references to be final.
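The constructor comment above carries the sizing rationale: a single queue can hold roughly 150 MB, so a smaller optimalBatchSizeBytes makes the flusher write more, smaller staging files and lets buffered memory be released earlier. The arithmetic below is purely illustrative, using the 50 MiB default from the diff and a hypothetical smaller threshold.

// Back-of-the-envelope numbers only; the ~150 MB figure comes from the constructor comment,
// 50 MiB is the default optimalBatchSizeBytes in the diff, and 25 MiB is a hypothetical alternative.
public class FlushBatchSizeMath {

  public static void main(final String[] args) {
    final long queueBytes = 150L * 1024 * 1024;
    final long defaultBatch = 50L * 1024 * 1024;
    final long smallerBatch = 25L * 1024 * 1024;

    // Fewer bytes per flush means more flushes per full queue, and earlier chances to free memory.
    System.out.println("default batch: " + (queueBytes / defaultBatch) + " flushes per full queue");  // 3
    System.out.println("smaller batch: " + (queueBytes / smallerBatch) + " flushes per full queue"); // 6
  }
}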
StagingConsumerFactory.java
@@ -13,7 +13,6 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
@@ -74,8 +73,9 @@ public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecord
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog);
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog, useDestinationsV2Columns);
return new BufferedStreamConsumer(
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
@@ -99,10 +99,12 @@ public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessag
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog);
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog, useDestinationsV2Columns);
final var streamDescToWriteConfig = streamDescToWriteConfig(writeConfigs);
final var flusher = new AsyncFlush(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper);
final var flusher =
new AsyncFlush(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, useDestinationsV2Columns);
return new AsyncStreamConsumer(
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
@@ -156,22 +158,24 @@ private static StreamDescriptor toStreamDescriptor(final WriteConfig config) {
private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog) {
final ParsedCatalog parsedCatalog,
final boolean useDestinationsV2Columns) {

return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, parsedCatalog)).collect(toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns)).collect(toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final NamingConventionTransformer namingResolver,
final JsonNode config,
final ParsedCatalog parsedCatalog) {
final ParsedCatalog parsedCatalog,
final boolean useDestinationsV2Columns) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();
final String streamName = abStream.getName();

final String outputSchema;
final String tableName;
if (TypingAndDedupingFlag.isDestinationV2()) {
if (useDestinationsV2Columns) {
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).id();
outputSchema = streamId.rawNamespace();
tableName = streamId.rawName();
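With this change, toWriteConfig receives the flag explicitly and uses it to decide where each stream's raw table lives: under Destinations V2 the parsed catalog's StreamId supplies the raw namespace and name, otherwise the legacy naming-resolver output is kept. Below is a compact standalone sketch of that branch; only the StreamId accessors and the flag name come from the diff, while the record definition, harness, and sample values are hypothetical.

// Standalone illustration of the raw-table naming branch in toWriteConfig.
public class RawTableNamingSketch {

  // Stand-in for the typing_deduping StreamId used in the diff.
  record StreamId(String rawNamespace, String rawName) {}

  static String[] resolveRawTable(final boolean useDestinationsV2Columns,
                                  final StreamId streamId,
                                  final String legacyOutputSchema,
                                  final String legacyTableName) {
    if (useDestinationsV2Columns) {
      // V2: the parsed catalog dictates the raw namespace and table name.
      return new String[] {streamId.rawNamespace(), streamId.rawName()};
    }
    // Legacy: keep whatever the naming resolver produced for this stream.
    return new String[] {legacyOutputSchema, legacyTableName};
  }

  public static void main(final String[] args) {
    final StreamId id = new StreamId("airbyte_internal", "public_raw__stream_users"); // sample values
    System.out.println(String.join(".", resolveRawTable(true, id, "public", "_airbyte_raw_users")));
    System.out.println(String.join(".", resolveRawTable(false, id, "public", "_airbyte_raw_users")));
  }
}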
BigQueryDenormalizedDestination.java
@@ -35,13 +35,6 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);

@Override
protected String getTargetTableName(final String streamName) {
// This BigQuery destination does not write to a staging "raw" table but directly to a normalized
// table
return namingResolver.getIdentifier(streamName);
}

@Override
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(final JsonNode jsonSchema) {
return Map.of(UploaderType.STANDARD, new DefaultBigQueryDenormalizedRecordFormatter(jsonSchema, namingResolver),
@@ -73,23 +66,14 @@ protected Function<JsonNode, BigQueryRecordFormatter> getRecordFormatterCreator(
return streamSchema -> new GcsBigQueryDenormalizedRecordFormatter(streamSchema, namingResolver);
}

/**
* This BigQuery destination does not write to a staging "raw" table but directly to a normalized
* table.
*/
@Override
protected Function<String, String> getTargetTableNameTransformer(final BigQuerySQLNameTransformer namingResolver) {
return namingResolver::getIdentifier;
}

@Override
protected void putStreamIntoUploaderMap(AirbyteStream stream,
UploaderConfig uploaderConfig,
Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
protected void putStreamIntoUploaderMap(final AirbyteStream stream,
final UploaderConfig uploaderConfig,
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
throws IOException {
String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace());
Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName());
BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();
final String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace());
final Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName());
final BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();

if (existingTable != null) {
LOGGER.info("Target table already exists. Checking could we use the default destination processing.");
@@ -104,7 +88,7 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream,
LOGGER.info("Target table is not created yet. The default destination processing will be used.");
}

AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
final AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
uploaderMap.put(
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
uploader);
@@ -119,7 +103,8 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream,
* @param existingSchema BigQuery schema of the existing table (created by previous run)
* @return Are calculated fields same as we have in the existing table
*/
private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema, @Nullable com.google.cloud.bigquery.Schema existingSchema) {
private boolean compareSchemas(final com.google.cloud.bigquery.Schema expectedSchema,
@Nullable final com.google.cloud.bigquery.Schema existingSchema) {
if (expectedSchema != null && existingSchema == null) {
LOGGER.warn("Existing schema is null when we expect {}", expectedSchema);
return false;
@@ -131,11 +116,11 @@ private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema,
return false;
}

var expectedFields = expectedSchema.getFields();
var existingFields = existingSchema.getFields();
final var expectedFields = expectedSchema.getFields();
final var existingFields = existingSchema.getFields();

for (Field expectedField : expectedFields) {
var existingField = existingFields.get(expectedField.getName());
for (final Field expectedField : expectedFields) {
final var existingField = existingFields.get(expectedField.getName());
if (isDifferenceBetweenFields(expectedField, existingField)) {
LOGGER.warn("Expected field {} is different from existing field {}", expectedField, existingField);
return false;
@@ -146,7 +131,7 @@ private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema,
return true;
}

private boolean isDifferenceBetweenFields(Field expectedField, Field existingField) {
private boolean isDifferenceBetweenFields(final Field expectedField, final Field existingField) {
if (existingField == null) {
return true;
} else {
@@ -165,9 +150,9 @@ private boolean isDifferenceBetweenFields(Field expectedField, Field existingFie
* @param existingField existing field structure
* @return is critical difference in the field modes
*/
private boolean compareRepeatedMode(Field expectedField, Field existingField) {
var expectedMode = expectedField.getMode();
var existingMode = existingField.getMode();
private boolean compareRepeatedMode(final Field expectedField, final Field existingField) {
final var expectedMode = expectedField.getMode();
final var existingMode = existingField.getMode();

if (expectedMode != null && expectedMode.equals(REPEATED) || existingMode != null && existingMode.equals(REPEATED)) {
return expectedMode != null && expectedMode.equals(existingMode);
@@ -176,17 +161,17 @@ private boolean compareRepeatedMode(Field expectedField, Field existingField) {
}
}

private boolean compareSubFields(Field expectedField, Field existingField) {
var expectedSubFields = expectedField.getSubFields();
var existingSubFields = existingField.getSubFields();
private boolean compareSubFields(final Field expectedField, final Field existingField) {
final var expectedSubFields = expectedField.getSubFields();
final var existingSubFields = existingField.getSubFields();

if (expectedSubFields == null || expectedSubFields.isEmpty()) {
return true;
} else if (existingSubFields == null || existingSubFields.isEmpty()) {
return false;
} else {
for (Field expectedSubField : expectedSubFields) {
var existingSubField = existingSubFields.get(expectedSubField.getName());
for (final Field expectedSubField : expectedSubFields) {
final var existingSubField = existingSubFields.get(expectedSubField.getName());
if (isDifferenceBetweenFields(expectedSubField, existingSubField)) {
return false;
}
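The compareSchemas, isDifferenceBetweenFields, and compareRepeatedMode helpers above decide whether an existing denormalized BigQuery table can be reused as-is. The subtle rule is the REPEATED mode check: modes only have to match when at least one side is REPEATED, and other mode differences are ignored. Here is a self-contained restatement of just that predicate; the enum is a stand-in for the BigQuery client's Field.Mode so the sketch runs without the client library.

public class RepeatedModeRuleSketch {

  enum Mode { NULLABLE, REQUIRED, REPEATED }

  // Mirrors compareRepeatedMode: returns true when the modes are compatible.
  static boolean modesCompatible(final Mode expected, final Mode existing) {
    if (expected == Mode.REPEATED || existing == Mode.REPEATED) {
      return expected != null && expected == existing;
    }
    return true;
  }

  public static void main(final String[] args) {
    System.out.println(modesCompatible(Mode.NULLABLE, Mode.REQUIRED)); // true: neither side is REPEATED
    System.out.println(modesCompatible(Mode.REPEATED, Mode.REPEATED)); // true: both are arrays
    System.out.println(modesCompatible(Mode.REPEATED, Mode.NULLABLE)); // false: an array cannot be reused as a scalar
  }
}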
