Skip to content
Permalink
Browse files
fixup 501833 Fix Aws2KinesisTest.kinesis test #3638
  • Loading branch information
ppalaga committed Apr 13, 2022
1 parent 8f729f4 commit ed2e54707875da34633c5aaa9aa3e8e46415bb55
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 14 deletions.
@@ -16,12 +16,13 @@
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Queue;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -30,23 +31,26 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

@Path("/aws2-kinesis")
@ApplicationScoped
public class Aws2KinesisResource {

private static final Logger log = Logger.getLogger(Aws2KinesisResource.class);

@ConfigProperty(name = "aws-kinesis.stream-name")
String streamName;

@Inject
ProducerTemplate producerTemplate;

@Inject
ConsumerTemplate consumerTemplate;
@Named("aws2KinesisMessages")
Queue<String> aws2KinesisMessages;

@Path("/send")
@POST
@@ -69,12 +73,7 @@ public Response send(String message) throws Exception {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String receive() throws IOException {
try (ByteArrayInputStream result = consumerTemplate.receiveBody(componentUri(), 10000, ByteArrayInputStream.class)) {
if (result == null) {
return null;
}
return new String(result.readAllBytes());
}
return aws2KinesisMessages.poll();
}

private String componentUri() {
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

import org.apache.camel.builder.RouteBuilder;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class Aws2KinesisRoutes extends RouteBuilder {

@ConfigProperty(name = "aws-kinesis.stream-name")
String streamName;

@Inject
@Named("aws2KinesisMessages")
Queue<String> aws2KinesisMessages;

private String componentUri() {
return "aws2-kinesis://" + streamName;
}

@Override
public void configure() throws Exception {
from(componentUri())
.process(exchange -> aws2KinesisMessages.add(exchange.getMessage().getBody(String.class)));
}

static class Producers {
@Singleton
@javax.enterprise.inject.Produces
@Named("aws2KinesisMessages")
Queue<String> aws2KinesisMessages() {
return new ConcurrentLinkedDeque<>();
}
}

}
@@ -30,7 +30,6 @@
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
@@ -61,10 +60,14 @@ public void kinesis() {
.then()
.statusCode(201);

RestAssured.get("/aws2-kinesis/receive")
.then()
.statusCode(200)
.body(Matchers.is(msg));
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(
() -> RestAssured.get("/aws2-kinesis/receive").then().extract(),
response -> {
final int status = response.statusCode();
final String body = status == 200 ? response.body().asString() : null;
LOG.info("Got " + status + " " + body);
return response.statusCode() == 200 && msg.equals(body);
});
}

@Test

0 comments on commit ed2e547

Please sign in to comment.