Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Question: How to use consumer / producer in a web application #397

Closed
jfariablosis opened this issue Nov 13, 2020 · 24 comments
Closed

Question: How to use consumer / producer in a web application #397

jfariablosis opened this issue Nov 13, 2020 · 24 comments
Labels

Comments

@jfariablosis
Copy link

Bonjour

Nous sommes du Portugal.
Nous avons un probleme avec la configuration du kafka.

Le message cést:

Symfony\Component\Debug\Exception\FatalThrowableError
Class 'App\Http\Controllers\RdKafka\Conf' not found

Pouvez vous nous aider a resoudre ce problem?

Merci en avance
Joao Faria

@Steveb-p
Copy link
Contributor

You're missing as import:

use RdKafka\Conf;

or global namespace prefix:

$conf = new \RdKafka\Conf();

@jfariablosis
Copy link
Author

jfariablosis commented Nov 13, 2020 via email

@jfariablosis
Copy link
Author

jfariablosis commented Nov 13, 2020

Bonsoir

Nous avont des problemes avec le “consumer”, notament nous avons un probleme quando nous ne produisont un message..

Pouvez vous nous aider?? Nous envoyont un message du “response”

Pardonnez mois le francais

Producer

public function producer()
    {

        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', '192.168.2.29:9092');

        //If you need to produce exactly once and want to keep the original produce order, uncomment the line below
        //$conf->set('enable.idempotence', 'true');

        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic("ecco_topic");

        for ($i = 0; $i < 100; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "MENSAGEM $i");

        }
        $producer->poll(0);

        for ($flushRetries = 0; $flushRetries < 100; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }

    }

Consumer

public function consumer()
    {

        $conf = new \RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
        $conf->set('group.id', 'myConsumerGroup');

        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("192.168.2.29");

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);

// Set the offset store method to 'file'
        $topicConf->set('offset.store.method', 'broker');

// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
        $topicConf->set('auto.offset.reset', 'earliest');

        $topic = $rk->newTopic("ecco_topic", $topicConf);

        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        // Start consuming partition 0
        while (true) {
            $message = $topic->consume(0, 10000);
            if (null === $message || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
                continue;
            } elseif ($message->err) {
                echo $message->errstr(), "\n";
                break;
            } else {
                echo $message->payload, "\n";
            }

        }
    }

Output

MENSAGEM 0 MENSAGEM 1 MENSAGEM 2 MENSAGEM 3 MENSAGEM 4 MENSAGEM 5 MENSAGEM 6 MENSAGEM 7 MENSAGEM 8 MENSAGEM 9 MENSAGEM 10 MENSAGEM 11 MENSAGEM 12 MENSAGEM 13 MENSAGEM 14 MENSAGEM 15 MENSAGEM 16 MENSAGEM 17 MENSAGEM 18 MENSAGEM 19 MENSAGEM 20 MENSAGEM 21 MENSAGEM 22 MENSAGEM 23 MENSAGEM 24 MENSAGEM 25 MENSAGEM 26 MENSAGEM 27 MENSAGEM 28 MENSAGEM 29 MENSAGEM 30 MENSAGEM 31 MENSAGEM 32 MENSAGEM 33 MENSAGEM 34 MENSAGEM 35 MENSAGEM 36 MENSAGEM 37 MENSAGEM 38 MENSAGEM 39 MENSAGEM 40 MENSAGEM 41 MENSAGEM 42 MENSAGEM 43 MENSAGEM 44 MENSAGEM 45 MENSAGEM 46 MENSAGEM 47 MENSAGEM 48 MENSAGEM 49 MENSAGEM 50 MENSAGEM 51 MENSAGEM 52 MENSAGEM 53 MENSAGEM 54 MENSAGEM 55 MENSAGEM 56 MENSAGEM 57 MENSAGEM 58 MENSAGEM 59 MENSAGEM 60 MENSAGEM 61 MENSAGEM 62 MENSAGEM 63 MENSAGEM 64 MENSAGEM 65 MENSAGEM 66 MENSAGEM 67 MENSAGEM 68 MENSAGEM 69 MENSAGEM 70 MENSAGEM 71 MENSAGEM 72 MENSAGEM 73 MENSAGEM 74 MENSAGEM 75 MENSAGEM 76 MENSAGEM 77 MENSAGEM 78 MENSAGEM 79 MENSAGEM 80 MENSAGEM 81 MENSAGEM 82 MENSAGEM 83 MENSAGEM 84 MENSAGEM 85 MENSAGEM 86 MENSAGEM 87 MENSAGEM 88 MENSAGEM 89 MENSAGEM 90 MENSAGEM 91 MENSAGEM 92 MENSAGEM 93 MENSAGEM 94 MENSAGEM 95 MENSAGEM 96 MENSAGEM 97 MENSAGEM 98 MENSAGEM 99 Broker: Unknown member

Merci bien
Joao Faria

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 13, 2020

Dear Joao

Please don't use the low level consumer, it is overly complex and poses unnecessary challenges. I advice you to use the high level consumer ✌️

Cheers

@gordett
Copy link

gordett commented Nov 13, 2020

ok, i will try it

thanks in advance

@gordett
Copy link

gordett commented Nov 13, 2020

$conf = new \RdKafka\Conf();

    $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;

            default:
                throw new \Exception($err);
        }
    });

    $conf->set('group.id', 'myConsumerGroup');

    $conf->set('metadata.broker.list', '192.168.2.29:9092');

    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new \RdKafka\KafkaConsumer($conf);

    $consumer->subscribe(['ecco_topic']);

    echo "Waiting for partition assignment... (make take some time when\n";
    echo "quickly re-joining the group after leaving it.)\n";

    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }

With this code, the consumer laravel app dont show anything

thanks for your attention

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

@gordett if you see nothing at all (assign, waiting for assignment, etc.) it might be that you already have a consumer(s) running with the same consumer group. Imho even if you have no messages, after some point you should see No more messages and Timed out

@gordett
Copy link

gordett commented Nov 16, 2020

Hello @nick-zh

Me and @jfariablosis are implementing the above code. Thanks for your attention.

Resume:

public function producer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', '192.168.2.29:9092');

        $producer = new \RdKafka\Producer($conf);
        $topic = $producer->newTopic("topic_for_show");

        for ($i = 0; $i < 100; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "MENSAGE $i");
            $producer->poll(0);
        }

        for ($flushRetries = 0; $flushRetries < 100; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
        echo "PUBLISHED";
    }

After run this code, i have the message in my browser.
image

And in my kafka i can see a topic created
image

, when i run this consumer

public function consumer()
    {
        echo "CONSUMER";
        $conf = new \RdKafka\Conf();

        $conf->set('group.id', 'myConsumerGroup');

        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers("192.168.2.29");

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);
        $topicConf->set('offset.store.method', 'broker');
        $topicConf->set('auto.offset.reset', 'earliest');
        $topic = $rk->newTopic("ecco_topic", $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $message = $topic->consume(0, 10000);
            print_r($message);
            if (null === $message || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                continue;
            } elseif ($message->err) {
                echo $message->errstr(), "\n";
                break;
            } else {
                echo $message->payload, "\n";
            }
        }
    }

i only see the tab browser in loading, but if i remove the while (and continue and break)

i can see this

image

sorry for my big response, but i dont understanding why happens this... and maybe its a little change

Thanks again

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

As stated before, as a first step, please don't use the low level consumer. Also in the example you provided, you are writing to a different topic then you are reading, but your output is for the correct topic again. So the provided example is not consistent.
Also please format the code in the comment properly with three back ticks, i adjusted your comments to be more readable ✌️

@gordett
Copy link

gordett commented Nov 16, 2020

Hi again,

*Sorry for the wrong topic (my bad)

public function producer()
{
    $conf = new \RdKafka\Conf();
    $conf->set('metadata.broker.list', '192.168.2.29:9092');
    $producer = new \RdKafka\Producer($conf);
    $topic = $producer->newTopic("topic_for_show");

    for ($i = 0; $i < 100; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "MENSAGE $i");
        $producer->poll(0);
    }

    for ($flushRetries = 0; $flushRetries < 100; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            break;
        }
    }

    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new \RuntimeException('Was unable to flush, messages might be lost!');
    }
    echo "PUBLISHED";
}

image

public function consumer()
{
    echo "CONSUMER<br><br>";
    //Configurations
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup');

    $rk = new \RdKafka\Consumer($conf);
    // my server kafka
    $rk->addBrokers("192.168.2.29");

    // topic config
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', 100);
    $topicConf->set('offset.store.method', 'broker');
    $topicConf->set('auto.offset.reset', 'earliest');

    //topic to read
    $topic = $rk->newTopic("topic_for_show", $topicConf);

    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
    // start a listen consumer
    while (true) {
        $message = $topic->consume(0, 10000);
        if (null === $message || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            continue;
        } elseif ($message->err) {
            echo $message->errstr(), "\n";
            break;
        } else {  
            print_r($message);
        }
    }
}

with while no show anything.
without while show one message for each refresh page
image

why dont print a message with while cycle?

Thanks for your attention

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

Please use the high level consumer and check if this still happens. Also to be on the safe side, use a new consumer group

@Steveb-p
Copy link
Contributor

@gordett @jfariablosis that's because you're using while(true) 😄

Think about it for a while. You're asking PHP to enter an infinite loop. You're never breaking from it (unless an error occurs).
What you probably want to do is one of:

  1. Break when RD_KAFKA_RESP_ERR__PARTITION_EOF message is received or null (depends on configuration of librdkafka - don't worry about it now).
  2. Disable output buffering, so that PHP actually writes to standard output (ob_flush). Otherwise whatever you echo is kept in buffer and doesn't reach webserver, which times out eventually.
  3. Do not process messages using webserver (!). Usual use case for phprdkafka is for it to be a long running process in the background. While it will work for web requests, usually it's a bad idea.

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

good catch, thx @Steveb-p

@gordett
Copy link

gordett commented Nov 16, 2020

Hi @nick-zh and @Steveb-p, thanks a lot for your attention

1. Break when RD_KAFKA_RESP_ERR__PARTITION_EOF message is received or null (depends on configuration of librdkafka - don't worry about it now).
=> but in my opinion, while its necessary to waiting a new message and do something with message..?

2. Disable output buffering, so that PHP actually writes to standard output (ob_flush). Otherwise whatever you echo is kept in buffer and doesn't reach webserver, which times out eventually.
=> ok, i will not do the "echo"

3. Do not process messages using webserver (!). Usual use case for phprdkafka is for it to be a long running process in the background. While it will work for web requests, usually it's a bad idea.
=> i think this is my real problem.... i dont know what i can do with the consumer.. and where.. or how.. (sorry for my words). what i need is react to some message in some device client like flutter app or web app js file

news...

with high lever consumer i can see a lot of messages. but if i refresh i cant see nothing. i need to change de group.id to see again the messages. Sometimes not work..

image

  public function consumer(){
    $conf = new \RdKafka\Conf();

    $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;

            default:
                throw new \Exception($err);
        }
    });

    $conf->set('group.id', 'myConsumerGroup2');
    $conf->set('metadata.broker.list', '192.168.2.29');
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new \RdKafka\KafkaConsumer($conf);

    $consumer->subscribe(['topic_for_show']);

    while (true) {
        $message = $consumer->consume(1000);
        print_r($message);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
}

now that i have some mensages, i have a big question...

i can produce a message with laravel controller, when i make a request to some route
after, i have a native app in flutter that needs to consume the message and do something...
or in a other web app browser, with javascript, react to some new message like pusher

var channel = pusher.subscribe('my-channel'); channel.bind('my-event', function(data) { alert('Received my-event with message: ' + data.message); });

is it possible to do that? do you consider that i am following the right path?

Thanks a lot

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

A consumer (group) will ideally consume the message only once. After successful processing of one (or several messages) it is customary to commit the messages (or have auto commit enabled). Committed messages won't be re-read by the same consumer group. The message(s) are still available (if within retention limits) for other consumer groups to read.
I can't speak about Flutter, but it is certainly possible to consume messages in JavaScript. I don't know too much what you guys are trying to achieve, but sounds ok so far 😄

@gordett
Copy link

gordett commented Nov 16, 2020

A consumer (group) will ideally consume the message only once. After successful processing of one (or several messages) it is customary to commit the messages (or have auto commit enabled). Committed messages won't be re-read by the same consumer group. The message(s) are still available (if within retention limits) for other consumer groups to read.

Yes, but if i produce more, i should see in consumer..

I can't speak about Flutter, but it is certainly possible to consume messages in JavaScript. I don't know too much what you guys are trying to achieve, but sounds ok so far 😄
hehehe, ok thanks a lot.. i search for examples to client js files and i cant find any example...

i dont understand, what i can do with consumer in php project.. because i dont want a page with a while (infinite loop) ... i want a page, that react a new message... and i only can do this with js file changing the dom od page

Thanks a lot for everything

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

  1. Yes
  2. Yes sounds good

@Steveb-p
Copy link
Contributor

Steveb-p commented Nov 16, 2020

because i dont want a page with a while (infinite loop) ... i want a page, that react a new message

Your client app will need to query the webserver for new messages, receive a response and then immediately query again. This technique is also known as long-pooling long-polling (google it: https://www.google.com/search?q=long+polling).
Or, you can use websockets to connect to a long-running PHP process instead. There are a few libraries that can handle this for you, and probably a few with laravel integration as well.

Overall, familiarize yourself with consumer groups and others Kafka concepts. See if it will work for you. If you're building a messaging app, then Kafka is a bad choice - it just wasn't built for that.

@gordett
Copy link

gordett commented Nov 16, 2020

Hi @Steveb-p
Ok. Rceived and understood your message.
I might be wrong. Any tecnology suggestion for app messaging??

Thanks in advance

@Steveb-p
Copy link
Contributor

@gordett If you're after messaging between users, then look into AMQP protocol first. See if it fits what you're trying to build before commiting to a solution.

RabbitMQ is a popular implementation of a AMQP server, along with multitude of others.

Just make sure you understand the limitation of each solution you look into. Don't get me wrong, Kafka is great for what it does, which makes it extremely performant, but it comes at a price.

If in your app you need:

  1. Access messages from a specific date.
  2. Search through messages looking for a phrase.
  3. Delete messages.

Then Kafka is not for your use case.

@Steveb-p
Copy link
Contributor

Steveb-p commented Nov 16, 2020

@gordett @jfariablosis if you have any questions, you can also ask them on https://php-kafka.slack.com/ - this Slack isn't used too much, but Nick and I often sit there.

@gordett
Copy link

gordett commented Nov 16, 2020

Message app its only a example. I dont need search messages by date or phrase..

What i need is, some app flutter send a coordinates to a server (like api in laravel), and when this happens, i want to produce a message to my web page (html + js in laravel project)

the client consume de message produced and update de map with data of the message or make a request ajax/axios to get last coordenates....

the kafka is to the client page dont need to make a lot requests (infinite loop like setTimeout) without coordenates updated... with kafka the client update only when news coordenates are inserted

@Steveb-p
Copy link
Contributor

Steveb-p commented Nov 16, 2020

Then Kafka is a good choice.

You just need to remember to set a proper timeout in while loop (note that $consumer->consume(1000) the argument here is timeout for consumer - it blocks for at most the miliseconds you specify there). A reasonable timeout is usually a little less than 30 seconds for PHP process. Most webservers (nginx, apache) will terminate the connection if they don't receive a complete response from PHP in that time (unless configured otherwise, but I would stick to it).

Or use websockets, then you will have a permanent connection, and you'll be able to write to client directly. Familiarize yourself with those if you haven't already.

@nick-zh
Copy link
Collaborator

nick-zh commented Nov 16, 2020

Adjusted Slack link above so you guys can join. I'll close this ticket since the extension part has been resolved. Feel free to continue the discussion though ✌️

@nick-zh nick-zh closed this as completed Nov 16, 2020
@nick-zh nick-zh changed the title Kafka/conf Question: How to use consumer / producer in an web application Nov 16, 2020
@nick-zh nick-zh changed the title Question: How to use consumer / producer in an web application Question: How to use consumer / producer in a web application Nov 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants