Skip to content

Commit

Permalink
ARTEMIS-4382 Long Time to process export / import
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 1, 2023
1 parent 39b6edd commit bce775c
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.activemq.artemis.cli.commands.tools;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -63,7 +65,7 @@ public class DBOption extends OptionalLocking {
@Option(names = "--output", description = "Output name for the file.")
private File output;

private FileOutputStream fileOutputStream;
private OutputStream fileOutputStream;

private PrintStream originalOut;

Expand Down Expand Up @@ -96,6 +98,15 @@ public boolean isJDBC() throws Exception {
return jdbc;
}

public File getOutput() {
return output;
}

public DBOption setOutput(File output) {
this.output = output;
return this;
}

public String getJdbcBindings() throws Exception {
parseDBConfig();
return jdbcBindings;
Expand Down Expand Up @@ -172,7 +183,7 @@ public Object execute(ActionContext context) throws Exception {
super.execute(context);

if (output != null) {
fileOutputStream = new FileOutputStream(output);
fileOutputStream = new BufferedOutputStream(new FileOutputStream(output));
originalOut = context.out;
PrintStream printStream = new PrintStream(fileOutputStream);
context.out = printStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ public class OptionalLocking extends LockAbstract {
@Option(names = "--f", description = "This will allow certain tools like print-data to be performed ignoring any running servers. WARNING: Changing data concurrently with a running broker may damage your data. Be careful with this option.")
boolean ignoreLock;

public boolean isIgnoreLock() {
return ignoreLock;
}

public OptionalLocking setIgnoreLock(boolean ignoreLock) {
this.ignoreLock = ignoreLock;
return this;
}

@Override
protected void lockCLI(File lockPlace) throws Exception {
if (!ignoreLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public Object execute(ActionContext context) throws Exception {
}
} catch (Exception e) {
treatError(e, "data", "print");
} finally {
done();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -36,7 +31,6 @@
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.UUIDGenerator;
Expand Down Expand Up @@ -117,7 +111,7 @@ public MessageInfo readMessage(boolean decodeUTF8) throws Exception {
switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8);
processMessageBody(message.toCore(), decodeUTF8);
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
processMessageProperties(message);
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
Expand All @@ -135,7 +129,7 @@ public MessageInfo readMessage(boolean decodeUTF8) throws Exception {
}
reader.next();
}
return new MessageInfo(id, queues, message, largeMessageTemporaryFile);
return new MessageInfo(id, queues, message);
}

private Byte getMessageType(String value) {
Expand Down Expand Up @@ -239,7 +233,7 @@ private void processMessageProperties(Message message) {
}
}

private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException {
private void processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException {
File tempFileName = null;
boolean isLarge = false;

Expand All @@ -252,20 +246,7 @@ private File processMessageBody(final ICoreMessage message, boolean decodeTextMe
reader.next();
logger.debug("XMLStreamReader impl: {}", reader);

if (isLarge) {
tempFileName = File.createTempFile("largeMessage", ".tmp");
logger.debug("Creating temp file {} for large message.", tempFileName);
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
getMessageBodyBytes(bytes -> out.write(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
}
FileInputStream fileInputStream = new FileInputStream(tempFileName);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
((ClientMessage) message).setBodyInputStream(bufferedInput);
} else {
getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
}

return tempFileName;
getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
}

/**
Expand Down Expand Up @@ -318,13 +299,11 @@ public class MessageInfo {
public long id;
public List<String> queues;
public Message message;
public File tempFile;

MessageInfo(long id, List<String> queues, Message message, File tempFile) {
MessageInfo(long id, List<String> queues, Message message) {
this.message = message;
this.queues = queues;
this.id = id;
this.tempFile = tempFile;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Option;
import picocli.CommandLine.Command;

@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends DBOption {

@Option(names = "log-interval", description = "How often to print progress in the console. Set to <= 0 to disable it.")
private int logInterval = 10_000;

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private XMLStreamWriter xmlWriter;
Expand Down Expand Up @@ -103,6 +107,7 @@ public Object execute(ActionContext context) throws Exception {
try {
config = getParameterConfiguration();
process(context.out);
done();
} catch (Exception e) {
treatError(e, "data", "exp");
}
Expand Down Expand Up @@ -368,10 +373,21 @@ private void printBindingsAsXML() throws XMLStreamException {
private void printAllMessagesAsXML() throws Exception {
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);

if (logInterval > 0) {
System.err.println("Processing journal messages");
}

long msgs = 0;
// Order here is important. We must process the messages from the journal before we process those from the page
// files in order to get the messages in the right order.
for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
msgs++;
if (logInterval > 0) {
if (msgs % logInterval == 0) {
System.err.println("exported " + msgs + " messages from journal");
}
}
}

printPagedMessagesAsXML();
Expand All @@ -392,6 +408,7 @@ private void printPagedMessagesAsXML() {

pagingmanager.start();

long msgs = 0;
SimpleString[] stores = pagingmanager.getStoreNames();

for (SimpleString store : stores) {
Expand All @@ -414,6 +431,10 @@ private void printPagedMessagesAsXML() {
try (LinkedListIterator<PagedMessage> iter = messages.iterator()) {

while (iter.hasNext()) {
msgs++;
if (logInterval > 0 && msgs % logInterval == 0) {
System.err.println("Exported " + msgs + " messages from paging");
}
PagedMessage message = iter.next();
message.initMessage(storageManager);
long[] queueIDs = message.getQueueIDs();
Expand Down

0 comments on commit bce775c

Please sign in to comment.