Skip to content

Commit

Permalink
k/fetch_request: setting partition id in fetch errors
Browse files Browse the repository at this point in the history
When fetch response partition error is sent we have to set partition id
in the response or client will treat it as unknown partition.

Fixes: redpanda-data#126

Signed-off-by: Michal Maslanka <michal@vectorized.io>
  • Loading branch information
mmaslankaprv authored and andresaristizabal committed Nov 26, 2020
1 parent 9edcea0 commit 94ddb9c
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions src/v/kafka/requests/fetch_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,21 @@ std::ostream& operator<<(std::ostream& o, const fetch_response& r) {
* Make a partition response error.
*/
static fetch_response::partition_response
make_partition_response_error(error_code error) {
make_partition_response_error(model::partition_id p_id, error_code error) {
return fetch_response::partition_response{
.id = p_id,
.error = error,
.high_watermark = model::offset(0),
.last_stable_offset = model::offset(0),
.high_watermark = model::offset(-1),
.last_stable_offset = model::offset(-1),
.record_set = iobuf(),
};
}

static ss::future<fetch_response::partition_response>
make_ready_partition_response_error(error_code error) {
return ss::make_ready_future<fetch_response::partition_response>(
make_partition_response_error(error));
// partiton id will be modified when assembling the response further
make_partition_response_error(model::partition_id(-1), error));
}

/**
Expand Down Expand Up @@ -437,7 +439,7 @@ handle_ntp_fetch(op_context& octx, model::ntp ntp, fetch_config config) {
* error codes.
*/
octx.set_partition_response(make_partition_response_error(
error_code::unknown_server_error));
p_id, error_code::unknown_server_error));
}
});
}
Expand All @@ -455,15 +457,15 @@ static ss::future<> fetch_topic_partition(

if (
model::timeout_clock::now() > octx.deadline.value_or(model::no_timeout)) {
octx.set_partition_response(
make_partition_response_error(error_code::request_timed_out));
octx.set_partition_response(make_partition_response_error(
part.id, error_code::request_timed_out));
return ss::now();
}

// if over budget create placeholder response
if (octx.bytes_left <= 0) {
octx.set_partition_response(
make_partition_response_error(error_code::message_too_large));
octx.set_partition_response(make_partition_response_error(
part.id, error_code::message_too_large));
return ss::now();
}
// if we already have data in response for this partition skip it
Expand Down

0 comments on commit 94ddb9c

Please sign in to comment.