Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12818] Polishes spark-sketch module #10985

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.io.IOException;
import java.util.Arrays;

public final class BitArray {
final class BitArray {
private final long[] data;
private long bitCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,28 @@
import java.io.OutputStream;

/**
* A Bloom filter is a space-efficient probabilistic data structure, that is used to test whether
* an element is a member of a set. It returns false when the element is definitely not in the
* set, returns true when the element is probably in the set.
*
* Internally a Bloom filter is initialized with 2 information: how many space to use(number of
* bits) and how many hash values to calculate for each record. To get as lower false positive
* probability as possible, user should call {@link BloomFilter#create} to automatically pick a
* best combination of these 2 parameters.
*
* Currently the following data types are supported:
* A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
* containment test with one-sided error: if it claims that an item is contained in it, this
* might be in error, but if it claims that an item is <i>not</i> contained in it, then this is
* definitely true. Currently supported data types include:
* <ul>
* <li>{@link Byte}</li>
* <li>{@link Short}</li>
* <li>{@link Integer}</li>
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
* The false positive probability ({@code FPP}) of a Bloom filter is defined as the probability that
* {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that hasu
* not actually been put in the {@code BloomFilter}.
*
* The implementation is largely based on the {@code BloomFilter} class from guava.
* The implementation is largely based on the {@code BloomFilter} class from Guava.
*/
public abstract class BloomFilter {

public enum Version {
/**
* {@code BloomFilter} binary format version 1 (all values written in big-endian order):
* {@code BloomFilter} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Number of hash functions (32 bit)</li>
Expand All @@ -68,14 +65,13 @@ int getVersionNumber() {
}

/**
* Returns the false positive probability, i.e. the probability that
* {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
* has not actually been put in the {@code BloomFilter}.
* Returns the probability that {@linkplain #mightContain(Object)} erroneously return {@code true}
* for an object that has not actually been put in the {@code BloomFilter}.
*
* <p>Ideally, this number should be close to the {@code fpp} parameter
* passed in to create this bloom filter, or smaller. If it is
* significantly higher, it is usually the case that too many elements (more than
* expected) have been put in the {@code BloomFilter}, degenerating it.
* Ideally, this number should be close to the {@code fpp} parameter passed in
* {@linkplain #create(long, double)}, or smaller. If it is significantly higher, it is usually
* the case that too many items (more than expected) have been put in the {@code BloomFilter},
* degenerating it.
*/
public abstract double expectedFpp();

Expand All @@ -85,8 +81,8 @@ int getVersionNumber() {
public abstract long bitSize();

/**
* Puts an element into this {@code BloomFilter}. Ensures that subsequent invocations of
* {@link #mightContain(Object)} with the same element will always return {@code true}.
* Puts an item into this {@code BloomFilter}. Ensures that subsequent invocations of
* {@linkplain #mightContain(Object)} with the same item will always return {@code true}.
*
* @return true if the bloom filter's bits changed as a result of this operation. If the bits
* changed, this is <i>definitely</i> the first time {@code object} has been added to the
Expand All @@ -98,19 +94,19 @@ int getVersionNumber() {
public abstract boolean put(Object item);

/**
* A specialized variant of {@link #put(Object)}, that can only be used to put utf-8 string.
* A specialized variant of {@link #put(Object)} that only supports {@code String} items.
*/
public abstract boolean putString(String str);
public abstract boolean putString(String item);

/**
* A specialized variant of {@link #put(Object)}, that can only be used to put long.
* A specialized variant of {@link #put(Object)} that only supports {@code long} items.
*/
public abstract boolean putLong(long l);
public abstract boolean putLong(long item);

/**
* A specialized variant of {@link #put(Object)}, that can only be used to put byte array.
* A specialized variant of {@link #put(Object)} that only supports byte array items.
*/
public abstract boolean putBinary(byte[] bytes);
public abstract boolean putBinary(byte[] item);

/**
* Determines whether a given bloom filter is compatible with this bloom filter. For two
Expand All @@ -137,38 +133,36 @@ int getVersionNumber() {
public abstract boolean mightContain(Object item);

/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test utf-8
* string.
* A specialized variant of {@link #mightContain(Object)} that only tests {@code String} items.
*/
public abstract boolean mightContainString(String str);
public abstract boolean mightContainString(String item);

/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test long.
* A specialized variant of {@link #mightContain(Object)} that only tests {@code long} items.
*/
public abstract boolean mightContainLong(long l);
public abstract boolean mightContainLong(long item);

/**
* A specialized variant of {@link #mightContain(Object)}, that can only be used to test byte
* array.
* A specialized variant of {@link #mightContain(Object)} that only tests byte array items.
*/
public abstract boolean mightContainBinary(byte[] bytes);
public abstract boolean mightContainBinary(byte[] item);

/**
* Writes out this {@link BloomFilter} to an output stream in binary format.
* It is the caller's responsibility to close the stream.
* Writes out this {@link BloomFilter} to an output stream in binary format. It is the caller's
* responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;

/**
* Reads in a {@link BloomFilter} from an input stream.
* It is the caller's responsibility to close the stream.
* Reads in a {@link BloomFilter} from an input stream. It is the caller's responsibility to close
* the stream.
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
return BloomFilterImpl.readFrom(in);
}

/**
* Computes the optimal k (number of hashes per element inserted in Bloom filter), given the
* Computes the optimal k (number of hashes per item inserted in Bloom filter), given the
* expected insertions and total number of bits in the Bloom filter.
*
* See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
Expand Down Expand Up @@ -197,31 +191,46 @@ private static long optimalNumOfBits(long n, double p) {
static final double DEFAULT_FPP = 0.03;

/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and the default {@code fpp}.
* Creates a {@link BloomFilter} with the expected number of insertions and a default expected
* false positive probability of 3%.
*
* Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems) {
return create(expectedNumItems, DEFAULT_FPP);
}

/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code fpp}, it will pick
* an optimal {@code numBits} and {@code numHashFunctions} for the bloom filter.
* Creates a {@link BloomFilter} with the expected number of insertions and expected false
* positive probability.
*
* Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems, double fpp) {
assert fpp > 0.0 : "False positive probability must be > 0.0";
assert fpp < 1.0 : "False positive probability must be < 1.0";
long numBits = optimalNumOfBits(expectedNumItems, fpp);
return create(expectedNumItems, numBits);
if (fpp <= 0D || fpp >= 1D) {
throw new IllegalArgumentException(
"False positive probability must be within range (0.0, 1.0)"
);
}

return create(expectedNumItems, optimalNumOfBits(expectedNumItems, fpp));
}

/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code numBits}, it will
* pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
*/
public static BloomFilter create(long expectedNumItems, long numBits) {
assert expectedNumItems > 0 : "Expected insertions must be > 0";
assert numBits > 0 : "number of bits must be > 0";
int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);
return new BloomFilterImpl(numHashFunctions, numBits);
if (expectedNumItems <= 0) {
throw new IllegalArgumentException("Expected insertions must be positive");
}

if (numBits <= 0) {
throw new IllegalArgumentException("Number of bits must be positive");
}

return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import java.io.*;

public class BloomFilterImpl extends BloomFilter implements Serializable {
class BloomFilterImpl extends BloomFilter implements Serializable {

private int numHashFunctions;

private BitArray bits;

BloomFilterImpl(int numHashFunctions, long numBits) {
Expand Down Expand Up @@ -77,14 +78,14 @@ public boolean put(Object item) {
}

@Override
public boolean putString(String str) {
return putBinary(Utils.getBytesFromUTF8String(str));
public boolean putString(String item) {
return putBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public boolean putBinary(byte[] bytes) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
public boolean putBinary(byte[] item) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);

long bitSize = bits.bitSize();
boolean bitsChanged = false;
Expand All @@ -100,14 +101,14 @@ public boolean putBinary(byte[] bytes) {
}

@Override
public boolean mightContainString(String str) {
return mightContainBinary(Utils.getBytesFromUTF8String(str));
public boolean mightContainString(String item) {
return mightContainBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public boolean mightContainBinary(byte[] bytes) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
public boolean mightContainBinary(byte[] item) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);

long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
Expand All @@ -124,14 +125,14 @@ public boolean mightContainBinary(byte[] bytes) {
}

@Override
public boolean putLong(long l) {
public boolean putLong(long item) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
int h1 = Murmur3_x86_32.hashLong(l, 0);
int h2 = Murmur3_x86_32.hashLong(l, h1);
int h1 = Murmur3_x86_32.hashLong(item, 0);
int h2 = Murmur3_x86_32.hashLong(item, h1);

long bitSize = bits.bitSize();
boolean bitsChanged = false;
Expand All @@ -147,9 +148,9 @@ public boolean putLong(long l) {
}

@Override
public boolean mightContainLong(long l) {
int h1 = Murmur3_x86_32.hashLong(l, 0);
int h2 = Murmur3_x86_32.hashLong(l, h1);
public boolean mightContainLong(long item) {
int h1 = Murmur3_x86_32.hashLong(item, 0);
int h2 = Murmur3_x86_32.hashLong(item, h1);

long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
Expand Down Expand Up @@ -197,7 +198,7 @@ public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeExcep
throw new IncompatibleMergeException("Cannot merge null bloom filter");
}

if (!(other instanceof BloomFilter)) {
if (!(other instanceof BloomFilterImpl)) {
throw new IncompatibleMergeException(
"Cannot merge bloom filter of class " + other.getClass().getName()
);
Expand All @@ -211,7 +212,8 @@ public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeExcep

if (this.numHashFunctions != that.numHashFunctions) {
throw new IncompatibleMergeException(
"Cannot merge bloom filters with different number of hash functions");
"Cannot merge bloom filters with different number of hash functions"
);
}

this.bits.putAll(that.bits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.io.OutputStream;

/**
* A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in
* A Count-min sketch is a probabilistic data structure used for summarizing streams of data in
* sub-linear space. Currently, supported data types include:
* <ul>
* <li>{@link Byte}</li>
Expand All @@ -31,8 +31,7 @@
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
* Each {@link CountMinSketch} is initialized with a random seed, and a pair
* of parameters:
* A {@link CountMinSketch} is initialized with a random seed, and a pair of parameters:
* <ol>
* <li>relative error (or {@code eps}), and
* <li>confidence (or {@code delta})
Expand All @@ -49,16 +48,13 @@
* <li>{@code w = ceil(-log(1 - confidence) / log(2))}</li>
* </ul>
*
* See http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf for technical details,
* including proofs of the estimates and error bounds used in this implementation.
*
* This implementation is largely based on the {@code CountMinSketch} class from stream-lib.
*/
abstract public class CountMinSketch {

public enum Version {
/**
* {@code CountMinSketch} binary format version 1 (all values written in big-endian order):
* {@code CountMinSketch} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Total count of added items (64 bit)</li>
Expand Down Expand Up @@ -172,14 +168,14 @@ public abstract CountMinSketch mergeInPlace(CountMinSketch other)
throws IncompatibleMergeException;

/**
* Writes out this {@link CountMinSketch} to an output stream in binary format.
* It is the caller's responsibility to close the stream.
* Writes out this {@link CountMinSketch} to an output stream in binary format. It is the caller's
* responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;

/**
* Reads in a {@link CountMinSketch} from an input stream.
* It is the caller's responsibility to close the stream.
* Reads in a {@link CountMinSketch} from an input stream. It is the caller's responsibility to
* close the stream.
*/
public static CountMinSketch readFrom(InputStream in) throws IOException {
return CountMinSketchImpl.readFrom(in);
Expand All @@ -188,6 +184,10 @@ public static CountMinSketch readFrom(InputStream in) throws IOException {
/**
* Creates a {@link CountMinSketch} with given {@code depth}, {@code width}, and random
* {@code seed}.
*
* @param depth depth of the Count-min Sketch, must be positive
* @param width width of the Count-min Sketch, must be positive
* @param seed random seed
*/
public static CountMinSketch create(int depth, int width, int seed) {
return new CountMinSketchImpl(depth, width, seed);
Expand All @@ -196,6 +196,10 @@ public static CountMinSketch create(int depth, int width, int seed) {
/**
* Creates a {@link CountMinSketch} with given relative error ({@code eps}), {@code confidence},
* and random {@code seed}.
*
* @param eps relative error, must be positive
* @param confidence confidence, must be positive and less than 1.0
* @param seed random seed
*/
public static CountMinSketch create(double eps, double confidence, int seed) {
return new CountMinSketchImpl(eps, confidence, seed);
Expand Down
Loading