Skip to content

Commit

Permalink
initial /system/journal resource for getting information about the cu…
Browse files Browse the repository at this point in the history
…rrent state

also make log flushing config params tunable and set default values (were "never", now 1 million entries or 1 minute for forced flush to disk)

REST resource:
* includes the journal config
* serializes durations to millis for easier parsing for third parties
* uses bytes for sizes instead of human readable forms
  • Loading branch information
kroepke committed Dec 31, 2014
1 parent 4021b4a commit b34b02f
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 48 deletions.
Expand Up @@ -22,6 +22,8 @@
*/ */
package org.graylog2.plugin; package org.graylog2.plugin;


import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.joschi.jadconfig.Parameter; import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.util.Size; import com.github.joschi.jadconfig.util.Size;
import org.joda.time.Duration; import org.joda.time.Duration;
Expand All @@ -31,17 +33,36 @@
public class KafkaJournalConfiguration { public class KafkaJournalConfiguration {


@Parameter(value = "message_journal_dir", required = true) @Parameter(value = "message_journal_dir", required = true)
@JsonProperty("directory")
private File messageJournalDir = new File("journal"); private File messageJournalDir = new File("journal");


@Parameter("message_journal_segment_size") @Parameter("message_journal_segment_size")
@JsonProperty("segment_size")
private Size messageJournalSegmentSize = Size.megabytes(100l); private Size messageJournalSegmentSize = Size.megabytes(100l);


@Parameter("message_journal_segment_age")
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
@JsonProperty("segment_age")
private Duration messageJournalSegmentAge = Duration.standardHours(1);

@Parameter("message_journal_max_size") @Parameter("message_journal_max_size")
@JsonProperty("max_size")
private Size messageJournalMaxSize = Size.gigabytes(5l); private Size messageJournalMaxSize = Size.gigabytes(5l);


@Parameter("message_journal_max_age") @Parameter("message_journal_max_age")
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
@JsonProperty("max_age")
private Duration messageJournalMaxAge = Duration.standardHours(12); private Duration messageJournalMaxAge = Duration.standardHours(12);


@Parameter("message_journal_flush_interval")
@JsonProperty("flush_interval")
private long messageJournalFlushInterval = 1_000_000;

@Parameter("message_journal_flush_age")
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
@JsonProperty("flush_age")
private Duration messageJournalFlushAge = Duration.standardMinutes(1);

public File getMessageJournalDir() { public File getMessageJournalDir() {
return messageJournalDir; return messageJournalDir;
} }
Expand All @@ -50,6 +71,10 @@ public Size getMessageJournalSegmentSize() {
return messageJournalSegmentSize; return messageJournalSegmentSize;
} }


public Duration getMessageJournalSegmentAge() {
return messageJournalSegmentAge;
}

public Duration getMessageJournalMaxAge() { public Duration getMessageJournalMaxAge() {
return messageJournalMaxAge; return messageJournalMaxAge;
} }
Expand All @@ -58,4 +83,11 @@ public Size getMessageJournalMaxSize() {
return messageJournalMaxSize; return messageJournalMaxSize;
} }


public long getMessageJournalFlushInterval() {
return messageJournalFlushInterval;
}

public Duration getMessageJournalFlushAge() {
return messageJournalFlushAge;
}
} }
@@ -0,0 +1,99 @@
/**
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.rest.resources.system;

import com.codahale.metrics.annotation.Timed;
import com.github.joschi.jadconfig.util.Size;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import kafka.log.LogSegment;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog2.Configuration;
import org.graylog2.plugin.KafkaJournalConfiguration;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.rest.resources.RestResource;
import org.graylog2.rest.resources.system.responses.JournalSummaryResponse;
import org.graylog2.security.RestPermissions;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.journal.KafkaJournal;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@RequiresAuthentication
@Api(value = "System/Journal", description = "Message journal information of this node.")
@Produces(MediaType.APPLICATION_JSON)
@Path("/system/journal")
public class JournalResource extends RestResource {
private static final Logger log = LoggerFactory.getLogger(JournalResource.class);
private final boolean journalEnabled;
private final Journal journal;
private final KafkaJournalConfiguration kafkaJournalConfiguration;

@Inject
public JournalResource(Configuration configuration, KafkaJournalConfiguration kafkaJournalConfiguration, Journal journal) {
this.kafkaJournalConfiguration = kafkaJournalConfiguration;
this.journalEnabled = configuration.isMessageJournalEnabled();
this.journal = journal;
}

@GET
@Timed
@ApiOperation(value = "Get current state of the journal on this node.")
@RequiresPermissions(RestPermissions.JOURNAL_READ)
public JournalSummaryResponse show() {
if (!journalEnabled) {
return JournalSummaryResponse.createDisabled();
}

if (journal instanceof KafkaJournal) {
final KafkaJournal kafkaJournal = (KafkaJournal) journal;
final ThrottleState throttleState = kafkaJournal.getThrottleState();
kafkaJournal.numberOfSegments();

long oldestSegment = Long.MAX_VALUE;
for (final LogSegment segment : kafkaJournal.getSegments()) {
oldestSegment = Math.min(oldestSegment, segment.created());
}

return JournalSummaryResponse.createEnabled(throttleState.appendEventsPerSec,
throttleState.readEventsPerSec,
throttleState.uncommittedJournalEntries,
Size.bytes(throttleState.journalSize),
Size.bytes(throttleState.journalSizeLimit),
kafkaJournal.numberOfSegments(),
new DateTime(oldestSegment, DateTimeZone.UTC),
kafkaJournalConfiguration
);

}

log.warn("Unknown Journal implementation {} in use, cannot get information about it. Pretending journal is disabled.",
journal.getClass());
return JournalSummaryResponse.createDisabled();

}

}
@@ -0,0 +1,84 @@
/**
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.rest.resources.system.responses;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.joschi.jadconfig.util.Size;
import com.google.auto.value.AutoValue;
import org.graylog2.plugin.KafkaJournalConfiguration;
import org.joda.time.DateTime;

import javax.annotation.Nullable;

@JsonAutoDetect
@AutoValue
public abstract class JournalSummaryResponse {

public static JournalSummaryResponse createDisabled() {
return new AutoValue_JournalSummaryResponse(false, 0, 0, 0, Size.bytes(0), Size.bytes(0), 0, null, null);
}

public static JournalSummaryResponse createEnabled(long appendEventsPerSec,
long readEventsPerSec,
long uncommittedJournalEntries,
Size journalSize,
Size journalSizeLimit,
int numberOfSegments,
DateTime oldestSegment,
KafkaJournalConfiguration kafkaJournalConfiguration) {
return new AutoValue_JournalSummaryResponse(true,
appendEventsPerSec,
readEventsPerSec,
uncommittedJournalEntries,
journalSize,
journalSizeLimit,
numberOfSegments,
oldestSegment,
kafkaJournalConfiguration);
}

// keep the fields in the same order as the auto value constructor params!
@JsonProperty
public abstract boolean enabled();

@JsonProperty
public abstract long appendEventsPerSecond();

@JsonProperty
public abstract long readEventsPerSecond();

@JsonProperty
public abstract long uncommittedJournalEntries();

@JsonProperty
public abstract Size journalSize();

@JsonProperty
public abstract Size journalSizeLimit();

@JsonProperty
public abstract int numberOfSegments();

@JsonProperty
@Nullable
public abstract DateTime oldestSegment();

@JsonProperty
@Nullable
public abstract KafkaJournalConfiguration journalConfig();
}
Expand Up @@ -110,6 +110,8 @@ public class RestPermissions {
public static final String BUNDLE_DELETE = "bundle:delete"; public static final String BUNDLE_DELETE = "bundle:delete";
public static final String BUNDLE_IMPORT = "bundle:import"; public static final String BUNDLE_IMPORT = "bundle:import";
public static final String BUNDLE_EXPORT = "bundle:export"; public static final String BUNDLE_EXPORT = "bundle:export";
public static final String JOURNAL_READ = "journal:read";
public static final String JOURNAL_EDIT = "journal:edit";


private static Map<String, Collection<String>> allPermissions; private static Map<String, Collection<String>> allPermissions;


Expand Down

0 comments on commit b34b02f

Please sign in to comment.