Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor functions to use Sink interface #1708

Merged
merged 4 commits into from
May 2, 2018

Conversation

jerrypeng
Copy link
Contributor

No description provided.

@jerrypeng
Copy link
Contributor Author

@sijie @srkukarni please review.

CompletableFuture<Void> write(T value) throws Exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception should be thrown through CompletableFuture<Void>, no?

It is a bit strange to throw exception for a method that returns CompletableFuture

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I will change

void close() throws Exception;
}

private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static

Copy link
Contributor Author

@jerrypeng jerrypeng May 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this cannot be static. I am using the PulsarClient and PulsarSinkConfig from the parent class

}
}

private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static

Copy link
Contributor Author

@jerrypeng jerrypeng May 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this cannot be static. I am using the PulsarClient and PulsarSinkConfig from the parent class


// currently on PulsarRecord
Producer producer = outputProducer.getProducer(pulsarRecord.getTopicName(),
Integer.parseInt(pulsarRecord.getPartitionId()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can change MultiConsumerOneOutputTopicProducers to use Map<String, Producer>, rather than converting string back to int


@Override
public CompletableFuture<Void> write(T value) throws Exception {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new UnsupportedException(..)?

}

@Override
public void close() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to close the producers here, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch will add

@@ -29,7 +32,9 @@
* <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink
* should be implemented using this interface to ensure supporting effective-once.
*/
public interface RuntimeSink<T> extends Sink<T> {
//public interface RuntimeSink<T> extends Sink<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the changes here are not needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup will remove

@merlimat merlimat added this to the 2.1.0-incubating milestone May 2, 2018
@sijie sijie merged commit c3c8013 into apache:master May 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants