Skip to content
Permalink
Browse files
AMQCLI-7 Add support for filtering by dest
Support only exporting a subset of destinations by destination pattern
  • Loading branch information
cshannon committed Feb 16, 2017
1 parent b8d33cd commit 35ddb07d58aebae77cf099ee8d5b6a572aa8d2e8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 21 deletions.
@@ -17,12 +17,16 @@
package org.apache.activemq.cli.kahadb.exporter;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -46,25 +50,42 @@ public KahaDBExporter(final KahaDBPersistenceAdapter adapter,
this.recoveryListener = recoveryListener;
}


@Override
public void exportMetadata() throws IOException {
metadataExporter.export();
}

@Override
public void exportQueues() throws IOException {
exportDestinations(ActiveMQDestination.QUEUE_TYPE);
exportQueues(DestinationFilter.ANY_DESCENDENT);
}

@Override
public void exportTopics() throws IOException {
exportDestinations(ActiveMQDestination.TOPIC_TYPE);
exportTopics(DestinationFilter.ANY_DESCENDENT);
}

@Override
public void exportQueues(String pattern) throws IOException {
exportDestinations(new ActiveMQQueue(pattern));
}

private void exportDestinations(byte destType) throws IOException {
final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
dest -> dest.getDestinationType() == destType).collect(Collectors.toSet());
@Override
public void exportTopics(String pattern) throws IOException {
exportDestinations(new ActiveMQTopic(pattern));
}

private void exportDestinations(ActiveMQDestination destPattern) throws IOException {

//Main destination filter
final DestinationFilter destFilter = DestinationFilter.parseFilter(destPattern);
//Secondary filter to catch a couple of extra edge cases
final Predicate<ActiveMQDestination> f = getExportDestinationFilter(destPattern);

final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream()
.filter(dest -> destFilter.matches(dest))
.filter(f)
.collect(Collectors.toSet());

// loop through all queues and export them
for (final ActiveMQDestination destination : destinations) {
@@ -83,4 +104,40 @@ private void exportDestinations(byte destType) throws IOException {
}
}

/**
* Do extra processing to filter out destinations. This is because the destination filter
* will match some destianations that we don't want. For example, "test.queue" will match
* as true for the pattern "test.queue.>" which is not correct.
* @param destPattern
* @return
*/
private Predicate<ActiveMQDestination> getExportDestinationFilter(final ActiveMQDestination destPattern) {
//We need to check each composite destination individually
final List<ActiveMQDestination> nonComposite = destPattern.isComposite()
? Arrays.asList(destPattern.getCompositeDestinations()) : Arrays.asList(destPattern);

return (e) -> {
boolean match = false;
for (ActiveMQDestination d : nonComposite) {
String destString = d.getPhysicalName();
//don't match a.b when using a.b.>
if (destPattern.isPattern() && destString.length() > 1 && destString.endsWith(DestinationFilter.ANY_DESCENDENT)) {
final String startsWithString = destString.substring(0, destString.length() - 2);
match = e.getPhysicalName().startsWith(startsWithString) && !e.getPhysicalName().equals(startsWithString);
//non wildcard should be an exact match
} else if (!destPattern.isPattern()) {
match = e.getPhysicalName().equals(destString);
} else {
match = true;
}
if (match) {
break;
}
}

return match;
};

}

}
@@ -24,5 +24,9 @@ public interface MessageStoreExporter {

public void exportQueues() throws IOException;

public void exportQueues(String pattern) throws IOException;

public void exportTopics() throws IOException;

public void exportTopics(String pattern) throws IOException;
}
@@ -156,13 +156,7 @@ public void testExportQueues() throws Exception {
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);

try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}

// printFile(xmlFile);

validate(xmlFile, 17);

@@ -272,14 +266,7 @@ public void testExportTopics() throws Exception {
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);


try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}

// printFile(xmlFile);

validate(xmlFile, 5);

@@ -360,4 +347,13 @@ private void validate(File file, int count) throws JAXBException {
assertEquals(count, read.getValue().getMessages().getMessage().size());
}

private void printFile(File file) throws IOException {
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}
}

}

0 comments on commit 35ddb07

Please sign in to comment.