Skip to content

Commit

Permalink
AMQCLI-3 Add a utility method for exporting a store
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Feb 15, 2017
1 parent d297503 commit 419019c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 70 deletions.
Expand Up @@ -16,6 +16,23 @@
*/
package org.apache.activemq.cli.kahadb.exporter;

import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileOutputStream;

import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
import org.apache.activemq.cli.schema.QueueBindingType;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

/**
* KahaDB Exporter
*/
Expand All @@ -24,4 +41,54 @@ public class Exporter {
public static void main(String[] args) {

}

public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {

KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(kahaDbDir);
adapter.start();

try(FileOutputStream fos = new FileOutputStream(artemisXml)) {
XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);

xmlMarshaller.appendJournalOpen();
xmlMarshaller.appendBindingsElement();

adapter.getStore().getDestinations().stream()
.forEach(dest -> {
try {
if (dest.isQueue()) {
xmlMarshaller.appendBinding(QueueBindingType.builder()
.withName(dest.getPhysicalName())
.withRoutingType(RoutingType.ANYCAST.toString())
.withAddress(dest.getPhysicalName()).build());
} else if (dest.isTopic()) {
for (SubscriptionInfo info :
adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
xmlMarshaller.appendBinding(QueueBindingType.builder()
.withName(ActiveMQDestination.createQueueNameForDurableSubscription(
true, info.getClientId(), info.getSubcriptionName()))
.withRoutingType(RoutingType.MULTICAST.toString())
.withAddress(dest.getPhysicalName()).build());
}
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
});

xmlMarshaller.appendEndElement();
xmlMarshaller.appendMessagesElement();

KahaDBExporter dbExporter = new KahaDBExporter(adapter,
new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));

dbExporter.exportQueues();
dbExporter.exportTopics();
xmlMarshaller.appendJournalClose(true);
} finally {
adapter.stop();
}
}
}
Expand Up @@ -103,10 +103,11 @@ public class ExporterTest {
@Test
public void testExportQueues() throws Exception {

File kahaDbDir = storeFolder.newFolder();
ActiveMQQueue queue = new ActiveMQQueue("test.queue");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setJournalMaxFileLength(1024 * 1024);
adapter.setDirectory(storeFolder.newFolder());
adapter.setDirectory(kahaDbDir);
adapter.start();
MessageStore messageStore = adapter.createQueueMessageStore(queue);
messageStore.start();
Expand Down Expand Up @@ -162,46 +163,26 @@ public void testExportQueues() throws Exception {
messageStore.addMessage(context, message);
}

messageStore.stop();

File file = storeFolder.newFile();
try(FileOutputStream fos = new FileOutputStream(file)) {
XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);

xmlMarshaller.appendJournalOpen();
xmlMarshaller.appendBindingsElement();
xmlMarshaller.appendBinding(QueueBindingType.builder()
.withName("test.queue")
.withRoutingType(RoutingType.ANYCAST.toString())
.withAddress("test.queue").build());
xmlMarshaller.appendEndElement();
xmlMarshaller.appendMessagesElement();

KahaDBExporter dbExporter = new KahaDBExporter(adapter,
new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));

dbExporter.exportQueues();
xmlMarshaller.appendJournalClose(true);
}

adapter.stop();

try (BufferedReader br = new BufferedReader(new FileReader(file))) {
File xmlFile = storeFolder.newFile();
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);

try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}


validate(file, 17);
validate(xmlFile, 17);

final ActiveMQServer artemisServer = buildArtemisBroker();
artemisServer.start();

XmlDataImporter dataImporter = new XmlDataImporter();
dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");

Expand Down Expand Up @@ -263,10 +244,12 @@ public void testExportQueues() throws Exception {
@Test
public void testExportTopics() throws Exception {

File kahaDbDir = storeFolder.newFolder();

ActiveMQTopic topic = new ActiveMQTopic("test.topic");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setJournalMaxFileLength(1024 * 1024);
adapter.setDirectory(storeFolder.newFolder());
adapter.setDirectory(kahaDbDir);
adapter.start();
TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
messageStore.start();
Expand Down Expand Up @@ -296,63 +279,27 @@ public void testExportTopics() throws Exception {
//ack for sub1 only
messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck());

messageStore.stop();

// String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1");

File file = storeFolder.newFile();
try(FileOutputStream fos = new FileOutputStream(file)) {
XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);

xmlMarshaller.appendJournalOpen();
xmlMarshaller.appendBindingsElement();

adapter.getStore().getDestinations().stream()
.filter(dest -> dest.isTopic()).forEach(dest -> {

try {
for (SubscriptionInfo info :
adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
xmlMarshaller.appendBinding(QueueBindingType.builder()
.withName(ActiveMQDestination.createQueueNameForDurableSubscription(
true, info.getClientId(), info.getSubcriptionName()))
.withRoutingType(RoutingType.MULTICAST.toString())
.withAddress(dest.getPhysicalName()).build());
}

} catch (Exception e) {
fail(e.getMessage());
}
});

xmlMarshaller.appendEndElement();
xmlMarshaller.appendMessagesElement();

KahaDBExporter dbExporter = new KahaDBExporter(adapter,
new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
adapter.stop();

dbExporter.exportTopics();
xmlMarshaller.appendJournalClose(true);
}
File xmlFile = storeFolder.newFile();
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);

adapter.stop();

try (BufferedReader br = new BufferedReader(new FileReader(file))) {
try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}


validate(file, 5);
validate(xmlFile, 5);

final ActiveMQServer artemisServer = buildArtemisBroker();
artemisServer.start();

XmlDataImporter dataImporter = new XmlDataImporter();
dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");

Expand Down

0 comments on commit 419019c

Please sign in to comment.