Skip to content

Commit

Permalink
Added edge compression to the RadixTreeParser. This will combine
Browse files Browse the repository at this point in the history
nodes with only 1 child into a single node.
  • Loading branch information
soleger committed Mar 24, 2017
1 parent 4d89b2f commit 03717ba
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import org.opennms.core.collections.RadixTree;
import org.opennms.core.collections.RadixTreeImpl;
Expand Down Expand Up @@ -69,9 +70,31 @@
*/
public class RadixTreeParser implements ByteBufferParser<SyslogMessage> {

final static Logger LOG = LoggerFactory.getLogger(RadixTreeParser.class);
private final static Logger LOG = LoggerFactory.getLogger(RadixTreeParser.class);

RadixTree<ParserStage> tree = new RadixTreeImpl<>();
final RadixTree<ParserStage> tree = new RadixTreeImpl<>();

// private static final ThreadPoolExecutor m_executor = new ThreadPoolExecutor(
// 1,
// 1,
// 0L, TimeUnit.MILLISECONDS,
// new SynchronousQueue<>(true),
// new ThreadFactory() {
// final AtomicInteger index = new AtomicInteger();
// @Override
// public Thread newThread(Runnable r) {
// return new Thread(r, RadixTreeParser.class.getSimpleName() + "-Thread-" + String.valueOf(index.incrementAndGet()));
// }
// },
// // Throttle incoming tasks by running them on the caller thread
// new ThreadPoolExecutor.CallerRunsPolicy()
// );

// private static final ExecutorService m_executor = Executors.newSingleThreadExecutor();

// private final ExecutorService m_executor = new ExecutorFactoryCassandraSEPImpl().newExecutor("StagedParser", "StageExecutor");

// private final ExecutorService m_executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

/**
* Teach a new {@link ParserStage} sequence to this parser.
Expand All @@ -82,6 +105,74 @@ public void teach(ParserStage[] stages) {
tree.addChildren(stages);
}

public void performEdgeCompression() {
for (RadixTreeNode<ParserStage> child : tree.getChildren()) {
compressNode(child);
}
}

public static void compressNode(RadixTreeNode<ParserStage> node) {
if (node.getChildren() != null) {
switch(node.getChildren().size()) {
case(0):
return;
case(1):
RadixTreeNode<ParserStage> child = node.getChildren().iterator().next();
if (node.getContent() instanceof CompositeParserStage) {
((CompositeParserStage)node.getContent()).members.add(child.getContent());
} else {
CompositeParserStage stage = new CompositeParserStage();
stage.members.add(node.getContent());
stage.members.add(node.getChildren().iterator().next().getContent());
node.setContent(stage);
}
// Link the child's children to this node
node.setChildren(child.getChildren());
// Recompress the node
compressNode(node);
break;
default:
for (RadixTreeNode<ParserStage> current : node.getChildren()) {
compressNode(current);
}
break;
}
}
}

private static class CompositeParserStage implements ParserStage {

public final List<ParserStage> members = new ArrayList<>();

@Override
public ParserState apply(ParserState state) {
ParserState currentState = state;
for (ParserStage member : members) {
currentState = member.apply(currentState);
}
return currentState;
}

@Override
public void setOptional(boolean optional) {
throw new UnsupportedOperationException();
}

@Override
public void setTerminal(boolean terminal) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("[");
buffer.append(members.stream().map(ParserStage::toString).collect(Collectors.joining(", ")));
buffer.append("]");
return buffer.toString();
}
}

@Override
public CompletableFuture<SyslogMessage> parse(ByteBuffer incoming) {
ParserState state = new ParserState(incoming);
Expand Down Expand Up @@ -114,7 +205,7 @@ public CompletableFuture<SyslogMessage> parse(ByteBuffer incoming) {
* node.getChildren().size() == 1. Then figure out how to do that recursively.
*/
private static void addStageFutures(List<CompletableFuture<ParserState>> finishedFutures, CompletableFuture<ParserState> parent, RadixTreeNode<ParserStage> node) {
CompletableFuture<ParserState> current = null;
final CompletableFuture<ParserState> current;

// If we're at the root of the radix tree (where the content is null),
// use the parent future as the current future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class RadixTreeSyslogParser extends SyslogParser {
}
radixParser.teach(GrokParserStageSequenceBuilder.parseGrok(pattern).toArray(new ParserStage[0]));
});
System.out.println("Parser tree: " + radixParser.tree.toString());
radixParser.performEdgeCompression();
}

public RadixTreeSyslogParser(SyslogdConfig config, ByteBuffer syslogString) {
Expand Down

0 comments on commit 03717ba

Please sign in to comment.