[ROCKETMQ-121]Support message filtering based on SQL92 #82

Closed
wants to merge 3 commits into from

vsair
Contributor

@vsair vsair commented Mar 22, 2017

Support message filtering based on SQL92


So far, RocketMQ supports message filtering only by TAG, and a message can carry only one tag. This is too limited to meet complex business requirements.

So we want to define and implement a reasonable filter language, based on a subset of the SQL92 expression syntax, to support customized message filtering.

Why subset of SQL92

The purpose of this issue is to give RocketMQ the ability to filter messages; whether that is done with SQL92 or any other language is secondary.

As far as I know, ActiveMQ already implements this functionality based on JavaCC; it's simple and extensible. So I extracted it and integrated it into RocketMQ, keeping only part of the grammar:

  1. numeric comparison, like >, >=, <, <=, BETWEEN
  2. character comparison, like =, <>, IN
  3. check NULL
  4. logical AND, logical OR, logical NOT
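
To make the grammar groups above concrete, here is a small sketch with a few sample expressions and a hand-written Java predicate that has the same meaning as one combined expression. The property names (a, region, note) are made up for illustration, and the predicate is only a stand-in for what a compiled expression would compute; it is not the JavaCC-based parser from this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hand-coded equivalent of one sample filter expression (illustration only). */
class SampleFilterSemantics {
    // One sample per grammar group; the property names are hypothetical.
    static final List<String> SAMPLES = Arrays.asList(
        "a BETWEEN 0 AND 3",               // numeric comparison
        "region IN ('hz', 'sh')",          // character comparison
        "note IS NOT NULL",                // NULL check
        "a >= 1 AND NOT (region = 'bj')"); // logical operators

    /** Same meaning as: a BETWEEN 0 AND 3 AND region IN ('hz', 'sh') AND note IS NOT NULL */
    static boolean matches(Map<String, Object> props) {
        Object a = props.get("a");
        Object region = props.get("region");
        boolean between = a instanceof Number
            && ((Number) a).doubleValue() >= 0
            && ((Number) a).doubleValue() <= 3;
        boolean in = "hz".equals(region) || "sh".equals(region);
        boolean notNull = props.get("note") != null;
        return between && in && notNull;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("a", 2);
        props.put("region", "hz");
        props.put("note", "x");
        System.out.println(matches(props));
    }
}
```

A message whose properties lack a referenced name simply fails the predicate here, which is how a missing property behaves inside an AND chain.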

Design

  • New Module, rocketmq-filter

The implementation of the SQL92 language is placed in this module, which depends on the common module.

The broker compiles or evaluates expressions through the FilterSpi interface. FilterFactory manages all FilterSpi implementations and allows new ones to be registered.
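
A minimal sketch of how such a registry could look. FilterSpi and FilterFactory are the names used in the description, but the method signatures, the CompiledFilter type, and the toy TOY_EQ filter are assumptions made for illustration, not the code in this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical compiled form: a predicate over message properties. */
interface CompiledFilter extends Predicate<Map<String, String>> { }

/** Assumed shape of the SPI; the real interface may differ. */
interface FilterSpi {
    /** Expression type this implementation handles, e.g. "SQL92". */
    String ofType();

    /** Compiles the expression, throwing IllegalArgumentException if it is illegal. */
    CompiledFilter compile(String expression);
}

/** Registry sketch: one FilterSpi per expression type. */
class FilterFactory {
    private final Map<String, FilterSpi> filters = new ConcurrentHashMap<>();

    void register(FilterSpi spi) {
        if (filters.putIfAbsent(spi.ofType(), spi) != null) {
            throw new IllegalArgumentException("Filter type already registered: " + spi.ofType());
        }
    }

    FilterSpi get(String type) {
        return filters.get(type);
    }
}

/** Toy implementation that only understands "key = value"; stands in for a real language. */
class ToyEqualsFilter implements FilterSpi {
    @Override
    public String ofType() {
        return "TOY_EQ";
    }

    @Override
    public CompiledFilter compile(String expression) {
        String[] parts = expression.split("=", 2);
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            throw new IllegalArgumentException("Illegal expression: " + expression);
        }
        String key = parts[0].trim();
        String value = parts[1].trim();
        return props -> value.equals(props.get(key));
    }
}
```

Compiling once at registration means an illegal expression is rejected before any message is evaluated against it.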

  • How to manage consumer's expression data

Unlike tag filtering, a SQL92 expression must be compiled first to check whether it is legal; the compiled expression is then used for evaluation. This procedure is designed to take place at the broker.

ConsumerManager manages the subscriptions of push consumers, and ConsumerFilterManager manages the expression info of push consumers that wish to filter messages with a special language. The info includes the data version, expression, compiled expression, alive time, etc.
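
As a rough picture of the per-consumer info listed above, the sketch below holds those fields in one object. The class name, field names, and the liveness rule are assumptions; only the deadTime >= bornTime comparison and the deadHowLong name appear in the diff excerpts further down this conversation, and their exact semantics are not shown there.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical holder for one consumer group's filter on one topic. */
class FilterDataSketch {
    final String expression;                        // raw expression text
    final Predicate<Map<String, String>> compiled;  // compiled once, at registration
    final long dataVersion;                         // version of the subscription data
    final long bornTime;                            // registration time in ms
    private long deadTime = 0;                      // 0 while the consumer is alive

    FilterDataSketch(String expression, Predicate<Map<String, String>> compiled,
                     long dataVersion, long bornTime) {
        this.expression = expression;
        this.compiled = compiled;
        this.dataVersion = dataVersion;
        this.bornTime = bornTime;
    }

    /** Marks the filter as dead, e.g. when the consumer unregisters. */
    void markDead(long nowMillis) {
        this.deadTime = nowMillis;
    }

    /** Dead once a death time at or after the birth time has been recorded. */
    boolean isDead() {
        return deadTime >= bornTime;
    }

    /** Milliseconds since the filter died, or -1 if it is still alive (assumed meaning). */
    long deadHowLong(long nowMillis) {
        return isDead() ? nowMillis - deadTime : -1;
    }
}
```

Keeping dead entries around for a while, rather than deleting them at once, would let a consumer that reconnects quickly reuse its registration; that motivation is a guess, not something stated in the PR.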

  • How to filter message by expression

I redesigned the getMessage interface of MessageStore by replacing the last parameter, SubscriptionData, with MessageFilter, which is also refactored. The purpose is to decouple the rocketmq-store module from the protocol.

When getting messages, the ExpressionMessageFilter implementation checks whether a message matches, either by BitsArray (described below) or by evaluation, just like the mechanism of tag filtering.
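
The two checks described here can be sketched as a filter with a cheap stage and an exact stage. The interface and method names below are illustrative assumptions, and java.util.BitSet stands in for the PR's BitsArray.

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative two-stage filter contract; names are assumptions, not taken from the PR. */
interface TwoStageFilterSketch {
    /** Cheap check against the pre-calculated bits stored with the consume queue entry. */
    boolean isMatchedByConsumeQueue(BitSet precalculatedBits);

    /** Exact check that needs the decoded message properties. */
    boolean isMatchedByCommitLog(Map<String, String> properties);
}

class ExpressionFilterSketch implements TwoStageFilterSketch {
    private final int[] bitPositions;                       // positions assigned to this consumer
    private final Predicate<Map<String, String>> compiled;  // compiled expression

    ExpressionFilterSketch(int[] bitPositions, Predicate<Map<String, String>> compiled) {
        this.bitPositions = bitPositions.clone();
        this.compiled = compiled;
    }

    @Override
    public boolean isMatchedByConsumeQueue(BitSet bits) {
        if (bits == null) {
            return true; // no pre-calculated result, so the message cannot be ruled out here
        }
        for (int pos : bitPositions) {
            if (!bits.get(pos)) {
                return false; // a cleared bit means the expression did not match at build time
            }
        }
        return true; // possibly matched; the exact check settles collisions
    }

    @Override
    public boolean isMatchedByCommitLog(Map<String, String> properties) {
        return compiled.test(properties);
    }

    public static void main(String[] args) {
        ExpressionFilterSketch filter =
            new ExpressionFilterSketch(new int[] {1, 5}, p -> "hz".equals(p.get("region")));
        BitSet bits = new BitSet();
        bits.set(1);
        bits.set(5);
        System.out.println(filter.isMatchedByConsumeQueue(bits));
        System.out.println(filter.isMatchedByCommitLog(Collections.singletonMap("region", "hz")));
    }
}
```

Because the store only sees this interface, it does not need to know whether the filter is tag-based or expression-based.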

  • Optimization, pre-calculate the filtering result when build consume queue

Filtering at pull time performs poorly:

  1. Off-heap to heap copy, once for every consumer subscribed to the same topic that pulls the message.
  2. Decoding message properties, once for every consumer subscribed to the same topic that pulls the message.

BloomFilter and pre-calculation are adopted to optimize this situation:

  1. Every consumer is assigned some bit positions of the BloomFilter when it registers with the broker.
  2. When the broker builds the consume queue after a message is written into CommitLog, each consumer's filtering result is calculated, and all results are assembled into a BitsArray saved in ConsumeQueueExt.
  3. ConsumeQueueExt is a store file linked to ConsumeQueue. ConsumeQueue finds the data via the tagsCode field, which is replaced by the address generated by ConsumeQueueExt (for compatibility, the address range is Long.MIN_VALUE to Integer.MIN_VALUE).
  4. ExpressionMessageFilter can use the BitsArray to check whether the message is matched. Because of BloomFilter collisions, it still needs to decode the properties and evaluate the expression for messages that appear matched (this could be reduced by checking for collisions, which is not included in this edition).
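
The steps above can be sketched as follows. This is a simplified model, not the PR's BloomFilter or BitsArray: the filter width, the number of positions per consumer, the consumer key format, and the double-hashing scheme are all assumptions, and java.util.BitSet stands in for BitsArray.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class PrecalcBloomSketch {
    static final int TOTAL_BITS = 64; // hypothetical filter width
    static final int HASHES = 2;      // hypothetical number of positions per consumer

    /** Step 1: derive the consumer's bit positions from its key (simple double hashing). */
    static int[] positionsFor(String consumerKey) {
        int h1 = consumerKey.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd step so consecutive positions differ
        int[] positions = new int[HASHES];
        for (int i = 0; i < HASHES; i++) {
            positions[i] = Math.floorMod(h1 + i * h2, TOTAL_BITS);
        }
        return positions;
    }

    /** Step 2: at queue-build time, evaluate every consumer's filter once and record the results. */
    static BitSet precalculate(Map<String, String> properties,
                               Map<String, Predicate<Map<String, String>>> filtersByConsumer) {
        BitSet bits = new BitSet(TOTAL_BITS);
        for (Map.Entry<String, Predicate<Map<String, String>>> e : filtersByConsumer.entrySet()) {
            if (e.getValue().test(properties)) {
                for (int pos : positionsFor(e.getKey())) {
                    bits.set(pos);
                }
            }
        }
        return bits;
    }

    /** Step 4: at pull time, false means "definitely not matched"; true may be a collision. */
    static boolean mayMatch(BitSet bits, String consumerKey) {
        for (int pos : positionsFor(consumerKey)) {
            if (!bits.get(pos)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Map<String, String>>> filters = new HashMap<>();
        filters.put("groupA@TopicTest", p -> "hz".equals(p.get("region")));
        filters.put("groupB@TopicTest", p -> "sh".equals(p.get("region")));

        Map<String, String> props = new HashMap<>();
        props.put("region", "hz");
        BitSet bits = precalculate(props, filters);
        System.out.println(mayMatch(bits, "groupA@TopicTest"));
    }
}
```

A consumer whose filter matched always sees all of its bits set, so there are no false negatives; a consumer whose filter did not match can still see its bits set by other consumers, which is why the exact evaluation is kept as a second step.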

This optimization is suitable for:

  1. High subscription ratio.
  2. Large properties.

Interface

In this edition, only push consumers can filter messages by SQL92 expression.

@coveralls

Coverage Status

Coverage increased (+3.4%) to 35.261% when pulling e258309 on vsair:ROCKETMQ-121 into 72e6def on apache:develop.

@coveralls

Coverage Status

Coverage increased (+3.8%) to 35.617% when pulling a9cea30 on vsair:ROCKETMQ-121 into 72e6def on apache:develop.

@vsair vsair changed the title [Rocketmq 121]Support message filtering based on SQL92 [Rocketmq-121]Support message filtering based on SQL92 Mar 27, 2017
@vsair vsair changed the title [Rocketmq-121]Support message filtering based on SQL92 [ROCKETMQ-121]Support message filtering based on SQL92 Mar 27, 2017
@vongosling
Member

vongosling commented Mar 30, 2017

@shroman @lizhanhui @lollipopjin Could you help us review this great work?

public interface ConsumerIdsChangeListener {
void consumerIdsChanged(final String group, final List<Channel> channels);

Member

Great refactoring

@coveralls

Coverage Status

Coverage increased (+3.7%) to 35.609% when pulling 67fe978 on vsair:ROCKETMQ-121 into 45a64fd on apache:develop.


import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.filter.util.BitsArray;
Member

BitMap? Could we use RoaringBitmap, https://github.com/RoaringBitmap/RoaringBitmap


ret = filterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Calc filter bit map error!commitLogOffset=" + request.getCommitLogOffset() +
Member

Please use {} placeholders instead of + concatenation.

log.warn("Spend {} ms to calc bit map, consumerNum={}, topic={}", eclipseTime, filterDatas.size(), request.getTopic());
}
} catch (Throwable e) {
log.error("Calc bit map error! topic=" + request.getTopic() + ", offset=" + request.getCommitLogOffset()
Member

Same as the previous comment.

return this.deadTime >= this.bornTime;
}

public long deadHowLong() {
Member

What is deadHowLong?

}

@Override
public boolean equals(Object o) {
Member

Could we use the builders in Commons Lang3, such as ToStringBuilder, to implement the equals and hashCode overrides?

Contributor Author

Already replaced by EqualsBuilder, ToStringBuilder and HashCodeBuilder.

* Represents a constant expression
* <p>
* This class was taken from ActiveMQ org.apache.activemq.filter.ConstantExpression,
* but:
Member

Great~

* limitations under the License.
*/

/* Generated By:JavaCC: Do not edit this line. ParseException.java Version 5.0 */
Member

why java Version 5.0

* significant. This allows a positive 32-bit number to be returned for all
* cases.
* <br>Don't change the order of algorithms, add new algorithm to last if you want.
*/
Member

I recommend Commons Codec's hashing algorithms or Guava's hashing.

Contributor Author

OK, I'll check whether the third-party library fits this scenario.

/**
* Simple implement of bloom filter.
*/
public class BloomFilter {
Member

I recommend an existing, mature BloomFilter, such as Guava's or Hadoop's.

Contributor Author

Hi, I replaced HashAlgorithm with Guava's Hashing.murmur3_128. Most of the classes behind Guava's BloomFilter are private and final, so RocketMQ cannot use them directly; for example, the BitArray needs to be saved in the store. So I didn't use it.

Member

Great, murmur3_128

* @param <K>
* @param <V>
*/
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
Member

Can we replace this with Guava's CacheBuilder?
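
For context, only the class declaration of LRUCache is visible in the excerpt above. A LinkedHashMap subclass usually becomes an LRU cache through the pattern sketched below; the class name and constructor here are assumptions, and the body is the common idiom, not necessarily the code under review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Common LinkedHashMap-based LRU idiom (illustrative, not the PR's LRUCache). */
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int maxSize;

    LruCacheSketch(int maxSize) {
        // accessOrder = true keeps entries ordered from least to most recently accessed
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; returning true evicts the least recently used entry
        return size() > maxSize;
    }

    public static void main(String[] args) {
        LruCacheSketch<String, Integer> cache = new LruCacheSketch<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // touch "a" so that "b" becomes the eldest entry
        cache.put("c", 3); // evicts "b"
        System.out.println(cache.keySet());
    }
}
```

This idiom is not thread-safe on its own, which is one reason a reviewer might prefer a cache built with Guava's CacheBuilder and its maximumSize setting.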

@coveralls

Coverage Status

Coverage increased (+3.7%) to 35.602% when pulling 0692568 on vsair:ROCKETMQ-121 into 45a64fd on apache:develop.

@coveralls

Coverage Status

Coverage increased (+3.4%) to 35.262% when pulling 24f6ada on vsair:ROCKETMQ-121 into 45a64fd on apache:develop.

@lizhanhui
Contributor

Will look into this PR today.

@vongosling
Member

vongosling commented Apr 17, 2017

I think only servtool's module has a Guava dependency, so there is no transitive dependency pollution on the SDK, right?

@vongosling
Member

please @lizhanhui @shroman help to review this great PR :-)

@vsair
Contributor Author

vsair commented Apr 17, 2017

@vongosling Yeah, only the server's modules have the Guava dependency; the client doesn't.

@zhouxinyu
Member

@lizhanhui, do you have any other thoughts about this PR? If not, I will merge this PR soon.

And @vsair, please help resolve the conflicting files.

@lizhanhui
Contributor

@zhouxinyu I have checked the major data flow and it looks good to me. I have not scrutinized the changes line by line yet. You may merge it first and we may discuss potential issues hereafter.

1. Add filter module
2. Manage consumer filter expression
3. Support pre calculate filter result when build consume queue.
4. Check whether server support feature of sql when consumer start, maybe it's not the best solution (When network is not stable?).
1. Srvutil module include Guava
2. Replace LRUCache to CacheBuilder, replace HashAlgorithm to Guava Hashing.murmur3_128
@coveralls

Coverage Status

Coverage increased (+3.3%) to 37.892% when pulling 09dca78 on vsair:ROCKETMQ-121 into 42f78c2 on apache:develop.

@vsair
Contributor Author

vsair commented Apr 21, 2017

@zhouxinyu Conflicts have been resolved. Thanks.

@dongeforever
Member

It seems that this PR could be merged now. @vongosling @zhouxinyu @lizhanhui

@Jaskey
Contributor

Jaskey commented Jun 12, 2017

@vsair

I have two things to confirm:

  1. Must old messages, which existed before the consumer subscribed with the filter expression, be filtered by decoding their properties and evaluating the expression?

  2. expectConsumerNumUseFilter = 32 and maxErrorRateOfBloomFilter = 20 can be configured and changed. If I change these values and restart, are the calculated results in ConsumeQueueExt updated accordingly?

@vsair
Contributor Author

vsair commented Jun 12, 2017 via email

@Jaskey
Contributor

Jaskey commented Jun 12, 2017

@vsair

Thanks for your clarification. I am just trying to go through the main logic of the SQL filter.

Regarding this point:

When they are changed, the consumer's subscription will be ignored when
starting, so the results in the bloom filter will also be ignored. That
means the server will do the calculation again and will not update the
results that already exist in the Store.

Where is that code logic? I searched for usages of getExpectConsumerNumUseFilter, but found that it's only used to create a new bloom filter instance at startup. How then does the broker detect that the configuration has changed? Are we talking about the same thing?

What I am concerned about is that if we change any bloom filter configuration, the bit result may differ from what it was before the change, and so differ from the result persisted in the consume queue ext. I wonder whether the broker will still filter the expected messages as it did before.
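
The concern can be illustrated with a small sketch. It assumes, as is standard for Bloom filters, that the two settings determine the filter width and the number of bit positions per consumer; the hashing scheme and the numbers below are hypothetical, and the sketch says nothing about how the broker actually handles a configuration change.

```java
import java.util.BitSet;

class BloomConfigChangeSketch {
    /** Derives bit positions for a consumer hash under a given filter configuration. */
    static int[] positions(int consumerHash, int hashes, int totalBits) {
        int step = (consumerHash >>> 16) | 1; // odd step so consecutive positions differ
        int[] result = new int[hashes];
        for (int i = 0; i < hashes; i++) {
            result[i] = Math.floorMod(consumerHash + i * step, totalBits);
        }
        return result;
    }

    /** True only if every given position is set in the stored bits. */
    static boolean allSet(BitSet stored, int[] positions) {
        for (int pos : positions) {
            if (!stored.get(pos)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int consumerHash = 100; // hypothetical hash of a consumer group and topic

        // Bits persisted while the filter was 64 bits wide, for a message that matched.
        BitSet stored = new BitSet();
        for (int pos : positions(consumerHash, 2, 64)) {
            stored.set(pos);
        }

        // Read back with the old width, then with a new width of 128 bits.
        System.out.println(allSet(stored, positions(consumerHash, 2, 64)));
        System.out.println(allSet(stored, positions(consumerHash, 2, 128)));
    }
}
```

In this model the second check fails even though the message matched when the bits were written, so results persisted under an old configuration could not be trusted after a change and would have to be ignored or recalculated.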

JiaMingLiu93 pushed a commit to JiaMingLiu93/rocketmq that referenced this pull request May 28, 2020
pingww pushed a commit that referenced this pull request Aug 26, 2022
7 participants