Skip to content

Commit

Permalink
Merge pull request #8550 from jerrinot/fix/wait-notify-wrong-target-e…
Browse files Browse the repository at this point in the history
…xception/master

Delay invocation retry during split brain
  • Loading branch information
jerrinot committed Jul 21, 2016
2 parents a46aa85 + 821e800 commit e7c3122
Show file tree
Hide file tree
Showing 44 changed files with 340 additions and 74 deletions.
Expand Up @@ -44,7 +44,7 @@ public AddAllBackupOperation(String name, Map<Long, Data> dataMap) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.addAllBackup(dataMap);
}

Expand Down
Expand Up @@ -50,7 +50,7 @@ public AddAllOperation(String name, Collection<Data> dataList) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
if (queueContainer.hasEnoughCapacity()) {
dataMap = queueContainer.addAll(dataList);
response = true;
Expand Down Expand Up @@ -87,7 +87,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getPollWaitNotifyKey();
return getContainer().getPollWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -34,7 +34,7 @@ public CheckAndEvictOperation(String name) {

@Override
public void run() throws Exception {
final QueueContainer queueContainer = getOrCreateContainer();
final QueueContainer queueContainer = getContainer();
if (queueContainer.isEvictable()) {
ProxyService proxyService = getNodeEngine().getProxyService();
proxyService.destroyDistributedObject(getServiceName(), name);
Expand Down
Expand Up @@ -43,7 +43,7 @@ public ClearBackupOperation(String name, Set<Long> itemIdSet) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.clearBackup(itemIdSet);
response = true;
}
Expand Down
Expand Up @@ -43,7 +43,7 @@ public ClearOperation(String name) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
dataMap = queueContainer.clear();
response = true;
}
Expand Down Expand Up @@ -74,7 +74,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -43,7 +43,7 @@ public CompareAndRemoveBackupOperation(String name, Set<Long> keySet) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.compareAndRemoveBackup(keySet);
response = true;
}
Expand Down
Expand Up @@ -52,7 +52,7 @@ public CompareAndRemoveOperation(String name, Collection<Data> dataList, boolean

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
dataMap = queueContainer.compareAndRemove(dataList, retain);
response = dataMap.size() > 0;
}
Expand Down Expand Up @@ -85,7 +85,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -44,7 +44,7 @@ public ContainsOperation(String name, Collection<Data> dataList) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
response = queueContainer.contains(dataList);
}

Expand Down
Expand Up @@ -44,7 +44,7 @@ public DrainBackupOperation(String name, Set<Long> itemIdSet) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.drainFromBackup(itemIdSet);
}

Expand Down
Expand Up @@ -50,7 +50,7 @@ public DrainOperation(String name, int maxSize) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
dataMap = queueContainer.drain(maxSize);
response = new SerializableList(new ArrayList<Data>(dataMap.values()));
}
Expand Down Expand Up @@ -81,7 +81,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -34,7 +34,7 @@ public IsEmptyOperation(final String name) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
response = queueContainer.size() == 0;
}

Expand Down
Expand Up @@ -38,7 +38,7 @@ public IteratorOperation(String name) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
List<Data> dataList = queueContainer.getAsDataList();
response = new SerializableList(dataList);
}
Expand Down
Expand Up @@ -46,7 +46,7 @@ public OfferBackupOperation(String name, Data data, long itemId) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.offerBackup(data, itemId);
response = true;
}
Expand Down
Expand Up @@ -50,7 +50,7 @@ public OfferOperation(final String name, final long timeout, final Data data) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
if (queueContainer.hasEnoughCapacity()) {
itemId = queueContainer.offer(data);
response = true;
Expand Down Expand Up @@ -87,17 +87,17 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getPollWaitNotifyKey();
return getContainer().getPollWaitNotifyKey();
}

@Override
public WaitNotifyKey getWaitKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
public boolean shouldWait() {
QueueContainer container = getOrCreateContainer();
QueueContainer container = getContainer();
return getWaitTimeout() != 0 && !container.hasEnoughCapacity();
}

Expand Down
Expand Up @@ -36,7 +36,7 @@ public PeekOperation(final String name) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
QueueItem item = queueContainer.peek();
response = item != null ? item.getData() : null;
}
Expand Down
Expand Up @@ -42,7 +42,7 @@ public PollBackupOperation(String name, long itemId) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.pollBackup(itemId);
response = Boolean.TRUE;
}
Expand Down
Expand Up @@ -44,7 +44,7 @@ public PollOperation(String name, long timeoutMillis) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
item = queueContainer.poll();
if (item != null) {
response = item.getData();
Expand Down Expand Up @@ -79,17 +79,17 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
public WaitNotifyKey getWaitKey() {
return getOrCreateContainer().getPollWaitNotifyKey();
return getContainer().getPollWaitNotifyKey();
}

@Override
public boolean shouldWait() {
return getWaitTimeout() != 0 && getOrCreateContainer().size() == 0;
return getWaitTimeout() != 0 && getContainer().size() == 0;
}

@Override
Expand Down
Expand Up @@ -37,11 +37,11 @@ protected QueueBackupAwareOperation(String name, long timeoutMillis) {

@Override
public final int getSyncBackupCount() {
return getOrCreateContainer().getConfig().getBackupCount();
return getContainer().getConfig().getBackupCount();
}

@Override
public final int getAsyncBackupCount() {
return getOrCreateContainer().getConfig().getAsyncBackupCount();
return getContainer().getConfig().getAsyncBackupCount();
}
}
Expand Up @@ -62,18 +62,19 @@ protected QueueOperation(String name, long timeoutMillis) {
setWaitTimeout(timeoutMillis);
}

protected final QueueContainer getOrCreateContainer() {
if (container == null) {
QueueService queueService = getService();
try {
container = queueService.getOrCreateContainer(name, this instanceof BackupOperation);
} catch (Exception e) {
throw new RetryableHazelcastException(e);
}
}
protected final QueueContainer getContainer() {
return container;
}

private void initializeContainer() {
QueueService queueService = getService();
try {
container = queueService.getOrCreateContainer(name, this instanceof BackupOperation);
} catch (Exception e) {
throw new RetryableHazelcastException(e);
}
}

@Override
public boolean returnsResponse() {
return true;
Expand All @@ -99,6 +100,7 @@ public void afterRun() throws Exception {

@Override
public void beforeRun() throws Exception {
initializeContainer();
}

public boolean hasListener() {
Expand Down
Expand Up @@ -34,7 +34,7 @@ public RemainingCapacityOperation(String name) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
response = queueContainer.getConfig().getMaxSize() - queueContainer.size();
}

Expand Down
Expand Up @@ -42,7 +42,7 @@ public RemoveBackupOperation(String name, long itemId) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.removeBackup(itemId);
response = true;
}
Expand Down
Expand Up @@ -47,7 +47,7 @@ public RemoveOperation(String name, Data data) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
itemId = queueContainer.remove(data);
response = itemId != -1;
}
Expand Down Expand Up @@ -78,7 +78,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getOfferWaitNotifyKey();
return getContainer().getOfferWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -34,7 +34,7 @@ public SizeOperation(String name) {

@Override
public void run() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
response = queueContainer.size();
}

Expand Down
Expand Up @@ -41,7 +41,7 @@ public QueueTransactionRollbackOperation(String name, String transactionId) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.rollbackTransaction(transactionId);
}

Expand Down
Expand Up @@ -91,7 +91,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
if (CollectionTxnUtil.isRemove(shouldNotify)) {
return queueContainer.getOfferWaitNotifyKey();
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ public TxnOfferBackupOperation(String name, long itemId, Data data) {

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
queueContainer.txnCommitOffer(itemId, data, true);
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public TxnOfferOperation(String name, long itemId, Data data) {

@Override
public void run() throws Exception {
QueueContainer createContainer = getOrCreateContainer();
QueueContainer createContainer = getContainer();
response = createContainer.txnCommitOffer(getItemId(), data, false);
}

Expand Down Expand Up @@ -80,7 +80,7 @@ public boolean shouldNotify() {

@Override
public WaitNotifyKey getNotifiedKey() {
return getOrCreateContainer().getPollWaitNotifyKey();
return getContainer().getPollWaitNotifyKey();
}

@Override
Expand Down
Expand Up @@ -46,7 +46,7 @@ public TxnPeekOperation(String name, long timeoutMillis, long itemId, String tra

@Override
public void run() throws Exception {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
response = queueContainer.txnPeek(itemId, transactionId);
}

Expand Down Expand Up @@ -79,14 +79,14 @@ protected void readInternal(ObjectDataInput in) throws IOException {

@Override
public WaitNotifyKey getWaitKey() {
QueueContainer queueContainer = getOrCreateContainer();
QueueContainer queueContainer = getContainer();
return queueContainer.getPollWaitNotifyKey();
}


@Override
public boolean shouldWait() {
final QueueContainer queueContainer = getOrCreateContainer();
final QueueContainer queueContainer = getContainer();
return getWaitTimeout() != 0 && itemId == -1 && queueContainer.size() == 0;
}

Expand Down

0 comments on commit e7c3122

Please sign in to comment.