One message lag #111

Open
chunaiarun opened this issue Apr 15, 2024 · 11 comments

@chunaiarun

Good morning,
I am facing a strange issue.
Our consumer program consumes all the messages but leaves out the last message for every partition on the Kafka broker.

Say we have partitions 0, 1 and 2: for each of them I can see a lag of 1. After consuming all the messages, the current offset is always one less than the log-end-offset.

I am using .kfk.Assign to subscribe and the .kfk.consumecb function to receive the messages.

Please let me know how to proceed.

@chunaiarun
Author

I am using the .kfk.CommitOffsets function to commit offsets.

If I print .kfk.CommittedOffsets, it shows the last-but-one offset.

@chunaiarun
Author

Config parameters
metadata.broker.list
group.id
fetch.wait.max.ms 10
statistics.interval.ms 10000
enable.auto.commit false
enable.auto.offset.store false
message.max.bytes 10000000
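
For reference, the above maps onto a kfk config dictionary in the same style as the example client later in this thread (a sketch only; the broker address and group id below are placeholders, since the actual values aren't shown):

kfk_cfg:(!) . flip(
    (`metadata.broker.list;`localhost:9092);
    (`group.id;`0);
    (`fetch.wait.max.ms;`10);
    (`statistics.interval.ms;`10000);
    (`enable.auto.commit;`false);
    (`enable.auto.offset.store;`false);
    (`message.max.bytes;`10000000)
    );
client:.kfk.Consumer[kfk_cfg];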

@chunaiarun
Author

[screenshot attachment: IMG_1027.jpeg]

@sshanks-kx
Contributor

Will look to recreate and report back once done. Thanks.

@chunaiarun
Author

chunaiarun commented Apr 17, 2024 via email

@sshanks-kx
Contributor

PositionOffsets uses rd_kafka_position (https://github.com/KxSystems/kafka/blob/c6de3f88803072d5ed778d8c39699b9a4752e325/kfk.c#L664C22-L664C39)

The documentation on librdkafka for this says "The offset field of each requested partition will be set to the offset of the last consumed message + 1"
https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#a6e9e36bd9e6bf84a9f3092fcbfa3a9ac
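
As a small worked illustration of what that means for the numbers in this issue (not a library call, just the arithmetic):

/ rd_kafka_position semantics: position = last consumed offset + 1
lastConsumed:30          / offset of the last message delivered to the consume callback
position:1+lastConsumed  / 31: what PositionOffsets reports
/ committing lastConsumed unchanged leaves the committed offset at 30, one behind
/ the log-end-offset of 31, which shows up as a lag of 1 in consumer-group tools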

Using our examples...

running test_producer.q to send msgs to topic1 ...

"Sending 2024.04.17D15:41:09.772013000"
"Sending 2024.04.17D15:41:10.772013000"
"Sending 2024.04.17D15:41:11.772013000"

running test_offsetc.q to sub, record msgs to data table & print offsets/commits....

looking at data table:

mtype topic client partition offset msgtime                       data                            key      headers                 rcvtime                      
----------------------------------------------------------------------------------------------------------------------------------------------------------------
      test1 0      0         28     2024.04.17D15:41:09.776000000 "2024.04.17D15:41:09.772013000" `byte$() (`symbol$())!`symbol$() 2024.04.17D15:41:09.781294000
      test1 0      0         29     2024.04.17D15:41:10.774000000 "2024.04.17D15:41:10.772013000" `byte$() (`symbol$())!`symbol$() 2024.04.17D15:41:10.780654000
      test1 0      0         30     2024.04.17D15:41:11.776000000 "2024.04.17D15:41:11.772013000" `byte$() (`symbol$())!`symbol$() 2024.04.17D15:41:11.780235000

looking at details printed after everything received/committed:

"Position:"
topic partition offset metadata
-------------------------------
test1 0         31     ""      
"Before commited:"
topic partition offset metadata
-------------------------------
test1 0         30     ""      
"After commited:"
topic partition offset metadata
-------------------------------
test1 0         30     "" 

From the above, I can see it consumed the last message (recorded/committed at offset 30) and PositionOffsets reports offset 31. Based on the librdkafka doc, this appears to be correct.

I can change our docs to reflect that PositionOffsets provides "last consumed message + 1", as per librdkafka's rd_kafka_position documentation.

Does that explain what you are seeing, or do you believe you're missing a message or that there is some other issue?

Thanks

@chunaiarun
Author

chunaiarun commented Apr 19, 2024 via email

@sshanks-kx
Contributor

Thanks for the details. Will investigate & report back.

@sshanks-kx
Contributor

If you haven't already, the issue might be that you need to add 1 to the received offset when using CommitOffsets?
I will update the docs and add this to the example program.
Ref: https://docs.confluent.io/platform/7.5/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87 states "offset should be the offset where consumption will resume, i.e., the last processed offset + 1"
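
In kfk terms this is a one-line change in the consume callback: commit the offset that consumption should resume from, i.e. the received offset + 1. A sketch using the same names as the example client below:

.kfk.consumetopic[`]:{[msg]
    / ... process msg as usual ...
    / commit next-offset-to-read, not the offset just received
    .kfk.CommitOffsets[client;msg`topic;(enlist msg`partition)!(enlist 1+msg`offset);0b];
    }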

Recreation

Using automatic commits:

3 messages published, received with offsets 182,183,184.

With the subscriber's enable.auto.commit set to true, I eventually see offsetcb fire, reporting offset 185:

0i
"Success"
,`topic`partition`offset`metadata!(`test1;0i;185;"")

Inspecting with the kafka-consumer-groups program:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
0               test1           0          185             185             0               -               -               -

Using manual commits:

3 messages published, received with offsets 185,186,187

With the subscriber's enable.auto.commit set to false and .kfk.CommitOffsets called with the current offset + 1, I eventually see offsetcb fire, reporting offset 188:

0i
"Success"
,`topic`partition`offset`metadata!(`test1;0i;188;"")

Inspecting with the kafka-consumer-groups program:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
0               test1           0          188             188             0               -               -               -

Example Client

Using this basic subscriber to show the issue:

\l ../kfk.q

kfk_cfg:(!) . flip(
    (`metadata.broker.list;`localhost:9092);
    (`group.id;`0);
    (`fetch.wait.max.ms;`10);
    (`statistics.interval.ms;`10000);
    (`enable.auto.commit;`false);
    (`enable.auto.offset.store;`true)
    );
client:.kfk.Consumer[kfk_cfg];

// Topics to be published to
topic1:`test1
data:();

// Default callback function overwritten for managing of consumption from all topics
.kfk.consumetopic[`]:{[msg]
    0N!string msg`offset;
    0N!string msg`data;
    msg[`data]:"c"$msg[`data];
    msg[`rcvtime]:.z.p;
    data,::enlist msg;
    .kfk.CommitOffsets[client;msg`topic;(enlist msg`partition)!(enlist 1+msg`offset);0b];
    0N!"kfk.consumetopic done";}

// Define Offset callback functionality
.kfk.offsetcb:{[cid;err;offsets]0N!"offsetcb";show (cid;err;offsets);}

// Subscribe to relevant topics from a defined client
.kfk.Assign[client;(enlist `test1)!enlist 0]

(I comment out the .kfk.CommitOffsets line and change enable.auto.commit to true to compare with automatic commits.)
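
In other words, the automatic-commit variant differs only in the config flag and the removal of the manual commit (a sketch; the config must be changed before .kfk.Consumer is called):

kfk_cfg[`enable.auto.commit]:`true;   / was `false for the manual-commit run
/ and comment out the .kfk.CommitOffsets line in .kfk.consumetopic[`]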

@chunaiarun
Author

chunaiarun commented Apr 23, 2024 via email

@sshanks-kx
Contributor

FYI: I've now updated the information around callbacks/calls. It can be found via https://github.com/KxSystems/kafka/blob/master/docs/reference.md. Thanks.
