Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message headers support (C) #1480

Merged
merged 3 commits into from
Jan 4, 2018
Merged

Message headers support (C) #1480

merged 3 commits into from
Jan 4, 2018

Conversation

edenhill
Copy link
Contributor

No description provided.

@@ -187,6 +189,27 @@ static void msg_consume (rd_kafka_message_t *rkmessage,
!timestamp ? 0 :
(int)time(NULL) - (int)(timestamp/1000));
}

if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some n00b commentary for your entertainment. I find this difficult to read, though I guess it's a Cism and so no problem. I'd have written this here: if (rd_kafka_message_headers(rkmessage, &hdrs) == RD_KAFKA_RESP_ERR_NO_ERROR).


fprintf(stdout, "%% Headers:");

while (!rd_kafka_header_iter_all(hdrs, idx++,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd remove the word iter from this method name since you're iterating outside the method call (with the idx++). also the word all. perhaps just rd_kafka_header_get or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, what about rd_kafka_header_get() for the key-specific one, and rd_kafka_header_enum() or ..list() or ..get_all() for the one that iterates all keys?

if (!hdrs)
hdrs = rd_kafka_headers_new(8);

err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit tricky (dual meaning of the -1), but I can't think of anything I prefer and I guess it's ok.

* null-terminated string and the length is automatically
* acquired using strlen().
*
* @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RD_KAFKA_RESP_ERR__READ_ONLY is never returned I think? do you have plans for this? is it a bit odd for a C api to try and enforce this sort of thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got this from the Java API, but it turned out to be an internal API that we dont need to mimic. Will remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second thought I think we'll keep it so that headers can be "closed" when final message serialization has taken place, this would stop (future) late-stage interceptors from thinking they could modify the headers after serialization.

/**
* @brief Remove all headers for the given key (if any).
*
* @returns RD_KAFKA_RESP_ERR__READ_ONLY if the headers are read-only,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about readonly as above

* @param name Header to find (last match).
* @param valuep (out) Set to a (null-terminated) const pointer to the value
* (may be NULL).
* @param sizep (out) Set to the value's size (not including null-terminator).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps include a note about what it means if sizep == -1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizep will not be set to -1 (null values have a size of 0)

src/rdkafka.h Outdated
* @param sizep (out) Set to the value's size (not including null-terminator).
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_header_iter (const rd_kafka_headers_t *hdrs, size_t idx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't like iter in the name here either. maybe rd_kafka_header_get is appropriate for this as well (not sure of the best name).

@@ -195,8 +195,6 @@ rd_tmpabuf_write_str0 (const char *func, int line,
"expected %"PRIusz" bytes > %"PRIusz \
" remaining bytes", \
__len0, rd_kafka_buf_read_remain(rkbuf)); \
(rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not going to bother understanding why you took this out, I assume you know what you're doing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_fail() already sets it

* @brief header_t(name) to char * comparator
*/
static int rd_kafka_header_cmp_str (void *_a, void *_b) {
const rd_kafka_header_t *a = _a;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why you have void* parameters then immediately cast?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comparator signature is int () (void *, void *), passing a function with another signature would cause a compiler warning

@mhowlett
Copy link
Contributor

mhowlett commented Nov 2, 2017

LGTM after you've addressed comments as you see fit.

@almazik
Copy link

almazik commented Nov 22, 2017

Is there a progress on this merge request? This feature is important for us...

@johnistan
Copy link
Contributor

johnistan commented Dec 18, 2017

I have played around with python support for this feature. I haven't found a public API to find out how many headers an msg has. leading to hacks like:

        //HACK to figure out size of headers
        while (!rd_kafka_header_iter_all(hdrs, header_size++,
                                         &header_key, &header_value, &header_value_size)) {
        }

Can we add a rd_kafka_header_length to the public API? Or is there an existing approach I am not seeing.

@edenhill
Copy link
Contributor Author

@johnistan Good idea, will add.

@edenhill
Copy link
Contributor Author

edenhill commented Jan 2, 2018

@almazik This PR is pretty much ready to be merged, just waiting final review from @mhowlett on iter API renaming.

@mhowlett
Copy link
Contributor

mhowlett commented Jan 4, 2018

@edenhill - go for it. lgtm.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants