diff --git a/src/v/kafka/requests/fetch_request.cc b/src/v/kafka/requests/fetch_request.cc index 5f42bc19c4f75..226925c0b2b9c 100644 --- a/src/v/kafka/requests/fetch_request.cc +++ b/src/v/kafka/requests/fetch_request.cc @@ -265,11 +265,12 @@ std::ostream& operator<<(std::ostream& o, const fetch_response& r) { * Make a partition response error. */ static fetch_response::partition_response -make_partition_response_error(error_code error) { +make_partition_response_error(model::partition_id p_id, error_code error) { return fetch_response::partition_response{ + .id = p_id, .error = error, - .high_watermark = model::offset(0), - .last_stable_offset = model::offset(0), + .high_watermark = model::offset(-1), + .last_stable_offset = model::offset(-1), .record_set = iobuf(), }; } @@ -277,7 +278,8 @@ make_partition_response_error(error_code error) { static ss::future make_ready_partition_response_error(error_code error) { return ss::make_ready_future( - make_partition_response_error(error)); + // partiton id will be modified when assembling the response further + make_partition_response_error(model::partition_id(-1), error)); } /** @@ -437,7 +439,7 @@ handle_ntp_fetch(op_context& octx, model::ntp ntp, fetch_config config) { * error codes. */ octx.set_partition_response(make_partition_response_error( - error_code::unknown_server_error)); + p_id, error_code::unknown_server_error)); } }); } @@ -455,15 +457,15 @@ static ss::future<> fetch_topic_partition( if ( model::timeout_clock::now() > octx.deadline.value_or(model::no_timeout)) { - octx.set_partition_response( - make_partition_response_error(error_code::request_timed_out)); + octx.set_partition_response(make_partition_response_error( + part.id, error_code::request_timed_out)); return ss::now(); } // if over budget create placeholder response if (octx.bytes_left <= 0) { - octx.set_partition_response( - make_partition_response_error(error_code::message_too_large)); + octx.set_partition_response(make_partition_response_error( + part.id, error_code::message_too_large)); return ss::now(); } // if we already have data in response for this partition skip it