From c4c27524ea66a24545f4321623995cb235e25bf4 Mon Sep 17 00:00:00 2001 From: Bruce Schuchardt Date: Wed, 6 Jan 2021 09:56:54 -0800 Subject: [PATCH] GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870) reverting 3ed37a754d789bb52cf190db23088e819955fd58 (cherry picked from commit ab16f68c7c3b121af00c3aca64a92d9809cb6019) --- .../wan/serial/SerialGatewaySenderQueue.java | 124 +++++++----------- 1 file changed, 49 insertions(+), 75 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index cc5dee4dc1f6..332256ddc50c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -24,7 +24,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; @@ -137,13 +136,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { */ private boolean isDiskSynchronous; - /** - * The writeLock of this concurrent lock is used to protect access to the queue. - * It is implemented as a fair lock to ensure FIFO ordering of queueing attempts. - * Otherwise threads can be unfairly delayed. - */ - private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - /** * The Map mapping the regionName->key to the queue key. This index allows fast * updating of entries in the queue for conflation. @@ -216,23 +208,18 @@ public void destroy() { } @Override - public boolean put(Object event) throws CacheException { - lock.writeLock().lock(); - try { - GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event; - final Region r = eventImpl.getRegion(); - final boolean isPDXRegion = - (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME)); - final boolean isWbcl = - this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX); - if (!(isPDXRegion && isWbcl)) { - putAndGetKey(event); - return true; - } - return false; - } finally { - lock.writeLock().unlock(); + public synchronized boolean put(Object event) throws CacheException { + GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event; + final Region r = eventImpl.getRegion(); + final boolean isPDXRegion = + (r instanceof DistributedRegion && r.getName().equals(PeerTypeRegistration.REGION_NAME)); + final boolean isWbcl = + this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX); + if (!(isPDXRegion && isWbcl)) { + putAndGetKey(event); + return true; } + return false; } private long putAndGetKey(Object object) throws CacheException { @@ -256,6 +243,7 @@ private long putAndGetKey(Object object) throws CacheException { return key.longValue(); } + @Override public AsyncEvent take() throws CacheException { // Unsupported since we have no callers. @@ -277,49 +265,44 @@ public List take(int batchSize) throws CacheException { * have peeked. If the entry was not peeked, this method will silently return. */ @Override - public void remove() throws CacheException { - lock.writeLock().lock(); + public synchronized void remove() throws CacheException { + if (this.peekedIds.isEmpty()) { + return; + } + Long key = this.peekedIds.remove(); try { - if (this.peekedIds.isEmpty()) { - return; - } - Long key = this.peekedIds.remove(); - try { - // Increment the head key - updateHeadKey(key.longValue()); - removeIndex(key); - // Remove the entry at that key with a callback arg signifying it is - // a WAN queue so that AbstractRegionEntry.destroy can get the value - // even if it has been evicted to disk. In the normal case, the - // AbstractRegionEntry.destroy only gets the value in the VM. - this.region.localDestroy(key, WAN_QUEUE_TOKEN); - this.stats.decQueueSize(); - - } catch (EntryNotFoundException ok) { - // this is acceptable because the conflation can remove entries - // out from underneath us. - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", - this, key); - } - } - - boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; - this.lastDispatchedKey = key; - if (wasEmpty) { - synchronized (this) { - notifyAll(); - } - } + // Increment the head key + updateHeadKey(key.longValue()); + removeIndex(key); + // Remove the entry at that key with a callback arg signifying it is + // a WAN queue so that AbstractRegionEntry.destroy can get the value + // even if it has been evicted to disk. In the normal case, the + // AbstractRegionEntry.destroy only gets the value in the VM. + this.region.localDestroy(key, WAN_QUEUE_TOKEN); + this.stats.decQueueSize(); + } catch (EntryNotFoundException ok) { + // this is acceptable because the conflation can remove entries + // out from underneath us. if (logger.isDebugEnabled()) { logger.debug( - "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}", - this, key, this.lastDispatchedKey, this.lastDestroyedKey); + "{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.", + this, key); + } + } + + boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey; + this.lastDispatchedKey = key; + if (wasEmpty) { + synchronized (this) { + notifyAll(); } - } finally { - lock.writeLock().unlock(); + } + + if (logger.isDebugEnabled()) { + logger.debug( + "{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}", + this, key, this.lastDispatchedKey, this.lastDestroyedKey); } } @@ -460,8 +443,7 @@ private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheExc Object key = object.getKeyToConflate(); Long previousIndex; - lock.writeLock().lock(); - try { + synchronized (this) { Map latestIndexesForRegion = this.indexes.get(rName); if (latestIndexesForRegion == null) { latestIndexesForRegion = new HashMap(); @@ -469,8 +451,6 @@ private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheExc } previousIndex = latestIndexesForRegion.put(key, tailKey); - } finally { - lock.writeLock().unlock(); } if (isDebugEnabled) { @@ -551,7 +531,7 @@ private AsyncEvent optimalGet(Long k) { } /* - * this must be invoked with lock.writeLock() held + * this must be invoked under synchronization */ private void removeIndex(Long qkey) { // Determine whether conflation is enabled for this queue and object @@ -739,8 +719,7 @@ private void initializeKeys() throws CacheException { if (tailKey.get() != -1) { return; } - lock.writeLock().lock(); - try { + synchronized (this) { long largestKey = -1; long largestKeyLessThanHalfMax = -1; long smallestKey = -1; @@ -788,8 +767,6 @@ private void initializeKeys() throws CacheException { logger.debug("{}: Initialized tail key to: {}, head key to: {}", this, this.tailKey, this.headKey); } - } finally { - lock.writeLock().unlock(); } } @@ -1027,8 +1004,7 @@ public void run() { } long temp; - lock.writeLock().lock(); - try { + synchronized (SerialGatewaySenderQueue.this) { temp = lastDispatchedKey; boolean wasEmpty = temp == lastDestroyedKey; while (lastDispatchedKey == lastDestroyedKey) { @@ -1037,8 +1013,6 @@ public void run() { } if (wasEmpty) continue; - } finally { - lock.writeLock().unlock(); } // release not needed since disallowOffHeapValues called EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY,