Skip to content

Commit

Permalink
Fix set router epoch on runtime invalidate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeeshan Lokhandwala committed Dec 6, 2017
1 parent c48e731 commit b507ff2
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 26 deletions.
19 changes: 11 additions & 8 deletions runtime/src/main/java/org/corfudb/runtime/CorfuRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,17 @@ private CompletableFuture<Layout> fetchLayout() {
// it is acceptable (at least the code on 10/13/2016 does not have issues)
// but setEpoch of routers needs to be synchronized as those variables are
// not local.
try {
l.getAllServers().stream().map(getRouterFunction).forEach(x ->
x.setEpoch(l.getEpoch()));
} catch (NetworkException ne) {
// We have already received the layout and there is no need to keep client waiting.
// NOTE: This is true assuming this happens only at router creation.
// If not we also have to take care of setting the latest epoch on Client Router.
log.warn("fetchLayout: Error getting router : {}", ne);
for (String server : l.getAllServers()) {
try {
getRouter(server).setEpoch(l.getEpoch());
} catch (NetworkException ne) {
// We have already received the layout and there is no need to keep
// client waiting.
// NOTE: This is true assuming this happens only at router creation.
// If not we also have to take care of setting the latest epoch on
// Client Router.
log.warn("fetchLayout: Error getting router : {}", ne);
}
}
layoutServers = l.getLayoutServers();
layout = layoutFuture;
Expand Down
113 changes: 96 additions & 17 deletions test/src/test/java/org/corfudb/runtime/CorfuRuntimeTest.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package org.corfudb.runtime;

import org.corfudb.infrastructure.TestLayoutBuilder;
import org.corfudb.runtime.clients.LogUnitClient;
import org.corfudb.runtime.clients.TestRule;
import org.corfudb.infrastructure.TestServerRouter;
import org.corfudb.runtime.clients.*;
import org.corfudb.runtime.exceptions.NetworkException;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.runtime.view.Layout;
import org.corfudb.runtime.view.stream.IStreamView;
import org.corfudb.util.CFUtils;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -25,6 +33,15 @@
public class CorfuRuntimeTest extends AbstractViewTest {
static final int TIME_TO_WAIT_FOR_LAYOUT_IN_SEC = 5;

/**
* Resets the router function to the default function for AbstractViewTest.
*/
@Before
public void setDefaultRuntimeGetRouterFunction() {
CorfuRuntime.overrideGetRouterFunction =
(runtime, endpoint) -> super.getRouterFunction(runtime, endpoint);
}

@Test
public void checkValidLayout() throws Exception {

Expand Down Expand Up @@ -57,19 +74,13 @@ public void canInstantiateRuntimeWithoutTestRef() throws Exception {
}

/**
* Ensures that we will not accept a Layout that is obsolete.
*
* Test storyline:
* 1. Seal the 3 servers
* 2. Install a new Layout only on 2 of them
* 3. Force the client to receive the Layout only from the staled Layout server.
* 4. Ensure that we will never accept it.
* Generates and bootstraps a 3 node cluster.
* Shuts down the management servers of the 3 nodes.
*
* @return The generated layout.
* @throws Exception
*/
@Test
public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

private Layout get3NodeLayout() throws Exception {
addServer(SERVERS.PORT_0);
addServer(SERVERS.PORT_1);
addServer(SERVERS.PORT_2);
Expand All @@ -92,15 +103,30 @@ public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

bootstrapAllServers(l);

CorfuRuntime rt = new CorfuRuntime(SERVERS.ENDPOINT_0);
rt.connect();


// Shutdown management server (they interfere)
getManagementServer(SERVERS.PORT_0).shutdown();
getManagementServer(SERVERS.PORT_1).shutdown();
getManagementServer(SERVERS.PORT_2).shutdown();

return l;
}

/**
* Ensures that we will not accept a Layout that is obsolete.
*
* Test storyline:
* 1. Seal the 3 servers
* 2. Install a new Layout only on 2 of them
* 3. Force the client to receive the Layout only from the staled Layout server.
* 4. Ensure that we will never accept it.
*
* @throws Exception
*/
@Test
public void doesNotUpdateToLayoutWithSmallerEpoch() throws Exception {

CorfuRuntime rt = getRuntime(get3NodeLayout()).connect();

// Seal
Layout currentLayout = (Layout) rt.getLayoutView().getCurrentLayout().clone();
currentLayout.setRuntime(rt);
Expand Down Expand Up @@ -169,8 +195,61 @@ public void doesNotAllowReadsAfterSealAndBeforeNewLayout() throws Exception {

LogUnitClient luc = runtime.getRouter(SERVERS.ENDPOINT_0).getClient(LogUnitClient.class);

assertThatThrownBy(() ->luc.read(0).get())
assertThatThrownBy(() -> luc.read(0).get())
.isInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(WrongEpochException.class);
}

@Test
public void testFailedRouterInFetchLayout() throws Exception {

final Map<String, TestClientRouter> routerMap = new ConcurrentHashMap<>();
final AtomicReference<String> failedNode = new AtomicReference<>();

CorfuRuntime.overrideGetRouterFunction = (corfuRuntime, endpoint) -> {
if (failedNode.get() != null && endpoint.equals(failedNode.get())) {
throw new NetworkException("Test server not responding : ", endpoint);
}
if (!endpoint.startsWith("test:")) {
throw new RuntimeException("Unsupported endpoint in test: " + endpoint);
}
return routerMap.computeIfAbsent(endpoint,
x -> {
TestClientRouter tcn =
new TestClientRouter(
(TestServerRouter) getServerRouter(getPort(endpoint)));
tcn.addClient(new BaseClient())
.addClient(new SequencerClient())
.addClient(new LayoutClient())
.addClient(new LogUnitClient())
.addClient(new ManagementClient());
return tcn;
}
);
};

Layout l = get3NodeLayout();
CorfuRuntime runtime = getRuntime(l).connect();
runtime.invalidateLayout();
String[] serverArray = runtime.getLayoutView().getLayout().getAllServers()
.toArray(new String[3]);

l.setRuntime(runtime);
l.setEpoch(l.getEpoch() + 1);
l.moveServersToEpoch();
runtime.getLayoutView().updateLayout(l, 1L);

assertThat(routerMap.get(serverArray[0]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[1]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[2]).getEpoch()).isEqualTo(1L);

failedNode.set((String) l.getAllServers().toArray()[0]);

runtime.invalidateLayout();
runtime.getLayoutView().getLayout();
assertThat(routerMap.get(serverArray[0]).getEpoch()).isEqualTo(1L);
assertThat(routerMap.get(serverArray[1]).getEpoch()).isEqualTo(2L);
assertThat(routerMap.get(serverArray[2]).getEpoch()).isEqualTo(2L);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public AbstractViewTest() {
* @param endpoint An endpoint string for the router.
* @return
*/
private IClientRouter getRouterFunction(CorfuRuntime runtime, String endpoint) {
protected IClientRouter getRouterFunction(CorfuRuntime runtime, String endpoint) {
runtimeRouterMap.putIfAbsent(runtime, new ConcurrentHashMap<>());
if (!endpoint.startsWith("test:")) {
throw new RuntimeException("Unsupported endpoint in test: " + endpoint);
Expand Down Expand Up @@ -350,6 +350,16 @@ public String getEndpoint(int port) {
return "test:" + port;
}

/**
* Get the port from the endpoint.
*
* @param endpoint The endpoint string.
* @return The port in the endpoint.
*/
public Integer getPort(String endpoint) {
return Integer.parseInt(endpoint.split(":")[1]);
}


// Private

Expand Down

0 comments on commit b507ff2

Please sign in to comment.