@@ -108,19 +108,17 @@ public int compareTo(S3Object o) {

@Override
public boolean equals(Object o) {
- if (this == o) {
+ if (this == o)
return true;
- }
- if (o == null || getClass() != o.getClass()) {
+ if (o == null || getClass() != o.getClass())
return false;
- }
S3Object s3Object = (S3Object) o;
- return objectId == s3Object.objectId;
+ return objectId == s3Object.objectId && objectSize == s3Object.objectSize && preparedTimeInMs == s3Object.preparedTimeInMs && expiredTimeInMs == s3Object.expiredTimeInMs && committedTimeInMs == s3Object.committedTimeInMs && markDestroyedTimeInMs == s3Object.markDestroyedTimeInMs && attributes == s3Object.attributes && Objects.equals(objectKey, s3Object.objectKey) && s3ObjectState == s3Object.s3ObjectState;
}

@Override
public int hashCode() {
- return Objects.hash(objectId);
+ return Objects.hash(objectId, objectSize, objectKey, preparedTimeInMs, expiredTimeInMs, committedTimeInMs, markDestroyedTimeInMs, s3ObjectState, attributes);
}

public long getObjectId() {
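The widened equals()/hashCode() above now takes every field of S3Object into account instead of objectId alone, so two objects that share an id but differ in size, timestamps, attributes, key, or state no longer compare equal and will generally hash differently. A minimal sketch of that behaviour, assuming the nine-argument S3Object constructor used by the updated test below (objectId, objectSize, objectKey, the four timestamps, state, attributes):

import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;

public class S3ObjectEqualitySketch {
    public static void main(String[] args) {
        // Same objectId, but different objectSize and committedTimeInMs.
        S3Object a = new S3Object(0L, 233, null, 2, 3, 4, 5, S3ObjectState.COMMITTED, 0);
        S3Object b = new S3Object(0L, 100, null, 2, 3, 9, 5, S3ObjectState.COMMITTED, 0);

        System.out.println(a.equals(b));                  // false after this change (was true before)
        System.out.println(a.hashCode() == b.hashCode()); // typically false after this change
    }
}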
@@ -17,9 +17,6 @@

package org.apache.kafka.image;

- import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertTrue;

import com.automq.stream.s3.objects.ObjectAttributes;
import java.util.ArrayList;
import java.util.List;
@@ -29,7 +26,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
@@ -40,12 +36,16 @@
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+ import org.apache.kafka.server.common.automq.AutoMQVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(40)
@Tag("S3Unit")
public class S3ObjectsImageTest {
@@ -72,6 +72,9 @@ public class S3ObjectsImageTest {

RegistryRef ref1 = new RegistryRef(registry, 0, new ArrayList<>());

+ short objectRecordVersion = AutoMQVersion.LATEST.objectRecordVersion();
+ int attribute = ObjectAttributes.builder().bucket((short) 1).build().attributes();

IMAGE1 = new S3ObjectsImage(3, map, ref1);
DELTA1_RECORDS = new ArrayList<>();
// try to update object0 and object1 to committed
@@ -80,43 +83,43 @@ public class S3ObjectsImageTest {
// try to add applied object4
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(0L).
- setObjectState((byte) S3ObjectState.COMMITTED.ordinal()), (short) 0));
+ setObjectState((byte) S3ObjectState.COMMITTED.ordinal()).setAttributes(attribute).setObjectSize(233)
+ .setPreparedTimeInMs(2).setExpiredTimeInMs(3).setCommittedTimeInMs(4).setMarkDestroyedTimeInMs(5), objectRecordVersion));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(1L).
- setObjectState((byte) S3ObjectState.COMMITTED.ordinal()), (short) 0));
+ setObjectState((byte) S3ObjectState.COMMITTED.ordinal()).setAttributes(attribute), objectRecordVersion));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(2L).
- setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()), (short) 0));
+ setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()).setAttributes(attribute), objectRecordVersion));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveS3ObjectRecord()
.setObjectId(3L), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
.setAssignedS3ObjectId(4L), (short) 0));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
setObjectId(4L).
- setObjectState((byte) S3ObjectState.PREPARED.ordinal()), (short) 0));
+ setObjectState((byte) S3ObjectState.PREPARED.ordinal()), objectRecordVersion));
DELTA1 = new S3ObjectsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);


registry = new SnapshotRegistry(new LogContext());
TimelineHashMap<Long/*objectId*/, S3Object> map2 = new TimelineHashMap<>(registry, 10);

RegistryRef ref2 = new RegistryRef(registry, 1, new ArrayList<>());
map2.put(0L, new S3Object(
- 0L, -1, null,
- -1, -1, -1, -1,
- S3ObjectState.COMMITTED, ObjectAttributes.DEFAULT.attributes()));
+ 0L, 233, null,
+ 2, 3, 4, 5,
+ S3ObjectState.COMMITTED, attribute));
map2.put(1L, new S3Object(
- 1L, -1, null,
- -1, -1, -1, -1,
- S3ObjectState.COMMITTED, ObjectAttributes.DEFAULT.attributes()));
+ 1L, 0, null,
+ 0, 0, 0, 0,
+ S3ObjectState.COMMITTED, attribute));
map2.put(2L, new S3Object(
- 2L, -1, null,
- -1, -1, -1, -1,
- S3ObjectState.MARK_DESTROYED, ObjectAttributes.DEFAULT.attributes()));
+ 2L, 0, null,
+ 0, 0, 0, 0,
+ S3ObjectState.MARK_DESTROYED, attribute));
map2.put(4L, new S3Object(
- 4L, -1, null,
- -1, -1, -1, -1,
+ 4L, 0, null,
+ 0, 0, 0, 0,
S3ObjectState.PREPARED, ObjectAttributes.DEFAULT.attributes()));
registry.getOrCreateSnapshot(1);
IMAGE2 = new S3ObjectsImage(4L, map2, ref2);
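Because equals() now compares every field, the expected objects placed in map2 have to spell out the size, timestamps, and attributes carried by the replayed records; the old placeholders (-1 and ObjectAttributes.DEFAULT) would no longer match. A condensed sketch of the record-to-object correspondence the test exercises, using only the setters and constructors visible in the diff above (the delta/image replay plumbing is omitted):

import com.automq.stream.s3.objects.ObjectAttributes;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.automq.AutoMQVersion;

public class S3ObjectRecordMappingSketch {
    public static void main(String[] args) {
        // Record to be replayed into the delta, written at the latest record version.
        short version = AutoMQVersion.LATEST.objectRecordVersion();
        int attribute = ObjectAttributes.builder().bucket((short) 1).build().attributes();
        S3ObjectRecord record = new S3ObjectRecord()
            .setObjectId(0L)
            .setObjectState((byte) S3ObjectState.COMMITTED.ordinal())
            .setAttributes(attribute)
            .setObjectSize(233)
            .setPreparedTimeInMs(2)
            .setExpiredTimeInMs(3)
            .setCommittedTimeInMs(4)
            .setMarkDestroyedTimeInMs(5);
        ApiMessageAndVersion wrapped = new ApiMessageAndVersion(record, version);

        // Object the replayed image is expected to contain; with the strengthened
        // equals() every one of these fields has to line up with the record above.
        S3Object expected = new S3Object(0L, 233, null, 2, 3, 4, 5, S3ObjectState.COMMITTED, attribute);
        System.out.println(wrapped + " should replay into " + expected);
    }
}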