Skip to content

Commit

Permalink
Merge branch 'master' into failAfter
Browse files Browse the repository at this point in the history
  • Loading branch information
no2chem committed Dec 12, 2017
2 parents 6f2856e + 0c92e3c commit cab0e4a
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 17 deletions.
48 changes: 43 additions & 5 deletions runtime/src/main/java/org/corfudb/runtime/CorfuRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,29 @@
public class CorfuRuntime {

static final int DEFAULT_TIMEOUT_MINUTES_FAST_LOADING = 30;

public static final int BULK_READ_SIZE = 10;

@Data
public static class CorfuRuntimeParameters {

/** True, if undo logging is disabled. */
/**
* True, if undo logging is disabled.
*/
boolean undoDisabled = false;

/** True, if optimistic undo logging is disabled. */
/**
* True, if optimistic undo logging is disabled.
*/
boolean optimisticUndoDisabled = false;

/** Number of times to attempt to read before hole filling. */
/**
* Number of times to attempt to read before hole filling.
*/
int holeFillRetry = 10;
}



@Getter
private final CorfuRuntimeParameters parameters = new CorfuRuntimeParameters();
/**
Expand Down Expand Up @@ -238,6 +245,31 @@ public CorfuRuntime setMetrics(@NonNull MetricRegistry metrics) {
return this;
}

/**
* These two handlers are provided to give some control on what happen when system is down.
*
* For applications that want to have specific behaviour when a the system appears unavailable, they can
* register their own handler for both before the rpc request and upon network exception.
*
* An example of how to use these handlers implementing timeout is given in
* test/src/test/java/org/corfudb/runtime/CorfuRuntimeTest.java
*
*/
public Runnable beforeRpcHandler = () -> {};
public Runnable systemDownHandler = () -> {};


public CorfuRuntime registerSystemDownHandler(Runnable handler) {
systemDownHandler = handler;
return this;
}

public CorfuRuntime registerBeforeRpcHandler(Runnable handler) {
beforeRpcHandler = handler;
return this;
}


/**
* When set, overrides the default getRouterFunction. Used by the testing
* framework to ensure the default routers used are for testing.
Expand Down Expand Up @@ -360,7 +392,7 @@ public void stop() {
* Stop all routers associated with this Corfu Runtime.
**/
public void stop(boolean shutdown) {
for (IClientRouter r: nodeRouters.values()) {
for (IClientRouter r : nodeRouters.values()) {
r.stop(shutdown);
}
if (!shutdown) {
Expand Down Expand Up @@ -426,6 +458,7 @@ public CorfuRuntime setCacheDisabled(boolean disable) {
/**
* If enabled, successful transactions will be written to a special transaction stream
* (i.e. TRANSACTION_STREAM_ID)
*
* @param enable indicates if transaction logging is enabled
* @return corfu runtime object
*/
Expand Down Expand Up @@ -496,6 +529,8 @@ private CompletableFuture<Layout> fetchLayout() {
return CompletableFuture.<Layout>supplyAsync(() -> {

List<String> layoutServersCopy = new ArrayList<>(layoutServers);
beforeRpcHandler.run();

while (true) {

Collections.shuffle(layoutServersCopy);
Expand Down Expand Up @@ -557,6 +592,9 @@ private CompletableFuture<Layout> fetchLayout() {
}
log.warn("Couldn't connect to any up-to-date layout servers, retrying in {}s.",
retryRate);

systemDownHandler.run();

try {
Thread.sleep(retryRate * 1000);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.corfudb.runtime.exceptions.unrecoverable;

/**
* Created by rmichoud on 10/31/17.
*/
public class SystemUnavailableError extends UnrecoverableCorfuError {
public SystemUnavailableError(String reason) {
super(reason);
}

}
47 changes: 43 additions & 4 deletions runtime/src/main/java/org/corfudb/runtime/view/AbstractView.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import lombok.extern.slf4j.Slf4j;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.NetworkException;
import org.corfudb.runtime.exceptions.ServerNotReadyException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError;
import org.corfudb.runtime.exceptions.unrecoverable.SystemUnavailableError;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.util.CFUtils;
import org.corfudb.util.Utils;

/**
Expand Down Expand Up @@ -58,24 +58,39 @@ public Layout getCurrentLayout() {
}
}

public <T, A extends RuntimeException, B extends RuntimeException, C extends RuntimeException,
D extends RuntimeException> T layoutHelper(LayoutFunction<Layout, T, A, B, C, D>
function)
throws A, B, C, D {
return layoutHelper(function, false);
}


/**
* Helper function for view to retrieve layouts.
* This function will retry the given function indefinitely,
* invalidating the view if there was a exception contacting the endpoint.
*
* There is a flag to set if we want the caller to handle Runtime Exceptions. For some
* special cases (like writes), we need to do a bit more work upon a Runtime Exception than just retry.
*
* @param function The function to execute.
* @param <T> The return type of the function.
* @param <A> Any exception the function may throw.
* @param <B> Any exception the function may throw.
* @param <C> Any exception the function may throw.
* @param <D> Any exception the function may throw.
* @param rethrowAllExceptions if all exceptions are rethrown to caller.
* @return The return value of the function.
*/
public <T, A extends RuntimeException, B extends RuntimeException, C extends RuntimeException,
D extends RuntimeException> T layoutHelper(LayoutFunction<Layout, T, A, B, C, D>
function)
function,
boolean rethrowAllExceptions)
throws A, B, C, D {
while (true) {

runtime.beforeRpcHandler.run();
while(true) {
try {
return function.apply(runtime.layout.get());
} catch (RuntimeException re) {
Expand All @@ -93,14 +108,38 @@ D extends RuntimeException> T layoutHelper(LayoutFunction<Layout, T, A, B, C, D>
log.warn("Got a wrong epoch exception, updating epoch to {} and "
+ "invalidate view", we.getCorrectEpoch());
runtime.invalidateLayout();
} else if (re instanceof NetworkException) {
log.warn("layoutHelper: System seems unavailable", re);

runtime.systemDownHandler.run();
runtime.invalidateLayout();

try {
Thread.sleep(runtime.retryRate * 1000);
} catch (InterruptedException e) {
log.warn("Interrupted Exception in layout helper.", e);
}

} else {
throw re;
}
if (rethrowAllExceptions) {
throw new RuntimeException(re);
}

} catch (InterruptedException ie) {
throw new UnrecoverableCorfuInterruptedError("Interrupted in layoutHelper", ie);
} catch (ExecutionException ex) {
log.warn("Error executing remote call, invalidating view and retrying in {}s",
runtime.retryRate, ex);

// If SystemUnavailable exception is thrown by the layout.get() completable future,
// the exception will materialize as an ExecutionException. In that case, we need to propagate
// this exception.
if (ex.getCause() instanceof SystemUnavailableError) {
throw (SystemUnavailableError) ex.getCause();
}

runtime.invalidateLayout();
Utils.sleepUninterruptibly(runtime.retryRate * 1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.corfudb.runtime.exceptions.StaleTokenException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError;
import org.corfudb.util.CFUtils;

import java.util.Comparator;
Expand Down Expand Up @@ -84,6 +85,33 @@ public void resetCaches() {
readCache.invalidateAll();
}


/**
* Validates the state of a write after an exception occured during the process
*
* There are [currently] three different scenarios:
* 1. The data was persisted to some log units and we were able to recover it.
* 2. The data was not persisted and another client (or ourself) hole filled.
* In that case, we return an OverwriteException and let the nex layer handle it.
* 3. The address we tried to write to was trimmed. In this case, there is no way to
* know if the write went through or not. For sanity, we throw an OverwriteException
* and let the above layer retry.
*
* @param address
*/
private void validateStateOfWrittenEntry(long address) {
ILogData logData;
try {
logData = read(address);
} catch (TrimmedException te) {
// We cannot know if the write went through or not
throw new UnrecoverableCorfuError("We cannot determine state of an update because of a trim.");
}
if (logData.isHole()) {
throw new OverwriteException();
}
}

/** Write the given log data using a token, returning
* either when the write has been completed successfully,
* or throwing an OverwriteException if another value
Expand Down Expand Up @@ -111,12 +139,23 @@ public void write(IToken token, Object data) throws OverwriteException {
// Set the data to use the token
ld.useToken(token);


// Do the write
l.getReplicationMode(token.getTokenValue())
try {
l.getReplicationMode(token.getTokenValue())
.getReplicationProtocol(runtime)
.write(l, ld);
} catch (RuntimeException re) {
// If we have an Overwrite exception, it is already too late for trying
// to validate the state of the write, we know that it didn't went through.
if (re instanceof OverwriteException) {
throw re;
}

validateStateOfWrittenEntry(token.getTokenValue());
}
return null;
});
}, true);

// Cache the successful write
if (!runtime.isCacheDisabled()) {
Expand Down
30 changes: 24 additions & 6 deletions test/src/test/java/org/corfudb/integration/ServerRestartIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ private void runSingleNodeRecoveryTransactionalClient(boolean nested) throws Exc
final int MAX_LIMIT_KEY_RANGE_POST_SHUTDOWN = 200;
final int CLIENT_DELAY_POST_SHUTDOWN = 50;

final int CORFU_SERVER_DOWN_TIME = 4000;

final Random rand = getRandomNumberGenerator();

// Run CORFU Server. Expect slight delay until server is running.
Expand All @@ -266,15 +268,30 @@ private void runSingleNodeRecoveryTransactionalClient(boolean nested) throws Exc
// ShutDown (STOP) CORFU Server
assertThat(shutdownCorfuServer(corfuServerProcess)).isTrue();

// Execute Transactions (once Corfu Server Shutdown)
for (int i = 0; i < ITERATIONS; i++) {
assertThat(executeTransaction(runtime, smrMapList, expectedMapList,
MIN_LIMIT_KEY_RANGE_DURING_SHUTDOWN, MAX_LIMIT_KEY_RANGE_DURING_SHUTDOWN,
nested, rand)).isFalse();
}
// Schedule offline transactions, first one should be stuck and will eventually succeed
// on reconnect
ScheduledThreadPoolExecutor offline = new ScheduledThreadPoolExecutor(1);
ScheduledFuture<Boolean> offlineTransactionsSucceeded = offline.schedule(() -> {
for (int i = 0; i < ITERATIONS; i++) {
boolean txState = executeTransaction(runtime, smrMapList, expectedMapList,
MIN_LIMIT_KEY_RANGE_DURING_SHUTDOWN, MAX_LIMIT_KEY_RANGE_DURING_SHUTDOWN,
nested, rand);

if (!txState) {
return false;
};
}

return true;
},CLIENT_DELAY_POST_SHUTDOWN, TimeUnit.MILLISECONDS);
offline.shutdown();

Thread.sleep(CORFU_SERVER_DOWN_TIME);

// Restart Corfu Server
Process corfuServerProcessRestart = runCorfuServer();
offline.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

// Block until server is ready.
runtime.invalidateLayout();
runtime.layout.get();
Expand Down Expand Up @@ -312,6 +329,7 @@ private void runSingleNodeRecoveryTransactionalClient(boolean nested) throws Exc

// Data Correctness Validation
assertThat(future.get()).isTrue();
assertThat(offlineTransactionsSucceeded.get()).isTrue();

// ShutDown the server before exiting
assertThat(shutdownCorfuServer(corfuServerProcessRestart)).isTrue();
Expand Down
Loading

0 comments on commit cab0e4a

Please sign in to comment.