IGNITE-3710 Upgrade ignite-spark module to Spark 2.0
Evgenii Zhuravlev authored and anton-vinogradov committed Feb 20, 2017
1 parent 76f3060 commit 8613c16
Showing 7 changed files with 270 additions and 28 deletions.
@@ -26,7 +26,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import scala.Tuple2;

import java.util.List;
@@ -99,7 +99,7 @@ public static void main(String args[]) {
System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

// Execute SQL query over the Ignite RDD.
-DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");
+Dataset df = sharedRDD.sql("select _val from Integer where _key < 9");

// Show the result of the execution.
df.show();
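
Note: Spark 2.0 removed the standalone DataFrame class. In Scala, DataFrame survives as a type alias for Dataset[Row]; Java has no type aliases, so Java code must name Dataset directly, which is what the change above does. Below is a minimal Scala sketch of the same query; the cache name "partitioned" and the local configuration are illustrative assumptions, not part of this commit.

// Minimal sketch, assuming a cache named "partitioned" and a local master.
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.{SparkConf, SparkContext}

object DatasetSqlSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dataset-sql-sketch").setMaster("local[2]"))
    val ic = new IgniteContext(sc, () => new IgniteConfiguration())

    val sharedRDD = ic.fromCache[Integer, Integer]("partitioned")

    // In Spark 2.0, DataFrame = Dataset[Row], so both declarations are equivalent.
    val df: DataFrame = sharedRDD.sql("select _val from Integer where _key < 9")
    val ds: Dataset[Row] = df

    ds.show()
    sc.stop()
  }
}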
54 changes: 54 additions & 0 deletions modules/spark-2.10/pom.xml
@@ -61,12 +61,66 @@
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson2.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson2.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-core_2.11</artifactId>
<version>3.5.0</version>
</dependency>

<!-- Test dependencies -->

<dependency>
183 changes: 181 additions & 2 deletions modules/spark/pom.xml
@@ -52,7 +52,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
-<version>2.11.7</version>
+<version>2.11.8</version>
</dependency>

<dependency>
@@ -61,12 +61,54 @@
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson2.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson2.version}</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-core_2.11</artifactId>
<version>3.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
@@ -89,7 +131,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
-<version>2.2.4</version>
+<version>2.2.6</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -98,6 +140,143 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_2.11</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_2.11</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.29.Final</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.20</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_2.11</artifactId>
<version>0.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-servlet-api</artifactId>
<version>8.0.23</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
<version>2.25</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson2.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson2.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm5-shaded</artifactId>
<version>4.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -22,7 +22,8 @@ import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.internal.util.IgniteUtils
import org.apache.spark.sql.SQLContext
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.log4j.Logger

/**
* Ignite context.
@@ -34,7 +35,7 @@ class IgniteContext(
@transient val sparkContext: SparkContext,
cfgF: () ⇒ IgniteConfiguration,
standalone: Boolean = true
-) extends Serializable with Logging {
+) extends Serializable {
private val cfgClo = new Once(cfgF)

private val igniteHome = IgniteUtils.getIgniteHome
@@ -47,7 +48,7 @@
if (workers <= 0)
throw new IllegalStateException("No Spark executors found to start Ignite nodes.")

logInfo("Will start Ignite nodes on " + workers + " workers")
Logging.log.info("Will start Ignite nodes on " + workers + " workers")

// Start ignite server node on each worker in server mode.
sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ ignite())
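
The line above is the usual trick for running a side-effecting closure once per worker: parallelize N dummy elements into N partitions and do the work in foreachPartition. A self-contained sketch of the pattern follows; the object and method names are illustrative, and it assumes Spark spreads the N partitions across executors, which is typical but not guaranteed.

// Sketch of the one-task-per-worker pattern used by IgniteContext.
import org.apache.spark.{SparkConf, SparkContext}

object PerWorkerSketch {
  def runOnEachWorker(sc: SparkContext, workers: Int)(action: () => Unit): Unit =
    // One element per partition, so the closure body runs once per partition.
    sc.parallelize(1 to workers, workers).foreachPartition(_ => action())

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-worker-sketch").setMaster("local[2]"))
    runOnEachWorker(sc, 2)(() => println("an Ignite node would be started here"))
    sc.stop()
  }
}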
@@ -126,7 +127,7 @@ class IgniteContext(
val home = IgniteUtils.getIgniteHome

if (home == null && igniteHome != null) {
logInfo("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)

IgniteUtils.nullifyHomeDirectory()

@@ -143,7 +144,7 @@
}
catch {
case e: IgniteException ⇒
-logError("Failed to start Ignite.", e)
+Logging.log.error("Failed to start Ignite.", e)

throw e
}
@@ -161,7 +162,7 @@
sparkContext.getExecutorStorageStatus.length)

if (workers > 0) {
logInfo("Will stop Ignite nodes on " + workers + " workers")
Logging.log.info("Will stop Ignite nodes on " + workers + " workers")

// Stop the Ignite server node on each worker.
sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ doClose())
@@ -200,3 +201,12 @@ private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
res
}
}

/**
* Spark uses log4j by default. Using this logger in IgniteContext as well.
*
* This object is used to avoid problems with log4j serialization.
*/
object Logging extends Serializable {
@transient lazy val log = Logger.getLogger(classOf[IgniteContext])
}
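
Spark 2.0 made the org.apache.spark.Logging trait private to Spark, which is why IgniteContext drops the mixin and routes log calls through this serializable holder instead. The key detail is the @transient lazy val: the Logger (which is not Serializable) is never shipped with a closure, and each JVM re-creates it lazily on first use. A minimal sketch of the same pattern, with illustrative names (Worker, Log) that are not from this commit:

// Serialization-safe log4j logging via a holder object.
import org.apache.log4j.Logger

class Worker extends Serializable {
  // Closures referencing Log serialize cleanly; the Logger stays behind.
  def work(): Unit = Log.log.info("working on " + Thread.currentThread().getName)
}

object Log extends Serializable {
  @transient lazy val log: Logger = Logger.getLogger(classOf[Worker])
}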
