Skip to content

Commit

Permalink
aggregate function return stream of Entities
Browse files Browse the repository at this point in the history
  • Loading branch information
redmitry committed Oct 13, 2023
1 parent e97f46e commit 801cdf7
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.mongodb.client.result.DeleteResult;
import jakarta.data.repository.Sort;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.jnosql.communication.document.DocumentDeleteQuery;
Expand All @@ -34,6 +33,7 @@
import org.eclipse.jnosql.communication.document.Documents;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -42,6 +42,7 @@

import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import org.bson.BsonValue;
import static org.eclipse.jnosql.databases.mongodb.communication.MongoDBUtils.ID_FIELD;
import static org.eclipse.jnosql.databases.mongodb.communication.MongoDBUtils.getDocument;

Expand Down Expand Up @@ -202,17 +203,34 @@ public long delete(String collectionName, Bson filter) {
*
* @param collectionName the collection name
* @param pipeline the aggregation pipeline
* @return the number of documents deleted.
* @return the stream of BSON Documents
* @throws NullPointerException when filter or collectionName is null
*/
public Stream<Map<String, BsonValue>> aggregate(String collectionName, List<Bson> pipeline) {
public Stream<Map<String, BsonValue>> aggregate(String collectionName, Bson[] pipeline) {
Objects.requireNonNull(pipeline, "filter is required");
Objects.requireNonNull(collectionName, "collectionName is required");
MongoCollection<BsonDocument> collection = mongoDatabase.getCollection(collectionName, BsonDocument.class);
AggregateIterable aggregate = collection.aggregate(pipeline);
AggregateIterable aggregate = collection.aggregate(Arrays.asList(pipeline));
return stream(aggregate.spliterator(), false);
}

/**
* Aggregates documents according to the specified aggregation pipeline.
*
* @param collectionName the collection name
* @param pipeline the aggregation pipeline
* @return the stream result
* @throws NullPointerException when pipeline or collectionName is null
*/
public Stream<DocumentEntity> aggregate(String collectionName, List<Bson> pipeline) {
Objects.requireNonNull(pipeline, "pipeline is required");
Objects.requireNonNull(collectionName, "collectionName is required");
MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
AggregateIterable<Document> aggregate = collection.aggregate(pipeline);
return stream(aggregate.spliterator(), false).map(MongoDBUtils::of)
.map(ds -> DocumentEntity.of(collectionName, ds));
}

/**
* Finds all documents in the collection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static List<org.eclipse.jnosql.communication.document.Document> of(Map<St
return values.keySet().stream().filter(isNotNull).map(documentMap).collect(Collectors.toList());
}

private static org.eclipse.jnosql.communication.document.Document getDocument(String key, Object value) {
public static org.eclipse.jnosql.communication.document.Document getDocument(String key, Object value) {
if (value instanceof Document) {
return org.eclipse.jnosql.communication.document.Document.of(key, of(Document.class.cast(value)));
} else if (isDocumentIterable(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Typed;
import jakarta.inject.Inject;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.eclipse.jnosql.communication.document.DocumentEntity;
import org.eclipse.jnosql.databases.mongodb.communication.MongoDBDocumentManager;
Expand All @@ -33,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.bson.BsonValue;


@ApplicationScoped
Expand Down Expand Up @@ -125,20 +125,37 @@ public <T> Stream<T> select(Class<T> entity, Bson filter) {
}

@Override
public Stream<Map<String, BsonValue>> aggregate(String collectionName, List<Bson> pipeline) {
public Stream<Map<String, BsonValue>> aggregate(String collectionName, Bson[] pipeline) {
Objects.requireNonNull(collectionName, "collectionName is required");
Objects.requireNonNull(pipeline, "pipeline is required");
return this.getManager().aggregate(collectionName, pipeline);
}

@Override
public <T> Stream<Map<String, BsonValue>> aggregate(Class<T> entity, List<Bson> pipeline) {
public <T> Stream<Map<String, BsonValue>> aggregate(Class<T> entity, Bson[] pipeline) {
Objects.requireNonNull(entity, "entity is required");
Objects.requireNonNull(pipeline, "pipeline is required");
EntityMetadata entityMetadata = this.entities.get(entity);
return this.getManager().aggregate(entityMetadata.name(), pipeline);
}

@Override
public <T> Stream<T> aggregate(String collectionName, List<Bson> pipeline) {
Objects.requireNonNull(collectionName, "collectionName is required");
Objects.requireNonNull(pipeline, "pipeline is required");
return this.getManager().aggregate(collectionName, pipeline)
.map(this.converter::toEntity);
}

@Override
public <T> Stream<T> aggregate(Class<T> entity, List<Bson> pipeline) {
Objects.requireNonNull(entity, "entity is required");
Objects.requireNonNull(pipeline, "pipeline is required");
EntityMetadata entityMetadata = this.entities.get(entity);
return this.getManager().aggregate(entityMetadata.name(), pipeline)
.map(this.converter::toEntity);
}

@Override
public long count(String collectionName, Bson filter) {
Objects.requireNonNull(collectionName, "collection name is required");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package org.eclipse.jnosql.databases.mongodb.mapping;

import jakarta.nosql.document.DocumentTemplate;
import org.bson.BsonValue;
import org.bson.conversions.Bson;
import org.eclipse.jnosql.mapping.document.JNoSQLDocumentTemplate;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.bson.BsonValue;

/**
* A MongoDB extension of {@link DocumentTemplate}
Expand Down Expand Up @@ -80,21 +80,43 @@ public interface MongoDBTemplate extends JNoSQLDocumentTemplate {
*
* @param collectionName the collection name
* @param pipeline the aggregation pipeline
* @return the number of documents deleted.
* @throws NullPointerException when filter or collectionName is null
* @return the stream result
* @throws NullPointerException when pipeline or collectionName is null
*/
Stream<Map<String, BsonValue>> aggregate(String collectionName, List<Bson> pipeline);
Stream<Map<String, BsonValue>> aggregate(String collectionName, Bson[] pipeline);

/**
* Aggregates documents according to the specified aggregation pipeline.
*
* @param entity the collection name
* @param pipeline the aggregation pipeline
* @param <T> the entity type
* @return the stream result
* @throws NullPointerException when pipeline or entity is null
*/
<T> Stream<Map<String, BsonValue>> aggregate(Class<T> entity, Bson[] pipeline);

/**
* Aggregates documents according to the specified aggregation pipeline.
*
* @param entity the collection name
* @param pipeline the aggregation pipeline
* @return the number of documents deleted.
* @throws NullPointerException when filter or entity is null
* @param <T> the entity type
* @return the stream result
* @throws NullPointerException when pipeline or entity is null
*/
<T> Stream<T> aggregate(Class<T> entity, List<Bson> pipeline);

/**
* Aggregates documents according to the specified aggregation pipeline.
*
* @param collectionName the collection name
* @param pipeline the aggregation pipeline
* @param <T> the entity type
* @return the stream result
* @throws NullPointerException when pipeline or collectionName is null
*/
<T> Stream<Map<String, BsonValue>> aggregate(Class<T> entity, List<Bson> pipeline);
<T> Stream<T> aggregate(String collectionName, List<Bson> pipeline);

/**
* Returns the number of items in the collection that match the given query filter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -108,9 +107,9 @@ public void shouldDelete() {
@Test
public void shouldReturnErrorOnAggregateWhenThereIsNullParameter() {
Assertions.assertThrows(NullPointerException.class,
() -> entityManager.aggregate(null, null));
() -> entityManager.aggregate(null, (List)null));
Assertions.assertThrows(NullPointerException.class,
() -> entityManager.aggregate(COLLECTION_NAME, null));
() -> entityManager.aggregate(COLLECTION_NAME, (List)null));

Assertions.assertThrows(NullPointerException.class,
() -> entityManager.aggregate(null,
Expand All @@ -119,10 +118,10 @@ public void shouldReturnErrorOnAggregateWhenThereIsNullParameter() {

@Test
public void shouldAggregate() {
List<Bson> predicates = Arrays.asList(
Bson[] predicates = {
Aggregates.match(eq("name", "Poliana")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
);
};
entityManager.insert(getEntity());
Stream<Map<String, BsonValue>> aggregate = entityManager.aggregate(COLLECTION_NAME, predicates);
Assertions.assertNotNull(aggregate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ public void shouldSelectWithEntity() {

@Test
public void shouldReturnErrorOnAggregateMethod() {
assertThrows(NullPointerException.class, () -> template.aggregate((String) null, null));
assertThrows(NullPointerException.class, () -> template.aggregate("Collection", null));
assertThrows(NullPointerException.class, () -> template.aggregate((String) null, (List) null));
assertThrows(NullPointerException.class, () -> template.aggregate("Collection", (List) null));
assertThrows(NullPointerException.class, () -> template.aggregate((String) null,
Collections.singletonList(eq("name", "Poliana"))));

assertThrows(NullPointerException.class, () -> template.aggregate(Person.class, null));
assertThrows(NullPointerException.class, () -> template.aggregate(Person.class, (List) null));
assertThrows(NullPointerException.class, () -> template.aggregate((Class<Object>) null,
Collections.singletonList(eq("name", "Poliana"))));
}
Expand Down

0 comments on commit 801cdf7

Please sign in to comment.