From 5fa6ca55288c4efb83b07fdd77b80941bc74de47 Mon Sep 17 00:00:00 2001 From: zhouxh Date: Thu, 9 Nov 2017 23:49:29 -0800 Subject: [PATCH 1/4] GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender GEODE-3967: notifyTimestampsToGateways should inherit isConcurrencyConflict GEODE-3967: add to secondary event isConcurrencyConflict --- .../internal/cache/AbstractRegionMap.java | 3 +++ .../internal/cache/DestroyOperation.java | 3 --- .../cache/DistributedCacheOperation.java | 15 +++++++++++- .../geode/internal/cache/LocalRegion.java | 19 +++++++++++---- .../internal/cache/LocalRegionDataView.java | 9 ++++++++ .../AbstractGatewaySenderEventProcessor.java | 15 ++++++++---- .../cache/wan/GatewaySenderEventImpl.java | 3 +++ .../SerialGatewaySenderEventProcessor.java | 23 +++++++++++++++++++ 8 files changed, 76 insertions(+), 14 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 404488b36b28..75d8484fcfdf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -1183,6 +1183,7 @@ public boolean destroy(EntryEventImpl event, boolean inTokenMode, boolean during true/* conflict with clear */, duringRI, true); doPart3 = true; } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2092,6 +2093,7 @@ public boolean invalidate(EntryEventImpl event, boolean invokeCallbacks, boolean } } // !opCompleted } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2849,6 +2851,7 @@ public RegionEntry basicPut(EntryEventImpl event, final long lastModified, final clearOccured = true; owner.recordEvent(event); } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java index 0d2dc7f70a76..7870f58dc891 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java @@ -95,9 +95,6 @@ protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager d } catch (EntryNotFoundException e) { dispatchElidedEvent(rgn, ev); - if (!ev.isConcurrencyConflict()) { - rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev); - } throw e; } catch (CacheWriterException e) { throw new Error( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 908bf836c806..ddde23afd99a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -1289,11 +1289,24 @@ public void checkVersionIsRecorded(VersionTag tag, LocalRegion r) { */ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) { if (logger.isDebugEnabled()) { - logger.debug("dispatching elided event: {}", ev); + logger.debug("GGG:dispatching elided event: {}", ev, new Exception()); } ev.isConcurrencyConflict(true); rgn.generateLocalFilterRouting(ev); rgn.notifyBridgeClients(ev); + rgn.notifyGatewaySender(getOperation(ev), ev); + } + + private EnumListenerEvent getOperation(EntryEventImpl ev) { + if (ev.getOperation().isInvalidate()) { + return EnumListenerEvent.AFTER_INVALIDATE; + } else if (ev.getOperation().isDestroy()) { + return EnumListenerEvent.AFTER_DESTROY; + } else if (ev.getOperation().isUpdate()) { + return EnumListenerEvent.AFTER_UPDATE; + } else { + return EnumListenerEvent.AFTER_CREATE; + } } protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index aa0f8c68c2da..38f74ec46293 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -2853,6 +2853,8 @@ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateIn logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); } if (!getDataView().isDeferredStats()) { getCachePerfStats().endPut(startPut, event.isOriginRemote()); @@ -5630,6 +5632,9 @@ boolean virtualPut(final EntryEventImpl event, final boolean ifNew, final boolea logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); + return false; } @@ -5856,6 +5861,9 @@ void notifyTimestampsToGateways(EntryEventImpl event) { updateTimeStampEvent.setGenerateCallbacks(false); updateTimeStampEvent.distributedMember = event.getDistributedMember(); updateTimeStampEvent.setNewEventId(getSystem()); + if (event.isConcurrencyConflict()) { + updateTimeStampEvent.isConcurrencyConflict(true); + } if (event.getRegion() instanceof BucketRegion) { BucketRegion bucketRegion = (BucketRegion) event.getRegion(); @@ -6117,8 +6125,7 @@ public boolean notifiesMultipleSerialGateways() { } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6142,9 +6149,10 @@ protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl e if (allRemoteDSIds != null) { for (GatewaySender sender : getCache().getAllGatewaySenders()) { if (allGatewaySenderIds.contains(sender.getId())) { - // TODO: This is a BUG. Why return and not continue? - if (!this.getDataPolicy().withStorage() && sender.isParallel()) { - return; + // if isConcurrencyConflict is true, only notify serial gateway sender + if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict()) + && sender.isParallel()) { + continue; } if (logger.isDebugEnabled()) { logger.debug("Notifying the GatewaySender : {}", sender.getId()); @@ -6503,6 +6511,7 @@ private boolean mapDestroy(final EntryEventImpl event, final boolean cacheWrite, if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); } + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); return true; // event was elided } catch (DiskAccessException dae) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index eed61763b9cf..b68859e2919f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.logging.LogService; /** * @@ -71,6 +72,10 @@ public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallback } catch (ConcurrentCacheModificationException e) { // a newer event has already been applied to the cache. this can happen // in a client cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event); } } @@ -81,6 +86,10 @@ public void updateEntryVersion(EntryEventImpl event) throws EntryNotFoundExcepti } catch (ConcurrentCacheModificationException e) { // a later in time event has already been applied to the cache. this can happen // in a cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:updateEntryVersion:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7a2cee19b482..a55787559e9a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -518,16 +518,21 @@ protected void processQueue() { // version is < 7.0.1, especially to prevent another loop over events. if (!sendUpdateVersionEvents && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - if (isTraceEnabled) { - logger.trace( - "Update Event Version event: {} removed from Gateway Sender queue: {}", event, - sender); - } + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", + event, sender); itr.remove(); statistics.incEventsNotQueued(); continue; } + if (((GatewaySenderEventImpl) event).isConcurrencyConflict) { + if (isDebugEnabled) { + logger.debug("primary should ignore the concurrency conflict event:" + event); + } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } boolean transmit = filter.beforeTransmit(event); if (!transmit) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 5b1ba54e6a80..468dca2c5bda 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -171,6 +171,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + public boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -312,6 +314,7 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent ce, Object if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 734b5605d17b..995007d1c79b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -291,6 +291,10 @@ protected void handleFailover() { if (o != null && o instanceof GatewaySenderEventImpl) { GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o; EventWrapper unprocessedEvent = this.unprocessedEvents.remove(ge.getEventId()); + if (unprocessedEvent != null && ge.isConcurrencyConflict) { + logger.info( + "GGG:secondary after removed by handleFailover:" + unprocessedEvent + ":" + ge); + } if (unprocessedEvent != null) { unprocessedEvent.event.release(); if (this.unprocessedEvents.isEmpty()) { @@ -379,6 +383,7 @@ private void releaseUnprocessedEvents() { if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; + logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent); gatewayEvent.release(); } this.unprocessedEvents = null; @@ -632,6 +637,10 @@ protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEve return; // now we can safely use the unprocessedEvents field EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId()); + if (ew != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed by destroy listener:" + ew + ":" + gatewayEvent); + } + if (ew != null) { ew.event.release(); statistics.incUnprocessedEventsRemovedByPrimary(); @@ -651,8 +660,16 @@ protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent return; // now we can safely use the unprocessedEvents field EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId()); + if (ew != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed by create listener:" + ew + ":" + gatewayEvent, + new Exception()); + } if (ew == null) { + if (gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary before add to by create listener:" + gatewayEvent, + new Exception()); + } // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map", @@ -711,8 +728,14 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent // @todo add an assertion that !getPrimary() // now we can safely use the unprocessedEvents field Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId()); + if (v != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed token:" + v + ":" + gatewayEvent); + } if (v == null) { + if (gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception()); + } // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", From c591e948d37e8ee22f2d1512e74e8a552f6657f6 Mon Sep 17 00:00:00 2001 From: zhouxh Date: Fri, 12 Jan 2018 18:16:45 -0800 Subject: [PATCH 2/4] GEODE=3967: add more trace --- .../apache/geode/internal/cache/wan/GatewaySenderEventImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 468dca2c5bda..f2b7072e5441 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -780,7 +780,7 @@ public String toString() { .append(";creationTime=").append(this.creationTime).append(";shadowKey= ") .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp) .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched) - .append(";bucketId=").append(this.bucketId).append("]"); + .append(";bucketId=").append(this.bucketId).append(this.isConcurrencyConflict).append("]"); return buffer.toString(); } From b39ff950b3e5fe2c112812e4b842f359803c637d Mon Sep 17 00:00:00 2001 From: zhouxh Date: Tue, 16 Jan 2018 23:45:06 -0800 Subject: [PATCH 3/4] GEM-3967: change trace to debug --- .../wan/serial/SerialGatewaySenderEventProcessor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 995007d1c79b..25efcc5670b2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -728,7 +728,7 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent // @todo add an assertion that !getPrimary() // now we can safely use the unprocessedEvents field Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId()); - if (v != null && gatewayEvent.isConcurrencyConflict) { + if (v != null) { logger.info("GGG:secondary after removed token:" + v + ":" + gatewayEvent); } @@ -737,8 +737,8 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception()); } // first time for the event - if (logger.isTraceEnabled()) { - logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", + if (logger.isDebugEnabled()) { + logger.debug("{}: fromSecondary event {}:{}->{} added from unprocessed events map", sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true)); } @@ -759,8 +759,8 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent } } else { // token already added by primary already removed - if (logger.isTraceEnabled()) { - logger.trace("{}: Secondary created event {}:{}->{} removed from unprocessed events map", + if (logger.isDebugEnabled()) { + logger.debug("{}: Secondary created event {}:{}->{} removed from unprocessed events map", sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), gatewayEvent.getValueAsString(true)); } From 39ecfa9b5f7018c6c29ec6a8d1861f896bea6b58 Mon Sep 17 00:00:00 2001 From: zhouxh Date: Thu, 18 Jan 2018 15:11:31 -0800 Subject: [PATCH 4/4] GEODE-3967: update the log --- .../internal/cache/DestroyOperation.java | 3 ++ .../cache/DistributedCacheOperation.java | 30 +++++++++---------- .../internal/cache/LocalRegionDataView.java | 9 ------ .../AbstractGatewaySenderEventProcessor.java | 15 ++++------ .../cache/wan/GatewaySenderEventImpl.java | 3 +- .../SerialGatewaySenderEventProcessor.java | 2 +- 6 files changed, 26 insertions(+), 36 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java index 7870f58dc891..0d2dc7f70a76 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java @@ -95,6 +95,9 @@ protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager d } catch (EntryNotFoundException e) { dispatchElidedEvent(rgn, ev); + if (!ev.isConcurrencyConflict()) { + rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev); + } throw e; } catch (CacheWriterException e) { throw new Error( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index ddde23afd99a..6a45cf93bf4b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -1289,25 +1289,25 @@ public void checkVersionIsRecorded(VersionTag tag, LocalRegion r) { */ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) { if (logger.isDebugEnabled()) { - logger.debug("GGG:dispatching elided event: {}", ev, new Exception()); + logger.debug("dispatching elided event: {}", ev); } ev.isConcurrencyConflict(true); rgn.generateLocalFilterRouting(ev); rgn.notifyBridgeClients(ev); - rgn.notifyGatewaySender(getOperation(ev), ev); - } - - private EnumListenerEvent getOperation(EntryEventImpl ev) { - if (ev.getOperation().isInvalidate()) { - return EnumListenerEvent.AFTER_INVALIDATE; - } else if (ev.getOperation().isDestroy()) { - return EnumListenerEvent.AFTER_DESTROY; - } else if (ev.getOperation().isUpdate()) { - return EnumListenerEvent.AFTER_UPDATE; - } else { - return EnumListenerEvent.AFTER_CREATE; - } - } + // rgn.notifyGatewaySender(getOperation(ev), ev); + } + + // private EnumListenerEvent getOperation(EntryEventImpl ev) { + // if (ev.getOperation().isInvalidate()) { + // return EnumListenerEvent.AFTER_INVALIDATE; + // } else if (ev.getOperation().isDestroy()) { + // return EnumListenerEvent.AFTER_DESTROY; + // } else if (ev.getOperation().isUpdate()) { + // return EnumListenerEvent.AFTER_UPDATE; + // } else { + // return EnumListenerEvent.AFTER_CREATE; + // } + // } protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index b68859e2919f..eed61763b9cf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -25,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; -import org.apache.geode.internal.logging.LogService; /** * @@ -72,10 +71,6 @@ public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallback } catch (ConcurrentCacheModificationException e) { // a newer event has already been applied to the cache. this can happen // in a client cache if another thread is operating on the same key - event.isConcurrencyConflict(true); - LocalRegion lr = event.getLocalRegion(); - LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new Exception()); - // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event); } } @@ -86,10 +81,6 @@ public void updateEntryVersion(EntryEventImpl event) throws EntryNotFoundExcepti } catch (ConcurrentCacheModificationException e) { // a later in time event has already been applied to the cache. this can happen // in a cache if another thread is operating on the same key - event.isConcurrencyConflict(true); - LocalRegion lr = event.getLocalRegion(); - LogService.getLogger().info("GGG:updateEntryVersion:" + event, new Exception()); - // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index a55787559e9a..7a2cee19b482 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -518,17 +518,12 @@ protected void processQueue() { // version is < 7.0.1, especially to prevent another loop over events. if (!sendUpdateVersionEvents && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", - event, sender); - - itr.remove(); - statistics.incEventsNotQueued(); - continue; - } - if (((GatewaySenderEventImpl) event).isConcurrencyConflict) { - if (isDebugEnabled) { - logger.debug("primary should ignore the concurrency conflict event:" + event); + if (isTraceEnabled) { + logger.trace( + "Update Event Version event: {} removed from Gateway Sender queue: {}", event, + sender); } + itr.remove(); statistics.incEventsNotQueued(); continue; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index f2b7072e5441..09bcbac903da 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -780,7 +780,8 @@ public String toString() { .append(";creationTime=").append(this.creationTime).append(";shadowKey= ") .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp) .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched) - .append(";bucketId=").append(this.bucketId).append(this.isConcurrencyConflict).append("]"); + .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=") + .append(this.isConcurrencyConflict).append("]"); return buffer.toString(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 25efcc5670b2..2946dc078613 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -739,7 +739,7 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent // first time for the event if (logger.isDebugEnabled()) { logger.debug("{}: fromSecondary event {}:{}->{} added from unprocessed events map", - sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(), + sender.getId(), gatewayEvent.getEventId(), gatewayEvent, gatewayEvent.getValueAsString(true)); } {