Skip to content

Commit

Permalink
Move AMQP broker to test-util
Browse files Browse the repository at this point in the history
This is to allow it to be used e.g. from model-intest.
  • Loading branch information
mederly committed Nov 15, 2019
1 parent 5d72f44 commit 4641822
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 32 deletions.
16 changes: 16 additions & 0 deletions infra/test-util/pom.xml
Expand Up @@ -115,5 +115,21 @@
<artifactId>test-ng</artifactId>
<version>4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-memory-store</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
</dependencies>
</project>
@@ -1,13 +1,12 @@
/*
* Copyright (c) 2010-2019 Evolveum and contributors
* Copyright (c) 2019 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.provisioning.impl.async;
package com.evolveum.midpoint.test.amqp;

import com.evolveum.midpoint.provisioning.ucf.impl.builtin.async.sources.Amqp091AsyncUpdateSource;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand All @@ -23,39 +22,43 @@

public class EmbeddedBroker {

private static final String DEFAULT_CONFIG_RESOURCE_PATH = "amqp/default-qpid-config.json";

private final SystemLauncher broker = new SystemLauncher();

void start() throws Exception {
public void start() throws Exception {
start(DEFAULT_CONFIG_RESOURCE_PATH);
}

public void start(String configResourcePath) throws Exception {
System.out.println("Starting the broker");
Map<String, Object> attributes = new HashMap<>();
attributes.put("type", "Memory");
attributes.put("initialConfigurationLocation", findResourcePath("async/qpid-config.json"));
attributes.put("initialConfigurationLocation", findResourcePath(configResourcePath));
broker.startup(attributes);
}

private String findResourcePath(String fileName) {
return EmbeddedBroker.class.getClassLoader().getResource(fileName).toExternalForm();
}

void stop() {
public void stop() {
System.out.println("Stopping the broker");
broker.shutdown();
}

void send(String queueName, String message) throws IOException, TimeoutException {
public void send(String queueName, String message, Map<String, Object> headers) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.basicPublish("", queueName, createProperties(), message.getBytes(StandardCharsets.UTF_8));
channel.basicPublish("", queueName, createProperties(headers), message.getBytes(StandardCharsets.UTF_8));
System.out.println("Sent '" + message + "'");
}
}

@NotNull
private AMQP.BasicProperties createProperties() {
Map<String, Object> headers = new HashMap<>();
headers.put(Amqp091AsyncUpdateSource.HEADER_LAST_MESSAGE, true);
private AMQP.BasicProperties createProperties(Map<String, Object> headers) {
return new AMQP.BasicProperties()
.builder()
.headers(headers)
Expand Down
20 changes: 0 additions & 20 deletions provisioning/provisioning-impl/pom.xml
Expand Up @@ -267,26 +267,6 @@
<artifactId>spring-aop</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-memory-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<scope>test</scope>
</dependency>

<!-- NOT needed at all. But dependency:analyze plugin complains if it is not here. -->
<dependency>
Expand Down
Expand Up @@ -7,14 +7,18 @@

package com.evolveum.midpoint.provisioning.impl.async;

import com.evolveum.midpoint.provisioning.ucf.impl.builtin.async.sources.Amqp091AsyncUpdateSource;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.test.amqp.EmbeddedBroker;
import org.apache.commons.io.IOUtils;
import org.testng.annotations.AfterClass;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
Expand Down Expand Up @@ -46,6 +50,8 @@ public void stop() {
@Override
void prepareMessage(File messageFile) throws IOException, TimeoutException {
String message = String.join("\n", IOUtils.readLines(new FileReader(messageFile)));
embeddedBroker.send(QUEUE_NAME, message);
Map<String, Object> headers = new HashMap<>();
headers.put(Amqp091AsyncUpdateSource.HEADER_LAST_MESSAGE, true);
embeddedBroker.send(QUEUE_NAME, message, headers);
}
}

0 comments on commit 4641822

Please sign in to comment.