Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/es_unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ jobs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Execute Gradle build
run: ./gradlew metadata:esUnitTest core:esUnitTest
run: ./gradlew metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest
30 changes: 30 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,36 @@ subprojects {
}
}

tasks.register('S3UnitTest', Test) {
description = 'Runs unit tests for Kafka on S3.'
dependsOn compileJava

maxParallelForks = maxTestForks
ignoreFailures = userIgnoreFailures

maxHeapSize = defaultMaxHeapSize
jvmArgs = defaultJvmArgs

testLogging {
events = userTestLoggingEvents ?: testLoggingEvents
showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
exceptionFormat = testExceptionFormat
displayGranularity = 0
}
logTestStdout.rehydrate(delegate, owner, this)()

exclude testsToExclude

useJUnitPlatform {
includeTags 'S3Unit'
}

retry {
maxRetries = userMaxTestRetries
maxFailures = userMaxTestRetryFailures
}
}

tasks.register('esIntegrationTest', Test) {
description = 'Runs integration tests for elastic stream storage.'
dependsOn compileJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -121,15 +122,33 @@ public Long nextAssignedObjectId() {
return nextAssignedObjectId;
}

public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData data) {
throw new UnsupportedOperationException();
public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData request) {
// TODO: support batch prepare objects
List<ApiMessageAndVersion> records = new ArrayList<>();
PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
int count = request.preparedCount();
List<Long> prepareObjectIds = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId + i;
prepareObjectIds.add(objectId);
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState((byte) S3ObjectState.PREPARED.ordinal())
.setPreparedTimeInMs(preparedTs)
.setExpiredTimeInMs(expiredTs);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setS3ObjectIds(prepareObjectIds);
return ControllerResult.of(records, response);
}

public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
// TODO: recover the prepared objects and mark destroyed objects when restart the controller
Expand Down Expand Up @@ -161,11 +180,11 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize().get())
.setAppliedTimeInMs(obj.getAppliedTimeInMs().get())
.setExpiredTimeInMs(obj.getExpiredTimeInMs().get())
.setCommittedTimeInMs(obj.getCommittedTimeInMs().get())
.setDestroyedTimeInMs(obj.getDestroyedTimeInMs().get());
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setDestroyedTimeInMs(obj.getDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records which listener reply for the object-destroy events
Expand All @@ -177,7 +196,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
// check the mark destroyed objects
ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream()
// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey().get()))
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
.toArray(ObjectPair[]::new);
String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new);
Set<Long> destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet());
Expand All @@ -203,6 +222,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {

/**
* Generate RemoveS3ObjectRecord for the deleted S3Objects.
*
* @param deletedObjectIds the deleted S3Objects' ids
* @return the result of the generation, contains the records which should be applied to the raft.
*/
Expand All @@ -215,6 +235,14 @@ public ControllerResult<Void> notifyS3ObjectDeleted(Set<Long> deletedObjectIds)
return ControllerResult.of(records, null);
}

public Queue<Long> preparedObjects() {
return preparedObjects;
}

public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}

/**
* S3Object's lifecycle listener.
*/
Expand All @@ -230,6 +258,7 @@ public interface S3ObjectLifeCycleListener {
}

static class ObjectPair {

private final Long objectId;
private final String objectKey;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kafka.metadata.stream;

import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

Expand All @@ -27,54 +26,54 @@
*/
public class S3Object implements Comparable<S3Object> {

protected final Long objectId;
private final long objectId;

protected Optional<Long> objectSize = Optional.empty();
private long objectSize = -1;

protected Optional<String> objectKey = Optional.empty();
private String objectKey;

/**
* The time when broker apply the object
* The time when broker ask for preparing the object
*/
protected Optional<Long> appliedTimeInMs = Optional.empty();
private long preparedTimeInMs;

/**
* The time when the object will be expired if it is not be committed
*/
protected Optional<Long> expiredTimeInMs = Optional.empty();
private long expiredTimeInMs;

/**
* The time when the object is committed
*/
protected Optional<Long> committedTimeInMs = Optional.empty();
private long committedTimeInMs;

/**
* The time when the object is destroyed
*/
protected Optional<Long> destroyedTimeInMs = Optional.empty();
private long destroyedTimeInMs;

protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;
private S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;

protected S3Object(final Long objectId) {
private S3Object(final long objectId) {
this.objectId = objectId;
}

public S3Object(
final Long objectId,
final Long objectSize,
final long objectId,
final long objectSize,
final String objectKey,
final Long appliedTimeInMs,
final Long expiredTimeInMs,
final Long committedTimeInMs,
final Long destroyedTimeInMs,
final long preparedTimeInMs,
final long expiredTimeInMs,
final long committedTimeInMs,
final long destroyedTimeInMs,
final S3ObjectState s3ObjectState) {
this.objectId = objectId;
this.objectSize = Optional.of(objectSize);
this.objectKey = Optional.of(objectKey);
this.appliedTimeInMs = Optional.of(appliedTimeInMs);
this.expiredTimeInMs = Optional.of(expiredTimeInMs);
this.committedTimeInMs = Optional.of(committedTimeInMs);
this.destroyedTimeInMs = Optional.of(destroyedTimeInMs);
this.objectSize = objectSize;
this.objectKey = objectKey;
this.preparedTimeInMs = preparedTimeInMs;
this.expiredTimeInMs = expiredTimeInMs;
this.committedTimeInMs = committedTimeInMs;
this.destroyedTimeInMs = destroyedTimeInMs;
this.s3ObjectState = s3ObjectState;
}

Expand All @@ -83,7 +82,7 @@ public void onApply() {
throw new IllegalStateException("Object is not in UNINITIALIZED state");
}
this.s3ObjectState = S3ObjectState.PREPARED;
this.appliedTimeInMs = Optional.of(System.currentTimeMillis());
this.preparedTimeInMs = System.currentTimeMillis();
}

public void onCreate(S3ObjectCommitContext createContext) {
Expand All @@ -92,9 +91,9 @@ public void onCreate(S3ObjectCommitContext createContext) {
throw new IllegalStateException("Object is not in APPLIED state");
}
this.s3ObjectState = S3ObjectState.COMMITTED;
this.committedTimeInMs = Optional.of(createContext.committedTimeInMs);
this.objectSize = Optional.of(createContext.objectSize);
this.objectKey = Optional.of(createContext.objectAddress);
this.committedTimeInMs = createContext.committedTimeInMs;
this.objectSize = createContext.objectSize;
this.objectKey = createContext.objectAddress;
}

public void onMarkDestroy() {
Expand All @@ -115,23 +114,23 @@ public void onDestroy() {
public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize.orElse(null))
.setObjectSize(objectSize)
.setObjectState((byte) s3ObjectState.ordinal())
.setAppliedTimeInMs(appliedTimeInMs.orElse(null))
.setExpiredTimeInMs(expiredTimeInMs.orElse(null))
.setCommittedTimeInMs(committedTimeInMs.orElse(null))
.setDestroyedTimeInMs(destroyedTimeInMs.orElse(null)), (short) 0);
.setPreparedTimeInMs(preparedTimeInMs)
.setExpiredTimeInMs(expiredTimeInMs)
.setCommittedTimeInMs(committedTimeInMs)
.setDestroyedTimeInMs(destroyedTimeInMs), (short) 0);
}

static public class S3ObjectCommitContext {

private final Long committedTimeInMs;
private final Long objectSize;
private final long committedTimeInMs;
private final long objectSize;
private final String objectAddress;

public S3ObjectCommitContext(
final Long committedTimeInMs,
final Long objectSize,
final long committedTimeInMs,
final long objectSize,
final String objectAddress) {
this.committedTimeInMs = committedTimeInMs;
this.objectSize = objectSize;
Expand All @@ -141,7 +140,7 @@ public S3ObjectCommitContext(

@Override
public int compareTo(S3Object o) {
return this.objectId.compareTo(o.objectId);
return Long.compare(this.objectId, o.objectId);
}

@Override
Expand All @@ -161,31 +160,31 @@ public int hashCode() {
return Objects.hash(objectId);
}

public Long getObjectId() {
public long getObjectId() {
return objectId;
}

public Optional<Long> getObjectSize() {
public long getObjectSize() {
return objectSize;
}

public Optional<String> getObjectKey() {
public String getObjectKey() {
return objectKey;
}

public Optional<Long> getAppliedTimeInMs() {
return appliedTimeInMs;
public long getPreparedTimeInMs() {
return preparedTimeInMs;
}

public Optional<Long> getCommittedTimeInMs() {
public long getCommittedTimeInMs() {
return committedTimeInMs;
}

public Optional<Long> getDestroyedTimeInMs() {
public long getDestroyedTimeInMs() {
return destroyedTimeInMs;
}

public Optional<Long> getExpiredTimeInMs() {
public long getExpiredTimeInMs() {
return expiredTimeInMs;
}

Expand All @@ -194,6 +193,6 @@ public S3ObjectState getS3ObjectState() {
}

public boolean isExpired() {
return System.currentTimeMillis() > expiredTimeInMs.get();
return System.currentTimeMillis() > expiredTimeInMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
"about": "The object size of the S3 object"
},
{
"name": "AppliedTimeInMs",
"name": "PreparedTimeInMs",
"type": "int64",
"versions": "0+",
"about": "The object be applied timestamp"
"about": "The object be prepared timestamp"
},
{
"name": "ExpiredTimeInMs",
Expand Down
Loading