-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-broker] PIP-100 Support pluggable topic factory #12235
Conversation
e3d3d2e
to
649af8f
Compare
Thanks for your contribution. For this PR, do we need to update docs? (The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) |
ping |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left one comment on the API, can you please take a look ?
Generally I am not a fan of this kind of features because we allow users to interact with the internals of Pulsar and tweak the behaviour using the internal APIs that are not expected to be "maintained", I mean, that we can change from version to version (even point releases).
We are already full of this injection points, so I am not going to block this.
My "request changes" is about my comment to add a "close" method to the API
*/ | ||
public interface TopicFactory { | ||
|
||
<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add a close
method ?
this will allow cleanup operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli addressed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extended Closeable
interface
566d2ad
to
46c5ecf
Compare
@eolivelli addressed the comment. can you PTAL and unblock the PR. |
ping. @eolivelli can you please unblock the PR if your concern is addressed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @rdhabalia
Sorry for late reply
I read again the patch and I found that the new close method is not called, and I also added a couple of important comments in my opinion.
|
||
@Override | ||
public void close() throws IOException { | ||
// No-op |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please verify the number of calls to this method
return (T) newTopic; | ||
} | ||
} catch (Throwable e) { | ||
log.warn("Failed to create persistent topic using factory {}, {}", topic, e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we should fail otherwise the system will be using the default implementation, leading to unpredictable behaviour
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually handling the exception and falling back to default implementation strategy was intentional to avoid unavailability of topic. however, I have updated it to throw exception for now so, we can merge this PR as it's open for almost 6 months now.
@@ -327,6 +328,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws | |||
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker")); | |||
this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); | |||
this.blockedDispatchers = new ConcurrentOpenHashSet<>(); | |||
this.topicFactory = createPersistentTopicFactory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not shutting down (calling 'close') the new instance
try { | ||
return (TopicFactory) Class.forName(topicFactoryClassName).newInstance(); | ||
} catch (Exception e) { | ||
log.warn("Failed to initialize topic factory class {}", topicFactoryClassName, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should fail, otherwise the system will use the default configuration, leading to unexpected behaviour
c905b37
to
7432bfb
Compare
@eolivelli addressed the comments. please unblock when get a chance. |
@eolivelli ping |
ping |
add Closeable fix test close factory and handle exception
Hi @rdhabalia, just wanted to check in on the docs because it didn't attach any doc label. Does it require to add or update any docs to help users better understand it? Thanks. |
Since the content gap still exists, I attached the |
@momo-jun and @rdhabalia Can we please get some documentation and clarification on how this feature can be used for Migration of topics and migration of topics across clusters? |
@chiragbparikh, thanks for asking. |
Motivation
Users select the topic type based on the application use case and requirement of topic behavior. However, in some circumstances, users need some additional behavior on top of the existing implementation and even would like to inject custom workflow in existing topic behavior. Such special circumstances are mostly needed when users would like to do smooth migrations of topics or pulsar clusters without impacting producer and consumer applications. In such scenarios, users can override publish or dispatch behavior of the topic and plug in the additional workflow. For example: perform dual write on multiple topics while migration or, skip messages published from the specific source without explicit publish failures, ignore specific subscription source without generating a client-side error, or without impacting client applications. This feature will be useful for any kind of migration where the pulsar cluster is serving live topics and require custom topic level treatment for flawless server-side migration and without impacting client application especially legacy applications which are hard to change.
Modification
topicFactoryClassName
andTopicFactory
interface.Result