From 51866fd37a3b702b406612898697ba0f977abfb9 Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Wed, 17 Nov 2021 15:06:14 -0800 Subject: [PATCH] Stop using gentle close with heartbeat (#8036) * use gentle close instead of gentle close with heartbeat when closing source * also lower destination process gentle close duration * add test connectors --- .../e2e_test/FailAfterNDestination.java | 69 +++++++++++++++++++ .../e2e_test/TestingDestinations.java | 4 +- .../src/main/resources/spec.json | 16 +++++ .../source/e2e_test/InfiniteFeedSource.java | 12 ++++ .../src/main/resources/spec.json | 7 +- .../java/io/airbyte/workers/WorkerUtils.java | 25 ++----- .../airbyte/DefaultAirbyteDestination.java | 2 +- .../airbyte/DefaultAirbyteSource.java | 15 ++-- 8 files changed, 118 insertions(+), 32 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java new file mode 100644 index 00000000000000..e45fcdaf06f31f --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java @@ -0,0 +1,69 @@ +package io.airbyte.integrations.destination.e2e_test; + +import static java.lang.Thread.sleep; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailAfterNDestination extends BaseConnector implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(FailAfterNDestination.class); + + @Override + public AirbyteConnectionStatus check(final JsonNode config) { + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } + + @Override + public AirbyteMessageConsumer getConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + return new FailAfterNConsumer(config.get("num_messages").asLong(), outputRecordCollector); + } + + public static class FailAfterNConsumer implements AirbyteMessageConsumer { + + private final Consumer outputRecordCollector; + private final long numMessagesAfterWhichToFail; + private long numMessagesSoFar; + + public FailAfterNConsumer(final long numMessagesAfterWhichToFail, final Consumer outputRecordCollector) { + this.numMessagesAfterWhichToFail = numMessagesAfterWhichToFail; + this.outputRecordCollector = outputRecordCollector; + this.numMessagesSoFar = 0; + } + + @Override + public void start() {} + + @Override + public void accept(final AirbyteMessage message) throws Exception { + LOGGER.info("received record: {}", message); + numMessagesSoFar += 1; + LOGGER.info("received {} messages so far", numMessagesSoFar); + + if (numMessagesSoFar > numMessagesAfterWhichToFail) { + throw new IllegalStateException("Forcing a fail after processing " + numMessagesAfterWhichToFail + " messages."); + } + + if (message.getType() == Type.STATE) { + LOGGER.info("emitting state: {}", message); + outputRecordCollector.accept(message); + } + } + + @Override + public void close() {} + + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java index 51974f4751dcdc..c6bf3ee08dbedc 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java @@ -27,7 +27,8 @@ public class TestingDestinations extends BaseConnector implements Destination { public enum TestDestinationType { LOGGING, THROTTLED, - SILENT + SILENT, + FAILING } public TestingDestinations() { @@ -35,6 +36,7 @@ public TestingDestinations() { .put(TestDestinationType.LOGGING, new LoggingDestination()) .put(TestDestinationType.THROTTLED, new ThrottledDestination()) .put(TestDestinationType.SILENT, new SilentDestination()) + .put(TestDestinationType.FAILING, new FailAfterNDestination()) .build()); } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json index dbcda43bc9b787..39df3bbafb295b 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json @@ -48,6 +48,22 @@ "type": "integer" } } + }, + { + "title": "Failing", + "required": ["type", "num_messages"], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "const": "FAILING", + "default": "FAILING" + }, + "num_messages": { + "description": "Number of messages after which to fail.", + "type": "integer" + } + } } ] } diff --git a/airbyte-integrations/connectors/source-e2e-test/src/main/java/io/airbyte/integrations/source/e2e_test/InfiniteFeedSource.java b/airbyte-integrations/connectors/source-e2e-test/src/main/java/io/airbyte/integrations/source/e2e_test/InfiniteFeedSource.java index 717f3275bc0f63..f93f91c8fc522f 100644 --- a/airbyte-integrations/connectors/source-e2e-test/src/main/java/io/airbyte/integrations/source/e2e_test/InfiniteFeedSource.java +++ b/airbyte-integrations/connectors/source-e2e-test/src/main/java/io/airbyte/integrations/source/e2e_test/InfiniteFeedSource.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.e2e_test; +import static java.lang.Thread.sleep; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableMap; @@ -51,6 +53,8 @@ public AutoCloseableIterator read(final JsonNode config, final C final Predicate anotherRecordPredicate = config.has("max_records") ? recordNumber -> recordNumber < config.get("max_records").asLong() : recordNumber -> true; + final long sleepTime = config.has("message_interval") ? config.get("message_interval").asLong() : 3000L; + final AtomicLong i = new AtomicLong(); return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { @@ -58,6 +62,14 @@ public AutoCloseableIterator read(final JsonNode config, final C @Override protected AirbyteMessage computeNext() { if (anotherRecordPredicate.test(i.get())) { + if (i.get() != 0) { + try { + LOGGER.info("sleeping for {} ms", sleepTime); + sleep(sleepTime); + } catch (final InterruptedException e) { + throw new RuntimeException(); + } + } i.incrementAndGet(); LOGGER.info("source emitting record {}:", i.get()); return new AirbyteMessage() diff --git a/airbyte-integrations/connectors/source-e2e-test/src/main/resources/spec.json b/airbyte-integrations/connectors/source-e2e-test/src/main/resources/spec.json index 1a42ba0d1cbc3e..88729274d7aa9f 100644 --- a/airbyte-integrations/connectors/source-e2e-test/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-e2e-test/src/main/resources/spec.json @@ -24,7 +24,7 @@ }, { "title": "Infinite Feed", - "required": ["type", "max_records"], + "required": ["type", "max_records", "message_interval"], "additionalProperties": false, "properties": { "type": { @@ -36,6 +36,11 @@ "title": "Max Records", "description": "Number of records to emit. If not set, defaults to infinity.", "type": "integer" + }, + "message_interval": { + "title": "Message Interval", + "description": "Interval between messages in ms.", + "type": "integer" } } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index b9712be66b4e84..b41b70e9448b30 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -59,7 +59,7 @@ public static void gentleClose(final Process process, final long timeout, final } if (process.isAlive()) { - forceShutdown(process, Duration.of(1, ChronoUnit.MINUTES)); + closeProcess(process, Duration.of(1, ChronoUnit.MINUTES)); } } @@ -90,7 +90,7 @@ public static void gentleCloseWithHeartbeat(final Process process, gracefulShutdownDuration, checkHeartbeatDuration, forcedShutdownDuration, - WorkerUtils::forceShutdown); + WorkerUtils::closeProcess); } @VisibleForTesting @@ -134,28 +134,15 @@ static void gentleCloseWithHeartbeat(final Process process, } } - @VisibleForTesting - static void forceShutdown(final Process process, final Duration lastChanceDuration) { - LOGGER.warn("Process is taking too long to finish. Killing it"); - process.destroy(); - try { - process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS); - } catch (final InterruptedException e) { - LOGGER.error("Exception while while killing the process", e); - } - if (process.isAlive()) { - LOGGER.error("Couldn't kill the process. You might have a zombie process."); - } - } - - public static void closeProcess(final Process process, final int duration, final TimeUnit timeUnit) { + public static void closeProcess(final Process process, final Duration lastChanceDuration) { if (process == null) { return; } try { process.destroy(); - process.waitFor(duration, timeUnit); + process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS); if (process.isAlive()) { + LOGGER.warn("Process is still alive after calling destroy. Attempting to destroy forcibly..."); process.destroyForcibly(); } } catch (final InterruptedException e) { @@ -172,7 +159,7 @@ public static void wait(final Process process) { } public static void cancelProcess(final Process process) { - closeProcess(process, 10, TimeUnit.SECONDS); + closeProcess(process, Duration.of(10, ChronoUnit.SECONDS)); } /** diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index f05bdb288b5057..306b52c186c5f2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -107,7 +107,7 @@ public void close() throws Exception { } LOGGER.debug("Closing destination process"); - WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS); + WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES); if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) { final String message = destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index d24c567f1528ac..910b02a99c2b21 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -24,6 +24,7 @@ import java.time.temporal.ChronoUnit; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +33,7 @@ public class DefaultAirbyteSource implements AirbyteSource { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteSource.class); private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES); - private static final Duration CHECK_HEARTBEAT_DURATION = Duration.of(10, ChronoUnit.SECONDS); - // todo (cgardens) - keep the graceful shutdown consistent with current behavior for release. make - // sure everything is working well before we reduce this to something more reasonable. - private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS); - private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES); + private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES); private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder() .setLogPrefix("source") @@ -110,12 +107,10 @@ public void close() throws Exception { } LOGGER.debug("Closing source process"); - WorkerUtils.gentleCloseWithHeartbeat( + WorkerUtils.gentleClose( sourceProcess, - heartbeatMonitor, - GRACEFUL_SHUTDOWN_DURATION, - CHECK_HEARTBEAT_DURATION, - FORCED_SHUTDOWN_DURATION); + GRACEFUL_SHUTDOWN_DURATION.toMillis(), + TimeUnit.MILLISECONDS); if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) { final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();