Skip to content

Commit

Permalink
Handle delivery.report.only.error in Python (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 28, 2017
1 parent ece3df4 commit 9eaefa5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
4 changes: 4 additions & 0 deletions confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
goto done;
}

/* Skip callback if delivery.report.only.error=true */
if (self->u.Producer.dr_only_error && !rkm->err)
goto done;

msgobj = Message_new0(self, rkm);

args = Py_BuildValue("(OO)",
Expand Down
21 changes: 20 additions & 1 deletion confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,26 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
}

return 1;
}

} else if (!strcmp(name, "delivery.report.only.error")) {
/* Since we allocate msgstate for each produced message
* with a callback we can't use delivery.report.only.error
* as-is, as we wouldn't be able to ever free those msgstates.
* Instead we shortcut this setting in the Python client,
* providing the same functionality from dr_msg_cb trampoline.
*/

if (!PyBool_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s requires bool", name);
return -1;
}

self->u.Producer.dr_only_error = valobj == Py_True;

return 1;
}

return 0; /* Not handled */
}
Expand Down
2 changes: 2 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ typedef struct {
const rd_kafka_topic_t *,
const void *, size_t, int32_t,
void *, void *); /**< Fallback C partitioner*/

int dr_only_error; /**< delivery.report.only.error */
} Producer;

/**
Expand Down
56 changes: 56 additions & 0 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,62 @@ def verify_producer():
# Block until all messages are delivered/failed
p.flush()

#
# Additional isolated tests
#
test_producer_dr_only_error()



def test_producer_dr_only_error():
"""
The C delivery.report.only.error configuration property
can't be used with the Python client since the Python client
allocates a msgstate for each produced message that has a callback,
and on success (with delivery.report.only.error=true) the delivery report
will not be called and the msgstate will thus never be freed.
Since a proper broker is required for messages to be succesfully sent
this test must be run from the integration tests rather than
the unit tests.
"""
p = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers,
'broker.address.family':'v4',
"delivery.report.only.error":True})

class DrOnlyTest (object):
def __init__ (self):
self.remaining = 1

def handle_err (self, err, msg):
""" This delivery handler should only get called for errored msgs """
assert "BAD:" in msg.value().decode('utf-8')
assert err is not None
self.remaining -= 1

def handle_success (self, err, msg):
""" This delivery handler should never get called """
# FIXME: Can we verify that it is actually garbage collected?
assert "GOOD:" in msg.value().decode('utf-8')
assert err is None
assert False, "should never come here"

state = DrOnlyTest()

print('only.error: Verifying delivery.report.only.error')
p.produce(topic, "BAD: This message will make not make it".encode('utf-8'),
partition=99, on_delivery=state.handle_err)

p.produce(topic, "GOOD: This message will make make it".encode('utf-8'),
on_delivery=state.handle_success)

print('only.error: Waiting for flush')
p.flush(10000)

print('only.error: Remaining messages now %d' % state.remaining)
assert state.remaining == 0



def verify_avro():
from confluent_kafka import avro
Expand Down

0 comments on commit 9eaefa5

Please sign in to comment.