Skip to content

Commit

Permalink
Merge pull request #712 from CorfuDB/backpointerTermination
Browse files Browse the repository at this point in the history
Terminate Backpointers at head of stream
  • Loading branch information
no2chem committed Jun 14, 2017
2 parents 21fbbd5 + f638433 commit 2249c07
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.corfudb.protocols.logprotocol.LogEntry;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.view.Address;

import java.util.UUID;

Expand Down Expand Up @@ -82,8 +81,7 @@ default LogEntry getLogEntry(CorfuRuntime runtime) {
*/
default boolean hasBackpointer(UUID streamID) {
return getBackpointerMap() != null
&& getBackpointerMap().containsKey(streamID) &&
Address.isAddress(getBackpointerMap().get(streamID));
&& getBackpointerMap().containsKey(streamID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.corfudb.runtime.view.stream;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.logprotocol.CheckpointEntry;
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.TokenResponse;
import org.corfudb.protocols.logprotocol.CheckpointEntry;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.TrimmedException;
Expand Down Expand Up @@ -161,6 +161,10 @@ protected enum BackpointerOp {
INCLUDE_STOP /** Stop, but also include this address. */
}

private long backpointerCount = 0L;

public long getBackpointerCount() {return backpointerCount;}

protected boolean followBackpointers(final UUID streamId,
final NavigableSet<Long> queue,
final long startAddress,
Expand All @@ -172,15 +176,15 @@ protected boolean followBackpointers(final UUID streamId,
long currentAddress = startAddress;

// Loop until we have reached the stop address.
while (currentAddress > stopAddress) {
while (currentAddress > stopAddress && Address.isAddress(currentAddress)) {
// The queue already contains an address from this
// range, terminate.
if (queue.contains(currentAddress)) {
log.trace("FollowBackpointers[{}] Terminate due to {} " +
"already in queue", currentAddress);
return entryAdded;
}

backpointerCount++;
// Read the current address
ILogData d = read(currentAddress);

Expand All @@ -199,20 +203,29 @@ protected boolean followBackpointers(final UUID streamId,
}
}

boolean singleStep = true;
// Now calculate the next address
// Try using backpointers first
if (!runtime.isBackpointersDisabled() &&
d.hasBackpointer(streamId)) {
currentAddress = d.getBackpointer(streamId);

if (!runtime.isBackpointersDisabled() && d.hasBackpointer(streamId)) {
long tmp = d.getBackpointer(streamId);
// if backpointer is a valid log address or Address.NON_EXIST (beginning of the stream),
// do not single step back on the log
if (Address.isAddress(tmp) || tmp == Address.NON_EXIST) {
currentAddress = tmp;
singleStep = false;
}
}
// backpointers failed, so we're
// downgrading to a linear scan
else {

if (singleStep) {
// backpointers failed, so we're
// downgrading to a linear scan
currentAddress = currentAddress - 1;
}
}

return entryAdded;

}

protected BackpointerOp resolveCheckpoint(final QueuedStreamContext context, ILogData data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.corfudb.runtime.view.stream;

import org.corfudb.infrastructure.TestLayoutBuilder;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.runtime.view.Layout;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -113,4 +111,30 @@ public void moreReadQueueTest() {
assertThat(new String(payLoad).equals(String.valueOf(i)));
}
}

/**
* test proper backpointer termination at the head of a stream
*
* */
@Test
public void headOfStreamBackpointerTermination() {

final int totalEntries = PARAMETERS.NUM_ITERATIONS_LOW + 1;
CorfuRuntime runtime = getDefaultRuntime();

// Create StreamA (100 entries)
IStreamView svA = runtime.getStreamsView().get(CorfuRuntime.getStreamID("streamA"));
for (int i = 0; i < PARAMETERS.NUM_ITERATIONS_LOW; i++) {
svA.append(String.valueOf(i).getBytes());
}

// Create StreamB (1 entry)
IStreamView svB = runtime.getStreamsView().get(CorfuRuntime.getStreamID("streamB"));
svB.append(String.valueOf(0).getBytes());

// Fetch Stream B and verify backpointer count (which requires 1 read = 1 entry)
svB.remainingUpTo(totalEntries);
assertThat(((BackpointerStreamView) svB).getBackpointerCount()).isEqualTo(1L);
}

}

0 comments on commit 2249c07

Please sign in to comment.