Directly return a DeliveryFuture from FutureProducer::send_copy #53

Merged
merged 5 commits into from Jul 2, 2017


wrl
Contributor

@wrl wrl commented Jun 29, 2017

Returning a Result made it very difficult to use send_copy() in a chain of futures, and, since a Future is just an asynchronous Result, this commit changes the DeliveryFuture to yield a KafkaError on failure. Previously, the DeliveryFuture would only yield Canceled, and, to continue supporting this, a new variant of KafkaError was added, FutureCanceled.
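The core of the change is folding the old Canceled outcome into KafkaError, so a single error type flows through the chain. A minimal sketch of that idea, assuming simplified types: KafkaError, DeliveryReport and resolve_delivery below are illustrative stand-ins, not rdkafka's real definitions, and std's mpsc channel stands in for the futures oneshot channel whose dropped sender produced Canceled.

```rust
use std::sync::mpsc::{self, Receiver};

/// Simplified stand-in for rdkafka's error type (hypothetical, for illustration).
#[derive(Debug, PartialEq)]
pub enum KafkaError {
    /// Production failed (the librdkafka error code is modelled as a string).
    MessageProduction(String),
    /// The sending side was dropped before a delivery report arrived.
    FutureCanceled,
}

/// Hypothetical delivery report: where the message ended up.
#[derive(Debug, PartialEq)]
pub struct DeliveryReport {
    pub partition: i32,
    pub offset: i64,
}

/// Wait for the delivery outcome, mapping "sender dropped" (the old Canceled)
/// to KafkaError::FutureCanceled so callers handle a single error type.
pub fn resolve_delivery(
    rx: Receiver<Result<DeliveryReport, KafkaError>>,
) -> Result<DeliveryReport, KafkaError> {
    match rx.recv() {
        Ok(outcome) => outcome,                    // report or production error
        Err(_) => Err(KafkaError::FutureCanceled), // channel closed: canceled
    }
}
```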

This makes the FutureProducer API considerably more ergonomic. For example, in a function which returns a BoxFuture<(), ::std::io::Error>, this is what was necessary before:

return match kafka_producer.send_copy::<_, ()>(
    "test", None, Some(&*format!("msg on {}", topic)), None, None) {
    Ok(report) => {
        report
            .map_err(|_| io::Error::new(io::ErrorKind::Other,
                                        "failed to send kafka message"))
            .and_then(|_| future::ok(()))
            .boxed()
    },

    Err(_) => {
        future::err(io::Error::new(
                io::ErrorKind::InvalidData, "failed to send kafka message"))
            .boxed()
    }
}

This gets even more complicated because match arms must return the same type, and the types of chained futures can easily become very complex.

Compare that to the code one can write with this PR applied:

return kafka_producer
    .send_copy::<_, ()>("test", None, Some(&*format!("msg on {}", topic)), None, None)
    .map_err(|_| io::Error::new(io::ErrorKind::Other,
                                "failed to send kafka message"))
    .and_then(|_| future::ok(()))
    .boxed();

Much less verbose, and, more importantly, it flows naturally as part of a chain of futures.
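Since a future is essentially an asynchronous Result, the ergonomic difference can be shown synchronously. A sketch with hypothetical types: send_before nests the enqueue error around the delivery outcome, so the caller needs a match with two error paths, while send_after exposes one Result that composes directly with map_err and and_then. The `ok` parameter only simulates the outcome.

```rust
use std::io;

#[derive(Debug)]
pub struct KafkaError; // hypothetical stand-in
#[derive(Debug)]
pub struct Canceled; // hypothetical stand-in for the old future's error

/// Old shape: an immediate Result wrapping the eventual delivery outcome.
pub fn send_before(ok: bool) -> Result<Result<(), Canceled>, KafkaError> {
    if ok { Ok(Ok(())) } else { Err(KafkaError) }
}

/// New shape: one Result carrying either success or a KafkaError.
pub fn send_after(ok: bool) -> Result<(), KafkaError> {
    if ok { Ok(()) } else { Err(KafkaError) }
}

fn to_io<E>(_: E) -> io::Error {
    io::Error::new(io::ErrorKind::Other, "failed to send kafka message")
}

/// Old style: both error layers must be handled explicitly.
pub fn caller_before(ok: bool) -> io::Result<()> {
    match send_before(ok) {
        Ok(report) => report.map_err(to_io),
        Err(e) => Err(to_io(e)),
    }
}

/// New style: a single chain, mirroring the future combinators above.
pub fn caller_after(ok: bool) -> io::Result<()> {
    send_after(ok).map_err(to_io).and_then(|_| Ok(()))
}
```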

Full disclosure: there is a compilation warning about the unused Result returned by tx.send() in two places. This warning was present before the change, and I'm not sure what the correct way to handle an error in that situation is.

@wrl
Contributor Author

wrl commented Jun 29, 2017

I'll get the tests fixed up.

@fede1024
Owner

Thanks! Yes I agree that using the current interface is a pain. The new one seems to make much more sense.

The warning appears because you changed from the deprecated complete to the new send. I'm not sure how that error should be handled, and I haven't had time to look into it. Can you keep the current method (complete) for now? I plan to move to the new one as part of a separate change (I'll open a new issue for it).

Also, could you remove the change to rdkafka-sys/librdkafka from the PR?

@wrl
Contributor Author

wrl commented Jun 29, 2017

So, I opted to just eat the unused result from the tx.send() calls. If you'd still like me to roll those back, let me know.

Tests are hanging, going to investigate.

@wrl
Contributor Author

wrl commented Jun 29, 2017

Not sure what's up with the tests. I can reproduce the hang locally with docker-compose, but not when testing outside a container against a Kafka server.

@fede1024
Owner

fede1024 commented Jun 29, 2017 via email

@wrl
Contributor Author

wrl commented Jun 29, 2017

Yeah, I'm setting the number of partitions to 3 (otherwise a handful of tests fail immediately).

@fede1024
Owner

I see the tests failing occasionally with the new 0.11.0-RC1 version of librdkafka in master. I'll try to debug and fix the issue.

@fede1024
Owner

fede1024 commented Jul 2, 2017

Tests should be working now.

William Light and others added 5 commits July 2, 2017 17:36
Returning a Result made it very difficult to use `send_copy()` in a
chain of futures, and, since a Future is just an asynchronous Result,
this commit changes the DeliveryFuture to yield a KafkaError on failure.
@wrl
Contributor Author

wrl commented Jul 2, 2017

Alright, rebased onto master and everything's passing now. Thanks for looking into that!

@fede1024
Owner

fede1024 commented Jul 2, 2017

Thanks for the PR, I'll merge it.

Coming back to the example you provided: an Ok(_) result from send_copy doesn't necessarily mean that production was successful; the delivery report should be checked for errors as well (you might know this already and you might have skipped it for brevity). The way it works right now is very close to the C interface, but I'm considering changing the API so that the delivery report is only returned in case the message was actually produced successfully, and return an error otherwise.

I'll give it a try and see if it gets more ergonomic. Feedback is welcome :)
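The point about delivery reports: an Ok from send_copy only means the message was enqueued, and the report itself can still carry an error. A sketch with a hypothetical report type (the field names are assumptions, not rdkafka's API) that turns the report into a Result, which is roughly what "only return the delivery report if the message was actually produced" would mean for callers:

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum KafkaError {
    MessageProduction(String),
}

/// Hypothetical delivery report: partition/offset on success,
/// plus the error reported by librdkafka if delivery failed.
#[derive(Debug)]
pub struct DeliveryReport {
    pub partition: i32,
    pub offset: i64,
    pub error: Option<KafkaError>,
}

impl DeliveryReport {
    /// Convert the report into a Result so a failed delivery
    /// cannot be mistaken for a successful one.
    pub fn into_result(self) -> Result<(i32, i64), KafkaError> {
        match self.error {
            Some(err) => Err(err),
            None => Ok((self.partition, self.offset)),
        }
    }
}
```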

@fede1024 fede1024 merged commit 1c2f5c4 into fede1024:master Jul 2, 2017
@thijsc
Collaborator

thijsc commented Aug 14, 2017

I had a look at upgrading our project to 0.12 with this change and am a bit confused about pre-delivery errors.

We match on RDKafkaError::QueueFull and RDKafkaError::MessageSizeTooLarge for example. These happen immediately before any polling. How should error handling for this be added after this change?

@fede1024
Owner

If one of these errors occurs it should cause an error result in the returned future when it's polled the first time. The only difference will be that there is no clear separation between the errors that can happen immediately and the ones that can happen asynchronously.

Would this change prevent you from handling those errors properly?
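What "an error result on the first poll" means can be sketched with std futures (not the futures 0.1 crate used at the time): if enqueueing fails, the returned future is already resolved with the error. send_copy here is a hypothetical stand-in, and poll_once is a minimal helper that polls exactly once with a no-op waker.

```rust
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum KafkaError {
    QueueFull,
}

/// Hypothetical: the enqueue error is returned inside an
/// already-completed future instead of an outer Result.
pub fn send_copy(queue_full: bool) -> Ready<Result<(), KafkaError>> {
    if queue_full { ready(Err(KafkaError::QueueFull)) } else { ready(Ok(())) }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once and return what the first poll yields.
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}
```

The trade-off raised in the thread is visible here: from the future's type alone, a caller cannot tell an immediate enqueue failure from a later delivery failure.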

@thijsc
Collaborator

thijsc commented Aug 16, 2017

I'm not sure if this is an issue for us. I'll try to upgrade again with this knowledge and will let you know.

@thijsc
Collaborator

thijsc commented Sep 20, 2017

I was able to do some further testing, and this indeed breaks our use case. We sometimes have traffic peaks, and in that case we hold off producing for a bit. Another piece of error handling we do is cleaning up a message if it's too big.

Before this PR, this was possible (sample from our code):

Ok(f) => return Ok(f),
Err(KafkaError::MessageProduction(RDKafkaError::QueueFull)) => {
    warn!("Producer queue full for {} on attempt {}, waiting one second", topic, attempt);
    thread::sleep(Duration::from_secs(1));
},
Err(KafkaError::MessageProduction(RDKafkaError::MessageSizeTooLarge)) => {
    return Err(ErrorKind::KafkaMessageTooLarge(size).into())
},
Err(err) => bail!(err)

After this PR, it's only possible after waiting for the future. But by then it's too late: we're producing a lot of messages and need the error when it actually occurs, not later.

So for us this change does not make sense.
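The pattern in the sample above depends on receiving the enqueue error synchronously. A self-contained sketch of that loop, assuming a hypothetical try_enqueue closure in place of the pre-PR send_copy and a configurable backoff instead of the fixed one-second sleep:

```rust
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum ProduceError {
    QueueFull,
    MessageSizeTooLarge,
    Other(String),
}

#[derive(Debug, PartialEq)]
pub enum SendOutcome {
    Enqueued,
    TooLarge(usize),
    Failed(String),
}

/// Retry while the local producer queue is full; stop immediately on errors
/// that retrying cannot fix. `try_enqueue` is a hypothetical stand-in for a
/// send call that reports enqueue errors synchronously.
pub fn send_with_retry<F>(mut try_enqueue: F, size: usize, max_attempts: u32, backoff: Duration) -> SendOutcome
where
    F: FnMut() -> Result<(), ProduceError>,
{
    for attempt in 1..=max_attempts {
        match try_enqueue() {
            Ok(()) => return SendOutcome::Enqueued,
            Err(ProduceError::QueueFull) => {
                eprintln!("producer queue full on attempt {}, backing off", attempt);
                thread::sleep(backoff);
            }
            Err(ProduceError::MessageSizeTooLarge) => return SendOutcome::TooLarge(size),
            Err(ProduceError::Other(e)) => return SendOutcome::Failed(e),
        }
    }
    SendOutcome::Failed("queue still full after retries".to_string())
}
```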

@fede1024
Owner

Thanks for the report! So, in my understanding, the main issue is that there are some errors that you want to handle immediately, before sending other messages, and others that you can deal with later on.

  • Regarding the KafkaMessageTooLarge error, I think that the new API would work just fine. You will just have to handle the error somewhere else, probably where you already handle the other possible errors for the futures.
  • Regarding the queue being full: I think this is the only error that must be handled straight-away, otherwise the producer will keep failing. There is an option in librdkafka to make the producer block if the queue is full: in this way you will never see the error, and you won't have to sleep.
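The two queue-full behaviours can be illustrated with std's bounded channel. This is an analogy, not librdkafka: try_send reports a full queue immediately (like the BaseProducer returning an error), while send blocks until space frees up (like a producer configured to block). demo is a hypothetical helper that fills a queue of capacity 1.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

/// Non-blocking enqueue: a full queue is reported to the caller right away.
pub fn enqueue_nonblocking(tx: &SyncSender<u32>, msg: u32) -> Result<(), &'static str> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err("queue full"),
        Err(TrySendError::Disconnected(_)) => Err("producer closed"),
    }
}

/// Fill a queue of capacity 1, then show both behaviours.
pub fn demo() -> (Result<(), &'static str>, Vec<u32>) {
    let (tx, rx) = sync_channel::<u32>(1);
    enqueue_nonblocking(&tx, 1).unwrap(); // takes the only slot
    let full = enqueue_nonblocking(&tx, 2); // rejected: queue is full
    // A consumer thread drains the queue, standing in for delivery.
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<u32>>());
    tx.send(3).unwrap(); // blocks until the consumer frees the slot
    drop(tx); // close the channel so the consumer's iterator ends
    (full, consumer.join().unwrap())
}
```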

Would this work for you?

@thijsc
Collaborator

thijsc commented Sep 21, 2017

Yes, there are two distinct categories of errors when producing with librdkafka: a message can fail to be accepted onto the producer queue, and the actual production can fail.

In my mind it's a bad choice to hide this. Generally I want to get the error when it occurs, not as part of handling the delivery reports that the future provides.

In the case of a message that is too large, I need the error straight away so I have a chance to, for example, split the message into smaller parts. If you cannot get this error when producing, you'd have to keep every single message in memory until its delivery report can be checked.

So I really do think this is not a good change. We're hiding behaviour of the underlying library just for convenience. This has real downsides too.

@thijsc
Collaborator

thijsc commented Sep 21, 2017

I'm not an expert on futures, but maybe send_copy could return a FutureResult (https://docs.rs/futures/0.1.16/futures/future/struct.FutureResult.html) that contains the future? That way, I guess, you could map_err it?

@fede1024
Owner

I agree that we shouldn't hide behavior of the underlying library if not strictly needed, and making the API a bit nicer might not be a strong enough reason. At the same time, I feel there must be a better way than a Result. The FutureResult you suggest sounds promising. I'll check what the API would look like with it. Thanks! :)

@fede1024
Owner

FutureResult seems a good solution to me. This is the code provided as example by @wrl:

kafka_producer
    .send_copy::<_, ()>("test", None, Some(&*format!("msg on {}", topic)), None, None)
    .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to send kafka message"))
    .and_then(|_| future::ok(()))
    .boxed()

and this is the code using a FutureResult<DeliveryFuture, KafkaError>:

producer
    .send_copy2::<_, ()>("test", None, Some(&*format!("msg on {}", topic)), None, None)
    .and_then(|delivery_future| delivery_future.and_then(|_delivery_report| future::ok(())))
    .map_err(|_e| io::Error::new(io::ErrorKind::Other, "failed to send kafka message"))
    .boxed()

I named the variables to show what they mean (also notice that this code won't check the value of the delivery_report, meaning that some errors won't be detected). @thijsc I think this solution should work for you, while remaining ergonomic enough. I pushed a send_copy2 branch where I added the send_copy2 method. Could you try it out and see if it fits your use case?
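The FutureResult<DeliveryFuture, KafkaError> shape keeps the immediate error separate while still composing. In present-day async Rust the same idea is an outer Result wrapping a future, which `?` and `.await` flatten in one expression. A sketch with hypothetical names (the real send_copy2 signature lived on the send_copy2 branch and is not reproduced here); block_on is a minimal busy-polling executor suitable only for futures that complete without waiting on I/O.

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum KafkaError {
    QueueFull,
    MessageTimedOut,
}

/// Hypothetical send: the outer Result reports enqueue failures immediately;
/// the inner future resolves to the delivery outcome.
pub fn send_copy2(
    enqueue: Result<(), KafkaError>,
    delivery: Result<i64, KafkaError>,
) -> Result<impl Future<Output = Result<i64, KafkaError>>, KafkaError> {
    enqueue?; // immediate error: no future is created
    Ok(ready(delivery))
}

/// Callers that don't need the distinction flatten both layers.
pub async fn send_and_wait(
    enqueue: Result<(), KafkaError>,
    delivery: Result<i64, KafkaError>,
) -> Result<i64, KafkaError> {
    send_copy2(enqueue, delivery)?.await
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: poll until the future is ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

A caller like thijsc can still match on the outer Result right after sending, before any polling.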

@fede1024
Owner

fede1024 commented Oct 3, 2017

@thijsc I'm experimenting with a new API for both the BaseProducer and the FutureProducer that will return the whole message back to the user in case of failure. This should cover the message-too-large scenario that you described.
For the QueueFull, I think the best solution is to make the BaseProducer behave exactly how the librdkafka one works (returning an error), while instead the FutureProducer should block automatically. The idea behind the FutureProducer, and the higher level api in general, is that the user doesn't have to know about librdkafka, and low level details should be hidden. If the user really requires fine control over the Kafka producer, the BaseProducer should be used instead.
I think that with these two changes, your use case should be supported by the FutureProducer and should also require less code. Let me know what you think :) I'll push the branch soon.
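Returning the message on failure addresses the concern that the caller would otherwise have to keep a copy of every message. A sketch under that assumption; OwnedMessage, try_send and the max_size limit are hypothetical, and splitting the payload is just one possible recovery for a too-large message.

```rust
#[derive(Debug, PartialEq)]
pub enum KafkaError {
    MessageSizeTooLarge,
    QueueFull,
}

#[derive(Debug, Clone, PartialEq)]
pub struct OwnedMessage {
    pub payload: Vec<u8>,
}

/// Hypothetical send: on failure, ownership of the message moves back to the caller.
pub fn try_send(msg: OwnedMessage, max_size: usize) -> Result<(), (KafkaError, OwnedMessage)> {
    if msg.payload.len() > max_size {
        Err((KafkaError::MessageSizeTooLarge, msg))
    } else {
        Ok(())
    }
}

/// Send a message, splitting it into chunks if it is rejected as too large.
/// Returns how many messages were accepted. `max_size` must be greater than 0.
pub fn send_or_split(msg: OwnedMessage, max_size: usize) -> Result<usize, KafkaError> {
    match try_send(msg, max_size) {
        Ok(()) => Ok(1),
        Err((KafkaError::MessageSizeTooLarge, msg)) => {
            let mut sent = 0;
            for chunk in msg.payload.chunks(max_size) {
                try_send(OwnedMessage { payload: chunk.to_vec() }, max_size).map_err(|(e, _)| e)?;
                sent += 1;
            }
            Ok(sent)
        }
        Err((other, _)) => Err(other),
    }
}
```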

@thijsc
Collaborator

thijsc commented Nov 3, 2017

Sorry for the slow response, I was working on some other stuff in the meantime.

For us, not having the FutureProducer immediately return errors when enqueueing onto librdkafka's internal queue completely breaks our code. The way I see it, this is an essential part of how this library works, and it should not be hidden.

It's not only waiting for a full queue to clear up. Another very important use case is knowing whether a message is too big. This is also something we need to know beforehand so we can set aside that message for further processing.

If this API stays as is, we'll have to rewrite our entire code base to move to the BaseProducer with some polling threads. I could also live with a smarter BaseProducer that starts a polling thread internally. I've used a similar approach in the Ruby gem I'm working on and have been happy with it. For us, the futures integration is not relevant at the moment.
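A producer that owns its polling thread can be sketched with std threads alone. The Pollable trait is a hypothetical stand-in for a low-level producer's poll call (which in rdkafka is what serves delivery callbacks); the wrapper polls in a loop until it is dropped. This shows the general shape only, not the PollingProducer mentioned later in the thread.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the low-level producer's poll call.
pub trait Pollable: Send + Sync + 'static {
    fn poll(&self, timeout: Duration);
}

/// Wraps a producer and services it from a background thread.
pub struct PollingWrapper<P: Pollable> {
    pub producer: Arc<P>,
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl<P: Pollable> PollingWrapper<P> {
    pub fn new(producer: P) -> Self {
        let producer = Arc::new(producer);
        let stop = Arc::new(AtomicBool::new(false));
        let (p, s) = (Arc::clone(&producer), Arc::clone(&stop));
        let handle = thread::spawn(move || {
            // Keep serving callbacks until the wrapper is dropped.
            while !s.load(Ordering::Relaxed) {
                p.poll(Duration::from_millis(10));
            }
        });
        PollingWrapper { producer, stop, handle: Some(handle) }
    }
}

impl<P: Pollable> Drop for PollingWrapper<P> {
    fn drop(&mut self) {
        // Signal the thread to stop and wait for it to exit.
        self.stop.store(true, Ordering::Relaxed);
        if let Some(h) = self.handle.take() {
            let _ = h.join();
        }
    }
}
```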

If our use case is a huge outlier I could live with that decision. But I do feel that this is all pretty normal. It's the way librdkafka is structured, so I do not see a good argument for abstracting over that.

@fede1024
Owner

fede1024 commented Nov 4, 2017

I definitely don't think your use case is an outlier, and I'd like to find an API that works for you while still making it easy to use send in a chain of futures, which, as I understand it, can only be done by returning something that implements Future.

I thought that the new API would be able to cover your case:

  1. It will automatically block when the queue is full, if configured to do so.
  2. It will give you back a copy of the message if the send failed.

Having automatic "queue full" handling (1) should help in most use cases. If that's not the case in yours, I'd like to know why and see if we can come up with something better. The same goes for (2): the message is returned when production fails, so implementing the logic you mentioned should still be possible. Maybe it's harder because you'll have to handle it asynchronously and you won't have access to the producer anymore?

I see other alternatives: for example, it should be possible to add methods to the DeliveryFuture that check whether the send failed straight away. Your code could simply check whether those methods return anything immediately after calling send_copy. Alternatively, I recently pulled the thread-handling code out of the FutureProducer and created a PollingProducer, which simply adds a polling thread to the BaseProducer. It should be pretty similar to what you described. Some methods are currently private, but I can easily make them public.

Could you give me some more details on why the new API doesn't work? Would an approach based on the PollingProducer work better?
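The first alternative, extra methods on the DeliveryFuture to detect an immediate failure, could look roughly like this. immediate_error and try_report are hypothetical names, and std's mpsc Receiver stands in for the channel that later delivers the report.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

#[derive(Debug, Clone, PartialEq)]
pub enum KafkaError {
    QueueFull,
    FutureCanceled,
}

/// Hypothetical delivery future that remembers whether enqueueing failed.
pub struct DeliveryFuture {
    immediate: Option<KafkaError>,
    rx: Option<Receiver<Result<i64, KafkaError>>>,
}

impl DeliveryFuture {
    pub fn failed(err: KafkaError) -> Self {
        DeliveryFuture { immediate: Some(err), rx: None }
    }

    pub fn pending(rx: Receiver<Result<i64, KafkaError>>) -> Self {
        DeliveryFuture { immediate: None, rx: Some(rx) }
    }

    /// Check, right after sending, whether the message was rejected before enqueueing.
    pub fn immediate_error(&self) -> Option<&KafkaError> {
        self.immediate.as_ref()
    }

    /// Non-blocking check for the final outcome; None means "still in flight".
    pub fn try_report(&self) -> Option<Result<i64, KafkaError>> {
        if let Some(err) = &self.immediate {
            return Some(Err(err.clone()));
        }
        match self.rx.as_ref()?.try_recv() {
            Ok(outcome) => Some(outcome),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => Some(Err(KafkaError::FutureCanceled)),
        }
    }
}
```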

@thijsc
Collaborator

thijsc commented Nov 4, 2017

The PollingProducer looks like exactly what we need. Let me see if I can port our code to that producer.

@fede1024
Owner

fede1024 commented Nov 4, 2017

Good. I'll make all of its methods public and add some documentation.

@thijsc
Collaborator

thijsc commented Nov 30, 2017

I tried to port our code to the new ThreadedProducer but was unsuccessful. Unlike the FutureProducer, that producer does not return a result we can wait for.

We basically need exactly what the FutureProducer used to do before this pull request. I think we could achieve this by letting ThreadedProducer return a delivery handle. There's no need for futures for that. Are you open to that?
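A delivery handle without futures can be as small as a one-shot channel with a blocking wait. A sketch, assuming the producer's delivery callback sends the outcome into the channel; DeliveryHandle, delivery_pair and wait_timeout are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum KafkaError {
    MessageTimedOut,
    Canceled,
    WaitTimedOut,
}

/// Returned by send; the producer's delivery callback completes it.
pub struct DeliveryHandle {
    rx: Receiver<Result<i64, KafkaError>>,
}

/// Create a linked pair: the Sender goes to the delivery callback,
/// the handle goes to the caller.
pub fn delivery_pair() -> (Sender<Result<i64, KafkaError>>, DeliveryHandle) {
    let (tx, rx) = channel();
    (tx, DeliveryHandle { rx })
}

impl DeliveryHandle {
    /// Block until the delivery outcome arrives or the timeout expires.
    pub fn wait_timeout(&self, timeout: Duration) -> Result<i64, KafkaError> {
        match self.rx.recv_timeout(timeout) {
            Ok(outcome) => outcome,
            Err(RecvTimeoutError::Timeout) => Err(KafkaError::WaitTimedOut),
            Err(RecvTimeoutError::Disconnected) => Err(KafkaError::Canceled),
        }
    }
}
```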

Another thought: Could we split the futures stuff out into a feature and make the crate fully work without it?
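Making futures optional would typically be done with a Cargo feature that gates the futures-based producer. A sketch of the Rust side, assuming a hypothetical feature named `future-producer` declared in Cargo.toml (with the futures dependency marked optional); the core module always compiles without it.

```rust
/// Always available: the plain producer API with no futures dependency.
pub mod base {
    pub fn send(payload: &[u8]) -> Result<usize, String> {
        if payload.is_empty() {
            Err("empty payload".into())
        } else {
            Ok(payload.len())
        }
    }
}

/// Compiled only when the (hypothetical) `future-producer` feature is enabled,
/// so crates that don't use futures never build this module.
#[cfg(feature = "future-producer")]
pub mod future_producer {
    /// Futures-based wrapper around the base API.
    pub async fn send(payload: Vec<u8>) -> Result<usize, String> {
        super::base::send(&payload)
    }
}
```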


3 participants