From 585348553726669f82bf7a24a7c7c23c8e77d5ba Mon Sep 17 00:00:00 2001
From: Winston Chen
Date: Mon, 10 Nov 2014 18:17:08 -0800
Subject: [PATCH 1/4] get the dependency ready

---
 core/pom.xml                                                | 5 +++++
 .../scala/org/apache/spark/api/java/JavaSparkContext.scala | 2 ++
 2 files changed, 7 insertions(+)

diff --git a/core/pom.xml b/core/pom.xml
index 41296e0eca33..90a76db00f74 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -301,6 +301,11 @@
       <artifactId>py4j</artifactId>
       <version>0.8.2.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch-spark_2.10</artifactId>
+      <version>2.1.0.Beta2</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5c6e8d32c5c8..1630016c5b8d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -39,6 +39,8 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
 
+import org.elasticsearch.spark._
+
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
From a3cdbc4ff5798fdc8792c120a7b2753e1f402411 Mon Sep 17 00:00:00 2001
From: Winston Chen
Date: Fri, 14 Nov 2014 14:37:55 -0800
Subject: [PATCH 2/4] succeeded in hooking up esRDD function

---
 .../apache/spark/api/java/JavaSparkContext.scala   |  3 ---
 .../org/apache/spark/api/python/PythonRDD.scala    |  7 +++++++
 .../org/apache/spark/api/python/SerDeUtil.scala    | 13 +++++++++++++
 python/pyspark/context.py                          |  6 ++++++
 4 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 1630016c5b8d..7b44c0ccad02 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -39,8 +39,6 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
 
-import org.elasticsearch.spark._
-
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -207,7 +205,6 @@ class JavaSparkContext(val sc: SparkContext)
   def textFile(path: String, minPartitions: Int): JavaRDD[String] =
     sc.textFile(path, minPartitions)
 
-
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 45beb8fc8c92..6be648161bd2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -40,6 +40,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -431,6 +433,11 @@ private[spark] object PythonRDD extends Logging {
     }
   }
 
+  def esRDD(sc: JavaSparkContext, resource: String, query: String) = {
+    val rdd = JavaEsSpark.esRDD(sc, resource, query).rdd
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(rdd))
+  }
+
   /**
    * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
    * key and value class.
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index a4153aaa926f..a4c4094a5c67 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -216,6 +216,19 @@ private[spark] object SerDeUtil extends Logging {
     }
   }
 
+  def pairRDDToPython(rdd: RDD[(String, java.util.Map[String,Object])]): RDD[Array[Byte]] = {
+    val (keyFailed, valueFailed) = checkPickle(rdd.first())
+
+    rdd.mapPartitions { iter =>
+      val cleaned = iter.map { case (k, v) =>
+        val key = if (keyFailed) k.toString else k
+        val value = if (valueFailed) v.toString else v
+        Array[Any](key, value)
+      }
+      new AutoBatchedPickler(cleaned)
+    }
+  }
+
   /**
    * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
    */
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index faa5952258ae..87afb15ae59d 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -45,6 +45,8 @@
 DEFAULT_CONFIGS = {
     "spark.serializer.objectStreamReset": 100,
     "spark.rdd.compress": True,
+    # added for ES
+    'es.nodes': '104.131.165.122:9200'
 }
 
@@ -570,6 +572,10 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                                              jconf, batchSize)
         return RDD(jrdd, self)
 
+    def esRDD(self, resource, query):
+        es_rdd = self._jvm.PythonRDD.esRDD(self._jsc, resource, query)
+        return RDD(es_rdd, self)
+
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
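
At this point the full path is wired up: `sc.esRDD` in PySpark calls `PythonRDD.esRDD` over Py4J, which delegates to `JavaEsSpark.esRDD` and pickles the resulting (document id, field map) pairs back to Python through the new `pairRDDToPython` helper. A minimal smoke-test sketch of how this could be driven follows; the `blog/post` resource and the query string are illustrative assumptions, not part of the patch:

    # Hypothetical smoke test for the esRDD hook added above. At this point
    # in the series the Elasticsearch address still comes from the hardcoded
    # 'es.nodes' entry in DEFAULT_CONFIGS.
    from pyspark import SparkContext

    sc = SparkContext(appName="es-rdd-smoke-test")

    # resource is "<index>/<type>"; the query uses ES URI-search syntax
    rdd = sc.esRDD("blog/post", "?q=spark")

    # each element should arrive as a (document id, field dict) pair
    for pair in rdd.take(3):
        print(pair)
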
From d6c7807e3054f305645412c6c0ee2d89b650e1c5 Mon Sep 17 00:00:00 2001
From: Winston Chen
Date: Fri, 14 Nov 2014 15:24:53 -0800
Subject: [PATCH 3/4] refine the function hack

---
 .../org/apache/spark/api/python/PythonRDD.scala    |  2 +-
 .../org/apache/spark/api/python/SerDeUtil.scala    | 13 -------------
 2 files changed, 1 insertion(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 6be648161bd2..aad9235301ea 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -435,7 +435,7 @@ private[spark] object PythonRDD extends Logging {
 
   def esRDD(sc: JavaSparkContext, resource: String, query: String) = {
     val rdd = JavaEsSpark.esRDD(sc, resource, query).rdd
-    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(rdd))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(rdd.asInstanceOf[RDD[(Any, Any)]], 0))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index a4c4094a5c67..a4153aaa926f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -216,19 +216,6 @@ private[spark] object SerDeUtil extends Logging {
     }
   }
 
-  def pairRDDToPython(rdd: RDD[(String, java.util.Map[String,Object])]): RDD[Array[Byte]] = {
-    val (keyFailed, valueFailed) = checkPickle(rdd.first())
-
-    rdd.mapPartitions { iter =>
-      val cleaned = iter.map { case (k, v) =>
-        val key = if (keyFailed) k.toString else k
-        val value = if (valueFailed) v.toString else v
-        Array[Any](key, value)
-      }
-      new AutoBatchedPickler(cleaned)
-    }
-  }
-
   /**
    * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
    */

From e8c5412e973c4b19f06629621aa7cd7f6529f78f Mon Sep 17 00:00:00 2001
From: Winston Chen
Date: Fri, 21 Nov 2014 12:40:22 -0800
Subject: [PATCH 4/4] remove the test node

---
 python/pyspark/context.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 87afb15ae59d..acf5c65cc18b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -44,9 +44,7 @@
 # the default ones for Spark if they are not configured by user.
 DEFAULT_CONFIGS = {
     "spark.serializer.objectStreamReset": 100,
-    "spark.rdd.compress": True,
-    # added for ES
-    'es.nodes': '104.131.165.122:9200'
+    "spark.rdd.compress": True
 }
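
With the hardcoded test node gone, `es.nodes` must now come from the caller's configuration rather than from pyspark's defaults. A sketch of the explicit setup a user would supply after this series, assuming a local Elasticsearch node (the address, index, and query are illustrative):

    # Hypothetical usage after PATCH 4/4: 'es.nodes' is provided by the
    # caller instead of a default baked into pyspark. The address below
    # assumes a local Elasticsearch node.
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("es-rdd-demo")
            .set("es.nodes", "localhost:9200"))
    sc = SparkContext(conf=conf)

    print(sc.esRDD("blog/post", "?q=*").first())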