Skip to content

Commit

Permalink
Replace use of int32 by int32_to be compatible with OCaml 4.03.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
didier-wenzek committed Jun 15, 2016
1 parent 31f1ba1 commit 2256a81
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Kafka.produce producer_topic partition "message 2";;
let rec consume t p = match Kafka.consume ~timeout_ms t p with
| Kafka.Message(_,_,_,msg,_) -> msg
| Kafka.PartitionEnd(_,_,_) -> consume t p
| exception Kafka.Error(Kafka.TIMED_OUT,_) -> (Printf.fprintf stderr "Timeout after: %d ms\n%!" timeout_ms; consume t p)
| exception Kafka.Error(Kafka.TIMED_OUT,_) ->
(Printf.fprintf stderr "Timeout after: %d ms\n%!" timeout_ms; consume t p)
in
let msg = consume consumer_topic partition in assert (msg = "message 0");
let msg = consume consumer_topic partition in assert (msg = "message 1");
Expand Down
20 changes: 10 additions & 10 deletions ocaml_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ value ocaml_kafka_consume_start(value caml_kafka_topic, value caml_kafka_partiti
CAMLparam3(caml_kafka_topic,caml_kafka_partition,caml_kafka_offset);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int64 offset = Int64_val(caml_kafka_offset);
int32_t partition = Int_val(caml_kafka_partition);
int64_t offset = Int64_val(caml_kafka_offset);
int err = rd_kafka_consume_start(topic, partition, offset);
if (err) {
rd_kafka_resp_err_t rd_errno = rd_kafka_errno2err(errno);
Expand All @@ -371,7 +371,7 @@ value ocaml_kafka_consume_stop(value caml_kafka_topic, value caml_kafka_partitio
CAMLparam2(caml_kafka_topic,caml_kafka_partition);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int32_t partition = Int_val(caml_kafka_partition);
int err = rd_kafka_consume_stop(topic, partition);
if (err) {
rd_kafka_resp_err_t rd_errno = rd_kafka_errno2err(errno);
Expand Down Expand Up @@ -458,7 +458,7 @@ value ocaml_kafka_consume(value caml_kafka_timeout, value caml_kafka_topic, valu
CAMLlocal1(caml_msg);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int32_t partition = Int_val(caml_kafka_partition);
int timeout = DEFAULT_TIMEOUT_MS;
if (Is_block(caml_kafka_timeout)) {
int t = Int_val(Field(caml_kafka_timeout, 0));
Expand All @@ -485,7 +485,7 @@ value ocaml_kafka_consume_batch(value caml_kafka_timeout, value caml_msg_count,
CAMLlocal1(caml_msg_list);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int32_t partition = Int_val(caml_kafka_partition);
int timeout = DEFAULT_TIMEOUT_MS;
if (Is_block(caml_kafka_timeout)) {
int t = Int_val(Field(caml_kafka_timeout, 0));
Expand Down Expand Up @@ -522,7 +522,7 @@ value ocaml_kafka_produce(value caml_kafka_topic, value caml_kafka_partition, va
CAMLlocal1(caml_key);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int32_t partition = Int_val(caml_kafka_partition);

void* payload = String_val(caml_msg);
size_t len = caml_string_length(caml_msg);
Expand Down Expand Up @@ -579,8 +579,8 @@ value ocaml_kafka_store_offset(value caml_kafka_topic, value caml_kafka_partitio
CAMLparam3(caml_kafka_topic,caml_kafka_partition,caml_kafka_offset);

rd_kafka_topic_t *topic = get_handler(Field(caml_kafka_topic,0));
int32 partition = Int_val(caml_kafka_partition);
int64 offset = Int64_val(caml_kafka_offset);
int32_t partition = Int_val(caml_kafka_partition);
int64_t offset = Int64_val(caml_kafka_offset);

rd_kafka_resp_err_t rd_errno = rd_kafka_offset_store(topic, partition, offset);
if (rd_errno) {
Expand Down Expand Up @@ -663,8 +663,8 @@ value ocaml_kafka_consume_start_queue(value caml_kafka_queue, value caml_kafka_t
Store_field(caml_kafka_queue,1,caml_cons);
}

int32 partition = Int_val(caml_kafka_partition);
int64 offset = Int64_val(caml_kafka_offset);
int32_t partition = Int_val(caml_kafka_partition);
int64_t offset = Int64_val(caml_kafka_offset);
int err = rd_kafka_consume_start_queue(topic, partition, offset, queue);
if (err) {
rd_kafka_resp_err_t rd_errno = rd_kafka_errno2err(errno);
Expand Down
4 changes: 2 additions & 2 deletions ocaml_lwt_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct job_consume {
struct lwt_unix_job job;

rd_kafka_topic_t *topic;
int32 partition;
int32_t partition;
int timeout;

value caml_kafka_topic; /* We hide the topic in the job, so we can attach it to message. */
Expand Down Expand Up @@ -163,7 +163,7 @@ struct job_consume_batch {
struct lwt_unix_job job;

rd_kafka_topic_t *topic;
int32 partition;
int32_t partition;
int timeout;
size_t msg_count;

Expand Down

0 comments on commit 2256a81

Please sign in to comment.