Skip to content

Commit

Permalink
Improve synchronization of provisioning listeners
Browse files Browse the repository at this point in the history
1. There were some places where synchronization was missing
2. We also added diagnostics of listeners leaks
  • Loading branch information
mederly committed May 24, 2019
1 parent 08b9498 commit bf9ddac
Showing 1 changed file with 32 additions and 16 deletions.
Expand Up @@ -50,7 +50,13 @@
@Component
public class ChangeNotificationDispatcherImpl implements ChangeNotificationDispatcher {

private static final int MAX_LISTENERS = 1000; // just to make sure we don't grow indefinitely
// just to make sure we don't grow indefinitely
private static final int LISTENERS_SOFT_LIMIT = 500;
private static final int LISTENERS_HARD_LIMIT = 2000;

private static final long LISTENERS_WARNING_INTERVAL = 2000;

private long lastListenersWarning = 0;

private boolean filterProtectedObjects = true;
private List<ResourceObjectChangeListener> changeListeners = new ArrayList<>(); // use synchronized access only!
Expand Down Expand Up @@ -85,14 +91,20 @@ public synchronized void registerNotificationListener(ResourceObjectChangeListen
}

private void checkSize(List<?> listeners, String name) {
if (listeners.size() > MAX_LISTENERS) {
throw new IllegalStateException("Possible listeners leak: number of " + name + " exceeded the threshold of " + MAX_LISTENERS);
int size = listeners.size();
LOGGER.trace("Listeners of type '{}': {}", name, size);
if (size > LISTENERS_HARD_LIMIT) {
throw new IllegalStateException("Possible listeners leak: number of " + name + " exceeded the threshold of " + LISTENERS_SOFT_LIMIT);
} else if (size > LISTENERS_SOFT_LIMIT && System.currentTimeMillis() - lastListenersWarning >= LISTENERS_WARNING_INTERVAL) {
LOGGER.warn("Too many listeners of type '{}': {} (soft limit: {}, hard limit: {})", name, size,
LISTENERS_SOFT_LIMIT, LISTENERS_HARD_LIMIT);
lastListenersWarning = System.currentTimeMillis();
}
}

private synchronized List<ResourceObjectChangeListener> getChangeListenersSnapshot() {
if (changeListeners != null) {
return new ArrayList<>(changeListeners);
private synchronized <T> List<T> getListenersSnapshot(List<T> listeners) {
if (listeners != null) {
return new ArrayList<>(listeners);
} else {
return new ArrayList<>();
}
Expand Down Expand Up @@ -131,7 +143,7 @@ public synchronized void registerNotificationListener(ResourceEventListener list
}

@Override
public void unregisterNotificationListener(ResourceEventListener listener) {
public synchronized void unregisterNotificationListener(ResourceEventListener listener) {
eventListeners.remove(listener);

}
Expand All @@ -155,7 +167,7 @@ public void notifyChange(ResourceObjectShadowChangeDescription change, Task task
if (InternalsConfig.consistencyChecks) change.checkConsistence();

// sometimes there is registration/deregistration from within
List<ResourceObjectChangeListener> changeListenersSnapshot = getChangeListenersSnapshot();
List<ResourceObjectChangeListener> changeListenersSnapshot = getListenersSnapshot(changeListeners);
if (!changeListenersSnapshot.isEmpty()) {
for (ResourceObjectChangeListener listener : changeListenersSnapshot) {
try {
Expand Down Expand Up @@ -184,8 +196,9 @@ public void notifyFailure(ResourceOperationDescription failureDescription, Task

failureDescription.checkConsistence();

if ((null != operationListeners) && (!operationListeners.isEmpty())) {
for (ResourceOperationListener listener : new ArrayList<>(operationListeners)) { // sometimes there is registration/deregistration from within
List<ResourceOperationListener> operationListenersSnapshot = getListenersSnapshot(operationListeners);
if (!operationListenersSnapshot.isEmpty()) {
for (ResourceOperationListener listener : operationListenersSnapshot) {
//LOGGER.trace("Listener: {}", listener.getClass().getSimpleName());
try {
listener.notifyFailure(failureDescription, task, parentResult);
Expand All @@ -208,8 +221,9 @@ public void notifySuccess(ResourceOperationDescription successDescription, Task

successDescription.checkConsistence();

if ((null != operationListeners) && (!operationListeners.isEmpty())) {
for (ResourceOperationListener listener : new ArrayList<>(operationListeners)) { // sometimes there is registration/deregistration from within
List<ResourceOperationListener> operationListenersSnapshot = getListenersSnapshot(operationListeners);
if (!operationListenersSnapshot.isEmpty()) {
for (ResourceOperationListener listener : operationListenersSnapshot) {
//LOGGER.trace("Listener: {}", listener.getClass().getSimpleName());
try {
listener.notifySuccess(successDescription, task, parentResult);
Expand All @@ -233,8 +247,9 @@ public void notifyInProgress(ResourceOperationDescription inProgressDescription,

inProgressDescription.checkConsistence();

if ((null != operationListeners) && (!operationListeners.isEmpty())) {
for (ResourceOperationListener listener : new ArrayList<>(operationListeners)) { // sometimes there is registration/deregistration from within
List<ResourceOperationListener> operationListenersSnapshot = getListenersSnapshot(operationListeners);
if (!operationListenersSnapshot.isEmpty()) {
for (ResourceOperationListener listener : operationListenersSnapshot) {
//LOGGER.trace("Listener: {}", listener.getClass().getSimpleName());
try {
listener.notifyInProgress(inProgressDescription, task, parentResult);
Expand Down Expand Up @@ -272,8 +287,9 @@ public void notifyEvent(ResourceEventDescription eventDescription,

// if (InternalsConfig.consistencyChecks) eventDescription.checkConsistence();

if ((null != eventListeners) && (!eventListeners.isEmpty())) {
for (ResourceEventListener listener : new ArrayList<>(eventListeners)) { // sometimes there is registration/deregistration from within
List<ResourceEventListener> eventListenersSnapshot = getListenersSnapshot(eventListeners);
if (!eventListenersSnapshot.isEmpty()) {
for (ResourceEventListener listener : eventListenersSnapshot) {
try {
listener.notifyEvent(eventDescription, task, parentResult);
} catch (RuntimeException e) {
Expand Down

0 comments on commit bf9ddac

Please sign in to comment.