[pip][design] PIP-281: Add notifyError method on PushSource #20807

Merged: 6 commits, Aug 15, 2023
109 changes: 109 additions & 0 deletions pip/pip-281.md
# Title: [io] Add notifyError method on PushSource

## Motivation

In the function framework, when the [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throws an exception, the function instance is closed. In a k8s environment, the instance is then restarted.

For IO source connectors, we provide the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class, which users can extend to quickly implement the push message model.
It overrides the `read` method and provides the `consume` method for the user to call.

However, a source connector that extends this class cannot notify the function framework when it encounters an exception while consuming data internally.
In other words, the framework's call to `source.read()` never throws an exception, so the process never exits.


## Goals

Add a `notifyError` method to PushSource. This method accepts an exception and puts it in the queue. The next call to `read` that takes this exception from the queue throws it.
```java
// Excerpt; the enclosing class declaration is omitted.

public Record<T> read() throws Exception {
    Record<T> record = queue.take();
    // An ErrorNotifierRecord carries an exception queued by notifyError.
    if (record instanceof ErrorNotifierRecord) {
        throw ((ErrorNotifierRecord) record).getException();
    }
    return record;
}

/**
 * Allows the source to notify errors asynchronously.
 * @param ex the exception to be thrown by a later call to read()
 */
public void notifyError(Exception ex) {
    consume(new ErrorNotifierRecord(ex));
}
```

This mirrors the current implementation of [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java).
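
To see the queue mechanism in isolation, here is a minimal self-contained sketch that does not depend on Pulsar. `MiniPushSource`, `MiniRecord`, `ErrorRecord`, and `NotifyErrorSketch` are hypothetical stand-ins invented for this illustration (loosely modelled on `PushSource`, `Record`, and `ErrorNotifierRecord`), and the queue capacity is an arbitrary assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for Pulsar's Record<T>; not the real interface.
interface MiniRecord<T> {
    T getValue();
}

// Hypothetical marker record that carries an exception instead of a value.
final class ErrorRecord<T> implements MiniRecord<T> {
    private final Exception exception;

    ErrorRecord(Exception exception) {
        this.exception = exception;
    }

    Exception getException() {
        return exception;
    }

    @Override
    public T getValue() {
        return null; // never observed: read() throws before returning this record
    }
}

// Hypothetical simplified push source: values and errors share one queue.
class MiniPushSource<T> {
    // Capacity of 1000 is an assumption made for this sketch.
    private final LinkedBlockingQueue<MiniRecord<T>> queue = new LinkedBlockingQueue<>(1000);

    public void consume(MiniRecord<T> record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Queues the exception so that a later read() rethrows it.
    public void notifyError(Exception ex) {
        consume(new ErrorRecord<>(ex));
    }

    public MiniRecord<T> read() throws Exception {
        MiniRecord<T> record = queue.take();
        if (record instanceof ErrorRecord) {
            throw ((ErrorRecord<?>) record).getException();
        }
        return record;
    }
}

class NotifyErrorSketch {
    // Returns the message of the exception that read() rethrows.
    static String demo() {
        MiniPushSource<String> source = new MiniPushSource<>();
        try {
            source.consume(() -> "first");
            source.notifyError(new RuntimeException("connection lost"));
            source.read(); // returns the "first" record
            source.read(); // takes the error record and throws
            return "no error";
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because errors travel through the same queue as records, anything queued before the error is still delivered first, and the error surfaces only when `read` reaches it.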


BTW: This is a very simple change and is forward compatible. Sorry, I didn't notice beforehand that this change requires a PIP, so the related PR has already been merged:
- https://github.com/apache/pulsar/pull/20791

If this PIP vote does not pass, I will revert that PR.


### Compatibility

This PIP provides a new method for users to call, not a new interface.

- So it is forward compatible.
- However, connectors that use this method are not backward compatible.
For example, if the Kafka source connector is compiled against Pulsar 3.1 dependencies (which include this PIP) and uses the `notifyError` method, switching back to Pulsar 3.0 dependencies (which do not include this PIP) causes compilation errors.

### In Scope

With this method, every current source connector that extends PushSource can trigger a process exit when it hits an error, for example:
- [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java)
- [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43)
- [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59)
- etc.
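
As a rough illustration of how such a connector might use the method, the sketch below runs a worker thread that reports its failure through `notifyError`. It is self-contained and does not use Pulsar: `SketchPushSource`, `FlakyConnector`, and `ConnectorSketch` are hypothetical names, the simulated failure is invented, and the stand-in queues raw values and exceptions rather than records, which is a simplification of the real class.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PushSource<String>, reduced to what this sketch needs.
abstract class SketchPushSource {
    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void consume(String value) {
        queue.add(value);
    }

    public void notifyError(Exception ex) {
        queue.add(ex);
    }

    public String read() throws Exception {
        Object next = queue.take();
        if (next instanceof Exception) {
            throw (Exception) next;
        }
        return (String) next;
    }
}

// Hypothetical connector whose worker thread pulls from an external system.
class FlakyConnector extends SketchPushSource {
    public void open() {
        Thread worker = new Thread(() -> {
            try {
                consume("message-1");
                consume("message-2");
                // Simulated client failure (an assumption for this sketch).
                throw new IllegalStateException("broker unreachable");
            } catch (Exception e) {
                // Hand the failure to the thread that calls read(),
                // instead of leaving it inside the worker thread.
                notifyError(e);
            }
        }, "flaky-connector-worker");
        worker.setDaemon(true);
        worker.start();
    }
}

class ConnectorSketch {
    // Plays the role of the framework loop that keeps calling read().
    static String run() {
        FlakyConnector connector = new FlakyConnector();
        connector.open();
        StringBuilder seen = new StringBuilder();
        try {
            while (true) {
                seen.append(connector.read()).append(';');
            }
        } catch (Exception e) {
            return seen + "error=" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the reading loop ends only because the worker forwarded its exception. If the `catch` block merely logged the error, `read` would block indefinitely once the queue was drained.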

### Out of Scope
None

## Design & Implementation Details

- Extract the BatchPushSource logic into AbstractPushSource.
- Make PushSource extend AbstractPushSource so that it gains the new `notifyError` method.

Please refer to this PR: https://github.com/apache/pulsar/pull/20791
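
The shape of that refactoring can be sketched roughly as follows. This is not the code from the PR: `BasePushSketch`, `PushSketch`, `BatchPushSketch`, and `HierarchySketch` are hypothetical names standing in for `AbstractPushSource`, `PushSource`, and `BatchPushSource`, and the members shown are assumptions chosen to keep the example short.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical shared base: owns the queue, consume, notifyError, and readNext.
abstract class BasePushSketch<T> {
    // Private wrapper so that an error cannot be confused with a value.
    private static final class Failure {
        final Exception cause;

        Failure(Exception cause) {
            this.cause = cause;
        }
    }

    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void consume(T value) {
        queue.add(value);
    }

    public void notifyError(Exception ex) {
        queue.add(new Failure(ex));
    }

    @SuppressWarnings("unchecked")
    protected T readNext() throws Exception {
        Object next = queue.take();
        if (next instanceof Failure) {
            throw ((Failure) next).cause;
        }
        return (T) next;
    }
}

// Stand-in for the push variant: exposes read() on top of the shared logic.
abstract class PushSketch<T> extends BasePushSketch<T> {
    public T read() throws Exception {
        return readNext();
    }
}

// Stand-in for the batch variant: reuses the same logic through readNext().
abstract class BatchPushSketch<T> extends BasePushSketch<T> {
    @Override
    public T readNext() throws Exception {
        return super.readNext();
    }
}

class HierarchySketch {
    // Both variants inherit notifyError from the shared base class.
    static String check() {
        PushSketch<String> push = new PushSketch<String>() { };
        BatchPushSketch<String> batch = new BatchPushSketch<String>() { };
        push.notifyError(new RuntimeException("push failed"));
        batch.notifyError(new RuntimeException("batch failed"));
        String result = "";
        try {
            push.read();
        } catch (Exception e) {
            result += e.getMessage();
        }
        try {
            batch.readNext();
        } catch (Exception e) {
            result += "," + e.getMessage();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(check());
    }
}
```

Keeping the queue handling in one base class means both variants get the error path without duplicating it.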

## Note
None


## Concrete Example

### BEFORE
- Not possible

### AFTER

```java
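// Same package as PushSource.
package org.apache.pulsar.io.core;

import java.util.Map;
import org.testng.annotations.Test;

public class PushSourceTest {

    // Minimal PushSource with no-op lifecycle methods.
    PushSource testBatchSource = new PushSource() {
        @Override
        public void open(Map config, SourceContext context) throws Exception {

        }

        @Override
        public void close() throws Exception {

        }
    };

    // The exception passed to notifyError is rethrown by the next read.
    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
    public void testNotifyErrors() throws Exception {
        testBatchSource.notifyError(new RuntimeException("test exception"));
        testBatchSource.readNext();
    }
}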
```

## Links
None