Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Important: fix offsets of compressed messageSet #33

Merged
merged 42 commits into from
Jun 20, 2017
Merged
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
bd10ea0
Implement Offset Commit/Fetch API
gregf1 Sep 7, 2016
4cdf870
Disable tcp checks. This is a temporary workaround for the fact this…
gregf1 Nov 9, 2016
f564d5c
Add support for ApiVersions and loading supported versions from server
dams Mar 30, 2017
73d61ae
remove debugging
dams Mar 31, 2017
dd10476
fix APIVERSION into DEFAULT_APIVERSION in test
dams Mar 31, 2017
123bae2
Support v1, v2, v3 of various API endpoints, including Timestamps
dams Apr 5, 2017
be502ed
Add .travis.yml for CI testing
Mar 27, 2017
7c391a6
Use Gzip::Faster
Mar 29, 2017
e811fab
Fix travis build config
Mar 31, 2017
4dbb061
Recent kafka requires jdk8 so require it in travis config
Apr 1, 2017
c95885a
Use jdk8 by default
Apr 1, 2017
9b6bc19
Update JAVA_HOME so it points to jdk8
Apr 8, 2017
652e1a7
Cache perl modules
Apr 9, 2017
721ec11
Increase timeouts in completion tests
Apr 9, 2017
f173d22
Preserve original IO error in connection exceptions
Apr 10, 2017
a18dae5
Enable debug output in completion tests
Apr 10, 2017
96afaa7
Increase timeouts
Apr 15, 2017
c148721
WIP
dams Apr 19, 2017
9c3c340
Add support for ApiVersions and loading supported versions from server
dams Mar 30, 2017
4ca7436
remove debugging
dams Mar 31, 2017
29bd286
fix APIVERSION into DEFAULT_APIVERSION in test
dams Mar 31, 2017
dd45139
Support v1, v2, v3 of various API endpoints, including Timestamps
dams Apr 5, 2017
5628ae8
Add documentation, fix tests, spellings and make Perl critics happy
dams Apr 19, 2017
7212e94
Merge branch 'dev/refactor_offsets'
dams Apr 19, 2017
aeb0565
. fixed the plural of RECEIVE_EARLIEST_OFFSET and RECEIVE_LATEST_OFFSETS
dams Apr 20, 2017
c9ffb1a
fix typo and make tests happy
dams Apr 21, 2017
da2ad68
. now api version detection happens per broker, and only when needed.
dams Apr 28, 2017
de530f8
fix doc
dams Apr 28, 2017
354d00d
when fetching a lot of messages or big payloads, with a large timeout…
dams Apr 28, 2017
c807fba
Save the original request version
dams May 4, 2017
ab35545
Better api version storing and caching
dams May 4, 2017
fe7799a
Merge remote-tracking branch 'gregf1/master'
dams May 4, 2017
431b2ba
typo
dams May 4, 2017
6cfe3e6
This workaround is not needed anymore because the aggressive call of …
dams May 5, 2017
b0d9c97
added api version supported by the offsetfetch and commit implementation
dams May 5, 2017
0543aa8
fix correlation_id creation for offset commit and offset fetch api
dams May 9, 2017
edf557e
Merge branch 'master' into bk-master
dams May 9, 2017
5bdec0c
fix spelling fot xt/author/01_collection.t
dams May 9, 2017
4bfe934
parameters passing was very buggy. If a param key was not a valid one…
May 9, 2017
47db663
take in account the fact that v1 messages in messagesets are larger, …
Jun 2, 2017
ecc5339
important fix : make sure the offset of the decoded messageSet in cas…
Jun 15, 2017
12c0976
Merge branch 'master' into bk-master
dams Jun 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lib/Kafka/Protocol.pm
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,10 @@ sub _decode_fetch_response_template {

# Decrypts MessageSet
sub _decode_MessageSet_array {
my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref) = @_;
my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
# $_override_offset should not be set manually, it's used when recursing,
# to decode internal compressed message, which have offsets starting at 0,
# 1, etc, and instead we need to set the external wrapping message offset.

my $data = $response->{data};
my $data_array_size = scalar @{ $data };
Expand All @@ -1685,8 +1688,12 @@ sub _decode_MessageSet_array {
my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
while ( $MessageSetSize && $$i_ref < $data_array_size ) {

my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
if (defined $_override_offset) {
$message_offset = $_override_offset;
}
$Message = {
Offset => _unpack64( $data->[ $$i_ref++ ] ), # Offset
Offset => $message_offset, # Offset
};

$MessageSize = $data->[ $$i_ref++ ]; # MessageSize
Expand Down Expand Up @@ -1753,6 +1760,7 @@ sub _decode_MessageSet_array {
$size, # message set size
\$i, # i_ref
$MessageSet_array_ref,
$message_offset
);
} else {
push( @$MessageSet_array_ref, $Message );
Expand Down