Skip to content

Commit

Permalink
Revert "Add timing metrics to GelfOutput (#3810)"
Browse files Browse the repository at this point in the history
This reverts commit ff1a9f3.

It broke backwards compatibility with 3rd party output plugins.
  • Loading branch information
bernd committed Jun 6, 2017
1 parent 8626f6c commit 3f68cfa
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 168 deletions.
Expand Up @@ -31,7 +31,6 @@
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.journal.Journal;

import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.File;
import java.util.List;
Expand All @@ -49,6 +48,14 @@ public class BenchmarkOutput implements MessageOutput {
private final CsvReporter csvReporter;
private final Journal journal;

@AssistedInject
public BenchmarkOutput(final MetricRegistry metricRegistry,
final Journal journal,
@Assisted Stream stream,
@Assisted Configuration configuration) {
this(metricRegistry, journal);
}

@Inject
public BenchmarkOutput(final MetricRegistry metricRegistry, final Journal journal) {
this.journal = journal;
Expand Down Expand Up @@ -102,7 +109,7 @@ public void write(List<Message> messages) throws Exception {

public interface Factory extends MessageOutput.Factory<GelfOutput> {
@Override
GelfOutput create(Stream stream, Configuration configuration, @Nullable String id);
GelfOutput create(Stream stream, Configuration configuration);

@Override
Config getConfig();
Expand Down
Expand Up @@ -18,13 +18,15 @@

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.journal.Journal;

import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -36,6 +38,14 @@ public class DiscardMessageOutput implements MessageOutput {
private final Journal journal;
private final Meter messagesDiscarded;

@AssistedInject
public DiscardMessageOutput(final Journal journal,
final MetricRegistry metricRegistry,
@Assisted Stream stream,
@Assisted Configuration configuration) {
this(journal, metricRegistry);
}

@Inject
public DiscardMessageOutput(final Journal journal, final MetricRegistry metricRegistry) {
this.journal = journal;
Expand Down Expand Up @@ -72,14 +82,6 @@ public void write(List<Message> messages) throws Exception {
}

public interface Factory extends MessageOutput.Factory<DiscardMessageOutput> {
@Override
DiscardMessageOutput create(Stream stream, Configuration configuration, @Nullable String id);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Config extends MessageOutput.Config {
Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.collect.Ordering;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
Expand All @@ -32,7 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
Expand All @@ -55,6 +56,15 @@ public class ElasticSearchOutput implements MessageOutput {
private final Journal journal;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

@AssistedInject
public ElasticSearchOutput(MetricRegistry metricRegistry,
Messages messages,
Journal journal,
@Assisted Stream stream,
@Assisted Configuration configuration) {
this(metricRegistry, messages, journal);
}

@Inject
public ElasticSearchOutput(MetricRegistry metricRegistry,
Messages messages,
Expand Down Expand Up @@ -112,7 +122,7 @@ public boolean isRunning() {

public interface Factory extends MessageOutput.Factory<ElasticSearchOutput> {
@Override
ElasticSearchOutput create(Stream stream, Configuration configuration, @Nullable String id);
ElasticSearchOutput create(Stream stream, Configuration configuration);

@Override
Config getConfig();
Expand Down
58 changes: 9 additions & 49 deletions graylog2-server/src/main/java/org/graylog2/outputs/GelfOutput.java
Expand Up @@ -16,9 +16,6 @@
*/
package org.graylog2.outputs;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.assistedinject.Assisted;
Expand Down Expand Up @@ -52,9 +49,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;

public class GelfOutput implements MessageOutput {
private static final Logger LOG = LoggerFactory.getLogger(GelfOutput.class);
Expand All @@ -74,62 +70,29 @@ public class GelfOutput implements MessageOutput {
private final AtomicBoolean isRunning = new AtomicBoolean(false);

private final GelfTransport transport;
private final String id;
private final String streamId;
private final MetricRegistry metricRegistry;
private final String writesMetricName;
private final Meter writes;
private final String processTimeMetricName;
private final Timer processTime;

@Inject
public GelfOutput(@Assisted Stream stream,
@Assisted Configuration configuration,
@Assisted @Nullable String id,
MetricRegistry metricRegistry) throws MessageOutputConfigurationException {
this(buildTransport(configuration), stream.getId(), id, metricRegistry);
public GelfOutput(@Assisted Configuration configuration) throws MessageOutputConfigurationException {
this(buildTransport(configuration));
}

@VisibleForTesting
GelfOutput(GelfTransport gelfTransport, String streamId, @Nullable String id, MetricRegistry metricRegistry) {
this.transport = requireNonNull(gelfTransport, "gelfTransport must not be null");
this.streamId = streamId;
this.id = id;
this.metricRegistry = requireNonNull(metricRegistry, "metricRegistry must not be null");

this.writesMetricName = buildMetricName(streamId, id, "writes");
this.writes = metricRegistry.meter(writesMetricName);
this.processTimeMetricName = buildMetricName(streamId, id, "processTime");
this.processTime = metricRegistry.timer(processTimeMetricName);

GelfOutput(GelfTransport gelfTransport) {
this.transport = checkNotNull(gelfTransport);
isRunning.set(true);
}

private String buildMetricName(String streamId, String id, String name) {
return isNullOrEmpty(id)
? name(GelfOutput.class, "stream", streamId, name)
: name(GelfOutput.class, id, "stream", streamId, name);
}

@Override
public void stop() {
LOG.debug("Stopping {} [{}] on stream [{}]", transport.getClass().getName(), id, streamId);
LOG.debug("Stopping {}", transport.getClass().getName());
try {
transport.stop();
cleanupMetrics();
} catch (Exception e) {
LOG.error("Error stopping {} [{}] on stream [{}]", transport.getClass().getName(), id, streamId, e);
LOG.error("Error stopping " + transport.getClass().getName(), e);
}
isRunning.set(false);
}

private void cleanupMetrics() {
if (metricRegistry != null && metricRegistry.getMetrics() != null) {
metricRegistry.remove(writesMetricName);
metricRegistry.remove(processTimeMetricName);
}
}

@Override
public boolean isRunning() {
return isRunning.get();
Expand Down Expand Up @@ -218,10 +181,7 @@ protected static GelfTransport buildTransport(final Configuration configuration)

@Override
public void write(final Message message) throws Exception {
writes.mark();
try (final Timer.Context ignored = processTime.time()) {
transport.send(toGELFMessage(message));
}
transport.send(toGELFMessage(message));
}

@Override
Expand Down Expand Up @@ -307,7 +267,7 @@ protected GelfMessage toGELFMessage(final Message message) {

public interface Factory extends MessageOutput.Factory<GelfOutput> {
@Override
GelfOutput create(Stream stream, Configuration configuration, @Nullable String id);
GelfOutput create(Stream stream, Configuration configuration);

@Override
Config getConfig();
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -70,7 +69,7 @@ public void write(List<Message> messages) throws Exception {

public interface Factory extends MessageOutput.Factory<LoggingOutput> {
@Override
LoggingOutput create(Stream stream, Configuration configuration, @Nullable String id);
LoggingOutput create(Stream stream, Configuration configuration);

@Override
Config getConfig();
Expand Down
Expand Up @@ -48,7 +48,7 @@ public MessageOutput fromStreamOutput(Output output, final Stream stream, Config

Preconditions.checkArgument(factory != null, "Output type is not supported: %s!", outputType);

return factory.create(stream, configuration, output.getId());
return factory.create(stream, configuration);
}


Expand Down
Expand Up @@ -23,12 +23,11 @@
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.streams.Stream;

import javax.annotation.Nullable;
import java.util.List;

public interface MessageOutput extends Stoppable {
interface Factory<T> {
T create(Stream stream, Configuration configuration, @Nullable String id);
T create(Stream stream, Configuration configuration);
Config getConfig();
Descriptor getDescriptor();
}
Expand Down
Expand Up @@ -16,12 +16,21 @@
*/
package org.graylog2.buffers.processors.fakestreams;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.streams.StreamImpl;

import java.util.Collections;
import java.util.List;

public class FakeStream extends StreamImpl {
private List<MessageOutput> outputs = Lists.newArrayList();

public FakeStream(String title) {
super(Collections.singletonMap(StreamImpl.FIELD_TITLE, title));
super(Maps.<String, Object>newHashMap());
}

public void addOutput(MessageOutput output) {
outputs.add(output);
}
}

0 comments on commit 3f68cfa

Please sign in to comment.