[SPARK] Add warning about inconsistent environment
When using the connector jar for Spark 1.0-1.2 in a Spark 1.3+
environment, show a warning. Same for the reverse situation.
Additionally, perform a cheap check for invalid or empty RDDs to avoid
unnecessary calls over the network.

relates #415
costin committed Apr 24, 2015
1 parent 555d803 commit d90ca4e
Showing 6 changed files with 84 additions and 23 deletions.
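
The mismatch warning described in the commit message comes down to probing the classpath: a Spark 1.3+ runtime is recognized by the presence of org.apache.spark.sql.DataFrame, and the connector build for Spark 1.0-1.2 by the presence of org.elasticsearch.spark.sql.EsSchemaRDDWriter. The following is an illustrative Scala sketch of that idea only, not code from the commit; the object name SparkEnvCheck is made up.

// Illustrative sketch (not part of the commit): detect a version mismatch
// by probing for the same marker classes the commit checks for.
import scala.util.Try

object SparkEnvCheck {
  private def present(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def warnOnMismatch(): Unit = {
    // Spark 1.3+ ships org.apache.spark.sql.DataFrame
    val isSpark13Runtime = present("org.apache.spark.sql.DataFrame")
    // the connector build for Spark 1.0-1.2 ships EsSchemaRDDWriter, the 1.3 build does not
    val isConnectorForSpark13 = !present("org.elasticsearch.spark.sql.EsSchemaRDDWriter")
    if (isSpark13Runtime != isConnectorForSpark13)
      Console.err.println("Incorrect classpath detected: connector and Spark versions do not match")
  }
}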
12 changes: 7 additions & 5 deletions spark/core/main/scala/org/elasticsearch/spark/package.scala
@@ -1,36 +1,38 @@
package org.elasticsearch;

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.hadoop.util.ObjectUtils


package object spark {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

implicit def sparkContextFunctions(sc: SparkContext)= new SparkContextFunctions(sc)

class SparkContextFunctions(sc: SparkContext) extends Serializable {
def esRDD() = EsSpark.esRDD(sc)
def esRDD(resource: String) = EsSpark.esRDD(sc, resource)
def esRDD(resource: String, query: String) = EsSpark.esRDD(sc, resource, query)
def esRDD(cfg: scala.collection.Map[String, String]) = EsSpark.esRDD(sc, cfg)

def esJsonRDD() = EsSpark.esJsonRDD(sc)
def esJsonRDD(resource: String) = EsSpark.esJsonRDD(sc, resource)
def esJsonRDD(resource: String, query: String) = EsSpark.esJsonRDD(sc, resource, query)
def esJsonRDD(cfg: scala.collection.Map[String, String]) = EsSpark.esJsonRDD(sc, cfg)
}

implicit def sparkRDDFunctions[T : ClassTag](rdd: RDD[T]) = new SparkRDDFunctions[T](rdd)

class SparkRDDFunctions[T : ClassTag](rdd: RDD[T]) extends Serializable {
def saveToEs(resource: String) { EsSpark.saveToEs(rdd, resource) }
def saveToEs(resource: String, cfg: scala.collection.Map[String, String]) { EsSpark.saveToEs(rdd, resource, cfg) }
def saveToEs(cfg: scala.collection.Map[String, String]) { EsSpark.saveToEs(rdd, cfg) }
}

implicit def sparkStringJsonRDDFunctions(rdd: RDD[String]) = new SparkJsonRDDFunctions[String](rdd)
implicit def sparkByteArrayJsonRDDFunctions(rdd: RDD[Array[Byte]]) = new SparkJsonRDDFunctions[Array[Byte]](rdd)

@@ -39,7 +41,7 @@ package object spark {
def saveJsonToEs(resource: String, cfg: scala.collection.Map[String, String]) { EsSpark.saveJsonToEs(rdd, resource, cfg) }
def saveJsonToEs(cfg: scala.collection.Map[String, String]) { EsSpark.saveJsonToEs(rdd, cfg) }
}

implicit def sparkPairRDDFunctions[K : ClassTag, V : ClassTag](rdd: RDD[(K,V)]) = new SparkPairRDDFunctions[K,V](rdd)

class SparkPairRDDFunctions[K : ClassTag, V : ClassTag](rdd: RDD[(K,V)]) extends Serializable {
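
The implicits defined in this package object are what allow calling esRDD directly on a SparkContext and saveToEs directly on an RDD. A hedged usage sketch follows; the resource name "spark/docs", the sample data, and the app name are made up, and the Elasticsearch connection settings (for example es.nodes) are assumed to come from the Spark configuration.

// Illustrative usage of the implicits above; resource name and data are examples only.
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ImplicitsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("es-implicits-example"))
    // write through SparkRDDFunctions.saveToEs(resource)
    sc.makeRDD(Seq(Map("title" -> "hello"), Map("title" -> "world"))).saveToEs("spark/docs")
    // read back through SparkContextFunctions.esRDD(resource, query)
    sc.esRDD("spark/docs", "?q=*").take(5).foreach(println)
    sc.stop()
  }
}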
@@ -4,7 +4,6 @@ import scala.collection.JavaConversions.collectionAsScalaIterable
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.commons.logging.LogFactory
import org.apache.spark.Partition
import org.apache.spark.SparkContext
@@ -13,12 +12,15 @@ import org.apache.spark.rdd.RDD
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.rest.RestService.PartitionDefinition
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.util.ObjectUtils

private[spark] abstract class AbstractEsRDD[T: ClassTag](
@transient sc: SparkContext,
val params: scala.collection.Map[String, String] = Map.empty)
extends RDD[T](sc, Nil) {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

protected var logger = LogFactory.getLog(this.getClass())

override def getPartitions: Array[Partition] = {
@@ -33,8 +35,8 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](

override def getPreferredLocations(split: Partition): Seq[String] = {
val esSplit = split.asInstanceOf[EsPartition]
val ip = esSplit.esPartition.nodeIp
if (ip != null) Seq(ip) else Nil
}

override def checkpoint() {
43 changes: 41 additions & 2 deletions spark/core/main/scala/org/elasticsearch/spark/rdd/CompatUtils.java
@@ -1,8 +1,10 @@
package org.elasticsearch.spark.rdd;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskCompletionListener;
@@ -17,8 +19,6 @@ abstract class CompatUtils {

private static final boolean SPARK_11_AVAILABLE = ObjectUtils.isClassPresent("org.apache.spark.util.TaskCompletionListener", SparkConf.class.getClassLoader());

//public static final boolean SPARK_12_AVAILABLE = ObjectUtils.isClassPresent("org.apache.spark.sql.catalyst.types.BinaryType", SparkConf.class.getClassLoader());

private static final Class<?> SCHEMA_RDD_LIKE_CLASS;

static {
@@ -29,6 +29,45 @@ abstract class CompatUtils {
// ignore
}
SCHEMA_RDD_LIKE_CLASS = clz;

// apply the warning when the class is loaded (to cover all access points)

// check whether the correct es-hadoop is used with the correct Spark version
boolean isSpark13 = ObjectUtils.isClassPresent("org.apache.spark.sql.DataFrame", SparkConf.class.getClassLoader());
boolean isEshForSpark13 = !ObjectUtils.isClassPresent("org.elasticsearch.spark.sql.EsSchemaRDDWriter", CompatUtils.class.getClassLoader());

// XOR can be applied as well but != increases readability
if (isSpark13 != isEshForSpark13) {

// need SparkContext which requires context
// as such, do another reflection dance

String sparkVersion = null;

// Spark 1.0 - 1.1: SparkContext$.MODULE$.SPARK_VERSION();
// Spark 1.2+ : package$.MODULE$.SPARK_VERSION();
Object target = org.apache.spark.SparkContext$.MODULE$;
Method sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");

if (sparkVersionMethod == null) {
target = org.apache.spark.package$.MODULE$;
sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");
}

if (sparkVersionMethod == null) {
sparkVersion = (isSpark13 ? "1.3+" : "1.0-1.2");
}
else {
sparkVersion = ReflectionUtils.<String> invoke(sparkVersionMethod, target);
}

LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark").
warn(String.format("Incorrect classpath detected; Elasticsearch Spark compiled for Spark %s but used with Spark %s",
(isEshForSpark13 ? "1.3 (or higher)" : "1.0-1.2"),
sparkVersion
));
}

}

private static abstract class Spark10TaskContext {
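
The reflection above is needed because the SPARK_VERSION constant moved between releases (SparkContext$ in Spark 1.0-1.1, the org.apache.spark package object in 1.2+). Below is an illustrative Scala sketch of the same lookup; it mirrors, not replaces, the Java code in the hunk, and the object name SparkVersionLookup is made up.

// Illustrative sketch of the reflective SPARK_VERSION lookup performed above.
import scala.util.Try

object SparkVersionLookup {
  def sparkVersion: Option[String] = {
    // try the Spark 1.0-1.1 location first, then the 1.2+ package object
    val owners = Seq("org.apache.spark.SparkContext$", "org.apache.spark.package$")
    owners.view.flatMap { owner =>
      Try {
        val clazz = Class.forName(owner)
        val module = clazz.getField("MODULE$").get(null) // the Scala singleton instance
        clazz.getMethod("SPARK_VERSION").invoke(module).asInstanceOf[String]
      }.toOption
    }.headOption
  }
}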
24 changes: 16 additions & 8 deletions spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala
@@ -18,23 +18,23 @@ import org.apache.commons.logging.LogFactory
object EsSpark {
//
// Load methods
//

def esRDD(sc: SparkContext): RDD[(String, Map[String, AnyRef])] = new ScalaEsRDD[Map[String, AnyRef]](sc)
def esRDD(sc: SparkContext, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])] =
new ScalaEsRDD[Map[String, AnyRef]](sc, cfg)
def esRDD(sc: SparkContext, resource: String): RDD[(String, Map[String, AnyRef])] =
new ScalaEsRDD[Map[String, AnyRef]](sc, Map(ES_RESOURCE_READ -> resource))
def esRDD(sc: SparkContext, resource: String, query: String): RDD[(String, Map[String, AnyRef])] =
new ScalaEsRDD[Map[String, AnyRef]](sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))

// load data as JSON
def esJsonRDD(sc: SparkContext): RDD[(String, String)] = new ScalaEsRDD[String](sc, Map(ES_OUTPUT_JSON -> true.toString))
def esJsonRDD(sc: SparkContext, cfg: Map[String, String]): RDD[(String, String)] =
new ScalaEsRDD[String](sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_OUTPUT_JSON -> true.toString))
def esJsonRDD(sc: SparkContext, resource: String): RDD[(String, String)] =
new ScalaEsRDD[String](sc, Map(ES_RESOURCE_READ -> resource, ES_OUTPUT_JSON -> true.toString))
def esJsonRDD(sc: SparkContext, resource: String, query: String): RDD[(String, String)] =
new ScalaEsRDD[String](sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query, ES_OUTPUT_JSON -> true.toString))

//
@@ -46,7 +46,11 @@ object EsSpark {
}
def saveToEs(rdd: RDD[_], cfg: Map[String, String]) {
CompatUtils.warnSchemaRDD(rdd, LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark"))


if (rdd == null || rdd.isEmpty()) {
return
}

val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
val config = new PropertiesSettings().load(sparkCfg.save())
config.merge(cfg.asJava)
@@ -62,6 +66,10 @@ def saveToEsWithMeta[K,V](rdd: RDD[(K,V)], cfg: Map[String, String]) {
def saveToEsWithMeta[K,V](rdd: RDD[(K,V)], cfg: Map[String, String]) {
CompatUtils.warnSchemaRDD(rdd, LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark"))

if (rdd == null || rdd.isEmpty()) {
return
}

val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)
val config = new PropertiesSettings().load(sparkCfg.save())
config.merge(cfg.asJava)
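
With the guard added above, saving a null or empty RDD now returns immediately instead of opening a connection to Elasticsearch. An illustrative sketch using the cfg overload shown in the hunk; the resource name is made up.

// Illustrative: an empty RDD is now skipped by the isEmpty() guard in saveToEs.
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object EmptyRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("es-empty-rdd-example"))
    val empty = sc.parallelize(Seq.empty[Map[String, String]])
    // returns early, no network round-trip to Elasticsearch
    EsSpark.saveToEs(empty, Map("es.resource" -> "spark/docs"))
    sc.stop()
  }
}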
@@ -31,23 +31,27 @@ object EsSparkSQL {
def esRDD(jsc: JavaSQLContext): JavaSchemaRDD = esRDD(jsc, Map.empty[String, String])
def esRDD(jsc: JavaSQLContext, resource: String): JavaSchemaRDD = esRDD(jsc, Map(ES_RESOURCE_READ -> resource))
def esRDD(jsc: JavaSQLContext, resource: String, query: String): JavaSchemaRDD = esRDD(jsc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
def esRDD(jsc: JavaSQLContext, cfg: Map[String, String]): JavaSchemaRDD = {
val rowRDD = new JavaEsRowRDD(jsc.sqlContext.sparkContext, cfg)
val schema = Utils.asJavaDataType(MappingUtils.discoverMapping(rowRDD.esCfg)).asInstanceOf[JStructType]
jsc.applySchema(rowRDD, schema)
}

def saveToEs(srdd: SchemaRDD, resource: String) {
saveToEs(srdd, Map(ES_RESOURCE_WRITE -> resource))
}
def saveToEs(srdd: SchemaRDD, resource: String, cfg: Map[String, String]) {
saveToEs(srdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
}
def saveToEs(srdd: SchemaRDD, cfg: Map[String, String]) {
if (srdd == null || srdd.count() == 0) {
return
}

val sparkCfg = new SparkSettingsManager().load(srdd.sparkContext.getConf)
val esCfg = new PropertiesSettings().load(sparkCfg.save())
esCfg.merge(cfg.asJava)

srdd.sparkContext.runJob(srdd, new EsSchemaRDDWriter(srdd.schema, esCfg.save()).write _)
}
}
@@ -2,7 +2,6 @@ package org.elasticsearch.spark.sql

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.Map

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD.fromRDD
@@ -13,9 +12,12 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.util.ObjectUtils

object EsSparkSQL {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

def esDF(sc: SQLContext): DataFrame = esDF(sc, Map.empty[String, String])
def esDF(sc: SQLContext, resource: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource))
def esDF(sc: SQLContext, resource: String, query: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
@@ -32,11 +34,15 @@ object EsSparkSQL {
saveToEs(srdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
}
def saveToEs(srdd: DataFrame, cfg: Map[String, String]) {
if (srdd == null || srdd.count() == 0) {
return
}

val sparkCtx = srdd.sqlContext.sparkContext
val sparkCfg = new SparkSettingsManager().load(sparkCtx.getConf)
val esCfg = new PropertiesSettings().load(sparkCfg.save())
esCfg.merge(cfg.asJava)

sparkCtx.runJob(srdd.rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
}
}
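
The same guard applies on the Spark 1.3 side, where the entry points operate on DataFrames. An illustrative usage sketch; the resource name "spark/docs" is made up and the connection settings are assumed to come from the Spark configuration.

// Illustrative usage of the DataFrame entry points above.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql.EsSparkSQL

object DataFrameExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("es-dataframe-example"))
    val sqlContext = new SQLContext(sc)
    // read an index into a DataFrame
    val df = EsSparkSQL.esDF(sqlContext, "spark/docs")
    // write it back; an empty DataFrame is skipped by the count() == 0 guard
    EsSparkSQL.saveToEs(df, Map("es.resource.write" -> "spark/docs"))
    sc.stop()
  }
}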
