Skip to content

Commit

Permalink
Support for multiple listeners in PartitionManagerImpl (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasponce authored and jshaughn committed Nov 2, 2016
1 parent 48b4771 commit 5f247b6
Showing 1 changed file with 30 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -170,14 +172,14 @@ public class PartitionManagerImpl implements PartitionManager {
private Integer currentNode = null;

/**
* Listener used to interact with the triggers partition events
* Listeners used to interact with the triggers partition events
*/
private PartitionTriggerListener triggerListener;
private Set<PartitionTriggerListener> triggerListeners = new HashSet<>();

/**
* Listener used to interact with the data/events partition events
* Listeners used to interact with the data/events partition events
*/
private PartitionDataListener dataListener;
private Set<PartitionDataListener> dataListeners = new HashSet<>();

private TopologyChangeListener topologyChangeListener = new TopologyChangeListener();
private PartitionChangeListener partitionChangeListener = new PartitionChangeListener();
Expand Down Expand Up @@ -258,7 +260,7 @@ public void notifyTrigger(Operation operation, String tenantId, String triggerId

@Override
public void registerTriggerListener(PartitionTriggerListener triggerListener) {
this.triggerListener = triggerListener;
triggerListeners.add(triggerListener);
}

@Override
Expand Down Expand Up @@ -303,7 +305,7 @@ public void notifyEvents(Collection<Event> events) {

@Override
public void registerDataListener(PartitionDataListener dataListener) {
this.dataListener = dataListener;
dataListeners.add(dataListener);
}

/*
Expand Down Expand Up @@ -547,7 +549,7 @@ protected Map<String, Map<String, List<String>>> getAddedRemovedPartition(Map<Pa
Invoke PartitionTriggerListener with local, added and removed partition
*/
private void invokePartitionChangeListener() {
if (triggerListener != null) {
if (!triggerListeners.isEmpty()) {
Map<PartitionEntry, Integer> current = (Map<PartitionEntry, Integer>) partitionCache.get(CURRENT);
Map<PartitionEntry, Integer> previous = (Map<PartitionEntry, Integer>) partitionCache.get(PREVIOUS);

Expand All @@ -562,7 +564,9 @@ private void invokePartitionChangeListener() {
log.debug("Added: " + addedRemoved.get("added"));
log.debug("Removed: " + addedRemoved.get("removed"));
}
triggerListener.onPartitionChange(partition, addedRemoved.get("removed"), addedRemoved.get("added"));
triggerListeners.stream().forEach(triggerListener -> {
triggerListener.onPartitionChange(partition, addedRemoved.get("removed"), addedRemoved.get("added"));
});
}
}

Expand Down Expand Up @@ -655,9 +659,11 @@ public void onNewTrigger(CacheEntryCreatedEvent cacheEvent) {
/*
Finally invoke listener
*/
if (triggerListener != null) {
triggerListener.onTriggerChange(newTrigger.getOperation(), newTrigger.getTenantId(),
newTrigger.getTriggerId());
if (!triggerListeners.isEmpty()) {
triggerListeners.stream().forEach(triggerListener -> {
triggerListener.onTriggerChange(newTrigger.getOperation(), newTrigger.getTenantId(),
newTrigger.getTriggerId());
});
}
}
}
Expand Down Expand Up @@ -705,15 +711,23 @@ public void onNewData(CacheEntryCreatedEvent cacheEvent) {
/*
Finally invoke listener on non-sender nodes
*/
if (dataListener != null && newData.getFromNode() != currentNode) {
if (!dataListeners.isEmpty() && newData.getFromNode() != currentNode) {
if (newData.getData() != null) {
dataListener.onNewData(newData.getData());
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewData(newData.getData());
});
} else if (newData.getEvent() != null) {
dataListener.onNewEvent(newData.getEvent());
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewEvent(newData.getEvent());
});
} else if (newData.getDataCollection() != null) {
dataListener.onNewData(newData.getDataCollection());
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewData(newData.getDataCollection());
});
} else if (newData.getEventCollection() != null) {
dataListener.onNewEvents(newData.getEventCollection());
dataListeners.stream().forEach(dataListener -> {
dataListener.onNewEvents(newData.getEventCollection());
});
}
}
}
Expand Down

0 comments on commit 5f247b6

Please sign in to comment.