
Add two new optional channels to the ProducerConfig for handling failures and acks#128

Closed
schleppy wants to merge 4 commits into IBM:master from CrowdStrike:FailedAndAckingChannels

Conversation

@schleppy
Contributor

@schleppy schleppy commented Jul 3, 2014

...
FailedChannel, when provided, will have all messages that are not acked
sent to it as type *ConsumerEvent. The application using this package is
responsible for reading from the failed channel or it will eventually
block.

AckingChannel, when provided, will receive per-partition, per-topic
counts of acked messages after each flush. The application using this
package is responsible for reading from the acking channel or it will
eventually block.

This commit also includes a change for the error sent in the
DroppedMessagesError struct to the errorCb to be the block error instead
of the error returned from Produce.
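The contract described above can be sketched as a drain loop. This is an illustrative mock, not sarama's actual API: the `event` type and the channel values stand in for the PR's `*ConsumerEvent` failures and per-partition ack counts.

```go
package main

import "fmt"

// Stand-in for the event type the PR description mentions; in the real
// patch, failed messages arrive as *ConsumerEvent on FailedChannel.
type event struct {
	Value []byte
	Err   error
}

func main() {
	// Hypothetical channels mirroring FailedChannel and AckingChannel.
	failed := make(chan *event, 16)
	acks := make(chan map[int32]int64, 16)

	// Simulate what the producer would deliver after a flush.
	failed <- &event{Value: []byte("lost message"), Err: fmt.Errorf("broker down")}
	acks <- map[int32]int64{0: 10, 1: 10}
	close(failed)
	close(acks)

	// The application must drain both channels, or the producer will
	// eventually block, as the description warns.
	for ev := range failed {
		fmt.Printf("will retry %q after error: %v\n", ev.Value, ev.Err)
	}
	for counts := range acks {
		var total int64
		for _, n := range counts {
			total += n
		}
		fmt.Println("messages acked this flush:", total)
	}
}
```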

@eapache
Contributor

eapache commented Jul 3, 2014

LGTM. @burke @fw42 @sirupsen ?

@eapache eapache closed this Jul 3, 2014
@eapache
Contributor

eapache commented Jul 3, 2014

sorry, wrong button

@eapache eapache reopened this Jul 3, 2014
@sirupsen
Contributor

sirupsen commented Jul 3, 2014

Makes sense to me.

Are you guys seeing this happening often?

@schleppy
Contributor Author

schleppy commented Jul 3, 2014

@sirupsen I assume the question was for me so I will answer as if it were.

Failures, no - not often, but they do happen, and the system Kafka is supporting needs 100% message guarantees. As for the acking, we see it all the time :)

The acking channel simply gives us the ability to generate metrics about how many messages we are acking per unit time, how many messages are "outstanding", etc.
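The accounting described here reduces to simple arithmetic: tally the ack counts coming off the channel and compare against messages sent. A minimal sketch, with every name and number illustrative rather than from the patch:

```go
package main

import "fmt"

// Rough sketch of the metrics described above: sum per-partition ack
// counts from the acking channel and derive an "outstanding" gauge.
func main() {
	sent := int64(1000) // messages handed to the producer so far

	acks := make(chan map[int32]int64, 2)
	acks <- map[int32]int64{0: 300, 1: 300} // one map per flush
	acks <- map[int32]int64{0: 150, 1: 150}
	close(acks)

	var acked int64
	for counts := range acks {
		for _, n := range counts {
			acked += n
		}
	}
	fmt.Printf("sent=%d acked=%d outstanding=%d\n", sent, acked, sent-acked)
	// → sent=1000 acked=900 outstanding=100
}
```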

@sirupsen
Contributor

sirupsen commented Jul 3, 2014

👍 We should use this too, I'm surprised we had nothing like this already.

@eapache eapache mentioned this pull request Jul 8, 2014
@otoolep

otoolep commented Jul 8, 2014

This design makes sense to me, however I am very new to this code.

producer.go Outdated

nitpick: think you meant partition (typo in the AckingChannel comment)

@fw42
Contributor

fw42 commented Jul 8, 2014

Looks good to me, besides the name of that variable.

@graemej

graemej commented Jul 8, 2014

Couple nitpicks, but looks sane to me.

producer.go Outdated
Contributor

I don't like the idea of naming a channel .*Channel. How about something like FailedEvents and BrokerAcks, TopicPartitionAcks, Acknowledgements, or even just Acks.

@burke
Contributor

burke commented Jul 9, 2014

Channel naming aside, I like the implementation. 👍

@schleppy
Contributor Author

schleppy commented Jul 9, 2014

Not sure why the name change caused the 1.1 tests to fail. I don't have gvm or 1.1 on this machine. Would someone else mind taking a look?

…gger a Travis build and see if test failure was a fluke.
@schleppy
Contributor Author

schleppy commented Jul 9, 2014

I changed a comment and the go1.1 tests passed. Not sure what the problem is with the test, or if this was some sort of TravisCI fluke, but with the comment changed, tests pass.

@eapache
Contributor

eapache commented Jul 10, 2014

That test failure seems to happen randomly ~1/5 times; I have been trying to track it down for over a week now without success. Needless to say, it isn't your fault but if you have any ideas please share :)

Contributor

how does the receiver know which topic this is for?

Contributor Author

It doesn't, and it was never my intention to include that information as it is not needed in my current application. What I do care about is purely the values to know I sent N messages, and N messages have been acked. I could certainly add it by wrapping the map in a struct that includes the topic if that is desired.

type AcksPerPartition struct {
    topic         string          // topic these counts belong to
    partitionAcks map[int32]int64 // partition ID -> acked message count
}

On a slightly unrelated note, I just had a thought. Is the "acksPerPartition" map actually reflecting "acksPerReplica"? I have inspected the data before, so I feel pretty confident that it is per partition, but I am not 100% sure now and don't have access to verify at the moment.

Contributor

Oh, OK, I just thought topic would be something you'd need.

len(prb) is, I believe, the total number of messages sent in this request - so you're not overcounting with replication, but you are overcounting if the request includes multiple topics/partitions. I'm not actually sure that there's a nice way to get the right count at the moment; the grouping is done in prb.toRequest and isn't returned (I guess you could dig it out of the ProduceRequest object itself, but that gets ugly when you try to deal with compressed messages).
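The overcount being described can be shown with a toy model. The `topicPartition` type below is illustrative, not sarama's internal request builder: the point is that crediting every partition with the request's total length inflates the counts whenever one request spans multiple topics or partitions.

```go
package main

import "fmt"

// Illustrative stand-in for a pending message's destination; not the
// library's actual internal type.
type topicPartition struct {
	topic     string
	partition int32
}

func main() {
	// One produce request batching messages for three distinct
	// topic/partitions.
	prb := []topicPartition{
		{"logs", 0}, {"logs", 0}, {"logs", 1}, {"metrics", 0},
	}

	// Naive: every partition gets credited with len(prb) acks.
	fmt.Println("naive count per partition:", len(prb)) // 4 (wrong)

	// Correct: group by (topic, partition) before counting.
	grouped := map[topicPartition]int64{}
	for _, tp := range prb {
		grouped[tp]++
	}
	fmt.Println(`acks for {"logs", 0}:`, grouped[topicPartition{"logs", 0}]) // 2
}
```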

@wvanbergen
Contributor

Anything else that needs to happen to this, except a rebase?

@eapache
Contributor

eapache commented Jul 23, 2014

We were discussing whether this over or under-counted the acks - I suggested it was probably overcounting, and haven't heard back... (see discussion on line 419)

@schleppy
Contributor Author

tl;dr: For my specific use case, this code is in production and works exactly the way I want it to. I am okay with maintaining our fork as we vendor the code anyway. If there is a desire for me to dig deeper and possibly augment the code base a bit, I think I can get this working in a general way. (That was a long tldr)

Sorry for not responding for so long, but I have had a really strong work commitment over the past couple weeks and haven't really had time to respond.

So, I am not entirely sure where to go with this, to be honest. I have mostly built this feature around my own requirements rather than those of general users. For my use case, a single producer maps directly to a single topic, so the errors and acks received are specific to that topic, and I don't have the "cross-pollination" in the prb that others may have. I have several producers, but they work off different sources, and the source defines the topic; so while I see the issue with the acking code, it isn't something I have had to deal with.

I don't over-count based on partitions either because I iterate over the map of partition_index:count pairs and first ensure they are all equal, then ack that number if they all match. This is what the pertinent code looks like:

    for {
        select {
        case ackCounts := <-kafkaAcks:
            if valid, num := ensureConsistentAcks(ackCounts); valid {
                ackNotifier <- num
                ...
            } else {
                ...
            }
        }
    }
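The ensureConsistentAcks helper itself is not shown in the thread; based only on the description above (every partition must report the same count, which is then acked), a guess at its shape might be:

```go
package main

import "fmt"

// A guess at schleppy's ensureConsistentAcks helper, inferred from his
// description; the real implementation does not appear in the PR. It
// verifies that every partition reports the same ack count and returns
// that count if so.
func ensureConsistentAcks(counts map[int32]int64) (bool, int64) {
	var first int64
	seen := false
	for _, n := range counts {
		if !seen {
			first, seen = n, true
			continue
		}
		if n != first {
			return false, 0 // partitions disagree
		}
	}
	return seen, first
}

func main() {
	fmt.Println(ensureConsistentAcks(map[int32]int64{0: 5, 1: 5, 2: 5})) // true 5
	fmt.Println(ensureConsistentAcks(map[int32]int64{0: 5, 1: 4}))       // false 0
}
```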

I realize my use case might not be generic enough, but I am not sure how far you would like me to go in reconstructing the counts from requests with multiple topics by pulling apart the ProduceRequest object. I am certainly willing to put in more effort to make it general enough for all use cases, but I have a feeling that will add to the code base quite a bit. I am also willing to maintain our fork if that is necessary.

I appreciate the work that has gone into this project, and really appreciate it being open-sourced.

@eapache
Contributor

eapache commented Jul 24, 2014

So I've looked at this again; it definitely doesn't return the right values when multiple topics are produced to at once; we shouldn't merge it until that is fixed. Unfortunately, fixing that correctly looks like a relatively hard problem given the current architecture (especially regarding compressed messages). If it works for your current case and you're OK maintaining the separate patch on top then I think that is the best solution for now (though I would add a big XXX caveat to your patch so that if somebody tries to use it for multiple topics in the future they'll understand what's going on).

I'm kind of hoping that eventually #132 will turn into something useful; that architecture makes this whole problem trivial from the get-go.

@eapache eapache closed this Jul 24, 2014

8 participants