From ca4490da536578ef4650039b099db8dafc9d6b66 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 24 Jan 2014 00:26:55 -0800 Subject: [PATCH 01/18] Add .sortBy(f) method on RDD --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 ++++++++ .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index cd90a1561a975..9583f22abce2a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -371,6 +371,14 @@ abstract class RDD[T: ClassTag]( */ def ++(other: RDD[T]): RDD[T] = this.union(other) + /** + * Return this RDD sorted by the given key function. + */ + def sortBy[K <% Ordered[K]: ClassTag](f: T => K): RDD[T] = + this.keyBy[K](f) + .sortByKey() + .values + /** * Return an RDD created by coalescing all elements within each partition into an array. */ diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 559ea051d3533..3b37fcf9a506b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -491,4 +491,16 @@ class RDDSuite extends FunSuite with SharedSparkContext { sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false) } } + + test("sortByKey") { + val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) + + val one = Array("4|60|C", "5|50|A", "6|40|B") + val two = Array("6|40|B", "5|50|A", "4|60|C") + val three = Array("5|50|A", "6|40|B", "4|60|C") + + assert(data.sortBy(_.split("\\|")(0)).collect === one) + assert(data.sortBy(_.split("\\|")(1)).collect === two) + assert(data.sortBy(_.split("\\|")(2)).collect === three) + } } From 7db3e849c5a9e4a3189ea594e349835cef6d307e Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 13 Feb 2014 22:27:06 -0800 Subject: [PATCH 02/18] Support ascending and numPartitions params in sortBy() --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b8771bc9f0039..117f969349b97 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -396,7 +396,10 @@ abstract class RDD[T: ClassTag]( /** * Return this RDD sorted by the given key function. */ - def sortBy[K <% Ordered[K]: ClassTag](f: T => K): RDD[T] = + def sortBy[K: Ordering: ClassTag]( + f: (T) ⇒ K, + ascending: Boolean = true, + numPartitions: Int = self.partitions.size): RDD[T] = this.keyBy[K](f) .sortByKey() .values From 381eef23f59a44b0555de9bb63fc8e598595ef32 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 13 Feb 2014 22:32:19 -0800 Subject: [PATCH 03/18] Correct silly typo --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 117f969349b97..021ad3c09036a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -399,7 +399,7 @@ abstract class RDD[T: ClassTag]( def sortBy[K: Ordering: ClassTag]( f: (T) ⇒ K, ascending: Boolean = true, - numPartitions: Int = self.partitions.size): RDD[T] = + numPartitions: Int = this.partitions.size): RDD[T] = this.keyBy[K](f) .sortByKey() .values From 8c53298cfeebcba7e08ef8c586816e7513daf11b Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 24 Feb 2014 16:19:02 -0800 Subject: [PATCH 04/18] Actually use ascending and numPartitions parameters --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 021ad3c09036a..64cba4cc8cd6e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -401,7 +401,7 @@ abstract class RDD[T: ClassTag]( ascending: Boolean = true, numPartitions: Int = this.partitions.size): RDD[T] = this.keyBy[K](f) - .sortByKey() + .sortByKey(ascending, numPartitions) .values /** From b8b5bbc5465aa0cc7d137c624bbb4cc1bb58555f Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 10 Apr 2014 00:29:43 +0100 Subject: [PATCH 05/18] Align remove extra spaces that were used to align ='s in test code --- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index bb5d47e0a4739..921697ae941c4 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -510,13 +510,13 @@ class RDDSuite extends FunSuite with SharedSparkContext { test("sortByKey") { val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) - val one = Array("4|60|C", "5|50|A", "6|40|B") - val two = Array("6|40|B", "5|50|A", "4|60|C") - val three = Array("5|50|A", "6|40|B", "4|60|C") + val col1 = Array("4|60|C", "5|50|A", "6|40|B") + val col2 = Array("6|40|B", "5|50|A", "4|60|C") + val col3 = Array("5|50|A", "6|40|B", "4|60|C") - assert(data.sortBy(_.split("\\|")(0)).collect === one) - assert(data.sortBy(_.split("\\|")(1)).collect === two) - assert(data.sortBy(_.split("\\|")(2)).collect === three) + assert(data.sortBy(_.split("\\|")(0)).collect === col1) + assert(data.sortBy(_.split("\\|")(1)).collect === col2) + assert(data.sortBy(_.split("\\|")(2)).collect === col3) } test("intersection") { From 3fd0dd373d43b5a5f9ec9bf4fa7b5db0599c6a68 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 10 Apr 2014 02:23:26 +0100 Subject: [PATCH 06/18] Add (failing) test for sortByKey with explicit Ordering --- .../main/scala/org/apache/spark/rdd/RDD.scala | 5 ++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 64cba4cc8cd6e..2098b29697db9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -396,10 +396,11 @@ abstract class RDD[T: ClassTag]( /** * Return this RDD sorted by the given key function. */ - def sortBy[K: Ordering: ClassTag]( + def sortBy[K]( f: (T) ⇒ K, ascending: Boolean = true, - numPartitions: Int = this.partitions.size): RDD[T] = + numPartitions: Int = this.partitions.size) + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 921697ae941c4..4d532ae713fd8 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -519,6 +519,44 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(data.sortBy(_.split("\\|")(2)).collect === col3) } + test("sortByKey ascending parameter") { + val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) + + val asc = Array("4|60|C", "5|50|A", "6|40|B") + val desc = Array("6|40|B", "5|50|A", "4|60|C") + + assert(data.sortBy(_.split("\\|")(0), true).collect === asc) + assert(data.sortBy(_.split("\\|")(0), false).collect === desc) + } + + test("sortByKey with explicit ordering") { + val data = sc.parallelize(Seq("Bob|Smith|50", "Jane|Smith|40", "Thomas|Williams|30", "Karen|Williams|60")) + + val ageOrdered = Array("Thomas|Williams|30", "Jane|Smith|40", "Bob|Smith|50", "Karen|Williams|60") + // last name, then first name + val nameOrdered = Array("Bob|Smith|50", "Jane|Smith|40", "Karen|Williams|60", "Thomas|Williams|30") + + case class Person(first: String, last: String, age: Int) + + def parse(s: String): Person = { + val split = s.split("\\|") + Person(split(0), split(1), split(2).toInt) + } + + object AgeOrdering extends Ordering[Person] { + def compare(a:Person, b:Person) = a.age compare b.age + } + + object NameOrdering extends Ordering[Person] { + def compare(a:Person, b:Person) = + implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first)) + } + + import scala.reflect._ + assert(data.sortBy(parse, false, 2)(AgeOrdering, classTag[Person]) === ageOrdered) + assert(data.sortBy(parse, false, 2)(NameOrdering, classTag[Person]) === nameOrdered) + } + test("intersection") { val all = sc.parallelize(1 to 10) val evens = sc.parallelize(2 to 10 by 2) From 222ae9739b67aea61bef025bde4ba24aaeb81f10 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Sat, 12 Apr 2014 01:30:38 +0100 Subject: [PATCH 07/18] Try moving Ordering objects out to a different class Still captures a SparkConf from somewhere though and fails to serialize the task, causing test failure --- .../scala/org/apache/spark/rdd/RDDSuite.scala | 15 +++------------ .../org/apache/spark/rdd/RDDSuiteUtils.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 12 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 4d532ae713fd8..f420d5e535d53 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -26,6 +26,8 @@ import org.apache.spark.rdd._ import scala.collection.parallel.mutable import org.apache.spark._ +import org.apache.spark.rdd.RDDSuiteUtils._ + class RDDSuite extends FunSuite with SharedSparkContext { test("basic operations") { @@ -536,23 +538,12 @@ class RDDSuite extends FunSuite with SharedSparkContext { // last name, then first name val nameOrdered = Array("Bob|Smith|50", "Jane|Smith|40", "Karen|Williams|60", "Thomas|Williams|30") - case class Person(first: String, last: String, age: Int) - def parse(s: String): Person = { val split = s.split("\\|") Person(split(0), split(1), split(2).toInt) } - object AgeOrdering extends Ordering[Person] { - def compare(a:Person, b:Person) = a.age compare b.age - } - - object NameOrdering extends Ordering[Person] { - def compare(a:Person, b:Person) = - implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first)) - } - - import scala.reflect._ + import scala.reflect.classTag assert(data.sortBy(parse, false, 2)(AgeOrdering, classTag[Person]) === ageOrdered) assert(data.sortBy(parse, false, 2)(NameOrdering, classTag[Person]) === nameOrdered) } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala new file mode 100644 index 0000000000000..0db86b2cdc210 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala @@ -0,0 +1,14 @@ +package org.apache.spark.rdd + +object RDDSuiteUtils { + case class Person(first: String, last: String, age: Int) + + object AgeOrdering extends Ordering[Person] { + def compare(a:Person, b:Person) = a.age compare b.age + } + + object NameOrdering extends Ordering[Person] { + def compare(a:Person, b:Person) = + implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first)) + } +} From 0457b690143149ad0d903fbd70fa76e2699e3b35 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 6 May 2014 22:29:51 -0700 Subject: [PATCH 08/18] Ignore failing test Issues with the serialization in the test framework capturing too much, but I expect this to work otherwise --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 0afaf2454d167..8aacc4c400939 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -565,7 +565,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(data.sortBy(_.split("\\|")(0), false).collect === desc) } - test("sortByKey with explicit ordering") { + // issues with serialization of Ordering in the test + ignore("sortByKey with explicit ordering") { val data = sc.parallelize(Seq("Bob|Smith|50", "Jane|Smith|40", "Thomas|Williams|30", "Karen|Williams|60")) val ageOrdered = Array("Thomas|Williams|30", "Jane|Smith|40", "Bob|Smith|50", "Karen|Williams|60") From 9d9b9d8fb81b8e329b6d6d12ded07a56b63f4d60 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 16:08:41 -0700 Subject: [PATCH 09/18] Use parentheses on .collect() calls --- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 8aacc4c400939..cd328da165641 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -550,9 +550,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { val col2 = Array("6|40|B", "5|50|A", "4|60|C") val col3 = Array("5|50|A", "6|40|B", "4|60|C") - assert(data.sortBy(_.split("\\|")(0)).collect === col1) - assert(data.sortBy(_.split("\\|")(1)).collect === col2) - assert(data.sortBy(_.split("\\|")(2)).collect === col3) + assert(data.sortBy(_.split("\\|")(0)).collect() === col1) + assert(data.sortBy(_.split("\\|")(1)).collect() === col2) + assert(data.sortBy(_.split("\\|")(2)).collect() === col3) } test("sortByKey ascending parameter") { @@ -561,8 +561,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val asc = Array("4|60|C", "5|50|A", "6|40|B") val desc = Array("6|40|B", "5|50|A", "4|60|C") - assert(data.sortBy(_.split("\\|")(0), true).collect === asc) - assert(data.sortBy(_.split("\\|")(0), false).collect === desc) + assert(data.sortBy(_.split("\\|")(0), true).collect() === asc) + assert(data.sortBy(_.split("\\|")(0), false).collect() === desc) } // issues with serialization of Ordering in the test @@ -589,8 +589,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val intersection = Array(2, 4, 6, 8, 10) // intersection is commutative - assert(all.intersection(evens).collect.sorted === intersection) - assert(evens.intersection(all).collect.sorted === intersection) + assert(all.intersection(evens).collect().sorted === intersection) + assert(evens.intersection(all).collect().sorted === intersection) } test("intersection strips duplicates in an input") { @@ -598,8 +598,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val b = sc.parallelize(Seq(1,1,2,3)) val intersection = Array(1,2,3) - assert(a.intersection(b).collect.sorted === intersection) - assert(b.intersection(a).collect.sorted === intersection) + assert(a.intersection(b).collect().sorted === intersection) + assert(b.intersection(a).collect().sorted === intersection) } test("zipWithIndex") { From adf84c5e941085a2e7c1c2ddd84d18279a556fb5 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 16:11:51 -0700 Subject: [PATCH 10/18] Re-indent to keep line lengths under 100 chars --- .../scala/org/apache/spark/rdd/RDDSuite.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index cd328da165641..357b1f00d6ff8 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -567,11 +567,21 @@ class RDDSuite extends FunSuite with SharedSparkContext { // issues with serialization of Ordering in the test ignore("sortByKey with explicit ordering") { - val data = sc.parallelize(Seq("Bob|Smith|50", "Jane|Smith|40", "Thomas|Williams|30", "Karen|Williams|60")) + val data = sc.parallelize(Seq("Bob|Smith|50", + "Jane|Smith|40", + "Thomas|Williams|30", + "Karen|Williams|60")) + + val ageOrdered = Array("Thomas|Williams|30", + "Jane|Smith|40", + "Bob|Smith|50", + "Karen|Williams|60") - val ageOrdered = Array("Thomas|Williams|30", "Jane|Smith|40", "Bob|Smith|50", "Karen|Williams|60") // last name, then first name - val nameOrdered = Array("Bob|Smith|50", "Jane|Smith|40", "Karen|Williams|60", "Thomas|Williams|30") + val nameOrdered = Array("Bob|Smith|50", + "Jane|Smith|40", + "Karen|Williams|60", + "Thomas|Williams|30") def parse(s: String): Person = { val split = s.split("\\|") From 45e0fde9debd9807d7226975767936a81335a546 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 16:53:55 -0700 Subject: [PATCH 11/18] Add Java version of .sortBy() --- .../org/apache/spark/api/java/JavaRDD.scala | 18 ++++++++++ .../java/org/apache/spark/JavaAPISuite.java | 33 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index dc698dea75e43..33f3c49db7a90 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -17,10 +17,13 @@ package org.apache.spark.api.java +import java.util.Comparator + import scala.language.implicitConversions import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -150,6 +153,21 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) rdd.setName(name) this } + + /** + * Return this RDD sorted by the given key function. + */ + def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = { + import scala.collection.JavaConverters._ + //def fn = (x: T) => f.call(x).asScala + def fn = (x: T) => f.call(x) + implicit val ordering = com.google.common.collect.Ordering.natural().asInstanceOf[Ordering[S]] + //implicit val ordering = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + //implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. + implicit val ctag: ClassTag[S] = fakeClassTag + wrapRDD(rdd.sortBy(fn, ascending, numPartitions)) + } + } object JavaRDD { diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index c3e03cea917b3..1d378ccdd2442 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -167,6 +167,39 @@ public void sortByKey() { Assert.assertEquals(new Tuple2(3, 2), sortedPairs.get(2)); } + @Test + public void sortBy() { + List> pairs = new ArrayList>(); + pairs.add(new Tuple2(0, 4)); + pairs.add(new Tuple2(3, 2)); + pairs.add(new Tuple2(-1, 1)); + + JavaRDD> rdd = sc.parallelize(pairs); + + // compare on first value + JavaRDD> sortedRDD = rdd.sortBy(new Function, Integer>() { + public Integer call(Tuple2 t) throws Exception { + return t._1(); + } + }, true, 2); + + Assert.assertEquals(new Tuple2(-1, 1), sortedRDD.first()); + List> sortedPairs = sortedRDD.collect(); + Assert.assertEquals(new Tuple2(0, 4), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2(3, 2), sortedPairs.get(2)); + + // compare on second value + sortedRDD = rdd.sortBy(new Function, Integer>() { + public Integer call(Tuple2 t) throws Exception { + return t._2(); + } + }, true, 2); + Assert.assertEquals(new Tuple2(-1, 1), sortedRDD.first()); + sortedPairs = sortedRDD.collect(); + Assert.assertEquals(new Tuple2(3, 2), sortedPairs.get(1)); + Assert.assertEquals(new Tuple2(0, 4), sortedPairs.get(2)); + } + static int foreachCalls = 0; @Test From 63638b5af4ba107149a2cc06554ef046b4e04bc6 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 17:19:13 -0700 Subject: [PATCH 12/18] Add Python version of .sortBy() --- python/pyspark/rdd.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a59778c72130e..105b4ec1f0b1f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -495,6 +495,18 @@ def mapFunc(iterator): .mapPartitions(mapFunc,preservesPartitioning=True) .flatMap(lambda x: x, preservesPartitioning=True)) + def sortBy(self, keyfunc, ascending=True, numPartitions=None): + """ + Sorts this RDD by the given keyfunc + + >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] + >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() + [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] + >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect() + [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] + """ + return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values() + def glom(self): """ Return an RDD created by coalescing all elements within each partition @@ -891,6 +903,14 @@ def saveAsTextFile(self, path): >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' + + Empty lines are tolerated when saving to text files. + + >>> tempFile2 = NamedTemporaryFile(delete=True) + >>> tempFile2.close() + >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) + >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) + '\\n\\n\\nbar\\nfoo\\n' """ def func(split, iterator): for x in iterator: From d4de69a8e61d5544c42c63cf2808812e75290bbc Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 17:35:59 -0700 Subject: [PATCH 13/18] Remove scar tissue --- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 33f3c49db7a90..a702f07b452e5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -159,11 +159,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) */ def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = { import scala.collection.JavaConverters._ - //def fn = (x: T) => f.call(x).asScala def fn = (x: T) => f.call(x) implicit val ordering = com.google.common.collect.Ordering.natural().asInstanceOf[Ordering[S]] - //implicit val ordering = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] - //implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. implicit val ctag: ClassTag[S] = fakeClassTag wrapRDD(rdd.sortBy(fn, ascending, numPartitions)) } From 64ed6e381632a2b41c286a9eab081b01090d51d3 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 17:37:16 -0700 Subject: [PATCH 14/18] Remove leaked diff --- python/pyspark/rdd.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 105b4ec1f0b1f..f53d8d4abf14f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -903,14 +903,6 @@ def saveAsTextFile(self, path): >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' - - Empty lines are tolerated when saving to text files. - - >>> tempFile2 = NamedTemporaryFile(delete=True) - >>> tempFile2.close() - >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) - >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) - '\\n\\n\\nbar\\nfoo\\n' """ def func(split, iterator): for x in iterator: From 5a953482138e4c4c706b8a28c7a40dc06445a826 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 13 May 2014 22:59:51 -0700 Subject: [PATCH 15/18] Add license for RDDSuiteUtils --- .../org/apache/spark/rdd/RDDSuiteUtils.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala index 0db86b2cdc210..4762fc17855ce 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.rdd object RDDSuiteUtils { From 29a54edd6d4fcc015e55e3ac723b1239e83a0185 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 13 May 2014 23:45:36 -0700 Subject: [PATCH 16/18] Re-enable test by converting to a closure --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 357b1f00d6ff8..25f8694ad9d37 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -565,8 +565,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(data.sortBy(_.split("\\|")(0), false).collect() === desc) } - // issues with serialization of Ordering in the test - ignore("sortByKey with explicit ordering") { + test("sortByKey with explicit ordering") { val data = sc.parallelize(Seq("Bob|Smith|50", "Jane|Smith|40", "Thomas|Williams|30", @@ -583,14 +582,14 @@ class RDDSuite extends FunSuite with SharedSparkContext { "Karen|Williams|60", "Thomas|Williams|30") - def parse(s: String): Person = { + val parse = (s: String) => { val split = s.split("\\|") Person(split(0), split(1), split(2).toInt) } import scala.reflect.classTag - assert(data.sortBy(parse, false, 2)(AgeOrdering, classTag[Person]) === ageOrdered) - assert(data.sortBy(parse, false, 2)(NameOrdering, classTag[Person]) === nameOrdered) + assert(data.sortBy(parse, true, 2)(AgeOrdering, classTag[Person]) === ageOrdered) + assert(data.sortBy(parse, true, 2)(NameOrdering, classTag[Person]) === nameOrdered) } test("intersection") { From 43d0a5327c467d77cf9320ab02f3cf87f913db37 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 14 May 2014 00:07:36 -0700 Subject: [PATCH 17/18] Fix missing .collect() --- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 25f8694ad9d37..aaf66d5309574 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -588,8 +588,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { } import scala.reflect.classTag - assert(data.sortBy(parse, true, 2)(AgeOrdering, classTag[Person]) === ageOrdered) - assert(data.sortBy(parse, true, 2)(NameOrdering, classTag[Person]) === nameOrdered) + assert(data.sortBy(parse, true, 2)(AgeOrdering, classTag[Person]).collect() === ageOrdered) + assert(data.sortBy(parse, true, 2)(NameOrdering, classTag[Person]).collect() === nameOrdered) } test("intersection") { From d09147a4a472cfd94f535b2e7924ff6957338926 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 19 May 2014 16:07:00 -0700 Subject: [PATCH 18/18] Fix Ordering import --- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index a702f07b452e5..7cadf7c3b917f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -160,7 +160,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.call(x) - implicit val ordering = com.google.common.collect.Ordering.natural().asInstanceOf[Ordering[S]] + import com.google.common.collect.Ordering // shadows scala.math.Ordering + implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]] implicit val ctag: ClassTag[S] = fakeClassTag wrapRDD(rdd.sortBy(fn, ascending, numPartitions)) }