[PIP-136] Sync Pulsar metadata across multiple clouds #16425
Conversation
@rdhabalia Please provide a correct documentation label for your PR.
Force-pushed from 2388202 to 3c9b0b5
@codelipenghui why did we change the PR release to 2.12?
@rdhabalia I think it was a bulk update. I am reviewing the patch.
```java
@NoArgsConstructor
@ToString
public class MetadataEvent {
    private String path;
```
Should we add a 'version' field in order to better support protocol enhancements?
I think we could have added a version field later without any schema-incompatibility issue. However, there is not much harm in adding it now, so I have added it.
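A minimal plain-Java sketch of the event payload with the suggested version field. The real class uses Lombok and carries more fields (cluster name, notification type, etc.); the fields and accessors beyond the quoted snippet are illustrative, not the PR's exact code:

```java
// Illustrative sketch: event POJO with an explicit schema-version field so the
// wire protocol can evolve. The PR's class is public and Lombok-generated
// (@NoArgsConstructor/@ToString); boilerplate is spelled out here so the
// sketch compiles standalone in any file.
class MetadataEvent {
    private String path;
    private int version;              // suggested field for protocol enhancements
    private long lastUpdatedTimestamp;

    MetadataEvent() {}                // what @NoArgsConstructor would generate

    MetadataEvent(String path, int version, long lastUpdatedTimestamp) {
        this.path = path;
        this.version = version;
        this.lastUpdatedTimestamp = lastUpdatedTimestamp;
    }

    String getPath() { return path; }
    int getVersion() { return version; }
    long getLastUpdatedTimestamp() { return lastUpdatedTimestamp; }

    @Override
    public String toString() {
        return "MetadataEvent(path=" + path + ", version=" + version + ")";
    }
}
```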
```java
if (consumer != null) {
    return;
}
ConsumerBuilder<MetadataEvent> consumerBuilder = client.newConsumer(AvroSchema.of(MetadataEvent.class))
```
Nit: please use Schema.AVRO().
Using Avro with a POJO that is not defined with the Avro plugin leads to bad performance because it is going to use reflection.
We could generate the Avro POJO using the Maven plugin. I am 100% sure it is worth it (but not to be done in this patch).
I can change it to Schema.AVRO(), but that eventually calls AvroSchema.of(). Also, it is called only once at server startup, so it should not cause a performance issue.
As I said, it won't make much of a difference, but I have still made the change so we can make progress on this PR.
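The `if (consumer != null) return;` guard in the quoted snippet is a lazy one-time initialization. A hedged plain-Java sketch of a thread-safe variant of that pattern, with a generic supplier standing in for the Pulsar `ConsumerBuilder` chain (the holder class and its names are illustrative, not the PR's code):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Thread-safe lazy one-time initialization: the first caller creates the
// resource; concurrent and later callers reuse the already-created instance.
class LazyConsumerHolder<T> {
    private final AtomicReference<T> consumer = new AtomicReference<>();

    T getOrCreate(Supplier<T> createConsumer) {
        T existing = consumer.get();
        if (existing != null) {
            return existing;            // already initialized, nothing to do
        }
        T created = createConsumer.get();
        if (consumer.compareAndSet(null, created)) {
            return created;             // we won the initialization race
        }
        return consumer.get();          // another thread won; use its instance
    }
}
```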
```diff
@@ -221,7 +221,7 @@ public long millis() {
         }
     }

-    @Test(timeOut = 100000)
+    @Test//(timeOut = 100000)
```
Please revert
```java
MetadataEvent event = new MetadataEvent(path, null, new HashSet<>(),
        expectedVersion.orElse(null), Instant.now().toEpochMilli(),
        getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted);
return getMetadataEventSynchronizer().get().notify(event)
```
So we could write to the topic and then fail the write.
I understand this is a trade-off.
How can we recover?
For example, if I am deleting a topic and the delete fails locally, it may happen that the topic is eventually deleted on the remote cluster anyway.
I wonder if we should use the topic as a commit log, have all the brokers read from the topic, and apply only the changes coming from the log.
Yes, the idea is to use the topic as a WAL, and it does exactly that while also managing API compatibility.
We are building a generic solution for the metadata store, which is also used as the local metadata store. Its APIs expect a response within the same session, cannot depend on message processing by another broker, and must maintain API compatibility. Therefore, the broker processes the request and also publishes a message that will eventually be processed, making the state consistent across all clusters. This guarantees eventual consistency, because every cluster, including the local cluster that publishes the message, also processes that message.
We have gone through the life cycle of this behavior and it should work without any compromise or inconsistency. In fact, this behavior is also documented in the PIP.
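A toy sketch of the commit-log idea under discussion: every cluster, including the one that published, applies changes only from the shared topic, and a last-updated timestamp check makes replay idempotent and convergent. The class, field names, and conflict rule here are assumptions for illustration, not the PR's exact logic:

```java
import java.util.HashMap;
import java.util.Map;

// Each cluster keeps per-path metadata and applies events read from the shared
// event topic. An event is applied only if it is newer than the local state,
// so replaying the log (or a cluster receiving its own event back) is harmless
// and all clusters converge to the same state.
class MetadataApplier {
    static final class Entry {
        final String value;
        final long lastUpdated;
        Entry(String value, long lastUpdated) { this.value = value; this.lastUpdated = lastUpdated; }
    }

    private final Map<String, Entry> store = new HashMap<>();

    /** Returns true if the event changed local state, false if it was stale or a duplicate. */
    boolean apply(String path, String value, long eventTime) {
        Entry current = store.get(path);
        if (current != null && current.lastUpdated >= eventTime) {
            return false;               // stale or duplicate event: ignore
        }
        store.put(path, new Entry(value, eventTime));
        return true;
    }

    String get(String path) {
        Entry e = store.get(path);
        return e == null ? null : e.value;
    }
}
```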
Commit: add unit test, add findbug
Force-pushed from 3c9b0b5 to 4c7c764
@eolivelli thanks for reviewing it. I have addressed all the comments.
+1
Force-pushed from 3c89211 to 350ff16
@rdhabalia, I have moved it back to 2.10.0. There were so many old and in-progress PRs that I tried to move them all to 2.12.0 first and then move them back as they are completed.
* [PIP-136] Sync Pulsar policies across multiple clouds, add unit test, add findbug
* address comments
```diff
@@ -350,7 +353,8 @@ public PulsarService(ServiceConfiguration config,
             this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
         }

-    public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
+    public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
```
This broke the broker's public API.
Motivation
Implementation of PIP-136
Sometimes it is not possible to share a metadata store (global ZooKeeper) between Pulsar clusters deployed on separate cloud provider platforms, yet synchronizing configuration metadata (policies) can be critical for sharing tenant/namespace/topic policies between clusters and administering Pulsar policies uniformly across all of them. Therefore, we need a mechanism to sync configuration metadata between clusters deployed on different cloud platforms.
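PIP-136 wires the synchronizer through broker configuration. A hedged sketch of what enabling it could look like in `broker.conf`; the property names are taken from the PIP and the topic names are placeholders, so both should be checked against the released documentation:

```properties
# Event topic used to sync local metadata changes across clusters
# (leave empty to disable the local metadata synchronizer)
metadataSyncEventTopic=persistent://pulsar/system/local_metadata_sync_event

# Event topic used to sync configuration metadata (policies) across clusters
# (leave empty to disable the configuration metadata synchronizer)
configurationMetadataSyncEventTopic=persistent://pulsar/system/configuration_metadata_sync_event
```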
PIP-136 also has a "Rejected alternatives" section that considers using a system topic along with the metadata synchronizer to replicate and persist local topic policies with it.