Skip to content

Commit

Permalink
Stop using gentle close with heartbeat (#8036)
Browse files Browse the repository at this point in the history
* use gentle close instead of gentle close with heartbeat when closing source

* also lower destination process gentle close duration

* add test connectors
  • Loading branch information
lmossman committed Nov 17, 2021
1 parent 747597a commit 51866fd
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.airbyte.integrations.destination.e2e_test;

import static java.lang.Thread.sleep;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailAfterNDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(FailAfterNDestination.class);

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return new FailAfterNConsumer(config.get("num_messages").asLong(), outputRecordCollector);
}

public static class FailAfterNConsumer implements AirbyteMessageConsumer {

private final Consumer<AirbyteMessage> outputRecordCollector;
private final long numMessagesAfterWhichToFail;
private long numMessagesSoFar;

public FailAfterNConsumer(final long numMessagesAfterWhichToFail, final Consumer<AirbyteMessage> outputRecordCollector) {
this.numMessagesAfterWhichToFail = numMessagesAfterWhichToFail;
this.outputRecordCollector = outputRecordCollector;
this.numMessagesSoFar = 0;
}

@Override
public void start() {}

@Override
public void accept(final AirbyteMessage message) throws Exception {
LOGGER.info("received record: {}", message);
numMessagesSoFar += 1;
LOGGER.info("received {} messages so far", numMessagesSoFar);

if (numMessagesSoFar > numMessagesAfterWhichToFail) {
throw new IllegalStateException("Forcing a fail after processing " + numMessagesAfterWhichToFail + " messages.");
}

if (message.getType() == Type.STATE) {
LOGGER.info("emitting state: {}", message);
outputRecordCollector.accept(message);
}
}

@Override
public void close() {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ public class TestingDestinations extends BaseConnector implements Destination {
public enum TestDestinationType {
LOGGING,
THROTTLED,
SILENT
SILENT,
FAILING
}

public TestingDestinations() {
this(ImmutableMap.<TestDestinationType, Destination>builder()
.put(TestDestinationType.LOGGING, new LoggingDestination())
.put(TestDestinationType.THROTTLED, new ThrottledDestination())
.put(TestDestinationType.SILENT, new SilentDestination())
.put(TestDestinationType.FAILING, new FailAfterNDestination())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@
"type": "integer"
}
}
},
{
"title": "Failing",
"required": ["type", "num_messages"],
"additionalProperties": false,
"properties": {
"type": {
"type": "string",
"const": "FAILING",
"default": "FAILING"
},
"num_messages": {
"description": "Number of messages after which to fail.",
"type": "integer"
}
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.e2e_test;

import static java.lang.Thread.sleep;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -51,13 +53,23 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
final Predicate<Long> anotherRecordPredicate =
config.has("max_records") ? recordNumber -> recordNumber < config.get("max_records").asLong() : recordNumber -> true;

final long sleepTime = config.has("message_interval") ? config.get("message_interval").asLong() : 3000L;

final AtomicLong i = new AtomicLong();

return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {

@Override
protected AirbyteMessage computeNext() {
if (anotherRecordPredicate.test(i.get())) {
if (i.get() != 0) {
try {
LOGGER.info("sleeping for {} ms", sleepTime);
sleep(sleepTime);
} catch (final InterruptedException e) {
throw new RuntimeException();
}
}
i.incrementAndGet();
LOGGER.info("source emitting record {}:", i.get());
return new AirbyteMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
},
{
"title": "Infinite Feed",
"required": ["type", "max_records"],
"required": ["type", "max_records", "message_interval"],
"additionalProperties": false,
"properties": {
"type": {
Expand All @@ -36,6 +36,11 @@
"title": "Max Records",
"description": "Number of records to emit. If not set, defaults to infinity.",
"type": "integer"
},
"message_interval": {
"title": "Message Interval",
"description": "Interval between messages in ms.",
"type": "integer"
}
}
}
Expand Down
25 changes: 6 additions & 19 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void gentleClose(final Process process, final long timeout, final
}

if (process.isAlive()) {
forceShutdown(process, Duration.of(1, ChronoUnit.MINUTES));
closeProcess(process, Duration.of(1, ChronoUnit.MINUTES));
}
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public static void gentleCloseWithHeartbeat(final Process process,
gracefulShutdownDuration,
checkHeartbeatDuration,
forcedShutdownDuration,
WorkerUtils::forceShutdown);
WorkerUtils::closeProcess);
}

@VisibleForTesting
Expand Down Expand Up @@ -134,28 +134,15 @@ static void gentleCloseWithHeartbeat(final Process process,
}
}

@VisibleForTesting
static void forceShutdown(final Process process, final Duration lastChanceDuration) {
LOGGER.warn("Process is taking too long to finish. Killing it");
process.destroy();
try {
process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
LOGGER.error("Exception while while killing the process", e);
}
if (process.isAlive()) {
LOGGER.error("Couldn't kill the process. You might have a zombie process.");
}
}

public static void closeProcess(final Process process, final int duration, final TimeUnit timeUnit) {
public static void closeProcess(final Process process, final Duration lastChanceDuration) {
if (process == null) {
return;
}
try {
process.destroy();
process.waitFor(duration, timeUnit);
process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS);
if (process.isAlive()) {
LOGGER.warn("Process is still alive after calling destroy. Attempting to destroy forcibly...");
process.destroyForcibly();
}
} catch (final InterruptedException e) {
Expand All @@ -172,7 +159,7 @@ public static void wait(final Process process) {
}

public static void cancelProcess(final Process process) {
closeProcess(process, 10, TimeUnit.SECONDS);
closeProcess(process, Duration.of(10, ChronoUnit.SECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void close() throws Exception {
}

LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
final String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,11 +33,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteSource.class);

private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES);
private static final Duration CHECK_HEARTBEAT_DURATION = Duration.of(10, ChronoUnit.SECONDS);
// todo (cgardens) - keep the graceful shutdown consistent with current behavior for release. make
// sure everything is working well before we reduce this to something more reasonable.
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS);
private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);

private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("source")
Expand Down Expand Up @@ -110,12 +107,10 @@ public void close() throws Exception {
}

LOGGER.debug("Closing source process");
WorkerUtils.gentleCloseWithHeartbeat(
WorkerUtils.gentleClose(
sourceProcess,
heartbeatMonitor,
GRACEFUL_SHUTDOWN_DURATION,
CHECK_HEARTBEAT_DURATION,
FORCED_SHUTDOWN_DURATION);
GRACEFUL_SHUTDOWN_DURATION.toMillis(),
TimeUnit.MILLISECONDS);

if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();
Expand Down

0 comments on commit 51866fd

Please sign in to comment.