Skip to content

Commit

Permalink
Expand MongoDb test coverage apache#2622
Browse files Browse the repository at this point in the history
  • Loading branch information
JiriOndrusek committed May 21, 2021
1 parent 5a94006 commit 4a16e1d
Show file tree
Hide file tree
Showing 7 changed files with 457 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.camel.quarkus.component.mongodb.deployment;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;

class MongoDbProcessor {

Expand All @@ -28,4 +30,9 @@ class MongoDbProcessor {
FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
ReflectiveClassBuildItem reflectiveClass() {
return new ReflectiveClassBuildItem(true, false, ChangeStreamDocument.class);
}
}
5 changes: 5 additions & 0 deletions integration-tests/mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
Expand All @@ -35,11 +38,13 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
import io.quarkus.mongodb.MongoClientName;
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mongodb.MongoDbConstants;
import org.apache.camel.util.CollectionHelper;
import org.bson.Document;

@Path("/mongodb")
Expand All @@ -50,11 +55,14 @@ public class MongoDbResource {
static final String NAMED_MONGO_CLIENT_NAME = "myMongoClient";

@Inject
@MongoClientName(value = NAMED_MONGO_CLIENT_NAME)
MongoClient namedMongoClient;
ProducerTemplate producerTemplate;

@Inject
ProducerTemplate producerTemplate;
CamelContext camelContext;

@Inject
@Named("results")
Map<String, List<Document>> results;

@POST
@Path("/collection/{collectionName}")
Expand Down Expand Up @@ -97,4 +105,93 @@ public JsonArray getCollection(@PathParam("collectionName") String collectionNam

return arrayBuilder.build();
}

@POST
@Path("/collection/dynamic/{collectionName}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Object dynamic(@PathParam("collectionName") String collectionName, String content,
@HeaderParam("mongoClientName") String mongoClientName,
@HeaderParam("dynamicOperation") String operation)
throws URISyntaxException {

Object result = producerTemplate.requestBodyAndHeader(
String.format("mongodb:%s?database=test&collection=%s&operation=insert&dynamicity=true",
mongoClientName, collectionName),
content, MongoDbConstants.OPERATION_HEADER, operation);

return result;
}

@GET
@Path("/route/{routeId}/{operation}")
@Produces(MediaType.TEXT_PLAIN)
public String restartRoute(@PathParam("routeId") String routeId, @PathParam("operation") String operation)
throws Exception {
switch (operation) {
case "stop":
camelContext.getRouteController().stopRoute(routeId);
break;
case "start":
camelContext.getRouteController().startRoute(routeId);
break;
case "status":
return camelContext.getRouteController().getRouteStatus(routeId).name();

}

return null;
}

@GET
@Path("/resultsReset/{resultId}")
@Produces(MediaType.APPLICATION_JSON)
public Map getResultsAndReset(@PathParam("resultId") String resultId) {
int size = results.get(resultId).size();
Document last = null;
if (!results.get(resultId).isEmpty()) {
last = results.get(resultId).get(size - 1);
results.get(resultId).clear();
}
return CollectionHelper.mapOf("size", size, "last", last);
}

@Path("/convertMapToDocument")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Map convertMapToDocument(Map input) {
Document doc = camelContext.getTypeConverter().convertTo(Document.class, input);
doc.put("clazz", doc.getClass().getName());
return doc;
}

@Path("/convertAnyObjectToDocument")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.APPLICATION_JSON)
public Map convertMapToDocument(String input) {

Document doc = camelContext.getTypeConverter().convertTo(Document.class, new SimplePojo(input));
doc.put("clazz", doc.getClass().getName());
return doc;
}

@RegisterForReflection
private static class SimplePojo {
private String value;

public SimplePojo(String value) {
this.value = value;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.mongodb.it;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.inject.Named;

import org.apache.camel.builder.RouteBuilder;
import org.bson.Document;

@ApplicationScoped
public class MongoDbRoute extends RouteBuilder {

public static String COLLECTION_TAILING = "tailingCollection";
public static String COLLECTION_PERSISTENT_TAILING = "persistentTailingCollection";
public static String COLLECTION_STREAM_CHANGES = "streamChangesgCollection";

@Inject
@Named("results")
Map<String, List<Document>> results;

@Override
public void configure() {
from(String.format("mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_TAILING))
.process(e -> results.get(COLLECTION_TAILING).add(e.getMessage().getBody(Document.class)));

from(String.format(
"mongodb:%s?database=test&collection=%s&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_PERSISTENT_TAILING))
.id(COLLECTION_PERSISTENT_TAILING)
.process(e -> results.get(COLLECTION_PERSISTENT_TAILING).add(e.getMessage().getBody(Document.class)));

from(String.format("mongodb:%s?database=test&collection=%s&consumerType=changeStreams",
MongoDbResource.DEFAULT_MONGO_CLIENT_NAME, COLLECTION_STREAM_CHANGES))
.routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}")
.process(e -> results.get(COLLECTION_STREAM_CHANGES).add(e.getMessage().getBody(Document.class)));
}

@Produces
@ApplicationScoped
@Named("results")
Map<String, List<Document>> results() {
Map<String, List<Document>> result = new HashMap<>();
result.put(COLLECTION_TAILING, new CopyOnWriteArrayList<>());
result.put(COLLECTION_PERSISTENT_TAILING, new CopyOnWriteArrayList<>());
result.put(COLLECTION_STREAM_CHANGES, new CopyOnWriteArrayList<>());
return result;
}
}

0 comments on commit 4a16e1d

Please sign in to comment.