Skip to content

Commit

Permalink
Fix set router epoch on runtime invalidate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeeshan Lokhandwala committed Dec 12, 2017
1 parent 3b8cfb8 commit 706c3c3
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 25 deletions.
19 changes: 11 additions & 8 deletions runtime/src/main/java/org/corfudb/runtime/CorfuRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,14 +568,17 @@ private CompletableFuture<Layout> fetchLayout() {
// it is acceptable (at least the code on 10/13/2016 does not have issues)
// but setEpoch of routers needs to be synchronized as those variables are
// not local.
try {
l.getAllServers().stream().map(getRouterFunction).forEach(x ->
x.setEpoch(l.getEpoch()));
} catch (NetworkException ne) {
// We have already received the layout and there is no need to keep client waiting.
// NOTE: This is true assuming this happens only at router creation.
// If not we also have to take care of setting the latest epoch on Client Router.
log.warn("fetchLayout: Error getting router : {}", ne);
for (String server : l.getAllServers()) {
try {
getRouter(server).setEpoch(l.getEpoch());
} catch (NetworkException ne) {
// We have already received the layout and there is no need to keep
// client waiting.
// NOTE: This is true assuming this happens only at router creation.
// If not we also have to take care of setting the latest epoch on
// Client Router.
log.warn("fetchLayout: Error getting router : {}", ne);
}
}
layoutServers = l.getLayoutServers();
layout = layoutFuture;
Expand Down
124 changes: 109 additions & 15 deletions test/src/test/java/org/corfudb/runtime/CorfuRuntimeTest.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
package org.corfudb.runtime;

import org.corfudb.infrastructure.TestLayoutBuilder;

import org.corfudb.runtime.clients.BaseClient;
import org.corfudb.runtime.clients.LayoutClient;
import org.corfudb.runtime.clients.LogUnitClient;
import org.corfudb.runtime.clients.ManagementClient;
import org.corfudb.runtime.clients.SequencerClient;
import org.corfudb.runtime.clients.TestClientRouter;
import org.corfudb.runtime.clients.TestRule;
import org.corfudb.runtime.exceptions.unrecoverable.SystemUnavailableError;

import org.corfudb.infrastructure.TestServerRouter;
import org.corfudb.runtime.exceptions.NetworkException;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.runtime.view.Layout;
import org.corfudb.runtime.view.stream.IStreamView;
import org.corfudb.util.CFUtils;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -29,6 +42,15 @@ public class CorfuRuntimeTest extends AbstractViewTest {



/**
* Resets the router function to the default function for AbstractViewTest.
*/
@Before
public void setDefaultRuntimeGetRouterFunction() {
CorfuRuntime.overrideGetRouterFunction =
(runtime, endpoint) -> super.getRouterFunction(runtime, endpoint);
}

@Test
public void checkValidLayout() throws Exception {

Expand Down Expand Up @@ -61,19 +83,13 @@ public void canInstantiateRuntimeWithoutTestRef() throws Exception {
}

/**
* Ensures that we will not accept a Layout that is obsolete.
*
* Test storyline:
* 1. Seal the 3 servers
* 2. Install a new Layout only on 2 of them
* 3. Force the client to receive the Layout only from the staled Layout server.
* 4. Ensure that we will never accept it.
* Generates and bootstraps a 3 node cluster.
* Shuts down the management servers of the 3 nodes.
*
* @return The generated layout.
* @throws Exception
*/
@Test
public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

private Layout get3NodeLayout() throws Exception {
addServer(SERVERS.PORT_0);
addServer(SERVERS.PORT_1);
addServer(SERVERS.PORT_2);
Expand All @@ -96,15 +112,30 @@ public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

bootstrapAllServers(l);

CorfuRuntime rt = new CorfuRuntime(SERVERS.ENDPOINT_0);
rt.connect();


// Shutdown management server (they interfere)
getManagementServer(SERVERS.PORT_0).shutdown();
getManagementServer(SERVERS.PORT_1).shutdown();
getManagementServer(SERVERS.PORT_2).shutdown();

return l;
}

/**
* Ensures that we will not accept a Layout that is obsolete.
*
* Test storyline:
* 1. Seal the 3 servers
* 2. Install a new Layout only on 2 of them
* 3. Force the client to receive the Layout only from the staled Layout server.
* 4. Ensure that we will never accept it.
*
* @throws Exception
*/
@Test
public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

CorfuRuntime rt = getRuntime(get3NodeLayout()).connect();

// Seal
Layout currentLayout = (Layout) rt.getLayoutView().getCurrentLayout().clone();
currentLayout.setRuntime(rt);
Expand Down Expand Up @@ -173,7 +204,7 @@ public void doesNotAllowReadsAfterSealAndBeforeNewLayout() throws Exception {

LogUnitClient luc = runtime.getRouter(SERVERS.ENDPOINT_0).getClient(LogUnitClient.class);

assertThatThrownBy(() ->luc.read(0).get())
assertThatThrownBy(() -> luc.read(0).get())
.isInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(WrongEpochException.class);
}
Expand Down Expand Up @@ -233,4 +264,67 @@ void stopRuntimeAndThrowException() {
isInstanceOf(SystemUnavailableError.class);

}


/**
* Creates and bootstraps 3 nodes N0, N1 and N2.
* The runtime connects to the 3 nodes and sets the clientRouter epochs to 1, 1 and 1.
* Now the epoch is updated to 2.
* We now add a rule which throws a NetworkException while fetching a router to the first node
* in the list. The runtime is now invalidated, which forces to update the client router
* epochs. This should now update the epochs to the following: 1, 2, 2.
*
* @throws Exception
*/
@Test
public void testFailedRouterInFetchLayout() throws Exception {

final Map<String, TestClientRouter> routerMap = new ConcurrentHashMap<>();
final AtomicReference<String> failedNode = new AtomicReference<>();

CorfuRuntime.overrideGetRouterFunction = (corfuRuntime, endpoint) -> {
if (failedNode.get() != null && endpoint.equals(failedNode.get())) {
throw new NetworkException("Test server not responding : ", endpoint);
}
if (!endpoint.startsWith("test:")) {
throw new RuntimeException("Unsupported endpoint in test: " + endpoint);
}
return routerMap.computeIfAbsent(endpoint,
x -> {
TestClientRouter tcn =
new TestClientRouter(
(TestServerRouter) getServerRouter(getPort(endpoint)));
tcn.addClient(new BaseClient())
.addClient(new SequencerClient())
.addClient(new LayoutClient())
.addClient(new LogUnitClient())
.addClient(new ManagementClient());
return tcn;
}
);
};

Layout l = get3NodeLayout();
CorfuRuntime runtime = getRuntime(l).connect();
String[] serverArray = runtime.getLayoutView().getLayout().getAllServers()
.toArray(new String[l.getAllServers().size()]);

l.setRuntime(runtime);
l.setEpoch(l.getEpoch() + 1);
l.moveServersToEpoch();
runtime.getLayoutView().updateLayout(l, 1L);

assertThat(routerMap.get(serverArray[0]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[1]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[2]).getEpoch()).isEqualTo(1L);

failedNode.set((String) l.getAllServers().toArray()[0]);

runtime.invalidateLayout();
runtime.getLayoutView().getLayout();
assertThat(routerMap.get(serverArray[0]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[1]).getEpoch()).isEqualTo(2L);
assertThat(routerMap.get(serverArray[2]).getEpoch()).isEqualTo(2L);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public synchronized void setEpoch(long epoch) {
@Setter
public UUID clientID;

private boolean connected = true;
private volatile boolean connected = true;

public void simulateDisconnectedEndpoint() {
connected = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void simulateEndpointDisconnected(CorfuRuntime runtime) {
* @param endpoint An endpoint string for the router.
* @return
*/
private IClientRouter getRouterFunction(CorfuRuntime runtime, String endpoint) {
protected IClientRouter getRouterFunction(CorfuRuntime runtime, String endpoint) {
runtimeRouterMap.putIfAbsent(runtime, new ConcurrentHashMap<>());
if (!endpoint.startsWith("test:")) {
throw new RuntimeException("Unsupported endpoint in test: " + endpoint);
Expand Down Expand Up @@ -355,6 +355,15 @@ public String getEndpoint(int port) {
return "test:" + port;
}

/**
* Get the port from the endpoint.
*
* @param endpoint The endpoint string.
* @return The port in the endpoint.
*/
public Integer getPort(String endpoint) {
return Integer.parseInt(endpoint.split(":")[1]);
}

// Private

Expand Down

0 comments on commit 706c3c3

Please sign in to comment.