
PIP-191: Support batched message using entry filter #16680

Open
AnonHxy opened this issue Jul 19, 2022 · 22 comments

Comments

@AnonHxy
Contributor

AnonHxy commented Jul 19, 2022

discuss mail-thread: https://lists.apache.org/thread/cdw5c2lpj5nwzl2zqyv8mphsqqv9vozj

Motivation

  • This PIP introduces a way to support batched messages in entry filters without having to deserialize the entry, by restricting the messages in one batch to have the same properties.

  • We already have a plug-in way to filter entries in the broker, aka PIP-105 (PIP 105: Support pluggable entry filter in Dispatcher #12269). But this approach has some drawbacks:

    • It doesn't support batched messages naturally, because the entry filter only knows the main header of the entry and doesn't dig into the payload to deserialize each single message's metadata.
    • If the developer of the entry filter wants to filter batched messages, they have to deserialize the payload to get each message's properties, which brings higher memory and CPU workload.
  • Let's explain the current entry filters in detail. Skip to the "Solution" part directly if you are already clear about the drawbacks above.

    • Today, when an entry filter receives an entry, it gets an Entry that has:

       public interface Entry {
           byte[] getData();
           byte[] getDataAndRelease();
           int getLength();
           ByteBuf getDataBuffer();
           Position getPosition();
           long getLedgerId();
           long getEntryId();
           boolean release();
       }
      

      The Entry interface doesn't let you know if this is a Batched Entry.
      You also get FilterContext:

      @Data
      public class FilterContext {
          private Subscription subscription;
          private MessageMetadata msgMetadata;
          private Consumer consumer;
      }

      and in MessageMetadata, you have

       // differentiate single and batch message metadata
        optional int32 num_messages_in_batch = 11 [default = 1];
      

      Which enables you to know that this entry is batched.

    • The developer can determine what class would deserialize the entry byte array into a list of separate messages.
      So currently, given the entry is batched, the filter developer can act on it only by paying the cost of deserializing it.

  • Solutions
    How can we use an entry filter with batched messages, without having to deserialize the entry?

    • One of the rejected alternatives
      One alternative is to alter the producers to extract specific properties from each message and place those property values in the message metadata of the Batched Entry. The filter can then use the values to decide whether to reject/accept.

      The problem is that if you have different values for a given property for each message in the batch, then the filter author can't provide a reject or accept for this entry, since some messages are rejected and some are accepted.

    • Solution
      So the only solution is to change the way messages are batched and collect the records into a batch only if they have the same values for the properties configured to be extracted. If a message is added to the producer and the properties are not the same as the batched records, it will trigger a send of this batch and start a new batch with that message.

In summary, this proposal introduces another trigger condition to send the batch, on top of the current max count, max size, and max delay: once a message is requested to be added to a batch and its properties (the partial properties as defined in a new configuration) values are different from the records in the batch (i.e. the 1st record's properties values), it will trigger the batch flush (i.e. send and clear).

API Changes

  • Because we know which property keys/values will be used in our entry filter, we only need to pick the properties to which this proposal should apply. Add a producer config to specify those property keys/values. Only messages whose properties have the same keys/values as in the config will have this proposal applied.

    org.apache.pulsar.client.impl.conf.ProducerConfigurationData#restrictSameValuesInBatchProperties
    
    • The restrictSameValuesInBatchProperties type is Map<String, List<String>>: the map's key is a property key, and the map's value is the list of property values.
    • If restrictSameValuesInBatchProperties is empty (the default), this grouping by properties will not take effect.
    • Messages whose properties have the same keys/values as contained in restrictSameValuesInBatchProperties will be placed into the same batch (see the sketch below).
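
    For illustration, a minimal sketch of what such a configuration value could look like. The map literal below is hypothetical; only the Map<String, List<String>> shape comes from this proposal:

    import java.util.List;
    import java.util.Map;

    public class RestrictSameValuesConfigExample {
        public static void main(String[] args) {
            // Hypothetical example value for restrictSameValuesInBatchProperties:
            // key = property name, value = the property values this rule applies to.
            Map<String, List<String>> restrictSameValuesInBatchProperties = Map.of(
                    "region", List.of("us", "eu"),   // only region=us / region=eu are extracted
                    "version", List.of("1", "2"));   // only version=1 / version=2 are extracted
            System.out.println(restrictSameValuesInBatchProperties);
        }
    }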

Implementation

  • When org.apache.pulsar.client.impl.BatchMessageContainerImpl#add is called, we extract the message properties and add them to the metadata:
 public boolean add(MessageImpl<?> msg, SendCallback callback) {

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
                    numMessagesInBatch);
        }

        if (++numMessagesInBatch == 1) {
            try {
                // some properties are common amongst the different messages in the batch, hence we just pick them up
                // from the first message
                messageMetadata.setSequenceId(msg.getSequenceId());
                List<KeyValue> filterProperties = getProperties(msg);
                if (!filterProperties.isEmpty()) {
                    messageMetadata.addAllProperties(filterProperties);  // add the extracted message properties here
                }
                // ... (the rest of the existing add() method is unchanged)
  • Also we need to add a method hasSameProperties, like hasSameSchema. Messages with the same properties can be added to the same batch. Once a message with different properties is added, the producer will trigger a flush and send the batch.
 private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
     return batchMessageContainer.haveEnoughSpace(msg)  // messageContainer controls the memory
             && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
             && batchMessageContainer.hasSameProperties(msg)  // invoke it here
             && batchMessageContainer.hasSameTxn(msg);
 }
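
The getProperties(msg) helper referenced in both snippets above is not spelled out in this proposal. Below is a minimal sketch of what it might do, assuming the container has access to the configured restrictSameValuesInBatchProperties map; the helper name comes from the snippets above, everything else is illustrative:

 // Illustrative sketch only: extract the subset of the message's properties whose
 // key AND value are both listed in restrictSameValuesInBatchProperties.
 private List<KeyValue> getProperties(MessageImpl<?> msg) {
     List<KeyValue> extracted = new ArrayList<>();
     for (Map.Entry<String, String> prop : msg.getProperties().entrySet()) {
         List<String> allowedValues = restrictSameValuesInBatchProperties.get(prop.getKey());
         if (allowedValues != null && allowedValues.contains(prop.getValue())) {
             // KeyValue is the generated protocol class; fluent setters assumed here
             extracted.add(new KeyValue().setKey(prop.getKey()).setValue(prop.getValue()));
         }
     }
     return extracted;
 }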

  • In summary, most of the modifications in this proposal are:

    • Extract the first message's properties in the batch and fill them into BatchMessageContainerImpl#messageMetadata
    • Additionally check, in the ProducerImpl#canAddToCurrentBatch method, whether the message being sent has the same properties as those in BatchMessageContainerImpl#messageMetadata.

Example

Here is an example that may help to understand this:

  • Let's set restrictSameValuesInBatchProperties=<region=us,eu; version=1,2>
    This means only the key named region with values 'us' or 'eu', and the key named version with values '1' or '2', will be extracted into the batch meta properties

  • Then we have a producer that sends the messages below in order:

    • msg1 with properties: <region: eu>
    • msg2 with properties: <region: eu>
    • msg3 with properties: <region: eu, version:1, tag:a>
    • msg4 with properties: <region: eu, version:1>
    • msg5 with properties: <region: us, version:1>
    • msg6 with properties: <region: us, version:2>
    • msg7 with properties: <region: us, version:5>
    • msg8 with properties: <region: us, version:6>
  • The process of properties extraction will be:

    • msg1 and msg2 have the same properties <region: eu>, so they will be put into the same batch
    • msg3 and msg4 have the same properties <region: eu, version:1>. tag:a in msg3 will be ignored because restrictSameValuesInBatchProperties doesn't contain 'tag'. So msg3 and msg4 will be put into the same batch.
    • msg5 and msg6 have different properties, because the value of version is different. So we publish msg5 and msg6 in different batches.
    • msg7 and msg8 have the same properties <region: us>; version will be ignored because its values ('5' and '6') don't exist in restrictSameValuesInBatchProperties.
  • Just to summarize, the result will be:

|        | batch meta properties   | single meta properties         | payload | single meta properties  | payload |
|--------|-------------------------|--------------------------------|---------|-------------------------|---------|
| batch1 | <region: eu>            | <region: eu>                   | msg1    | <region: eu>            | msg2    |
| batch2 | <region: eu, version:1> | <region: eu, version:1, tag:a> | msg3    | <region: eu, version:1> | msg4    |
| batch3 | <region: us, version:1> | <region: us, version:1>        | msg5    |                         |         |
| batch4 | <region: us, version:2> | <region: us, version:2>        | msg6    |                         |         |
| batch5 | <region: us>            | <region: us, version:5>        | msg7    | <region: us, version:6> | msg8    |
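
For concreteness, the producer side of this example might look like the sketch below. The client, topic, batching, and property calls are the standard Pulsar client API; how restrictSameValuesInBatchProperties itself would be wired into the producer is part of this proposal and is therefore only noted in a comment:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class BatchPropertiesExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        // NOTE: setting restrictSameValuesInBatchProperties is proposed in this PIP
        // and is not yet available in the ProducerBuilder API.
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-topic")
                .enableBatching(true)
                .create();
        // msg1 and msg2 share <region: eu>, so they would land in the same batch.
        producer.newMessage().property("region", "eu").value("msg1").sendAsync();
        producer.newMessage().property("region", "eu").value("msg2").sendAsync();
        // msg3's extracted properties are <region: eu, version: 1>, which differ from
        // the current batch, so the previous batch would be flushed first.
        producer.newMessage().property("region", "eu").property("version", "1")
                .value("msg3").sendAsync();
        producer.flush();
        client.close();
    }
}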

Trade-off

The side effect of this behavior is that it can easily end up with tiny batches, perhaps even 1 record per batch. There is a good chance that once users turn this feature on, they will lose all the performance benefits of batching, since the batches will be very small. It completely depends on the distribution of values.

In spite of this, we should clarify that the entry filter doesn't support batched messages currently. So this proposal offers a real chance for batched messages to also use the entry filter. It brings great benefits, especially when you know the distribution of values.

Rejected Alternatives

  • Implement an AbstractBatchMessageContainer subclass, say BatchMessagePropertiesBasedContainer, keeping messages with the same properties in a single hashmap entry, like BatchMessageKeyBasedContainer.

Rejection reason: This would publish messages out of order.

@eolivelli
Contributor

I understand the problem, and I have already thought about it, because I am the author of some filters (especially the JMS Selectors Filter), but we have to clarify this PIP more.

  1. It is not clear to me if you want to push all the messages' metadata in the main header of the entry, or only the metadata of the first entry, or only the KEY.
  2. We have to clarify the new message format of the entry and update the protocol documents.
  3. Existing EntryFilters won't be able to work well with the new format; we must find a way to make them fail and not process garbage.
  4. The same problem applies to Interceptors and Protocol Handlers (like KOP); we must make it clear in this PIP what the upgrade path is and give some suggestions to the developers of such components.

@AnonHxy
Contributor Author

AnonHxy commented Jul 22, 2022

  1. It is not clear to me if you want to push all the messages' metadata in the main header of the entry, or only the metadata of the first entry, or only the KEY.

I just want to copy the partial message properties (we call them batch properties here) to the main header of the entry, without modifying the payload or the single message metadata. All messages in one batch must have the same batch properties (both key and value).

  1. We have to clarify the new message format of the entry and update the protocol documents.

It seems that the protocol documents don't describe the batch message main header; they just describe the SingleMessageMetadata. I agree that it's better to update the protocol documents to describe the batch message main header.

  1. Existing EntryFilters won't be able to work well with the new format; we must find a way to make them fail and not process garbage.

It seems that there are no compatibility problems, because I just add properties to the main header, which was empty before, and there is no modification to the payload or the single message metadata. The existing EntryFilters will be able to work well with the new format, the same as the existing Protocol Handlers. :)

@asafm
Contributor

asafm commented Jul 24, 2022

All messages in one batch must have the same batch properties (both key and value)

Is this true now or do you plan to enforce it?

batchedFilterProperties

What does this contain? Property names to extract from the messages to be placed in the batch entry metadata properties?

Regarding the extraction. Say I have Message 1 with property region=eu, and Message 2 with property region=us. The end result of batch metadata properties will be region=eu, us?
The reason I'm asking is: can you please clarify in the doc the exact process of properties extraction.

@AnonHxy
Contributor Author

AnonHxy commented Jul 24, 2022

Hi Asaf @asafm

Is this true now or do you plan to enforce it?

This is what I plan to enforce. The batch entry metadata properties are empty now

What does this contain? Property names to extract from the messages to be placed in the batch entry metadata properties?

It contains the property keys that will be put into the batch entry metadata properties.

For example:

  • Let's set batchedFilterProperties=<region, version>
    This means only keys named region and version will be extracted into the batch meta properties

  • Then we have a producer that sends the messages below in order:

    • msg1 with properties: <region: eu>
    • msg2 with properties: <region: eu>
    • msg3 with properties: <region: eu, version:1, tag:a>
    • msg4 with properties: <region: eu, version:1>
    • msg5 with properties: <region: us, version:1>
    • msg6 with properties: <region: us, version:2>
  • The process of properties extraction will be:

    • msg1 and msg2 have the same properties <region: eu>, so they will be put into the same batch
    • msg3 and msg4 have the same properties <region: eu, version:1>. tag:a in msg3 will be ignored because batchedFilterProperties doesn't contain 'tag'. So msg3 and msg4 will be put into the same batch.
    • msg5 and msg6 have different properties, because the value of version is different. So we publish msg5 and msg6 in different batches.
  • Just to summarize, the result will be:

|        | batch meta properties   | single meta properties         | payload | single meta properties  | payload |
|--------|-------------------------|--------------------------------|---------|-------------------------|---------|
| batch1 | <region: eu>            | <region: eu>                   | msg1    | <region: eu>            | msg2    |
| batch2 | <region: eu, version:1> | <region: eu, version:1, tag:a> | msg3    | <region: eu, version:1> | msg4    |
| batch3 | <region: us, version:1> | <region: us, version:1>        | msg5    |                         |         |
| batch4 | <region: us, version:2> | <region: us, version:2>        | msg6    |                         |         |

@asafm
Contributor

asafm commented Jul 24, 2022

Ok, what you just wrote, if I understand correctly, is pretty big and must be well documented in the proposal. You're saying that you will change the way batching works today. If today I have certain knobs which control how batching is prepared: max batch size, timeout, max number of records, and that's it, now you're changing it such that batching will be grouped by the defined properties (batchedFilterProperties). This might have a serious performance hit, no? It might generate smaller batches or increase latency?

Why not have list of values for property: region=us,eu ?

@AnonHxy
Contributor Author

AnonHxy commented Jul 24, 2022

Might generate smaller batches or increase latency?

Yes. This will generate smaller batches if batchedFilterProperties is not empty. But the advantage is that users who want to filter entries no longer need to deserialize the entry in the broker or fall back to publishing only non-batched messages.

Why not have list of values for property: region=us,eu ?

Good idea. Users should know which values they will use. It seems reasonable.

@asafm
Contributor

asafm commented Jul 26, 2022

Might generate smaller batches or increase latency?

Yes. This will generate smaller batches if batchedFilterProperties is not empty. But the advantage is that users who want to filter entries no longer need to deserialize the entry in the broker or fall back to publishing only non-batched messages.

Why not have list of values for property: region=us,eu ?

Good idea. Users should know which values they will use. It seems reasonable.

What I am suggesting is to turn this idea around a bit: instead of batching per the same values of keys, batch all messages, and for each property save a list of values in the metadata. This way you are not affecting the latency on the client side or the batch size, yet you still have the ability to filter batches based on metadata without deserializing them.

@AnonHxy
Contributor Author

AnonHxy commented Jul 26, 2022

What I am suggesting is to turn this idea around a bit: instead of batching per the same values of keys, batch all messages, and for each property save a list of values in the metadata. This way you are not affecting the latency on the client side or the batch size, yet you still have the ability to filter batches based on metadata without deserializing them.

Thanks for your suggestion @asafm. If I understand correctly, it means that all the properties which need to be filtered will be put into the entry header metadata. So the protocol will look like below:

|       | batch meta properties      | single meta properties | payload | single meta properties  | payload |
|-------|----------------------------|------------------------|---------|-------------------------|---------|
| batch | <region: eu,us; version:1> | <region: eu>           | msg1    | <region: us; version:1> | msg2    |

But this will not work well for the entry filter, I think. The reason is that:

For example, if our filter's condition is:

 if region == eu
     return reject
 else
     return accept

So the expected result is that msg1 will be rejected and msg2 will be accepted. However, because msg1 and msg2 are in the same entry, the entry filter will not work well. In other words, the entry filter doesn't support batched messages now. (A sketch of such a filter is shown below.)
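
For concreteness, a sketch of this condition as a PIP-105 entry filter. The shape follows the EntryFilter plugin interface (filterEntry(Entry, FilterContext) returning ACCEPT/REJECT); the property lookup via the main-header metadata is what this proposal would enable, and the imports are omitted for brevity:

 // Sketch: reject entries whose main-header metadata carries region=eu.
 public class RegionEntryFilter implements EntryFilter {

     @Override
     public FilterResult filterEntry(Entry entry, FilterContext context) {
         // With this proposal, a batched entry's extracted properties live in the
         // main header metadata, so no payload deserialization is needed.
         boolean isEu = context.getMsgMetadata().getPropertiesList().stream()
                 .anyMatch(kv -> "region".equals(kv.getKey()) && "eu".equals(kv.getValue()));
         return isEu ? FilterResult.REJECT : FilterResult.ACCEPT;
     }

     @Override
     public void close() {
         // nothing to release in this sketch
     }
 }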

@asafm
Contributor

asafm commented Jul 26, 2022

Let me see if I understand correctly.

Today, when an entry filter receives an entry, it gets an Entry that has:

public interface Entry {

    /**
     * @return the data
     */
    byte[] getData();

    byte[] getDataAndRelease();

    /**
     * @return the entry length in bytes
     */
    int getLength();

    /**
     * @return the data buffer for the entry
     */
    ByteBuf getDataBuffer();

    /**
     * @return the position at which the entry was stored
     */
    Position getPosition();

    /**
     * @return ledgerId of the position
     */
    long getLedgerId();

    /**
     * @return entryId of the position
     */
    long getEntryId();

    /**
     * Release the resources (data) allocated for this entry and recycle if all the resources are deallocated (ref-count
     * of data reached to 0).
     */
    boolean release();
}

The Entry interface doesn't let you know if this is a Batched Entry.

You also get FilterContext:

@Data
public class FilterContext {
    private Subscription subscription;
    private MessageMetadata msgMetadata;
    private Consumer consumer;

and in MessageMetadata, you have

    // differentiate single and batch message metadata
    optional int32 num_messages_in_batch = 11 [default = 1];

Which enables you to know that this entry is batched.

The developer can determine what class would deserialize the entry byte array into a list of separate messages.

So currently, given the entry is batched, the filter author can act on it only by paying the cost of deserializing it, right?

You're saying we can alter the clients (all clients) to extract specific properties from each message and place those properties values in the message metadata of the Batched Entry. The filter can then use the values to decide if to reject/accept.
The problem is that if you have different values for a given property for each message in the batch, then the filter author can't provide a reject or accept for this entry since some messages are rejected, and some are accepted.

So the only solution offered in this suggestion is to change the way messages are batched and collect the records into a batch only if they have the same values for the properties configured to be extracted.

If this is ok and correct, I have notes on it:

  1. All I wrote above is not clearly stated in the PIP. IMO the PIP needs to be modified to reflect that explanation.
  2. As I wrote before, you are changing the core batching behavior of the client - the user needs to be fully aware of this and its implications. The user configures batchedFilterProperties without understanding the consequences of the altering of the batching behavior. One option may be to rename batchedFilterProperties to batchGroupByProperties so they will know the batching behavior is changing. I wouldn't specify the term filter here since the filter at this stage has no direct link.
  3. Don't you need to introduce new knobs to control the memory? Up until now, you collected records into a batch and sent them. Now you collect into multiple batches until a certain threshold - won't this consume more memory? How can I control this as a user?

@AnonHxy
Contributor Author

AnonHxy commented Jul 27, 2022

Thank you for your patience @asafm. Yes, what you described above is correct.

All I wrote above is not clearly stated in the PIP. IMO the PIP needs to be modified to reflect that explanation.

I will give a more detailed explanation in the PIP

One option may be to rename batchedFilterProperties to batchGroupByProperties so they will know the batching behavior is changing

I agree with you. batchGroupByProperties is better; it indicates the batching behavior.

Don't you need to introduce new knobs to control the memory?

I'm not sure if I understand correctly. A batch with the same properties will be published immediately if it reaches the threshold. The only effect is that the batch size may be smaller. So I think there is no need to take care of controlling the memory in this PIP or introduce new knobs, because the groupBy behavior will be done in the BatchMessageContainerImpl, which already controls the memory.

For details:
We can add a method named hasSameProperties in BatchMessageContainerImpl, which will look like the existing method BatchMessageContainerImpl#hasSameSchema:

 public boolean hasSameProperties(MessageImpl<?> msg) {
     if (numMessagesInBatch == 0) {
         return true;
     }
     if (messageMetadata.getPropertiesList().isEmpty()) {
         // the batch has no extracted properties, so the message must not have any either
         return getProperties(msg).isEmpty();
     }
     return getProperties(msg).equals(messageMetadata.getPropertiesList());
 }

And hasSameProperties will be invoked when the producer tries to add a message to the BatchMessageContainerImpl:

private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
    return batchMessageContainer.haveEnoughSpace(msg)      // memory control is here
            && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
            && batchMessageContainer.hasSameProperties(msg)     // invoke it here
            && batchMessageContainer.hasSameTxn(msg);
}

As we can see above, the batchMessageContainer has already checked the space.

@asafm
Contributor

asafm commented Jul 31, 2022

Ok, regarding the memory I see that canEnqueueRequest makes sure we're not breaching memory limits for the client prior to insertion.

Regarding the actual implementation, I'm a bit puzzled. Why re-use BatchMessageContainerImpl, which was geared towards a single batch, and make it multi-batch? Why not do like BatchMessageKeyBasedContainer and create your own? You wrote that it would send messages out of order, but once you batch them in separate batches concurrently, this by design will make them out of order, no?

@AnonHxy
Contributor Author

AnonHxy commented Jul 31, 2022

Why not do like the BatchMessageKeyBasedContainer and create your own

  • I have considered this solution before. BatchMessageKeyBasedContainer doesn't keep the messages' order because it is only used for the key_shared subscription mode. The documentation of BatchMessageKeyBasedContainer describes its batching behavior, and we can see that it sends batched messages out of the incoming message order:

    /**
     * Key based batch message container.
     *
     * incoming single messages:
     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
     *
     * batched into multiple batch messages:
     * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
     */
    class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {

  • If we implemented a BatchMessagePropertiesBasedContainer like BatchMessageKeyBasedContainer, it would have the same problem, I think. For example:

 * incoming single messages:
 * (<version:1>, v1), (<version:2>, v1), (<version:3>, v1), (<version:1>, v2), (<version:2>, v2), (<version:3>, v2), (<version:1>, v3), (<version:2>, v3), (<version:3>, v3)
 *
 * batched into multiple batch messages:
 * [(<version:1>, v1), (<version:1>, v2), (<version:1>, v3)], [(<version:2>, v1), (<version:2>, v2), (<version:2>, v3)], [(<version:3>, v1), (<version:3>, v2), (<version:3>, v3)]

@asafm
Contributor

asafm commented Aug 1, 2022

Can you explain how in your proposal, you keep the exact ordering of the messages as they were sent to the producer while separating them into different batches based on properties?

@AnonHxy
Contributor Author

AnonHxy commented Aug 1, 2022

Can you explain how in your proposal, you keep the exact ordering of the messages as they were sent to the producer while separating them into different batches based on properties?

Sure. There is no difference from normal batched sending, except that we have to check hasSameProperties in ProducerImpl#canAddToCurrentBatch. I think the following diagram can help explain this more clearly:

[diagram: flowchart of the producer deciding whether to add a message to the current batch or flush and start a new one]

Let me know if it is not clear enough or doesn't answer the question correctly :) @asafm

@asafm
Contributor

asafm commented Aug 1, 2022

I'm genuinely asking again, as I still don't understand.

Say I send M1(region=eu) to the producer.
It creates a batch and adds M1 to it.

Now I send M2(region=us) to the producer.
It checks current batch 1st record properties are different from M2 properties; thus, it goes left (No) on your diagram. This part I don't understand.
I expected it at this stage to create a batch for region=us and add M2 to it.
But I see written in your diagram that you "batched msgs in msgsContainer" (which container? for which properties? Which msgs are you referring to?) and you "and send them" (why? what is the trigger to send them? Size/time?) and then you clear and add M2 to the batch.

I reasoned that you will have a batch container per properties-values set, so if I also send M3(region=us) and M4(region=eu), I will get:
Batch 1 (region=eu): M1, M4
Batch 2 (region=us): M2, M3

When either batch reaches the trigger point (size/time), they will be sent, no?

So once I understand the diagram, I might be able to figure out how it maintains order.

In general, once understood, I believe that these question marks need to be explained in the PIP.

@AnonHxy
Contributor Author

AnonHxy commented Aug 2, 2022

I expected it at this stage to create a batch for region=us and add M2 to it.
But I see written in your diagram that you "batched msgs in msgsContainer" (which container? for which properties? Which msgs are you referring to?) and you "and send them" (why? what is the trigger to send them? Size/time?) and then you clear and add M2 to the batch.

Let me try to explain the current batch process first. @asafm

msgsContainer in the above diagram refers to ProducerImpl#batchMessageContainer, which has a list named BatchMessageContainerImpl#messages. The msgList in the diagram refers to BatchMessageContainerImpl#messages.

Also, the msgsContainer has a BatchMessageContainerImpl#messageMetadata, which saves the first message's metadata in msgList. Note that the properties field of BatchMessageContainerImpl#messageMetadata is always empty currently.

A ProducerImpl instance has only one msgsContainer. When a producer publishes messages with batching enabled, the messages are first put into the msgList. When the msgsContainer doesn't have enough space (size or number-of-messages threshold) or the flush task is scheduled (time threshold), the msgsContainer is triggered to batch the messages in msgList and send them. Then the msgList is cleared to prepare for the next batch.

The "size or number of messages threshold" check happens in the ProducerImpl#canAddToCurrentBatch method, before the message is put into the msgList. If this check result is false, the msgsContainer batch action will be triggered. See line636 and line668:

if (canAddToCurrentBatch(msg)) {
    // should trigger complete the batch message, new message will add to a new batch and new batch
    // sequence id use the new message, so that broker can handle the message duplication
    if (sequenceId <= lastSequenceIdPushed) {
        isLastSequenceIdPotentialDuplicated = true;
        if (sequenceId <= lastSequenceIdPublished) {
            log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
        } else {
            log.info("Message with sequence id {} might be a duplicate but cannot be determined at this"
                    + " time.", sequenceId);
        }
        doBatchSendAndAdd(msg, callback, payload);
    } else {
        // Should flush the last potential duplicated since can't combine potential duplicated messages
        // and non-duplicated messages into a batch.
        if (isLastSequenceIdPotentialDuplicated) {
            doBatchSendAndAdd(msg, callback, payload);
        } else {
            // handle boundary cases where message being added would exceed
            // batch size and/or max message size
            boolean isBatchFull = batchMessageContainer.add(msg, callback);
            lastSendFuture = callback.getFuture();
            payload.release();
            if (isBatchFull) {
                batchMessageAndSend(false);
            } else {
                maybeScheduleBatchFlushTask();
            }
        }
        isLastSequenceIdPotentialDuplicated = false;
    }
} else {
    doBatchSendAndAdd(msg, callback, payload);
}

All of the above is the current batch sending process.

The key to keeping order is that only one batch exists at a time, and the batch is sent to the channel on a single thread. The single thread is the SingleThreadEventExecutor attached to the event loop: line2154

final ClientCnx cnx = getCnxIfReady();
if (cnx != null) {
    if (op.msg != null && op.msg.getSchemaState() == None) {
        tryRegisterSchema(cnx, op.msg, op.callback, this.connectionHandler.getEpoch());
        return;
    }
    // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
    // connection is established
    op.cmd.retain();
    cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
    stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);

What I will do in this proposal is just:

  • Extract the first message's properties in the batch and fill them into BatchMessageContainerImpl#messageMetadata
  • Additionally check, in the ProducerImpl#canAddToCurrentBatch method, whether the message being sent has the same properties as those in BatchMessageContainerImpl#messageMetadata.

So regarding the case you gave above, the process will be:

  • Sending M1(region=eu):
    • ProducerImpl#canAddToCurrentBatch returns true
    • Add M1 to msgList, and fill (region=eu) into BatchMessageContainerImpl#messageMetadata.
  • Sending M2(region=us):
    • ProducerImpl#canAddToCurrentBatch returns false, because it has different properties from BatchMessageContainerImpl#messageMetadata.
    • This triggers doBatchSendAndAdd.
    • The msgsContainer will batch M1 (msgList only has M1) and send it, then clear the msgList to prepare the next batch.
    • Add M2(region=us) to msgList, and fill (region=us) into BatchMessageContainerImpl#messageMetadata.
  • Sending .....

@asafm
Contributor

asafm commented Aug 2, 2022

Excellent explanation @AnonHxy - I finally understand the whole mechanism.

So in effect, you're adding another trigger condition to send the batch, on top of the current max count, max size, and max delay: once a message is requested to be added to a batch and its properties (as defined in the configuration) values are different from the records in the batch (i.e. the 1st record's properties values), you trigger the batch flush (i.e. send and clear).

So the side effect of this behavior is that you can easily end up with tiny batches, perhaps even 1 record per batch. There is a good chance that once users turn this feature on, they will lose all the performance benefits of batching, since the batches will be very small. It completely depends on the distribution of values. It might be a big trade-off you're asking from the user: you might trade off the performance of writes, and perhaps reads, for the ability to have the server-side filter work for batches.

  1. I would for sure document that trade-off very clearly in the PIP and in the configuration page of the producer.

  2. I would rephrase the explanation in this PIP to document the behavior:

    So the only solution is to change the way messages are batched and collect the records into a batch only if they have the same values for the properties configured to be extracted

    -->

    "So the only solution is to change the way messages are batched and collect the records into a batch only if they have the same values for the properties configured to be extracted. If a message is added to the producer and the properties are not the same as the batched records, it will trigger a send of this batch and start a new batch with that message. "

  3. You need to emphasize throughout the document the trigger condition for sending once a message with different properties is added.

  4. Now that I understand, the name I suggested does not fit since batchGroupByProperties makes you think you are grouping records in several batches by those properties, which you don't since you have only 1 in-flight batch. Maybe restrictSameValuesInBatchProperties ?

@AnonHxy
Contributor Author

AnonHxy commented Aug 3, 2022

I would for sure document that trade-off very clearly in the PIP and in the configuration page of the producer.

OK. Thanks for your suggestions @asafm. I have updated the document and added the "Trade-off" part. I also clarified in that part that this proposal makes it possible for batched messages to also use the entry filter, especially when you know the distribution of values. That brings real benefits because the entry filter doesn't support batched messages currently.

@codelipenghui codelipenghui added this to the 2.12.0 milestone Aug 11, 2022
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Sep 11, 2022
@asafm
Contributor

asafm commented Oct 3, 2022

What ended up with this feature?

@AnonHxy
Contributor Author

AnonHxy commented Oct 3, 2022

What ended up with this feature?

There was a VOTE[1] about this PIP and it ended with 1 binding -1 and 1 non-binding +1. So maybe this feature needs more discussion or will be canceled @asafm

@github-actions github-actions bot removed the Stale label Oct 4, 2022
@github-actions

github-actions bot commented Nov 3, 2022

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Nov 3, 2022
@RobertIndie RobertIndie removed this from the 3.0.0 milestone Apr 11, 2023
@RobertIndie RobertIndie added this to the 3.1.0 milestone Apr 11, 2023