Skip to content

Commit

Permalink
Serialized Message Consumer (#26700)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 31, 2023
1 parent 1218e6f commit 567f839
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.integrations.base;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

public interface Destination extends Integration {

Expand All @@ -29,8 +35,130 @@ AirbyteMessageConsumer getConsumer(JsonNode config,
Consumer<AirbyteMessage> outputRecordCollector)
throws Exception;

/**
* Default implementation allows us to not have to touch existing destinations while avoiding a lot
* of conditional statements in {@link IntegrationRunner}.
*
* @param config config
* @param catalog catalog
* @param outputRecordCollector outputRecordCollector
* @return AirbyteMessageConsumer wrapped in SerializedAirbyteMessageConsumer to maintain legacy
* behavior.
* @throws Exception exception
*/
default SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector));
}

static void defaultOutputRecordCollector(final AirbyteMessage message) {
System.out.println(Jsons.serialize(message));
}

/**
* Backwards-compatibility wrapper for an AirbyteMessageConsumer. Strips the sizeInBytes argument
* away from the .accept call.
*/
@Slf4j
class ShimToSerializedAirbyteMessageConsumer implements SerializedAirbyteMessageConsumer {

private final AirbyteMessageConsumer consumer;

public ShimToSerializedAirbyteMessageConsumer(final AirbyteMessageConsumer consumer) {
this.consumer = consumer;
}

@Override
public void start() throws Exception {
consumer.start();
}

/**
* Consumes an {@link AirbyteMessage} for processing.
* <p>
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
*
* @param inputString JSON representation of an {@link AirbyteMessage}.
* @throws Exception if an invalid state message is provided or the consumer is unable to accept the
* provided message.
*/
@Override
public void accept(final String inputString, final Integer sizeInBytes) throws Exception {
consumeMessage(consumer, inputString);
}

@Override
public void close() throws Exception {
consumer.close();
}

/**
* Consumes an {@link AirbyteMessage} for processing.
* <p>
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
*
* @param consumer An {@link AirbyteMessageConsumer} that can handle the provided message.
* @param inputString JSON representation of an {@link AirbyteMessage}.
* @throws Exception if an invalid state message is provided or the consumer is unable to accept the
* provided message.
*/
@VisibleForTesting
static void consumeMessage(final AirbyteMessageConsumer consumer, final String inputString) throws Exception {

final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
if (isStateMessage(inputString)) {
throw new IllegalStateException("Invalid state message: " + inputString);
} else {
log.error("Received invalid message: " + inputString);
}
}
}

/**
* Tests whether the provided JSON string represents a state message.
*
* @param input a JSON string that represents an {@link AirbyteMessage}.
* @return {@code true} if the message is a state message, {@code false} otherwise.
*/
@SuppressWarnings("OptionalIsPresent")
private static boolean isStateMessage(final String input) {
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class);
if (deserialized.isPresent()) {
return deserialized.get().getType() == Type.STATE;
} else {
return false;
}
}

/**
* Custom class for parsing a JSON message to determine the type of the represented
* {@link AirbyteMessage}. Do the bare minimum deserialisation by reading only the type field.
*/
private static class AirbyteTypeMessage {

@JsonProperty("type")
@JsonPropertyDescription("Message type")
private AirbyteMessage.Type type;

@JsonProperty("type")
public AirbyteMessage.Type getType() {
return type;
}

@JsonProperty("type")
public void setType(final AirbyteMessage.Type type) {
this.type = type;
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.integrations.base;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -22,17 +20,17 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,7 +74,7 @@ public IntegrationRunner(final Source source) {
this.cliParser = cliParser;
this.outputRecordCollector = outputRecordCollector;
// integration iface covers the commands that are the same for both source and destination.
this.integration = source != null ? source : destination;
integration = source != null ? source : destination;
this.source = source;
this.destination = destination;
validator = new JsonSchemaValidator();
Expand Down Expand Up @@ -147,9 +145,20 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
runConsumer(consumer);
}

final Procedure consumeWriteStreamCallable = () -> {
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
consumeWriteStream(consumer);
}
};

watchForOrphanThreads(
consumeWriteStreamCallable,
() -> System.exit(FORCED_EXIT_CODE),
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
Expand Down Expand Up @@ -197,31 +206,51 @@ private void produceMessages(final AutoCloseableIterator<AirbyteMessage> message
}

@VisibleForTesting
static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exception {
// use a Scanner that only processes new line characters to strictly abide with the
// https://jsonlines.org/ standard
final Scanner input = new Scanner(System.in, StandardCharsets.UTF_8).useDelimiter("[\r\n]+");
consumer.start();
while (input.hasNext()) {
consumeMessage(consumer, input.next());
static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer) throws Exception {
try (final BufferedInputStream bis = new BufferedInputStream(System.in);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
consumeWriteStream(consumer, bis, baos);
}
}

private static void runConsumer(final AirbyteMessageConsumer consumer) throws Exception {
watchForOrphanThreads(
() -> consumeWriteStream(consumer),
() -> System.exit(FORCED_EXIT_CODE),
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
@VisibleForTesting
static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
final BufferedInputStream bis,
final ByteArrayOutputStream baos)
throws Exception {
consumer.start();

final byte[] buffer = new byte[8192]; // 8K buffer
int bytesRead;
boolean lastWasNewLine = false;

while ((bytesRead = bis.read(buffer)) != -1) {
for (int i = 0; i < bytesRead; i++) {
final byte b = buffer[i];
if (b == '\n' || b == '\r') {
if (!lastWasNewLine && baos.size() > 0) {
consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size());
baos.reset();
}
lastWasNewLine = true;
} else {
baos.write(b);
lastWasNewLine = false;
}
}
}

// Handle last line if there's one
if (baos.size() > 0) {
consumer.accept(baos.toString(StandardCharsets.UTF_8), baos.size());
}
}

/**
* This method calls a runMethod and make sure that it won't produce orphan non-daemon active
* threads once it is done. Active non-daemon threads blocks JVM from exiting when the main thread
* is done, whereas daemon ones don't.
*
* <p>
* If any active non-daemon threads would be left as orphans, this method will schedule some
* interrupt/exit hooks after giving it some time delay to close up properly. It is generally
* preferred to have a proper closing sequence from children threads instead of interrupting or
Expand All @@ -244,7 +273,7 @@ static void watchForOrphanThreads(final Procedure runMethod,
.stream()
// daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic
.filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon())
.collect(Collectors.toList());
.toList();
if (!runningThreads.isEmpty()) {
LOGGER.warn("""
The main thread is exiting while children non-daemon threads from a connector are still active.
Expand Down Expand Up @@ -275,32 +304,6 @@ static void watchForOrphanThreads(final Procedure runMethod,
}
}

/**
* Consumes an {@link AirbyteMessage} for processing.
*
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
*
* @param consumer An {@link AirbyteMessageConsumer} that can handle the provided message.
* @param inputString JSON representation of an {@link AirbyteMessage}.
* @throws Exception if an invalid state message is provided or the consumer is unable to accept the
* provided message.
*/
@VisibleForTesting
static void consumeMessage(final AirbyteMessageConsumer consumer, final String inputString) throws Exception {

final Optional<AirbyteMessage> messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (messageOptional.isPresent()) {
consumer.accept(messageOptional.get());
} else {
if (isStateMessage(inputString)) {
throw new IllegalStateException("Invalid state message: " + inputString);
} else {
LOGGER.error("Received invalid message: " + inputString);
}
}
}

private static String dumpThread(final Thread thread) {
return String.format("%s (%s)\n Thread stacktrace: %s", thread.getName(), thread.getState(),
Strings.join(List.of(thread.getStackTrace()), "\n at "));
Expand Down Expand Up @@ -336,41 +339,4 @@ static String parseConnectorVersion(final String connectorImage) {
return tokens[tokens.length - 1];
}

/**
* Tests whether the provided JSON string represents a state message.
*
* @param input a JSON string that represents an {@link AirbyteMessage}.
* @return {@code true} if the message is a state message, {@code false} otherwise.
*/
private static boolean isStateMessage(final String input) {
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class);
if (deserialized.isPresent()) {
return deserialized.get().getType() == Type.STATE;
} else {
return false;
}
}

/**
* Custom class that can be used to parse a JSON message to determine the type of the represented
* {@link AirbyteMessage}.
*/
private static class AirbyteTypeMessage {

@JsonProperty("type")
@JsonPropertyDescription("Message type")
private AirbyteMessage.Type type;

@JsonProperty("type")
public AirbyteMessage.Type getType() {
return type;
}

@JsonProperty("type")
public void setType(final AirbyteMessage.Type type) {
this.type = type;
}

}

}

0 comments on commit 567f839

Please sign in to comment.