Skip to content

[Enhancement] Support using sql to filter messages #18483

@hzh0425

Description

@hzh0425

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, Pulsar uses the pluggable EntryFilter method for message filtering based on #12269. Users need to implement EntryFilter by themselves, and then filter messages on the broker side.
This way will require many tedious steps, as shown in the documentation https://github.com/apache/pulsar/blob/master/site2/docs/develop-plugin.md.

We hope to introduce a more convenient way to filter messages through sql. Consumers can implement automatic filtering on the broker side through simple subscriptions without implementing EntryFilter.

For example
We have the following message, which has two properties:

producer. newMessage()
                    .value(messageValue)
                    .property("country", "cn")
                    .property("age", "30")
                    .send();

On the consumer side, we want to filter messages through sql:

 ConsumerBuilder<String> builder = client. newConsumer(Schema. STRING)
                .topic(topicName)
                .subscriptionName(randomName(8))
                .subscriptionType(SubscriptionType. Exclusive);

builder.subscriptBySql("country IS NOT NULL AND age IS NOT NULL AND country == 'cn' AND age >= 30 ")

Solution

We need to implement a sql filtering mechanism on the broker side, that is, an EntryFilter that can support sql.
This way, users don't need to implement EntryFilter themselves.

In addition, we need to add an api similar to 'subscriptBySql' on the client side, so that users can subscribe more easily

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions