Skip to content

Commit

Permalink
CDK destinations: Future based output reader for T+D test (#34727)
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed Feb 1, 2024
1 parent b8fb1ee commit ab6ea6d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest |
| 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. |
| 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. |
| 0.16.1 | 2024-01-29 | [\#34533](https://github.com/airbytehq/airbyte/pull/34533) | Add a safe method to execute DatabaseMetadata's Resultset returning queries. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.16.3
version=0.16.4
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -598,18 +599,21 @@ public void identicalNameSimultaneousSync() throws Exception {
// Start two concurrent syncs
final AirbyteDestination sync1 = startSync(catalog1);
final AirbyteDestination sync2 = startSync(catalog2);
CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> outFuture1 = destinationOutputFuture(sync1);
CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> outFuture2 = destinationOutputFuture(sync2);

// Write some messages to both syncs. Write a lot of data to sync 2 to try and force a flush.
pushMessages(messages1, sync1);
for (int i = 0; i < 100_000; i++) {
pushMessages(messages2, sync2);
}
endSync(sync1);
endSync(sync1, outFuture1);
// Write some more messages to the second sync. It should not be affected by the first sync's
// shutdown.
for (int i = 0; i < 100_000; i++) {
pushMessages(messages2, sync2);
}
endSync(sync2);
endSync(sync2, outFuture2);

// For simplicity, don't verify the raw table. Assume that if the final table is correct, then
// the raw data is correct. This is generally a safe assumption.
Expand Down Expand Up @@ -767,8 +771,26 @@ protected void runSync(final ConfiguredAirbyteCatalog catalog,
final Function<JsonNode, JsonNode> configTransformer)
throws Exception {
final AirbyteDestination destination = startSync(catalog, imageName, configTransformer);
final CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> outputFuture = destinationOutputFuture(destination);
pushMessages(messages, destination);
endSync(destination);
endSync(destination, outputFuture);
}

// In the background, read messages from the destination until it terminates. We need to clear
// stdout in real time, to prevent the buffer from filling up and blocking the destination.
private CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> destinationOutputFuture(final AirbyteDestination destination) {
final CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> outputFuture = new CompletableFuture<>();
Executors.newSingleThreadExecutor().submit((Callable<Void>) () -> {
final List<io.airbyte.protocol.models.AirbyteMessage> destinationMessages = new ArrayList<>();
while (!destination.isFinished()) {
// attemptRead isn't threadsafe, we read stdout fully here.
// i.e. we shouldn't call attemptRead anywhere else.
destination.attemptRead().ifPresent(destinationMessages::add);
}
outputFuture.complete(destinationMessages);
return null;
});
return outputFuture;
}

protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog) throws Exception {
Expand Down Expand Up @@ -825,26 +847,6 @@ protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog,

destination.start(destinationConfig, jobRoot, Collections.emptyMap());

// In the background, read messages from the destination until it terminates. We need to clear
// stdout in real time, to prevent the buffer from filling up and blocking the destination.
// TODO Eventually we'll want to somehow extract the state messages while a sync is running, to
// verify checkpointing.
final ExecutorService messageHandler = Executors.newSingleThreadExecutor(
// run as a daemon thread just in case we run into an exception or something
r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});
messageHandler.submit(() -> {
while (!destination.isFinished()) {
// attemptRead isn't threadsafe, we read stdout fully here.
// i.e. we shouldn't call attemptRead anywhere else.
destination.attemptRead();
}
});
messageHandler.shutdown();

return destination;
}

Expand All @@ -853,12 +855,13 @@ protected static void pushMessages(final List<AirbyteMessage> messages, final Ai
message -> Exceptions.toRuntime(() -> destination.accept(convertProtocolObject(message, io.airbyte.protocol.models.AirbyteMessage.class))));
}

protected static void endSync(final AirbyteDestination destination) throws Exception {
protected void endSync(final AirbyteDestination destination,
final CompletableFuture<List<io.airbyte.protocol.models.AirbyteMessage>> destinationOutputFuture)
throws Exception {
destination.notifyEndOfInput();
// Wait until process is finished cleanly.
while (!destination.isFinished()) {
Thread.sleep(1000);
}
// TODO Eventually we'll want to somehow extract the state messages while a sync is running, to
// verify checkpointing.
destinationOutputFuture.join();
destination.close();
}

Expand Down

0 comments on commit ab6ea6d

Please sign in to comment.