From 41cf61e5bbee5ee18ca2eb94fbfbab845ed5785e Mon Sep 17 00:00:00 2001
From: drJAGarnter
Date: Sun, 17 Jan 2016 07:30:29 -0800
Subject: [PATCH] Add ordered split to RDD class

---
 python/pyspark/rdd.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a019c05862549..f59564bb6b41e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -421,6 +421,30 @@ def randomSplit(self, weights, seed=None):
         return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True)
                 for lb, ub in zip(cweights, cweights[1:])]
 
+    def orderedSplit(self, f):
+        """
+        Split this RDD into two RDDs using a predicate, preserving element order.
+
+        :param f: predicate; records for which it returns True go into the first RDD
+        :return: a list of the two resulting RDDs
+
+        >>> rdd = sc.parallelize([('f', 0), ('n', 4), ('i', 2), ('o', 7), ('b', 8), ('t', 9)])
+        >>> def func(record):
+        ...     if record[1] in [0, 1, 1, 2, 3, 5, 8]:
+        ...         return True
+        ...     return False
+        >>> rdd1, rdd2 = rdd.orderedSplit(func)
+        >>> rdd1.collect()
+        [('f', 0), ('i', 2), ('b', 8)]
+        >>> rdd2.collect()
+        [('n', 4), ('o', 7), ('t', 9)]
+        """
+        def func(iterator):
+            return filter(f, iterator)
+        def func_complement(iterator):
+            return filter(lambda x: not f(x), iterator)
+        return [self.mapPartitions(func, True), self.mapPartitions(func_complement, True)]
+
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed=None):
         """
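
A minimal usage sketch of the proposed RDD.orderedSplit, for reference only (not part of the patch). It assumes the patch above has been applied and that a local SparkContext can be created; the app name and script are hypothetical.

# Usage sketch (hypothetical): split an RDD of integers into evens and odds
# with the proposed orderedSplit.
from pyspark import SparkContext

sc = SparkContext("local[2]", "orderedSplit-demo")
numbers = sc.parallelize(range(10))

# Records matching the predicate land in the first RDD, the rest in the second.
evens, odds = numbers.orderedSplit(lambda x: x % 2 == 0)

print(evens.collect())  # [0, 2, 4, 6, 8]
print(odds.collect())   # [1, 3, 5, 7, 9]

sc.stop()

Because both halves are produced with mapPartitions over the same parent RDD, element order within each split follows the original RDD, which is what distinguishes orderedSplit from randomSplit.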