Skip to content

Commit

Permalink
[improve][io] PIP-297: Support terminating Function & Connector with …
Browse files Browse the repository at this point in the history
…the fatal exception (#21143)

PIP: #21079 

### Motivation

Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown
outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler
cannot handle the fatal exceptions that are thrown outside the function instance thread.

For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If
any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will
not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue
here: #9464

The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
been observed for the Kafka source connector: #9464. We have fixed it by adding
the notifyError method to the `PushSource` class in PIP-281: #20807. However, this
does not solve the same problem that all source connectors face because not all connectors are implemented based on
the `PushSource` class.

The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function
framework. We need to provide a way for the function developer to implement it.

We need a way for the connector and function developers to throw fatal exceptions outside the function instance
thread. The function framework should catch these exceptions and terminate the function accordingly.

### Modifications

 Introduce a new method `fatal` to the context. All the connector implementation code and the function code 
 can use this context and call the `fatal` method to terminate the instance while raising a fatal exception. 
  
 After the connector or function raises the fatal exception, the function instance thread will be interrupted. 
 The function framework then could catch the exception, log it, and then terminate the function instance.
  • Loading branch information
RobertIndie committed Sep 14, 2023
1 parent 39f2d1d commit 1ac19fc
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,10 @@ default ClientBuilder getPulsarClientBuilder() {
throw new UnsupportedOperationException("not implemented");
}

/**
* Terminate the function instance with a fatal exception.
*
* @param t the fatal exception to be raised
*/
void fatal(Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private final Function.FunctionDetails.ComponentType componentType;

private final java.util.function.Consumer<Throwable> fatalHandler;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder)
throws PulsarClientException {
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder,
java.util.function.Consumer<Throwable> fatalHandler) {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader());
this.statsManager = statsManager;
this.fatalHandler = fatalHandler;

this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -534,6 +537,11 @@ public ClientBuilder getPulsarClientBuilder() {
return clientBuilder;
}

@Override
public void fatal(Throwable t) {
fatalHandler.accept(t);
}

private <T> Producer<T> getProducer(String topicName, Schema<T> schema) throws PulsarClientException {
Producer<T> producer;
if (tlPublishProducers != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private JavaInstance javaInstance;
@Getter
private Throwable deathException;
private volatile Throwable deathException;

// function stats
private ComponentStatsManager stats;
Expand Down Expand Up @@ -282,9 +282,14 @@ private synchronized void setup() throws Exception {
ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
Thread currentThread = Thread.currentThread();
Consumer<Throwable> fatalHandler = throwable -> {
this.deathException = throwable;
currentThread.interrupt();
};
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, fatalHandler);
}

public interface AsyncResultConsumer {
Expand Down Expand Up @@ -340,16 +345,35 @@ public void run() {
// process the synchronous results
handleResult(currentRecord, result);
}

if (deathException != null) {
// Ideally the current java instance thread will be interrupted when the deathException is set.
// But if the CompletableFuture returned by the Pulsar Function is completed exceptionally(the
// function has invoked the fatal method) before being put into the JavaInstance
// .pendingAsyncRequests, the interrupted exception may be thrown when putting this future to
// JavaInstance.pendingAsyncRequests. The interrupted exception would be caught by the JavaInstance
// and be skipped.
// Therefore, we need to handle this case by checking the deathException here and rethrow it.
throw deathException;
}
}
} catch (Throwable t) {
log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), t);
deathException = t;
if (deathException != null) {
log.error("[{}] Fatal exception occurred in the instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), deathException);
} else {
log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(),
instanceConfig.getInstanceId()), t);
deathException = t;
}
if (stats != null) {
stats.incrSysExceptions(t);
stats.incrSysExceptions(deathException);
}
} finally {
log.info("Closing instance");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -120,7 +121,7 @@ public void setup() throws PulsarClientException {
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
context.setCurrentMessageContext((Record<String>) () -> null);
}

Expand Down Expand Up @@ -234,7 +235,7 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws PulsarClien
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
context.getPulsarAdmin();
}

Expand All @@ -248,7 +249,7 @@ public void testUnsupportedExtendedSinkContext() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
Expand Down Expand Up @@ -279,7 +280,7 @@ public void testExtendedSinkContext() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand Down Expand Up @@ -311,7 +312,7 @@ public void testGetConsumer() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
Expand All @@ -335,7 +336,7 @@ public void testGetConsumerMultiTopic() throws PulsarClientException {
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder);
pulsarAdmin, clientBuilder, t -> {});
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
Expand Down Expand Up @@ -438,4 +439,23 @@ public Map<String, String> getProperties() {
assertEquals(record.getProperties().get("prop-key"), "prop-value");
assertNull(record.getValue());
}

@Test
public void testFatal() {
Throwable fatalException = new Exception("test-fatal-exception");
AtomicBoolean fatalInvoked = new AtomicBoolean(false);
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(),
pulsarAdmin, clientBuilder, t -> {
assertEquals(t, fatalException);
fatalInvoked.set(true);
});
context.fatal(fatalException);
assertTrue(fatalInvoked.get());
}
}
Loading

0 comments on commit 1ac19fc

Please sign in to comment.