-
Notifications
You must be signed in to change notification settings - Fork 683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GEODE-5828: fixes replicates miss transaction commit. #2571
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,7 +92,6 @@ public class TXCommitMessage extends PooledDistributionMessage | |
protected int processorId; // 0 unless needsAck is true | ||
protected TXLockIdImpl lockId; | ||
protected HashSet farSiders; | ||
|
||
protected transient DistributionManager dm; // Used on the sending side of this message | ||
private transient int sequenceNum = 0; | ||
|
||
|
@@ -1837,7 +1836,7 @@ public CommitProcessQueryMessage(Object trackerKey, int processorId) { | |
|
||
@Override | ||
protected void process(ClusterDistributionManager dm) { | ||
final boolean processMsgReceived = txTracker.commitProcessReceived(this.trackerKey, dm); | ||
final boolean processMsgReceived = txTracker.commitProcessReceived(this.trackerKey); | ||
if (!processMsgReceived) { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug("CommitProcessQuery did not find {} in the history", this.trackerKey); | ||
|
@@ -1979,7 +1978,7 @@ public void memberDeparted(DistributionManager distributionManager, | |
if (!getSender().equals(id)) { | ||
return; | ||
} | ||
this.dm.removeMembershipListener(this); | ||
getDistributionManager().removeMembershipListener(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be better to just use the "distributionManager" parameter. At some point we may find that this class no longer needs to keep a reference to the dm in an instance field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will address this concern with additional dunit test. |
||
|
||
synchronized (this) { | ||
if (isProcessing() || this.departureNoticed) { | ||
|
@@ -1996,7 +1995,7 @@ public void memberDeparted(DistributionManager distributionManager, | |
|
||
// Send message to fellow FarSiders (aka recipients), if any, to | ||
// determine if any one of them have received a CommitProcessMessage | ||
if (this.farSiders != null && !this.farSiders.isEmpty()) { | ||
if (getFarSiders() != null && !getFarSiders().isEmpty()) { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug( | ||
"Member departed: {} sending query for CommitProcess message to other recipients.", id); | ||
|
@@ -2009,66 +2008,7 @@ public void memberDeparted(DistributionManager distributionManager, | |
// sized thread pool | ||
@Override | ||
public void run() { | ||
final TXCommitMessage mess = TXCommitMessage.this; | ||
Object trackerKey = mess.getTrackerKey(); | ||
DistributedMember member = getMemberFromTrackerKey(trackerKey); | ||
if (!mess.getSender().equals(member)) { | ||
/* | ||
* Do not send a CommitProcessQueryMessage when the sender of CommitMessage is not the | ||
* member in the tracker key. (If this happens we are the redundant node for PR, and the | ||
* primary just crashed). | ||
*/ | ||
txTracker.removeMessage(mess); | ||
return; | ||
} | ||
CommitProcessQueryReplyProcessor replProc = | ||
new CommitProcessQueryReplyProcessor(mess.dm, mess.farSiders); | ||
CommitProcessQueryMessage query = | ||
new CommitProcessQueryMessage(mess.getTrackerKey(), replProc.getProcessorId()); | ||
query.setRecipients(mess.farSiders); | ||
mess.dm.putOutgoing(query); | ||
// Wait for any one positive response or all negative responses. | ||
// (while() loop removed for bug 36983 - you can't loop on waitForReplies() | ||
TXCommitMessage.this.dm.getCancelCriterion().checkCancelInProgress(null); | ||
try { | ||
replProc.waitForRepliesUninterruptibly(); | ||
} catch (ReplyException e) { | ||
e.handleCause(); | ||
} | ||
if (replProc.receivedACommitProcessMessage()) { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug( | ||
"Transaction associated with lockID: {} from orign {} is processing due to a received \"commit process\" message", | ||
mess.lockId, id); | ||
} | ||
|
||
try { | ||
// Set processor to zero to avoid the ack to the now departed origin | ||
mess.processorId = 0; | ||
mess.basicProcess(); | ||
} finally { | ||
txTracker.processed(mess); | ||
} | ||
} else { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug( | ||
"Transaction associated with lockID: {} from origin {} ignored. No other recipients received \"commit process\" message", | ||
mess.lockId, id); | ||
} | ||
txTracker.removeMessage(mess); | ||
} | ||
|
||
} | ||
|
||
private DistributedMember getMemberFromTrackerKey(Object trackerKey) { | ||
if (trackerKey instanceof TXId) { | ||
TXId id1 = (TXId) trackerKey; | ||
return id1.getMemberId(); | ||
} else if (trackerKey instanceof TXLockId) { | ||
TXLockId id2 = (TXLockId) trackerKey; | ||
return id2.getMemberId(); | ||
} | ||
return null; | ||
doCommitProcessQuery(id); | ||
} | ||
}; | ||
fellowFarSidersQuery.setDaemon(true); | ||
|
@@ -2100,6 +2040,70 @@ public void run() { | |
} | ||
} | ||
|
||
HashSet getFarSiders() { | ||
return farSiders; | ||
} | ||
|
||
DistributionManager getDistributionManager() { | ||
return dm; | ||
} | ||
|
||
void doCommitProcessQuery(final InternalDistributedMember id) { | ||
CommitProcessQueryReplyProcessor replyProcessor = createReplyProcessor(); | ||
CommitProcessQueryMessage queryMessage = createQueryMessage(replyProcessor); | ||
queryMessage.setRecipients(this.farSiders); | ||
getDistributionManager().putOutgoing(queryMessage); | ||
// Wait for any one positive response or all negative responses. | ||
// (while() loop removed for bug 36983 - you can't loop on waitForReplies() | ||
getDistributionManager().getCancelCriterion().checkCancelInProgress(null); | ||
try { | ||
replyProcessor.waitForRepliesUninterruptibly(); | ||
} catch (ReplyException e) { | ||
e.handleCause(); | ||
} | ||
if (replyProcessor.receivedACommitProcessMessage()) { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug( | ||
"Transaction associated with lockID: {} from orign {} is processing due to a received \"commit process\" message", | ||
lockId, id); | ||
} | ||
|
||
try { | ||
// Set processor to zero to avoid the ack to the now departed origin | ||
processorId = 0; | ||
basicProcess(); | ||
} finally { | ||
txTracker.processed(this); | ||
} | ||
} else { | ||
if (logger.isDebugEnabled()) { | ||
logger.debug( | ||
"Transaction associated with lockID: {} from origin {} ignored. No other recipients received \"commit process\" message", | ||
lockId, id); | ||
} | ||
txTracker.removeMessage(this); | ||
} | ||
} | ||
|
||
CommitProcessQueryReplyProcessor createReplyProcessor() { | ||
return new CommitProcessQueryReplyProcessor(dm, farSiders); | ||
} | ||
|
||
CommitProcessQueryMessage createQueryMessage(CommitProcessQueryReplyProcessor replyProcessor) { | ||
return new CommitProcessQueryMessage(getTrackerKey(), replyProcessor.getProcessorId()); | ||
} | ||
|
||
private DistributedMember getMemberFromTrackerKey(Object trackerKey) { | ||
if (trackerKey instanceof TXId) { | ||
TXId id1 = (TXId) trackerKey; | ||
return id1.getMemberId(); | ||
} else if (trackerKey instanceof TXLockId) { | ||
TXLockId id2 = (TXLockId) trackerKey; | ||
return id2.getMemberId(); | ||
} | ||
return null; | ||
} | ||
|
||
void setUpdateLockMembers() { | ||
this.lockNeedsUpdate = true; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
* agreements. See the NOTICE file distributed with this work for additional information regarding | ||
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance with the License. You may obtain a | ||
* copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | ||
* or implied. See the License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package org.apache.geode.internal.cache; | ||
|
||
import static org.mockito.Mockito.doReturn; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.timeout; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.util.HashSet; | ||
|
||
import org.junit.Test; | ||
|
||
import org.apache.geode.CancelCriterion; | ||
import org.apache.geode.distributed.internal.DistributionManager; | ||
import org.apache.geode.distributed.internal.membership.InternalDistributedMember; | ||
|
||
public class TXCommitMessageTest { | ||
|
||
@Test | ||
public void commitProcessQueryMessageIsSentIfHostDeparted() { | ||
DistributionManager manager = mock(DistributionManager.class); | ||
InternalDistributedMember member = mock(InternalDistributedMember.class); | ||
DistributionManager dm = mock(DistributionManager.class); | ||
TXCommitMessage.CommitProcessQueryReplyProcessor processor = mock( | ||
TXCommitMessage.CommitProcessQueryReplyProcessor.class); | ||
TXCommitMessage.CommitProcessQueryMessage queryMessage = | ||
mock(TXCommitMessage.CommitProcessQueryMessage.class); | ||
HashSet farSiders = mock(HashSet.class); | ||
TXCommitMessage message = spy(new TXCommitMessage()); | ||
doReturn(dm).when(message).getDistributionManager(); | ||
when(dm.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); | ||
doReturn(member).when(message).getSender(); | ||
doReturn(false).when(message).isProcessing(); | ||
doReturn(processor).when(message).createReplyProcessor(); | ||
doReturn(farSiders).when(message).getFarSiders(); | ||
doReturn(queryMessage).when(message).createQueryMessage(processor); | ||
when(farSiders.isEmpty()).thenReturn(false); | ||
|
||
message.memberDeparted(manager, member, false); | ||
|
||
verify(dm, timeout(60000)).putOutgoing(queryMessage); | ||
verify(processor, timeout(60000)).waitForRepliesUninterruptibly(); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the old code was better that passed along the "dm" parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
txTracker.commitProcessReceived method no longer need distribution manager anymore.