Skip to content

Commit

Permalink
GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
Browse files Browse the repository at this point in the history
reverting 3ed37a7

(cherry picked from commit ab16f68)
  • Loading branch information
bschuchardt committed Jan 6, 2021
1 parent ec21170 commit 2dfef38
Showing 1 changed file with 49 additions and 75 deletions.
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -140,13 +139,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private boolean isDiskSynchronous;

/**
* The writeLock of this concurrent lock is used to protect access to the queue.
* It is implemented as a fair lock to ensure FIFO ordering of queueing attempts.
* Otherwise threads can be unfairly delayed.
*/
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

/**
* The <code>Map</code> mapping the regionName->key to the queue key. This index allows fast
* updating of entries in the queue for conflation.
Expand Down Expand Up @@ -219,23 +211,18 @@ public void destroy() {
}

@Override
public boolean put(Object event) throws CacheException {
lock.writeLock().lock();
try {
GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
final Region r = eventImpl.getRegion();
final boolean isPDXRegion =
(r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
final boolean isWbcl =
this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
if (!(isPDXRegion && isWbcl)) {
putAndGetKey(event);
return true;
}
return false;
} finally {
lock.writeLock().unlock();
public synchronized boolean put(Object event) throws CacheException {
GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
final Region r = eventImpl.getRegion();
final boolean isPDXRegion =
(r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME));
final boolean isWbcl =
this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
if (!(isPDXRegion && isWbcl)) {
putAndGetKey(event);
return true;
}
return false;
}

private long putAndGetKey(Object object) throws CacheException {
Expand All @@ -259,6 +246,7 @@ private long putAndGetKey(Object object) throws CacheException {
return key.longValue();
}


@Override
public AsyncEvent take() throws CacheException {
// Unsupported since we have no callers.
Expand All @@ -280,49 +268,44 @@ public List<AsyncEvent> take(int batchSize) throws CacheException {
* have peeked. If the entry was not peeked, this method will silently return.
*/
@Override
public void remove() throws CacheException {
lock.writeLock().lock();
public synchronized void remove() throws CacheException {
if (this.peekedIds.isEmpty()) {
return;
}
Long key = this.peekedIds.remove();
try {
if (this.peekedIds.isEmpty()) {
return;
}
Long key = this.peekedIds.remove();
try {
// Increment the head key
updateHeadKey(key.longValue());
removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
// a WAN queue so that AbstractRegionEntry.destroy can get the value
// even if it has been evicted to disk. In the normal case, the
// AbstractRegionEntry.destroy only gets the value in the VM.
this.region.localDestroy(key, WAN_QUEUE_TOKEN);
this.stats.decQueueSize();

} catch (EntryNotFoundException ok) {
// this is acceptable because the conflation can remove entries
// out from underneath us.
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
this, key);
}
}

boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
this.lastDispatchedKey = key;
if (wasEmpty) {
synchronized (this) {
notifyAll();
}
}
// Increment the head key
updateHeadKey(key.longValue());
removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
// a WAN queue so that AbstractRegionEntry.destroy can get the value
// even if it has been evicted to disk. In the normal case, the
// AbstractRegionEntry.destroy only gets the value in the VM.
this.region.localDestroy(key, WAN_QUEUE_TOKEN);
this.stats.decQueueSize();

} catch (EntryNotFoundException ok) {
// this is acceptable because the conflation can remove entries
// out from underneath us.
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
this, key, this.lastDispatchedKey, this.lastDestroyedKey);
"{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
this, key);
}
}

boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
this.lastDispatchedKey = key;
if (wasEmpty) {
synchronized (this) {
notifyAll();
}
} finally {
lock.writeLock().unlock();
}

if (logger.isDebugEnabled()) {
logger.debug(
"{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
this, key, this.lastDispatchedKey, this.lastDestroyedKey);
}
}

Expand Down Expand Up @@ -463,17 +446,14 @@ private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheExc
Object key = object.getKeyToConflate();
Long previousIndex;

lock.writeLock().lock();
try {
synchronized (this) {
Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
if (latestIndexesForRegion == null) {
latestIndexesForRegion = new HashMap<Object, Long>();
this.indexes.put(rName, latestIndexesForRegion);
}

previousIndex = latestIndexesForRegion.put(key, tailKey);
} finally {
lock.writeLock().unlock();
}

if (isDebugEnabled) {
Expand Down Expand Up @@ -554,7 +534,7 @@ private AsyncEvent optimalGet(Long k) {
}

/*
* this must be invoked with lock.writeLock() held
* this must be invoked under synchronization
*/
private void removeIndex(Long qkey) {
// Determine whether conflation is enabled for this queue and object
Expand Down Expand Up @@ -742,8 +722,7 @@ private void initializeKeys() throws CacheException {
if (tailKey.get() != -1) {
return;
}
lock.writeLock().lock();
try {
synchronized (this) {
long largestKey = -1;
long largestKeyLessThanHalfMax = -1;
long smallestKey = -1;
Expand Down Expand Up @@ -791,8 +770,6 @@ private void initializeKeys() throws CacheException {
logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey,
this.headKey);
}
} finally {
lock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -1031,8 +1008,7 @@ public void run() {
}

long temp;
lock.writeLock().lock();
try {
synchronized (SerialGatewaySenderQueue.this) {
temp = lastDispatchedKey;
boolean wasEmpty = temp == lastDestroyedKey;
while (lastDispatchedKey == lastDestroyedKey) {
Expand All @@ -1041,8 +1017,6 @@ public void run() {
}
if (wasEmpty)
continue;
} finally {
lock.writeLock().unlock();
}
// release not needed since disallowOffHeapValues called
EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,
Expand Down

0 comments on commit 2dfef38

Please sign in to comment.