Skip to content

Commit

Permalink
Simplify the client creation in KinesisTest
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Feb 23, 2021
1 parent 99630f2 commit 6877792
Showing 1 changed file with 34 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,6 +24,7 @@
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.aws2.Aws2Client;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
Expand All @@ -33,12 +33,9 @@
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
Expand All @@ -51,6 +48,9 @@ class Aws2KinesisTest {

private static final Logger LOG = Logger.getLogger(Aws2KinesisTest.class);

@Aws2Client(Service.S3)
S3Client client;

@Test
public void kinesis() {
final String msg = "kinesis-" + java.util.UUID.randomUUID().toString().replace("-", "");
Expand All @@ -77,8 +77,10 @@ public void firehose() {
+ msgPrefix + "...");
final long deadline = System.currentTimeMillis() + (Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000);
while (bytesSent < maxDataBytes && System.currentTimeMillis() < deadline) {
/* Send at least 1MB of data but do not spend more than a minute by doing it.
* This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer */
/*
* Send at least 1MB of data but do not spend more than a minute by doing it.
* This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer
*/
RestAssured.given() //
.contentType(ContentType.TEXT)
.body(msg)
Expand All @@ -92,45 +94,34 @@ public void firehose() {

final Config config = ConfigProvider.getConfig();

S3ClientBuilder builder = S3Client.builder()
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
config.getValue("camel.component.aws2-kinesis.access-key", String.class),
config.getValue("camel.component.aws2-kinesis.secret-key", String.class))))
.region(Region.of(config.getValue("camel.component.aws2-kinesis.region", String.class)));

config.getOptionalValue("camel.component.aws2-kinesis.uri-endpoint-override",
String.class).ifPresent(endpointOverride -> builder.endpointOverride(URI.create(endpointOverride)));
try (S3Client client = builder.build()) {

final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class);
final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class);

Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
() -> {
final ListObjectsResponse objects = client
.listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
final List<S3Object> objs = objects.contents();
LOG.info("There are " + objs.size() + " objects in bucket " + bucketName);
for (S3Object obj : objs) {
LOG.info("Checking object " + obj.key() + " of size " + obj.size());
try (ResponseInputStream<GetObjectResponse> o = client
.getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) {
final StringBuilder sb = new StringBuilder(msg.length());
final byte[] buf = new byte[1024];
int len;
while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) {
sb.append(new String(buf, 0, len, StandardCharsets.UTF_8));
}
final String foundContent = sb.toString();
if (foundContent.startsWith(msgPrefix)) {
/* Yes, this is what we have sent */
LOG.info("Found the expected content in object " + obj.key());
return true;
}
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
() -> {
final ListObjectsResponse objects = client
.listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
final List<S3Object> objs = objects.contents();
LOG.info("There are " + objs.size() + " objects in bucket " + bucketName);
for (S3Object obj : objs) {
LOG.info("Checking object " + obj.key() + " of size " + obj.size());
try (ResponseInputStream<GetObjectResponse> o = client
.getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) {
final StringBuilder sb = new StringBuilder(msg.length());
final byte[] buf = new byte[1024];
int len;
while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) {
sb.append(new String(buf, 0, len, StandardCharsets.UTF_8));
}
final String foundContent = sb.toString();
if (foundContent.startsWith(msgPrefix)) {
/* Yes, this is what we have sent */
LOG.info("Found the expected content in object " + obj.key());
return true;
}
}
return false;
});
}
}
return false;
});

}

Expand Down

0 comments on commit 6877792

Please sign in to comment.