Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public enum ApiKeys {
CLOSE_STREAM(ApiMessageType.CLOSE_STREAM, false, true),
PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true),
COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true),
COMMIT_COMPACT_OBJECT(ApiMessageType.COMMIT_COMPACT_OBJECT, false, true),
COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true);
COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true),
GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true);
// Kafka on S3 inject end

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.CloseStreamRequestData;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class CloseStreamRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class CloseStreamResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.CommitStreamObjectRequestData;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class CommitStreamObjectRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class CommitStreamObjectResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class CommitWALObjectRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class CommitWALObjectResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.DeleteStreamRequestData;
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class DeleteStreamRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DeleteStreamRequest> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class DeleteStreamResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,54 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.CommitCompactObjectRequestData;
import org.apache.kafka.common.message.CommitCompactObjectResponseData;
import org.apache.kafka.common.message.GetStreamsOffsetRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class CommitCompactObjectRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<CommitCompactObjectRequest> {
public class GetStreamsOffsetRequest extends AbstractRequest {

private final CommitCompactObjectRequestData data;
public Builder(CommitCompactObjectRequestData data) {
super(ApiKeys.COMMIT_COMPACT_OBJECT);
public static class Builder extends AbstractRequest.Builder<GetStreamsOffsetRequest> {

private final GetStreamsOffsetRequestData data;
public Builder(GetStreamsOffsetRequestData data) {
super(ApiKeys.GET_STREAMS_OFFSET);
this.data = data;
}

@Override
public CommitCompactObjectRequest build(short version) {
return new CommitCompactObjectRequest(data, version);
public GetStreamsOffsetRequest build(short version) {
return new GetStreamsOffsetRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}
private final CommitCompactObjectRequestData data;

public CommitCompactObjectRequest(CommitCompactObjectRequestData data, short version) {
super(ApiKeys.DELETE_STREAM, version);
private final GetStreamsOffsetRequestData data;

public GetStreamsOffsetRequest(GetStreamsOffsetRequestData data, short version) {
super(ApiKeys.GET_STREAMS_OFFSET, version);
this.data = data;
}

@Override
public CommitCompactObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
CommitCompactObjectResponseData response = new CommitCompactObjectResponseData()
CreateStreamResponseData response = new CreateStreamResponseData()
.setErrorCode(apiError.error().code())
.setThrottleTimeMs(throttleTimeMs);
return new CommitCompactObjectResponse(response);
return new CreateStreamResponse(response);
}

@Override
public CommitCompactObjectRequestData data() {
public GetStreamsOffsetRequestData data() {
return data;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,24 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.CommitCompactObjectResponseData;
import org.apache.kafka.common.message.GetStreamsOffsetResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class CommitCompactObjectResponse extends AbstractResponse {
public class GetStreamsOffsetResponse extends AbstractResponse {
private final GetStreamsOffsetResponseData data;

private final CommitCompactObjectResponseData data;

public CommitCompactObjectResponse(CommitCompactObjectResponseData data) {
super(ApiKeys.COMMIT_COMPACT_OBJECT);
public GetStreamsOffsetResponse(GetStreamsOffsetResponseData data) {
super(ApiKeys.GET_STREAMS_OFFSET);
this.data = data;
}

@Override
public CommitCompactObjectResponseData data() {
public GetStreamsOffsetResponseData data() {
return data;
}

Expand All @@ -50,5 +50,4 @@ public int throttleTimeMs() {
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class OpenStreamRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class OpenStreamResponse extends AbstractResponse {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiError;

public class PrepareS3ObjectRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests.s3;

import java.util.Map;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;

public class PrepareS3ObjectResponse extends AbstractResponse {
private final PrepareS3ObjectResponseData data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 70,
"apiKey": 503,
"type": "request",
"listeners": [
"controller",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

{
"apiKey": 70,
"apiKey": 503,
"type": "response",
"name": "CloseStreamResponse",
"validVersions": "0",
Expand Down
Loading