-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-7043: Emit a notification when completed reading from a capture instance #4939
Conversation
@ramanenka Thanks a lot for the PR. Before metging I'd like to propose one change. @mfvitale Has implemented Notification feature in Debezium 2.3. What if you change the code in the way that instead of a direct call to stored procedure a notification will be emitted? The SP call itself would be implemented as a custom notification specific for your environment. No new config option will be needed and the code will be generalized. |
@jpechane Thanks for the suggestion. It sound reasonable. I'll look into rewriting the patch to use Notification feature in the upcoming weeks. |
@jpechane I've updated the PR with the suggested change to emit a notification instead of calling the SP directly. Could you please take a look? |
@ramanenka LGTM. Can you please fix the compilation issues with |
@@ -37,7 +37,7 @@ public interface ChangeEventSourceFactory<P extends Partition, O extends OffsetC | |||
/** | |||
* Returns a streaming change event source that starts streaming at the given offset. | |||
*/ | |||
StreamingChangeEventSource<P, O> getStreamingChangeEventSource(); | |||
StreamingChangeEventSource<P, O> getStreamingChangeEventSource(NotificationService<P, O> notificationService); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd propose a sligtly different approach. The method signature should not be changed. A new default
method with the new signature will be introduced that will by defaul call the old method. Only connectors requiring notification service will override it (namely SQL Server). In that case the change will be minimally intrusive. WDYT?
The implementation itself is fine from my PoV.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand the idea correctly it's gonna look something like this:
StreamingChangeEventSource<P, O> getStreamingChangeEventSource();
default StreamingChangeEventSource<P, O> getStreamingChangeEventSourceWithNotificationService(NotificationService<P, O> notificationService) {
return getStreamingChangeEventSource();
}
Then inside io.debezium.pipeline.ChangeEventSourceCoordinator#initStreamEvents
the streamingSource
instance would be created this way
streamingSource = changeEventSourceFactory.getStreamingChangeEventSourceWithNotificationService(notificationService);
instead of
streamingSource = changeEventSourceFactory.getStreamingChangeEventSource();
This way changes in connectors other than SQL Server are not needed. The downside I see here is that although SqlServerChangeEventSourceFactory
won't use getStreamingChangeEventSource
method it still has to implement it as it's required by ChangeEventSourceFactory
interface. This is not critical but it's not so nice.
Another way could be to just add notificationService
to the list of SqlServerChangeEventSourceFactory
constructor arguments and then use it to instantiate SqlServerStreamingChangeEventSource
. This way it won't even require changes to ChangeEventSourceFactory
interface:
@@ -32,10 +32,11 @@ public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFacto
private final EventDispatcher<SqlServerPartition, TableId> dispatcher;
private final Clock clock;
private final SqlServerDatabaseSchema schema;
+ private final NotificationService<SqlServerPartition, SqlServerOffsetContext> notificationService;
public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, MainConnectionProvidingConnectionFactory<SqlServerConnection> connectionFactory,
SqlServerConnection metadataConnection, ErrorHandler errorHandler, EventDispatcher<SqlServerPartition, TableId> dispatcher,
- Clock clock, SqlServerDatabaseSchema schema) {
+ Clock clock, SqlServerDatabaseSchema schema, NotificationService<SqlServerPartition, SqlServerOffsetContext> notificationService) {
this.configuration = configuration;
this.connectionFactory = connectionFactory;
this.metadataConnection = metadataConnection;
@@ -43,6 +44,7 @@ public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration,
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
+ this.notificationService = notificationService;
}
@Override
@@ -60,7 +62,8 @@ public StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> ge
dispatcher,
errorHandler,
clock,
- schema);
+ schema,
+ notificationService);
}
Would this be a better way to go? What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way could be to just add notificationService to the list of SqlServerChangeEventSourceFactory constructor arguments and then use it to instantiate SqlServerStreamingChangeEventSource. This way it won't even require changes to ChangeEventSourceFactory interface
That sounds like a perfect solution right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jpechane I've updated the PR with the change agreed above.
…nstance Let the database know when the connector has finished reading from a capture instance by calling a stored procedure.
@ramanenka Applied, thanks! |
Once the connector has finished reading from a capture instance and has committed the offset emit a notification that would let the subscribers know that the capture instance has been read completely.