Skip to content

Commit

Permalink
HWKALERTS-210 Add missing events for PartitionManageImpl (#273)
Browse files Browse the repository at this point in the history
- Extend the modified entry events for the internal caches used to manage the PartitionManager logic
  • Loading branch information
lucasponce authored and jshaughn committed Dec 13, 2016
1 parent db4d68e commit 77fd17e
Showing 1 changed file with 78 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.jboss.logging.Logger;
Expand Down Expand Up @@ -585,64 +587,86 @@ public void onPartitionModified(CacheEntryCreatedEvent cacheEvent) {

@Listener
public class NewTriggerListener {

@CacheEntryCreated
public void onNewTrigger(CacheEntryCreatedEvent cacheEvent) {
public void onNewNotifyTrigger(CacheEntryCreatedEvent cacheEvent) {
if (cacheEvent.isPre()) {
if (log.isTraceEnabled()) {
log.trace("Discarding pre onNewTrigger(@CacheEntryCreated) event");
log.trace("Discarding pre onNewNotifyTrigger(@CacheEntryCreated) event");
}
return;
}
/*
When a trigger is added, updated or removed it should be notified on the PartitionManager.
PartitionManager adds an entry on "triggers" cache to fire an event that will place the trigger
on the partition and invoke PartitionTriggerListener previously registered to process the event.
*/
NotifyTrigger newTrigger = (NotifyTrigger)triggersCache.get(cacheEvent.getKey());
NotifyTrigger notifyTrigger = (NotifyTrigger)triggersCache.get(cacheEvent.getKey());
if (log.isDebugEnabled()) {
log.debug("onNewTrigger(@CacheEntryCreated) received on " + currentNode);
log.debug("onNewNotifyTrigger(@CacheEntryCreated) received on " + currentNode);
log.debug("CacheEvent: " + cacheEvent);
log.debug("NotifyTrigger: " + newTrigger);
log.debug("NotifyTrigger: " + notifyTrigger);
}
processNotifyTrigger(notifyTrigger);
}

@CacheEntryModified
public void onModifiedNotifyTrigger(CacheEntryModifiedEvent cacheEvent) {
if (cacheEvent.isPre()) {
if (log.isTraceEnabled()) {
log.trace("Discarding pre onModifiedNotifyTrigger(@CacheEntryModified) event");
}
return;
}
NotifyTrigger notifyTrigger = (NotifyTrigger)triggersCache.get(cacheEvent.getKey());
if (log.isDebugEnabled()) {
log.debug("onModifiedNotifyTrigger(@CacheEntryModified) received on " + currentNode);
log.debug("CacheEvent: " + cacheEvent);
log.debug("NotifyTrigger: " + notifyTrigger);
}
processNotifyTrigger(notifyTrigger);
}

/*
When a trigger is added, updated or removed it should be notified on the PartitionManager.
PartitionManager adds an entry on "triggers" cache to fire an event that will place the trigger
on the partition and invoke PartitionTriggerListener previously registered to process the event.
*/
private void processNotifyTrigger(NotifyTrigger notifyTrigger) {
/*
A trigger should be processed on the target node
*/
if (null != newTrigger.toNode && null != currentNode && newTrigger.toNode.equals(currentNode)) {
if (null != notifyTrigger.toNode && null != currentNode && notifyTrigger.toNode.equals(currentNode)) {
/*
Update partition
*/
Map<PartitionEntry, Integer> current = (Map) partitionCache.get(CURRENT);
PartitionEntry newEntry = new PartitionEntry(newTrigger.getTenantId(),
newTrigger.getTriggerId());
PartitionEntry newEntry = new PartitionEntry(notifyTrigger.getTenantId(),
notifyTrigger.getTriggerId());
boolean exist = current.containsKey(newEntry);
if (exist) {
Integer partitionNode = current.get(newEntry);
switch (newTrigger.getOperation()) {
switch (notifyTrigger.getOperation()) {
case ADD:
case UPDATE:
/*
Partition is updated if information is outdated
*/
if (!partitionNode.equals(currentNode)) {
modifyPartition(newEntry, current, newTrigger.getOperation());
modifyPartition(newEntry, current, notifyTrigger.getOperation());
}
break;
case REMOVE:
modifyPartition(newEntry, current, newTrigger.getOperation());
modifyPartition(newEntry, current, notifyTrigger.getOperation());
break;
}
} else {
if (!newTrigger.getOperation().equals(Operation.REMOVE)) {
modifyPartition(newEntry, current, newTrigger.getOperation());
if (!notifyTrigger.getOperation().equals(Operation.REMOVE)) {
modifyPartition(newEntry, current, notifyTrigger.getOperation());
}
}
/*
Finally invoke listener
*/
if (!triggerListeners.isEmpty()) {
triggerListeners.stream().forEach(triggerListener -> {
triggerListener.onTriggerChange(newTrigger.getOperation(), newTrigger.getTenantId(),
newTrigger.getTriggerId());
triggerListener.onTriggerChange(notifyTrigger.getOperation(), notifyTrigger.getTenantId(),
notifyTrigger.getTriggerId());
});
}
}
Expand Down Expand Up @@ -670,35 +694,57 @@ private void modifyPartition(PartitionEntry entry, Map<PartitionEntry, Integer>

@Listener
public class NewDataListener {

@CacheEntryCreated
public void onNewData(CacheEntryCreatedEvent cacheEvent) {
public void onNewNotifyData(CacheEntryCreatedEvent cacheEvent) {
if (cacheEvent.isPre()) {
if (log.isTraceEnabled()) {
log.trace("Discarding pre onNewData(@CacheEntryCreated) event");
log.trace("Discarding pre onNewNotifyData(@CacheEntryCreated) event");
}
return;
}
NotifyData notifyData = (NotifyData)dataCache.get(cacheEvent.getKey());
if (log.isDebugEnabled()) {
log.debug("onNewNotifyData(@CacheEntryCreated) received.");
log.debug("NotifyData: " + notifyData);
}
processNotifyData(notifyData);
}

@CacheEntryModified
public void onModifiedNotifyData(CacheEntryModifiedEvent cacheEvent) {
if (cacheEvent.isPre()) {
if (log.isTraceEnabled()) {
log.trace("Discarding pre onModifiedNotifyData(@CacheEntryModified) event");
}
return;
}
NotifyData notifyData = (NotifyData)dataCache.get(cacheEvent.getKey());
if (log.isDebugEnabled()) {
log.debug("onModifiedNotifyData(@CacheEntryModified) received.");
log.debug("NotifyData: " + notifyData);
}
processNotifyData(notifyData);
}

/*
When a new data/event is added it should be notified on the PartitionManager.
PartitionManager adds an entry on "data" cache to fire an event that will propagate the
across the nodes invoking previously registered PartitionDataListener.
*/
NotifyData newData = (NotifyData)dataCache.get(cacheEvent.getKey());
if (log.isDebugEnabled()) {
log.debug("onNewData(@CacheEntryCreated) received.");
log.debug("NotifyData: " + newData);
}

private void processNotifyData(NotifyData notifyData) {
/*
Finally invoke listener on non-sender nodes
*/
if (!dataListeners.isEmpty() && newData.getFromNode() != currentNode) {
if (newData.getDataCollection() != null) {
if (!dataListeners.isEmpty() && notifyData.getFromNode() != currentNode) {
if (notifyData.getDataCollection() != null) {
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewData(newData.getDataCollection());
dataListener.onNewData(notifyData.getDataCollection());
});
} else if (newData.getEventCollection() != null) {
} else if (notifyData.getEventCollection() != null) {
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewEvents(newData.getEventCollection());
dataListener.onNewEvents(notifyData.getEventCollection());
});
}
}
Expand Down

0 comments on commit 77fd17e

Please sign in to comment.