-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-broker] Fix: handle topic loading failure due to broken schema ledger #9212
Conversation
b7b579d
to
fbe7fb9
Compare
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
I left one nit about a little typo in a test
Awesome work
producer.close(); | ||
|
||
String key = TopicName.get(fqtnOne).getSchemaName(); | ||
BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) pulsar.getSchemaStorage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: strogate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed it.
@@ -41,6 +41,8 @@ | |||
|
|||
CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId); | |||
|
|||
CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mark this a default method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that will be tricky because deleteSchemaStorage(schemaId)
calls deleteSchemaStorage(schemaId, false)
. and we can't define default behavior of deleteSchemaStorage(schemaId, false)
by calling existing deleteSchemaStorage(schemaId)
because it creates cyclic call.
@@ -32,6 +32,8 @@ | |||
|
|||
CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key); | |||
|
|||
CompletableFuture<SchemaVersion> delete(String key, boolean forcefully); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a default method?
trimDeletedSchemaAndGetList(list); | ||
// clean up the broken schema from zk | ||
deleteSchemaStorage(schemaId, true).handle((sv, th) -> { | ||
log.info("Deletion of {} {}", schemaId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What a user needs to do when he/she reads this message? Especially if it is "Deletion of ... failed", it might be worth adding more details for the "failed" case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, added.
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
…a ledger add more error log fix list assignment
63f6953
to
e4634f2
Compare
…a ledger (apache#9212) add more error log fix list assignment
Motivation
Sometimes schema ledger gets deleted but failed to clean up schema-locator in zk which can make the topic unavailable and inaccessible in the broker. It mainly happens when the broker tries to delete an inactive topic and the user tries to connect the producer again to that topic and can't access the topic due to the below error:
In this case, if the storage ledger doesn't exist then it's a non-recoverable error and the broker should be resilient to clean up the broken schema locator and allow the topic to load again.
client-error
Broker-error
Modification