Merge pull request #251 from jshaughn/notifier
HWKALERTS-191 Prevent excessive cache updates
lucasponce committed Nov 7, 2016
2 parents 5dc5131 + f5a030f commit 9eff7dc
Showing 4 changed files with 181 additions and 99 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* Copyright 2015-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,31 +16,60 @@
*/
package org.hawkular.alerts.api.services;

import org.hawkular.alerts.api.model.dampening.Dampening;
import org.hawkular.alerts.api.model.trigger.Trigger;

/**
* Immutable definitions event.
* Immutable definitions change event. The target object depends on the change type.
*
* @author jay shaughnessy
* @author lucas ponce
*/
public class DefinitionsEvent {

public enum Type {
CONDITION_CHANGE,
DAMPENING_CHANGE,
TRIGGER_CONDITION_CHANGE,
TRIGGER_CREATE,
TRIGGER_REMOVE,
TRIGGER_UPDATE
};

private Type type;
private String targetTenantId;
private String targetId;

public DefinitionsEvent(Type type, Dampening dampening) {
this(type, dampening.getTenantId(), dampening.getDampeningId());
}

public DefinitionsEvent(Type type) {
public DefinitionsEvent(Type type, Trigger trigger) {
this(type, trigger.getTenantId(), trigger.getId());
}

public DefinitionsEvent(Type type, String targetTenantId, String targetId) {
super();
this.type = type;
this.targetTenantId = targetTenantId;
this.targetId = targetId;
}

public Type getType() {
return type;
}

public String getTargetTenantId() {
return targetTenantId;
}

public String getTargetId() {
return targetId;
}

@Override
public String toString() {
return "DefinitionsEvent [type=" + type + ", targetTenantId=" + targetTenantId + ", targetId=" + targetId
+ "]";
}

}
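For context, a minimal sketch of a listener consuming the richer event payload introduced above; the LoggingDefinitionsListener class is hypothetical, while DefinitionsListener, DefinitionsEvent and its new getTargetTenantId()/getTargetId() accessors come from this diff:

    import org.hawkular.alerts.api.services.DefinitionsEvent;
    import org.hawkular.alerts.api.services.DefinitionsEvent.Type;
    import org.hawkular.alerts.api.services.DefinitionsListener;

    // Hypothetical listener: with the target fields it can react per trigger or dampening
    // instead of rebuilding all of its state on every notification.
    public class LoggingDefinitionsListener implements DefinitionsListener {

        @Override
        public void onChange(DefinitionsEvent event) {
            String tenantId = event.getTargetTenantId();
            String targetId = event.getTargetId();
            if (event.getType() == Type.TRIGGER_REMOVE) {
                System.out.printf("Trigger %s removed for tenant %s%n", targetId, tenantId);
            } else {
                System.out.printf("%s on %s/%s%n", event.getType(), tenantId, targetId);
            }
        }
    }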
@@ -16,7 +16,7 @@
*/
package org.hawkular.alerts.engine.cache;

import static org.hawkular.alerts.api.services.DefinitionsEvent.Type.CONDITION_CHANGE;
import static org.hawkular.alerts.api.services.DefinitionsEvent.Type.TRIGGER_CONDITION_CHANGE;
import static org.hawkular.alerts.api.services.DefinitionsEvent.Type.TRIGGER_REMOVE;

import java.util.Collection;
@@ -60,6 +60,9 @@ public class CacheManager {
private static final String RESET_PUBLISH_CACHE_PROP = "hawkular-alerts.reset-publish-cache";
private static final String RESET_PUBLISH_CACHE_ENV = "RESET_PUBLISH_CACHE";

private volatile boolean updateRequested = false;
private volatile boolean updating = false;

@EJB
PropertiesService properties;

@@ -82,64 +85,91 @@ public void init() {
publishCache.clear();
}
msgLog.infoInitPublishCache();
updateActiveIds();
requestCacheUpdate();

definitions.registerListener(e -> {
updateActiveIds();
}, CONDITION_CHANGE, TRIGGER_REMOVE);
requestCacheUpdate();
}, TRIGGER_CONDITION_CHANGE, TRIGGER_REMOVE);

} else {
msgLog.warnDisabledPublishCache();
}
}

private synchronized void updateActiveIds() {
// Just run updateCache one time if multiple requests come in while an update is already in progress...
private void requestCacheUpdate() {
log.debug("Cache update requested");
if (updateRequested) {
log.debug("Cache update, redundant request ignored.");
return;
}

updateRequested = true;

if (!updating) {
updateCache();
}
}

private synchronized void updateCache() {
try {
CacheSet<CacheKey> currentlyPublished = publishCache.keySet();
updating = true;

log.debugf("Published before update=%s", currentlyPublished.size());
if (log.isTraceEnabled()) {
publishCache.entrySet().stream().forEach(e -> log.tracef("Published: %s", e.getValue()));
}
while (updateRequested) {
updateRequested = false;
log.debug("Cache update in progress..");

// This will include group trigger conditions, which is OK because for data-driven group triggers the
// dataIds will likely be the dataIds from the group level, made distinct by the source.
Collection<Condition> conditions = definitions.getAllConditions();
final Set<CacheKey> activeKeys = new HashSet<>();
for (Condition c : conditions) {
CacheKey cacheKey = new CacheKey(c.getTenantId(), c.getDataId());
if (!activeKeys.contains(cacheKey)) {
activeKeys.add(cacheKey);
if (!currentlyPublished.contains(cacheKey)) {
publish(cacheKey);
}
CacheSet<CacheKey> currentlyPublished = publishCache.keySet();

log.debugf("Published before update=%s", currentlyPublished.size());
if (log.isTraceEnabled()) {
publishCache.entrySet().stream().forEach(e -> log.tracef("Published: %s", e.getValue()));
}
if (c instanceof CompareCondition) {
String data2Id = ((CompareCondition) c).getData2Id();
CacheKey cacheKey2 = new CacheKey(c.getTenantId(), data2Id);
if (!activeKeys.contains(cacheKey2)) {
activeKeys.add(cacheKey2);
if (!currentlyPublished.contains(cacheKey2)) {
publish(cacheKey2);

// This will include group trigger conditions, which is OK because for data-driven group triggers the
// dataIds will likely be the dataIds from the group level, made distinct by the source.
Collection<Condition> conditions = definitions.getAllConditions();
final Set<CacheKey> activeKeys = new HashSet<>();
for (Condition c : conditions) {
CacheKey cacheKey = new CacheKey(c.getTenantId(), c.getDataId());
if (!activeKeys.contains(cacheKey)) {
activeKeys.add(cacheKey);
if (!currentlyPublished.contains(cacheKey)) {
publish(cacheKey);
}
}
if (c instanceof CompareCondition) {
String data2Id = ((CompareCondition) c).getData2Id();
CacheKey cacheKey2 = new CacheKey(c.getTenantId(), data2Id);
if (!activeKeys.contains(cacheKey2)) {
activeKeys.add(cacheKey2);
if (!currentlyPublished.contains(cacheKey2)) {
publish(cacheKey2);
}
}
}
}

}
}

final Set<CacheKey> doomedKeys = new HashSet<>();
if (!currentlyPublished.isEmpty()) {
currentlyPublished.stream()
.filter(k -> !activeKeys.contains(k))
.forEach(k -> doomedKeys.add(k));
}
unpublish(doomedKeys);
final Set<CacheKey> doomedKeys = new HashSet<>();
if (!currentlyPublished.isEmpty()) {
currentlyPublished.stream()
.filter(k -> !activeKeys.contains(k))
.forEach(k -> doomedKeys.add(k));
}
unpublish(doomedKeys);

log.debugf("Published after update=%s", publishCache.size());
if (log.isTraceEnabled()) {
publishCache.entrySet().stream().forEach(e -> log.tracef("Published: %s", e.getValue()));
log.debugf("Published after update=%s", publishCache.size());
if (log.isTraceEnabled()) {
publishCache.entrySet().stream().forEach(e -> log.tracef("Published: %s", e.getValue()));
}
}
} catch (Exception e) {
log.error("FAILED to load conditions to create Id filters. All data being forwarded to alerting!", e);
log.error("Failed to load conditions to create Id filters. All data being forwarded to alerting!", e);
return;
} finally {
log.debug("Cache updates complete.");
updating = false;
}
}

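The two volatile flags above implement a simple coalescing pattern: any number of change notifications arriving while an update is running collapse into at most one additional pass. A stripped-down sketch of the same idea, where doUpdate() is a hypothetical stand-in for the publish-cache rebuild:

    // Sketch of the request-coalescing pattern used by CacheManager; doUpdate() is a placeholder.
    public class CoalescingUpdater {

        private volatile boolean updateRequested = false;
        private volatile boolean updating = false;

        public void requestUpdate() {
            if (updateRequested) {
                return;            // a pending request already covers this change
            }
            updateRequested = true;
            if (!updating) {       // otherwise the running loop picks up the new request
                update();
            }
        }

        private synchronized void update() {
            updating = true;
            try {
                while (updateRequested) {
                    updateRequested = false;   // requests arriving from here on cause one more pass
                    doUpdate();
                }
            } finally {
                updating = false;
            }
        }

        private void doUpdate() {
            // expensive work, e.g. recomputing which data ids should be published
        }
    }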
@@ -234,7 +234,7 @@ private void addTrigger(Trigger trigger) throws Exception {
alertsEngine.addTrigger(trigger.getTenantId(), trigger.getId());
}

notifyListeners(DefinitionsEvent.Type.TRIGGER_CREATE);
notifyListeners(new DefinitionsEvent(Type.TRIGGER_CREATE, trigger));
}

private void insertTriggerActions(Trigger trigger) throws Exception {
@@ -343,7 +343,7 @@ private void removeTrigger(Trigger trigger) throws Exception {
alertsEngine.removeTrigger(tenantId, triggerId);
}

notifyListeners(DefinitionsEvent.Type.TRIGGER_REMOVE);
notifyListeners(new DefinitionsEvent(Type.TRIGGER_REMOVE, tenantId, triggerId));
}

@Override
@@ -453,7 +453,7 @@ public void updateTriggerEnablement(String tenantId, String triggerId, boolean e
alertsEngine.reloadTrigger(tenantId, triggerId);
}

notifyListeners(DefinitionsEvent.Type.TRIGGER_UPDATE);
notifyListeners(new DefinitionsEvent(Type.TRIGGER_UPDATE, tenantId, triggerId));
}

private Trigger copyGroupTrigger(Trigger group, Trigger member, boolean isNewMember) {
@@ -531,7 +531,7 @@ private Trigger updateTrigger(Trigger trigger, Set<TriggerAction> existingAction
alertsEngine.reloadTrigger(trigger.getTenantId(), trigger.getId());
}

notifyListeners(DefinitionsEvent.Type.TRIGGER_UPDATE);
notifyListeners(new DefinitionsEvent(Type.TRIGGER_UPDATE, trigger));

return trigger;
}
@@ -1293,7 +1293,7 @@ private Dampening addDampening(Dampening dampening) throws Exception {
alertsEngine.reloadTrigger(dampening.getTenantId(), dampening.getTriggerId());
}

notifyListeners(DefinitionsEvent.Type.DAMPENING_CHANGE);
notifyListeners(new DefinitionsEvent(Type.DAMPENING_CHANGE, dampening));

return dampening;
}
@@ -1389,7 +1389,7 @@ private void removeDampening(Dampening dampening) throws Exception {
alertsEngine.reloadTrigger(dampening.getTenantId(), dampening.getTriggerId());
}

notifyListeners(DefinitionsEvent.Type.DAMPENING_CHANGE);
notifyListeners(new DefinitionsEvent(Type.DAMPENING_CHANGE, dampening));
}

@Override
@@ -1469,7 +1469,7 @@ private Dampening updateDampening(Dampening dampening) throws Exception {
alertsEngine.reloadTrigger(dampening.getTenantId(), dampening.getTriggerId());
}

notifyListeners(DefinitionsEvent.Type.DAMPENING_CHANGE);
notifyListeners(new DefinitionsEvent(Type.DAMPENING_CHANGE, dampening));

return dampening;
}
@@ -2061,7 +2061,7 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Mo
alertsEngine.reloadTrigger(tenantId, triggerId);
}

notifyListeners(DefinitionsEvent.Type.CONDITION_CHANGE);
notifyListeners(new DefinitionsEvent(Type.TRIGGER_CONDITION_CHANGE, tenantId, triggerId));

return conditions;
}
@@ -2769,20 +2769,16 @@ public void registerListener(DefinitionsListener listener, Type eventType, Type.
alertsContext.registerDefinitionListener(listener, eventType, eventTypes);
}

private void notifyListeners(Type eventType) {
DefinitionsEvent de = new DefinitionsEvent(eventType);
private void notifyListeners(DefinitionsEvent de) {
if (log.isDebugEnabled()) {
log.debug("Notifying applicable listeners " + alertsContext.getDefinitionListeners() +
" of event " + eventType.name());
}
for (Entry<DefinitionsListener, Set<Type>> me : alertsContext.getDefinitionListeners().entrySet()) {
if (me.getValue().contains(eventType)) {
if (log.isDebugEnabled()) {
log.debug("Notified Listener " + eventType.name());
}
me.getKey().onChange(de);
}
}
log.debugf("Notifying applicable listeners %s of event", alertsContext.getDefinitionListeners(), de);
}
alertsContext.getDefinitionListeners().entrySet().stream()
.filter(e -> e.getValue().contains(de.getType()))
.forEach(e -> {
log.debugf("Notified Listener %s of %s", e.getKey(), de.getType().name());
e.getKey().onChange(de);
});
}
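Dispatch is now driven by the event itself: a listener only receives events whose Type is in the set it registered with. A brief usage sketch against the registerListener API shown above, assuming definitionsService is an injected DefinitionsService reference and the lambda body is purely illustrative:

    // Only TRIGGER_CONDITION_CHANGE and TRIGGER_REMOVE events reach this listener;
    // TRIGGER_UPDATE or DAMPENING_CHANGE notifications are filtered out in notifyListeners.
    definitionsService.registerListener(
            event -> System.out.println("Definitions changed: " + event),
            Type.TRIGGER_CONDITION_CHANGE, Type.TRIGGER_REMOVE);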

@Override