
Fixing keyed partitioner #378

Closed
wants to merge 4 commits into from

Conversation

itamarwe

This pull request solves the problem described in #354.

When sending the first events with the producer, the topic metadata has not yet been updated and the partitions array is empty. This causes the partitioner to return NaN, which fails the producer.

This pull request returns 0 as the chosen partition as long as the partitions array is empty.

It still feels like a workaround, since I would expect the producer to update the metadata before firing the ready event, so it would be great if anyone could pick that up.
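In sketch form, the fix looks like the following (a simplified stand-in for the library's KeyedPartitioner; the hash function here is illustrative, not kafka-node's actual implementation):

```javascript
// Sketch of the patched partition selection. The hash below is a
// placeholder for illustration, not kafka-node's real hash.
function hashCode(key) {
  var hash = 0;
  for (var i = 0; i < key.length; i++) {
    hash = ((hash * 31) + key.charCodeAt(i)) & 0x7fffffff;
  }
  return hash;
}

function getPartition(partitions, key) {
  // Before the fix, an empty partitions array made the modulo below
  // evaluate to NaN, which failed the producer.
  if (!partitions || partitions.length === 0) return 0;
  return partitions[hashCode(String(key)) % partitions.length];
}
```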

@eugeneware

Great job @itamarwe !

@bullidium

From our use of the library we came across the same issue: when the producer is first used it doesn't have any prior metadata about the partitions. We got around it temporarily by refreshing the metadata once the client and producer are both connected and ready, since that gives the buildPayloads method of the HighLevelProducer the information it needs; we haven't had time to figure out the best place to fix the issue properly. As part of send, the client already checks the payload against the metadata; on failure it refreshes the metadata and then checks a second time.

I agree that simply returning 0 is a workaround, especially in our case, where messages with the same key are placed on the same partition to ensure ordering. Returning 0 for the first message after the process boots would cause out-of-order messages. In my opinion this makes the KeyedPartitioner no longer useful, since the whole point of that partitioner is finer-grained control over which partition your messages go to.

Currently our workaround looks like:

this.client = new kafka.Client(this.connectionString, this.name);
this.producer = new kafka.HighLevelProducer(this.client, {
    partitionerType: 3, // Keyed partitioner
});
this.producer.on('ready', () => {
    this.client.refreshMetadata(this.topics, (err) => {
        if (err) {
            return this.emit('error', err);
        }
        this.emit('ready');
    });
});

I would propose triggering a metadata refresh during the connect method of the HighLevelProducer, but that would require knowledge of the topics; alternatively, a natural refresh of the metadata could happen during construction of the client. Ideally, the best solution would be to rebuild the payload after a failure of the client's send method, since the refresh will have been performed by then. But I couldn't make up my mind on the best approach, so I thought it would be worth a discussion.

@estliberitas
Contributor

estliberitas commented May 29, 2016

Well, though this PR does not fix the root cause (i.e. partitioner.getPartition() must only be called after a successful metadata update), it at least temporarily fixes the issues caused by the current, wrong behavior.

So, LGTM.

@itamarwe
Author

@estliberitas How would you fix the root cause? I don't mind investing some time in it.
I think the relevant cases are on initialization (the metadata is empty) and after an update.
Any ideas of how to solve this properly?

@estliberitas
Contributor

@itamarwe well, since the work on the module has been iterative, without any constraints or design goals, I'll allow myself to say it has simply become very complex, and I no longer understand how things work there. If we ever change things, we should re-design it and make it a much better piece of code, or provide a plan for iterative refactoring.

@estliberitas
Contributor

@itamarwe my last comment was about the long term; in the short term, we should make sure buildPayloads() is called only once metadata already exists. Right now there is no such restriction, at least in the high-level producer as far as I can see.
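A minimal sketch of such a restriction (a hypothetical helper, not existing kafka-node code) could look like:

```javascript
// Hypothetical guard: reject sends for topics that have no metadata yet,
// so buildPayloads() is never reached with an empty partitions list.
function checkMetadata(topicMetadata, payloads) {
  var missing = payloads
    .map(function (p) { return p.topic; })
    .filter(function (t) { return !topicMetadata[t]; });
  if (missing.length) {
    throw new Error('No metadata yet for topics: ' + missing.join(', '));
  }
}
```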

@estliberitas
Contributor

When we get the 'brokersChanged' event, that's already some kind of race condition, right?
https://github.com/SOHU-Co/kafka-node/blob/master/lib/highLevelProducer.js#L85

And it's not reflected in send() or buildPayloads() in any way, right?
https://github.com/SOHU-Co/kafka-node/blob/master/lib/highLevelProducer.js#L104

@itamarwe
Author

Would you say the send method should update the metadata and rebuild the payload in case of failure?

Another option would be to restrict sending while the metadata is not up to date, but on the other hand, due to the asynchronous nature of node, the brokers can change between the moment the send function is invoked and the moment the message is actually delivered. So upon a brokers change, some messages will fail regardless.

Perhaps the right solution is to have a mechanism that updates the metadata and rebuilds the payload upon failure?
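One way such a mechanism could look (a hypothetical sketch; client and producer stand in for kafka-node internals, and only a single retry is attempted):

```javascript
// Hypothetical retry-on-failure sketch: if the first produce request
// fails, refresh the metadata once, rebuild the payloads, and retry.
function sendWithRetry(client, producer, payloads, cb) {
  function attempt(isRetry) {
    client.sendProduceRequest(
      producer.buildPayloads(payloads, client.topicMetadata),
      producer.requireAcks, producer.ackTimeoutMs,
      function (err, data) {
        if (err && !isRetry) {
          var topics = payloads.map(function (p) { return p.topic; });
          return client.refreshMetadata(topics, function (refreshErr) {
            if (refreshErr) return cb(refreshErr);
            attempt(true); // rebuild payloads against the fresh metadata
          });
        }
        cb(err, data);
      });
  }
  attempt(false);
}
```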

@bullidium

bullidium commented May 30, 2016

@estliberitas I would disagree that adding other "wrong" behaviour is worth it as a temporary fix. If you need to guarantee the ordering of messages placed on Kafka, having the first message after process boot go to partition 0 will allow messages to arrive out of order. The whole reason to use the keyed partitioner is that you want all messages with the same key on the same partition, and this temporary fix stops it doing that. It will also be incredibly hard to track down where the inconsistency is coming from, whereas at the moment it throws an error, which something like my workaround above will fix while guaranteeing ordering.

If you don't care about ordering of messages, then the partition that a message is placed into doesn't really matter, and you can just use a partitioner like the CyclicPartitioner.

@itamarwe
Author

@mrjpsycho I tend to agree with you. Let's try to find a more suitable solution.
Nevertheless, will your workaround survive a brokers change? Are there any other events we should account for?

@estliberitas
Contributor

@mrjpsycho agree.

@itamarwe it won't survive a brokers change, because it's only a wrapper around initialization, forcing a metadata refresh.

@bullidium

@itamarwe as @estliberitas says, it won't survive things like brokers changing. We added some additional attempts to refresh the metadata if a send from the HighLevelProducer fails, in case a broker goes down, to mitigate the issue somewhat. This is the whole reason we haven't been able to figure out the best place(s) to add metadata refreshing: there are potentially many scenarios that require it, on both the producing and consuming sides. It's going to need careful consideration of where the refresh should occur.

@estliberitas
Contributor

@itamarwe @mrjpsycho let's create a separate issue for this, meaning correcting the flow for the publisher only for now. My plan is to fix the publishing part once and for all, and to consider as many cases as possible: brokers changing, streaming, batching, async (de)compression, etc. The only problem is that I have almost no time for this right now. So if you'd start, I'd join the discussion later.

@craighardcastle

Hi,

I have also just come across this bug and have implemented a workaround in the HighLevelProducer that seems to work.

Currently the send method looks like this:

HighLevelProducer.prototype.send = function (payloads, cb) {
    this.client.sendProduceRequest(this.buildPayloads(payloads, this.client.topicMetadata), this.requireAcks, this.ackTimeoutMs, cb);
};

I have changed it to be this:

HighLevelProducer.prototype.send = function (payloads, cb) {
    var self = this;

    var required = payloads.filter(function(p){ return !p.hasOwnProperty('partition') }).map(function(p){ return p.topic });
    var missing = required.filter(function(t){ return !self.client.topicMetadata[t] });

    var doSend = function() {
        self.client.sendProduceRequest(self.buildPayloads(payloads, self.client.topicMetadata), self.requireAcks, self.ackTimeoutMs, cb);
    };

    if (missing.length) {
        // Need to refresh the metadata because it is not set
        // for some of the topics, and we won't be able to calculate
        // the correct partitions for some of the payloads
        return this.client.refreshMetadata(missing, function() {
            doSend();
        });
    }

    doSend();
};

The issue, as mentioned above, is that topicMetadata isn't set when you first try to send. This means there is no list of topic partitions from which to calculate the partition number based on the hash of the payload key. My workaround checks topicMetadata for each of the specified topics; if any are missing, it refreshes the topicMetadata and then sends. Otherwise it just sends as expected.

Not sure if this is the correct fix, but seems to do the trick for me.

@itamarwe
Author

itamarwe commented Jul 6, 2016

@craighardcastle Your fix looks good to me. What worries me is that it will refresh the metadata on every send until the metadata has been refreshed. It would probably be better to do it only once, and then send all of the outstanding events once the metadata is refreshed.

@craighardcastle

craighardcastle commented Jul 6, 2016

Yeah, valid point @itamarwe. I have only been using a simple test app, so I had no concurrent send requests. The simplest way to solve that issue is to use an async queue with a concurrency of 1:

var async = require('async');

var HighLevelProducer = function (client, options) {
    //... other constructor code

    this.sendQueue = async.queue(function(task, callback) {
        task(callback);
    }, 1);

    this.connect();
};

HighLevelProducer.prototype.send = function (payloads, cb) {
    var self = this;

    this.sendQueue.push(function(next) {
        var required = payloads.filter(function(p){ return !p.hasOwnProperty('partition') }).map(function(p){ return p.topic });
        var missing = required.filter(function(t){ return !self.client.topicMetadata[t] });

        var doSend = function() {
            self.client.sendProduceRequest(self.buildPayloads(payloads, self.client.topicMetadata), self.requireAcks, self.ackTimeoutMs, function(err, data) {
                cb(err, data);
                next();
            });
        };

        if (missing.length) {
        // Need to refresh the metadata because it is not set
        // for some of the topics, and we won't be able to calculate
        // the correct partitions for some of the payloads
            return self.client.refreshMetadata(missing, function() {
                doSend();
            });
        }

        doSend();
    });
};

@itamarwe
Author

itamarwe commented Jul 7, 2016

@craighardcastle Nice fix. Still, this would mean that the events are always sent synchronously, which isn't always desired.
I was thinking more in the direction of using a flag while the metadata is being updated:

var HighLevelProducer = function (client, options) {
    //... other constructor code

    this.refreshingMetadata = false;
    this.sendQueue = [];

    this.connect();
};

HighLevelProducer.prototype.send = function (payloads, cb) {
    var self = this;
    var required = payloads.filter(function(p){ return !p.hasOwnProperty('partition'); }).map(function(p){ return p.topic; });
    var missing = required.filter(function(t){ return !self.client.topicMetadata[t]; });

    var doSend = function() {
        self.client.sendProduceRequest(self.buildPayloads(payloads, self.client.topicMetadata), self.requireAcks, self.ackTimeoutMs, function(err, data) {
            cb(err, data);
        });
    };

    if (self.refreshingMetadata || self.sendQueue.length) {
        return self.sendQueue.push(doSend);
    }

    if (missing.length === 0) {
        return doSend();
    }

    self.refreshingMetadata = true;
    self.client.refreshMetadata(missing, function() {
        self.refreshingMetadata = false;
        while (self.sendQueue.length) {
            var sendFn = self.sendQueue.shift();
            sendFn();
        }
        doSend();
    });
};

@craighardcastle

craighardcastle commented Jul 7, 2016

Hi. The async queue is asynchronous, not synchronous. If you mean it will send the queued messages in series, then yes, that is true, but it seems important to me to make sure that queued requests are sent in the order in which the send method was called, and the async queue guarantees this. Without making sure they are sent in order, the topic could receive messages out of order, and this could cause all kinds of problems for consumers and for log compaction of keyed messages.

The problem here is that until send is called, the code doesn't know whether it has the metadata for the specified topic(s). So the simplest solution is to wrap the send in the async queue, but this affects all topics and makes every send request be processed in series. You could go a bit further and make an async queue per topic, so sends are serialized only per topic, which doesn't seem bad to me; however, the send() method takes a list of payloads that can each have a mixture of topics but that all share one callback, so writing the code to handle the multiple refresh calls and the separate per-topic queues would be tricky.

Not sure on this one. Maybe @mrjpsycho @eugeneware @estliberitas or @mpj might have an opinion?

You could just change the docs to say "you must call refreshMetadata() before sending the first message to a new topic when using keyed messages" :)

@itamarwe
Author

itamarwe commented Jul 7, 2016

Right, I see what you mean about the different topics. My code above does not address that.

Regarding the synchronous vs. asynchronous discussion, I think the producer should leave the decision of whether to send the payloads in series to the developer. A developer could decide to send all events in order by waiting for each callback, or to send them all asynchronously. That way, the developer keeps the flexibility to decide.

@craighardcastle

craighardcastle commented Jul 7, 2016

Agreed, which is why the solution is tricky. Generally speaking, I would think that if you are using keyed messages then you do want the messages to be sent in order (per topic partition), but this method needs to work with all the partitioner types, and for some of those the send order is not necessarily important.

Starting to think that just calling client.refreshMetadata() before sending the first message to a new topic might be the best workaround for now. Then, as you said, a developer has total control of ordering and can just make sure they call refresh before calling send for the first time.

@itamarwe itamarwe closed this Jul 21, 2016