Skip to content

Conversation

@mattisonchao
Copy link
Member

@mattisonchao mattisonchao commented Aug 23, 2022

Fixes #17224

Master Issue: #17224

Motivation

From the log, we can see that when we deleted the schema, the schema was re-registered due to the reconnecting of the producer.

2022-08-23T12:12:32,213 - INFO - [pulsar-web-32-16:Producer@642] - Disconnecting producer: Producer{topic=PersistentTopic{topic=persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema}, client=/127.0.0.1:59612, producerName=test-0-0, producerId=0}
2022-08-23T12:12:32,213 - INFO - [pulsar-web-32-16:Producer@642] - Disconnecting producer: Producer{topic=PersistentTopic{topic=persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema}, client=/127.0.0.1:59612, producerName=test-0-1, producerId=1}
2022-08-23T12:12:32,217 - INFO - [pulsar-client-io-37-1:ClientCnx@748] - [localhost/127.0.0.1:59605] Broker notification of Closed producer: 0
2022-08-23T12:12:32,217 - INFO - [pulsar-client-io-37-1:ConnectionHandler@139] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-0] Closed connection [id: 0x9682e2cf, L:/127.0.0.1:59612 - R:localhost/127.0.0.1:59605] -- Will try again in 0.1 s
2022-08-23T12:12:32,218 - INFO - [pulsar-client-io-37-1:ClientCnx@748] - [localhost/127.0.0.1:59605] Broker notification of Closed producer: 1
2022-08-23T12:12:32,218 - INFO - [pulsar-client-io-37-1:ConnectionHandler@139] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-1] Closed connection [id: 0x9682e2cf, L:/127.0.0.1:59612 - R:localhost/127.0.0.1:59605] -- Will try again in 0.1 s
2022-08-23T12:12:32,218 - INFO - [ForkJoinPool.commonPool-worker-1:AbstractTopic@646] - Delete schema storage of id: public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema
2022-08-23T12:12:32,221 - INFO - [ForkJoinPool.commonPool-worker-2:BookkeeperSchemaStorage@388] - deleteSchema : public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema, forcefully : false
2022-08-23T12:12:32,240 - INFO - [metadata-store-12-1:BookkeeperSchemaStorage@417] - deleted schema path : /schemas/public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema
2022-08-23T12:12:32,240 - INFO - [metadata-store-12-1:SchemaRegistryServiceImpl@276] - [public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] Delete schema storage finished
2022-08-23T12:12:32,241 - INFO - [metadata-store-12-1:NamespaceEventsSystemTopicFactory@42] - Create topic policies system topic client persistent://public/test-namespace-sfyppbtzkaqjgdgw/__change_events
2022-08-23T12:12:32,272 - INFO - [pulsar-io-6-3:ProducerImpl@1617] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/__change_events] [null] Creating producer on cnx [id: 0x9ed5f11f, L:/127.0.0.1:59613 - R:localhost/127.0.0.1:59605]
2022-08-23T12:12:32,278 - INFO - [pulsar-io-6-4:SchemaRegistryServiceImpl@167] - [public/test-namespace-sfyppbtzkaqjgdgw/__change_events] 1 schemas is found
2022-08-23T12:12:32,286 - INFO - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1443] - [/127.0.0.1:59613] Created new producer: Producer{topic=SystemTopic{topic=persistent://public/test-namespace-sfyppbtzkaqjgdgw/__change_events}, client=/127.0.0.1:59613, producerName=test-0-2, producerId=0}
2022-08-23T12:12:32,288 - INFO - [pulsar-io-6-3:ProducerImpl@1672] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/__change_events] [test-0-2] Created producer on cnx [id: 0x9ed5f11f, L:/127.0.0.1:59613 - R:localhost/127.0.0.1:59605]
2022-08-23T12:12:32,319 - INFO - [pulsar-timer-43-1:ConnectionHandler@143] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-0] Reconnecting after timeout
2022-08-23T12:12:32,323 - WARN - [pulsar-io-6-3:Crc32cIntChecksum@44] - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
2022-08-23T12:12:32,324 - INFO - [pulsar-timer-43-1:ConnectionHandler@143] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-1] Reconnecting after timeout
2022-08-23T12:12:32,351 - INFO - [pulsar-web-32-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [23/Aug/2022:12:12:32 +0800] “GET /lookup/v2/topic/persistent/public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema HTTP/1.1” 200 217 “-” “Pulsar-Java-v2.11.0-SNAPSHOT” 13
2022-08-23T12:12:32,351 - INFO - [pulsar-web-32-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [23/Aug/2022:12:12:32 +0800] “GET /lookup/v2/topic/persistent/public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema HTTP/1.1" 200 217 “-” “Pulsar-Java-v2.11.0-SNAPSHOT” 23
2022-08-23T12:12:32,351 - INFO - [pulsar-client-io-37-1:ProducerImpl@1617] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-0] Creating producer on cnx [id: 0x9682e2cf, L:/127.0.0.1:59612 - R:localhost/127.0.0.1:59605]
2022-08-23T12:12:32,353 - INFO - [pulsar-client-io-37-1:ProducerImpl@1617] - [persistent://public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] [test-0-1] Creating producer on cnx [id: 0x9682e2cf, L:/127.0.0.1:59612 - R:localhost/127.0.0.1:59605]
2022-08-23T12:12:32,362 - INFO - [metadata-store-12-1:SchemaRegistryServiceImpl@167] - [public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] 0 schemas is found
2022-08-23T12:12:32,363 - INFO - [metadata-store-12-1:SchemaRegistryServiceImpl@167] - [public/test-namespace-sfyppbtzkaqjgdgw/test-delete-topic-and-schema] 0 schemas is found

2022-08-23T12:12:32,365 - INFO - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 8
2022-08-23T12:12:32,366 - INFO - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 9

We can look at the code as follow to know why we can add the schema even topic is fenced.

service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.thenRun(() -> {
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
}
producers.remove(producerId, producerFuture);
return;
}
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
schemaVersionFuture.exceptionally(exception -> {

Modifications

  • Close producer before deleting topic.

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 23, 2022
@mattisonchao mattisonchao self-assigned this Aug 23, 2022
@mattisonchao mattisonchao added this to the 2.12.0 milestone Aug 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: SchemaTest.testDeleteTopicAndSchema

3 participants