Skip to content

Commit

Permalink
Fix moquette-io#629: Adding subscriptions is very slow
Browse files Browse the repository at this point in the history
- Changed CNode.children List<INode> to Map<Token, INode>
- Changed CNode.add to return boolean
- Optimize execution logic for CNode.anyChildrenMatch, CNode.childOf
- Optimize execution logic for CTire.recursiveMatch
  • Loading branch information
lizexin committed Sep 24, 2021
1 parent 1b1bac7 commit 2c56e84
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
33 changes: 13 additions & 20 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@
package io.moquette.broker.subscriptions;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

class CNode {

private Token token;
private List<INode> children;
private Map<Token, INode> children;
Set<Subscription> subscriptions;

CNode() {
this.children = new ArrayList<>();
this.children = new ConcurrentHashMap<>();
this.subscriptions = new HashSet<>();
}

//Copy constructor
private CNode(Token token, List<INode> children, Set<Subscription> subscriptions) {
private CNode(Token token, Map<Token, INode> children, Set<Subscription> subscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
this.subscriptions = new HashSet<>(subscriptions);
this.children = new ArrayList<>(children);
this.children = new ConcurrentHashMap<>(children);
}

public Token getToken() {
Expand All @@ -44,25 +45,17 @@ public void setToken(Token token) {
}

boolean anyChildrenMatch(Token token) {
for (INode iNode : children) {
final CNode child = iNode.mainNode();
if (child.equalsToken(token)) {
return true;
}
}
return false;
return this.children.containsKey(token);
}

List<INode> allChildren() {
Map<Token, INode> allChildren() {
return this.children;
}

INode childOf(Token token) {
for (INode iNode : children) {
final CNode child = iNode.mainNode();
if (child.equalsToken(token)) {
return iNode;
}
INode child = this.children.get(token);
if (child != null) {
return child;
}
throw new IllegalArgumentException("Asked for a token that doesn't exists in any child [" + token + "]");
}
Expand All @@ -80,12 +73,12 @@ CNode copy() {
return new CNode(this.token, this.children, this.subscriptions);
}

public void add(INode newINode) {
this.children.add(newINode);
public boolean add(INode newINode) {
return this.children.putIfAbsent(newINode.mainNode().getToken(), newINode) == null;
}

public void remove(INode node) {
this.children.remove(node);
this.children.remove(node.mainNode().getToken());
}

CNode addSubscription(Subscription newSubscription) {
Expand Down
23 changes: 14 additions & 9 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.moquette.broker.subscriptions;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.*;

public class CTrie {

Expand Down Expand Up @@ -66,6 +63,9 @@ public Set<Subscription> recursiveMatch(Topic topic) {
}

private Set<Subscription> recursiveMatch(Topic topic, INode inode) {
if (inode == null) {
return Collections.emptySet();
}
CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
return Collections.emptySet();
Expand All @@ -81,9 +81,11 @@ private Set<Subscription> recursiveMatch(Topic topic, INode inode) {
Set<Subscription> subscriptions = new HashSet<>();
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
} else {
Map<Token, INode> children = cnode.allChildren();
subscriptions.addAll(recursiveMatch(remainingTopic, children.get(remainingTopic.headToken())));
subscriptions.addAll(recursiveMatch(remainingTopic, children.get(Token.MULTI)));
subscriptions.addAll(recursiveMatch(remainingTopic, children.get(Token.SINGLE)));
}
return subscriptions;
}
Expand Down Expand Up @@ -124,7 +126,10 @@ private Action createNodeAndInsertSubscription(Topic topic, INode inode, Subscri
INode newInode = createPathRec(topic, newSubscription);
CNode cnode = inode.mainNode();
CNode updatedCnode = cnode.copy();
updatedCnode.add(newInode);
boolean success = updatedCnode.add(newInode);
if (!success) {
return Action.REPEAT;
}

return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
}
Expand Down Expand Up @@ -226,7 +231,7 @@ private void dfsVisit(INode node, IVisitor<?> visitor, int deep) {

visitor.visit(node.mainNode(), deep);
++deep;
for (INode child : node.mainNode().allChildren()) {
for (INode child : node.mainNode().allChildren().values()) {
dfsVisit(child, visitor, deep);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ CNode copy() {
}

@Override
public void add(INode newINode) {
public boolean add(INode newINode) {
throw new IllegalStateException("Can't be invoked on TNode");
}

Expand Down

0 comments on commit 2c56e84

Please sign in to comment.