API versioning, timestamps support and offsets queries #27
Conversation
- support for v1, v2, v3 versions of the Fetch API
- support for v1 version of messages
- support for v1, v2 versions of the Produce API
- support for timestamps in messages
- fixed MessageSet parsing: we no longer need to keep the API version around, since the MagicByte tells us the message format version
- various minor fixes
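The MagicByte idea can be sketched as follows. This is a minimal illustration, not the module's actual decoder: in the Kafka on-wire message, the Crc (int32) is followed by the MagicByte (int8), where 0 means format v0 (no timestamp) and 1 means format v1 (with timestamp).

```perl
use strict;
use warnings;

# Minimal sketch: read the MagicByte that follows the Crc in a
# Kafka message, to discover the message format version without
# knowing which Fetch API version produced it.
sub message_format_version {
    my ($message_bytes) = @_;
    my ( $crc, $magic ) = unpack 'l> c', $message_bytes;    # int32 BE, int8
    return $magic;
}

# fabricated v1 message fragment: dummy crc, magic byte = 1
my $msg = pack 'l> c', 0, 1;
print 'format v', message_format_version($msg), "\n";    # format v1
```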
Nice work implementing the new protocol versions. However, there are two major issues:
- The API version is broker-dependent, so you need to store the versions per broker and select the appropriate API version after selecting a server from the cluster.
- There is no test that tries all implemented API versions, so there may be subtle bugs. I understand that such a test may be quite complex, but protocol encoding and decoding is complicated and error-prone, so at least some tests are needed for the new code.
lib/Kafka/Connection.pm
Outdated
# 0.10.0.0 so this call may fail. We simply ignore this failure and
# carry on.
$api_versions = $self->_get_supported_api_versions();
};
There is no catch {} for this try {}, so subtle errors may be silently ignored. For example, no one will notice a typo in the long method name above.
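The failure mode can be demonstrated with core Perl alone. Below is a hedged sketch (the `Demo` package and method are made up for illustration): a bare eval {}, like a try {} with no catch {}, discards the exception raised by a misspelled method name, and the caller silently ends up with undef.

```perl
use strict;
use warnings;

package Demo;
sub new { bless {}, shift }
sub _get_supported_api_versions { return { 18 => [ 0, 0 ] } }

package main;

my $self = Demo->new;

# The typo in the method name dies at runtime, but the bare eval {}
# (like a try {} without a catch {}) swallows the error silently:
my $api_versions;
eval {
    $api_versions = $self->_get_suported_api_versions();    # typo!
};
print defined $api_versions ? "got versions\n" : "silently failed\n";
```

A catch {} (or at least a check of `$@`) would surface the typo immediately instead of leaving `$api_versions` undef.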
my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
my $version = $kafka_max_version;
$version > $implemented_max_version
    and $version = $implemented_max_version;
Just a minor note: why the unusual
$version > $implemented_max_version and $version = $implemented_max_version
instead of the more traditional
$version = $implemented_max_version if $version > $implemented_max_version
?
That's the style I usually use. I prefer it because it keeps the order of the expressions, the same as if ($foo) { $bar }: in $foo and $bar, the conditional expression comes first. With $bar if $foo, the order is reversed, which I don't like. I usually try to stick to the original code's style, but I forgot here.
lib/Kafka/Connection.pm
Outdated
}

my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
say STDERR format_message( '[%s] metadata response: %s',
I think this debug message should say "apiversions response" instead of "metadata".
indeed, thanks
lib/Kafka/Connection.pm
Outdated
@@ -474,6 +493,9 @@ sub new {
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
    unless keys( %$IO_cache );

$self->{dont_load_supported_api_versions}
    or $self->load_supported_api_versions();
Should we also do this on each metadata update? A cluster metadata response may include new brokers, changed topic configuration, etc., so I suspect API versions may also change.
lib/Kafka/Connection.pm
Outdated
my @brokers = $self->_get_interviewed_servers;

# receive metadata
foreach my $broker ( @brokers ) {
API versions are broker-dependent. The cluster may have brokers of different versions running at the same time. This helps with Kafka release updates: you add a new broker running the updated version, then shut down the older brokers. See the rolling upgrade instructions.
I agree. I'll have to change where I store the versions, and a lot of other code.
lib/Kafka/Connection.pm
Outdated
# we detected from the Kafka server and that our protocol implements.
# $self->{_api_versions}{$api_key} might be undef if detection against the
# Kafka server was not done
$request->{ApiVersion} //= $self->{_api_versions}{$api_key};
The API version is broker-dependent. You have to store those versions per broker, and encode the request only after selecting a broker in the ATTEMPT loop below.
I'll do that, yes. There is something I don't understand:

# hash metadata could be updated
my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader};

How is the hash metadata updated? From what I understand, the loop simply retries the same leader box; I'm not sure how metadata can be changed behind our back.

UPDATE: it's all happening in the continue block :)
@@ -352,10 +353,12 @@ sub fetch {

my $request = {
    ApiKey => $APIKEY_FETCH,
    ApiVersion => $api_version,
As the API version is broker-dependent, you cannot set it here, before selecting a broker to work with. So I think protocol API versions should not be provided as parameters to fetch() and other methods, since the user has no real control over broker selection. The exception would be if you want to request a minimum API version and produce an error or warning when the selected broker does not support it. Or you may even loop over the available brokers to find whether any of them supports the requested version.
You are right about supported versions per broker. However, I think it's valid to have $api_version here as a parameter, to allow the user to specify a given API version to use. I don't think it's a good approach to loop over available brokers to find the right API version: during a migration, it can easily lead to all requests going to only one broker. I'd rather have client requests fail in some cases than try to make it right by possibly DDoS'ing a broker.
. fixed some typos
I think I've fixed most of the issues, except tests. I'll try to create some.
…s, there will be a lot of iterations receiving data. This should not be counted as a retry attempt, since we received some data. So this change decrements retries so that a successful read is not considered a failed attempt.
lib/Kafka/Connection.pm
Outdated
# detection against the Kafka server failed, or if
# dont_load_supported_api_versions is true

$request->{ApiVersion} //= $self->_get_api_versions($server)->{$api_key};
If a previous attempt in this loop failed, we may select another broker here, but $request->{ApiVersion} is already defined and set to the version of the other server from the previous attempt. So we cannot use //= here.
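The point can be illustrated with a small sketch (the version map and broker names are fabricated for the example): the version must be picked with a plain assignment on every attempt, honoring a user-forced version if one was given, because //= would keep the previous broker's choice after a retry.

```perl
use strict;
use warnings;

# Hypothetical per-broker version map; real code would fill this
# from ApiVersions responses, one entry per broker.
my %api_versions = (
    'broker-a:9092' => { 1 => 3 },    # newer broker: Fetch up to v3
    'broker-b:9092' => { 1 => 2 },    # older broker: Fetch up to v2
);

my $APIKEY_FETCH = 1;
my $user_version;                     # defined only if the caller forced a version

my $request = { ApiKey => $APIKEY_FETCH };

for my $server ( 'broker-a:9092', 'broker-b:9092' ) {
    # Plain assignment: re-select the version for every attempt.
    # With //= the second attempt would silently keep broker-a's v3.
    $request->{ApiVersion} = $user_version
        // $api_versions{$server}{ $request->{ApiKey} };
    print "$server -> v$request->{ApiVersion}\n";
}
```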
lib/Kafka/Connection.pm
Outdated
# if the cached data we have is too old, delete it
defined $self->{_api_versions}{$server}
    && $self->{_api_versions}{$server}{__timestamp_sec__} + $self->{api_versions_refresh_delay_sec} < time
I think this is not optimal. Server versions do not change often, and never change while we have a working connection to the server. So I'd rather put the versions into the IO cache: initialize them once, when a new connection is established, and do not refresh them until that connection is closed. You can also keep a global cache in $self and use it (if fresh enough) for new connections, instead of requesting versions from the server on each new connection. But if a new connection did request versions, they should be stored in both the IO cache and the global cache.
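The two-level caching scheme proposed above could be sketched like this. The cache shapes, the freshness window, and the fetch helper are all assumptions for illustration, not the module's actual API:

```perl
use strict;
use warnings;

my $REFRESH_SEC = 300;    # assumed freshness window for the global cache

my %io_cache;             # per-connection cache: valid while the socket lives
my %global_cache;         # broker => { versions => {...}, ts => epoch }

# stand-in for a real ApiVersions request over the wire
sub fetch_versions_from_broker { return { 0 => 2, 1 => 3 } }

sub get_api_versions {
    my ($broker) = @_;

    # never re-request while the connection is open
    return $io_cache{$broker} if $io_cache{$broker};

    # reuse the global cache for a new connection if it is recent enough
    my $g = $global_cache{$broker};
    if ( $g && time() - $g->{ts} < $REFRESH_SEC ) {
        return $io_cache{$broker} = $g->{versions};
    }

    # otherwise ask the broker and store the answer in both caches
    my $versions = fetch_versions_from_broker($broker);
    $global_cache{$broker} = { versions => $versions, ts => time() };
    return $io_cache{$broker} = $versions;
}

my $v1 = get_api_versions('broker-a:9092');
my $v2 = get_api_versions('broker-a:9092');    # served from the IO cache
print( ( $v1 == $v2 ? "cached" : "refetched" ), "\n" );
```

Dropping the IO cache when a connection is closed (or when _update_metadata rebuilds it) keeps the per-broker versions in sync with broker lifetimes.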
@@ -449,6 +449,7 @@ sub receive {
if ( defined( $from_recv ) && length( $buf ) ) {
    $message .= $buf;
    $len_to_read = $length - length( $message );
    --$retries; # this attempt was successful, don't count it as a retry
This seems to have fixed the Travis test failures. How did you find this? Anyway, thanks for finding and fixing the issue.
I ran a lot of tests, including against the Kafka topics we have at work, with a shitload of data and very big payloads. These payloads needed more than 30 loops, so I saw that I was getting incomplete results and fetching errors, and I debugged from there.
When a request is made, it may be tried on different brokers in case of broker failures. Either the API version was enforced when the request was created, or it is detected against the broker being used. However, the version should be detected again when we switch brokers. That was not the case before; now it is.
store the API versions inside the IO cache, so that they are removed/updated whenever needed via _update_metadata.
looks good to me now
This is a better attempt at the recent PR that I closed because it was against dams/Kafka/master, which is a moving target.
This PR contains:
The commit history is ugly, sorry for that; I don't know how to fix it, so feel free to rewrite the history if you can. It's quite a lot of code, and I'm available for any feedback you have. All tests pass (except 43, which gives some errors as reported separately, but the original code did as well).
Thanks