Skip to content

[feat] Support consumer batch receive on C client.#230

Closed
shibd wants to merge 2 commits intoapache:mainfrom
shibd:c_batch_receive
Closed

[feat] Support consumer batch receive on C client.#230
shibd wants to merge 2 commits intoapache:mainfrom
shibd:c_batch_receive

Conversation

@shibd
Copy link
Member

@shibd shibd commented Mar 24, 2023

Motivation

Support consumer batch receive on C client.

Modifications

  • Add messages on C.
  • Support batch receive and async batch receive API.

Verifying this change

  • Add ConsumerTest.testCBatchReceive to cover it.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@shibd shibd added the enhancement New feature or request label Mar 24, 2023
@shibd shibd added this to the 3.2.0 milestone Mar 24, 2023
@shibd shibd self-assigned this Mar 24, 2023
@shibd shibd force-pushed the c_batch_receive branch from 38dbfdd to 3bcc8c3 Compare March 25, 2023 07:48
@shibd shibd requested a review from BewareMyPower April 9, 2023 11:50
extern "C" {
#endif

#include <pulsar/defines.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#include <pulsar/defines.h>
#include <pulsar/defines.h>
#include "message.h"

Include message.h to have pulsar_message_t's definition.

image

Comment on lines +56 to +57
pulsar_messages_t *msgs = pulsar_messages_create();
pulsar_result res = pulsar_consumer_batch_receive(consumer, &msgs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the pulsar_messages_create function for users. Unlike pulsar_message_t, which should be created by users when sending messages, pulsar_messages_t should only be created by pulsar_consumer_batch_receive.

Suggested change
pulsar_messages_t *msgs = pulsar_messages_create();
pulsar_result res = pulsar_consumer_batch_receive(consumer, &msgs);
pulsar_messages_t *msgs = NULL;
pulsar_result res = pulsar_consumer_batch_receive(consumer, &msgs);
ASSERT_TRUE(msgs != NULL);

When res is pulsar_result_OK, it's guaranteed that msgs points to an object allocated by the library and should be deallocated by pulsar_messages_free.

With the current implementation, there is a memory leak that the instance created by pulsar_messages_create won't be released.

$ valgrind --leak-check=full ./tests/pulsar-tests --gtest_filter='C_ConsumerConfigurationTest.*'
...
==5249== LEAK SUMMARY:
==5249==    definitely lost: 360 bytes in 12 blocks
==5249==    indirectly lost: 12,336 bytes in 87 blocks
==5249==      possibly lost: 0 bytes in 0 blocks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with my suggested change, there is still a memory leak. The other methods might also have potential memory leaks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When res is pulsar_result_OK, it's guaranteed that msgs points to an object allocated by the library and should be deallocated by pulsar_messages_free.

With the current implementation, the user needs to manually free the pulsar_messages_t, is there anything wrong with this?

    pulsar_messages_t *msgs = pulsar_messages_create();
    pulsar_result res = pulsar_consumer_batch_receive(consumer, &msgs);
    // do something.
    pulsar_messages_free(msgs);

With the current implementation, there is a memory leak that the instance created by pulsar_messages_create won't be released.

You seem to be executing C_ConsumerConfigurationTest, in this test, the configured memory is not freed, I added it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    // 1. You allocated a `pulsar_messages_t` object via `pulsar_messages_created()` and
    //   let `msgs` points to this piece of memory
    pulsar_messages_t *msgs = pulsar_messages_create();
    // 2. After `pulsar_consumer_batch_receive`, `msgs` points to another piece of memory that
    //   is allocated inside `pulsar_consumer_batch_receive`.
    pulsar_result res = pulsar_consumer_batch_receive(consumer, &msgs);
    // 3. The memory allocated inside `pulsar_consumer_batch_receive` is deallocated.
    pulsar_messages_free(msgs);
    // NOTE: Now, the memory allocated by `pulsar_messages_created()` could never be deallocated!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to be executing C_ConsumerConfigurationTest,

Then I find another problem. Your test class in tests/c/c_ConsumerTest.cc is C_ConsumerConfigurationTest, not C_ConsumerTest.

TEST(C_ConsumerConfigurationTest, testCBatchReceive) {

It's better to make class name similar with the file name. BTW, I ran the valgrind check for this single test and the memory leak still exists.

$ valgrind --leak-check=full ./tests/pulsar-tests --gtest_filter='*.testCBatchReceive'
==24889== LEAK SUMMARY:
==24889==    definitely lost: 344 bytes in 11 blocks
==24889==    indirectly lost: 11,568 bytes in 82 blocks
==24889==      possibly lost: 0 bytes in 0 blocks

After I changed the initial value of message to NULL, there were still memory leak:

==25180==    by 0x5A3537: C_ConsumerConfigurationTest_testCBatchReceive_Test::TestBody() (c_ConsumerTest.cc:61)
...
==25180==    definitely lost: 320 bytes in 10 blocks
==25180==    indirectly lost: 11,568 bytes in 82 blocks
==25180==      possibly lost: 0 bytes in 0 blocks
        pulsar_message_t *msg = pulsar_messages_get(msgs, i);

So there is something wrong with the pulsar_messages_get function.

@shibd shibd force-pushed the c_batch_receive branch from f18e797 to 5744611 Compare April 14, 2023 15:32
@BewareMyPower
Copy link
Contributor

BewareMyPower commented Apr 14, 2023

After thinking for a while, I think the APIs should also be designed carefully. And we should fix the memory leak. So I opened another PR (#252) for that.

In my PR, the main differences are:

  • For pulsar_messages_t, only support read-only operations (get(i) and size()). We should not expose methods that could modify it.
  • The pulsar_message_t* returned by pulsar_messages_get should not be freed. It should just be a const reference to a message for users to view the content. Allocating a pulsar_message_t for it brings unnecessary overhead.
  • More detailed API docs to tell users how to use these C APIs.
  • The memory leak is fixed.

Let's move to my PR for further discussion.

@shibd shibd closed this Apr 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants