From 51c26c2b7010a2860efbd5f618f6999b00927a26 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 14 Oct 2014 20:53:59 +0100
Subject: [PATCH 1/4] Make JavaPairRDD.collectAsMap result Serializable since
 Java Maps generally are

---
 .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0846225e4f992..1c03b2f9d53b4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.api.java
 
+import java.io.Serializable
 import java.util.{Comparator, List => JList, Map => JMap}
 import java.lang.{Iterable => JIterable}
 
@@ -614,7 +615,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
-  def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())
+  def collectAsMap(): java.util.Map[K, V] = new SerializableMapWrapper(rdd.collectAsMap())
+
+  class SerializableMapWrapper(underlying: collection.Map[K, V])
+    extends MapWrapper(underlying) with Serializable
 
   /**
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
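Why the wrapper is needed: mapAsJavaMap returns scala.collection.convert.Wrappers.MapWrapper, which in the Scala 2.10/2.11 collections library does not implement java.io.Serializable, so a map collected on the driver cannot itself be captured in a task closure or a broadcast. A minimal sketch of the failure outside Spark (assumes Scala 2.10/2.11; the data are illustrative):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import scala.collection.JavaConversions.mapAsJavaMap

    // What the Java API handed back before this patch: a bare MapWrapper.
    val javaMap: java.util.Map[Int, String] = mapAsJavaMap(Map(1 -> "a", 2 -> "b"))

    // Spark performs exactly this kind of Java serialization when the map is
    // captured in a closure; it throws java.io.NotSerializableException,
    // because MapWrapper lacks the marker interface even though the wrapped
    // Scala Map is itself serializable.
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(javaMap)
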
From ae1b36f57ac876467b23dcc003cc81a8c6881341 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 15 Oct 2014 09:38:36 +0100
Subject: [PATCH 2/4] Expand to cover Maps returned from other Java API
 methods as well

---
 .../org/apache/spark/api/java/JavaPairRDD.scala | 13 +++++--------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 14 +++++++++++---
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 1c03b2f9d53b4..baf24203145c5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.api.java
 
-import java.io.Serializable
 import java.util.{Comparator, List => JList, Map => JMap}
 import java.lang.{Iterable => JIterable}
 
@@ -266,10 +265,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
-    mapAsJavaMap(rdd.reduceByKeyLocally(func))
+    mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
 
   /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
+  def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())
 
   /**
    * :: Experimental ::
@@ -278,7 +277,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   @Experimental
   def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
-    rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
+    rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap)
 
   /**
    * :: Experimental ::
@@ -288,7 +287,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   @Experimental
   def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
     : PartialResult[java.util.Map[K, BoundedDouble]] =
-    rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
+    rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap)
 
   /**
    * Aggregate the values of each key, using given combine functions and a neutral "zero value".
@@ -615,10 +614,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
-  def collectAsMap(): java.util.Map[K, V] = new SerializableMapWrapper(rdd.collectAsMap())
+  def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())
 
-  class SerializableMapWrapper(underlying: collection.Map[K, V])
-    extends MapWrapper(underlying) with Serializable
 
   /**
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 545bc0e9e99ed..4ce32a48dc94f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.api.java
 
+import java.io.Serializable
 import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable, Long => JLong}
 
@@ -390,7 +391,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
   def countByValue(): java.util.Map[T, java.lang.Long] =
-    mapAsJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
+    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new java.lang.Long(x._2)))))
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -399,13 +400,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       timeout: Long,
       confidence: Double
       ): PartialResult[java.util.Map[T, BoundedDouble]] =
-    rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap)
+    rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
 
   /**
    * (Experimental) Approximate version of countByValue().
    */
   def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
-    rdd.countByValueApprox(timeout).map(mapAsJavaMap)
+    rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
 
   /**
    * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
@@ -587,4 +588,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     rdd.foreachAsync(x => f.call(x))
   }
 
+  private[java] def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
+    new SerializableMapWrapper(underlying)
+
+  private class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
+    extends MapWrapper(underlying) with Serializable
+
+
 }
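Patch 2 routes every Map-returning method of the Java API through a single private helper instead of wrapping only collectAsMap(). The underlying trick is unchanged: Java serialization only checks for the java.io.Serializable marker interface, so a subclass can opt a wrapper class in. A standalone sketch of that pattern, where Wrapper is a hypothetical stand-in for Scala's MapWrapper:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class Wrapper[A, B](val underlying: Map[A, B])          // not Serializable
    class SerializableWrapper[A, B](underlying: Map[A, B])
      extends Wrapper[A, B](underlying) with java.io.Serializable

    // Passes the marker check that the bare Wrapper fails.
    new ObjectOutputStream(new ByteArrayOutputStream)
      .writeObject(new SerializableWrapper(Map(1 -> "a")))
    // Caveat: fields declared in a non-serializable superclass are skipped on
    // write and require a no-arg superclass constructor on read, so the mixin
    // is only a complete fix when the superclass's state survives that
    // treatment.
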
From f4717f972f55dd43b4c84e8c9d3e21ff163e8b9c Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 15 Oct 2014 16:35:11 +0100
Subject: [PATCH 3/4] Oops, fix compile problem

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 4ce32a48dc94f..e56b5d096eb1d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -591,7 +591,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   private[java] def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
     new SerializableMapWrapper(underlying)
 
-  private class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
+  private[java] class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
     extends MapWrapper(underlying) with Serializable
 
 

From ecb78eef6bd00d1d13206fafb6e0d8b4c565ebf3 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 15 Oct 2014 18:08:35 +0100
Subject: [PATCH 4/4] Fix conflict between java.io.Serializable and use of
 Scala's Serializable

---
 .../src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index e56b5d096eb1d..e59df7a50cbc3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.api.java
 
-import java.io.Serializable
 import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable, Long => JLong}
 
@@ -592,7 +591,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new SerializableMapWrapper(underlying)
 
   private[java] class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
-    extends MapWrapper(underlying) with Serializable
+    extends MapWrapper(underlying) with java.io.Serializable
 
 
 }
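On the final fix: the commit subject points at a clash between the imported java.io.Serializable and the file's existing uses of Scala's Serializable. In Scala the bare name Serializable normally resolves to scala.Serializable, which is always in scope, so an explicit import of java.io.Serializable rebinds the name for the whole file and changes what existing references, such as the trait's own "extends Serializable", point at. Dropping the import and qualifying the single Java use site, as patch 4 does, leaves every other reference alone. A minimal illustration, with made-up names:

    trait Task extends Serializable                     // scala.Serializable
    class Box(val x: Int) extends java.io.Serializable  // explicit, no import
    // The two interoperate: scala.Serializable itself extends
    // java.io.Serializable, so either form satisfies Java serialization.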