Skip to content

[Bug] Segmentation fault on acknowledgeCumulativeAsync called in the listener #516

@BewareMyPower

Description

@BewareMyPower

Search before asking

  • I searched in the issues and found nothing similar.

Version

Minimal reproduce step

Inside a ubuntu:22.04 container, apply the patch on apache/pulsar-client-node@0d6fb72

diff --git a/pulsar-client-cpp.txt b/pulsar-client-cpp.txt
index 40630d9..cb827f7 100644
--- a/pulsar-client-cpp.txt
+++ b/pulsar-client-cpp.txt
@@ -1,2 +1,2 @@
-CPP_CLIENT_BASE_URL=https://archive.apache.org/dist/pulsar/pulsar-client-cpp-3.7.0
-CPP_CLIENT_VERSION=3.7.0
+CPP_CLIENT_BASE_URL=https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.8.0-candidate-3
+CPP_CLIENT_VERSION=3.8.0
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ce3693f..0b936d8 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -23,10 +23,7 @@ const Pulsar = require('../index');
 (() => {
   describe('End To End', () => {
     test.each([
-      { serviceUrl: 'pulsar://localhost:6650', listenerName: undefined },
-      { serviceUrl: 'pulsar+ssl://localhost:6651', listenerName: 'localhost6651' },
-      { serviceUrl: 'http://localhost:8080', listenerName: undefined },
-      { serviceUrl: 'https://localhost:8443', listenerName: 'localhost8443' },
+      { serviceUrl: 'pulsar://host.docker.internal:6650', listenerName: undefined },
     ])('Produce/Consume to $serviceUrl', async ({ serviceUrl, listenerName }) => {
       const client = new Pulsar.Client({
         serviceUrl,
@@ -77,7 +74,7 @@ const Pulsar = require('../index');

     test('negativeAcknowledge', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -126,7 +123,7 @@ const Pulsar = require('../index');

     test('getRedeliveryCount', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -171,7 +168,7 @@ const Pulsar = require('../index');

     test('Produce/Consume Listener', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -223,7 +220,7 @@ const Pulsar = require('../index');

root@17197a9b65d7:~/pulsar-client-node# git diff --cached > 1.diff
root@17197a9b65d7:~/pulsar-client-node# cat 1.diff
diff --git a/pulsar-client-cpp.txt b/pulsar-client-cpp.txt
index 40630d9..cb827f7 100644
--- a/pulsar-client-cpp.txt
+++ b/pulsar-client-cpp.txt
@@ -1,2 +1,2 @@
-CPP_CLIENT_BASE_URL=https://archive.apache.org/dist/pulsar/pulsar-client-cpp-3.7.0
-CPP_CLIENT_VERSION=3.7.0
+CPP_CLIENT_BASE_URL=https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.8.0-candidate-3
+CPP_CLIENT_VERSION=3.8.0
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ce3693f..0b936d8 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -23,10 +23,7 @@ const Pulsar = require('../index');
 (() => {
   describe('End To End', () => {
     test.each([
-      { serviceUrl: 'pulsar://localhost:6650', listenerName: undefined },
-      { serviceUrl: 'pulsar+ssl://localhost:6651', listenerName: 'localhost6651' },
-      { serviceUrl: 'http://localhost:8080', listenerName: undefined },
-      { serviceUrl: 'https://localhost:8443', listenerName: 'localhost8443' },
+      { serviceUrl: 'pulsar://host.docker.internal:6650', listenerName: undefined },
     ])('Produce/Consume to $serviceUrl', async ({ serviceUrl, listenerName }) => {
       const client = new Pulsar.Client({
         serviceUrl,
@@ -77,7 +74,7 @@ const Pulsar = require('../index');

     test('negativeAcknowledge', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -126,7 +123,7 @@ const Pulsar = require('../index');

     test('getRedeliveryCount', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -171,7 +168,7 @@ const Pulsar = require('../index');

     test('Produce/Consume Listener', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -223,7 +220,7 @@ const Pulsar = require('../index');

     test('Produce/Read Listener', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -273,7 +270,7 @@ const Pulsar = require('../index');

     test('Share consumers with message listener', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -336,7 +333,7 @@ const Pulsar = require('../index');

     test('Share readers with message listener', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -390,7 +387,7 @@ const Pulsar = require('../index');

     test('Message Listener error handling', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
       });
       let syncFinsh;
       const syncPromise = new Promise((resolve) => {
@@ -450,7 +447,7 @@ const Pulsar = require('../index');

     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -497,7 +494,7 @@ const Pulsar = require('../index');

     test('subscriptionInitialPosition', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -566,7 +563,7 @@ const Pulsar = require('../index');

     test('Produce/Read', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });
       expect(client).not.toBeNull();
@@ -613,7 +610,7 @@ const Pulsar = require('../index');

     test('Produce-Delayed/Consume', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });
       expect(client).not.toBeNull();
@@ -669,7 +666,7 @@ const Pulsar = require('../index');

     test('Produce/Consume/Unsubscribe', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -732,7 +729,7 @@ const Pulsar = require('../index');

     test('Produce/Read (Compression)', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });
       expect(client).not.toBeNull();
@@ -780,7 +777,7 @@ const Pulsar = require('../index');

     test('Produce/Consume-Pattern', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });
       expect(client).not.toBeNull();
@@ -842,7 +839,7 @@ const Pulsar = require('../index');

     test('Produce/Consume-Multi-Topic', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });
       expect(client).not.toBeNull();
@@ -903,7 +900,7 @@ const Pulsar = require('../index');

     test('Produce/Consume and validate MessageId', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -949,7 +946,7 @@ const Pulsar = require('../index');
     });
     test('Basic produce and consume encryption', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -993,7 +990,7 @@ const Pulsar = require('../index');
     });
     test('Basic produce and read encryption', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1034,7 +1031,7 @@ const Pulsar = require('../index');
     });
     test('Produce/Consume/Read/IsConnected', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1076,7 +1073,7 @@ const Pulsar = require('../index');

     test('Consumer seek by message Id', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1116,7 +1113,7 @@ const Pulsar = require('../index');

     test('Consumer seek by timestamp', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1164,7 +1161,7 @@ const Pulsar = require('../index');

     test('Reader seek by message Id', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1205,7 +1202,7 @@ const Pulsar = require('../index');

     test('Reader seek by timestamp', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1255,7 +1252,7 @@ const Pulsar = require('../index');

     test('Message chunking', async () => {
       const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
         operationTimeoutSeconds: 30,
       });

@@ -1286,49 +1283,6 @@ const Pulsar = require('../index');
       await client.close();
     });

-    test('AuthenticationToken token supplier', async () => {
-      const mockTokenSupplier = jest.fn().mockReturnValue('token');
-      const auth = new Pulsar.AuthenticationToken({
-        token: mockTokenSupplier,
-      });
-      const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
-        authentication: auth,
-      });
-
-      // A producer/consumer is needed to triger the callback function
-      const topic = 'persistent://public/default/token-auth';
-      const producer = await client.createProducer({
-        topic,
-      });
-      expect(producer).not.toBeNull();
-      expect(mockTokenSupplier).toHaveBeenCalledTimes(1);
-
-      await producer.close();
-      await client.close();
-    });
-
-    test('AuthenticationToken async token supplier', async () => {
-      const mockTokenSupplier = jest.fn().mockResolvedValue('token');
-      const auth = new Pulsar.AuthenticationToken({
-        token: mockTokenSupplier,
-      });
-      const client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
-        authentication: auth,
-      });
-
-      // A producer/consumer is needed to triger the callback function
-      const topic = 'persistent://public/default/token-auth';
-      const producer = await client.createProducer({
-        topic,
-      });
-      expect(producer).not.toBeNull();
-      expect(mockTokenSupplier).toHaveBeenCalledTimes(1);
-
-      await producer.close();
-      await client.close();
-    });
   });
   describe('KeyBasedBatchingTest', () => {
     let client;
@@ -1338,7 +1292,7 @@ const Pulsar = require('../index');

     beforeAll(async () => {
       client = new Pulsar.Client({
-        serviceUrl: 'pulsar://localhost:6650',
+        serviceUrl: 'pulsar://host.docker.internal:6650',
       });
     });

Run Pulsar 4.1.1 standalone on the host machine (macOS 15), then run the following command from the container:

ulimit -c unlimited
apt-get install -y xz-utils python3 make
./pkg/linux/download-cpp-client.sh
npm run test -- end_to_end.test

What did you expect to see?

The test passes

What did you see instead?

The test crashed with core generated.

# which node
/root/.nvm/versions/node/v18.20.8/bin/node
root@17197a9b65d7:~/pulsar-client-node# gdb $(which node) core
GNU gdb (Ubuntu 12.1-0ubuntu1~22.04.2) 12.1
...
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/aarch64-linux-gnu/libthread_db.so.1".
Core was generated by `node /root/pulsar-client-node/node_modules/.bin/jest --verbose --detectOpenHand'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x0000ffffa6bc4d1c in pulsar::ConsumerImpl::acknowledgeCumulativeAsync(pulsar::MessageId const&, std::function<void (pulsar::Result)> const&) () from /root/pulsar-client-node/lib/binding/pulsar.node
[Current thread is 1 (Thread 0xffff9e00f040 (LWP 9194))]
(gdb) bt
#0  0x0000ffffa6bc4d1c in pulsar::ConsumerImpl::acknowledgeCumulativeAsync(pulsar::MessageId const&, std::function<void (pulsar::Result)> const&) () from /root/pulsar-client-node/lib/binding/pulsar.node
#1  0x0000ffffa6bc8b30 in pulsar::ConsumerImpl::internalListener() () from /root/pulsar-client-node/lib/binding/pulsar.node
#2  0x0000ffffa6bfb824 in asio::detail::executor_op<asio::detail::binder0<std::function<void ()> >, std::allocator<void>, asio::detail::scheduler_operation>::do_complete(void*, asio::detail::scheduler_operation*, std::error_code const&, unsigned long) () from /root/pulsar-client-node/lib/binding/pulsar.node
#3  0x0000ffffa6bf11d8 in asio::detail::scheduler::run(std::error_code&) [clone .isra.0] () from /root/pulsar-client-node/lib/binding/pulsar.node
#4  0x0000ffffa6bf1654 in pulsar::ExecutorService::start()::{lambda()#1}::operator()() const () from /root/pulsar-client-node/lib/binding/pulsar.node

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions