Skip to content

Commit

Permalink
Async Destination V0: Async Staging Consumer Factory. (#26371)
Browse files Browse the repository at this point in the history
Follow up to #26366 .

Pull in the async consumer changes into the Consumer Factory. Also take the chance to split out the StagingConsumerFactory with the goal of clarifying the various general, serial and async functions.

Instead of one massive factory file, split into
- GeneralStagingFunction.java
- AsyncFlush.java
- SerialFlush.java
representing the general buckets of code we have today.

I'm sure we can do smarter things here. This is the bare minimum to unblock us + 'leave things better than we found them'.
  • Loading branch information
davinchia committed May 22, 2023
1 parent 4aca67b commit 21b36e7
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 164 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.staging;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.integrations.destination_async.DestinationFlushFunction;
import io.airbyte.protocol.models.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy.
*/
@Slf4j
class AsyncFlush implements DestinationFlushFunction {

private final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig;
private final StagingOperations stagingOperations;
private final JdbcDatabase database;
private final ConfiguredAirbyteCatalog catalog;

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog) {
this.streamDescToWriteConfig = streamDescToWriteConfig;
this.stagingOperations = stagingOperations;
this.database = database;
this.catalog = catalog;
}

// todo(davin): exceptions are too broad.
@Override
public void flush(final StreamDescriptor decs, final Stream<AirbyteMessage> stream) throws Exception {
// write this to a file - serilizable buffer?
// where do we create all the write configs?
log.info("Starting staging flush..");
CsvSerializedBuffer writer = null;
try {
writer = new CsvSerializedBuffer(
new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
new StagingDatabaseCsvSheetGenerator(),
true);

log.info("Converting to CSV file..");

// reassign as lambdas require references to be final.
final CsvSerializedBuffer finalWriter = writer;
stream.forEach(record -> {
try {
// todo(davin): handle non-record airbyte messages.
finalWriter.accept(record.getRecord());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
} catch (final Exception e) {
throw new RuntimeException(e);
}

log.info("Converted to CSV file..");
writer.flush();
log.info("Flushing buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
if (!streamDescToWriteConfig.containsKey(decs)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
}

final WriteConfig writeConfig = streamDescToWriteConfig.get(decs);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
final String stagingPath =
stagingOperations.getStagingPath(StagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getWriteDatetime());
try {
log.info("Starting upload to stage..");
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
schemaName,
stagingOperations);
} catch (final Exception e) {
log.error("Failed to flush and commit buffer data into destination's raw table", e);
throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e);
}

writer.close();
}

@Override
public long getOptimalBatchSizeBytes() {
// todo(davin): this should be per-destination specific. currently this is for Snowflake.
return 200 * 1024 * 1024;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.staging;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* Functions and logic common to all flushing strategies.
*/
@Slf4j
public class GeneralStagingFunctions {

public static OnStartFunction onStartFunction(final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs) {
return () -> {
log.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
for (final WriteConfig writeConfig : writeConfigs) {
final String schema = writeConfig.getOutputSchemaName();
final String stream = writeConfig.getStreamName();
final String dstTableName = writeConfig.getOutputTableName();
final String stageName = stagingOperations.getStageName(schema, stream);
final String stagingPath =
stagingOperations.getStagingPath(StagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getWriteDatetime());

log.info("Preparing staging area in destination started for schema {} stream {}: target table: {}, stage: {}",
schema, stream, dstTableName, stagingPath);

stagingOperations.createSchemaIfNotExists(database, schema);
stagingOperations.createTableIfNotExists(database, schema, dstTableName);
stagingOperations.createStageIfNotExists(database, stageName);

/*
* When we're in OVERWRITE, clear out the table at the start of a sync, this is an expected side
* effect of checkpoint and the removal of temporary tables
*/
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queryList.add(stagingOperations.truncateTableQuery(database, schema, dstTableName));
case APPEND, APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}

log.info("Preparing staging area in destination completed for schema {} stream {}", schema, stream);
}
log.info("Executing finalization of tables.");
stagingOperations.executeTransaction(database, queryList);
};
}

/**
* Handles copying data from staging area to destination table and clean up of staged files if
* upload was unsuccessful
*/
public static void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
final String schemaName,
final StagingOperations stagingOperations)
throws Exception {
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
} catch (final Exception e) {
stagingOperations.cleanUpStage(database, stageName, stagedFiles);
log.info("Cleaning stage path {}", stagingPath);
throw new RuntimeException("Failed to upload data from stage " + stagingPath, e);
}
}

/**
* Tear down process, will attempt to try to clean out any staging area
*
* @param database database used for syncing
* @param stagingOperations collection of SQL queries necessary for writing data into a staging area
* @param writeConfigs configuration settings for all destination connectors needed to write
* @param purgeStagingData drop staging area if true, keep otherwise
* @return
*/
public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final boolean purgeStagingData) {
return (hasFailed) -> {
// After moving data from staging area to the target table (airybte_raw) clean up the staging
// area (if user configured)
log.info("Cleaning up destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
if (purgeStagingData) {
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
log.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
stageName);
stagingOperations.dropStageIfExists(database, stageName);
}
}
log.info("Cleaning up destination completed.");
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.staging;

import static java.util.stream.Collectors.joining;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import io.airbyte.integrations.destination.record_buffer.FlushBufferFunction;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Serial flushing logic. Though simpler, this causes unnecessary backpressure and slows down the
* entire pipeline.
* <p>
* Note: This class should be re-written so that is implements the {@link FlushBufferFunction}
* interface, instead of return an anonymous function implementing this interface for clarity. As of
* this writing, we avoid doing so to simplify the migration to async flushing.
*/
@Slf4j
public class SerialFlush {

/**
* Logic handling how destinations with staging areas (aka bucket storages) will flush their buffer
*
* @param database database used for syncing
* @param stagingOperations collection of SQL queries necessary for writing data into a staging area
* @param writeConfigs configuration settings for all destination connectors needed to write
* @param catalog collection of configured streams (e.g. API endpoints or database tables)
* @return
*/
@VisibleForTesting
public static FlushBufferFunction function(
final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
// TODO: (ryankfu) move this block of code that executes before the lambda to #onStartFunction
final Set<WriteConfig> conflictingStreams = new HashSet<>();
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = new HashMap<>();
for (final WriteConfig config : writeConfigs) {
final AirbyteStreamNameNamespacePair streamIdentifier = toNameNamespacePair(config);
if (pairToWriteConfig.containsKey(streamIdentifier)) {
conflictingStreams.add(config);
final WriteConfig existingConfig = pairToWriteConfig.get(streamIdentifier);
// The first conflicting stream won't have any problems, so we need to explicitly add it here.
conflictingStreams.add(existingConfig);
} else {
pairToWriteConfig.put(streamIdentifier, config);
}
}
if (!conflictingStreams.isEmpty()) {
final String message = String.format(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s",
conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getStreamName()).collect(joining(", ")));
throw new ConfigErrorException(message);
}
return (pair, writer) -> {
log.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
if (!pairToWriteConfig.containsKey(pair)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
}

final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getStreamName());
final String stagingPath =
stagingOperations.getStagingPath(StagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getWriteDatetime());
try (writer) {
writer.flush();
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
schemaName,
stagingOperations);
} catch (final Exception e) {
log.error("Failed to flush and commit buffer data into destination's raw table", e);
throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e);
}
};
}

private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
return new AirbyteStreamNameNamespacePair(config.getStreamName(), config.getNamespace());
}

}

0 comments on commit 21b36e7

Please sign in to comment.