Skip to content

Commit

Permalink
Add system notification if output has been disabled
Browse files (browse the repository at this point in the history)
Refs #741
  • Loading branch information
Jochen Schalanda committed Jan 21, 2015
1 parent 6119ca5 commit 52ef686
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum Type {
STREAM_PROCESSING_DISABLED,
GC_TOO_LONG,
JOURNAL_UTILIZATION_TOO_HIGH,
- JOURNAL_UNCOMMITTED_MESSAGES_DELETED;
+ JOURNAL_UNCOMMITTED_MESSAGES_DELETED,
+ OUTPUT_DISABLED;

public static Type fromString(String name) {
return valueOf(name.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

import java.util.Map;

- /**
- * @author Dennis Oelkers <dennis@torch.sh>
- */
public interface Notification extends Persisted {
Notification addType(Type type);

Expand All @@ -43,6 +40,7 @@ public interface Notification extends Persisted {
String getNodeId();

Notification addDetail(String key, Object value);

Object getDetail(String key);

Map<String, Object> asMap();
Expand All @@ -63,7 +61,8 @@ public enum Type {
STREAM_PROCESSING_DISABLED,
GC_TOO_LONG,
JOURNAL_UTILIZATION_TOO_HIGH,
- JOURNAL_UNCOMMITTED_MESSAGES_DELETED
+ JOURNAL_UNCOMMITTED_MESSAGES_DELETED,
+ OUTPUT_DISABLED
}

public enum Severity {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
- import org.graylog2.Configuration;
import org.graylog2.database.NotFoundException;
+ import org.graylog2.notifications.Notification;
+ import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
+ import org.graylog2.plugin.system.NodeId;
import org.graylog2.streams.OutputService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
+ import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Set;
Expand All @@ -49,6 +52,8 @@ public class OutputRegistry {
private final Cache<String, MessageOutput> runningMessageOutputs;
private final MessageOutput defaultMessageOutput;
private final OutputService outputService;
+ private final NotificationService notificationService;
+ private final NodeId nodeId;
private final MessageOutputFactory messageOutputFactory;
private final LoadingCache<String, AtomicInteger> faultCounters;
private final long faultCountThreshold;
Expand All @@ -58,13 +63,18 @@ public class OutputRegistry {
public OutputRegistry(@DefaultMessageOutput MessageOutput defaultMessageOutput,
final OutputService outputService,
MessageOutputFactory messageOutputFactory,
- Configuration configuration) {
+ NotificationService notificationService,
+ NodeId nodeId,
+ @Named("output_fault_count_threshold") long faultCountThreshold,
+ @Named("output_fault_penalty_seconds") long faultPenaltySeconds) {
this.defaultMessageOutput = defaultMessageOutput;
this.outputService = outputService;
+ this.notificationService = notificationService;
+ this.nodeId = nodeId;
this.messageOutputFactory = messageOutputFactory;
this.runningMessageOutputs = CacheBuilder.newBuilder().build();
- this.faultCountThreshold = configuration.getOutputFaultCountThreshold();
- this.faultPenaltySeconds = configuration.getOutputFaultPenaltySeconds();
+ this.faultCountThreshold = faultCountThreshold;
+ this.faultPenaltySeconds = faultPenaltySeconds;
this.faultCounters = CacheBuilder.newBuilder()
.expireAfterWrite(this.faultPenaltySeconds, TimeUnit.SECONDS)
.build(new CacheLoader<String, AtomicInteger>() {
Expand All @@ -91,13 +101,26 @@ public MessageOutput getOutputForIdAndStream(String id, Stream stream) {
if (!(e.getCause() instanceof NotFoundException)) {
final int number = faultCount.addAndGet(1);
LOG.error("Unable to fetch output {}, fault #{}: ", id, number, e);
- if (number >= faultCountThreshold)
+ if (number >= faultCountThreshold) {
LOG.error("Output {} has crossed threshold of {} faults in {} seconds. Disabling for {} seconds.",
id,
faultCountThreshold,
faultPenaltySeconds,
faultPenaltySeconds
);
+
+ final Notification notification = notificationService.buildNow()
+ .addType(Notification.Type.OUTPUT_DISABLED)
+ .addSeverity(Notification.Severity.NORMAL)
+ .addNode(nodeId.toString())
+ .addDetail("outputId", id)
+ .addDetail("streamId", stream.getId())
+ .addDetail("streamTitle", stream.getTitle())
+ .addDetail("faultCount", number)
+ .addDetail("faultCountThreshold", faultCountThreshold)
+ .addDetail("faultPenaltySeconds", faultPenaltySeconds);
+ notificationService.publishIfFirst(notification);
+ }
}
}
return null;
Expand Down

0 comments on commit 52ef686

Please sign in to comment.