From 1403816f8287aeee316b27ba569ce607fdb0ed2c Mon Sep 17 00:00:00 2001
From: Alain
Date: Fri, 24 Apr 2015 03:24:51 -0700
Subject: [PATCH] [PYSPARK] Add percentile method to RDD, as in numpy

1. Add a percentile method to RDD.
2. By default, it returns the kth percentile element from the bottom (ascending order).
3. By specifying a key function, it can return the kth percentile from the top, or under any user-defined ordering.
4. Added a test case in python/pyspark/tests.py.
---
 python/pyspark/rdd.py   | 21 ++++++++++++++++++++-
 python/pyspark/tests.py | 11 +++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d254deb527d10..a662d04499a5d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -32,7 +32,7 @@
 from collections import defaultdict
 from itertools import chain
 from functools import reduce
-from math import sqrt, log, isinf, isnan, pow, ceil
+from math import sqrt, log, isinf, isnan, pow, ceil, floor
 
 if sys.version > '3':
     basestring = unicode = str
@@ -1149,6 +1149,25 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()
 
+    def percentile(self, k, key=None):
+        """
+        Return the kth percentile of this RDD's elements under an optional key ordering.
+
+        >>> sc.parallelize(xrange(10)).percentile(0)
+        0.0
+        """
+        assert 0 <= k <= 100
+        c = self.count()
+        if c == 1:
+            return float(self.first())
+        else:
+            idx = (k / 100.) * (c - 1.)
+            idx_below = int(floor(idx))
+            idx_above = min(idx_below + 1, c-1)
+            weights_above = idx - idx_below
+            weights_below = 1.0 - weights_above
+            return float(weights_below * self.takeOrdered(idx_below+1, key)[-1] + self.takeOrdered(idx_above+1, key)[-1] * weights_above)
+
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index ea63a396da5b8..18e20d1266b2b 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1832,6 +1832,17 @@ def test_statcounter_array(self):
         self.assertSequenceEqual([3.0, 3.0], s.max().tolist())
         self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist())
 
+        y = self.sc.parallelize(np.arange(10))
+        rev = lambda x: -x
+        self.assertSequenceEqual([0.0, 9.0], [round(y.percentile(0), 2), round(y.percentile(0, rev), 2)])
+        self.assertSequenceEqual([9.0, 0.0], [round(y.percentile(100), 2), round(y.percentile(100, rev), 2)])
+        self.assertSequenceEqual([0.09, 8.91], [round(y.percentile(1), 2), round(y.percentile(1, rev), 2)])
+        self.assertSequenceEqual([8.91, 0.09], [round(y.percentile(99), 2), round(y.percentile(99, rev), 2)])
+        self.assertSequenceEqual([0.9, 8.1], [round(y.percentile(10), 2), round(y.percentile(10, rev), 2)])
+        self.assertSequenceEqual([8.1, 0.9], [round(y.percentile(90), 2), round(y.percentile(90, rev), 2)])
+        self.assertSequenceEqual([4.5, 4.5], [round(y.percentile(50), 2), round(y.percentile(50, rev), 2)])
+
+
 
 if __name__ == "__main__":
     if not _have_scipy:
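
Note: the interpolation above mirrors numpy.percentile's default "linear" behaviour: the
target rank is (k / 100) * (n - 1), and the result is a weighted average of the two
nearest order statistics, which the patch fetches with takeOrdered. Below is a minimal
driver-side sketch of the same logic, assuming only the standard library and numpy; the
helper name local_percentile is illustrative and not part of the patch.

from math import floor

import numpy as np


def local_percentile(values, k, key=None):
    # Same linear interpolation as the proposed RDD.percentile, but on a local list.
    ordered = sorted(values, key=key)   # stands in for takeOrdered(..., key)
    n = len(ordered)
    if n == 1:
        return float(ordered[0])
    idx = (k / 100.0) * (n - 1)         # fractional rank between 0 and n - 1
    below = int(floor(idx))
    above = min(below + 1, n - 1)
    w_above = idx - below               # weight of the upper neighbour
    return float((1.0 - w_above) * ordered[below] + w_above * ordered[above])


data = list(range(10))
# Ascending order agrees with numpy's default percentile.
assert np.isclose(local_percentile(data, 10), np.percentile(data, 10))  # ~0.9
assert np.isclose(local_percentile(data, 50), np.percentile(data, 50))  # 4.5
# key=lambda x: -x reproduces the "from the top" variant exercised in tests.py.
assert round(local_percentile(data, 10, key=lambda x: -x), 2) == 8.1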