Skip to content

Commit

Permalink
Async Snowflake Destination (#26703)
Browse files Browse the repository at this point in the history
* snowflake at end of coding retreat week

* turn off async snowflake

* add comment

* Fixed test to point to the ShimMessageConsumer instead of AsyncMessageConsumer

* Automated Change

---------

Co-authored-by: ryankfu <ryan.fu@airbyte.io>
Co-authored-by: ryankfu <ryankfu@users.noreply.github.com>
  • Loading branch information
3 people committed May 31, 2023
1 parent 88438bc commit 4ac62f3
Show file tree
Hide file tree
Showing 27 changed files with 1,865 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

package io.airbyte.integrations.destination_async;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.destination_async.buffers.BufferEnqueue;
import io.airbyte.integrations.destination_async.buffers.BufferManager;
import io.airbyte.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.integrations.destination_async.state.FlushFailure;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Optional;
Expand All @@ -31,19 +35,18 @@
* {@link FlushWorkers}. See the other linked class for more detail.
*/
@Slf4j
public class AsyncStreamConsumer implements AirbyteMessageConsumer {
public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncStreamConsumer.class);

private static final String NON_STREAM_STATE_IDENTIFIER = "GLOBAL";
private final Consumer<AirbyteMessage> outputRecordCollector;
private final OnStartFunction onStart;
private final OnCloseFunction onClose;
private final ConfiguredAirbyteCatalog catalog;
private final BufferManager bufferManager;
private final BufferEnqueue bufferEnqueue;
private final FlushWorkers flushWorkers;
private final Set<StreamDescriptor> streamNames;
private final FlushFailure flushFailure;

private boolean hasStarted;
private boolean hasClosed;
Expand All @@ -54,16 +57,27 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure());
}

@VisibleForTesting
public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final FlushFailure flushFailure) {
hasStarted = false;
hasClosed = false;

this.outputRecordCollector = outputRecordCollector;
this.onStart = onStart;
this.onClose = onClose;
this.catalog = catalog;
this.bufferManager = bufferManager;
bufferEnqueue = bufferManager.getBufferEnqueue();
flushWorkers = new FlushWorkers(this.bufferManager.getBufferDequeue(), flusher);
this.flushFailure = flushFailure;
flushWorkers = new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager());
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
}

Expand All @@ -79,15 +93,77 @@ public void start() throws Exception {
}

@Override
public void accept(final AirbyteMessage message) throws Exception {
public void accept(final String messageString, final Integer sizeInBytes) throws Exception {
Preconditions.checkState(hasStarted, "Cannot accept records until consumer has started");
propagateFlushWorkerExceptionIfPresent();
/*
* intentionally putting extractStream outside the buffer manager so that if in the future we want
* to try to use a threadpool to partial deserialize to get record type and stream name, we can do
* it without touching buffer manager.
* to try to use a thread pool to partially deserialize to get record type and stream name, we can
* do it without touching buffer manager.
*/
extractStream(message)
.ifPresent(streamDescriptor -> bufferEnqueue.addRecord(streamDescriptor, message));
deserializeAirbyteMessage(messageString)
.ifPresent(message -> {
if (message.getType() == Type.RECORD) {
validateRecord(message);
}

bufferEnqueue.addRecord(message, sizeInBytes);
});
}

/**
* Deserializes to a {@link PartialAirbyteMessage} which can represent both a Record or a State
* Message
*
* @param messageString the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
*/
private Optional<PartialAirbyteMessage> deserializeAirbyteMessage(final String messageString) {
final Optional<PartialAirbyteMessage> messageOptional = Jsons.tryDeserialize(messageString, PartialAirbyteMessage.class)
.map(partial -> partial.withSerialized(messageString));
if (messageOptional.isPresent()) {
return messageOptional;
} else {
if (isStateMessage(messageString)) {
throw new IllegalStateException("Invalid state message: " + messageString);
} else {
LOGGER.error("Received invalid message: " + messageString);
return Optional.empty();
}
}
}

/**
* Tests whether the provided JSON string represents a state message.
*
* @param input a JSON string that represents an {@link AirbyteMessage}.
* @return {@code true} if the message is a state message, {@code false} otherwise.
*/
private static boolean isStateMessage(final String input) {
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class);
return deserialized.filter(airbyteTypeMessage -> airbyteTypeMessage.getType() == Type.STATE).isPresent();
}

/**
* Custom class that can be used to parse a JSON message to determine the type of the represented
* {@link AirbyteMessage}.
*/
private static class AirbyteTypeMessage {

@JsonProperty("type")
@JsonPropertyDescription("Message type")
private AirbyteMessage.Type type;

@JsonProperty("type")
public AirbyteMessage.Type getType() {
return type;
}

@JsonProperty("type")
public void setType(final AirbyteMessage.Type type) {
this.type = type;
}

}

@Override
Expand All @@ -100,48 +176,32 @@ public void close() throws Exception {
// we need to close the workers before closing the bufferManagers (and underlying buffers)
// or we risk in-memory data.
flushWorkers.close();

bufferManager.close();
onClose.call();
LOGGER.info("{} closed.", AsyncStreamConsumer.class);

// as this throws an exception, we need to be after all other close functions.
propagateFlushWorkerExceptionIfPresent();
LOGGER.info("{} closed", AsyncStreamConsumer.class);
}

// todo (cgardens) - handle global state.
/**
* Extract the stream from the message, if the message is a record or state. Otherwise, we don't
* care.
*
* @param message message to extract stream from
* @return stream descriptor if the message is a record or state, otherwise empty. In the case of
* global state messages the stream descriptor is hardcoded
*/
private Optional<StreamDescriptor> extractStream(final AirbyteMessage message) {
if (message.getType() == Type.RECORD) {
final StreamDescriptor streamDescriptor = new StreamDescriptor()
.withNamespace(message.getRecord().getNamespace())
.withName(message.getRecord().getStream());

validateRecord(message, streamDescriptor);

return Optional.of(streamDescriptor);
} else if (message.getType() == Type.STATE) {
if (message.getState().getType() == AirbyteStateType.STREAM) {
return Optional.of(message.getState().getStream().getStreamDescriptor());
} else {
return Optional.of(new StreamDescriptor().withNamespace(NON_STREAM_STATE_IDENTIFIER).withNamespace(NON_STREAM_STATE_IDENTIFIER));
}
} else {
return Optional.empty();
private void propagateFlushWorkerExceptionIfPresent() throws Exception {
if (flushFailure.isFailed()) {
throw flushFailure.getException();
}
}

private void validateRecord(final AirbyteMessage message, final StreamDescriptor streamDescriptor) {
private void validateRecord(final PartialAirbyteMessage message) {
final StreamDescriptor streamDescriptor = new StreamDescriptor()
.withNamespace(message.getRecord().getNamespace())
.withName(message.getRecord().getStream());
// if stream is not part of list of streams to sync to then throw invalid stream exception
if (!streamNames.contains(streamDescriptor)) {
throwUnrecognizedStream(catalog, message);
}
}

private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final AirbyteMessage message) {
private static void throwUnrecognizedStream(final ConfiguredAirbyteCatalog catalog, final PartialAirbyteMessage message) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination_async;

import io.airbyte.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.stream.Stream;
Expand Down Expand Up @@ -35,7 +36,7 @@ public interface DestinationFlushFunction {
* {@link #getOptimalBatchSizeBytes()} size
* @throws Exception
*/
void flush(StreamDescriptor decs, Stream<AirbyteMessage> stream) throws Exception;
void flush(StreamDescriptor decs, Stream<PartialAirbyteMessage> stream) throws Exception;

/**
* When invoking {@link #flush(StreamDescriptor, Stream)}, best effort attempt to invoke flush with
Expand Down

0 comments on commit 4ac62f3

Please sign in to comment.