[SPARK] Fixing version warning for Spark Library level
jbaiera committed Aug 17, 2016
1 parent 392ddcd · commit 079d846
Showing 6 changed files with 105 additions and 25 deletions.
spark/core/main/scala/org/elasticsearch/spark/rdd/CompatUtils.java (47 additions, 25 deletions)
@@ -7,6 +7,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.util.TaskCompletionListener;
+import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
 import org.elasticsearch.hadoop.util.ObjectUtils;
 import org.elasticsearch.hadoop.util.ReflectionUtils;

@@ -28,41 +29,62 @@ abstract class CompatUtils {
         // apply the warning when the class is loaded (to cover all access points)
 
         // check whether the correct es-hadoop is used with the correct Spark version
-        boolean isSpark13 = ObjectUtils.isClassPresent("org.apache.spark.sql.DataFrame", SparkConf.class.getClassLoader());
-        boolean isEshForSpark13 = !ObjectUtils.isClassPresent("org.elasticsearch.spark.sql.EsSchemaRDDWriter", CompatUtils.class.getClassLoader());
-
-        // XOR can be applied as well but != increases readability
-        if (isSpark13 != isEshForSpark13) {
-
-            // need SparkContext which requires context
-            // as such do another reflex dance
-            String sparkVersion = null;
-
-            // Spark 1.0 - 1.1: SparkContext$.MODULE$.SPARK_VERSION();
-            // Spark 1.2+   : package$.MODULE$.SPARK_VERSION();
-            Object target = org.apache.spark.SparkContext$.MODULE$;
-            Method sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");
-
-            if (sparkVersionMethod == null) {
-                target = org.apache.spark.package$.MODULE$;
-                sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");
-            }
-
-            if (sparkVersionMethod == null) {
-                sparkVersion = (isSpark13 ? "1.3+" : "1.0-1.2");
-            }
-            else {
-                sparkVersion = ReflectionUtils.<String> invoke(sparkVersionMethod, target);
-            }
-
-            LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark").
-                warn(String.format("Incorrect classpath detected; Elasticsearch Spark compiled for Spark %s but used with Spark %s",
-                    (isEshForSpark13 ? "1.3 (or higher)" : "1.0-1.2"),
-                    sparkVersion
-                ));
-        }
-    }
+        checkSparkLibraryCompatibility(false);
+    }
+
+    static void checkSparkLibraryCompatibility(boolean throwOnIncompatible) {
+        // check whether the correct es-hadoop is used with the correct Spark version
+        boolean isSpark13Level = ObjectUtils.isClassPresent("org.apache.spark.sql.DataFrame", SparkConf.class.getClassLoader());
+        boolean isSpark20Level = ObjectUtils.isClassPresent("org.apache.spark.sql.streaming.StreamingQuery", SparkConf.class.getClassLoader());
+
+        CompatibilityLevel compatibilityLevel = ObjectUtils.instantiate("org.elasticsearch.spark.sql.SparkSQLCompatibilityLevel", CompatUtils.class.getClassLoader());
+
+        boolean isEshForSpark20 = "20".equals(compatibilityLevel.versionId());
+        String esSupportedSparkVersion = compatibilityLevel.versionDescription();
+
+        String errorMessage = null;
+
+        if (!(isSpark13Level || isSpark20Level)) {
+            String sparkVersion = getSparkVersionOr("1.0-1.2");
+            errorMessage = String.format("Incorrect classpath detected; Elasticsearch Spark compiled for Spark %s but used with unsupported Spark version %s",
+                    esSupportedSparkVersion, sparkVersion);
+        } else if (isSpark20Level != isEshForSpark20) { // XOR can be applied as well but != increases readability
+            String sparkVersion = getSparkVersionOr(isSpark13Level ? "1.3-1.6" : "2.0+");
+            errorMessage = String.format("Incorrect classpath detected; Elasticsearch Spark compiled for Spark %s but used with Spark %s",
+                    esSupportedSparkVersion, sparkVersion);
+        }
+
+        if (errorMessage != null) {
+            if (throwOnIncompatible) {
+                throw new EsHadoopIllegalStateException(errorMessage);
+            } else {
+                LogFactory.getLog("org.elasticsearch.spark.rdd.EsSpark").warn(errorMessage);
+            }
+        }
+    }
+
+    private static String getSparkVersionOr(String defaultValue) {
+        // need SparkContext which requires context
+        // as such do another reflex dance
+        String sparkVersion = null;
+
+        // Spark 1.0 - 1.1: SparkContext$.MODULE$.SPARK_VERSION();
+        // Spark 1.2+   : package$.MODULE$.SPARK_VERSION();
+        Object target = org.apache.spark.SparkContext$.MODULE$;
+        Method sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");
+
+        if (sparkVersionMethod == null) {
+            target = org.apache.spark.package$.MODULE$;
+            sparkVersionMethod = ReflectionUtils.findMethod(target.getClass(), "SPARK_VERSION");
+        }
+
+        if (sparkVersionMethod != null) {
+            sparkVersion = ReflectionUtils.<String>invoke(sparkVersionMethod, target);
+        } else {
+            sparkVersion = defaultValue;
+        }
+
+        return sparkVersion;
+    }
 
     static void addOnCompletition(TaskContext taskContext, final Function0<?> function) {
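For readers following the new logic: the check reduces to two classpath probes (is a Spark 1.3-1.6 marker class present, is a Spark 2.0 marker class present) plus the library's own declared level. Below is a minimal Scala sketch of that decision table, with the probes and the CompatibilityLevel lookup replaced by plain parameters; the CompatibilityDecision and errorFor names are illustrative, not part of the commit.

// Sketch of the decision implemented by checkSparkLibraryCompatibility,
// with classpath probes replaced by plain booleans for illustration.
object CompatibilityDecision {
  def errorFor(isSpark13Level: Boolean, isSpark20Level: Boolean,
               isEshForSpark20: Boolean, esSupportedSparkVersion: String): Option[String] = {
    if (!(isSpark13Level || isSpark20Level)) {
      // neither marker class found: running on Spark 1.0-1.2, which is unsupported
      Some(s"compiled for Spark $esSupportedSparkVersion but used with unsupported Spark version 1.0-1.2")
    } else if (isSpark20Level != isEshForSpark20) {
      // runtime level and library level disagree
      val sparkVersion = if (isSpark13Level) "1.3-1.6" else "2.0+"
      Some(s"compiled for Spark $esSupportedSparkVersion but used with Spark $sparkVersion")
    } else {
      None // levels agree: no warning, no exception
    }
  }

  def main(args: Array[String]): Unit = {
    // Spark 2.0 runtime with the 1.3-1.6 es-hadoop build: mismatch reported
    println(errorFor(isSpark13Level = false, isSpark20Level = true,
                     isEshForSpark20 = false, esSupportedSparkVersion = "1.3-1.6"))
  }
}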
@@ -0,0 +1,6 @@
+package org.elasticsearch.spark.rdd
+
+trait CompatibilityLevel {
+  def versionId: String
+  def versionDescription: String
+}
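Each Spark-version module ships its own implementation of this trait under the same fully qualified class name, and CompatUtils resolves it at runtime via ObjectUtils.instantiate. A sketch of that lookup, using plain Class.forName in place of the es-hadoop helper; the CompatibilityLevelLookup object is illustrative.

import org.elasticsearch.spark.rdd.CompatibilityLevel

// Resolve whichever SparkSQLCompatibilityLevel the classpath provides.
object CompatibilityLevelLookup {
  def load(loader: ClassLoader): CompatibilityLevel =
    Class.forName("org.elasticsearch.spark.sql.SparkSQLCompatibilityLevel", true, loader)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[CompatibilityLevel]

  def main(args: Array[String]): Unit = {
    val level = load(getClass.getClassLoader)
    println(s"library level ${level.versionId} (Spark ${level.versionDescription})")
  }
}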
@@ -0,0 +1,11 @@
+package org.elasticsearch.spark.sql
+
+import org.elasticsearch.spark.rdd.CompatibilityLevel
+
+/**
+ * For determining Spark Version Compatibility
+ */
+class SparkSQLCompatibilityLevel extends CompatibilityLevel {
+  val versionId = "13"
+  val versionDescription = "1.3-1.6"
+}
@@ -0,0 +1,15 @@
+package org.elasticsearch.spark.rdd
+
+import org.junit.Test
+
+/**
+ * Created by james.baiera on 8/17/16.
+ */
+class CompatibilityCheckTest {
+
+  @Test
+  def checkCompatibility: Unit = {
+    CompatUtils.checkSparkLibraryCompatibility(true)
+  }
+
+}
@@ -0,0 +1,11 @@
+package org.elasticsearch.spark.sql
+
+import org.elasticsearch.spark.rdd.CompatibilityLevel
+
+/**
+ * For determining Spark Version Compatibility
+ */
+class SparkSQLCompatibilityLevel extends CompatibilityLevel {
+  val versionId = "20"
+  val versionDescription = "2.0"
+}
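This class and the one above share the same fully qualified name, org.elasticsearch.spark.sql.SparkSQLCompatibilityLevel: one is compiled into the Spark 1.3-1.6 module, the other into the Spark 2.0 module, so exactly one implementation lands on any given classpath and its constants identify the library level without any build-time flag. A sketch of reading that level directly; the LibraryLevel object is illustrative.

// With exactly one SparkSQLCompatibilityLevel on the classpath, the library
// level is read off it; mirrors the "20".equals(compatibilityLevel.versionId())
// check in CompatUtils.
object LibraryLevel extends App {
  val level = new org.elasticsearch.spark.sql.SparkSQLCompatibilityLevel
  val isEshForSpark20 = level.versionId == "20"
  println(s"built for Spark ${level.versionDescription}; spark20=$isEshForSpark20")
}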
@@ -0,0 +1,15 @@
+package org.elasticsearch.spark.rdd
+
+import org.junit.Test
+
+/**
+ * Created by james.baiera on 8/17/16.
+ */
+class CompatibilityCheckTest {
+
+  @Test
+  def checkCompatibility: Unit = {
+    CompatUtils.checkSparkLibraryCompatibility(true)
+  }
+
+}
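The test is likewise duplicated, once per Spark-version module, so each module's build runs the strict check against its own classpath and fails on a mismatch instead of merely logging. A sketch of what strict mode looks like to a caller when the pairing is actually incompatible; the StrictCheck object is illustrative, and it sits in the same package because CompatUtils is package-private.

package org.elasticsearch.spark.rdd

import org.elasticsearch.hadoop.EsHadoopIllegalStateException

// Run the compatibility check in strict mode and surface the failure.
object StrictCheck {
  def main(args: Array[String]): Unit = {
    try {
      CompatUtils.checkSparkLibraryCompatibility(true)
      println("classpath OK")
    } catch {
      case e: EsHadoopIllegalStateException =>
        println(s"incompatible: ${e.getMessage}")
    }
  }
}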
