Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ if (file('.git').exists()) {
'streams/streams-scala/logs/*',
'licenses/*',
'**/generated/**',
'clients/src/test/resources/serializedData/*'
'clients/src/test/resources/serializedData/*',
'.github/**',
'tests/docker/es/docker-compose.yaml',
'tests/esk_test_suite.yml'
])
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@
*/
public class SlowFetchHintException extends RetriableException {
private static final long serialVersionUID = 1L;
public SlowFetchHintException() { super();}
public SlowFetchHintException() {
super();
}

public SlowFetchHintException(String message) { super(message); }
public SlowFetchHintException(String message) {
super(message);
}

public SlowFetchHintException(Throwable cause) { super(cause); }
public SlowFetchHintException(Throwable cause) {
super(cause);
}

public SlowFetchHintException(String message, Throwable cause) { super(message, cause); }
public SlowFetchHintException(String message, Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

public class StreamExistException extends ApiException {
public StreamExistException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

public class StreamFencedException extends ApiException {

public StreamFencedException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

public class StreamNotExistException extends ApiException {

public StreamNotExistException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.s3.StreamExistException;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.common.errors.s3.StreamNotExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -370,7 +373,16 @@ public enum Errors {
TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new);
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),

// Kafka on S3 inject start

STREAM_EXIST(501, "The stream already exists.", StreamExistException::new),
STREAM_NOT_EXIST(502, "The stream does not exist.", StreamNotExistException::new),
STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new);


// Kafka on S3 inject end

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;

public class OpenStreamRequest extends AbstractRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;

public class OpenStreamResponse extends AbstractResponse {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("uncheck")
public class AlwaysSuccessClient implements Client {
private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class);

Expand Down Expand Up @@ -128,7 +129,7 @@ static class StreamImpl implements Stream {
private final Stream stream;
private volatile boolean closed = false;
private final Map<String, Boolean> slowFetchingOffsetMap = new ConcurrentHashMap<>();
private final long SLOW_FETCH_TIMEOUT_MILLIS = 10;
private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10;

public StreamImpl(Stream stream) {
this.stream = stream;
Expand Down Expand Up @@ -179,7 +180,7 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
if (ex != null) {
if (closed) {
cf.completeExceptionally(new IllegalStateException("stream already closed"));
} else if (ex instanceof TimeoutException){
} else if (ex instanceof TimeoutException) {
LOGGER.info("Fetch stream[{}] [{},{}) timeout for {} ms, retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS);
cf.completeExceptionally(new SlowFetchHintException("fetch data too slowly, retry with slow fetching"));
slowFetchingOffsetMap.put(slowFetchKey, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) thr
return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SlowFetchHintException) {
throw (SlowFetchHintException)(e.getCause());
throw (SlowFetchHintException) (e.getCause());
} else {
throw new RuntimeException(e.getCause());
}
Expand Down
18 changes: 18 additions & 0 deletions core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package kafka.log.es

import joptsimple.OptionParser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kafka.message;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.message.FieldType.Int64FieldType;

public enum EntityType {
@JsonProperty("unknown")
Expand All @@ -41,10 +40,10 @@ public enum EntityType {

// Kafka on S3 inject start
@JsonProperty("streamId")
STREAM_ID(Int64FieldType.INSTANCE),
STREAM_ID(FieldType.Int64FieldType.INSTANCE),

@JsonProperty("streamEpoch")
STREAM_EPOCH(Int64FieldType.INSTANCE);
STREAM_EPOCH(FieldType.Int64FieldType.INSTANCE);
// Kafka on S3 inject end
private final FieldType baseType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CloseStreamRequest;
import org.apache.kafka.common.requests.CloseStreamResponse;
import org.apache.kafka.common.requests.CommitCompactObjectRequest;
import org.apache.kafka.common.requests.CommitStreamObjectRequest;
import org.apache.kafka.common.requests.PrepareS3ObjectRequest;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2186,44 +2186,52 @@ public CompletableFuture<Void> notifyS3ObjectDeleted(ControllerRequestContext co

@Override
public CompletableFuture<CreateStreamResponseData> createStream(ControllerRequestContext context, CreateStreamRequestData request) {
return null;
return appendWriteEvent("creatStream", context.deadlineNs(),
() -> streamControlManager.createStream(request));
}

@Override
public CompletableFuture<OpenStreamResponseData> openStream(ControllerRequestContext context, OpenStreamRequestData request) {
return null;
return appendWriteEvent("openStream", context.deadlineNs(),
() -> streamControlManager.openStream(request));
}

@Override
public CompletableFuture<CloseStreamResponseData> closeStream(ControllerRequestContext context, CloseStreamRequestData response) {
return null;
return appendWriteEvent("closeStream", context.deadlineNs(),
() -> streamControlManager.closeStream(response));
}

@Override
public CompletableFuture<DeleteStreamResponseData> deleteStream(ControllerRequestContext context, DeleteStreamRequestData request) {
return null;
return appendWriteEvent("deleteStream", context.deadlineNs(),
() -> streamControlManager.deleteStream(request));
}

@Override
public CompletableFuture<PrepareS3ObjectResponseData> prepareObject(ControllerRequestContext context, PrepareS3ObjectRequestData request) {
return null;
return appendWriteEvent("prepareObject", context.deadlineNs(),
() -> s3ObjectControlManager.prepareObject(request));
}

@Override
public CompletableFuture<CommitWALObjectResponseData> commitWALObject(ControllerRequestContext context, CommitWALObjectRequestData request) {
return null;
return appendWriteEvent("commitWALObject", context.deadlineNs(),
() -> streamControlManager.commitWALObject(request));
}

@Override
public CompletableFuture<CommitCompactObjectResponseData> commitCompactObject(ControllerRequestContext context,
CommitCompactObjectRequestData request) {
return null;
return appendWriteEvent("commitCompactObject", context.deadlineNs(),
() -> streamControlManager.commitCompactObject(request));
}

@Override
public CompletableFuture<CommitStreamObjectResponseData> commitStreamObject(ControllerRequestContext context,
CommitStreamObjectRequestData request) {
return null;
return appendWriteEvent("commitStreamObject", context.deadlineNs(),
() -> streamControlManager.commitStreamObject(request));
}
// Kafka on S3 inject end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -119,6 +121,10 @@ public Long nextAssignedObjectId() {
return nextAssignedObjectId;
}

public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData data) {
throw new UnsupportedOperationException();
}

public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Loading