Skip to content

Commit

Permalink
Merge 2979284 into 363be7f
Browse files Browse the repository at this point in the history
  • Loading branch information
Maithem committed Nov 20, 2017
2 parents 363be7f + 2979284 commit d0908f2
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void read(CorfuPayloadMsg<ReadRequest> msg, ChannelHandlerContext ctx, I
l < msg.getPayload().getRange().upperEndpoint() + 1L; l++) {
ILogData e = dataCache.get(l);
if (e == null) {
rr.put(l, LogData.EMPTY);
rr.put(l, LogData.getEmpty(l));
} else {
rr.put(l, (LogData) e);
}
Expand All @@ -212,7 +212,7 @@ private void multiRead(CorfuPayloadMsg<MultipleReadRequest> msg, ChannelHandlerC
for (Long l : msg.getPayload().getAddresses()) {
ILogData e = dataCache.get(l);
if (e == null) {
rr.put(l, LogData.EMPTY);
rr.put(l, LogData.getEmpty(l));
} else {
rr.put(l, (LogData) e);
}
Expand All @@ -228,7 +228,8 @@ private void fillHole(CorfuPayloadMsg<TrimRequest> msg, ChannelHandlerContext ct
IServerRouter r,
boolean isMetricsEnabled) {
try {
dataCache.put(msg.getPayload().getAddress(), LogData.HOLE);
long address = msg.getPayload().getAddress();
dataCache.put(address, LogData.getHole(address));
r.sendResponse(ctx, msg, CorfuMsgType.WRITE_OK.msg());

} catch (OverwriteException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ public synchronized void trim(long address) {
@Override
public LogData read(long address) {
if (isTrimmed(address)) {
return LogData.TRIMMED;
return LogData.getTrimmed(address);
}
if (trimmed.contains(address)) {
return LogData.TRIMMED;
return LogData.getTrimmed(address);
}

return logCache.get(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,13 @@ public void append(long address, LogData entry) {
@Override
public LogData read(long address) {
if (isTrimmed(address)) {
return LogData.TRIMMED;
return LogData.getTrimmed(address);
}
SegmentHandle sh = getSegmentHandleForAddress(address);

try {
if (sh.getPendingTrims().contains(address)) {
return LogData.TRIMMED;
return LogData.getTrimmed(address);
}
return readRecord(sh, address);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
@AllArgsConstructor
public enum DataType implements ICorfuPayload<DataType> {
DATA(0, true),
EMPTY(1, false),
EMPTY(1, true),
HOLE(2, true),
TRIMMED(3, false),
TRIMMED(3, true),
RANK_ONLY(4, true);

final int val;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
@Slf4j
public class LogData implements ICorfuPayload<LogData>, IMetadata, ILogData {

public static final LogData EMPTY = new LogData(DataType.EMPTY);
public static final LogData HOLE = new LogData(DataType.HOLE);
public static final LogData TRIMMED = new LogData(DataType.TRIMMED);
public static final int NOT_KNOWN = -1;

@Getter
Expand All @@ -37,6 +34,24 @@ public class LogData implements ICorfuPayload<LogData>, IMetadata, ILogData {

private final transient AtomicReference<Object> payload = new AtomicReference<>();

public static LogData getTrimmed(long address) {
LogData logData = new LogData(DataType.TRIMMED);
logData.setGlobalAddress(address);
return logData;
}

public static LogData getHole(long address) {
LogData logData = new LogData(DataType.HOLE);
logData.setGlobalAddress(address);
return logData;
}

public static LogData getEmpty(long address) {
LogData logData = new LogData(DataType.EMPTY);
logData.setGlobalAddress(address);
return logData;
}

/**
* Return the payload.
*/
Expand Down Expand Up @@ -211,6 +226,17 @@ void doSerializeInternal(ByteBuf buf) {
}
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
} else if (!(o instanceof LogData)) {
return false;
} else {
return compareTo((LogData) o) == 0;
}
}

@Override
public String toString() {
return "LogData[" + getGlobalAddress() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import org.corfudb.protocols.wireprotocol.DataType;
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.IMetadata;
import org.corfudb.protocols.wireprotocol.IToken;
import org.corfudb.protocols.wireprotocol.LogData;
import org.corfudb.protocols.wireprotocol.ReadResponse;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.DataCorruptionException;
import org.corfudb.runtime.exceptions.DataOutrankedException;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.exceptions.ValueAdoptedException;
import org.junit.Test;

Expand Down Expand Up @@ -86,29 +84,41 @@ public void canReadWrite()
.isEqualTo(testString);
}

@Test
public void readingEmptyAddress() throws Exception {
final long address0 = 0;
LogData r = client.read(address0).get().getAddresses().get(0L);
assertThat(r.isEmpty()).isTrue();
assertThat(r.getGlobalAddress()).isEqualTo(address0);
assertThat(LogData.getEmpty(0)).isEqualTo(LogData.getEmpty(0));
}

@Test
public void readingTrimmedAddress() throws Exception {
byte[] testString = "hello world".getBytes();
client.write(0, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get();
client.write(1, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get();
LogData r = client.read(0).get().getAddresses().get(0L);
final long address0 = 0;
final long address1 = 1;
client.write(address0, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get();
client.write(address1, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get();
LogData r = client.read(address0).get().getAddresses().get(0L);
assertThat(r.getType())
.isEqualTo(DataType.DATA);
r = client.read(1).get().getAddresses().get(1L);
r = client.read(address1).get().getAddresses().get(1L);
assertThat(r.getType())
.isEqualTo(DataType.DATA);

client.prefixTrim(0);
client.prefixTrim(address0);
client.compact();

// For logunit cach flush
LogUnitServer server2 = new LogUnitServer(serverContext);
serverRouter.reset();
serverRouter.addServer(server2);

assertThat(client.read(0).get().getAddresses().get(0L).isTrimmed()).isTrue();
assertThat(r.getType())
.isEqualTo(DataType.DATA);
LogData trimmedAddress = client.read(address0).get().getAddresses().get(0L);

assertThat(trimmedAddress.isTrimmed()).isTrue();
assertThat(trimmedAddress.getGlobalAddress()).isEqualTo(address0);
}

@Test
Expand Down Expand Up @@ -225,12 +235,14 @@ public void overwriteThrowsException()
public void holeFillCannotBeOverwritten()
throws Exception {
byte[] testString = "hello world".getBytes();
client.fillHole(0).get();
LogData r = client.read(0).get().getAddresses().get(0L);
final long address0 = 0;
client.fillHole(address0).get();
LogData r = client.read(address0).get().getAddresses().get(0L);
assertThat(r.getType())
.isEqualTo(DataType.HOLE);
assertThat(r.getGlobalAddress()).isEqualTo(address0);

assertThatThrownBy(() -> client.write(0, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get())
assertThatThrownBy(() -> client.write(address0, Collections.<UUID>emptySet(), null, testString, Collections.emptyMap()).get())
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(OverwriteException.class);
}
Expand Down

0 comments on commit d0908f2

Please sign in to comment.