IGNITE-4526: Add Spark Shared RDD examples

Reviewed by Denis Magda <dmagda@apache.org>
manishatGit authored and dmagda committed Feb 15, 2017
1 parent 79e1e53 commit b461cb47882861356ede58775bd9e253dcf26202
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
Ignite Spring configuration file to start up an Ignite cache.
This file demonstrates how to configure a cache using Spring. The provided cache
will be created on node startup.
When starting a standalone node, execute the following command:
{IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/spark/example-shared-rdd.xml
When starting Ignite from a Java IDE, pass the path to this file to Ignition:
Ignition.start("examples/config/spark/example-shared-rdd.xml");
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<!-- SharedRDD cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="sharedRDD"/>
<!-- Set a cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- Index Integer pairs used in the example. -->
<property name="indexedTypes">
<list>
<value>java.lang.Integer</value>
<value>java.lang.Integer</value>
</list>
</property>
<!-- Set atomicity mode. -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- Configure a number of backups. -->
<property name="backups" value="1"/>
</bean>
</property>

<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP-based discovery. For information on all options, refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In a distributed environment, replace with the actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
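
For reference, the configuration above can be used either from the command line or programmatically. Below is a minimal sketch of the programmatic variant; the class name SharedRddNodeStartup is illustrative, and the path assumes the file is saved as examples/config/spark/example-shared-rdd.xml, as the Java and Scala examples in this commit expect:

/*
 * Starts a standalone Ignite node with the "sharedRDD" cache defined in the Spring XML above.
 * Shell equivalent: {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/spark/example-shared-rdd.xml
 */
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class SharedRddNodeStartup {
    public static void main(String[] args) {
        // Boots the node and creates the "sharedRDD" cache declared in the configuration.
        Ignite ignite = Ignition.start("examples/config/spark/example-shared-rdd.xml");

        System.out.println(">>> Node started: " + ignite.name());
    }
}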
@@ -17,7 +17,8 @@
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
@@ -138,6 +139,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spark</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.9.Final</version>
</dependency>
</dependencies>

<build>
@@ -172,6 +185,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spark_2.10</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.9.Final</version>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.examples.java8.spark;

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import scala.Tuple2;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers. The goal of this
* particular example is to provide the simplest code example of this logic.
* <p>
* This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each Spark worker node.
* <p>
* The example can also work in standalone mode, which can be enabled by setting JavaIgniteContext's
* {@code standalone} property to {@code true} and running an Ignite node separately with the
* `examples/config/spark/example-shared-rdd.xml` config.
*/
public class SharedRDDExample {
/**
* Executes the example.
* @param args Command line arguments, none required.
*/
public static void main(String[] args) {
// Spark Configuration.
SparkConf sparkConf = new SparkConf()
.setAppName("JavaIgniteRDDExample")
.setMaster("local")
.set("spark.executor.instances", "2");

// Spark context.
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger().setLevel(Level.ERROR);
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

// Creates an Ignite context with the specified configuration and runs Ignite in the embedded mode.
JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
sparkContext, "examples/config/spark/example-shared-rdd.xml", false);

// Create a JavaIgniteRDD of (Integer, Integer) pairs.
JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

// Define data to be stored in the Ignite RDD (cache).
List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList());

// Prepare a Java RDD.
JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

// Fill the Ignite RDD with Integer pairs. Here, pairs are represented as Scala Tuple2.
sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override public Tuple2<Integer, Integer> call(Integer val) throws Exception {
return new Tuple2<Integer, Integer>(val, val);
}
}));

System.out.println(">>> Iterating over Ignite Shared RDD...");

// Iterate over the Ignite RDD.
sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));

System.out.println(">>> Transforming values stored in Ignite Shared RDD...");

// Keep only the pairs with even values as a transformed RDD.
JavaPairRDD<Integer, Integer> transformedValues =
sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0);

// Print out the transformed values.
transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));

System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

// Execute SQL query over the Ignite RDD.
DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");

// Show the result of the execution.
df.show();

// Close IgniteContext on all the workers.
igniteContext.close(true);
}
}
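
As the Javadoc above notes, the same logic can run against a separately started Ignite cluster instead of embedding Ignite into the Spark workers. A minimal sketch of that standalone variant follows; it assumes the third JavaIgniteContext constructor argument is the standalone flag mentioned in the Javadoc, that a node has already been started with examples/config/spark/example-shared-rdd.xml, and the class name SharedRDDStandaloneSketch is illustrative:

import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SharedRDDStandaloneSketch {
    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(
            new SparkConf().setAppName("JavaIgniteRDDStandalone").setMaster("local"));

        // Passing 'true' requests standalone mode: attach to the externally started
        // Ignite node(s) instead of starting embedded nodes inside the Spark workers.
        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
            sparkContext, "examples/config/spark/example-shared-rdd.xml", true);

        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");

        // Entries written by earlier runs (or by other applications) remain visible here,
        // since the cache lives in the external cluster rather than in this Spark job.
        System.out.println(">>> Shared RDD size: " + sharedRDD.count());

        // Detach this context without stopping the external Ignite nodes.
        igniteContext.close(false);
        sparkContext.stop();
    }
}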
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.scalar.examples.spark

import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
* This example demonstrates how to create an IgniteRDD and share it with multiple Spark workers.
* The goal of this particular example is to provide the simplest code example of this logic.
* <p>
* This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node.
* <p>
* The example can also work in standalone mode, which can be enabled by setting IgniteContext's {@code standalone}
* property to {@code true} and running an Ignite node separately with the
* `examples/config/spark/example-shared-rdd.xml` config.
*/
object ScalarSharedRDDExample extends App {
// Spark Configuration.
private val conf = new SparkConf()
.setAppName("IgniteRDDExample")
.setMaster("local")
.set("spark.executor.instances", "2")

// Spark context.
val sparkContext = new SparkContext(conf)

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger.setLevel(Level.ERROR)
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)

// Defines the Spring cache configuration path.
private val CONFIG = "examples/config/spark/example-shared-rdd.xml"

// Creates an Ignite context with the above configuration.
val igniteContext = new IgniteContext(sparkContext, CONFIG, false)

// Creates an Ignite shared RDD of (Int, Int) pairs.
val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")

// Fill the Ignite shared RDD with Int pairs.
sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))

// Transform pairs to contain their squared values. Note that mapValues returns
// a new RDD and does not write the result back to the underlying cache.
sharedRDD.mapValues(x => (x * x))

// Retrieve the sharedRDD back from the cache.
val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD")

// Perform some transformations on IgniteRDD and print.
val squareAndRootPair = transformedValues.map { case (x, y) => (x, Math.sqrt(y.toDouble)) }

println(">>> Transforming values stored in Ignite Shared RDD...")

// Keep only the pairs whose square roots are less than 100, then
// take the first five elements from the transformed IgniteRDD and print them.
squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println)

println(">>> Executing SQL query over Ignite Shared RDD...")

// Execute a SQL query over the Ignite Shared RDD.
val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ")

// Show ten rows from the result set.
df.show(10)

// Close IgniteContext on all workers.
igniteContext.close(true)

// Stop SparkContext.
sparkContext.stop()
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.java8.examples;

import org.apache.ignite.examples.java8.spark.SharedRDDExample;
import org.junit.Test;

/**
* SharedRDD examples self test.
*/
public class SharedRDDExampleSelfTest {
static final String[] EMPTY_ARGS = new String[0];
/**
* @throws Exception If failed.
*/
@Test
public void testSharedRDDExample() throws Exception {
SharedRDDExample.main(EMPTY_ARGS);
}

}
@@ -26,6 +26,7 @@
import org.apache.ignite.java8.examples.EventsExamplesSelfTest;
import org.apache.ignite.java8.examples.IndexingBridgeMethodTest;
import org.apache.ignite.java8.examples.MessagingExamplesSelfTest;
import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest;
import org.apache.ignite.testframework.GridTestUtils;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
@@ -49,6 +50,8 @@ public static TestSuite suite() throws Exception {
suite.addTest(new TestSuite(IndexingBridgeMethodTest.class));
suite.addTest(new TestSuite(CacheExamplesSelfTest.class));
suite.addTest(new TestSuite(BasicExamplesSelfTest.class));
suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));

// suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class));
// suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class));
// suite.addTest(new TestSuite(DeploymentExamplesSelfTest.class));