From 6da3e2042ba19517b364b5699fb9d619f31b1a2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=85=89=20=E4=BD=95?= Date: Mon, 18 Apr 2016 13:52:45 -0500 Subject: [PATCH 1/6] add try-finally block --- .../apache/flink/runtime/blob/BlobCache.java | 58 +++++++++---------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 4d3336464de23..ae725ac52e828 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -24,12 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +35,9 @@ */ public final class BlobCache implements BlobService { - /** The log object used for debugging. */ + /** + * The log object used for debugging. + */ private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class); private final InetSocketAddress serverAddress; @@ -50,10 +46,14 @@ public final class BlobCache implements BlobService { private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** Shutdown hook thread to ensure deletion of the storage directory. */ + /** + * Shutdown hook thread to ensure deletion of the storage directory. + */ private final Thread shutdownHook; - /** The number of retries when the transfer fails */ + /** + * The number of retries when the transfer fails + */ private final int numFetchRetries; @@ -71,13 +71,12 @@ public BlobCache(InetSocketAddress serverAddress, Configuration configuration) { // configure the number of fetch retries final int fetchRetries = configuration.getInteger( - ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); + ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); if (fetchRetries >= 0) { this.numFetchRetries = fetchRetries; - } - else { + } else { LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", - ConfigConstants.BLOB_FETCH_RETRIES_KEY); + ConfigConstants.BLOB_FETCH_RETRIES_KEY); this.numFetchRetries = 0; } @@ -89,7 +88,7 @@ public BlobCache(InetSocketAddress serverAddress, Configuration configuration) { * Returns the URL for the BLOB with the given key. The method will first attempt to serve * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it * from this cache's BLOB server. - * + * * @param requiredBlob The key of the desired BLOB. * @return URL referring to the local storage location of the BLOB. * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. @@ -146,8 +145,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // success, we finished break; - } - catch (Throwable t) { + } catch (Throwable t) { // we use "catch (Throwable)" to keep the root exception. Otherwise that exception // it would be replaced by any exception thrown in the finally block closeSilently(os); @@ -160,10 +158,9 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { throw new IOException(t.getMessage(), t); } } - } - catch (IOException e) { + } catch (IOException e) { String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + - " and store it under " + localJarFile.getAbsolutePath(); + " and store it under " + localJarFile.getAbsolutePath(); if (attempt < numFetchRetries) { attempt++; if (LOG.isDebugEnabled()) { @@ -171,8 +168,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { } else { LOG.error(message + " Retrying..."); } - } - else { + } else { LOG.error(message + " No retries left.", e); throw new IOException(message, e); } @@ -185,9 +181,10 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { /** * Deletes the file associated with the given key from the BLOB cache. + * * @param key referring to the file to be deleted */ - public void delete(BlobKey key) throws IOException{ + public void delete(BlobKey key) throws IOException { final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists() && !localFile.delete()) { @@ -197,13 +194,17 @@ public void delete(BlobKey key) throws IOException{ /** * Deletes the file associated with the given key from the BLOB cache and BLOB server. + * * @param key referring to the file to be deleted */ public void deleteGlobal(BlobKey key) throws IOException { delete(key); BlobClient bc = createClient(); - bc.delete(key); - bc.close(); + try { + bc.delete(key); + } finally { + bc.close(); + } } @Override @@ -219,8 +220,7 @@ public void shutdown() { // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); - } - catch (IOException e) { + } catch (IOException e) { LOG.error("BLOB cache failed to properly clean up its storage directory."); } @@ -228,11 +228,9 @@ public void shutdown() { if (shutdownHook != null && shutdownHook != Thread.currentThread()) { try { Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { + } catch (IllegalStateException e) { // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { + } catch (Throwable t) { LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); } } From 4ceba9abdf9e14938cb840b60c10846fc3ae3c68 Mon Sep 17 00:00:00 2001 From: Chenguang He Date: Mon, 18 Apr 2016 14:37:43 -0500 Subject: [PATCH 2/6] Update BlobCache.java --- .../src/main/java/org/apache/flink/runtime/blob/BlobCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index ae725ac52e828..b21d0e422a262 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import java.io.*; import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; From 7cbe926255c3640fa2543219ac053f6306970150 Mon Sep 17 00:00:00 2001 From: Chenguang He Date: Mon, 18 Apr 2016 14:41:45 -0500 Subject: [PATCH 3/6] reformat code style just notice my intellij idea reformat the code style. so i change it back --- .../apache/flink/runtime/blob/BlobCache.java | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index b21d0e422a262..6c200bde361d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -23,7 +23,13 @@ import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,9 +41,7 @@ */ public final class BlobCache implements BlobService { - /** - * The log object used for debugging. - */ + /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class); private final InetSocketAddress serverAddress; @@ -46,14 +50,10 @@ public final class BlobCache implements BlobService { private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - /** - * Shutdown hook thread to ensure deletion of the storage directory. - */ + /** Shutdown hook thread to ensure deletion of the storage directory. */ private final Thread shutdownHook; - /** - * The number of retries when the transfer fails - */ + /** The number of retries when the transfer fails */ private final int numFetchRetries; @@ -74,7 +74,8 @@ public BlobCache(InetSocketAddress serverAddress, Configuration configuration) { ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); if (fetchRetries >= 0) { this.numFetchRetries = fetchRetries; - } else { + } + else { LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", ConfigConstants.BLOB_FETCH_RETRIES_KEY); this.numFetchRetries = 0; @@ -145,7 +146,8 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { // success, we finished break; - } catch (Throwable t) { + } + catch (Throwable t) { // we use "catch (Throwable)" to keep the root exception. Otherwise that exception // it would be replaced by any exception thrown in the finally block closeSilently(os); @@ -158,7 +160,8 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { throw new IOException(t.getMessage(), t); } } - } catch (IOException e) { + } + catch (IOException e) { String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + " and store it under " + localJarFile.getAbsolutePath(); if (attempt < numFetchRetries) { @@ -168,7 +171,8 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { } else { LOG.error(message + " Retrying..."); } - } else { + } + else { LOG.error(message + " No retries left.", e); throw new IOException(message, e); } @@ -181,10 +185,9 @@ public URL getURL(final BlobKey requiredBlob) throws IOException { /** * Deletes the file associated with the given key from the BLOB cache. - * * @param key referring to the file to be deleted */ - public void delete(BlobKey key) throws IOException { + public void delete(BlobKey key) throws IOException{ final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists() && !localFile.delete()) { @@ -194,15 +197,15 @@ public void delete(BlobKey key) throws IOException { /** * Deletes the file associated with the given key from the BLOB cache and BLOB server. - * * @param key referring to the file to be deleted */ public void deleteGlobal(BlobKey key) throws IOException { - delete(key); BlobClient bc = createClient(); + delete(key); try { bc.delete(key); - } finally { + } + finally { bc.close(); } } @@ -220,7 +223,8 @@ public void shutdown() { // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); - } catch (IOException e) { + } + catch (IOException e) { LOG.error("BLOB cache failed to properly clean up its storage directory."); } @@ -228,9 +232,11 @@ public void shutdown() { if (shutdownHook != null && shutdownHook != Thread.currentThread()) { try { Runtime.getRuntime().removeShutdownHook(shutdownHook); - } catch (IllegalStateException e) { + } + catch (IllegalStateException e) { // race, JVM is in shutdown already, we can safely ignore this - } catch (Throwable t) { + } + catch (Throwable t) { LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); } } From 2773ca9167336243abf28e7ffdecf412d4ddace5 Mon Sep 17 00:00:00 2001 From: Chenguang He Date: Tue, 19 Apr 2016 11:44:07 -0500 Subject: [PATCH 4/6] Update BlobCache.java Just notices that! Thanks uce! --- .../src/main/java/org/apache/flink/runtime/blob/BlobCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 6c200bde361d0..bb0aacb97007f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -201,8 +201,8 @@ public void delete(BlobKey key) throws IOException{ */ public void deleteGlobal(BlobKey key) throws IOException { BlobClient bc = createClient(); - delete(key); try { + delete(key); bc.delete(key); } finally { From c402729a8077ae884cdc5b209cac4ea33183fec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=85=89=20=E4=BD=95?= Date: Wed, 20 Apr 2016 02:37:45 -0500 Subject: [PATCH 5/6] Implement VeryFastReservoirSampler --- .../sampling/VeryFastReservoirSampler.java | 120 ++++++++++ .../VeryFastReservoirSamplingTest.java | 211 ++++++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java new file mode 100644 index 0000000000000..89a6b9c837ef3 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.sampling; + +import com.google.common.base.Preconditions; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.XORShiftRandom; + +import java.util.Iterator; +import java.util.PriorityQueue; +import java.util.Random; + +/** + * A sampler implementation built upon a Bernoulli trail. This sampler is used to sample with + * fraction and without replacement. Whether an element is sampled or not is determined by a + * Bernoulli experiment. + * + * @param The type of sample. + * @see Very Fast Reservoir Sampling + */ +@Internal +public class VeryFastReservoirSampler extends DistributedRandomSampler { + + private final Random random; + // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. + private final int THRESHOLD = 4 * super.numSamples; + + /** + * Create a new sampler with reservoir size and a supplied random number generator. + * + * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. + * @param random Instance of random number generator for sampling. + */ + public VeryFastReservoirSampler(int numSamples, Random random) { + super(numSamples); + Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative."); + this.random = random; + } + + /** + * Create a new sampler with reservoir size and a default random number generator. + * + * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. + */ + public VeryFastReservoirSampler(int numSamples) { + this(numSamples, new XORShiftRandom()); + } + + /** + * Create a new sampler with reservoir size and the seed for random number generator. + * + * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. + * @param seed Random number generator seed. + */ + public VeryFastReservoirSampler(int numSamples, long seed) { + + this(numSamples, new XORShiftRandom(seed)); + } + + @Override + public Iterator> sampleInPartition(Iterator input) { + if (numSamples == 0) { + return EMPTY_INTERMEDIATE_ITERABLE; + } + PriorityQueue> queue = new PriorityQueue>(numSamples); + double probability; + IntermediateSampleData smallest = null; + int index = 0, k, gap = 0; + while (input.hasNext()) { + T element = input.next(); + double rand = random.nextDouble(); + if (index < THRESHOLD) { // if index is less than THRESHOLD, then use regular reservoir + if (index < numSamples) { + // Fill the queue with first K elements from input. + queue.add(new IntermediateSampleData(random.nextDouble(), element)); + smallest = queue.peek(); + } else { + // Remove the element with the smallest weight, and append current element into the queue. + if (rand > smallest.getWeight()) { + queue.remove(); + queue.add(new IntermediateSampleData(rand, element)); + smallest = queue.peek(); + } + } + index++; + } else { // fast section + probability = (double) numSamples / index; + double rand1 = random.nextDouble(); + double u = Math.max(rand1, EPSILON); + gap = (int) (Math.log(u) / Math.log(1 - probability)); + if (gap > 0) { + while (input.hasNext() && gap > 0) { + gap--; + element = input.next(); + index++; + } + } + queue.remove(); + queue.add(new IntermediateSampleData(rand, element)); + smallest = queue.peek(); + } + } + return queue.iterator(); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java new file mode 100644 index 0000000000000..98a9f5ced6ff0 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.sampling; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertTrue; + +/** + * This test suite try to verify whether all the random samplers work as we expected, which mainly focus on: + *
    + *
  • Does sampled result fit into input parameters? we check parameters like sample fraction, sample size, + * w/o replacement, and so on.
  • + *
  • Does sampled result randomly selected? we verify this by measure how much does it distributed on source data. + * Run Kolmogorov-Smirnov (KS) test between the random samplers and default reference samplers which is distributed + * well-proportioned on source data. If random sampler select elements randomly from source, it would distributed + * well-proportioned on source data as well. The KS test will fail to strongly reject the null hypothesis that + * the distributions of sampling gaps are the same. + *
  • + *
+ * + * @see Kolmogorov Smirnov test + */ +public class VeryFastReservoirSamplingTest { + private final static int SOURCE_SIZE = 10000; + private static KolmogorovSmirnovTest ksTest; + private static List source; + private final static int DEFFAULT_PARTITION_NUMBER=10; + private List[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER]; + + @BeforeClass + public static void init() { + // initiate source data set. + source = new ArrayList(SOURCE_SIZE); + for (int i = 0; i < SOURCE_SIZE; i++) { + source.add((double) i); + } + + ksTest = new KolmogorovSmirnovTest(); + } + + private void initSourcePartition() { + for (int i=0; i(); + } + for (int i = 0; i< SOURCE_SIZE; i++) { + int index = i % DEFFAULT_PARTITION_NUMBER; + sourcePartitions[index].add((double)i); + } + } +// +// @Test(expected = java.lang.IllegalArgumentException.class) +// public void testBernoulliSamplerWithUnexpectedFraction1() { +// verifySamplerFraction(-1, false); +// } +// +// @Test(expected = java.lang.IllegalArgumentException.class) +// public void testBernoulliSamplerWithUnexpectedFraction2() { +// verifySamplerFraction(2, false); +// } + + @Test + public void testVeryFastReservoirSampler() { + initSourcePartition(); + + verifyVeryFastReservoirSampler(10, false); + } + @Test + public void testVeryFastReservoirSamplerWithMultiSourcePartitions() { + initSourcePartition(); + + verifyVeryFastReservoirSampler(100, true); + } + + private int getSize(Iterator iterator) { + int size = 0; + while (iterator.hasNext()) { + iterator.next(); + size++; + } + return size; + } + + private void verifyVeryFastReservoirSampler(int numSamplers, boolean sampleOnPartitions) { + VeryFastReservoirSampler sampler = new VeryFastReservoirSampler(numSamplers); + verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions); + verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, sampleOnPartitions); + } + + /* + * Verify whether random sampler sample with fixed size from source data randomly. There are two default sample, one is + * sampled from source data with certain interval, the other is sampled only from the first half region of source data, + * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well, + * so the K-S Test result would accept the first one, while reject the second one. + */ + private void verifyRandomSamplerWithSampleSize(int sampleSize, RandomSampler sampler, boolean withDefaultSampler, boolean sampleWithPartitions) { + double[] baseSample; + if (withDefaultSampler) { + baseSample = getDefaultSampler(sampleSize); + } else { + baseSample = getWrongSampler(sampleSize); + } + + verifyKSTest(sampler, baseSample, withDefaultSampler, sampleWithPartitions); + } + + private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) { + double[] sampled = getSampledOutput(sampler, sampleOnPartitions); + double pValue = ksTest.kolmogorovSmirnovStatistic(sampled, defaultSampler); + double dValue = getDValue(sampled.length, defaultSampler.length); + if (expectSuccess) { + assertTrue(String.format("KS test result with p value(%f), d value(%f)", pValue, dValue), pValue <= dValue); + } else { + assertTrue(String.format("KS test result with p value(%f), d value(%f)", pValue, dValue), pValue > dValue); + } + } + + private double[] getSampledOutput(RandomSampler sampler, boolean sampleOnPartitions) { + Iterator sampled = null; + if (sampleOnPartitions) { + DistributedRandomSampler reservoirRandomSampler = (DistributedRandomSampler)sampler; + List> intermediateResult = Lists.newLinkedList(); + for (int i=0; i> partialIntermediateResult = reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator()); + while (partialIntermediateResult.hasNext()) { + intermediateResult.add(partialIntermediateResult.next()); + } + } + sampled = reservoirRandomSampler.sampleInCoordinator(intermediateResult.iterator()); + } else { + sampled = sampler.sample(source.iterator()); + } + List list = Lists.newArrayList(); + while (sampled.hasNext()) { + list.add(sampled.next()); + } + double[] result = transferFromListToArrayWithOrder(list); + return result; + } + + /* + * Some sample result may not order by the input sequence, we should make it in order to do K-S test. + */ + private double[] transferFromListToArrayWithOrder(List list) { + Collections.sort(list); + double[] result = new double[list.size()]; + for (int i = 0; i < list.size(); i++) { + result[i] = list.get(i); + } + return result; + } + + private double[] getDefaultSampler(int fixSize) { + Preconditions.checkArgument(fixSize > 0, "Sample fraction should be positive."); + int size = fixSize; + double step = SOURCE_SIZE / (double) fixSize; + double[] defaultSampler = new double[size]; + for (int i = 0; i < size; i++) { + defaultSampler[i] = Math.round(step * i); + } + + return defaultSampler; + } + + /* + * Build a failed sample distribution which only contains elements in the first half of source data. + */ + private double[] getWrongSampler(int fixSize) { + Preconditions.checkArgument(fixSize > 0, "Sample size be positive."); + int halfSourceSize = SOURCE_SIZE / 2; + int size = fixSize; + double[] wrongSampler = new double[size]; + for (int i = 0; i < size; i++) { + wrongSampler[i] = (double) i % halfSourceSize; + } + + return wrongSampler; + } + + /* + * Calculate the D value of K-S test for p-value 0.01, m and n are the sample size + */ + private double getDValue(int m, int n) { + Preconditions.checkArgument(m > 0, "input sample size should be positive."); + Preconditions.checkArgument(n > 0, "input sample size should be positive."); + double first = (double) m; + double second = (double) n; + return 1.63 * Math.sqrt((first + second) / (first * second)); + } +} From 5bbb998926b4256324060dad6a329f634d3f33e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=99=A8=E5=85=89=20=E4=BD=95?= Date: Wed, 20 Apr 2016 09:53:41 -0500 Subject: [PATCH 6/6] Revert "Implement VeryFastReservoirSampler" This reverts commit c402729a8077ae884cdc5b209cac4ea33183fec0. --- .../sampling/VeryFastReservoirSampler.java | 120 ---------- .../VeryFastReservoirSamplingTest.java | 211 ------------------ 2 files changed, 331 deletions(-) delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java delete mode 100644 flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java deleted file mode 100644 index 89a6b9c837ef3..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/VeryFastReservoirSampler.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.sampling; - -import com.google.common.base.Preconditions; -import org.apache.flink.annotation.Internal; -import org.apache.flink.util.XORShiftRandom; - -import java.util.Iterator; -import java.util.PriorityQueue; -import java.util.Random; - -/** - * A sampler implementation built upon a Bernoulli trail. This sampler is used to sample with - * fraction and without replacement. Whether an element is sampled or not is determined by a - * Bernoulli experiment. - * - * @param The type of sample. - * @see Very Fast Reservoir Sampling - */ -@Internal -public class VeryFastReservoirSampler extends DistributedRandomSampler { - - private final Random random; - // THRESHOLD is a tuning parameter for choosing sampling method according to the fraction. - private final int THRESHOLD = 4 * super.numSamples; - - /** - * Create a new sampler with reservoir size and a supplied random number generator. - * - * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. - * @param random Instance of random number generator for sampling. - */ - public VeryFastReservoirSampler(int numSamples, Random random) { - super(numSamples); - Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative."); - this.random = random; - } - - /** - * Create a new sampler with reservoir size and a default random number generator. - * - * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. - */ - public VeryFastReservoirSampler(int numSamples) { - this(numSamples, new XORShiftRandom()); - } - - /** - * Create a new sampler with reservoir size and the seed for random number generator. - * - * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. - * @param seed Random number generator seed. - */ - public VeryFastReservoirSampler(int numSamples, long seed) { - - this(numSamples, new XORShiftRandom(seed)); - } - - @Override - public Iterator> sampleInPartition(Iterator input) { - if (numSamples == 0) { - return EMPTY_INTERMEDIATE_ITERABLE; - } - PriorityQueue> queue = new PriorityQueue>(numSamples); - double probability; - IntermediateSampleData smallest = null; - int index = 0, k, gap = 0; - while (input.hasNext()) { - T element = input.next(); - double rand = random.nextDouble(); - if (index < THRESHOLD) { // if index is less than THRESHOLD, then use regular reservoir - if (index < numSamples) { - // Fill the queue with first K elements from input. - queue.add(new IntermediateSampleData(random.nextDouble(), element)); - smallest = queue.peek(); - } else { - // Remove the element with the smallest weight, and append current element into the queue. - if (rand > smallest.getWeight()) { - queue.remove(); - queue.add(new IntermediateSampleData(rand, element)); - smallest = queue.peek(); - } - } - index++; - } else { // fast section - probability = (double) numSamples / index; - double rand1 = random.nextDouble(); - double u = Math.max(rand1, EPSILON); - gap = (int) (Math.log(u) / Math.log(1 - probability)); - if (gap > 0) { - while (input.hasNext() && gap > 0) { - gap--; - element = input.next(); - index++; - } - } - queue.remove(); - queue.add(new IntermediateSampleData(rand, element)); - smallest = queue.peek(); - } - } - return queue.iterator(); - } -} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java deleted file mode 100644 index 98a9f5ced6ff0..0000000000000 --- a/flink-java/src/test/java/org/apache/flink/api/java/sampling/VeryFastReservoirSamplingTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.sampling; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.*; - -import static org.junit.Assert.assertTrue; - -/** - * This test suite try to verify whether all the random samplers work as we expected, which mainly focus on: - *
    - *
  • Does sampled result fit into input parameters? we check parameters like sample fraction, sample size, - * w/o replacement, and so on.
  • - *
  • Does sampled result randomly selected? we verify this by measure how much does it distributed on source data. - * Run Kolmogorov-Smirnov (KS) test between the random samplers and default reference samplers which is distributed - * well-proportioned on source data. If random sampler select elements randomly from source, it would distributed - * well-proportioned on source data as well. The KS test will fail to strongly reject the null hypothesis that - * the distributions of sampling gaps are the same. - *
  • - *
- * - * @see Kolmogorov Smirnov test - */ -public class VeryFastReservoirSamplingTest { - private final static int SOURCE_SIZE = 10000; - private static KolmogorovSmirnovTest ksTest; - private static List source; - private final static int DEFFAULT_PARTITION_NUMBER=10; - private List[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER]; - - @BeforeClass - public static void init() { - // initiate source data set. - source = new ArrayList(SOURCE_SIZE); - for (int i = 0; i < SOURCE_SIZE; i++) { - source.add((double) i); - } - - ksTest = new KolmogorovSmirnovTest(); - } - - private void initSourcePartition() { - for (int i=0; i(); - } - for (int i = 0; i< SOURCE_SIZE; i++) { - int index = i % DEFFAULT_PARTITION_NUMBER; - sourcePartitions[index].add((double)i); - } - } -// -// @Test(expected = java.lang.IllegalArgumentException.class) -// public void testBernoulliSamplerWithUnexpectedFraction1() { -// verifySamplerFraction(-1, false); -// } -// -// @Test(expected = java.lang.IllegalArgumentException.class) -// public void testBernoulliSamplerWithUnexpectedFraction2() { -// verifySamplerFraction(2, false); -// } - - @Test - public void testVeryFastReservoirSampler() { - initSourcePartition(); - - verifyVeryFastReservoirSampler(10, false); - } - @Test - public void testVeryFastReservoirSamplerWithMultiSourcePartitions() { - initSourcePartition(); - - verifyVeryFastReservoirSampler(100, true); - } - - private int getSize(Iterator iterator) { - int size = 0; - while (iterator.hasNext()) { - iterator.next(); - size++; - } - return size; - } - - private void verifyVeryFastReservoirSampler(int numSamplers, boolean sampleOnPartitions) { - VeryFastReservoirSampler sampler = new VeryFastReservoirSampler(numSamplers); - verifyRandomSamplerWithSampleSize(numSamplers, sampler, true, sampleOnPartitions); - verifyRandomSamplerWithSampleSize(numSamplers, sampler, false, sampleOnPartitions); - } - - /* - * Verify whether random sampler sample with fixed size from source data randomly. There are two default sample, one is - * sampled from source data with certain interval, the other is sampled only from the first half region of source data, - * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well, - * so the K-S Test result would accept the first one, while reject the second one. - */ - private void verifyRandomSamplerWithSampleSize(int sampleSize, RandomSampler sampler, boolean withDefaultSampler, boolean sampleWithPartitions) { - double[] baseSample; - if (withDefaultSampler) { - baseSample = getDefaultSampler(sampleSize); - } else { - baseSample = getWrongSampler(sampleSize); - } - - verifyKSTest(sampler, baseSample, withDefaultSampler, sampleWithPartitions); - } - - private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) { - double[] sampled = getSampledOutput(sampler, sampleOnPartitions); - double pValue = ksTest.kolmogorovSmirnovStatistic(sampled, defaultSampler); - double dValue = getDValue(sampled.length, defaultSampler.length); - if (expectSuccess) { - assertTrue(String.format("KS test result with p value(%f), d value(%f)", pValue, dValue), pValue <= dValue); - } else { - assertTrue(String.format("KS test result with p value(%f), d value(%f)", pValue, dValue), pValue > dValue); - } - } - - private double[] getSampledOutput(RandomSampler sampler, boolean sampleOnPartitions) { - Iterator sampled = null; - if (sampleOnPartitions) { - DistributedRandomSampler reservoirRandomSampler = (DistributedRandomSampler)sampler; - List> intermediateResult = Lists.newLinkedList(); - for (int i=0; i> partialIntermediateResult = reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator()); - while (partialIntermediateResult.hasNext()) { - intermediateResult.add(partialIntermediateResult.next()); - } - } - sampled = reservoirRandomSampler.sampleInCoordinator(intermediateResult.iterator()); - } else { - sampled = sampler.sample(source.iterator()); - } - List list = Lists.newArrayList(); - while (sampled.hasNext()) { - list.add(sampled.next()); - } - double[] result = transferFromListToArrayWithOrder(list); - return result; - } - - /* - * Some sample result may not order by the input sequence, we should make it in order to do K-S test. - */ - private double[] transferFromListToArrayWithOrder(List list) { - Collections.sort(list); - double[] result = new double[list.size()]; - for (int i = 0; i < list.size(); i++) { - result[i] = list.get(i); - } - return result; - } - - private double[] getDefaultSampler(int fixSize) { - Preconditions.checkArgument(fixSize > 0, "Sample fraction should be positive."); - int size = fixSize; - double step = SOURCE_SIZE / (double) fixSize; - double[] defaultSampler = new double[size]; - for (int i = 0; i < size; i++) { - defaultSampler[i] = Math.round(step * i); - } - - return defaultSampler; - } - - /* - * Build a failed sample distribution which only contains elements in the first half of source data. - */ - private double[] getWrongSampler(int fixSize) { - Preconditions.checkArgument(fixSize > 0, "Sample size be positive."); - int halfSourceSize = SOURCE_SIZE / 2; - int size = fixSize; - double[] wrongSampler = new double[size]; - for (int i = 0; i < size; i++) { - wrongSampler[i] = (double) i % halfSourceSize; - } - - return wrongSampler; - } - - /* - * Calculate the D value of K-S test for p-value 0.01, m and n are the sample size - */ - private double getDValue(int m, int n) { - Preconditions.checkArgument(m > 0, "input sample size should be positive."); - Preconditions.checkArgument(n > 0, "input sample size should be positive."); - double first = (double) m; - double second = (double) n; - return 1.63 * Math.sqrt((first + second) / (first * second)); - } -}