Skip to content

Commit

Permalink
move throttle state updater out of KafkaJournal
Browse files Browse the repository at this point in the history
* our bindings aren't set up for instantiating a processbuffer properly for the command line tools
* moved the event bus publishing of throttle state into the periodical
* periodical only runs if journalling is enabled
* removed some dependencies from KafkaJournal and mode stuff local where possible
  • Loading branch information
kroepke committed Jan 2, 2015
1 parent a8a2d39 commit ffdbb70
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 131 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.graylog2.periodical.IndexerClusterCheckerThread;
import org.graylog2.periodical.NodePingThread;
import org.graylog2.periodical.StreamThroughputCounterManagerThread;
import org.graylog2.periodical.ThrottleStateUpdaterThread;
import org.graylog2.periodical.VersionCheckThread;
import org.graylog2.plugin.periodical.Periodical;

Expand All @@ -48,5 +49,6 @@ protected void configure() {
periodicalBinder.addBinding().to(NodePingThread.class);
periodicalBinder.addBinding().to(StreamThroughputCounterManagerThread.class);
periodicalBinder.addBinding().to(VersionCheckThread.class);
periodicalBinder.addBinding().to(ThrottleStateUpdaterThread.class);
}
}
@@ -0,0 +1,162 @@
/**
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.periodical;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.eventbus.EventBus;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.plugin.periodical.Periodical;
import org.graylog2.shared.buffers.ProcessBuffer;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.journal.KafkaJournal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;

/**
* The ThrottleStateUpdater publishes the current state buffer state of the journal to other interested parties,
* chiefly the ThrottleableTransports.
* <p/>
* <p>
* It only includes the necessary information to make a decision about whether to throttle parts of the system,
* but does not send "throttle" commands. This allows for a flexible approach in picking a throttling strategy.
* </p>
* <p>
* The implementation expects to be called once per second to have a rough estimate about the events per second,
* over the last second.
* </p>
*/
public class ThrottleStateUpdaterThread extends Periodical {
private static final Logger log = LoggerFactory.getLogger(ThrottleStateUpdaterThread.class);
private final KafkaJournal journal;
private final ProcessBuffer processBuffer;
private final EventBus eventBus;
private final Size retentionSize;

private boolean firstRun = true;

private long logStartOffset;
private long logEndOffset;
private long previousLogEndOffset;
private long previousReadOffset;
private long currentReadOffset;
private long currentTs;
private long prevTs;

@Inject
public ThrottleStateUpdaterThread(Journal journal,
ProcessBuffer processBuffer,
EventBus eventBus,
@Named("message_journal_max_size") Size retentionSize) {
this.processBuffer = processBuffer;
this.eventBus = eventBus;
this.retentionSize = retentionSize;
// leave this.journal null, we'll say "don't start" in that case, see startOnThisNode() below.
if (journal instanceof KafkaJournal) {
this.journal = (KafkaJournal) journal;
} else {
this.journal = null;
}
}

@Override
public boolean runsForever() {
return false;
}

@Override
public boolean stopOnGracefulShutdown() {
return true;
}

@Override
public boolean masterOnly() {
return false;
}

@Override
public boolean startOnThisNode() {
// don't start if we don't have the KafkaJournal
return journal != null;
}

@Override
public boolean isDaemon() {
return true;
}

@Override
public int getInitialDelaySeconds() {
return 1;
}

@Override
public int getPeriodSeconds() {
return 1;
}

@Override
protected Logger getLogger() {
return log;
}

@Override
public void doRun() {
final ThrottleState throttleState = new ThrottleState();
final long committedOffset = journal.getCommittedOffset();

prevTs = currentTs;
currentTs = System.nanoTime();

previousLogEndOffset = logEndOffset;
previousReadOffset = currentReadOffset;
logStartOffset = journal.getLogStartOffset();
logEndOffset = journal.getLogEndOffset() - 1; // -1 because getLogEndOffset is the next offset that gets assigned
currentReadOffset = journal.getNextReadOffset() - 1; // just to make it clear which field we read

// for the first run, don't send an update, there's no previous data available to calc rates
if (firstRun) {
firstRun = false;
return;
}

throttleState.appendEventsPerSec = (long) Math.floor((logEndOffset - previousLogEndOffset) / ((currentTs - prevTs) / 1.0E09));
throttleState.readEventsPerSec = (long) Math.floor((currentReadOffset - previousReadOffset) / ((currentTs - prevTs) / 1.0E09));

throttleState.journalSize = journal.size();
throttleState.journalSizeLimit = retentionSize.toBytes();

throttleState.processBufferCapacity = processBuffer.getRemainingCapacity();

if (committedOffset == KafkaJournal.DEFAULT_COMMITTED_OFFSET) {
// nothing committed at all, the entire log is uncommitted, or completely empty.
throttleState.uncommittedJournalEntries = journal.size() == 0 ? 0 : logEndOffset - logStartOffset;
} else {
throttleState.uncommittedJournalEntries = logEndOffset - committedOffset;
}
log.debug("ThrottleState: {}", throttleState);

// the journal needs this to provide information to rest clients
journal.setThrottleState(throttleState);

// publish to interested parties
eventBus.post(throttleState);

}
}
Expand Up @@ -20,11 +20,9 @@
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import kafka.log.LogSegment;
import org.graylog2.Graylog2BaseTest;
import org.graylog2.plugin.InstantMillisProvider;
import org.graylog2.shared.buffers.ProcessBuffer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
Expand All @@ -43,15 +41,8 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.commons.io.filefilter.FileFilterUtils.and;
import static org.apache.commons.io.filefilter.FileFilterUtils.directoryFileFilter;
import static org.apache.commons.io.filefilter.FileFilterUtils.fileFileFilter;
import static org.apache.commons.io.filefilter.FileFilterUtils.nameFileFilter;
import static org.apache.commons.io.filefilter.FileFilterUtils.suffixFileFilter;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.apache.commons.io.filefilter.FileFilterUtils.*;
import static org.testng.Assert.*;

public class KafkaJournalTest extends Graylog2BaseTest {
private static final int BULK_SIZE = 200;
Expand Down Expand Up @@ -90,9 +81,7 @@ public void writeAndRead() throws IOException {
Duration.standardHours(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());

final byte[] idBytes = "id".getBytes(UTF_8);
final byte[] messageBytes = "message".getBytes(UTF_8);
Expand All @@ -115,9 +104,7 @@ public void readAtLeastOne() throws Exception {
Duration.standardHours(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());

final byte[] idBytes = "id".getBytes(UTF_8);
final byte[] messageBytes = "message1".getBytes(UTF_8);
Expand Down Expand Up @@ -163,9 +150,7 @@ public void segmentRotation() throws Exception {
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());

createBulkChunks(journal, 3);

Expand All @@ -191,9 +176,7 @@ public void segmentSizeCleanup() throws Exception {
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());
final File messageJournalDir = new File(journalDirectory, "messagejournal-0");
assertTrue(messageJournalDir.exists());

Expand Down Expand Up @@ -227,9 +210,7 @@ public void segmentAgeCleanup() throws Exception {
Duration.standardMinutes(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());
final File messageJournalDir = new File(journalDirectory, "messagejournal-0");
assertTrue(messageJournalDir.exists());

Expand Down Expand Up @@ -280,9 +261,7 @@ public void segmentCommittedCleanup() throws Exception {
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
new EventBus(),
new MetricRegistry(),
mock(ProcessBuffer.class));
new MetricRegistry());
final File messageJournalDir = new File(journalDirectory, "messagejournal-0");
assertTrue(messageJournalDir.exists());

Expand Down
Expand Up @@ -21,8 +21,6 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.inject.Provider;
import javax.inject.Named;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
Expand All @@ -37,6 +35,8 @@

import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -56,8 +56,6 @@ public class ProcessBuffer extends Buffer {

private final Meter incomingMessages;

private final ServerStatus serverStatus;

@Inject
public ProcessBuffer(MetricRegistry metricRegistry,
ServerStatus serverStatus,
Expand All @@ -66,7 +64,6 @@ public ProcessBuffer(MetricRegistry metricRegistry,
@Named("processbuffer_processors") int processorCount,
@Named("ring_size") int ringSize,
@Named("processor_wait_strategy") String waitStrategyName) {
this.serverStatus = serverStatus;
this.ringBufferSize = ringSize;

this.executor = executorService(metricRegistry);
Expand Down

0 comments on commit ffdbb70

Please sign in to comment.