Simulating partially synchronous n/w #1026
Conversation
Force-pushed from fe856fb to a1747d7 (Compare)
Results automatically generated by CorfuDB Benchmark Framework to assess the performance of this pull request for commit a1747d7.
*** 0.03333333333333333% transaction FAILURE rate for NonConflictingTx+Scan workload, 1 threads, Disk mode
An interactive dashboard with Pull Request Performance Metrics for ALL cluster types and numbers of threads in run, is available at:
Force-pushed from 4120d32 to d11f306 (Compare)
Results automatically generated by CorfuDB Benchmark Framework to assess the performance of this pull request for commit d11f306.
*** 0.0% transaction FAILURE rate for NonConflictingTx+Scan workload, 1 threads, Disk mode
An interactive dashboard with Pull Request Performance Metrics for ALL cluster types and numbers of threads in run, is available at:
.get();
log.info("Healing nodes successful: {}", pollReport);
} catch (InterruptedException | ExecutionException e) {
log.error("Healing nodes failed: ", e);
Rethrow the interrupted exception
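A minimal sketch of the suggested fix, with a hypothetical awaitHealing helper standing in for the PR's get() call: restore the interrupt flag and rethrow instead of only logging the exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HealingAwait {
    // Hypothetical helper illustrating the review suggestion: do not
    // swallow InterruptedException; restore the interrupt flag and rethrow.
    static <T> T awaitHealing(CompletableFuture<T> healingFuture) {
        try {
            return healingFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
            throw new IllegalStateException("Healing interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Healing nodes failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitHealing(CompletableFuture.completedFuture("pollReport")));
    }
}
```

Restoring the interrupt status keeps the management thread's shutdown path responsive instead of silently eating the signal.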
Results automatically generated by CorfuDB Benchmark Framework to assess the performance of this pull request for commit 1267fb5.
*** 0.0% transaction FAILURE rate for NonConflictingTx+Scan workload, 1 threads, Disk mode
An interactive dashboard with Pull Request Performance Metrics for ALL cluster types and numbers of threads in run, is available at:
This looks okay stylistically.
However, it's hard to determine what the overall design is. What is partially synchronous? Could you describe what kind of failures we can detect (and which ones we can't)? Can you also describe what conditions we can heal from? You can add this to the first comment (PR description).
@no2chem I have added some explanation in the PR description. More detailed explanation is present in the javadocs. Let me know if this is still unclear.
Results automatically generated by CorfuDB Benchmark Framework to assess the performance of this pull request for commit 653298c.
*** 0.0% transaction FAILURE rate for NonConflictingTx+Scan workload, 1 threads, Disk mode
An interactive dashboard with Pull Request Performance Metrics for ALL cluster types and numbers of threads in run, is available at:
Force-pushed from a1c7b05 to c5354f8 (Compare)
Results automatically generated by CorfuDB Benchmark Framework to assess the performance of this pull request for commit c5354f8.
*** 0.0% transaction FAILURE rate for NonConflictingTx+Scan workload, 1 threads, Disk mode
An interactive dashboard with Pull Request Performance Metrics for ALL cluster types and numbers of threads in run, is available at:
LGTM
/**
* Executes the detector which runs the failure or healing detecting algorithm.
* Gets the polling report from the execution of the detector.
r/polling/poll
Done.
import lombok.Data;

/**
* Poll Report generated by the polling detectors. |
Poll report generated by detectors that poll to detect failed or healed nodes.
Done
@@ -5,7 +5,7 @@
import lombok.extern.slf4j.Slf4j;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.view.IFailureHandlerPolicy;
import org.corfudb.runtime.view.IReconfigurationHandlerPolicy;
r/or policy detecting a failure in the cluster/or policy detecting a failure or healing in the cluster
r/Handle healing: Handles healing of responsive nodes./Handle healing: Handles healing of unresponsive nodes.
r/Handle healing: Handles healing of responsive nodes./Handle healing: Handles healing of unresponsive nodes.
We actually heal responsive nodes. Shouldn't it be the way it is?
I can change it to:
Handle healing: Handles healing of unresponsive marked nodes which are now responsive.
yes that is better.
*
* <p>Created by zlokhandwala on 11/21/16.
*/
public class NoLogUnitHealingPolicy implements IReconfigurationHandlerPolicy {
I do not understand this name. It is not intuitive.
Done.
* @param currentLayout The current layout
* @param healedServers Set of healed server addresses
*/
public void handleHealing(IReconfigurationHandlerPolicy failureHandlerPolicy,
param name should be healingHandlerPolicy
Changed.
} else {
if (newPeriod != period && newPeriod != 0) {
period = newPeriod;
tuneRoutersResponseTimeout(membersSet, period);
This method should not have a side effect like tuneRoutersResponseTimeout.
Removed side effect
* - Poll result aggregation.
* - If we complete an iteration without detecting any healed nodes, we end the round.
* - Else we continue polling and generate the report with the healed node.
* The management server ensures only one instance of this class and hence this is NOT thread safe.
I like the not-thread-safe disclaimer, but I am still not a big fan of object-scoped variables unless they really make sense.
try {
pollCompletableFutures[i].get();
responses[i] = pollIteration;
} catch (Exception e) {
Why not just catch WrongEpochException before the generic Exception.
WrongEpochException is wrapped inside the ExecutionException, so it cannot be caught in a separate catch block. This classification is required.
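A sketch of the classification being described, with a hypothetical WrongEpochException stand-in: the real exception surfaces as the cause of the ExecutionException, so it is unwrapped with getCause() rather than caught in its own catch block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PollClassify {
    // Hypothetical stand-in for Corfu's WrongEpochException.
    static class WrongEpochException extends RuntimeException { }

    // Classify one completed poll future. WrongEpochException is thrown on
    // the remote call, so future.get() delivers it wrapped in an
    // ExecutionException; it must be unwrapped via getCause().
    static String classify(CompletableFuture<Void> poll) {
        try {
            poll.get();
            return "responsive";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return (e.getCause() instanceof WrongEpochException)
                    ? "wrong-epoch" : "failed";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> poll = new CompletableFuture<>();
        poll.completeExceptionally(new WrongEpochException());
        System.out.println(classify(poll)); // prints "wrong-epoch"
    }
}
```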
try {
pollCompletableFutures[i].get();
responses[i] = pollIteration;
} catch (Exception e) {
Log errors for exceptions everywhere needed. It is hard to remember, but we need to make sure we have enough logging.
I agree, but logging exceptions in this particular loop is really going to produce a lot of unwanted logs.
Since this is a poller for healing nodes, it usually polls failed nodes and will encounter exceptions every second. We do not want this to pollute our debug logs.
For now, I will log when we encounter a WrongEpochException.
}

try {
Thread.sleep(interIterationInterval); |
Why do you have to do this instead of using a scheduled executor to start the next task after a certain delay?
This sleep and loop is executed only if there is a failure.
In an ideal scenario, each round consists of only one iteration as there are no failures.
If there are failures, the round confirms this failure by iterating thrice.
So, from my view, this reduces a bit of code complexity. Let me know if you feel otherwise.
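For comparison, the reviewer's alternative can be sketched with a ScheduledExecutorService (the iteration body and interval here are hypothetical placeholders, not the PR's code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingScheduler {
    // Sketch: let a scheduled executor pace the poll iterations instead of
    // a Thread.sleep inside a loop.
    static int runIterations(int iterations, long intervalMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(iterations);
        // scheduleWithFixedDelay waits intervalMillis between the end of one
        // iteration and the start of the next, mirroring the sleep-based loop.
        scheduler.scheduleWithFixedDelay(remaining::countDown, 0, intervalMillis, TimeUnit.MILLISECONDS);
        remaining.await();
        scheduler.shutdownNow();
        return iterations;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed " + runIterations(3, 10) + " iterations");
    }
}
```

Either approach works; the executor version avoids blocking the detector thread in a sleep, at the cost of a little more machinery for a loop that, as noted above, only runs multiple iterations when a failure is suspected.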
Force-pushed from 534a051 to 3b4b075 (Compare)
@@ -128,6 +128,10 @@ public synchronized void handleMessageLayoutRequest(CorfuPayloadMsg<Long> msg,
}
}

public String le() {
What is this? It probably needs to be cleaned up.
Removed.
import org.corfudb.runtime.view.QuorumFuturesFactory;

@Slf4j
public class ManagementAgent { |
comments ?
Added
private final String bootstrapEndpoint;

@Getter
private volatile CompletableFuture<Boolean> sequencerBootstrappedFuture; |
comments
Added.
private volatile CompletableFuture<Boolean> sequencerBootstrappedFuture;

public ManagementAgent(Callable<CorfuRuntime> getRuntime, ServerContext serverContext) { |
This needs comments.
Done.
}

serverContext.installSingleNodeLayoutIfAbsent();
serverContext.saveManagementLayout(serverContext.getCurrentLayout()); |
You are saving the management layout twice?
Please put comments as this is very dense.
Added comments.
* - This task is executed in intervals of 1 second (default). This task is blocked until
* the management server is bootstrapped and has a connected runtime.
* - On every invocation, this task refreshes the runtime to fetch the latest layout and also
* updates the local copy of the 'latestLayout' |
No more latestLayout
Changed.
log.info("Initiated Failure Handler.");
log.info("handleFailureDetectedMsg: Received DetectorMsg : {}", msg.getPayload());

DetectorMsg detectorMsg = msg.getPayload(); |
Update comments for this new logic. It is better.
Done.
@@ -218,6 +232,7 @@ public String getNodeIdBase64() {
public synchronized boolean installSingleNodeLayoutIfAbsent() {
if ((Boolean) getServerConfig().get("--single") && getCurrentLayout() == null) {
setCurrentLayout(getNewSingleNodeLayout());
log.error("HERE ====="); |
remove
Done.
*
* <p>Created by zlokhandwala on 11/21/16.
*/
public class LayoutSequencerHealingPolicy implements IReconfigurationHandlerPolicy { |
rename it to SequencerHealingPolicy as it only heals Sequencers.
Done.
.initiateFailureHandler().get();
corfuRuntime.getRouter(SERVERS.ENDPOINT_2).getClient(ManagementClient.class)
.initiateFailureHandler().get();
getManagementServer(SERVERS.PORT_0).getManagementAgent().getCorfuRuntime(), |
change this test to first fail and then heal the node.
Done.
Please see the main set of comments in the HealingDetector class and follow that set of comments. In general the code is way too verbose for what it does.
@@ -153,6 +153,10 @@ public SequencerServer(ServerContext serverContext) {
globalLogTail.set(initialToken);
}

if ((Boolean) opts.get("--single")) {
readyStateEpoch = serverContext.getNewSingleNodeLayout().getEpoch(); |
This isn't right, this generates a new layout which will always have epoch 0. You can use installSingleNodeLayoutIfAbsent, followed by getting the actual layout.
How do you get the actual layout ?
You cannot bootstrap the node again as it will throw an Already bootstrapped exception.
If there is a reconfiguration change and the cluster moves to a new epoch, then the sequencer becomes NOT_READY as this epoch is now stale anyways. The management server then takes care of bootstrapping the new primary sequencer for the new epoch.
installSingleNodeLayoutIfAbsent does not bootstrap the node. It installs a layout if it is not present. ManagementServer/LayoutServer constructors call the same function. Then just get the layout from the data store.
I'm not sure how an AlreadyBootstrapped exception would be thrown; that isn't even thrown anywhere in ServerContext.
So I'm just saying that once we start a server as a single node (-s) we cannot bootstrap the node again later. As you said, the LayoutServer installs the layout from the local data store.
Done.
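The agreed fix can be sketched with hypothetical minimal stand-ins for ServerContext and Layout (the real classes live in the Corfu server): install the single-node layout only if absent, then read the persisted layout's epoch, instead of generating a fresh layout that would always report epoch 0.

```java
public class SingleNodeBootstrap {
    // Hypothetical stand-in for org.corfudb.runtime.view.Layout.
    static class Layout {
        private final long epoch;
        Layout(long epoch) { this.epoch = epoch; }
        long getEpoch() { return epoch; }
    }

    // Hypothetical stand-in for ServerContext backed by a data store.
    static class ServerContext {
        private Layout currentLayout;

        ServerContext(Layout persistedLayout) { this.currentLayout = persistedLayout; }

        // Installs a fresh single-node layout (epoch 0) only when no layout
        // has been persisted yet; an existing layout is left untouched.
        synchronized void installSingleNodeLayoutIfAbsent() {
            if (currentLayout == null) {
                currentLayout = new Layout(0);
            }
        }

        Layout getCurrentLayout() { return currentLayout; }
    }

    // The suggested pattern: install-if-absent, then read the actual layout,
    // so a node restarted at a later epoch does not report epoch 0.
    static long readyStateEpoch(ServerContext serverContext) {
        serverContext.installSingleNodeLayoutIfAbsent();
        return serverContext.getCurrentLayout().getEpoch();
    }
}
```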
public synchronized void saveManagementLayout(Layout layout) {
// Cannot update with a null layout.
if (layout == null) {
log.warn("saveManagementLayout: Attempted to update with null layout"); |
Throw an exception.
@@ -368,16 +384,32 @@ public void setStartingAddress(long startingAddress) {
*
* @param layout Layout to be persisted
*/
public void setManagementLayout(Layout layout) {
dataStore.put(Layout.class, PREFIX_MANAGEMENT, MANAGEMENT_LAYOUT, layout);
public synchronized void saveManagementLayout(Layout layout) { |
Needs to be annotated with @nonnull
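Both review comments combined can be sketched as follows (the Layout stand-in is hypothetical; the real signature would also carry the javax.annotation.Nonnull annotation): fail fast on a null layout instead of logging a warning and silently returning.

```java
import java.util.Objects;

public class LayoutStore {
    // Hypothetical stand-in for the persisted layout type.
    static class Layout { }

    private Layout managementLayout;

    // Sketch of the suggested behavior: reject null eagerly with
    // Objects.requireNonNull rather than log.warn-and-return.
    public synchronized void saveManagementLayout(Layout layout) {
        this.managementLayout = Objects.requireNonNull(
                layout, "saveManagementLayout: layout must not be null");
    }
}
```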
* Created by zlokhandwala on 11/29/17.
*/
@Slf4j
public class FailureDetector implements IDetector { |
Definitely we should NOT be unit testing private internal methods.
/**
* Members to poll in every round
*/
private String[] members; |
This should be a list.
Done.
@@ -658,8 +663,9 @@ private void checkClusterId(@Nonnull Layout layout) {
// We haven't adopted a clusterId yet.
if (clusterId == null) {
clusterId = layout.getClusterId();
log.info("Connected to new cluster {}", clusterId == null ? "(legacy)" :
UuidUtils.asBase64(clusterId));
if (clusterId != null) { |
Whats the reason for this change?
if clusterId is null, the logs are filled with this log message.
This is because the management service invalidates the layout every second.
2018-01-16 18:35:46,776 INFO [ForkJoinPool.commonPool-worker-2] o.c.r.CorfuRuntime - Connected to new cluster (legacy)
2018-01-16 18:35:47,778 INFO [ForkJoinPool.commonPool-worker-2] o.c.r.CorfuRuntime - Connected to new cluster (legacy)
2018-01-16 18:35:48,778 INFO [ForkJoinPool.commonPool-worker-2] o.c.r.CorfuRuntime - Connected to new cluster (legacy)
(... the same line repeated once per second ...)
Is there any other alternative? Should I keep this and reduce it to TRACE? It would still produce a lot of logs.
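The resulting guard can be sketched as follows (UUID stands in for the cluster id; the class and method names here are hypothetical, not the CorfuRuntime API): adopt the layout's clusterId on first sight, but only emit the "connected" log line when the id is non-null, so the per-second layout refresh of a legacy (null-id) cluster stays quiet.

```java
import java.util.UUID;

public class ClusterIdLogGuard {
    private UUID clusterId;

    // Returns the log line to emit, or null when nothing should be logged.
    String checkClusterId(UUID layoutClusterId) {
        if (clusterId == null) {
            clusterId = layoutClusterId;
            if (clusterId != null) {
                return "Connected to new cluster " + clusterId;
            }
        }
        return null; // legacy cluster or already-adopted id: stay quiet
    }
}
```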
/**
* Retries to connect to a disconnected node.
*/
@Default int connectionRetries = 3; |
Doesn't seem like it should be part of the runtime.
@@ -361,6 +362,9 @@ private synchronized void connectChannel(Bootstrap b)
log.warn("Exception while reconnecting, retry in {} ms", timeoutRetry, e);
Sleep.MILLISECONDS.sleepUninterruptibly(timeoutRetry);
}
if (--retryCount == 0) {
throw new NetworkException("Connection retry limit reached.", node); |
This is not a good idea. No one will see this exception since it runs on a Netty future.
This code was throwing an un-handled exception earlier after a certain number of retries.
2018-01-17 15:49:49,899 WARN [client-1] i.n.u.c.DefaultPromise - An exception was thrown by org.corfudb.runtime.clients.NettyClientRouter$$Lambda$123/1147741970.operationComplete()
org.corfudb.runtime.exceptions.NetworkException: Retry limit reached. [endpoint=tcp://localhost:9001/]
at org.corfudb.runtime.clients.NettyClientRouter.lambda$connectChannel$2(NettyClientRouter.java:359)
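One way to address the reviewer's concern, sketched with a hypothetical connectWithRetries helper (the attempt hook stands in for connectChannel): an exception thrown inside a Netty future listener is only seen by the event loop, so complete a caller-visible future exceptionally instead of throwing in the listener.

```java
import java.util.concurrent.CompletableFuture;

public class ConnectRetry {
    // Sketch: surface retry exhaustion through a future the caller actually
    // waits on, rather than throwing on a callback thread where the
    // exception is swallowed by the executor.
    static CompletableFuture<Void> connectWithRetries(int retries, Runnable attempt) {
        CompletableFuture<Void> connected = new CompletableFuture<>();
        for (int i = 0; i < retries; i++) {
            try {
                attempt.run();
                connected.complete(null);
                return connected;
            } catch (RuntimeException e) {
                // connection attempt failed; fall through and retry
            }
        }
        connected.completeExceptionally(
                new RuntimeException("Connection retry limit reached."));
        return connected;
    }
}
```

A caller that does connected.get() (or joins it) then observes the retry-limit failure directly instead of finding it only in the DefaultPromise warning above.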
* @throws LayoutModificationException Thrown if attempt to create an invalid layout.
*/
@Override
public Layout generateLayout(Layout originalLayout, |
javax annotations, please
import org.corfudb.runtime.view.Layout;
import org.corfudb.util.Sleep;
import org.corfudb.util.Utils;
Please see comments on healing detector. This logic can be summarized in a few lines and without state information.
Done.
Force-pushed from bb38010 to b952852 (Compare)
@no2chem Addressed all comments as we discussed.
Ok, looks much better to me.
Can you rebase please, let's get this in.
Force-pushed from f65ec1d to e143e94 (Compare)
* Simulating partially synchronous n/w
* Refactored detector code.
Failure detector simulating a partially synchronous network.
Added a healing detector which polls and detects healed nodes.
Partially Synchronous Network.
A completely synchronous network would guarantee message delivery within a bounded time frame. This would be ideal for detecting failures: if we fail to receive a response within the determined period, the node has failed. Since this is not practical, we can have delayed responses from slow servers, and it would not be right to mark a slow node as failed just because its responses are delayed.
New Fault Detector.
The new fault detector handles this in the following manner.
Healing Detector.