Skip to content

Commit

Permalink
Intermittent failures in Aws2SqsSnsIT fix #2741
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Jun 8, 2021
1 parent 73711b0 commit 5a18c7d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@
*/
package org.apache.camel.quarkus.component.aws2.ddb.it;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.quarkus.runtime.StartupEvent;
import org.apache.camel.ConsumerTemplate;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;

@Path("/aws2-ddbstream")
@ApplicationScoped
Expand All @@ -41,35 +37,14 @@ public class Aws2DdbStreamResource {
String tableName;

@Inject
ConsumerTemplate consumerTemplate;

void startup(@Observes StartupEvent event) {
/* Hit the consumer URI at application startup so that the consumer starts polling eagerly */
consumerTemplate.receiveBody(componentUri(), 1000);
}
@Named("aws2DdbStreamReceivedEvents")
List<Map<String, String>> aws2DdbStreamReceivedEvents;

@Path("/change")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Map<String, String> change() {
Map<String, String> result = new LinkedHashMap<>();
Record record = consumerTemplate.receiveBody(componentUri(), 10000, Record.class);
if (record == null) {
return null;
}
StreamRecord item = record.dynamodb();
result.put("key", item.keys().get("key").s());
if (item.hasOldImage()) {
result.put("old", item.oldImage().get("value").s());
}
if (item.hasNewImage()) {
result.put("new", item.newImage().get("value").s());
}
return result;
}

private String componentUri() {
return "aws2-ddbstream://" + tableName;
public List<Map<String, String>> change() {
return aws2DdbStreamReceivedEvents;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.aws2.ddb.it;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import javax.ws.rs.Produces;

import org.apache.camel.builder.RouteBuilder;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;

@ApplicationScoped
public class Aws2DdbStreamRoutes extends RouteBuilder {

@ConfigProperty(name = "aws-ddb.table-name")
String tableName;

@Inject
@Named("aws2DdbStreamReceivedEvents")
List<Map<String, String>> aws2DdbStreamReceivedEvents;

@Override
public void configure() throws Exception {
from("aws2-ddbstream://" + tableName)
.process(e -> {
Record record = e.getMessage().getBody(Record.class);
StreamRecord item = record.dynamodb();
Map<String, String> result = new LinkedHashMap<>();
result.put("key", item.keys().get("key").s());
if (item.hasOldImage()) {
result.put("old", item.oldImage().get("value").s());
}
if (item.hasNewImage()) {
result.put("new", item.newImage().get("value").s());
}
aws2DdbStreamReceivedEvents.add(result);
});
}

static class Producers {
@Singleton
@javax.enterprise.inject.Produces
@Named("aws2DdbStreamReceivedEvents")
List<Map<String, String>> aws2DdbStreamReceivedEvents() {
return new CopyOnWriteArrayList<>();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.component.aws2.ddb.it;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -101,23 +102,28 @@ public void crud() {
},
Matchers.is(204));

/* The above actions should trigger the following three change events */
RestAssured.get("/aws2-ddbstream/change")
.then()
.statusCode(200)
.body("key", Matchers.is(key))
.body("new", Matchers.is(msg));
RestAssured.get("/aws2-ddbstream/change")
.then()
.statusCode(200)
.body("key", Matchers.is(key))
.body("old", Matchers.is(msg))
.body("new", Matchers.is(newMsg));
RestAssured.get("/aws2-ddbstream/change")
.then()
.statusCode(200)
.body("key", Matchers.is(key))
.body("old", Matchers.is(newMsg));
Awaitility.await().atMost(120, TimeUnit.SECONDS).until(
() -> {
ExtractableResponse<Response> result = RestAssured.get("/aws2-ddbstream/change")
.then()
.statusCode(200)
.extract();

LOG.info("Expecting 3 events got " + result.statusCode() + ": " + result.body().asString());
return result.jsonPath().getList("$", Map.class);
},
/* The above actions should trigger the following three change events */
list -> list.size() == 3

&& key.equals(list.get(0).get("key"))
&& msg.equals(list.get(0).get("new"))

&& key.equals(list.get(1).get("key"))
&& msg.equals(list.get(1).get("old"))
&& newMsg.equals(list.get(1).get("new"))

&& key.equals(list.get(2).get("key"))
&& newMsg.equals(list.get(2).get("old")));

}

Expand Down

0 comments on commit 5a18c7d

Please sign in to comment.