Skip to content

Commit

Permalink
IGNITE-3710 Upgrade ignite-spark module to Spark 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgenii Zhuravlev authored and anton-vinogradov committed Feb 22, 2017
1 parent d77fe81 commit b78d354
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 12 deletions.
2 changes: 1 addition & 1 deletion DEVNOTES.txt
Expand Up @@ -33,7 +33,7 @@ Ignite Hadoop Accelerator Maven Build Instructions
mvn clean package -DskipTests -Dignite.edition=hadoop [-Dhadoop.version=X.X.X] [-Dspark.version=x.y.z]

Use 'hadoop.version' parameter to build Ignite against a specific Hadoop version.
Use 'spark.version' parameter to build ignite-spark module for a specific Spark version.
Use 'spark.version' parameter to build ignite-spark module for a specific Spark version. Version should be >= 2.0.0.

Look for apache-ignite-hadoop-<version>-bin.zip in ./target/bin directory. Resulting binary
assembly will also include integration module for Apache Spark.
Expand Down
14 changes: 14 additions & 0 deletions examples/pom.xml
Expand Up @@ -112,6 +112,8 @@
<!-- will be changed by profile activation. allows to combine profiles. -->
<lgpl.folder>src/main/java</lgpl.folder>
<java8.folder>src/main/java</java8.folder>
<spark.folder>src/main/java</spark.folder>
<spark.test.folder>src/test/java</spark.test.folder>
<lgpl.test.folder>src/test/java</lgpl.test.folder>
<java8.test.folder>src/test/java</java8.test.folder>
</properties>
Expand All @@ -120,6 +122,11 @@
<profile>
<id>scala</id>

<properties>
<spark.folder>src/main/spark</spark.folder>
<spark.test.folder>src/test/spark</spark.test.folder>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
Expand Down Expand Up @@ -166,6 +173,11 @@
<profile>
<id>scala-2.10</id>

<properties>
<spark.folder>src/main/spark</spark.folder>
<spark.test.folder>src/test/spark</spark.test.folder>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
Expand Down Expand Up @@ -273,6 +285,7 @@
<source>schema-import/src/main/java</source>
<source>${lgpl.folder}</source>
<source>${java8.folder}</source>
<source>${spark.folder}</source>
</sources>
</configuration>
</execution>
Expand All @@ -286,6 +299,7 @@
<configuration>
<sources>
<source>${lgpl.test.folder}</source>
<source>${spark.test.folder}</source>
<source>${java8.test.folder}</source>
</sources>
</configuration>
Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.examples.java8.spark;
package org.apache.ignite.examples.spark;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
Expand All @@ -25,13 +25,14 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This example demonstrates how to create an JavaIgnitedRDD and share it with multiple spark workers. The goal of this
Expand Down Expand Up @@ -70,7 +71,11 @@ public static void main(String args[]) {
JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

// Define data to be stored in the Ignite RDD (cache).
List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList());
List<Integer> data = new ArrayList<>(20);

for (int i = 0; i<20; i++) {
data.add(i);
}

// Preparing a Java RDD.
JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);
Expand All @@ -85,16 +90,28 @@ public static void main(String args[]) {
System.out.println(">>> Iterating over Ignite Shared RDD...");

// Iterate over the Ignite RDD.
sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
sharedRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override public void call(Tuple2<Integer, Integer> tuple) throws Exception {
System.out.println("(" + tuple._1 + "," + tuple._2 + ")");
}
});

System.out.println(">>> Transforming values stored in Ignite Shared RDD...");

// Filter out even values as a transformed RDD.
JavaPairRDD<Integer, Integer> transformedValues =
sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0);
sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
@Override public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception {
return tuple._2() % 2 == 0;
}
});

// Print out the transformed values.
transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
transformedValues.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override public void call(Tuple2<Integer, Integer> tuple) throws Exception {
System.out.println("(" + tuple._1 + "," + tuple._2 + ")");
}
});

System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

Expand Down
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* <!-- Package description. -->
* Basic examples for ignite functionality with spark.
*/
package org.apache.ignite.examples.spark;
Expand Up @@ -26,7 +26,6 @@
import org.apache.ignite.java8.examples.EventsExamplesSelfTest;
import org.apache.ignite.java8.examples.IndexingBridgeMethodTest;
import org.apache.ignite.java8.examples.MessagingExamplesSelfTest;
import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest;
import org.apache.ignite.testframework.GridTestUtils;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
Expand All @@ -50,7 +49,6 @@ public static TestSuite suite() throws Exception {
suite.addTest(new TestSuite(IndexingBridgeMethodTest.class));
suite.addTest(new TestSuite(CacheExamplesSelfTest.class));
suite.addTest(new TestSuite(BasicExamplesSelfTest.class));
suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));

// suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class));
// suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class));
Expand Down
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.ignite.java8.examples;
package org.apache.ignite.spark.examples;

import org.apache.ignite.examples.java8.spark.SharedRDDExample;
import org.apache.ignite.examples.spark.SharedRDDExample;
import org.junit.Test;

/**
Expand Down
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spark.testsuites;

import junit.framework.TestSuite;
import org.apache.ignite.spark.examples.SharedRDDExampleSelfTest;
import org.apache.ignite.testframework.GridTestUtils;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;

/**
* Examples test suite.
* <p>
* Contains only Spring ignite examples tests.
*/
public class IgniteExamplesSparkSelfTestSuite extends TestSuite {
/**
* @return Suite.
* @throws Exception If failed.
*/
public static TestSuite suite() throws Exception {
System.setProperty(IGNITE_OVERRIDE_MCAST_GRP,
GridTestUtils.getNextMulticastGroup(IgniteExamplesSparkSelfTestSuite.class));

TestSuite suite = new TestSuite("Ignite Examples Test Suite");

suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));

return suite;
}
}

0 comments on commit b78d354

Please sign in to comment.