Skip to content
Permalink
Browse files
AMQCLI-3 Add a flag for compression on export
Also use a BufferedOutputStream to improve performance of writes
  • Loading branch information
cshannon committed Feb 15, 2017
1 parent 419019c commit 76ca845ffe77b3f1eec265a24f3c8c5918ed3df7
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
@@ -16,10 +16,11 @@
*/
package org.apache.activemq.cli.kahadb.exporter;

import static org.junit.Assert.fail;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
@@ -32,23 +33,40 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* KahaDB Exporter
*/
public class Exporter {

static final Logger LOG = LoggerFactory.getLogger(Exporter.class);

public static void main(String[] args) {


}

public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
Exporter.exportKahaDbStore(kahaDbDir, artemisXml, false);
}

public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml,
boolean compress) throws Exception {

KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(kahaDbDir);
adapter.start();

try(FileOutputStream fos = new FileOutputStream(artemisXml)) {
if (artemisXml.exists()) {
throw new IllegalStateException("File: " + artemisXml + " already exists");
}

long start = System.currentTimeMillis();
try(OutputStream fos = new BufferedOutputStream(compress ? new GZIPOutputStream(
new FileOutputStream(artemisXml)) : new FileOutputStream(artemisXml))) {

XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);

@@ -90,5 +108,8 @@ public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml
} finally {
adapter.stop();
}
long end = System.currentTimeMillis();

LOG.info("Total export time: " + (end - start) + " ms");
}
}
@@ -28,9 +28,13 @@
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KahaDBExporter implements MessageStoreExporter {

static final Logger LOG = LoggerFactory.getLogger(KahaDBExporter.class);

private final KahaDBPersistenceAdapter adapter;
private final MessageRecoveryListener recoveryListener;

@@ -49,6 +53,7 @@ public void exportQueues() throws IOException {
// loop through all queues and export them
for (final ActiveMQDestination destination : destinations) {

LOG.info("Starting export of: " + destination);
final ActiveMQQueue queue = (ActiveMQQueue) destination;
final MessageStore messageStore = adapter.createQueueMessageStore(queue);

@@ -68,6 +73,8 @@ public void exportTopics() throws IOException {
dest -> dest.isTopic()).collect(Collectors.toSet());

for (ActiveMQDestination destination : destinations) {
LOG.info("Starting export of: " + destination);

final ActiveMQTopic topic = (ActiveMQTopic) destination;
final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);

@@ -20,17 +20,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -44,28 +41,19 @@
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
import org.apache.activemq.cli.schema.ActivemqJournalType;
import org.apache.activemq.cli.schema.ObjectFactory;
import org.apache.activemq.cli.schema.QueueBindingType;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
@@ -165,7 +153,7 @@ public void testExportQueues() throws Exception {

adapter.stop();

File xmlFile = storeFolder.newFile();
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);

try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
@@ -281,7 +269,7 @@ public void testExportTopics() throws Exception {

adapter.stop();

File xmlFile = storeFolder.newFile();
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);


@@ -18,9 +18,9 @@
#
# The logging properties used during tests..
#
log4j.rootLogger=DEBUG, out, stdout
log4j.rootLogger=INFO, out, stdout

log4j.logger.org.apache.activemq=DEBUG
log4j.logger.org.apache.activemq=INFO

# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender

0 comments on commit 76ca845

Please sign in to comment.