Skip to content

Commit

Permalink
inital version of throughput calculcation and exposing it via metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kroepke committed Mar 30, 2015
1 parent 927650e commit f631326
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 72 deletions.
Expand Up @@ -24,7 +24,6 @@
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.graylog2.Configuration; import org.graylog2.Configuration;
import org.graylog2.shared.ServerVersion;
import org.graylog2.UI; import org.graylog2.UI;
import org.graylog2.bindings.AlarmCallbackBindings; import org.graylog2.bindings.AlarmCallbackBindings;
import org.graylog2.bindings.InitializerBindings; import org.graylog2.bindings.InitializerBindings;
Expand Down Expand Up @@ -82,9 +81,6 @@ public Server() {
@Option(name = {"-l", "--local"}, description = "Run Graylog in local mode. Only interesting for Graylog developers.") @Option(name = {"-l", "--local"}, description = "Run Graylog in local mode. Only interesting for Graylog developers.")
private boolean local = false; private boolean local = false;


@Option(name = {"-s", "--statistics"}, description = "Print utilization statistics to STDOUT")
private boolean stats = false;

@Option(name = {"-r", "--no-retention"}, description = "Do not automatically remove messages from index that are older than the retention time") @Option(name = {"-r", "--no-retention"}, description = "Do not automatically remove messages from index that are older than the retention time")
private boolean noRetention = false; private boolean noRetention = false;


Expand All @@ -96,10 +92,6 @@ public boolean isLocal() {
return local; return local;
} }


public boolean isStats() {
return stats;
}

public boolean performRetention() { public boolean performRetention() {
return !noRetention; return !noRetention;
} }
Expand Down
@@ -0,0 +1,35 @@
/**
* The MIT License
* Copyright (c) 2012 Graylog, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package org.graylog2.plugin;

import static com.codahale.metrics.MetricRegistry.name;

public class GlobalMetricNames {

public static final String INPUT_THROUGHPUT = "org.graylog.throughput.input";
public static final String INPUT_THROUGHPUT_RATE = name(INPUT_THROUGHPUT, "1-sec-rate");

public static final String OUTPUT_THROUGHPUT = "org.graylog.throughput.output";
public static final String OUTPUT_THROUGHPUT_RATE = name(OUTPUT_THROUGHPUT, "1-sec-rate");

}
Expand Up @@ -25,14 +25,14 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import javax.inject.Inject;
import org.graylog2.plugin.lifecycles.Lifecycle; import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId; import org.graylog2.plugin.system.NodeId;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
Expand All @@ -51,7 +51,6 @@ public enum Capability {
SERVER, SERVER,
RADIO, RADIO,
MASTER, MASTER,
STATSMODE,
LOCALMODE LOCALMODE
} }


Expand Down Expand Up @@ -203,14 +202,6 @@ public void unlockProcessingPause() {
processingPauseLocked.set(false); processingPauseLocked.set(false);
} }


public void setStatsMode(boolean statsMode) {
if (statsMode) {
addCapability(Capability.STATSMODE);
} else {
removeCapability(Capability.STATSMODE);
}
}

private ServerStatus removeCapability(Capability capability) { private ServerStatus removeCapability(Capability capability) {
this.capabilitySet.remove(capability); this.capabilitySet.remove(capability);
return this; return this;
Expand Down
Expand Up @@ -22,12 +22,14 @@
*/ */
package org.graylog2.plugin.inputs; package org.graylog2.plugin.inputs;


import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet; import com.codahale.metrics.MetricSet;
import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.graylog2.plugin.AbstractDescriptor; import org.graylog2.plugin.AbstractDescriptor;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.LocalMetricRegistry; import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus; import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.Stoppable; import org.graylog2.plugin.Stoppable;
Expand Down Expand Up @@ -83,6 +85,7 @@ public abstract class MessageInput implements Stoppable {
* avoid serialising those parts of the configuration in order to save bytes on disk/network. * avoid serialising those parts of the configuration in order to save bytes on disk/network.
*/ */
private final Configuration codecConfig; private final Configuration codecConfig;
private final Counter globalIncomingMessages;


protected String title; protected String title;
protected String creatorUserId; protected String creatorUserId;
Expand Down Expand Up @@ -117,6 +120,7 @@ public MessageInput(MetricRegistry metricRegistry,
this.codecConfig = config.codecConfig.getRequestedConfiguration().filter(codec.getConfiguration()); this.codecConfig = config.codecConfig.getRequestedConfiguration().filter(codec.getConfiguration());
rawSize = localRegistry.meter("rawSize"); rawSize = localRegistry.meter("rawSize");
incomingMessages = localRegistry.meter("incomingMessages"); incomingMessages = localRegistry.meter("incomingMessages");
globalIncomingMessages = metricRegistry.counter(GlobalMetricNames.INPUT_THROUGHPUT);
} }


public static long getDefaultRecvBufferSize() { public static long getDefaultRecvBufferSize() {
Expand Down Expand Up @@ -324,6 +328,7 @@ public void processRawMessage(RawMessage rawMessage) {
inputBuffer.insert(rawMessage); inputBuffer.insert(rawMessage);


incomingMessages.mark(); incomingMessages.mark();
globalIncomingMessages.inc();
rawSize.mark(rawMessage.getPayload().length); rawSize.mark(rawMessage.getPayload().length);
} }


Expand Down
Expand Up @@ -177,8 +177,8 @@ public void testAddCapability() throws Exception {


@Test @Test
public void testAddCapabilities() throws Exception { public void testAddCapabilities() throws Exception {
assertEquals(status.addCapabilities(ServerStatus.Capability.LOCALMODE, ServerStatus.Capability.STATSMODE), status); assertEquals(status.addCapabilities(ServerStatus.Capability.LOCALMODE), status);
assertTrue(status.hasCapabilities(ServerStatus.Capability.MASTER, ServerStatus.Capability.LOCALMODE, ServerStatus.Capability.STATSMODE)); assertTrue(status.hasCapabilities(ServerStatus.Capability.MASTER, ServerStatus.Capability.LOCALMODE));
} }


@Test @Test
Expand Down Expand Up @@ -236,15 +236,6 @@ public void testUnlockProcessingPause() throws Exception {
assertFalse(status.processingPauseLocked()); assertFalse(status.processingPauseLocked());
} }


@Test
public void testSetStatsMode() throws Exception {
status.setStatsMode(false);
assertFalse(status.hasCapability(ServerStatus.Capability.STATSMODE));

status.setStatsMode(true);
assertTrue(status.hasCapability(ServerStatus.Capability.STATSMODE));
}

@Test @Test
public void testSetLocalMode() throws Exception { public void testSetLocalMode() throws Exception {
status.setLocalMode(false); status.setLocalMode(false);
Expand Down
Expand Up @@ -16,15 +16,14 @@
*/ */
package org.graylog2.buffers.processors; package org.graylog2.buffers.processors;


import com.codahale.metrics.Histogram; import com.codahale.metrics.Counter;
import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import javax.inject.Inject;
import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkHandler;
import org.graylog2.Configuration; import org.graylog2.Configuration;
import org.graylog2.outputs.DefaultMessageOutput; import org.graylog2.outputs.DefaultMessageOutput;
Expand All @@ -33,10 +32,12 @@
import org.graylog2.plugin.ServerStatus; import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.MessageEvent; import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput; import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.shared.stats.ThroughputStats; import org.graylog2.shared.stats.ThroughputStats;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand All @@ -52,6 +53,9 @@ public class OutputBufferProcessor implements WorkHandler<MessageEvent> {


private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class); private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);


public static final String INCOMING_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingMessages");
public static final String PROCESS_TIME_METRICNAME = name(OutputBufferProcessor.class, "processTime");

private final ExecutorService executor; private final ExecutorService executor;


private final Configuration configuration; private final Configuration configuration;
Expand All @@ -61,7 +65,7 @@ public class OutputBufferProcessor implements WorkHandler<MessageEvent> {
//private List<Message> buffer = Lists.newArrayList(); //private List<Message> buffer = Lists.newArrayList();


private final Meter incomingMessages; private final Meter incomingMessages;
private final Histogram batchSize; private final Counter outputThroughput;
private final Timer processTime; private final Timer processTime;


private final OutputRouter outputRouter; private final OutputRouter outputRouter;
Expand All @@ -86,9 +90,9 @@ public OutputBufferProcessor(Configuration configuration,
final int keepAliveTime = configuration.getOutputBufferProcessorKeepAliveTime(); final int keepAliveTime = configuration.getOutputBufferProcessorKeepAliveTime();
this.executor = executorService(metricRegistry, nameFormat, corePoolSize, maxPoolSize, keepAliveTime); this.executor = executorService(metricRegistry, nameFormat, corePoolSize, maxPoolSize, keepAliveTime);


this.incomingMessages = metricRegistry.meter(name(OutputBufferProcessor.class, "incomingMessages")); this.incomingMessages = metricRegistry.meter(INCOMING_MESSAGES_METRICNAME);
this.batchSize = metricRegistry.histogram(name(OutputBufferProcessor.class, "batchSize")); this.outputThroughput = metricRegistry.counter(GlobalMetricNames.OUTPUT_THROUGHPUT);
this.processTime = metricRegistry.timer(name(OutputBufferProcessor.class, "processTime")); this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME);
} }


private ExecutorService executorService(final MetricRegistry metricRegistry, final String nameFormat, private ExecutorService executorService(final MetricRegistry metricRegistry, final String nameFormat,
Expand Down Expand Up @@ -158,11 +162,9 @@ public void onEvent(MessageEvent event) throws Exception {
if (msg.hasRecordings()) { if (msg.hasRecordings()) {
LOG.debug("Message event trace: {}", msg.recordingsAsString()); LOG.debug("Message event trace: {}", msg.recordingsAsString());
} }
if (serverStatus.hasCapability(ServerStatus.Capability.STATSMODE)) {
throughputStats.getBenchmarkCounter().increment();
}


throughputStats.getThroughputCounter().increment(); throughputStats.getThroughputCounter().increment();
outputThroughput.inc();


LOG.debug("Wrote message <{}> to all outputs. Finished handling.", msg.getId()); LOG.debug("Wrote message <{}> to all outputs. Finished handling.", msg.getId());
} }
Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import javax.inject.Inject;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject; import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.messages.Messages; import org.graylog2.indexer.messages.Messages;
Expand All @@ -35,13 +34,17 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;


public class ElasticSearchOutput implements MessageOutput { public class ElasticSearchOutput implements MessageOutput {
public static final String WRITES_METRICNAME = name(ElasticSearchOutput.class, "writes");
public static final String PROCESS_TIME_METRICNAME = name(ElasticSearchOutput.class, "processTime");

private static final String NAME = "ElasticSearch Output"; private static final String NAME = "ElasticSearch Output";
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchOutput.class); private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchOutput.class);


Expand All @@ -67,8 +70,8 @@ public ElasticSearchOutput(MetricRegistry metricRegistry,
this.messages = messages; this.messages = messages;
this.journal = journal; this.journal = journal;
// Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin) // Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin)
this.writes = metricRegistry.meter(name(ElasticSearchOutput.class, "writes")); this.writes = metricRegistry.meter(WRITES_METRICNAME);
this.processTime = metricRegistry.timer(name(ElasticSearchOutput.class, "processTime")); this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME);


// Should be set in initialize once this becomes a real plugin. // Should be set in initialize once this becomes a real plugin.
isRunning.set(true); isRunning.set(true);
Expand Down

0 comments on commit f631326

Please sign in to comment.