Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,5 @@
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "StreamId",
"type": "int64",
"versions": "0+",
"entityType": "streamId",
"about": "The id of the requesting stream"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
"type": "int32",
"versions": "0+",
"about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
},
{
"name": "StreamId",
"type": "int64",
"versions": "0+",
"entityType": "streamId",
"about": "The id of the created stream"
}
]
}
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/log/s3/ObjectWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import kafka.log.s3.operator.MemoryS3Operator;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3ObjectType;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.Iterator;
Expand All @@ -33,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

@Tag("S3Unit")
public class ObjectWriterTest {

@Test
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/log/s3/S3StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.streams.StreamManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.List;
Expand All @@ -37,6 +38,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class S3StreamTest {
Wal wal;
S3BlockCache blockCache;
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/log/s3/S3WalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.MemoryS3Operator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

Expand All @@ -39,6 +40,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class S3WalTest {
ObjectManager objectManager;
S3Wal s3Wal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public String toString() {

private final S3ObjectControlManager s3ObjectControlManager;

/**
* The next stream id to be assigned.
*/
private Long nextAssignedStreamId = 0L;

private final TimelineHashMap<Long/*streamId*/, S3StreamMetadata> streamsMetadata;

private final TimelineHashMap<Integer/*brokerId*/, BrokerS3WALMetadata> brokersMetadata;
Expand All @@ -159,19 +164,15 @@ public StreamControlManager(
// TODO: lazy update range's end offset
// TODO: controller allocate the stream id
public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
long streamId = data.streamId();
CreateStreamResponseData resp = new CreateStreamResponseData();
if (this.streamsMetadata.containsKey(streamId)) {
// already exist
resp.setErrorCode(Errors.STREAM_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
long streamId = nextAssignedStreamId;
// create stream
ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(streamId)
.setEpoch(0)
.setStartOffset(0L)
.setRangeIndex(-1), (short) 0);
resp.setStreamId(streamId);
return ControllerResult.of(Arrays.asList(record), resp);
}

Expand Down Expand Up @@ -267,6 +268,7 @@ public void replay(S3StreamRecord record) {
S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(),
record.startOffset(), this.snapshotRegistry);
this.streamsMetadata.put(streamId, streamMetadata);
this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1);
}

public void replay(RemoveS3StreamRecord record) {
Expand Down Expand Up @@ -305,6 +307,10 @@ public Map<Integer, BrokerS3WALMetadata> brokersMetadata() {
return brokersMetadata;
}

public Long nextAssignedStreamId() {
return nextAssignedStreamId;
}

@Override
public String toString() {
return "StreamControlManager{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(40)
@Tag("S3Unit")
public class S3ObjectControlManagerTest {
private static final int BROKER0 = 0;
private static final int BROKER1 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value = 40)
@Tag("S3Unit")
public class StreamControlManagerTest {

private final static long STREAM0 = 0;
Expand All @@ -67,10 +69,12 @@ public void setUp() {
@Test
public void testBasicCreateStream() {
// 1. create stream_0 success
CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0);
CreateStreamRequestData request0 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result0 = manager.createStream(request0);
List<ApiMessageAndVersion> records0 = result0.records();
CreateStreamResponseData response0 = result0.response();
assertEquals(Errors.NONE.code(), response0.errorCode());
assertEquals(STREAM0, response0.streamId());
assertEquals(1, records0.size());
ApiMessageAndVersion record0 = records0.get(0);
assertInstanceOf(S3StreamRecord.class, record0.message());
Expand All @@ -79,7 +83,6 @@ public void testBasicCreateStream() {
assertEquals(0, streamRecord0.epoch());
assertEquals(-1, streamRecord0.rangeIndex());
assertEquals(0L, streamRecord0.startOffset());
assertEquals(0, response0.errorCode());

// replay records_0
manager.replay(streamRecord0);
Expand All @@ -88,23 +91,41 @@ public void testBasicCreateStream() {
manager.streamsMetadata();
assertEquals(1, streamsMetadata.size());
verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0));
assertEquals(1, manager.nextAssignedStreamId());

// 2. create stream_0 with exception
CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM0);
// 2. create stream_1
CreateStreamRequestData request1 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result1 = manager.createStream(request1);
List<ApiMessageAndVersion> records1 = result1.records();
CreateStreamResponseData response1 = result1.response();
assertEquals(0, records1.size());
assertEquals(Errors.STREAM_EXIST.code(), response1.errorCode());
assertEquals(Errors.NONE.code(), response1.errorCode());
assertEquals(STREAM1, response1.streamId());
assertEquals(1, records1.size());
ApiMessageAndVersion record1 = records1.get(0);
assertInstanceOf(S3StreamRecord.class, record1.message());
S3StreamRecord streamRecord1 = (S3StreamRecord) record1.message();
assertEquals(STREAM1, streamRecord1.streamId());
assertEquals(0, streamRecord1.epoch());
assertEquals(-1, streamRecord1.rangeIndex());
assertEquals(0L, streamRecord1.startOffset());

// replay records_1
manager.replay(streamRecord1);
// verify the stream_2 is created
streamsMetadata =
manager.streamsMetadata();
assertEquals(2, streamsMetadata.size());
verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1));
assertEquals(2, manager.nextAssignedStreamId());
}

@Test
public void testBasicOpenStream() {
// 1. create stream_0 and stream_1
CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0);
CreateStreamRequestData request0 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result0 = manager.createStream(request0);
result0.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);
CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM1);
CreateStreamRequestData request1 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result1 = manager.createStream(request1);
result1.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);

Expand Down