-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka_lwt.consume_batch is not thread-safe #3
Comments
Reproduced. The following main leads to a segmentation fault when messages are produced: open Lwt
let rec count_message topic partition =
Kafka_lwt.consume_batch topic partition
>>= fun messages ->
Lwt_io.printl (string_of_int (List.length messages))
>>= fun () ->
count_message topic partition
let main =
let consumer = Kafka.new_consumer ["metadata.broker.list","localhost:9092"] in
let topic = Kafka.new_topic consumer "test" [] in
Kafka.consume_start topic 0 Kafka.offset_end;
Kafka.consume_start topic 1 Kafka.offset_end;
Lwt.join [
count_message topic 0;
count_message topic 1
]
let _ =
Lwt_main.run main |
I added missing calls to CAMLparam and CAMLreturn. This removes the former segmentation fault, which was systematically during garbage collection. But the issue is deeper and memory is still corrupted. The fact that there is one shared topic handler and 2 partitions is determinant : nothing wrong appends if we use two topic handlers or if only one of the partition has actually messages to be consumed. |
I observe memory corruption when trying to run Kafka_lwt.consume_batch on the same topic object but different partitions in two Lwt threads. Is it supposed to work like this?
The text was updated successfully, but these errors were encountered: