Skip to content

Commit

Permalink
Replace OutputServiceMJImpl#countByType() with old version
Browse files Browse the repository at this point in the history
The new implementation of `OutputServiceMJImpl#countByType()` didn't work at all
and was using aggregations which are only supported since MongoDB 2.2 and we need
to support MongoDB 2.0 as well.

The error in the new implementation using aggregations was:

com.mongodb.CommandFailureException: { "serverUsed" : "127.0.0.1:27017" , "errmsg" : "exception: A pipeline stage specification object must contain exactly one field." , "code" : 16435 , "ok" : 0.0}
  • Loading branch information
Jochen Schalanda committed May 6, 2015
1 parent 359ce5b commit 71a1e4a
Showing 1 changed file with 16 additions and 21 deletions.
Expand Up @@ -16,10 +16,10 @@
*/ */
package org.graylog2.streams; package org.graylog2.streams;


import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject; import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection; import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject; import com.mongodb.DBObject;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider; import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
Expand All @@ -29,22 +29,21 @@
import org.graylog2.plugin.Tools; import org.graylog2.plugin.Tools;
import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Output; import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.rest.models.streams.outputs.requests.CreateOutputRequest; import org.graylog2.rest.models.streams.outputs.requests.CreateOutputRequest;
import org.mongojack.Aggregation;
import org.mongojack.AggregationResult;
import org.mongojack.DBQuery; import org.mongojack.DBQuery;
import org.mongojack.DBUpdate; import org.mongojack.DBUpdate;
import org.mongojack.JacksonDBCollection; import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult; import org.mongojack.WriteResult;


import javax.inject.Inject; import javax.inject.Inject;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


public class OutputServiceMJImpl implements OutputService { public class OutputServiceMJImpl implements OutputService {
private final JacksonDBCollection<OutputAVImpl, String> coll; private final JacksonDBCollection<OutputAVImpl, String> coll;
private final DBCollection dbCollection;
private final StreamService streamService; private final StreamService streamService;


@Inject @Inject
Expand All @@ -53,7 +52,7 @@ public OutputServiceMJImpl(MongoConnection mongoConnection,
StreamService streamService) { StreamService streamService) {
this.streamService = streamService; this.streamService = streamService;
final String collectionName = OutputAVImpl.class.getAnnotation(CollectionName.class).value(); final String collectionName = OutputAVImpl.class.getAnnotation(CollectionName.class).value();
final DBCollection dbCollection = mongoConnection.getDatabase().getCollection(collectionName); this.dbCollection = mongoConnection.getDatabase().getCollection(collectionName);
this.coll = JacksonDBCollection.wrap(dbCollection, OutputAVImpl.class, String.class, mapperProvider.get()); this.coll = JacksonDBCollection.wrap(dbCollection, OutputAVImpl.class, String.class, mapperProvider.get());
} }


Expand Down Expand Up @@ -110,23 +109,19 @@ public long count() {


@Override @Override
public Map<String, Long> countByType() { public Map<String, Long> countByType() {
final DBObject groupFields = new BasicDBObject("_id", "$type"); final DBCursor outputTypes = dbCollection.find(null, new BasicDBObject(OutputImpl.FIELD_TYPE, 1));
groupFields.put("count", new BasicDBObject("$sum", 1));
final DBObject countOperation = new BasicDBObject("$group", groupFields); final Map<String, Long> outputsCountByType = new HashMap<>(outputTypes.count());

for (DBObject outputType : outputTypes) {
final AggregationResult<TypeCountResult> aggregationResult = coll.aggregate(new Aggregation<>(TypeCountResult.class, countOperation, new BasicDBObject())); final String type = (String) outputType.get(OutputImpl.FIELD_TYPE);

if (type != null) {
final Map<String, Long> result = Maps.newHashMap(); final Long oldValue = outputsCountByType.get(type);

final Long newValue = (oldValue == null) ? 1 : oldValue + 1;
for (TypeCountResult typeResult : aggregationResult.results()) outputsCountByType.put(type, newValue);
result.put(typeResult.type, typeResult.count); }

}
return result;
}


class TypeCountResult { return outputsCountByType;
String type;
Long count;
} }


private OutputAVImpl implOrFail(Output output) { private OutputAVImpl implOrFail(Output output) {
Expand Down

0 comments on commit 71a1e4a

Please sign in to comment.