Skip to content

Commit

Permalink
Merge branch 'remove-throughputstats'
Browse files Browse the repository at this point in the history
  • Loading branch information
bernd committed Mar 4, 2016
2 parents d6de9fd + 6e6c37b commit a24f715
Show file tree
Hide file tree
Showing 8 changed files with 1 addition and 212 deletions.
5 changes: 0 additions & 5 deletions graylog2-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,6 @@
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
</dependency>

<dependency>
<groupId>com.eaio.uuid</groupId>
<artifactId>uuid</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.graylog2.periodical.IndexRotationThread;
import org.graylog2.periodical.IndexerClusterCheckerThread;
import org.graylog2.periodical.NodePingThread;
import org.graylog2.periodical.StreamThroughputCounterManagerThread;
import org.graylog2.periodical.ThrottleStateUpdaterThread;
import org.graylog2.periodical.UserPermissionMigrationPeriodical;
import org.graylog2.periodical.VersionCheckThread;
Expand All @@ -53,7 +52,6 @@ protected void configure() {
periodicalBinder.addBinding().to(IndexRetentionThread.class);
periodicalBinder.addBinding().to(IndexRotationThread.class);
periodicalBinder.addBinding().to(NodePingThread.class);
periodicalBinder.addBinding().to(StreamThroughputCounterManagerThread.class);
periodicalBinder.addBinding().to(VersionCheckThread.class);
periodicalBinder.addBinding().to(ThrottleStateUpdaterThread.class);
periodicalBinder.addBinding().to(ClusterEventPeriodical.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,27 @@
import org.graylog2.plugin.Message;
import org.graylog2.plugin.filters.MessageFilter;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.stats.ThroughputStats;
import org.graylog2.streams.StreamRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.List;

/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class StreamMatcherFilter implements MessageFilter {

private static final Logger LOG = LoggerFactory.getLogger(StreamMatcherFilter.class);

private final StreamRouter streamRouter;
private final ThroughputStats throughputStats;

@Inject
public StreamMatcherFilter(StreamRouter streamRouter,
ThroughputStats throughputStats) {
public StreamMatcherFilter(StreamRouter streamRouter) {
this.streamRouter = streamRouter;
this.throughputStats = throughputStats;
}

@Override
public boolean filter(Message msg) {
List<Stream> streams = streamRouter.route(msg);

for (Stream stream : streams) {
throughputStats.incrementStreamThroughput(stream.getId());
}
msg.addStreams(streams);

LOG.debug("Routed message <{}> to {} streams.", msg.getId(), streams.size());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.bson.types.ObjectId;
import org.cliffc.high_scale_lib.Counter;
import org.graylog2.alarmcallbacks.AlarmCallbackConfiguration;
import org.graylog2.alarmcallbacks.AlarmCallbackConfigurationService;
import org.graylog2.database.NotFoundException;
Expand All @@ -51,7 +50,6 @@
import org.graylog2.rest.resources.streams.rules.requests.CreateStreamRuleRequest;
import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions;
import org.graylog2.shared.stats.ThroughputStats;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.streams.StreamRuleService;
Expand Down Expand Up @@ -99,19 +97,16 @@ public class StreamResource extends RestResource {
private final StreamService streamService;
private final StreamRuleService streamRuleService;
private final StreamRouterEngine.Factory streamRouterEngineFactory;
private final ThroughputStats throughputStats;
private final AlarmCallbackConfigurationService alarmCallbackConfigurationService;

@Inject
public StreamResource(StreamService streamService,
StreamRuleService streamRuleService,
StreamRouterEngine.Factory streamRouterEngineFactory,
ThroughputStats throughputStats,
AlarmCallbackConfigurationService alarmCallbackConfigurationService) {
this.streamService = streamService;
this.streamRuleService = streamRuleService;
this.streamRouterEngineFactory = streamRouterEngineFactory;
this.throughputStats = throughputStats;
this.alarmCallbackConfigurationService = alarmCallbackConfigurationService;
}

Expand Down Expand Up @@ -388,45 +383,6 @@ public Response cloneStream(@ApiParam(name = "streamId", required = true) @PathP
return Response.created(streamUri).entity(result).build();
}

@GET
@Timed
@Path("/{streamId}/throughput")
@ApiOperation(value = "Current throughput of this stream on this node in messages per second")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Long> oneStreamThroughput(@ApiParam(name = "streamId", required = true) @PathParam("streamId") String streamId) {
final Map<String, Long> result = Maps.newHashMap();
result.put("throughput", 0L);

final HashMap<String, Counter> currentStreamThroughput = throughputStats.getCurrentStreamThroughput();
if (currentStreamThroughput != null) {
final Counter counter = currentStreamThroughput.get(streamId);
if (counter != null && isPermitted(RestPermissions.STREAMS_READ, streamId))
result.put("throughput", counter.get());
}

return result;
}

@GET
@Timed
@Path("/throughput")
@ApiOperation("Current throughput of all visible streams on this node in messages per second")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Map<String, Long>> streamThroughput() {
final Map<String, Long> perStream = Maps.newHashMap();

final Map<String, Counter> currentStreamThroughput = throughputStats.getCurrentStreamThroughput();
if (currentStreamThroughput != null) {
for (Map.Entry<String, Counter> entry : currentStreamThroughput.entrySet()) {
if (entry.getValue() != null && isPermitted(RestPermissions.STREAMS_READ, entry.getKey())) {
perStream.put(entry.getKey(), entry.getValue().get());
}
}
}

return ImmutableMap.of("throughput", perStream);
}

private StreamResponse streamToResponse(Stream stream) {
return StreamResponse.create(
stream.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.graylog2.shared.buffers.processors.DecodingProcessor;
import org.graylog2.shared.inputs.InputRegistry;
import org.graylog2.shared.inputs.InputStateListener;
import org.graylog2.shared.stats.ThroughputStats;
import org.jboss.netty.util.HashedWheelTimer;

import java.util.concurrent.Semaphore;
Expand All @@ -54,7 +53,6 @@ protected void configure() {
// This is holding all our metrics.
bind(MetricRegistry.class).toProvider(MetricRegistryProvider.class).asEagerSingleton();
bind(LocalMetricRegistry.class).in(Scopes.NO_SCOPE); // must not be a singleton!
bind(ThroughputStats.class).toInstance(new ThroughputStats());

install(new FactoryModuleBuilder().build(DecodingProcessor.Factory.class));

Expand Down

This file was deleted.

5 changes: 0 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,6 @@
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.1.4</version>
</dependency>

<dependency>
<groupId>org.apache.directory.api</groupId>
Expand Down

0 comments on commit a24f715

Please sign in to comment.