Skip to content

Commit

Permalink
Checkpoint WIP -- Do dupe handling for InitiateTask/Response.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Ismert committed May 31, 2012
1 parent 78dd799 commit f4f8db2
Showing 1 changed file with 58 additions and 26 deletions.
84 changes: 58 additions & 26 deletions src/frontend/org/voltdb/iv2/SpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.voltdb.messaging.BorrowTaskMessage;
import org.voltdb.messaging.InitiateResponseMessage;
import org.voltdb.ProcedureRunner;
import org.voltdb.VoltDB;
import org.voltdb.VoltTable;
import org.voltdb.dtxn.TransactionState;
import org.voltdb.messaging.CompleteTransactionMessage;
Expand Down Expand Up @@ -88,28 +89,45 @@ public void handleIv2InitiateTaskMessage(Iv2InitiateTaskMessage message)
final String procedureName = message.getStoredProcedureName();
final ProcedureRunner runner = m_loadedProcs.getProcByName(procedureName);
if (message.isSinglePartition()) {
long new_txnId = m_txnId.incrementAndGet();
if (m_replicaHSIds.length > 0) {
try {
Iv2InitiateTaskMessage replmsg =
new Iv2InitiateTaskMessage(m_mailbox.getHSId(),
m_mailbox.getHSId(),
message.getTxnId(),
message.isReadOnly(),
message.isSinglePartition(),
message.getStoredProcedureInvocation(),
message.getClientInterfaceHandle()) ;
m_mailbox.send(m_replicaHSIds, replmsg);
} catch (MessagingException e) {
hostLog.error("Failed to deliver response from execution site.", e);
long newSpHandle;
if (m_isLeader) {
// Need to set the SP handle on the received message
newSpHandle = m_txnId.incrementAndGet();
message.setSpHandle(newSpHandle);
// Also, if this is a vanilla single-part procedure, make the TXNID
// be the SpHandle (for now)
if (!runner.isEverySite()) {
message.setTxnId(newSpHandle);
}
DuplicateCounter counter = new DuplicateCounter(
message.getInitiatorHSId(), m_replicaHSIds.length + 1,
new_txnId);
if (m_replicaHSIds.length > 0) {
try {
Iv2InitiateTaskMessage replmsg =
new Iv2InitiateTaskMessage(m_mailbox.getHSId(),
m_mailbox.getHSId(),
message.getTxnId(),
message.isReadOnly(),
message.isSinglePartition(),
message.getStoredProcedureInvocation(),
message.getClientInterfaceHandle());
// Update the handle in the copy
message.setSpHandle(newSpHandle);
m_mailbox.send(m_replicaHSIds, replmsg);
} catch (MessagingException e) {
hostLog.error("Failed to deliver response from execution site.", e);
}
DuplicateCounter counter = new DuplicateCounter(
message.getInitiatorHSId(), m_replicaHSIds.length + 1,
message.getTxnId());
m_duplicateCounters.put(newSpHandle, counter);
}
}
else {
newSpHandle = message.getSpHandle();
// FUTURE: update SP handle state on replicas based on value from primary
}
final SpProcedureTask task =
new SpProcedureTask(m_mailbox, runner,
new_txnId, m_pendingTasks, message);
newSpHandle, m_pendingTasks, message);
m_pendingTasks.offer(task);
return;
}
Expand All @@ -124,16 +142,30 @@ public void handleIv2InitiateTaskMessage(Iv2InitiateTaskMessage message)
public void handleInitiateResponseMessage(InitiateResponseMessage message)
{
// Send the message to the duplicate counter, if any
// ADD CODE HERE

if (message.getInitiatorHSId() != m_mailbox.getHSId()) {
try {
// the initiatorHSId is the ClientInterface mailbox. Yeah. I know.
m_mailbox.send(message.getInitiatorHSId(), message);
DuplicateCounter counter = m_duplicateCounters.get(message.getSpHandle());
if (counter != null) {
int result = counter.offer(message);
if (result == DuplicateCounter.DONE) {
m_duplicateCounters.remove(message.getSpHandle());
try {
m_mailbox.send(counter.m_destinationId, message);
} catch (MessagingException e) {
VoltDB.crashLocalVoltDB("Failed to send every-site response.", true, e);
}
}
catch (MessagingException e) {
// hostLog.error("Failed to deliver response from execution site.", e);
else if (result == DuplicateCounter.MISMATCH) {
VoltDB.crashLocalVoltDB("HASH MISMATCH running every-site system procedure.", true, null);
}
// doing duplicate suppresion: all done.
return;
}

try {
// the initiatorHSId is the ClientInterface mailbox. Yeah. I know.
m_mailbox.send(message.getInitiatorHSId(), message);
}
catch (MessagingException e) {
// hostLog.error("Failed to deliver response from execution site.", e);
}
}

Expand Down

0 comments on commit f4f8db2

Please sign in to comment.