Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import kafka.log.es.api.RecordBatch;
import kafka.log.es.api.Stream;
import kafka.log.es.api.StreamClient;
import kafka.log.s3.S3StreamClient;
import org.apache.kafka.common.errors.es.SlowFetchHintException;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,7 +68,7 @@ public class AlwaysSuccessClient implements Client {
ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true));
private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("fetch-delayer-%d", true));
private final StreamClientImpl streamClient;
private final StreamClient streamClient;
private final KVClient kvClient;
private final Delayer delayer;
/**
Expand Down Expand Up @@ -185,9 +184,7 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions opt
}

public void shutdown() {
if (streamClient instanceof S3StreamClient) {
((S3StreamClient) streamClient).shutdown();
}
streamClient.shutdown();
}

private void openStream0(long streamId, OpenStreamOptions options, CompletableFuture<Stream> cf) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/log/es/ElasticRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions createS
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return CompletableFuture.completedFuture(new StreamImpl(jedis, streamId));
}

@Override
public void shutdown() {

}
}

static class KVClientImpl implements KVClient {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/log/es/MemoryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions createS
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return CompletableFuture.completedFuture(new StreamImpl(streamId));
}

@Override
public void shutdown() {

}
}

public static class KVClientImpl implements KVClient {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/log/es/api/StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public interface StreamClient {
* @return {@link Stream}.
*/
CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions options);

void shutdown();
}
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/log/es/utils/Threads.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

import org.slf4j.Logger;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class Threads {

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger) {
return new ScheduledThreadPoolExecutor(1, threadFactory) {
public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger) {
return newSingleThreadScheduledExecutor(threadFactory, logger, false);
}

public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(ThreadFactory threadFactory, Logger logger, boolean removeOnCancelPolicy) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Expand All @@ -35,6 +38,8 @@ protected void afterExecute(Runnable r, Throwable t) {
}
}
};
executor.setRemoveOnCancelPolicy(removeOnCancelPolicy);
return executor;
}

}
4 changes: 0 additions & 4 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public class S3Storage implements Storage {
private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);

private final ScheduledExecutorService streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER);

private final ObjectManager objectManager;
private final S3Operator s3Operator;
private final S3BlockCache blockCache;
Expand All @@ -83,7 +80,6 @@ public S3Storage(KafkaConfig config, WriteAheadLog log, ObjectManager objectMana
public void close() {
mainExecutor.shutdown();
backgroundExecutor.shutdown();
streamObjectCompactionExecutor.shutdown();
}

@Override
Expand Down
18 changes: 4 additions & 14 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,10 @@ public class S3Stream implements Stream {
private final StreamManager streamManager;
private final Status status;
private final Function<Long, Void> closeHook;
private StreamObjectsCompactionTask streamObjectsCompactionTask;
private final StreamObjectsCompactionTask streamObjectsCompactionTask;

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager) {
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, x -> null);
}

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage, StreamManager streamManager, Function<Long, Void> closeHook) {
public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager, StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook) {
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
Expand All @@ -71,18 +68,11 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
this.status = new Status();
this.storage = storage;
this.streamManager = streamManager;
this.streamObjectsCompactionTask = compactionTaskBuilder.withStream(this).build();
this.closeHook = closeHook;
}

public void initCompactionTask(StreamObjectsCompactionTask streamObjectsCompactionTask) {
this.streamObjectsCompactionTask = streamObjectsCompactionTask;
}

public void triggerCompactionTask() throws ExecutionException, InterruptedException {
if (streamObjectsCompactionTask == null) {
throw new RuntimeException("stream objects compaction task is null");
}

streamObjectsCompactionTask.prepare();
streamObjectsCompactionTask.doCompactions().get();
}
Expand Down
31 changes: 17 additions & 14 deletions core/src/main/scala/kafka/log/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.log.es.api.CreateStreamOptions;
import kafka.log.es.api.OpenStreamOptions;
Expand All @@ -43,8 +44,9 @@

public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private final ScheduledExecutorService streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER);
private final ScheduledThreadPoolExecutor streamObjectCompactionExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER, true);
private ScheduledFuture<?> scheduledCompactionTaskFuture;
private final Map<Long, S3Stream> openedStreams;

private final StreamManager streamManager;
Expand Down Expand Up @@ -78,7 +80,7 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions ope
* Start stream objects compactions.
*/
private void startStreamObjectsCompactions() {
streamObjectCompactionExecutor.scheduleWithFixedDelay(() -> {
scheduledCompactionTaskFuture = streamObjectCompactionExecutor.scheduleWithFixedDelay(() -> {
List<S3Stream> operationStreams = new LinkedList<>(openedStreams.values());
operationStreams.forEach(stream -> {
if (stream.isClosed()) {
Expand All @@ -101,25 +103,26 @@ private void startStreamObjectsCompactions() {
private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.withCompactedStreamObjectMaxSize(config.s3StreamObjectCompactionMaxSize())
.withCompactableStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeThreshold());
S3Stream stream = new S3Stream(
metadata.getStreamId(), metadata.getEpoch(),
metadata.getStartOffset(), metadata.getNextOffset(),
storage, streamManager, id -> {
openedStreams.remove(id);
return null;
});
stream.initCompactionTask(generateStreamObjectsCompactionTask(stream));
storage, streamManager, builder, id -> {
openedStreams.remove(id);
return null;
});
openedStreams.put(streamId, stream);
return stream;
});
}

private StreamObjectsCompactionTask generateStreamObjectsCompactionTask(S3Stream stream) {
return new StreamObjectsCompactionTask(objectManager, s3Operator, stream,
config.s3StreamObjectCompactionMaxSize(), config.s3StreamObjectCompactionLivingTimeThreshold());
}

public void shutdown() {
// cancel the submitted task if not started; do not interrupt the task if it is running.
if (scheduledCompactionTaskFuture != null) {
scheduledCompactionTaskFuture.cancel(false);
}
streamObjectCompactionExecutor.shutdown();
}
}
35 changes: 32 additions & 3 deletions core/src/main/scala/kafka/log/s3/StreamObjectsCompactionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.log.s3;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -136,15 +135,17 @@ public void prepare() {
public Queue<List<S3StreamObjectMetadata>> prepareCompactGroups(long startSearchingOffset) {
long startOffset = Utils.max(startSearchingOffset, stream.startOffset());
List<S3StreamObjectMetadata> rawFetchedStreamObjects = objectManager
.getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE);
.getStreamObjects(stream.streamId(), startOffset, stream.nextOffset(), Integer.MAX_VALUE)
.stream()
.sorted()
.collect(Collectors.toList());

this.nextStartSearchingOffset = calculateNextStartSearchingOffset(rawFetchedStreamObjects, startOffset);

List<S3StreamObjectMetadata> streamObjects = rawFetchedStreamObjects
.stream()
.filter(streamObject -> streamObject.objectSize() < compactedStreamObjectMaxSize)
.collect(Collectors.toList());
Collections.sort(streamObjects);

return groupContinuousObjects(streamObjects)
.stream()
Expand Down Expand Up @@ -276,4 +277,32 @@ public HaltException(String message) {
super(message);
}
}

public static class Builder {
private ObjectManager objectManager;
private S3Operator s3Operator;
private S3Stream stream;
private long compactedStreamObjectMaxSize;
private long compactableStreamObjectLivingTimeInMs;

public Builder(ObjectManager objectManager, S3Operator s3Operator) {
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}
public Builder withStream(S3Stream stream) {
this.stream = stream;
return this;
}
public Builder withCompactedStreamObjectMaxSize(long compactedStreamObjectMaxSize) {
this.compactedStreamObjectMaxSize = compactedStreamObjectMaxSize;
return this;
}
public Builder withCompactableStreamObjectLivingTimeInMs(long compactableStreamObjectLivingTimeInMs) {
this.compactableStreamObjectLivingTimeInMs = compactableStreamObjectLivingTimeInMs;
return this;
}
public StreamObjectsCompactionTask build() {
return new StreamObjectsCompactionTask(objectManager, s3Operator, stream, compactedStreamObjectMaxSize, compactableStreamObjectLivingTimeInMs);
}
}
}
5 changes: 5 additions & 0 deletions core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions ope
}
return CompletableFuture.completedFuture(new TestStreamImpl(streamId, delayMillis, exceptionHint));
}

@Override
public void shutdown() {

}
}

static class TestStreamImpl implements Stream {
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/kafka/log/s3/S3StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class S3StreamTest {
public void setup() {
storage = mock(Storage.class);
streamManager = mock(StreamManager.class);
stream = new S3Stream(233, 1, 100, 233, storage, streamManager);
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(null, null);
stream = new S3Stream(233, 1, 100, 233, storage, streamManager, builder, v -> null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Tag("S3Unit")
public class StreamObjectCopyerTest {
@Test
public void testCompact() throws ExecutionException, InterruptedException {
public void testCopy() throws ExecutionException, InterruptedException {
long targetObjectId = 10;
long streamId = 233;

Expand Down
Loading