Checkpointing: documentation of lifecycle of how data flows through destination connectors (#20118)

* Adds javadoc comments to the files involved in writing data from a destination connector into the destination

* Updates comments for copyTableQuery

* Fixed linter issues

* Fixed remaining formatting issues with javadoc paragraphs
ryankfu committed Dec 16, 2022
1 parent 8af748e commit d03757a
Showing 25 changed files with 250 additions and 67 deletions.
@@ -12,6 +12,8 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
// Set this value to true to log all messages passed from the source to the destination, with emission throttled to one per second
public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
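A minimal sketch of how such an environment-backed flag could be consulted. The helper class below is hypothetical; only the constant name comes from the hunk above, and the real EnvVariableFeatureFlags accessor may differ.

```java
// Hypothetical helper: gates verbose per-message logging on the LOG_CONNECTOR_MESSAGES
// environment variable, as the class name EnvVariableFeatureFlags suggests.
public final class LogConnectorMessagesFlag {

  public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";

  // true when the environment variable is set to "true" (case-insensitive)
  public static boolean isEnabled() {
    return Boolean.parseBoolean(System.getenv(LOG_CONNECTOR_MESSAGES));
  }

  public static void main(final String[] args) {
    if (isEnabled()) {
      System.out.println("Logging every message passed from the source to the destination.");
    }
  }
}
```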
@@ -32,9 +32,20 @@ public interface AirbyteMessageConsumer extends CheckedConsumer<AirbyteMessage,

void start() throws Exception;

/**
* Consumes all {@link AirbyteMessage}s
*
* @param message {@link AirbyteMessage} to be processed
* @throws Exception
*/
@Override
void accept(AirbyteMessage message) throws Exception;

/**
* Executes at the end of consumption of all incoming streamed data regardless of success or failure
*
* @throws Exception
*/
@Override
void close() throws Exception;

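For context, a minimal consumer implementing the start → accept → close lifecycle documented above might look like the sketch below. The class is illustrative; the import paths are taken from the javadoc links in this commit and may differ between CDK versions.

```java
// Assumed import paths; AirbyteMessageConsumer lives in the destination CDK.
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;

// Minimal, illustrative consumer: a real destination would write records to durable
// storage instead of logging them.
public class LoggingMessageConsumer implements AirbyteMessageConsumer {

  @Override
  public void start() throws Exception {
    // set up connections, staging areas and buffers before any message arrives
    System.out.println("starting consumer");
  }

  @Override
  public void accept(final AirbyteMessage message) throws Exception {
    // called once per message: RECORD messages carry data, STATE messages mark checkpoints
    if (message.getType() == Type.RECORD) {
      System.out.println("record: " + message.getRecord());
    } else if (message.getType() == Type.STATE) {
      System.out.println("state checkpoint: " + message.getState());
    }
  }

  @Override
  public void close() throws Exception {
    // runs whether the sync succeeded or failed; flush, commit or clean up here
    System.out.println("closing consumer");
  }
}
```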
@@ -13,9 +13,13 @@
* the {@link AirbyteMessageConsumer} interface. The original interface methods are wrapped in
* generic exception handlers - any exception is caught and logged.
*
* Two methods are intended for extension: - startTracked: Wraps set up of necessary
* infrastructure/configuration before message consumption. - acceptTracked: Wraps actual processing
* of each {@link io.airbyte.protocol.models.v0.AirbyteMessage}.
* Two methods are intended for extension:
* <ul>
* <li>startTracked: Wraps set up of necessary infrastructure/configuration before message
* consumption.</li>
* <li>acceptTracked: Wraps actual processing of each
* {@link io.airbyte.protocol.models.v0.AirbyteMessage}.</li>
* </ul>
*
* Though not necessary, we highly encourage using this class when implementing destinations. See
* child classes for examples.
@@ -26,6 +30,11 @@ public abstract class FailureTrackingAirbyteMessageConsumer implements AirbyteMe

private boolean hasFailed = false;

/**
* Wraps setup of necessary infrastructure/configuration before message consumption
*
* @throws Exception
*/
protected abstract void startTracked() throws Exception;

@Override
@@ -39,6 +48,12 @@ public void start() throws Exception {
}
}

/**
* Wraps actual processing of each {@link AirbyteMessage}
*
* @param msg {@link AirbyteMessage} to be processed
* @throws Exception
*/
protected abstract void acceptTracked(AirbyteMessage msg) throws Exception;

@Override
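A sketch of how a destination might extend this class using the two documented extension points. The subclass name is hypothetical, the import paths are assumptions, and the exact set of abstract methods (for example the close(boolean hasFailed) hook) may vary between CDK versions.

```java
// Assumed import paths for the CDK base class and the protocol message type.
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.protocol.models.v0.AirbyteMessage;

// Illustrative subclass: startTracked/acceptTracked are the extension points described
// above; the base class wraps them so that close() knows whether the sync failed.
public class MyDestinationConsumer extends FailureTrackingAirbyteMessageConsumer {

  @Override
  protected void startTracked() throws Exception {
    // open connections / create tmp tables before any message is processed
  }

  @Override
  protected void acceptTracked(final AirbyteMessage msg) throws Exception {
    // buffer or write each incoming message
  }

  @Override
  protected void close(final boolean hasFailed) throws Exception {
    // hasFailed is true if any earlier step threw; commit only when it is false
    if (!hasFailed) {
      // move data from tmp tables to final tables, then emit committed states
    }
  }
}
```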
@@ -135,6 +135,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
}

// if the buffer flushes, update the states appropriately.
// TODO (ryankfu): if this returns true, this is also where to bundle up the logic that commits records to the airbyte_raw table
if (bufferingStrategy.addRecord(stream, message)) {
markStatesAsFlushedToTmpDestination();
}
@@ -191,6 +192,7 @@ protected void close(final boolean hasFailed) throws Exception {
onClose.accept(false);
}

// TODO (ryankfu): this is the section where we close the stream and mark it as committed
// if onClose succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
stateManager.markFlushedAsCommitted();
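To make the two checkpointing moments in this hunk concrete (states marked as flushed when the buffer flushes to the tmp destination, and marked as committed only after onClose succeeds), here is a small self-contained model in plain Java. The class, field and method names are illustrative rather than the real DestStateLifecycleManager API; only markFlushedAsCommitted echoes a name that appears in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the checkpointing lifecycle: a state is PENDING until the buffer holding
// its records is flushed to the tmp destination, FLUSHED until onClose commits the tmp
// tables to the final tables, and only then safe to emit back to the platform.
public class StateLifecycleModel {

  private final Deque<String> pending = new ArrayDeque<>();
  private final Deque<String> flushed = new ArrayDeque<>();
  private final Deque<String> committed = new ArrayDeque<>();

  void acceptState(final String state) {
    pending.add(state);
  }

  // called when the buffering strategy reports that the buffer flushed
  void markPendingAsFlushed() {
    flushed.addAll(pending);
    pending.clear();
  }

  // called from close() only after onClose succeeded
  void markFlushedAsCommitted() {
    committed.addAll(flushed);
    flushed.clear();
  }

  public static void main(final String[] args) {
    final StateLifecycleModel states = new StateLifecycleModel();
    states.acceptState("state-1");    // records for state-1 are still sitting in the buffer
    states.markPendingAsFlushed();    // buffer flushed to the tmp destination
    states.acceptState("state-2");    // more records arrive, then the sync ends
    states.markFlushedAsCommitted();  // onClose committed tmp -> final tables
    System.out.println("safe to emit: " + states.committed);  // [state-1]; state-2 is still pending
  }
}
```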
@@ -19,10 +19,14 @@
* during a sync, so a single instance of this manager is sufficient for a destination to track
* state during a sync.
*
* <p>
* Strategy: Delegates state messages of each type to a StateManager that is appropriate to that
* state type.
* </p>
*
* <p>
* Per the protocol, if state type is not set, assumes the LEGACY state type.
* </p>
*/
public class DefaultDestStateLifecycleManager implements DestStateLifecycleManager {

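A self-contained sketch of the delegation strategy described above: GLOBAL and LEGACY states describe the whole connection and go to a single-state manager, STREAM states go to a per-stream manager, and a missing type is treated as LEGACY. The enum and interface below are simplified stand-ins, not the actual Airbyte types.

```java
import java.util.Map;

public class DelegatingStateManagerSketch {

  enum StateType { GLOBAL, STREAM, LEGACY }

  // stand-in for the per-type state lifecycle managers
  interface StateManager { void accept(String stateMessage); }

  private final Map<StateType, StateManager> delegates = Map.<StateType, StateManager>of(
      StateType.GLOBAL, msg -> System.out.println("single-state manager: " + msg),
      StateType.LEGACY, msg -> System.out.println("single-state manager: " + msg),
      StateType.STREAM, msg -> System.out.println("stream-state manager: " + msg));

  void accept(final StateType type, final String stateMessage) {
    // per the protocol, a state message without a type is treated as LEGACY
    final StateType effective = (type == null) ? StateType.LEGACY : type;
    delegates.get(effective).accept(stateMessage);
  }

  public static void main(final String[] args) {
    final DelegatingStateManagerSketch manager = new DelegatingStateManagerSketch();
    manager.accept(StateType.STREAM, "{\"stream\": \"users\"}");
    manager.accept(null, "{\"data\": \"legacy state blob\"}");
  }
}
```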
@@ -16,8 +16,10 @@
* single state message represents the state for the ENTIRE connection. At the time of writing,
* GLOBAL and LEGACY state types are the state type that match this pattern.
*
* <p>
* Does NOT store duplicates. Because each state message represents the entire state for the
* connection, it only stores (and emits) the LAST state it received at each phase.
* </p>
*/
public class DestSingleStateLifecycleManager implements DestStateLifecycleManager {

@@ -22,10 +22,12 @@
* stream. In these cases, at each state of the process, it tracks the LAST state message for EACH
* stream (no duplicates!).
*
* <p>
* Guaranteed to output state messages in order relative to other messages of the SAME state. Does
* NOT guarantee that state messages of different streams will be output in the order in which they
* were received. State messages across streams will be emitted in alphabetical order (primary sort
* on namespace, secondary on name).
* </p>
*/
public class DestStreamStateLifecycleManager implements DestStateLifecycleManager {

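A small standalone illustration of the two guarantees above: only the last state per stream is kept, and states are emitted in alphabetical order (namespace first, then name). This models the documented behavior rather than reproducing the real class.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class StreamStateTrackingSketch {

  record StreamDescriptor(String namespace, String name) {}

  // TreeMap keeps streams sorted by namespace, then name; put() keeps only the LAST state
  private final Map<StreamDescriptor, String> lastStatePerStream = new TreeMap<>(
      Comparator.comparing(StreamDescriptor::namespace).thenComparing(StreamDescriptor::name));

  void accept(final StreamDescriptor stream, final String state) {
    lastStatePerStream.put(stream, state);  // later states replace earlier ones
  }

  public static void main(final String[] args) {
    final StreamStateTrackingSketch tracker = new StreamStateTrackingSketch();
    tracker.accept(new StreamDescriptor("public", "users"), "users-state-1");
    tracker.accept(new StreamDescriptor("public", "orders"), "orders-state-1");
    tracker.accept(new StreamDescriptor("public", "users"), "users-state-2");  // replaces users-state-1
    // prints orders first, then users (alphabetical within the namespace), each with its last state
    tracker.lastStatePerStream.forEach((stream, state) -> System.out.println(stream + " -> " + state));
  }
}
```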
@@ -23,7 +23,7 @@ public interface BufferingStrategy extends AutoCloseable {
* Add a new message to the buffer while consuming streams
*
* @param stream - stream associated with record
* @param message - message to buffer
* @param message - {@link AirbyteMessage} to buffer
* @return true if this record causes ALL records in the buffer to flush, otherwise false.
* @throws Exception throw on failure
*/
@@ -30,6 +30,14 @@ public class SerializedBufferingStrategy implements BufferingStrategy {
private long totalBufferSizeInBytes;
private final ConfiguredAirbyteCatalog catalog;

/**
* Creates an instance of the serialized buffering strategy used to handle the logic of flushing
* the buffer with an associated buffer type
*
* @param onCreateBuffer type of buffer used upon creation
* @param catalog collection of {@link io.airbyte.protocol.models.ConfiguredAirbyteStream}
* @param onStreamFlush buffer flush logic used throughout the streaming of messages
*/
public SerializedBufferingStrategy(final CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyteCatalog, SerializableBuffer, Exception> onCreateBuffer,
final ConfiguredAirbyteCatalog catalog,
final CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> onStreamFlush) {
@@ -39,6 +47,15 @@ public SerializedBufferingStrategy(final CheckedBiFunction<AirbyteStreamNameName
this.totalBufferSizeInBytes = 0;
}

/**
* Adds a record to the buffer and, when the buffer is full, flushes it
*
* @param stream - stream associated with record
* @param message - {@link AirbyteMessage} to buffer
* @return true if this {@link io.airbyte.protocol.models.AirbyteRecordMessage} causes the buffer to
* flush all messages, otherwise false
* @throws Exception
*/
@Override
public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
boolean didFlush = false;
@@ -69,7 +86,7 @@ public boolean addRecord(final AirbyteStreamNameNamespacePair stream, final Airb
flushWriter(stream, streamBuffer);
/*
* Note: We intentionally do not mark didFlush as true in the branch of this conditional. Because
* this branch flushes individual streams, there is no guaranteee that it will flush records in the
* this branch flushes individual streams, there is no guarantee that it will flush records in the
* same order that state messages were received. The outcome here is that records get flushed but
* our updating of which state messages have been flushed falls behind.
*
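A toy model of the flushing rules documented in this file: returning true from addRecord means every buffered record was flushed, so the caller may checkpoint state; flushing only a single oversized stream deliberately returns false because record order relative to state messages is no longer guaranteed. The class, limits and record types below are illustrative, not the real SerializedBufferingStrategy internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferingSketch {

  private static final int PER_STREAM_LIMIT = 3;  // per-stream flush threshold (records)
  private static final int TOTAL_LIMIT = 5;       // whole-buffer flush threshold (records)

  private final Map<String, List<String>> buffers = new HashMap<>();
  private int totalBuffered = 0;

  boolean addRecord(final String stream, final String record) {
    buffers.computeIfAbsent(stream, s -> new ArrayList<>()).add(record);
    totalBuffered++;

    if (totalBuffered >= TOTAL_LIMIT) {
      buffers.clear();     // flush ALL streams
      totalBuffered = 0;
      return true;         // safe to mark pending states as flushed
    }
    if (buffers.get(stream).size() >= PER_STREAM_LIMIT) {
      totalBuffered -= buffers.remove(stream).size();  // flush only this stream
      // intentionally NOT reported as a flush: state bookkeeping lags behind here
    }
    return false;
  }

  public static void main(final String[] args) {
    final BufferingSketch buffer = new BufferingSketch();
    System.out.println(buffer.addRecord("users", "u1"));   // false
    System.out.println(buffer.addRecord("users", "u2"));   // false
    System.out.println(buffer.addRecord("orders", "o1"));  // false
    System.out.println(buffer.addRecord("orders", "o2"));  // false
    System.out.println(buffer.addRecord("users", "u3"));   // true: total limit hit, everything flushed
  }
}
```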
@@ -41,6 +41,15 @@ protected void startTracked() {
// todo (cgardens) - move contents of #write into this method.
}

/**
* Processes STATE and RECORD {@link AirbyteMessage}s; all other message types are logged as unexpected
*
* <ul>
* <li>For STATE messages, emit the message back to the platform</li>
* <li>For RECORD messages, upload the message to its associated Airbyte stream, so that records are routed
* to their respective streams when more than one stream is being synced</li>
* </ul>
*
* @param message {@link AirbyteMessage} to be processed
*/
@Override
public void acceptTracked(final AirbyteMessage message) {
if (message.getType() == Type.STATE) {
@@ -56,6 +65,11 @@ public void acceptTracked(final AirbyteMessage message) {
}
}

/**
* Processes a {@link io.airbyte.protocol.models.AirbyteRecordMessage} by writing the Airbyte stream data to the BigQuery writer
*
* @param message record to be written
*/
private void processRecord(final AirbyteMessage message) {
final var pair = AirbyteStreamNameNamespacePair.fromRecordMessage(message.getRecord());
uploaderMap.get(pair).upload(message);
@@ -61,8 +61,8 @@ public DatabricksAzureBlobStorageStreamCopier(final String stagingFolder,
final DatabricksDestinationConfig databricksConfig,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
SpecializedBlobClientBuilder specializedBlobClientBuilder,
AzureBlobStorageConfig azureConfig) {
final SpecializedBlobClientBuilder specializedBlobClientBuilder,
final AzureBlobStorageConfig azureConfig) {
super(stagingFolder, schema, configuredStream, database, databricksConfig, nameTransformer, sqlOperations);

this.specializedBlobClientBuilder = specializedBlobClientBuilder;
@@ -103,11 +103,11 @@ public String prepareStagingFile() {

try {

String accountKey = "doesntmatter";
String containerPath = String.format("%s/%s/%s/%s/", azureConfig.getContainerName(), stagingFolder, schemaName, streamName);
AzureBlobStorageFormatConfig formatConfig =
final String accountKey = "doesntmatter";
final String containerPath = String.format("%s/%s/%s/%s/", azureConfig.getContainerName(), stagingFolder, schemaName, streamName);
final AzureBlobStorageFormatConfig formatConfig =
new AzureBlobStorageCsvFormatConfig(Jsons.jsonNode(Map.of("flattening", "Root level flattening")));
AzureBlobStorageDestinationConfig config = new AzureBlobStorageDestinationConfig(azureConfig.getEndpointUrl(),
final AzureBlobStorageDestinationConfig config = new AzureBlobStorageDestinationConfig(azureConfig.getEndpointUrl(),
azureConfig.getAccountName(), accountKey, containerPath, 5,
formatConfig);
this.csvWriters.put(currentFile, new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream));
@@ -143,7 +143,7 @@ protected String getCreateTempTableStatement() {
final AirbyteStream stream = configuredStream.getStream();
LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema());

String schemaString = getSchemaString();
final String schemaString = getSchemaString();

LOGGER.info("[Stream {}] tmp table schema: {}", stream.getName(), schemaString);

@@ -155,13 +155,13 @@

private String getSchemaString() {
// Databricks requires schema to be provided when creating delta table from CSV
StringBuilder schemaString = new StringBuilder("_airbyte_ab_id string, _airbyte_emitted_at string");
ObjectNode properties = (ObjectNode) configuredStream.getStream().getJsonSchema().get("properties");
List<String> recordHeaders = MoreIterators.toList(properties.fieldNames())
final StringBuilder schemaString = new StringBuilder("_airbyte_ab_id string, _airbyte_emitted_at string");
final ObjectNode properties = (ObjectNode) configuredStream.getStream().getJsonSchema().get("properties");
final List<String> recordHeaders = MoreIterators.toList(properties.fieldNames())
.stream().sorted().toList();
for (String header : recordHeaders) {
JsonNode node = properties.get(header);
String type = node.get("type").asText();
for (final String header : recordHeaders) {
final JsonNode node = properties.get(header);
final String type = node.get("type").asText();
schemaString.append(", `").append(header).append("` ").append(type.equals("number") ? "double" : type);
}
return schemaString.toString();
@@ -170,20 +170,20 @@ private String getSchemaString() {
@Override
public String generateMergeStatement(final String destTableName) {
LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
var queries = new StringBuilder();
final var queries = new StringBuilder();
if (destinationSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
queries.append(sqlOperations.truncateTableQuery(database, schemaName, destTableName));
LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
}
queries.append(sqlOperations.copyTableQuery(database, schemaName, tmpTableName, destTableName));
queries.append(sqlOperations.insertTableQuery(database, schemaName, tmpTableName, destTableName));

return queries.toString();
}

@Override
protected void deleteStagingFile() {
LOGGER.info("Begin cleaning azure blob staging files.");
for (AppendBlobClient appendBlobClient : blobClients.values()) {
for (final AppendBlobClient appendBlobClient : blobClients.values()) {
appendBlobClient.delete();
}
LOGGER.info("Azure Blob staging files cleaned.");
@@ -192,8 +192,8 @@ protected void deleteStagingFile() {
@Override
public void closeNonCurrentStagingFileWriters() throws Exception {
LOGGER.info("Begin closing non current file writers");
Set<String> removedKeys = new HashSet<>();
for (String key : activeStagingWriterFileNames) {
final Set<String> removedKeys = new HashSet<>();
for (final String key : activeStagingWriterFileNames) {
if (!key.equals(currentFile)) {
csvWriters.get(key).close(false);
csvWriters.remove(key);
@@ -208,8 +208,8 @@ public String getCurrentFile() {
return currentFile;
}

private static BlobContainerClient getBlobContainerClient(AppendBlobClient appendBlobClient) {
BlobContainerClient containerClient = appendBlobClient.getContainerClient();
private static BlobContainerClient getBlobContainerClient(final AppendBlobClient appendBlobClient) {
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
if (!containerClient.exists()) {
containerClient.create();
}
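To show what getSchemaString produces, here is a standalone reproduction of the mapping for a sample stream with fields id (integer), price (number) and title (string). Jackson builds the sample JSON schema; the fixed header columns and the number → double conversion follow the code above, while the field names are made up for the example.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SchemaStringExample {

  public static void main(final String[] args) throws Exception {
    // sample "properties" node of a stream's JSON schema
    final JsonNode properties = new ObjectMapper().readTree(
        "{\"id\":{\"type\":\"integer\"},\"price\":{\"type\":\"number\"},\"title\":{\"type\":\"string\"}}");

    final StringBuilder schemaString = new StringBuilder("_airbyte_ab_id string, _airbyte_emitted_at string");
    final List<String> headers = new ArrayList<>();
    properties.fieldNames().forEachRemaining(headers::add);
    Collections.sort(headers);  // headers are sorted, as in getSchemaString()

    for (final String header : headers) {
      final String type = properties.get(header).get("type").asText();
      // Databricks delta tables want "double" where the JSON schema says "number"
      schemaString.append(", `").append(header).append("` ").append(type.equals("number") ? "double" : type);
    }
    // prints: _airbyte_ab_id string, _airbyte_emitted_at string, `id` integer, `price` double, `title` string
    System.out.println(schemaString);
  }
}
```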
@@ -175,7 +175,7 @@ private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queryList.add(sqlOperations.copyTableQuery(database, schemaName, srcTableName, dstTableName));
queryList.add(sqlOperations.insertTableQuery(database, schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
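A sketch of the finalization step this hunk belongs to: when the consumer closes successfully, each stream's tmp table is inserted into its final (raw) table, with OVERWRITE truncating the destination table first. The SQL text and table names are illustrative; the real statements are produced by SqlOperations#truncateTableQuery and SqlOperations#insertTableQuery.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalizationSketch {

  enum SyncMode { OVERWRITE, APPEND, APPEND_DEDUP }

  static List<String> finalizationQueries(final SyncMode mode,
                                          final String schema,
                                          final String tmpTable,
                                          final String finalTable) {
    final List<String> queries = new ArrayList<>();
    if (mode == SyncMode.OVERWRITE) {
      // OVERWRITE replaces existing data, so the destination table is emptied first
      queries.add(String.format("TRUNCATE TABLE %s.%s;", schema, finalTable));
    }
    // append-only copy from the tmp table into the final (airbyte_raw) table
    queries.add(String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;", schema, finalTable, schema, tmpTable));
    return queries;
  }

  public static void main(final String[] args) {
    finalizationQueries(SyncMode.OVERWRITE, "public", "_airbyte_tmp_users", "_airbyte_raw_users")
        .forEach(System.out::println);
  }
}
```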
@@ -87,7 +87,7 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem
}

@Override
public String copyTableQuery(final JdbcDatabase database, final String schemaName, final String srcTableName, final String dstTableName) {
public String insertTableQuery(final JdbcDatabase database, final String schemaName, final String srcTableName, final String dstTableName) {
return String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;\n", schemaName, dstTableName, schemaName, srcTableName);
}

@@ -11,8 +11,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// todo (cgardens) - is it necessary to expose so much configurability in this interface. review if
// we can narrow the surface area.
/**
* todo (cgardens) - is it necessary to expose so much configurability in this interface. review if we can narrow the surface area.
*
* SQL queries required for successfully syncing to a destination connector. These operations include the ability to:
* <ul>
* <li>Write - insert records from source connector</li>
* <li>Create - overloaded function but primarily to create tables if they don't exist (e.g. tmp tables to "stage" records before finalizing
* to the final table)</li>
* <li>Drop - removes a table from the schema</li>
* <li>Insert - move data from one table to another table - usually used for inserting data from tmp to final table (aka airbyte_raw)</li>
* </ul>
*/
public interface SqlOperations {

Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);
@@ -88,16 +98,18 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String tableName) throws Exception;

/**
* Query to copy all records from source table to destination table. Both tables must be in the
* Query to insert all records from source table to destination table. Both tables must be in the
* specified schema. Assumes both tables exist.
*
* <p>NOTE: this is an append-only operation meaning that data can be duplicated</p>
*
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param sourceTableName Name of source table
* @param destinationTableName Name of destination table
* @return Query
* @return SQL Query string
*/
String copyTableQuery(JdbcDatabase database, String schemaName, String sourceTableName, String destinationTableName);
String insertTableQuery(JdbcDatabase database, String schemaName, String sourceTableName, String destinationTableName);

/**
* Given an arbitrary number of queries, execute a transaction.
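Putting the four operation groups above together, the per-stream SQL lifecycle of a JDBC destination typically looks like the sketch below. The statements are stand-ins for what a concrete SqlOperations implementation would generate; only the INSERT ... SELECT mirrors the default implementation shown earlier in this commit, and the table names are invented for the example.

```java
import java.util.List;

public class StreamSqlLifecycle {

  public static void main(final String[] args) {
    final String schema = "public";
    final String tmpTable = "_airbyte_tmp_users";
    final String rawTable = "_airbyte_raw_users";

    final List<String> lifecycle = List.of(
        // Create: stage records in a tmp table so a failed sync never corrupts the final table
        String.format("CREATE TABLE IF NOT EXISTS %s.%s (...);", schema, tmpTable),
        // Write: records from the source connector are inserted into the tmp table in batches
        String.format("INSERT INTO %s.%s (...) VALUES (...);", schema, tmpTable),
        // Insert: move staged data into the final (airbyte_raw) table; append-only, so re-running duplicates rows
        String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;", schema, rawTable, schema, tmpTable),
        // Drop: clean up the staging table once the data is committed
        String.format("DROP TABLE IF EXISTS %s.%s;", schema, tmpTable));

    lifecycle.forEach(System.out::println);
  }
}
```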
@@ -11,14 +11,12 @@
import org.joda.time.DateTimeZone;

/**
* Write configuration POJO for all destinations extending {@link AbstractJdbcDestination}.
* Write configuration POJO (plain old Java object) for all destinations extending {@link AbstractJdbcDestination}.
*/
public class WriteConfig {

private final String streamName;

private final String namespace;

private final String outputSchemaName;
private final String tmpTableName;
private final String outputTableName;