Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Consume failed in two cases : when using public/default username AND when using the "correct" username the tenant/namespace of the topic #1564

Open
ohadomrad opened this issue Nov 10, 2022 · 2 comments
Labels

Comments

@ohadomrad
Copy link

The bug description
While testing KoP as a kafka protocol handler for pulsar, I succeeded in producing messages to a partitioned topic
But failed in consuming the messages.

case 1
According to KoP docs, the username needed to be the tenant/namespace of the topic
But, when I used this username, the consumer keep been restarted, and eventually the session is closed.

case 2
I tried to use the username of public/default and the consumer succeeded to join the group but is not consuming any messages.

Bug conclusions
I assume that there is a problem with the compatiblity to kafka in group coordinator and group.id, over pulsar.

To Reproduce these cases

  1. Building the cluster : 3 brokers, 3 bookies, 3 local zookeepers, and 3 global zookeepers. each instance runs on different machines.

  2. Configuring KoP

    messagingProtocols=kafka
    protocolHandlerDirectory=./protocols
    kafkaListeners=SASL_PLAINTEXT://<broker_hostname>:9092
    saslAllowedMechanisms=PLAIN
    allowAutoTopicCreationType=partitioned

  3. Creating
    tenant, namespace, partitioned topic, subscription, JWT, and giving consume produce roles

  4. Writing the attached code using kafkajs

Expected behavior
Giving the tenant/namespace of the topic as the consumer username and succesfuly connect to the broker, join to the group and start consuming the messages in the topic

Brokers logs in case 1
ERROR io.streamnative.pulsar.handlers.kop.kafka RequestHandler - Caught error in the handler, closing channel
Add any other context about the problem here.

Logs Screenshots

kafkajs logs in case 1
This logs repeating themself
kafkajs logs

kafkajs logs - case 2
case 2 - kafkajs (2)

The code

const {ConfigResourceTypes, Kafka} = require('kafkajs');

MESSAGE_EVERY = 1000;
const MESSAGE_SIZE = 10;
const TOPIC_NAME = 'persistent://ohad-test/kop/kop_test_partition';

// const kopUser = 'public/default';
const kopUser = 'ohad-test/kop';
const kopPassword = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY';

const KOP_CONN = new Kafka({
    brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
    sasl: {
        mechanism: 'PLAIN',
        username: kopUser,
        password: kopPassword
    }
});


const produce = async(producer, topicName, i) => {
    await producer.connect();
    async function prodos() {
        await producer.send({
            topic: topicName,
            messages: [
                {key: null, value: 'a'.repeat(MESSAGE_SIZE)}
            ]
        });
        console.log(`Produce ${topicName} - Hello ${i}`);
        i +=1;
    }
    setInterval(prodos, MESSAGE_EVERY);
}

const consume = async (consumer, topicName) => {
    await consumer.connect();
    await consumer.subscribe({topic: topicName, fromBeginning: true});
    await consumer.run({
        eachMessage: async({topic, partition, message}) => {
            console.log({
                topic,
                value: message.value.toString(),
            });
        },
    });
}

const run = async () => {
    const KOP_PRODUCER = KOP_CONN.producer();
    await produce(KOP_PRODUCER, TOPIC_NAME, 0);

    const KOP_CONSUMER = KOP_CONN.consumer({groupId: kopUser});
    await consume(KOP_CONSUMER, TOPIC_NAME);
}

run();
@BewareMyPower
Copy link
Collaborator

BewareMyPower commented Nov 17, 2022

It works well in my local env without authentication in standalone.

  • Pulsar 2.10.2
  • KoP 2.10.1.12

conf/standalone.conf:

messagingProtocols=kafka
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
entryFormat=kafka

Outputs:

{
  topic: 'persistent://public/default/kop_test_partition',
  value: 'aaaaaaaaaa'
}
{
  topic: 'persistent://public/default/kop_test_partition',
  value: 'aaaaaaaaaa'
}
{
  topic: 'persistent://public/default/kop_test_partition',
  value: 'aaaaaaaaaa'
}
Produce persistent://public/default/kop_test_partition - Hello 3
Produce persistent://public/default/kop_test_partition - Hello 4
Produce persistent://public/default/kop_test_partition - Hello 5
Produce persistent://public/default/kop_test_partition - Hello 6
Produce persistent://public/default/kop_test_partition - Hello 7
{
  topic: 'persistent://public/default/kop_test_partition',
  value: 'aaaaaaaaaa'
}
{
  topic: 'persistent://public/default/kop_test_partition',
  value: 'aaaaaaaaaa'
}

Here are my diff with the code you provided:

5,9c5
< const TOPIC_NAME = 'persistent://ohad-test/kop/kop_test_partition';
<
< // const kopUser = 'public/default';
< const kopUser = 'ohad-test/kop';
< const kopPassword = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY';
---
> const TOPIC_NAME = 'persistent://public/default/kop_test_partition';
12,17c8
<     brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
<     sasl: {
<         mechanism: 'PLAIN',
<         username: kopUser,
<         password: kopPassword
<     }
---
>     brokers: ['localhost:9092']
19a11
> const kopUser = 'ohad-test/kop';

@BewareMyPower
Copy link
Collaborator

Then I've configured the token authentication and the diff became:

5c5
< const TOPIC_NAME = 'persistent://ohad-test/kop/kop_test_partition';
---
> const TOPIC_NAME = 'persistent://public/default/kop_test_partition';
7,9c7,9
< // const kopUser = 'public/default';
< const kopUser = 'ohad-test/kop';
< const kopPassword = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY';
---
> const kopUser = 'public/default';
> //const kopUser = 'ohad-test/kop';
> const kopPassword = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.lGt9MMdVEEAdtOQTfLDtAhBkYuy6kWjTQkCDMB6Aim0';
12c12
<     brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
---
>     brokers: ['localhost:9092'],

It still works.

The additional configs:

authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.lGt9MMdVEEAdtOQTfLDtAhBkYuy6kWjTQkCDMB6Aim0
superUserRoles=test-user
tokenSecretKey=file:///home/xyz/apache-pulsar-2.10.2/my-secret.key
saslAllowedMechanisms=PLAIN

The steps to generate the key:

bin/pulsar tokens create-secret-key --output my-secret.key
bin/pulsar tokens create --secret-key ./my-secret.key --subject test-user

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants