From b4b431471736cd32a50b49cc9e91e038a4387808 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Mon, 7 Sep 2015 14:55:11 +0800 Subject: [PATCH 1/2] [FLINK-2533] [core] Gap based random sample optimization. --- .../api/java/sampling/BernoulliSampler.java | 32 +++++++-- .../api/java/sampling/PoissonSampler.java | 69 ++++++++++++++----- .../api/java/sampling/RandomSampler.java | 2 + 3 files changed, 80 insertions(+), 23 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java index 0f5ecc67a19a2..cac31707b74c4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java @@ -28,12 +28,16 @@ * Bernoulli experiment. * * @param The type of sample. + * @see Gap Sampling */ public class BernoulliSampler extends RandomSampler { private final double fraction; private final Random random; + //THRESHOLD is a tuning parameter for choosing sampling method according to the fraction + private final static double THRESHOLD = 0.33; + /** * Create a Bernoulli sampler with sample fraction and default random number generator. 
* @@ -102,15 +106,31 @@ public T next() { } private T getNextSampledElement() { - while (input.hasNext()) { - T element = input.next(); - - if (random.nextDouble() <= fraction) { + if (fraction <= THRESHOLD) { + double rand = random.nextDouble(); + double u = Math.max(rand, EPSILON); + int gap = (int) (Math.log(u) / Math.log(1 - fraction)); + int elementCount = 0; + if (input.hasNext()) { + T element = input.next(); + while (input.hasNext() && elementCount < gap) { + element = input.next(); + elementCount++; + } return element; + } else { + return null; } - } + }else { + while (input.hasNext()){ + T element = input.next(); - return null; + if (random.nextDouble() <= fraction) { + return element; + } + } + return null; + } } }; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java index 3834d2414bee0..bb958c1079ff2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java @@ -21,6 +21,7 @@ import org.apache.commons.math3.distribution.PoissonDistribution; import java.util.Iterator; +import java.util.Random; /** * A sampler implementation based on the Poisson Distribution. While sampling elements with fraction @@ -28,11 +29,16 @@ * * @param The type of sample. * @see https://en.wikipedia.org/wiki/Poisson_distribution + * @see Gap Sampling */ public class PoissonSampler extends RandomSampler { private PoissonDistribution poissonDistribution; private final double fraction; + private final Random random = new Random(); + + //THRESHOLD is a tuning parameter for choosing sampling method according to the fraction + private final static double THRESHOLD = 0.4; /** * Create a poisson sampler which can sample elements with replacement. 
@@ -84,8 +90,7 @@ public boolean hasNext() { if (currentCount > 0) { return true; } else { - moveToNextElement(); - + samplingProcess(); if (currentCount > 0) { return true; } else { @@ -93,29 +98,59 @@ public boolean hasNext() { } } } - - private void moveToNextElement() { - while (input.hasNext()) { + + public int poisson_ge1(double p){ + // sample 'k' from Poisson(p), conditioned to k >= 1 + double q = Math.pow(Math.E, -p); + // simulate a poisson trial such that k >= 1 + double t = q + (1 - q)*random.nextDouble(); + int k = 1; + // continue standard poisson generation trials + t = t * random.nextDouble(); + while (t > q) { + k++; + t = t * random.nextDouble(); + } + return k; + } + + private void moveToNextElement(int num) { + // skip elements with replication factor zero + int elementCount = 0; + while (input.hasNext() && elementCount < num){ currentElement = input.next(); - currentCount = poissonDistribution.sample(); - if (currentCount > 0) { - break; + elementCount++; + } + } + + private void samplingProcess(){ + if (fraction <= THRESHOLD) { + double u = Math.max(random.nextDouble(), EPSILON); + int gap = (int) (Math.log(u) / -fraction); + moveToNextElement(gap); + if (input.hasNext()) { + currentElement = input.next(); + currentCount = poisson_ge1(fraction); + } + } + else { + while (input.hasNext()){ + currentElement = input.next(); + currentCount = poissonDistribution.sample(); + if (currentCount > 0) { + break; + } } } } @Override public T next() { - if (currentCount == 0) { - moveToNextElement(); - } - - if (currentCount == 0) { - return null; - } else { - currentCount--; - return currentElement; + if (currentCount <= 0) { + samplingProcess(); } + currentCount--; + return currentElement; } }; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java index 5fe292025b1b4..7d748977d3b26 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/RandomSampler.java @@ -26,6 +26,8 @@ * @param The type of sampler data. */ public abstract class RandomSampler { + + protected final static double EPSILON = 1e-5; protected final Iterator EMPTY_ITERABLE = new SampledIterator() { @Override From 0178a59c6c6fc51d601fca872f06dea1179432fe Mon Sep 17 00:00:00 2001 From: gallenvara Date: Wed, 9 Sep 2015 13:27:51 +0800 Subject: [PATCH 2/2] [FLINK-2533] [core] Gap based random sample optimization. --- .../api/java/sampling/BernoulliSampler.java | 12 ++++-- .../api/java/sampling/PoissonSampler.java | 41 ++++++++++--------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java index cac31707b74c4..99ea5decafb3b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java @@ -35,7 +35,7 @@ public class BernoulliSampler extends RandomSampler { private final double fraction; private final Random random; - //THRESHOLD is a tuning parameter for choosing sampling method according to the fraction + // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. 
private final static double THRESHOLD = 0.33; /** @@ -117,12 +117,16 @@ private T getNextSampledElement() { element = input.next(); elementCount++; } - return element; + if (elementCount < gap) { + return null; + } else { + return element; + } } else { return null; } - }else { - while (input.hasNext()){ + } else { + while (input.hasNext()) { T element = input.next(); if (random.nextDouble() <= fraction) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java index bb958c1079ff2..8701167c3ac19 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java @@ -35,9 +35,9 @@ public class PoissonSampler extends RandomSampler { private PoissonDistribution poissonDistribution; private final double fraction; - private final Random random = new Random(); + private final Random random; - //THRESHOLD is a tuning parameter for choosing sampling method according to the fraction + // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. private final static double THRESHOLD = 0.4; /** @@ -53,6 +53,7 @@ public PoissonSampler(double fraction, long seed) { this.poissonDistribution = new PoissonDistribution(fraction); this.poissonDistribution.reseedRandomGenerator(seed); } + this.random = new Random(seed); } /** @@ -66,6 +67,7 @@ public PoissonSampler(double fraction) { if (this.fraction > 0) { this.poissonDistribution = new PoissonDistribution(fraction); } + this.random = new Random(); } /** @@ -99,13 +101,22 @@ public boolean hasNext() { } } + @Override + public T next() { + if (currentCount <= 0) { + samplingProcess(); + } + currentCount--; + return currentElement; + } + public int poisson_ge1(double p){ - // sample 'k' from Poisson(p), conditioned to k >= 1 + // sample 'k' from Poisson(p), conditioned to k >= 1. 
double q = Math.pow(Math.E, -p); - // simulate a poisson trial such that k >= 1 - double t = q + (1 - q)*random.nextDouble(); + // simulate a poisson trial such that k >= 1. + double t = q + (1 - q) * random.nextDouble(); int k = 1; - // continue standard poisson generation trials + // continue standard poisson generation trials. t = t * random.nextDouble(); while (t > q) { k++; @@ -114,8 +125,8 @@ public int poisson_ge1(double p){ return k; } - private void moveToNextElement(int num) { - // skip elements with replication factor zero + private void skipGapElements(int num) { + // skip the elements whose occurrence count is zero. int elementCount = 0; while (input.hasNext() && elementCount < num){ currentElement = input.next(); @@ -127,13 +138,12 @@ private void samplingProcess(){ if (fraction <= THRESHOLD) { double u = Math.max(random.nextDouble(), EPSILON); int gap = (int) (Math.log(u) / -fraction); - moveToNextElement(gap); + skipGapElements(gap); if (input.hasNext()) { currentElement = input.next(); currentCount = poisson_ge1(fraction); } - } - else { + } else { while (input.hasNext()){ currentElement = input.next(); currentCount = poissonDistribution.sample(); @@ -143,15 +153,6 @@ private void samplingProcess(){ } } } - - @Override - public T next() { - if (currentCount <= 0) { - samplingProcess(); - } - currentCount--; - return currentElement; - } }; } }