Skip to content

Commit

Permalink
Merge pull request #1110 from Graylog2/fix-cluster-event-bus
Browse files Browse the repository at this point in the history
Fix cluster event bus
  • Loading branch information
joschi committed Apr 15, 2015
2 parents a41f4fb + c585b79 commit 3c6ca23
Show file tree
Hide file tree
Showing 16 changed files with 876 additions and 236 deletions.
Expand Up @@ -19,7 +19,6 @@
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.graylog2.events.ClusterEventService;
import org.graylog2.initializers.BufferSynchronizerService;
import org.graylog2.initializers.DashboardRegistryService;
import org.graylog2.initializers.IndexerSetupService;
Expand All @@ -35,6 +34,5 @@ protected void configure() {
serviceBinder.addBinding().to(IndexerSetupService.class);
serviceBinder.addBinding().to(BufferSynchronizerService.class);
serviceBinder.addBinding().to(OutputSetupService.class);
serviceBinder.addBinding().to(ClusterEventService.class);
}
}
Expand Up @@ -18,6 +18,8 @@

import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.graylog2.events.ClusterEventCleanupPeriodical;
import org.graylog2.events.ClusterEventPeriodical;
import org.graylog2.periodical.AlertScannerThread;
import org.graylog2.periodical.BatchedElasticSearchOutputFlushThread;
import org.graylog2.periodical.ClusterHealthCheckThread;
Expand Down Expand Up @@ -50,5 +52,7 @@ protected void configure() {
periodicalBinder.addBinding().to(StreamThroughputCounterManagerThread.class);
periodicalBinder.addBinding().to(VersionCheckThread.class);
periodicalBinder.addBinding().to(ThrottleStateUpdaterThread.class);
periodicalBinder.addBinding().to(ClusterEventPeriodical.class);
periodicalBinder.addBinding().to(ClusterEventCleanupPeriodical.class);
}
}
Expand Up @@ -80,6 +80,8 @@
import org.graylog2.streams.StreamRouter;
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.system.activities.SystemMessageActivityWriter;
import org.graylog2.system.debug.ClusterDebugEventListener;
import org.graylog2.system.debug.LocalDebugEventListener;
import org.graylog2.system.jobs.SystemJobFactory;
import org.graylog2.system.jobs.SystemJobManager;
import org.graylog2.system.shutdown.GracefulShutdown;
Expand Down Expand Up @@ -217,5 +219,7 @@ private void bindPluginMetaData() {

private void bindEventBusListeners() {
bind(InputStateListener.class).asEagerSingleton();
bind(LocalDebugEventListener.class).asEagerSingleton();
bind(ClusterDebugEventListener.class).asEagerSingleton();
}
}
Expand Up @@ -18,10 +18,8 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog2.plugin.system.NodeId;
import org.hibernate.validator.constraints.NotEmpty;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand All @@ -30,7 +28,6 @@

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

@JsonAutoDetect
Expand All @@ -42,8 +39,7 @@ public abstract class ClusterEvent {
public abstract String id();

@JsonProperty
@Nullable
public abstract DateTime date();
public abstract long timestamp();

@JsonProperty
@Nullable
Expand All @@ -64,19 +60,19 @@ public abstract class ClusterEvent {

@JsonCreator
public static ClusterEvent create(@Id @ObjectId @JsonProperty("_id") @Nullable String id,
@JsonProperty("date") @Nullable DateTime date,
@JsonProperty("timestamp") long timestamp,
@JsonProperty("producer") @Nullable String producer,
@JsonProperty("consumers") @Nullable Set<String> consumers,
@JsonProperty("event_class") @Nullable String eventClass,
@JsonProperty("payload") @Nullable Object payload) {
return new AutoValue_ClusterEvent(id, date, producer, consumers, eventClass, payload);
return new AutoValue_ClusterEvent(id, timestamp, producer, consumers, eventClass, payload);
}

public static ClusterEvent create(@NotEmpty String producer,
@NotEmpty String eventClass,
@NotEmpty Object payload) {
return new AutoValue_ClusterEvent(null,
DateTime.now(DateTimeZone.UTC),
return create(null,
DateTime.now(DateTimeZone.UTC).getMillis(),
producer,
Collections.<String>emptySet(),
eventClass,
Expand Down
@@ -0,0 +1,114 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import com.mongodb.WriteConcern;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.plugin.periodical.Periodical;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mongojack.DBQuery;
import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;

public class ClusterEventCleanupPeriodical extends Periodical {
private static final Logger LOG = LoggerFactory.getLogger(ClusterEventCleanupPeriodical.class);
private static final String COLLECTION_NAME = ClusterEventPeriodical.COLLECTION_NAME;

@VisibleForTesting
static final long DEFAULT_MAX_EVENT_AGE = TimeUnit.HOURS.toMillis(1L);

private final JacksonDBCollection<ClusterEvent, String> dbCollection;
private final long maxEventAge;

@Inject
public ClusterEventCleanupPeriodical(final MongoJackObjectMapperProvider mapperProvider,
final MongoConnection mongoConnection) {
this(JacksonDBCollection.wrap(mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
ClusterEvent.class, String.class, mapperProvider.get()), DEFAULT_MAX_EVENT_AGE);
}

ClusterEventCleanupPeriodical(final JacksonDBCollection<ClusterEvent, String> dbCollection, final long maxEventAge) {
this.dbCollection = checkNotNull(dbCollection);
this.maxEventAge = maxEventAge;
}

@Override
public boolean runsForever() {
return false;
}

@Override
public boolean stopOnGracefulShutdown() {
return true;
}

@Override
public boolean masterOnly() {
return true;
}

@Override
public boolean startOnThisNode() {
return true;
}

@Override
public boolean isDaemon() {
return true;
}

@Override
public int getInitialDelaySeconds() {
return 0;
}

@Override
public int getPeriodSeconds() {
return Ints.saturatedCast(TimeUnit.MINUTES.toSeconds(5L));
}

@Override
protected Logger getLogger() {
return LOG;
}

@Override
public void doRun() {
try {
LOG.debug("Removing stale events from MongoDB collection \"{}\"", COLLECTION_NAME);

final long timestamp = DateTime.now(DateTimeZone.UTC).getMillis() - maxEventAge;
final DBQuery.Query query = DBQuery.lessThan("timestamp", timestamp);
final WriteResult<ClusterEvent, String> writeResult = dbCollection.remove(query, WriteConcern.MAJORITY);

LOG.debug("Removed {} stale events from \"{}\"", writeResult.getN(), COLLECTION_NAME);
} catch (Exception e) {
LOG.warn("Error while removing stale cluster events from MongoDB", e);
}
}
}

0 comments on commit 3c6ca23

Please sign in to comment.