Skip to content

Commit

Permalink
Merge pull request #3608 from VoltDB/ENG-10453-lostwrites
Browse files Browse the repository at this point in the history
ENG-10453: Don't let work happen before partition detection kills nodes.
  • Loading branch information
jhugg committed May 24, 2016
2 parents 79b3e30 + 113e0e5 commit 99b46b2
Show file tree
Hide file tree
Showing 14 changed files with 373 additions and 371 deletions.
22 changes: 15 additions & 7 deletions src/frontend/org/voltcore/agreement/AgreementSite.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,15 @@ private void discoverGlobalFaultData(FaultMessage faultMessage) {
null);
}
Set<Long> unknownFaultedHosts = new TreeSet<>();

// This one line is a biggie. Gets agreement on what the post-fault cluster will be.
Map<Long, Long> initiatorSafeInitPoint = m_meshArbiter.reconfigureOnFault(m_hsIds, faultMessage, unknownFaultedHosts);
Set<Long> failedSites;
if (!initiatorSafeInitPoint.isEmpty()) {
failedSites = initiatorSafeInitPoint.keySet();
handleSiteFaults(failedSites, initiatorSafeInitPoint);
} else if (unknownFaultedHosts.isEmpty()) {

ImmutableSet<Long> failedSites = ImmutableSet.copyOf(initiatorSafeInitPoint.keySet());

// check if nothing actually happened
if (initiatorSafeInitPoint.isEmpty() && unknownFaultedHosts.isEmpty()) {
return;
} else {
failedSites = ImmutableSet.of();
}

ImmutableSet.Builder<Integer> failedHosts = ImmutableSet.builder();
Expand All @@ -634,6 +634,14 @@ private void discoverGlobalFaultData(FaultMessage faultMessage) {
}
m_failedHostsCallback.disconnect(failedHosts.build());

// Handle the failed sites after the failedHostsCallback to ensure
// that partition detection is run first -- as this might release
// work back to a client waiting on a failure notice. That's unsafe
// if we partitioned.
if (!initiatorSafeInitPoint.isEmpty()) {
handleSiteFaults(failedSites, initiatorSafeInitPoint);
}

m_hsIds.removeAll(failedSites);
}

Expand Down
38 changes: 19 additions & 19 deletions src/frontend/org/voltcore/agreement/MeshArbiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class MeshArbiter {
*/
protected final Set<Long> m_failedSites = Sets.newTreeSet();

protected final Map<Long,SiteFailureForwardMessage> m_forwardCandidates = Maps.newHashMap();
protected final Map<Long, SiteFailureForwardMessage> m_forwardCandidates = Maps.newHashMap();
/**
* it builds mesh graphs, and determines the the kill set to resolve
* an arbitration
Expand Down Expand Up @@ -265,12 +265,12 @@ Map<Long, Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm) {
* indicate either a stale message, or that an agreement has not been
* reached
*/
public Map<Long,Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm, Set<Long> unknownFaultedSites) {
public Map<Long, Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm, Set<Long> unknownFaultedSites) {
boolean proceed = false;
do {
Discard ignoreIt = mayIgnore(hsIds,fm);
Discard ignoreIt = mayIgnore(hsIds, fm);
if (Discard.DoNot == ignoreIt) {
m_inTrouble.put(fm.failedSite,fm.witnessed || fm.decided);
m_inTrouble.put(fm.failedSite, fm.witnessed || fm.decided);
m_recoveryLog.info("Agreement, Processing " + fm);
proceed = true;
} else {
Expand All @@ -280,7 +280,7 @@ public Map<Long,Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm, Set<L
if (Discard.Unknown == ignoreIt) {
unknownFaultedSites.add(fm.failedSite);
}
fm = (FaultMessage)m_mailbox.recv(justFailures);
fm = (FaultMessage) m_mailbox.recv(justFailures);
} while (fm != null);

if (!proceed) {
Expand All @@ -296,7 +296,7 @@ public Map<Long,Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm, Set<L
discoverGlobalFaultData_send(hsIds);

if (discoverGlobalFaultData_rcv(hsIds)) {
Map<Long,Long> lastTxnIdByFailedSite = extractGlobalFaultData(hsIds);
Map<Long, Long> lastTxnIdByFailedSite = extractGlobalFaultData(hsIds);
if (lastTxnIdByFailedSite.isEmpty()) {
return ImmutableMap.of();
}
Expand Down Expand Up @@ -333,14 +333,14 @@ public Map<Long,Long> reconfigureOnFault(Set<Long> hsIds, FaultMessage fm, Set<L
* @param decision map where the keys contain the kill sites
* and its values are their last known safe transaction ids
*/
protected void notifyOnKill(Set<Long> hsIds, Map<Long,Long> decision) {
protected void notifyOnKill(Set<Long> hsIds, Map<Long, Long> decision) {

SiteFailureMessage.Builder sfmb = SiteFailureMessage.
builder()
.decisions(decision.keySet())
.failures(decision.keySet());

Set<Long> dests = Sets.filter(m_seeker.getSurvivors(),not(equalTo(m_hsId)));
Set<Long> dests = Sets.filter(m_seeker.getSurvivors(), not(equalTo(m_hsId)));
if (dests.isEmpty()) return;

sfmb.survivors(Sets.difference(m_seeker.getSurvivors(), decision.keySet()));
Expand All @@ -360,10 +360,10 @@ protected void clearInTrouble(Set<Long> decision) {
m_inTroubleCount = 0;
}

protected Map<Long,Long> getSafeTxnIdsForSites(Set<Long> hsIds) {
ImmutableMap.Builder<Long,Long> safeb = ImmutableMap.builder();
protected Map<Long, Long> getSafeTxnIdsForSites(Set<Long> hsIds) {
ImmutableMap.Builder<Long, Long> safeb = ImmutableMap.builder();
for (long h: Sets.filter(hsIds, not(equalTo(m_hsId)))) {
safeb.put(h,m_meshAide.getNewestSafeTransactionForInitiator(h));
safeb.put(h, m_meshAide.getNewestSafeTransactionForInitiator(h));
}
return safeb.build();
}
Expand All @@ -376,7 +376,7 @@ protected Map<Long,Long> getSafeTxnIdsForSites(Set<Long> hsIds) {
* Sends all data all the time to avoid a need for request/response.
*/
private void discoverGlobalFaultData_send(Set<Long> hsIds) {
Set<Long> dests = Sets.filter(m_seeker.getSurvivors(),not(equalTo(m_hsId)));
Set<Long> dests = Sets.filter(m_seeker.getSurvivors(), not(equalTo(m_hsId)));

SiteFailureMessage.Builder msgBuilder = SiteFailureMessage.
builder()
Expand All @@ -395,7 +395,7 @@ private void discoverGlobalFaultData_send(Set<Long> hsIds) {
m_recoveryLog.info("Agreement, Sending survivors " + sfm);
}

protected void updateFailedSitesLedger(Set<Long> hsIds,SiteFailureMessage sfm) {
protected void updateFailedSitesLedger(Set<Long> hsIds, SiteFailureMessage sfm) {
for (Map.Entry<Long, Long> e: sfm.m_safeTxnIds.entrySet()) {

if( !hsIds.contains(e.getKey())
Expand Down Expand Up @@ -452,7 +452,7 @@ private boolean discoverGlobalFaultData_rcv(Set<Long> hsIds) {

} else if (m.getSubject() == Subject.SITE_FAILURE_UPDATE.getId()) {

SiteFailureMessage sfm = (SiteFailureMessage)m;
SiteFailureMessage sfm = (SiteFailureMessage) m;

if ( !m_seeker.getSurvivors().contains(m.m_sourceHSId)
|| m_failedSites.contains(m.m_sourceHSId)
Expand All @@ -467,7 +467,7 @@ private boolean discoverGlobalFaultData_rcv(Set<Long> hsIds) {

} else if (m.getSubject() == Subject.SITE_FAILURE_FORWARD.getId()) {

SiteFailureForwardMessage fsfm = (SiteFailureForwardMessage)m;
SiteFailureForwardMessage fsfm = (SiteFailureForwardMessage) m;

addForwardCandidate(fsfm);

Expand All @@ -487,9 +487,9 @@ private boolean discoverGlobalFaultData_rcv(Set<Long> hsIds) {
* If the fault distributor reports a new fault, ignore it if it is known , otherwise
* re-deliver the message to ourself and then abort so that the process can restart.
*/
FaultMessage fm = (FaultMessage)m;
FaultMessage fm = (FaultMessage) m;

Discard ignoreIt = mayIgnore(hsIds,fm);
Discard ignoreIt = mayIgnore(hsIds, fm);
if (Discard.DoNot == ignoreIt) {
m_mailbox.deliverFront(m);
m_recoveryLog.info("Agreement, Detected a concurrent failure from FaultDistributor, new failed site "
Expand All @@ -512,7 +512,7 @@ private boolean discoverGlobalFaultData_rcv(Set<Long> hsIds) {
Map.Entry<Long, SiteFailureForwardMessage> e = itr.next();
Set<Long> unseenBy = m_seeker.forWhomSiteIsDead(e.getKey());
if (unseenBy.size() > 0) {
m_mailbox.send(Longs.toArray(unseenBy),e.getValue());
m_mailbox.send(Longs.toArray(unseenBy), e.getValue());
m_recoveryLog.info("Agreement, fowarding to "
+ CoreUtils.hsIdCollectionToString(unseenBy)
+ " " + e.getValue());
Expand Down Expand Up @@ -560,7 +560,7 @@ private boolean haveNecessaryFaultInfo( Set<Long> survivors, boolean log) {
return missingMessages.isEmpty();
}

private Map<Long,Long> extractGlobalFaultData(Set<Long> hsIds) {
private Map<Long, Long> extractGlobalFaultData(Set<Long> hsIds) {

if (!haveNecessaryFaultInfo(m_seeker.getSurvivors(), false)) {
VoltDB.crashLocalVoltDB("Error extracting fault data", true, null);
Expand Down
22 changes: 20 additions & 2 deletions src/frontend/org/voltcore/messaging/ForeignHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.google_voltpatches.common.base.Throwables;

import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.network.Connection;
Expand All @@ -42,6 +41,8 @@
import org.voltdb.OperationMode;
import org.voltdb.VoltDB;

import com.google_voltpatches.common.base.Throwables;

public class ForeignHost {
private static final VoltLogger hostLog = new VoltLogger("HOST");
private static RateLimitedLogger rateLimitedLogger;
Expand All @@ -65,6 +66,10 @@ public class ForeignHost {

private final AtomicInteger m_deadReportsCount = new AtomicInteger(0);

// used to immediately cut off reads from a foreign host
// great way to trigger a heartbeat timout / simulate a network partition
private AtomicBoolean m_linkCutForTest = new AtomicBoolean(false);

public static final int POISON_PILL = -1;

public static final int CRASH_ALL = 0;
Expand All @@ -81,6 +86,11 @@ public int getMaxRead() {

@Override
public void handleMessage(ByteBuffer message, Connection c) throws IOException {
// if this link is "gone silent" for partition tests, just drop the message on the floor
if (m_linkCutForTest.get()) {
return;
}

handleRead(message, c);
}

Expand Down Expand Up @@ -409,4 +419,12 @@ public void updateDeadHostTimeout(int timeout) {
m_deadHostTimeout = timeout;
setLogRate(timeout);
}

/**
* used to immediately cut off reads from a foreign host
* great way to trigger a heartbeat timout / simulate a network partition
*/
void cutLink() {
m_linkCutForTest.set(true);
}
}
Loading

0 comments on commit 99b46b2

Please sign in to comment.