Skip to content

Commit

Permalink
Corrected produce w/ timestamp version check to >0.9.3. (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jan 30, 2017
1 parent b97615b commit 80d7f30
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions kafka/integration_test.go
Expand Up @@ -709,11 +709,11 @@ func TestConsumerPollRebalance(t *testing.T) {

// TestProducerConsumerTimestamps produces messages with timestamps
// and verifies them on consumption.
// Requires librdkafka >=0.9.3 and Kafka >=0.10.0.0
// Requires librdkafka >=0.9.4 and Kafka >=0.10.0.0
func TestProducerConsumerTimestamps(t *testing.T) {
numver, strver := LibraryVersion()
if numver < 0x00090300 {
t.Skipf("Requires librdkafka >=0.9.3 (currently on %s)", strver)
if numver < 0x00090400 {
t.Skipf("Requires librdkafka >=0.9.4 (currently on %s)", strver)
}

if !testconfRead() {
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer.go
Expand Up @@ -34,7 +34,7 @@ rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
int64_t timestamp,
uintptr_t cgoid) {
#if RD_KAFKA_VERSION >= 0x00090300
#ifdef RD_KAFKA_V_TIMESTAMP
return rd_kafka_producev(rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
Expand Down Expand Up @@ -147,7 +147,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
// transmit queue, thus returning immediately.
// The delivery report will be sent on the provided deliveryChan if specified,
// or on the Producer object's Events() channel if not.
// msg.Timestamp requires librdkafka >= 0.9.3 (else returns ErrNotImplemented),
// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
// api.version.request=true, and broker >= 0.10.0.0.
// Returns an error if message could not be enqueued.
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer_test.go
Expand Up @@ -62,9 +62,9 @@ func TestProducerAPIs(t *testing.T) {
err = p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic2, Partition: 0}, Timestamp: time.Now()}, nil)
numver, strver := LibraryVersion()
t.Logf("Produce with timestamp on %s returned: %s", strver, err)
if numver < 0x00090300 {
if numver < 0x00090400 {
if err == nil || err.(Error).Code() != ErrNotImplemented {
t.Errorf("Expected Produce with timestamp to fail with ErrNotImplemented on %s, got: %s", strver, err)
t.Errorf("Expected Produce with timestamp to fail with ErrNotImplemented on %s (0x%x), got: %s", strver, numver, err)
}
} else {
if err != nil {
Expand Down

0 comments on commit 80d7f30

Please sign in to comment.