Skip to content

Commit

Permalink
Replace Timeouts for Sources Take 2 (#3077) (#3093)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Apr 27, 2021
1 parent 721802d commit 864d378
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws
final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput);
targetConfig.setCatalog(mapper.mapCatalog(targetConfig.getCatalog()));

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
destination.start(targetConfig, jobRoot);
source.start(tapConfig, jobRoot);
Expand Down
83 changes: 77 additions & 6 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.workers;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.StandardTargetConfig;
Expand All @@ -32,8 +33,12 @@
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.protocols.airbyte.HeartbeatMonitor;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -52,18 +57,84 @@ public static void gentleClose(final Process process, final long timeout, final
} catch (InterruptedException e) {
LOGGER.error("Exception while while waiting for process to finish", e);
}

if (process.isAlive()) {
LOGGER.warn("Process is taking too long to finish. Killing it");
process.destroy();
forceShutdown(process, Duration.of(1, ChronoUnit.MINUTES));
}
}

/**
 * As long as the heartbeatMonitor detects a heartbeat, the process will be allowed to continue.
 * This method checks the heartbeat once every {@code checkHeartbeatDuration}. Once there is no
 * heartbeat detected (or the process has ended on its own), if the process has ended, then the
 * method returns. If the process is still running it is given a grace period of
 * {@code gracefulShutdownDuration} to finish. Once that expires,
 * {@link #forceShutdown(Process, Duration)} is invoked with {@code forcedShutdownDuration}. If the
 * process cannot be killed, this method will log that this is the case, but then returns.
 *
 * @param process - process to monitor.
 * @param heartbeatMonitor - tracks if the heart is still beating for the given process.
 * @param gracefulShutdownDuration - grace period to give the process to die after its heart stops
 *        beating.
 * @param checkHeartbeatDuration - frequency with which the heartbeat of the process is checked.
 * @param forcedShutdownDuration - amount of time to wait if a process needs to be destroyed
 *        forcibly.
 */
public static void gentleCloseWithHeartbeat(final Process process,
                                            final HeartbeatMonitor heartbeatMonitor,
                                            final Duration gracefulShutdownDuration,
                                            final Duration checkHeartbeatDuration,
                                            final Duration forcedShutdownDuration) {
  // Delegate to the testable overload, wiring in the real force-shutdown implementation.
  gentleCloseWithHeartbeat(
      process,
      heartbeatMonitor,
      gracefulShutdownDuration,
      checkHeartbeatDuration,
      forcedShutdownDuration,
      WorkerUtils::forceShutdown);
}

@VisibleForTesting
static void gentleCloseWithHeartbeat(final Process process,
final HeartbeatMonitor heartbeatMonitor,
final Duration gracefulShutdownDuration,
final Duration checkHeartbeatDuration,
final Duration forcedShutdownDuration,
final BiConsumer<Process, Duration> forceShutdown) {

while (process.isAlive() && heartbeatMonitor.isBeating()) {
try {
process.waitFor(1, TimeUnit.MINUTES);
process.waitFor(checkHeartbeatDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Exception while while killing the process", e);
LOGGER.error("Exception while waiting for process to finish", e);
}
if (process.isAlive()) {
LOGGER.warn("Couldn't kill the process. You might have a zombie ({})", process.info().commandLine());
}

if (process.isAlive()) {
try {
process.waitFor(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Exception during grace period for process to finish", e);
}
}

// if we were unable to exist gracefully, force shutdown...
if (process.isAlive()) {
forceShutdown.accept(process, forcedShutdownDuration);
}
}

/**
 * Calls {@link Process#destroy()} on the process and then waits up to {@code lastChanceDuration}
 * for it to exit. If the process is still alive after that wait, the failure is logged together
 * with the process command line and the method returns.
 *
 * @param process - process to terminate.
 * @param lastChanceDuration - maximum time to wait for the process to exit after it was destroyed.
 */
@VisibleForTesting
static void forceShutdown(Process process, Duration lastChanceDuration) {
  LOGGER.warn("Process is taking too long to finish. Killing it");
  process.destroy();

  final long lastChanceMillis = lastChanceDuration.toMillis();
  try {
    process.waitFor(lastChanceMillis, TimeUnit.MILLISECONDS);
  } catch (InterruptedException interrupted) {
    LOGGER.error("Exception while while killing the process", interrupted);
  }

  // Nothing left to report when the process went away within the allotted time.
  if (!process.isAlive()) {
    return;
  }
  LOGGER.error("Couldn't kill the process. You might have a zombie ({})", process.info().commandLine());
}

public static void closeProcess(Process process) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,42 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.IntegrationLauncher;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultAirbyteSource implements AirbyteSource {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteSource.class);

// Freshness window handed to the HeartbeatMonitor created by the public constructor.
private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES);
// Passed to WorkerUtils.gentleCloseWithHeartbeat in close() as the heartbeat check interval.
private static final Duration CHECK_HEARTBEAT_DURATION = Duration.of(10, ChronoUnit.SECONDS);
// todo (cgardens) - keep the graceful shutdown consistent with current behavior for release. make
// sure everything is working well before we reduce this to something more reasonable.
// Passed to WorkerUtils.gentleCloseWithHeartbeat in close() as the graceful shutdown period.
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS);
// Passed to WorkerUtils.gentleCloseWithHeartbeat in close() as the forced shutdown wait.
private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final HeartbeatMonitor heartbeatMonitor;

private Process tapProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;

/**
 * Creates a source that uses a {@link DefaultAirbyteStreamFactory} and a {@link HeartbeatMonitor}
 * configured with {@code HEARTBEAT_FRESH_DURATION}.
 *
 * @param integrationLauncher - launcher stored for use by this source.
 */
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
  this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
}

/**
 * Constructor that lets tests supply the stream factory and heartbeat monitor.
 *
 * @param integrationLauncher - launcher stored for use by this source.
 * @param streamFactory - factory used to turn the process output into a message stream.
 * @param heartbeatMonitor - monitor that is beaten for every message read from the process.
 */
@VisibleForTesting
DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
                     final AirbyteStreamFactory streamFactory,
                     final HeartbeatMonitor heartbeatMonitor) {
  this.integrationLauncher = integrationLauncher;
  this.streamFactory = streamFactory;
  this.heartbeatMonitor = heartbeatMonitor;
}

@Override
Expand All @@ -82,6 +93,7 @@ public void start(StandardTapConfig input, Path jobRoot) throws Exception {
LineGobbler.gobble(tapProcess.getErrorStream(), LOGGER::error);

messageIterator = streamFactory.create(IOs.newBufferedReader(tapProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
.filter(message -> message.getType() == Type.RECORD || message.getType() == Type.STATE)
.iterator();
}
Expand All @@ -107,7 +119,13 @@ public void close() throws Exception {
}

LOGGER.debug("Closing tap process");
WorkerUtils.gentleClose(tapProcess, 10, TimeUnit.HOURS);
WorkerUtils.gentleCloseWithHeartbeat(
tapProcess,
heartbeatMonitor,
GRACEFUL_SHUTDOWN_DURATION,
CHECK_HEARTBEAT_DURATION,
FORCED_SHUTDOWN_DURATION);

if (tapProcess.isAlive() || tapProcess.exitValue() != 0) {
throw new WorkerException("Tap process wasn't successful");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.workers.protocols.airbyte;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Records the instant of the most recent beat and reports whether that beat is still fresh, i.e.
 * whether it happened less than {@code heartBeatFreshDuration} before the current time. The current
 * time comes from a supplier so that it can be controlled in tests, and the last beat is held in an
 * {@link AtomicReference}.
 */
public class HeartbeatMonitor {

  private final Duration heartBeatFreshDuration;
  private final Supplier<Instant> nowSupplier;
  private final AtomicReference<Instant> lastBeat;

  /**
   * Creates a monitor that reads the current time from {@link Instant#now()}.
   *
   * @param heartBeatFreshDuration - how long a beat counts as fresh.
   */
  public HeartbeatMonitor(Duration heartBeatFreshDuration) {
    this(heartBeatFreshDuration, Instant::now);
  }

  /**
   * Creates a monitor with an explicit time source.
   *
   * @param heartBeatFreshDuration - how long a beat counts as fresh.
   * @param nowSupplier - supplies the current time whenever a beat is recorded or checked.
   */
  @VisibleForTesting
  public HeartbeatMonitor(Duration heartBeatFreshDuration, Supplier<Instant> nowSupplier) {
    this.heartBeatFreshDuration = heartBeatFreshDuration;
    this.nowSupplier = nowSupplier;
    // No beat has been recorded yet, so the reference starts out empty.
    this.lastBeat = new AtomicReference<>();
  }

  /**
   * Records a beat at the current time.
   */
  public void beat() {
    final Instant beatTime = nowSupplier.get();
    lastBeat.set(beatTime);
  }

  /**
   * @return true if a beat has been recorded and it is still fresh at the current time; false if no
   *         beat was ever recorded or the last one is too old.
   */
  public boolean isBeating() {
    final Instant mostRecentBeat = lastBeat.get();
    final Instant currentTime = nowSupplier.get();
    if (mostRecentBeat == null) {
      return false;
    }
    final Instant staleAt = mostRecentBeat.plus(heartBeatFreshDuration);
    return currentTime.isBefore(staleAt);
  }

}
131 changes: 131 additions & 0 deletions airbyte-workers/src/test/java/io/airbyte/workers/WorkerUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.workers;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import io.airbyte.workers.protocols.airbyte.HeartbeatMonitor;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link WorkerUtils}.
 */
class WorkerUtilsTest {

  /**
   * Exercises the package-private {@code WorkerUtils.gentleCloseWithHeartbeat} overload with a
   * mocked process, heartbeat monitor and force-shutdown callback.
   */
  @Nested
  class GentleCloseWithHeartbeat {

    private final Duration CHECK_HEARTBEAT_DURATION = Duration.of(10, ChronoUnit.MILLIS);

    private final Duration SHUTDOWN_TIME_DURATION = Duration.of(100, ChronoUnit.MILLIS);

    private Process process;
    private HeartbeatMonitor heartbeatMonitor;
    private BiConsumer<Process, Duration> forceShutdown;

    @SuppressWarnings("unchecked")
    @BeforeEach
    void setup() {
      process = mock(Process.class);
      heartbeatMonitor = mock(HeartbeatMonitor.class);
      forceShutdown = mock(BiConsumer.class);
    }

    // Runs the method under test; the same duration is used for the graceful and forced phases.
    private void runShutdown() {
      WorkerUtils.gentleCloseWithHeartbeat(
          process,
          heartbeatMonitor,
          SHUTDOWN_TIME_DURATION,
          CHECK_HEARTBEAT_DURATION,
          SHUTDOWN_TIME_DURATION,
          forceShutdown);
    }

    @SuppressWarnings("BusyWait")
    @DisplayName("Verify that shutdown waits indefinitely when heartbeat and process are healthy.")
    @Test
    void testStartsWait() throws InterruptedException {
      when(process.isAlive()).thenReturn(true);
      final AtomicInteger recordedBeats = new AtomicInteger(0);
      // Lets the test stop the heartbeat so the worker thread can end on its own instead of being
      // killed with Thread.stop(), which is deprecated and unsupported on recent JDKs.
      final AtomicBoolean heartIsBeating = new AtomicBoolean(true);
      doAnswer((ignored) -> {
        recordedBeats.incrementAndGet();
        return heartIsBeating.get();
      }).when(heartbeatMonitor).isBeating();

      final Thread thread = new Thread(this::runShutdown);
      // A daemon thread cannot keep the JVM alive should the test fail part-way through.
      thread.setDaemon(true);
      thread.start();

      // block until the loop is running (or the worker died unexpectedly).
      while (recordedBeats.get() < 3 && thread.isAlive()) {
        Thread.sleep(10);
      }

      // The heart is beating and the process is alive, so shutdown must still be waiting.
      assertTrue(thread.isAlive());
      verifyNoInteractions(forceShutdown);

      // Stop the heartbeat: the loop ends, the grace period passes and shutdown is forced.
      heartIsBeating.set(false);
      thread.join(5000);

      assertFalse(thread.isAlive());
      verify(forceShutdown).accept(process, SHUTDOWN_TIME_DURATION);
    }

    @Test
    @DisplayName("Test heartbeat ends and graceful shutdown.")
    void testGracefulShutdown() {
      when(heartbeatMonitor.isBeating()).thenReturn(false);
      when(process.isAlive()).thenReturn(false);

      runShutdown();

      verifyNoInteractions(forceShutdown);
    }

    @Test
    @DisplayName("Test heartbeat ends and shutdown is forced.")
    void testForcedShutdown() {
      when(heartbeatMonitor.isBeating()).thenReturn(false);
      when(process.isAlive()).thenReturn(true);

      runShutdown();

      verify(forceShutdown).accept(process, SHUTDOWN_TIME_DURATION);
    }

    @Test
    @DisplayName("Test process dies.")
    void testProcessDies() {
      when(heartbeatMonitor.isBeating()).thenReturn(true);
      when(process.isAlive()).thenReturn(false);
      runShutdown();

      verifyNoInteractions(forceShutdown);
    }

  }

}
Loading

0 comments on commit 864d378

Please sign in to comment.