Skip to content
Permalink
Browse files
Azure Storage Queue : increase Producer test coverage
Fixes #3577
  • Loading branch information
zbendhiba authored and ppalaga committed Apr 12, 2022
1 parent 23e1108 commit 048f1c2d08c26189e4cc577126824ca6a1da8d36
Showing 5 changed files with 255 additions and 23 deletions.
@@ -37,7 +37,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

<!-- test dependencies -->
@@ -17,6 +17,8 @@
package org.apache.camel.quarkus.component.azure.storage.queue.it;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -28,6 +30,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -38,9 +41,14 @@
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.queue.QueueServiceClient;
import com.azure.storage.queue.QueueServiceClientBuilder;
import com.azure.storage.queue.models.PeekedMessageItem;
import com.azure.storage.queue.models.QueueItem;
import com.azure.storage.queue.models.QueueMessageItem;
import com.azure.storage.queue.models.QueuesSegmentOptions;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.azure.storage.queue.QueueConstants;
import org.apache.camel.component.azure.storage.queue.QueueOperationDefinition;
import org.apache.camel.quarkus.component.azure.storage.queue.it.model.ExampleMessage;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@Path("/azure-storage-queue")
@@ -83,14 +91,28 @@ public Response createQueue() throws Exception {

@Path("/queue/read")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String retrieveMessage() throws Exception {
@Produces(MediaType.APPLICATION_JSON)
public List<ExampleMessage> retrieveMessage() throws Exception {
@SuppressWarnings("unchecked")
List<QueueMessageItem> messages = producerTemplate.requestBody(
componentUri(QueueOperationDefinition.receiveMessages),
null, List.class);
return messages.stream()
.map(QueueMessageItem::getBody)
.map(this::createMessage)
.collect(Collectors.toList());
}

@Path("/queue/peek")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String peekOneMessage() throws Exception {
@SuppressWarnings("unchecked")
List<PeekedMessageItem> messages = producerTemplate.requestBodyAndHeader(
componentUri(QueueOperationDefinition.peekMessages),
null, QueueConstants.MAX_MESSAGES, 1,
List.class);
return messages.stream()
.map(PeekedMessageItem::getBody)
.map(BinaryData::toString)
.collect(Collectors.joining("\n"));
}
@@ -105,6 +127,21 @@ public Response addMessage(String message) throws Exception {
return Response.created(new URI("https://camel.apache.org/")).build();
}

@Path("/queue/list")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String listQueues() throws Exception {
QueuesSegmentOptions queuesSegmentOptions = new QueuesSegmentOptions();
queuesSegmentOptions.setIncludeMetadata(true);
@SuppressWarnings("unchecked")
List<QueueItem> messages = producerTemplate.requestBodyAndHeader(
componentUri(QueueOperationDefinition.listQueues),
null, QueueConstants.QUEUES_SEGMENT_OPTIONS, queuesSegmentOptions, List.class);
return messages.stream()
.map(QueueItem::getName)
.collect(Collectors.joining("\n"));
}

@Path("/queue/delete")
@DELETE
public Response deleteQueue() throws Exception {
@@ -114,10 +151,58 @@ public Response deleteQueue() throws Exception {
return Response.noContent().build();
}

@Path("/queue/delete/{id}/{popReceipt}")
@DELETE
public Response deleteMessageById(@PathParam("id") String id, @PathParam("popReceipt") String popReceipt) throws Exception {
var headers = new HashMap<String, Object>();
headers.put(QueueConstants.MESSAGE_ID, id);
headers.put(QueueConstants.POP_RECEIPT, popReceipt);
headers.put(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMillis(10));
producerTemplate.sendBodyAndHeaders(
componentUri(QueueOperationDefinition.deleteMessage),
null, headers);
return Response.noContent().build();
}

@Path("/queue/update/{id}/{popReceipt}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addMessage(@PathParam("id") String id, @PathParam("popReceipt") String popReceipt, String message)
throws Exception {
var headers = new HashMap<String, Object>();
headers.put(QueueConstants.MESSAGE_ID, id);
headers.put(QueueConstants.POP_RECEIPT, popReceipt);
headers.put(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMillis(10));

producerTemplate.sendBodyAndHeaders(
componentUri(QueueOperationDefinition.updateMessage),
message,
headers);
return Response.created(new URI("https://camel.apache.org/")).build();
}

@Path("/queue/clear")
@GET
public Response clearQueue() throws Exception {
producerTemplate.sendBody(
componentUri(QueueOperationDefinition.clearQueue),
null);
return Response.noContent().build();
}

private String componentUri(final QueueOperationDefinition operation) {
return String.format("azure-storage-queue://%s/%s?operation=%s",
azureStorageAccountName, QUEUE_NAME,
operation.name());
}

private ExampleMessage createMessage(final QueueMessageItem messageItem) {
var message = new ExampleMessage();
var binaryData = messageItem.getBody();
message.setBody(binaryData == null ? null : binaryData.toString());
message.setId(messageItem.getMessageId());
message.setPopReceipt(messageItem.getPopReceipt());
return message;
}

}
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.azure.storage.queue.it.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class ExampleMessage {
private String id;
private String body;
private String popReceipt;

public ExampleMessage() {
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getBody() {
return body;
}

public void setBody(String body) {
this.body = body;
}

public String getPopReceipt() {
return popReceipt;
}

public void setPopReceipt(String popReceipt) {
this.popReceipt = popReceipt;
}
}
@@ -16,47 +16,137 @@
*/
package org.apache.camel.quarkus.component.azure.storage.queue.it;

import java.util.LinkedHashMap;
import java.util.List;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.azure.AzureStorageTestResource;
import org.junit.jupiter.api.Test;

import static org.hamcrest.core.Is.is;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@QuarkusTest
@QuarkusTestResource(AzureStorageTestResource.class)
class AzureStorageQueueTest {

@Test
public void crud() {
String message = "Hello Camel Quarkus Azure Queue";
try {
String message = "Hello Camel Quarkus Azure Queue ";

// Create
RestAssured.given()
.contentType(ContentType.TEXT)
.post("/azure-storage-queue/queue/create")
.then()
.statusCode(201);
// Create
given()
.contentType(ContentType.TEXT)
.post("/azure-storage-queue/queue/create")
.then()
.statusCode(201);

// create 2 messages
for (int i = 1; i < 2; i++) {
addMessage(message + i);
}

// peek one message
RestAssured.get("/azure-storage-queue/queue/peek")
.then()
.statusCode(200)
.body(is(message + "1"));

// Read 2 messages
List<LinkedHashMap<String, String>> response = null;
for (int i = 1; i < 2; i++) {
response = readMessage();
assertNotNull(response);
assertEquals(1, response.size());
assertNotNull(response.get(0));
assertEquals(message + i, response.get(0).get("body"));
}

// updating message
// get needed informations from last message
var id = response.get(0).get("id");
var popReceipt = response.get(0).get("popReceipt");

message = "Update Camel Quarkus example message";
given()
.contentType(ContentType.TEXT)
.body(message)
.post(String.format("/azure-storage-queue/queue/update/%s/%s", id, popReceipt))
.then()
.statusCode(201);

// reading update message
response = readMessage();
assertNotNull(response);
assertNotNull(response.get(0));
assertEquals(message, response.get(0).get("body"));

// list queues
RestAssured.given()
.contentType(ContentType.TEXT)
.get("/azure-storage-queue/queue/list")
.then()
.statusCode(200)
.body(containsString("camel-quarkus"));

// adding another message to the queue
addMessage(message);

RestAssured.given()
// clear queue
given()
.get("/azure-storage-queue/queue/clear")
.then()
.statusCode(204);

// Read and make sure the queue was cleared
response = readMessage();
assertNotNull(response);
assertEquals(0, response.size());

// adding new message
addMessage(message);

// peek latest message
response = readMessage();
id = response.get(0).get("id");
popReceipt = response.get(0).get("popReceipt");

// delete message by id
RestAssured.delete("/azure-storage-queue/queue/delete/" + id + "/" + popReceipt)
.then()
.statusCode(204);

} finally {
// Delete
RestAssured.delete("/azure-storage-queue/queue/delete")
.then()
.statusCode(204);
}
}

private void addMessage(String message) {
given()
.contentType(ContentType.TEXT)
.body(message)
.post("/azure-storage-queue/queue/message")
.then()
.statusCode(201);
}

// Read
RestAssured.get("/azure-storage-queue/queue/read")
.then()
.statusCode(200)
.body(is(message));

// Delete
RestAssured.delete("/azure-storage-queue/queue/delete")
.then()
.statusCode(204);
@SuppressWarnings("unchecked")
private List<LinkedHashMap<String, String>> readMessage() {
return (List<LinkedHashMap<String, String>>) given()
.contentType(ContentType.JSON)
.when()
.get("/azure-storage-queue/queue/read")
.as(List.class);
}

}
@@ -40,6 +40,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>

0 comments on commit 048f1c2

Please sign in to comment.