Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-client] Fix send to deadLetterTopic not working when reach maxRedeliverCount #14317

Merged

Conversation

Shawyeok
Copy link
Contributor

Motivation

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970

You can reproduce with code below:

class Scratch {

    private static final Logger LOG = LoggerFactory.getLogger(Scratch.class);

    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(args[0])
                .build();
        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
                .maxRedeliverCount(0)
                .build();
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(args[1])
                .subscriptionName("consumeTest")
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(deadLetterPolicy)
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscribe();
        int i = 1;
        Message<byte[]> message;
        while ((message = consumer.receive()) != null) {
            MessageId messageId = message.getMessageId();
            LOG.info("Receive a message {}: {}", messageId, new String(message.getData()));
            if (i-- > 0) {
                LOG.info("Skip a message {}", messageId);
                continue;
            }
            consumer.acknowledge(messageId);
        }
        new CountDownLatch(1).await();
    }
}

It will log exception below:

Dead letter producer exception with topic: {{topic}}
java.util.concurrent.CompletionException: java.lang.NullPointerException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:659)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:203)
	at org.apache.pulsar.client.impl.schema.AbstractSchema.atSchemaVersion(AbstractSchema.java:81)
	at org.apache.pulsar.client.impl.MessageImpl.getReaderSchema(MessageImpl.java:398)
	at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$38(ConsumerImpl.java:1692)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
	... 6 more
    // MessageImpl#getReaderSchema
    public Optional<Schema<?>> getReaderSchema() {
        ensureSchemaIsLoaded();
        if (schema == null) {
            return Optional.empty();
        }
        if (schema instanceof AutoConsumeSchema) {
            byte[] schemaVersion = getSchemaVersion();
            return Optional.of(((AutoConsumeSchema) schema)
                    .atSchemaVersion(schemaVersion));
        } else if (schema instanceof AbstractSchema) {
            byte[] schemaVersion = getSchemaVersion();    // schemaVersion may be null,  e.g. BYTES schema
            return Optional.of(((AbstractSchema<?>) schema)
                    .atSchemaVersion(schemaVersion));           // if schemaVersion is null, a NPE will throw
        } else {
            return Optional.of(schema);
        }
    }

Modifications

Make AbstractSchema#atSchemaVersion throw NPE only if supportSchemaVersioning is true and schemaVersion is null.

Verifying this change

This change added tests and can be verified as follows:

  • Added a unit test for MessageImpl#getReaderSchema with a message which has BYTES schema

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly (if you have committer privilege).

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

Bugfix only

  • doc

    (If this PR contains doc changes)

@Shawyeok
Copy link
Contributor Author

Could you please take a look about this? @hangc0276 @congbobo184

Copy link
Contributor

@Technoboy- Technoboy- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@315157973 315157973 merged commit 16beb9d into apache:master Feb 22, 2022
@Test
public void testMessageImplGetReaderSchema() {
MessageMetadata builder = new MessageMetadata();
builder.hasSchemaVersion();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Shawyeok - Why was this line necessary? It seems like it's not doing anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, should I open a PR to remove this line?

michaeljmarshall pushed a commit that referenced this pull request Feb 24, 2022
…14317)

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970

(cherry picked from commit 16beb9d)
@michaeljmarshall michaeljmarshall added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Feb 24, 2022
@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.10.0 Feb 25, 2022
codelipenghui pushed a commit that referenced this pull request Feb 25, 2022
…14317)

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970

(cherry picked from commit 16beb9d)
nicoloboschi pushed a commit to nicoloboschi/pulsar that referenced this pull request Mar 1, 2022
…pache#14317)

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in apache#9970
gaoran10 pushed a commit that referenced this pull request Mar 1, 2022
…14317)

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970

(cherry picked from commit 16beb9d)
@gaoran10 gaoran10 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Mar 2, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
…pache#14317)

If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in apache#9970
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-not-needed Your PR changes do not impact docs release/2.8.3 release/2.9.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants