Skip to content

Commit

Permalink
Measure buffer sizes
Browse files Browse the repository at this point in the history
A set of AtomicInteger bound to the Core object that are incremented and decremented on buffer read and write allow us to indicate current utilization of the buffers. Using Metrics was not an option here, because the most precise calculation (one minute average) was way to deferred. The new CLI parameter -s/--statistics enables periodical printing of buffer utilization to STDOUT. Next step is to write this information to MongoDB and display it in the web interface.

fixes #SERVER-199
  • Loading branch information
Lennart Koopmann committed Nov 5, 2012
1 parent 9a498a2 commit c3bc81f
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -13,9 +13,11 @@ build.xml
nbactions.xml nbactions.xml


misc/elasticsearch.yml.2 misc/elasticsearch.yml.2
misc/elasticsearch.yml.dev
misc/convert_rsyslog_db_to_mongo.rb misc/convert_rsyslog_db_to_mongo.rb


plugin/*/*.jar plugin/*/*.jar
*_gl2plugin.jar


build_script/builds/* build_script/builds/*
build_script/logs/* build_script/logs/*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/graylog2/CommandLineArguments.java
Expand Up @@ -31,6 +31,9 @@ public class CommandLineArguments {


@Parameter(names = {"-l", "--local"}, description = "Run graylog2 in local mode. Only interesting for Graylog2 developers.") @Parameter(names = {"-l", "--local"}, description = "Run graylog2 in local mode. Only interesting for Graylog2 developers.")
private boolean local = false; private boolean local = false;

@Parameter(names = {"-s", "--statistics"}, description = "Print utilization statistics to STDOUT")
private boolean stats = false;


@Parameter(names = {"-r", "--no-retention"}, description = "Do not automatically remove messages from index that are older than the retention time") @Parameter(names = {"-r", "--no-retention"}, description = "Do not automatically remove messages from index that are older than the retention time")
private boolean noRetention = false; private boolean noRetention = false;
Expand Down Expand Up @@ -80,6 +83,10 @@ public boolean isDebug() {
public boolean isLocal() { public boolean isLocal() {
return local; return local;
} }

public boolean isStats() {
return stats;
}


public void setDebug(boolean debug) { public void setDebug(boolean debug) {
this.debug = debug; this.debug = debug;
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/org/graylog2/Core.java
Expand Up @@ -37,6 +37,7 @@
import org.graylog2.streams.StreamCache; import org.graylog2.streams.StreamCache;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.graylog2.activities.Activity; import org.graylog2.activities.Activity;
import org.graylog2.activities.ActivityWriter; import org.graylog2.activities.ActivityWriter;
Expand Down Expand Up @@ -88,10 +89,10 @@ public class Core implements GraylogServer {
private List<MessageFilter> filters = Lists.newArrayList(); private List<MessageFilter> filters = Lists.newArrayList();
private List<MessageOutput> outputs = Lists.newArrayList(); private List<MessageOutput> outputs = Lists.newArrayList();


private int loadedFilterPlugins = 0;

private ProcessBuffer processBuffer; private ProcessBuffer processBuffer;
private OutputBuffer outputBuffer; private OutputBuffer outputBuffer;
private AtomicInteger outputBufferWatermark = new AtomicInteger();
private AtomicInteger processBufferWatermark = new AtomicInteger();


private Deflector deflector; private Deflector deflector;


Expand All @@ -100,6 +101,7 @@ public class Core implements GraylogServer {
private String serverId; private String serverId;


private boolean localMode = false; private boolean localMode = false;
private boolean statsMode = false;


public void initialize(Configuration configuration) { public void initialize(Configuration configuration) {
serverId = Tools.generateServerId(); serverId = Tools.generateServerId();
Expand Down Expand Up @@ -289,6 +291,14 @@ public Buffer getProcessBuffer() {
public Buffer getOutputBuffer() { public Buffer getOutputBuffer() {
return this.outputBuffer; return this.outputBuffer;
} }

public AtomicInteger outputBufferWatermark() {
return outputBufferWatermark;
}

public AtomicInteger processBufferWatermark() {
return processBufferWatermark;
}


public List<MessageInput> getInputs() { public List<MessageInput> getInputs() {
return this.inputs; return this.inputs;
Expand Down Expand Up @@ -340,4 +350,12 @@ public boolean isLocalMode() {
return localMode; return localMode;
} }


public void setStatsMode(boolean mode) {
this.statsMode = mode;
}

public boolean isStatsMode() {
return statsMode;
}

} }
8 changes: 8 additions & 0 deletions src/main/java/org/graylog2/Main.java
Expand Up @@ -131,11 +131,18 @@ public static void main(String[] args) {
configuration.setIsMaster(false); configuration.setIsMaster(false);
} }


// Enable local mode?
if (commandLineArguments.isLocal() || commandLineArguments.isDebug()) { if (commandLineArguments.isLocal() || commandLineArguments.isDebug()) {
// In local mode, systemstats are sent to localhost for example. // In local mode, systemstats are sent to localhost for example.
LOG.info("Running in local mode"); LOG.info("Running in local mode");
server.setLocalMode(true); server.setLocalMode(true);
} }

// Are we in stats mode?
if (commandLineArguments.isStats()) {
LOG.info("Printing system utilization information.");
server.setStatsMode(true);
}


// Register initializers. // Register initializers.
server.registerInitializer(new ServerValueWriterInitializer(server, configuration)); server.registerInitializer(new ServerValueWriterInitializer(server, configuration));
Expand All @@ -152,6 +159,7 @@ public static void main(String[] args) {
if (configuration.isAmqpEnabled()) { if (configuration.isAmqpEnabled()) {
server.registerInitializer(new AMQPSyncInitializer(server)); server.registerInitializer(new AMQPSyncInitializer(server));
} }
server.registerInitializer(new BufferWatermarkInitializer(server));


// Register inputs. // Register inputs.
if (configuration.isUseGELF()) { if (configuration.isUseGELF()) {
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/graylog2/buffers/BufferWatermark.java
@@ -0,0 +1,48 @@
/**
* Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
*
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.graylog2.buffers;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class BufferWatermark {

private static final Logger LOG = Logger.getLogger(BufferWatermark.class);

private final int bufferSize;
private final AtomicInteger watermark;

public BufferWatermark(int bufferSize, AtomicInteger watermark) {
this.bufferSize = bufferSize;
this.watermark = watermark;
}

public int getUtilization() {
return watermark.get();
}

public float getUtilizationPercentage() {
return getUtilization()/bufferSize*100;
}

}
10 changes: 9 additions & 1 deletion src/main/java/org/graylog2/buffers/OutputBuffer.java
Expand Up @@ -24,12 +24,14 @@
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.graylog2.Core; import org.graylog2.Core;
import org.graylog2.buffers.processors.OutputBufferProcessor; import org.graylog2.buffers.processors.OutputBufferProcessor;
import org.graylog2.plugin.GraylogServer;
import org.graylog2.plugin.buffers.Buffer; import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.logmessage.LogMessage; import org.graylog2.plugin.logmessage.LogMessage;


Expand All @@ -45,7 +47,10 @@ public class OutputBuffer implements Buffer {
.namingPattern("outputbufferprocessor-%d") .namingPattern("outputbufferprocessor-%d")
.build() .build()
); );

Core server; Core server;

private final Meter incomingMessages = Metrics.newMeter(OutputBuffer.class, "InsertedMessages", "messages", TimeUnit.SECONDS);


public OutputBuffer(Core server) { public OutputBuffer(Core server) {
this.server = server; this.server = server;
Expand Down Expand Up @@ -76,6 +81,9 @@ public void insert(LogMessage message) {
LogMessageEvent event = ringBuffer.get(sequence); LogMessageEvent event = ringBuffer.get(sequence);
event.setMessage(message); event.setMessage(message);
ringBuffer.publish(sequence); ringBuffer.publish(sequence);

server.outputBufferWatermark().incrementAndGet();
incomingMessages.mark();
} }


} }
2 changes: 2 additions & 0 deletions src/main/java/org/graylog2/buffers/ProcessBuffer.java
Expand Up @@ -78,6 +78,8 @@ public void insert(LogMessage message) {
LogMessageEvent event = ringBuffer.get(sequence); LogMessageEvent event = ringBuffer.get(sequence);
event.setMessage(message); event.setMessage(message);
ringBuffer.publish(sequence); ringBuffer.publish(sequence);

server.processBufferWatermark().incrementAndGet();
} }


} }
Expand Up @@ -45,6 +45,7 @@ public class OutputBufferProcessor implements EventHandler<LogMessageEvent> {


private List<LogMessage> buffer = Lists.newArrayList(); private List<LogMessage> buffer = Lists.newArrayList();
private final Meter incomingMessages = Metrics.newMeter(OutputBufferProcessor.class, "IncomingMessages", "messages", TimeUnit.SECONDS); private final Meter incomingMessages = Metrics.newMeter(OutputBufferProcessor.class, "IncomingMessages", "messages", TimeUnit.SECONDS);

private final Histogram batchSize = Metrics.newHistogram(OutputBufferProcessor.class, "BatchSize"); private final Histogram batchSize = Metrics.newHistogram(OutputBufferProcessor.class, "BatchSize");


private final long ordinal; private final long ordinal;
Expand All @@ -63,6 +64,7 @@ public void onEvent(LogMessageEvent event, long sequence, boolean endOfBatch) th
return; return;
} }


server.outputBufferWatermark().decrementAndGet();
incomingMessages.mark(); incomingMessages.mark();


LogMessage msg = event.getMessage(); LogMessage msg = event.getMessage();
Expand Down
Expand Up @@ -63,6 +63,8 @@ public void onEvent(LogMessageEvent event, long sequence, boolean endOfBatch) th
return; return;
} }


server.processBufferWatermark().decrementAndGet();

incomingMessages.mark(); incomingMessages.mark();
incomingMessagesPerMinute.mark(); incomingMessagesPerMinute.mark();
TimerContext tcx = processTime.time(); TimerContext tcx = processTime.time();
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/org/graylog2/initializers/AMQPSyncInitializer.java
@@ -1,15 +1,29 @@
/* /**
* To change this template, choose Tools | Templates * Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
* and open the template in the editor. *
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/ */
package org.graylog2.initializers; package org.graylog2.initializers;


import org.graylog2.Core; import org.graylog2.Core;
import org.graylog2.periodical.AMQPSyncThread; import org.graylog2.periodical.AMQPSyncThread;


/** /**
* * @author Lennart Koopmann <lennart@socketfeed.com>
* @author lennart.koopmann
*/ */
public class AMQPSyncInitializer extends SimpleFixedRateScheduleInitializer implements Initializer { public class AMQPSyncInitializer extends SimpleFixedRateScheduleInitializer implements Initializer {


Expand Down
@@ -0,0 +1,47 @@
/**
* Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
*
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.graylog2.initializers;

import org.graylog2.Core;
import org.graylog2.periodical.BufferWatermarkThread;

/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class BufferWatermarkInitializer extends SimpleFixedRateScheduleInitializer implements Initializer {

public BufferWatermarkInitializer(Core graylogServer) {
this.graylogServer = graylogServer;
}

@Override
public void initialize() {
configureScheduler(
new BufferWatermarkThread(this.graylogServer),
BufferWatermarkThread.INITIAL_DELAY,
BufferWatermarkThread.PERIOD
);
}

@Override
public boolean masterOnly() {
return true;
}
}
73 changes: 73 additions & 0 deletions src/main/java/org/graylog2/periodical/BufferWatermarkThread.java
@@ -0,0 +1,73 @@
/**
* Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
*
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.graylog2.periodical;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.graylog2.Core;
import org.graylog2.buffers.BufferWatermark;
import org.joda.time.DateTime;

/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class BufferWatermarkThread implements Runnable {

private static final Logger LOG = Logger.getLogger(BufferWatermarkThread.class);

public static final int INITIAL_DELAY = 0;
public static final int PERIOD = 5;

private final Core graylogServer;

public BufferWatermarkThread(Core graylogServer) {
this.graylogServer = graylogServer;
}

@Override
public void run() {
checkValidity(graylogServer.processBufferWatermark());
checkValidity(graylogServer.outputBufferWatermark());

int ringSize = graylogServer.getConfiguration().getRingSize();

BufferWatermark oWm = new BufferWatermark(ringSize, graylogServer.outputBufferWatermark());

BufferWatermark pWm = new BufferWatermark(ringSize, graylogServer.processBufferWatermark());

if (graylogServer.isStatsMode()) {
DateTime now = new DateTime();
System.out.println("[util] [" + now + "] OutputBuffer is at "
+ oWm.getUtilizationPercentage() + "%. [" + oWm.getUtilization() + "/" + ringSize +"]");
System.out.println("[util] [" + now + "] ProcessBuffer is at "
+ pWm.getUtilizationPercentage() + "%. [" + pWm.getUtilization() + "/" + ringSize +"]");
}
}

private void checkValidity(AtomicInteger watermark) {
// This should never happen, but just to make sure...
int x = watermark.get();
if (x < 0) {
LOG.warn("Reset a watermark to 0 because it was <" + x + ">");
watermark.set(0);
}
}

}

0 comments on commit c3bc81f

Please sign in to comment.