Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,7 @@ public boolean destroy(EntryEventImpl event, boolean inTokenMode, boolean during
true/* conflict with clear */, duringRI, true);
doPart3 = true;
} catch (ConcurrentCacheModificationException ccme) {
event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
Expand Down Expand Up @@ -2092,6 +2093,7 @@ public boolean invalidate(EntryEventImpl event, boolean invokeCallbacks, boolean
}
} // !opCompleted
} catch (ConcurrentCacheModificationException ccme) {
event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
Expand Down Expand Up @@ -2849,6 +2851,7 @@ public RegionEntry basicPut(EntryEventImpl event, final long lastModified, final
clearOccured = true;
owner.recordEvent(event);
} catch (ConcurrentCacheModificationException ccme) {
event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,8 +1294,21 @@ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
ev.isConcurrencyConflict(true);
rgn.generateLocalFilterRouting(ev);
rgn.notifyBridgeClients(ev);
// rgn.notifyGatewaySender(getOperation(ev), ev);
}

// private EnumListenerEvent getOperation(EntryEventImpl ev) {
// if (ev.getOperation().isInvalidate()) {
// return EnumListenerEvent.AFTER_INVALIDATE;
// } else if (ev.getOperation().isDestroy()) {
// return EnumListenerEvent.AFTER_DESTROY;
// } else if (ev.getOperation().isUpdate()) {
// return EnumListenerEvent.AFTER_UPDATE;
// } else {
// return EnumListenerEvent.AFTER_CREATE;
// }
// }

protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2853,6 +2853,8 @@ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateIn
logger.debug("caught concurrent modification attempt when applying {}", event);
}
notifyBridgeClients(event);
notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
: EnumListenerEvent.AFTER_CREATE, event);
}
if (!getDataView().isDeferredStats()) {
getCachePerfStats().endPut(startPut, event.isOriginRemote());
Expand Down Expand Up @@ -5630,6 +5632,9 @@ boolean virtualPut(final EntryEventImpl event, final boolean ifNew, final boolea
logger.debug("caught concurrent modification attempt when applying {}", event);
}
notifyBridgeClients(event);
notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
: EnumListenerEvent.AFTER_CREATE, event);

return false;
}

Expand Down Expand Up @@ -5856,6 +5861,9 @@ void notifyTimestampsToGateways(EntryEventImpl event) {
updateTimeStampEvent.setGenerateCallbacks(false);
updateTimeStampEvent.distributedMember = event.getDistributedMember();
updateTimeStampEvent.setNewEventId(getSystem());
if (event.isConcurrencyConflict()) {
updateTimeStampEvent.isConcurrencyConflict(true);
}

if (event.getRegion() instanceof BucketRegion) {
BucketRegion bucketRegion = (BucketRegion) event.getRegion();
Expand Down Expand Up @@ -6117,8 +6125,7 @@ public boolean notifiesMultipleSerialGateways() {
}

protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
// isConcurrencyConflict is usually a concurrent cache modification problem
if (isPdxTypesRegion()) {
return;
}

Expand All @@ -6142,9 +6149,10 @@ protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl e
if (allRemoteDSIds != null) {
for (GatewaySender sender : getCache().getAllGatewaySenders()) {
if (allGatewaySenderIds.contains(sender.getId())) {
// TODO: This is a BUG. Why return and not continue?
if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
return;
// if isConcurrencyConflict is true, only notify serial gateway sender
if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict())
&& sender.isParallel()) {
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Notifying the GatewaySender : {}", sender.getId());
Expand Down Expand Up @@ -6503,6 +6511,7 @@ private boolean mapDestroy(final EntryEventImpl event, final boolean cacheWrite,
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
notifyBridgeClients(event);
}
notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
return true; // event was elided

} catch (DiskAccessException dae) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public class GatewaySenderEventImpl

protected boolean isInitialized;

public boolean isConcurrencyConflict = false;

/**
* Is this thread in the process of serializing this event?
*/
Expand Down Expand Up @@ -312,6 +314,7 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent ce, Object
if (initialize) {
initialize();
}
this.isConcurrencyConflict = event.isConcurrencyConflict();
}

/**
Expand Down Expand Up @@ -777,7 +780,8 @@ public String toString() {
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
.append(";bucketId=").append(this.bucketId).append("]");
.append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
.append(this.isConcurrencyConflict).append("]");
return buffer.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ protected void handleFailover() {
if (o != null && o instanceof GatewaySenderEventImpl) {
GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o;
EventWrapper unprocessedEvent = this.unprocessedEvents.remove(ge.getEventId());
if (unprocessedEvent != null && ge.isConcurrencyConflict) {
logger.info(
"GGG:secondary after removed by handleFailover:" + unprocessedEvent + ":" + ge);
}
if (unprocessedEvent != null) {
unprocessedEvent.event.release();
if (this.unprocessedEvents.isEmpty()) {
Expand Down Expand Up @@ -379,6 +383,7 @@ private void releaseUnprocessedEvents() {
if (m != null) {
for (EventWrapper ew : m.values()) {
GatewaySenderEventImpl gatewayEvent = ew.event;
logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent);
gatewayEvent.release();
}
this.unprocessedEvents = null;
Expand Down Expand Up @@ -632,6 +637,10 @@ protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEve
return;
// now we can safely use the unprocessedEvents field
EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
if (ew != null && gatewayEvent.isConcurrencyConflict) {
logger.info("GGG:secondary after removed by destroy listener:" + ew + ":" + gatewayEvent);
}

if (ew != null) {
ew.event.release();
statistics.incUnprocessedEventsRemovedByPrimary();
Expand All @@ -651,8 +660,16 @@ protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent
return;
// now we can safely use the unprocessedEvents field
EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId());
if (ew != null && gatewayEvent.isConcurrencyConflict) {
logger.info("GGG:secondary after removed by create listener:" + ew + ":" + gatewayEvent,
new Exception());
}

if (ew == null) {
if (gatewayEvent.isConcurrencyConflict) {
logger.info("GGG:secondary before add to by create listener:" + gatewayEvent,
new Exception());
}
// first time for the event
if (logger.isTraceEnabled()) {
logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map",
Expand Down Expand Up @@ -711,12 +728,18 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent
// @todo add an assertion that !getPrimary()
// now we can safely use the unprocessedEvents field
Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
if (v != null) {
logger.info("GGG:secondary after removed token:" + v + ":" + gatewayEvent);
}

if (v == null) {
if (gatewayEvent.isConcurrencyConflict) {
logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception());
}
// first time for the event
if (logger.isTraceEnabled()) {
logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
if (logger.isDebugEnabled()) {
logger.debug("{}: fromSecondary event {}:{}->{} added from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent,
gatewayEvent.getValueAsString(true));
}
{
Expand All @@ -736,8 +759,8 @@ private void basicHandleSecondaryEvent(final GatewaySenderEventImpl gatewayEvent
}
} else {
// token already added by primary already removed
if (logger.isTraceEnabled()) {
logger.trace("{}: Secondary created event {}:{}->{} removed from unprocessed events map",
if (logger.isDebugEnabled()) {
logger.debug("{}: Secondary created event {}:{}->{} removed from unprocessed events map",
sender.getId(), gatewayEvent.getEventId(), gatewayEvent.getKey(),
gatewayEvent.getValueAsString(true));
}
Expand Down