Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<allow pkg="kafka.utils" />
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" />


<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
Expand All @@ -50,6 +51,7 @@
<allow pkg="io.netty" />
<allow pkg="redis.clients.jedis" />
<allow pkg="kafka.server" />
<allow pkg="kafka.log.s3" />
</subpackage>

<subpackage name="testkit">
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/s3/S3Client.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
89 changes: 78 additions & 11 deletions core/src/main/scala/kafka/log/s3/S3Stream.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,26 +17,40 @@

package kafka.log.s3;

import com.automq.elasticstream.client.DefaultAppendResult;
import com.automq.elasticstream.client.api.AppendResult;
import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.streams.StreamManager;

import java.util.LinkedList;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class S3Stream implements Stream {
private final StreamMetadata metadata;
private final long streamId;
private final long epoch;
private final AtomicLong nextOffset;
private final Wal wal;
private final S3BlockCache blockCache;
private final StreamManager streamManager;
private final ObjectManager objectManager;

public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager, ObjectManager objectManager) {
this.metadata = metadata;
this.streamId = metadata.getStreamId();
this.epoch = metadata.getEpoch();
this.nextOffset = new AtomicLong(metadata.getRanges().get(metadata.getRanges().size() - 1).getStartOffset());
this.wal = wal;
this.blockCache = blockCache;
this.streamManager = streamManager;
Expand All @@ -55,31 +69,84 @@ public long startOffset() {

@Override
public long nextOffset() {
return 0;
return nextOffset.get();
}

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
return null;
long offset = nextOffset.getAndIncrement();
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch);
return wal.append(streamRecordBatch).thenApply(nil -> new DefaultAppendResult(offset));
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
return null;
//TODO: bound check
//TODO: concurrent read.
//TODO: async read.
List<Long> objects = objectManager.getObjects(streamId, startOffset, endOffset, maxBytes);
long nextStartOffset = startOffset;
int nextMaxBytes = maxBytes;
List<RecordBatchWithContext> records = new LinkedList<>();
for (long objectId : objects) {
try {
ReadDataBlock dataBlock = blockCache.read(objectId, streamId, nextStartOffset, endOffset, nextMaxBytes).get();
OptionalLong blockStartOffset = dataBlock.startOffset();
if (blockStartOffset.isEmpty()) {
throw new IllegalStateException("expect not empty block from object[" + objectId + "]");
}
if (blockStartOffset.getAsLong() != nextStartOffset) {
throw new IllegalStateException("records not continuous, expect start offset[" + nextStartOffset + "], actual["
+ blockStartOffset.getAsLong() + "]");
}
records.addAll(dataBlock.getRecords());
// Already check start offset, so it's safe to get end offset.
//noinspection OptionalGetWithoutIsPresent
nextStartOffset = dataBlock.endOffset().getAsLong();
nextMaxBytes -= Math.min(nextMaxBytes, dataBlock.sizeInBytes());
if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
break;
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
//TODO: records integrity check.
return CompletableFuture.completedFuture(new DefaultFetchResult(records));
}

@Override
public CompletableFuture<Void> trim(long newStartOffset) {
if (newStartOffset < metadata.getStartOffset()) {
throw new IllegalArgumentException("newStartOffset[" + newStartOffset + "] cannot be less than current start offset["
+ metadata.getStartOffset() + "]");
}
metadata.setStartOffset(newStartOffset);
return streamManager.trimStream(metadata.getStreamId(), metadata.getEpoch(), newStartOffset);
}

@Override
public CompletableFuture<Void> close() {
return null;
// TODO: add stream status to fence future access.
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> destroy() {
return null;
// TODO: add stream status to fence future access.
return streamManager.deleteStream(streamId, epoch);
}

static class DefaultFetchResult implements FetchResult {
private final List<RecordBatchWithContext> records;

public DefaultFetchResult(List<RecordBatchWithContext> records) {
this.records = records;
}

@Override
public List<RecordBatchWithContext> recordBatchList() {
return records;
}
}
}
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/s3/Wal.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
59 changes: 59 additions & 0 deletions core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3.cache;

import com.automq.elasticstream.client.api.RecordBatchWithContext;

import java.util.List;
import java.util.OptionalLong;

public class ReadDataBlock {
private List<RecordBatchWithContext> records;

public ReadDataBlock(List<RecordBatchWithContext> records) {
this.records = records;
}

public List<RecordBatchWithContext> getRecords() {
return records;
}

public void setRecords(List<RecordBatchWithContext> records) {
this.records = records;
}

public OptionalLong startOffset() {
if (records.isEmpty()) {
return OptionalLong.empty();
} else {
return OptionalLong.of(records.get(0).baseOffset());
}
}

public OptionalLong endOffset() {
if (records.isEmpty()) {
return OptionalLong.empty();
} else {
return OptionalLong.of(records.get(records.size() - 1).lastOffset());
}
}

public int sizeInBytes() {
return records.stream().mapToInt(r -> r.rawPayload().remaining()).sum();
}
}
16 changes: 7 additions & 9 deletions core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,8 +17,6 @@

package kafka.log.s3.cache;

import com.automq.elasticstream.client.api.RecordBatch;

import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -28,6 +26,6 @@
*/
public interface S3BlockCache {

CompletableFuture<RecordBatch> read(long objectId, long streamId, long startOffset, long endOffset, long maxBytes);
CompletableFuture<ReadDataBlock> read(long objectId, long streamId, long startOffset, long endOffset, int maxBytes);

}
19 changes: 13 additions & 6 deletions core/src/main/scala/kafka/log/s3/model/RangeMetadata.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -26,6 +26,13 @@ public class RangeMetadata {
private long endOffset;
private long serverId;

public RangeMetadata(int index, long startOffset, long endOffset, long serverId) {
this.index = index;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.serverId = serverId;
}

public int getIndex() {
return index;
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/s3/model/StreamMetadata.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Loading