Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate timer threads in functions #3109

Merged
merged 4 commits into from Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -393,6 +393,9 @@ public String getStatsAsString() throws IOException {

/**
 * Cancels the scheduled task held in {@code scheduledFuture}, if there is one,
 * and clears the reference so that a repeated call is a no-op.
 */
@Override
public void close() {
    // Check for null before touching the future: an unguarded cancel() would
    // throw a NullPointerException when no task was ever scheduled, or when
    // close() is invoked a second time after the field has been cleared.
    if (scheduledFuture != null) {
        // NOTE(review): presumably a java.util.concurrent.Future, in which case
        // 'false' means an in-progress run is not interrupted - confirm the field type.
        scheduledFuture.cancel(false);
        scheduledFuture = null;
    }
}
}
Expand Up @@ -18,17 +18,24 @@
*/
package org.apache.pulsar.functions.instance;

import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class InstanceCache {

private static InstanceCache instance;

public final ScheduledExecutorService executor;
@Getter
private final ScheduledExecutorService scheduledExecutorService;

/**
 * Private constructor: creates the single-threaded scheduled executors
 * stored in {@code executor} and {@code scheduledExecutorService}.
 */
private InstanceCache() {
// Executor created without an explicit thread factory.
executor = Executors.newSingleThreadScheduledExecutor();
// Thread factory constructed with "function-timer-thread"
// (presumably used as the thread-name prefix - confirm against DefaultThreadFactory).
ThreadFactory namedThreadFactory =
new DefaultThreadFactory("function-timer-thread");
// Executor whose single thread is produced by the factory above.
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
}

public static InstanceCache getInstanceCache() {
Expand All @@ -43,7 +50,7 @@ public static InstanceCache getInstanceCache() {
public static void shutdown() {
synchronized (InstanceCache.class) {
if (instance != null) {
instance.executor.shutdown();
instance.scheduledExecutorService.shutdown();
}
instance = null;
}
Expand Down
Expand Up @@ -216,7 +216,7 @@ public void run() {
if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
}
this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.executor);
this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService());

ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand All @@ -49,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -122,8 +124,8 @@ public class JavaInstanceMain implements AutoCloseable {
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
private Long lastHealthCheckTs = null;
private ScheduledExecutorService timer;
private HTTPServer metricsServer;
private ScheduledFuture healthCheckTimer;

/** No-argument constructor; the body is empty, so no state is set up here. */
public JavaInstanceMain() { }

Expand Down Expand Up @@ -215,18 +217,14 @@ public void run() {
metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), collectorRegistry, true);

if (expectedHealthCheckInterval > 0) {
timer = Executors.newSingleThreadScheduledExecutor();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
log.info("Haven't received health check from spawner in a while. Stopping instance...");
close();
}
} catch (Exception e) {
log.error("Error occurred when checking for latest health check", e);
healthCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
try {
if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
log.info("Haven't received health check from spawner in a while. Stopping instance...");
close();
}
} catch (Exception e) {
log.error("Error occurred when checking for latest health check", e);
}
}, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -260,15 +258,17 @@ public void close() {
if (runtimeSpawner != null) {
runtimeSpawner.close();
}
if (timer != null) {
timer.shutdown();
if (healthCheckTimer != null) {
healthCheckTimer.cancel(false);
}
if (containerFactory != null) {
containerFactory.close();
}
if (metricsServer != null) {
metricsServer.stop();
}

InstanceCache.shutdown();
} catch (Exception ex) {
System.err.println(ex);
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -64,7 +66,7 @@ class ProcessRuntime implements Runtime {
private Throwable deathException;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;
private ScheduledExecutorService timer;
private ScheduledFuture timer;
private InstanceConfig instanceConfig;
private final Long expectedHealthCheckInterval;
private final SecretsProviderConfigurator secretsProviderConfigurator;
Expand Down Expand Up @@ -138,19 +140,14 @@ public void start() {
.build();
stub = InstanceControlGrpc.newFutureStub(channel);

timer = Executors.newSingleThreadScheduledExecutor();
timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
CompletableFuture<InstanceCommunication.HealthCheckResult> result = healthCheck();
try {
result.get();
} catch (Exception e) {
log.error("Health check failed for {}-{}",
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId(), e);
}
timer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
CompletableFuture<InstanceCommunication.HealthCheckResult> result = healthCheck();
try {
result.get();
} catch (Exception e) {
log.error("Health check failed for {}-{}",
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId(), e);
}
}, expectedHealthCheckInterval, expectedHealthCheckInterval, TimeUnit.SECONDS);
}
Expand All @@ -164,7 +161,7 @@ public void join() throws Exception {
@Override
public void stop() {
if (timer != null) {
timer.shutdown();
timer.cancel(false);
}
if (process != null) {
process.destroyForcibly();
Expand Down
Expand Up @@ -27,12 +27,15 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
Expand All @@ -50,7 +53,7 @@ public class RuntimeSpawner implements AutoCloseable {

@Getter
private Runtime runtime;
private Timer processLivenessCheckTimer;
private ScheduledFuture processLivenessCheckTimer;
private int numRestarts;
private long instanceLivenessCheckFreqMs;
private Throwable runtimeDeathException;
Expand Down Expand Up @@ -79,27 +82,23 @@ public void start() throws Exception {

// monitor function runtime to make sure it is running. If not, restart the function runtime
if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs > 0) {
processLivenessCheckTimer = new Timer();
processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Runtime runtime = RuntimeSpawner.this.runtime;
if (runtime != null && !runtime.isAlive()) {
log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
details.getNamespace(), details.getName(), runtime.getDeathException());
// Just for the sake of sanity, just destroy the runtime
try {
runtime.stop();
runtimeDeathException = runtime.getDeathException();
runtime.start();
} catch (Exception e) {
log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
details.getNamespace(), details.getName(), e, e);
}
numRestarts++;
processLivenessCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
Runtime runtime = RuntimeSpawner.this.runtime;
if (runtime != null && !runtime.isAlive()) {
log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
details.getNamespace(), details.getName(), runtime.getDeathException());
// Just for the sake of sanity, just destroy the runtime
try {
runtime.stop();
runtimeDeathException = runtime.getDeathException();
runtime.start();
} catch (Exception e) {
log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
details.getNamespace(), details.getName(), e, e);
}
numRestarts++;
}
}, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
}, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -139,7 +138,7 @@ public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
public void close() {
// cancel liveness checker before stopping runtime.
if (processLivenessCheckTimer != null) {
processLivenessCheckTimer.cancel();
processLivenessCheckTimer.cancel(false);
processLivenessCheckTimer = null;
}
if (null != runtime) {
Expand Down