Skip to content

Commit

Permalink
[ML] Frequent Items: use a bitset for deduplication (#88943)
Browse files Browse the repository at this point in the history
Speed up frequent_items by using bitsets instead of lists of longs. With this, item sets
can be de-duplicated faster. A bit is set according to the order of top items (by count).
  • Loading branch information
Hendrik Muhs committed Aug 1, 2022
1 parent 87ab933 commit e64eb8c
Show file tree
Hide file tree
Showing 10 changed files with 996 additions and 367 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/88943.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88943
summary: "Frequent Items: use a bitset for deduplication"
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

package org.elasticsearch.xpack.ml.aggs.frequentitemsets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.LongsRef;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.TransactionStore.TopItemIds;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -30,6 +32,7 @@
* if [a, b] is not in T, [a, b, c] can not be in T either
*/
class CountingItemSetTraverser implements Releasable {
private static final Logger logger = LogManager.getLogger(CountingItemSetTraverser.class);

// start size and size increment for the occurrences stack
private static final int OCCURENCES_SIZE_INCREMENT = 10;
Expand All @@ -48,13 +51,19 @@ class CountingItemSetTraverser implements Releasable {
// growable bit set from java util
private java.util.BitSet visited;

CountingItemSetTraverser(TransactionStore transactionStore, int cacheTraversalDepth, int cacheNumberOfTransactions, long minCount) {
CountingItemSetTraverser(
TransactionStore transactionStore,
TopItemIds topItemIds,
int cacheTraversalDepth,
int cacheNumberOfTransactions,
long minCount
) {
this.transactionStore = transactionStore;

boolean success = false;
try {
// we allocate 2 big arrays, if the 2nd allocation fails, ensure we clean up
this.topItemSetTraverser = transactionStore.getTopItemIdTraverser();
this.topItemSetTraverser = new ItemSetTraverser(topItemIds);
this.topTransactionIds = transactionStore.getTopTransactionIds();
success = true;
} finally {
Expand All @@ -80,11 +89,15 @@ public boolean next(long earlyStopMinCount) throws IOException {
final long totalTransactionCount = transactionStore.getTotalTransactionCount();

int depth = topItemSetTraverser.getNumberOfItems();
long occurencesOfSingleItem = transactionStore.getItemCount(topItemSetTraverser.getItemId());

if (depth == 1) {
// at the 1st level, we can take the count directly from the transaction store
occurencesStack[0] = transactionStore.getItemCount(topItemSetTraverser.getItemId());
occurencesStack[0] = occurencesOfSingleItem;
return true;
} else if (occurencesOfSingleItem < earlyStopMinCount) {
rememberCountInStack(depth, occurencesOfSingleItem);
return true;

// till a certain depth store results in a cache matrix
} else if (depth < cacheTraversalDepth) {
// get the cached skip count
Expand Down Expand Up @@ -187,7 +200,7 @@ public long getCount() {
/**
* Get the count of the item set without the last item
*/
public long getPreviousCount() {
public long getParentCount() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
return occurencesStack[topItemSetTraverser.getNumberOfItems() - 2];
}
Expand All @@ -201,7 +214,7 @@ public boolean hasBeenVisited() {
return true;
}

public boolean hasPredecessorBeenVisited() {
public boolean hasParentBeenVisited() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
return visited.get(topItemSetTraverser.getNumberOfItems() - 2);
}
Expand All @@ -214,7 +227,7 @@ public void setVisited() {
}
}

public void setPredecessorVisited() {
public void setParentVisited() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
visited.set(topItemSetTraverser.getNumberOfItems() - 2);
}
Expand All @@ -228,10 +241,15 @@ public int getNumberOfItems() {
}

/**
* Get the current item set
*
* Get a bitset representation of the current item set
*/
public LongsRef getItemSet() {
return topItemSetTraverser.getItemSet();
public ItemSetBitSet getItemSetBitSet() {
return topItemSetTraverser.getItemSetBitSet();
}

/**
 * Get a bitset representation of the parent item set, i.e. the current item set without its last item.
 *
 * Delegates to the wrapped item set traverser.
 */
public ItemSetBitSet getParentItemSetBitSet() {
    final ItemSetBitSet parentItemSet = topItemSetTraverser.getParentItemSetBitSet();
    return parentItemSet;
}

/**
Expand All @@ -250,7 +268,7 @@ public boolean atLeaf() {

@Override
public void close() {
Releasables.close(topItemSetTraverser, topTransactionIds);
Releasables.close(topTransactionIds);
}

// remember the count in the stack without tracking push and pop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.LongsRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -25,6 +24,7 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.FrequentItemSetCollector.FrequentItemSet;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.TransactionStore.TopItemIds;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr.AbstractItemSetMapReducer;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr.ItemSetMapReduceValueSource.Field;

Expand Down Expand Up @@ -338,17 +338,17 @@ private static EclatResult eclat(
final long totalTransactionCount = transactionStore.getTotalTransactionCount();
Map<String, Object> profilingInfo = null;
long minCount = (long) Math.ceil(totalTransactionCount * minimumSupport);
FrequentItemSetCollector collector = new FrequentItemSetCollector(transactionStore, size, minCount);
long numberOfSetsChecked = 0;

if (profilingInfoReduce != null) {
profilingInfo = new LinkedHashMap<>(profilingInfoReduce);
profilingInfo.put("start_min_count_eclat", minCount);
}

try (
TopItemIds topItemIds = transactionStore.getTopItemIds();
CountingItemSetTraverser setTraverser = new CountingItemSetTraverser(
transactionStore,
topItemIds,
BITSET_CACHE_TRAVERSAL_DEPTH,
(int) Math.min(MAX_BITSET_CACHE_NUMBER_OF_TRANSACTIONS, totalTransactionCount),
minCount
Expand All @@ -360,7 +360,8 @@ private static EclatResult eclat(
minCount,
transactionStore.getTotalItemCount()
);

FrequentItemSetCollector collector = new FrequentItemSetCollector(transactionStore, topItemIds, size, minCount);
long numberOfSetsChecked = 0;
long previousMinCount = 0;

while (setTraverser.next(minCount)) {
Expand Down Expand Up @@ -402,8 +403,11 @@ private static EclatResult eclat(
if (setTraverser.atLeaf()
&& setTraverser.hasBeenVisited() == false
&& setTraverser.getCount() >= minCount
&& setTraverser.getItemSet().length >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSet(), setTraverser.getCount());
&& setTraverser.getItemSetBitSet().cardinality() >= minimumSetSize) {

logger.trace("add after prune");

minCount = collector.add(setTraverser.getItemSetBitSet(), setTraverser.getCount());
// no need to set visited, as we are on a leaf
}

Expand All @@ -418,19 +422,17 @@ private static EclatResult eclat(
*
* iff the count of the subset is higher, collect
*/
if (setTraverser.hasPredecessorBeenVisited() == false
&& setTraverser.getItemSet().length > minimumSetSize
&& setTraverser.getCount() < setTraverser.getPreviousCount()) {
if (setTraverser.hasParentBeenVisited() == false
&& setTraverser.getItemSetBitSet().cardinality() > minimumSetSize
&& setTraverser.getCount() < setTraverser.getParentCount()) {
// add the set without the last item

LongsRef subItemSet = setTraverser.getItemSet().clone();
subItemSet.length--;
minCount = collector.add(subItemSet, setTraverser.getPreviousCount());
minCount = collector.add(setTraverser.getParentItemSetBitSet(), setTraverser.getParentCount());
}

// closed set criteria: the parent is no longer of interest: either we reported it in the previous step or we found a
// super set
setTraverser.setPredecessorVisited();
setTraverser.setParentVisited();

/**
* Iff the traverser reached a leaf, the item set can not be further expanded, e.g. we reached [f]:
Expand All @@ -445,8 +447,8 @@ private static EclatResult eclat(
*
* Note: this also covers the last item, e.g. [a, x, y]
*/
if (setTraverser.atLeaf() && setTraverser.getItemSet().length >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSet(), setTraverser.getCount());
if (setTraverser.atLeaf() && setTraverser.getItemSetBitSet().cardinality() >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSetBitSet(), setTraverser.getCount());
// no need to set visited, as we are on a leaf
}

Expand Down

0 comments on commit e64eb8c

Please sign in to comment.