# Spark demo

This notebook illustrates how to use the ammonite-spark library, which adds Spark support to (my [fork](https://github.com/alexarchambault/ammonite-shell) of) the [Ammonite](https://github.com/lihaoyi/Ammonite/) REPL or to a notebook.

**Just a proof-of-concept of things that can be done.**

All of this was tested (and is unit tested, although not on Travis, see [1](https://github.com/alexarchambault/jupyter-scala/blob/master/kernel/src/test/scala/jupyter/scala/LocalSparkTests.scala), [2](https://github.com/alexarchambault/jupyter-scala/blob/master/kernel/src/test/scala/jupyter/scala/LocalClusterSparkTests.scala), [3](https://github.com/alexarchambault/jupyter-scala/blob/master/kernel/src/test/scala/jupyter/scala/StandAloneClusterSparkTests.scala)):
* locally (master like `local`),
* on a local cluster (`local-cluster[1,1,512]`), and
* on a (docker based :-|) standalone cluster (`spark://master:7077`).

This deserves larger-scale tests (EC2, ...). Use with caution. Nothing specific was done for YARN, so I don't expect YARN clusters to work as is.

The examples come mostly from the spark-repl test suite: https://github.com/apache/spark/blob/master/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala.

We start from a bare Scala 2.10 jupyter-scala notebook. First, let's fetch ammonite-spark.

Note the `1.3` in the dependency name, which corresponds to our Spark version, and the `2.10.5`, which is the full current Scala version.
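The naming scheme can be sketched in plain Scala. This is only an illustration of how the artifact name is assembled: the `ArtifactName` object and its `ammoniteSparkArtifact` helper are hypothetical and not part of ammonite-spark, and `scala.util.Properties.versionNumberString` is used on the assumption that the notebook runs on the Scala version the dependency should target.

```scala
object ArtifactName {
  // Hypothetical helper: joins the Spark version and the *full* Scala version
  // into the cross-versioned artifact name.
  def ammoniteSparkArtifact(sparkVersion: String, scalaFullVersion: String): String =
    s"ammonite-spark_${sparkVersion}_${scalaFullVersion}"

  // Full version of the running Scala library, e.g. "2.10.5"
  def currentScalaVersion: String = scala.util.Properties.versionNumberString
}
```

With the versions used here, `ArtifactName.ammoniteSparkArtifact("1.3", "2.10.5")` yields the `ammonite-spark_1.3_2.10.5` name passed to `load.ivy`.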

In [1]:
load.ivy("com.github.alexarchambault" % "ammonite-spark_1.3_2.10.5" % "0.3.1-SNAPSHOT")



:: problems summary ::

	Unable to reparse com.github.alexarchambault.jupyter#jupyter-scala-api_2.10.5;0.2.0-SNAPSHOT from sonatype-snapshots, using Wed May 20 03:09:16 CEST 2015

	Choosing sonatype-snapshots for com.github.alexarchambault.jupyter#jupyter-scala-api_2.10.5;0.2.0-SNAPSHOT

	Unable to reparse com.github.alexarchambault#ammonite-api_2.10.5;0.3.1-SNAPSHOT from sonatype-snapshots, using Wed May 20 01:57:50 CEST 2015

	Choosing sonatype-snapshots for com.github.alexarchambault#ammonite-api_2.10.5;0.3.1-SNAPSHOT

	Unable to reparse com.github.alexarchambault.jupyter#jupyter-api_2.10;0.2.0-SNAPSHOT from sonatype-snapshots, using Fri May 15 16:53:44 CEST 2015

	Choosing sonatype-snapshots for com.github.alexarchambault.jupyter#jupyter-api_2.10;0.2.0-SNAPSHOT

	Unable to reparse com.github.alexarchambault#ammonite-spark_1.3_2.10.5;0.3.1-SNAPSHOT from sonatype-snapshots, using Wed May 20 01:58:01 CEST 2015

	Choosing sonatype-snapshots for com.github.alexarchambault#ammonite-spark_1.3_2.10.5;0.3.1-SNAPSHOT

	Unable to reparse com.github.alexarchambault#ammonite-shell-api_2.10.5;0.3.1-SNAPSHOT from sonatype-snapshots, using Wed May 20 01:57:55 CEST 2015

	Choosing sonatype-snapshots for com.github.alexarchambault#ammonite-shell-api_2.10.5;0.3.1-SNAPSHOT

:::: ERRORS
	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

	unknown resolver null

Now we create a handle of type `ammonite.spark.Spark`. It grabs information from the current interpreter and uses it later to initialize Spark.

Note the `@transient` annotation added to it, so that it will not get serialized when running closures on a cluster.
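The effect of `@transient` can be seen in isolation with plain Java serialization, which is what Spark uses for closures by default. The sketch below is independent of Spark and ammonite-spark; `TransientDemo`, `Handle`, `Session` and `roundTrip` are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientDemo {
  // Hypothetical stand-in for a handle that must never be shipped to executors.
  // It is deliberately not Serializable.
  class Handle

  class Session(val name: String) extends Serializable {
    // Without @transient, serializing a Session would throw NotSerializableException.
    @transient val handle: Handle = new Handle
  }

  // Serializes a value to bytes and reads it back, as happens when a closure is shipped.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `TransientDemo.roundTrip(new TransientDemo.Session("demo"))`, the copy keeps its `name`, but its `handle` field is `null`: the transient field was skipped rather than serialized.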

In [3]:
@transient val Spark = new ammonite.spark.Spark

log4j:WARN No appenders could be found for logger (org.eclipse.jetty.util.log).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


Spark: ammonite.spark.Spark = Spark(uninitialized)

A `SparkContext` is accessible through the `sc` method of the Spark handle. It is *lazily* initialized, so it has not been created yet: we have not called `sc`.
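Lazy initialization of this kind can be sketched with a Scala `lazy val`. This is not the actual `ammonite.spark.Spark` implementation; `LazyDemo.Handle` is a hypothetical stand-in, and the string `"context"` merely takes the place of a real `SparkContext`.

```scala
object LazyDemo {
  // Hypothetical stand-in for the Spark handle
  class Handle {
    private var started = false
    def isStarted: Boolean = started

    // The body runs once, on first access, and the result is cached.
    lazy val sc: String = {
      started = true
      "context"
    }

    // Forces initialization; equivalent to accessing sc directly.
    def start(): String = sc
  }
}
```

A fresh `new LazyDemo.Handle` reports `isStarted == false` until `sc` or `start()` is called for the first time.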

Before calling `sc`, we set up the Spark configuration through a `SparkConf`.

In [4]:
Spark.withConf(_
  .setMaster("spark://master:7077")
  .set("spark.home", "/path-to-spark-distrib")
)



In [6]:
import Spark.sc

import Spark.sc

In [7]:
Spark.start() // equivalent to just calling sc, triggers its initialization

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/20 03:16:07 INFO Spark$$anon$2: Running Spark version 1.3.1
15/05/20 03:16:07 WARN Utils: Your hostname, pc-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.0.15 instead (on interface eth0)
15/05/20 03:16:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/20 03:16:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/20 03:16:07 INFO SecurityManager: Changing view acls to: alexandre
15/05/20 03:16:07 INFO SecurityManager: Changing modify acls to: alexandre
15/05/20 03:16:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alexandre); users with modify permissions: Set(alexandre)
15/05/20 03:16:07 INFO Slf4jLogger: Slf4jLogger started
15/05/20 03:16:07 INFO Remoting: Starting remoting
15/05/20 03:16:08 INFO Remoting:



Now the examples:

### Accumulator

In [9]:
val accum = sc.accumulator(0)

accum: org.apache.spark.Accumulator[Int] = 0

In [10]:
sc.parallelize(1 to 10).foreach(x => accum += x)



In [11]:
accum.value

res10: Int = 55

### Parallel function

In [12]:
def double(x: Int) = x + x

defined function double

In [13]:
sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)

res12: Int = 110

### Mutable variable

In [14]:
var v = 7

v: Int = 7

In [15]:
def getV() = v

defined function getV

In [16]:
sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)

res15: Int = 70

In [17]:
v = 10



In [18]:
sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)

res17: Int = 100

### Defining your own classes is ok

More complex definitions should work too.

In [19]:
case class Sum(exp: String, exp2: String)

defined class Sum

In [20]:
val a = Sum("A", "B")

a: cmd19.INSTANCE.$ref$cmd18.Sum = Sum("A", "B")

In [21]:
def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }

defined function b

In [22]:
b(a)

res21: String = "Found Sum"

### SparkSQL :|

Spark SQL doesn't work from here because of https://issues.apache.org/jira/browse/SPARK-5281. This should be fixed in an upcoming Spark release.

In [26]:
import Spark.sqlContext
import sqlContext.implicits._

import Spark.sqlContext
import sqlContext.implicits._

In [27]:
case class TestCaseClass(value: Int)

defined class TestCaseClass

In [28]:
sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()


### Unused non serializable things are fine

If they are not used in parallel calculations, they do not prevent serialization.
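The underlying rule can be checked with plain Java serialization, independently of Spark: a value serializes as long as nothing it references is non-serializable. In the sketch below, `SerializationDemo`, `Independent`, `Dependent` and `canSerialize` are hypothetical names introduced for illustration only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  class TestClass { def testMethod = 3 }           // not serializable
  case class Independent(value: Int)               // serializable, never touches TestClass
  case class Dependent(value: Int, t: TestClass)   // drags a TestClass along

  // Returns true if plain Java serialization accepts the value.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Here `canSerialize(Independent(1))` succeeds even though `TestClass` exists in the same scope, while `canSerialize(Dependent(1, new TestClass))` fails because the non-serializable instance is actually referenced.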

In [29]:
class TestClass() { def testMethod = 3; override def toString = "TestClass" }

defined class TestClass

In [1]:
// not serializable
val t = new TestClass


In [31]:
import t.testMethod

import t.testMethod

In [32]:
// serializable
case class TestCaseClass(value: Int)

defined class TestCaseClass

In [33]:
// some parallel calculations with the serializable class
sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()

res32: scala.Array[cmd32.INSTANCE.$ref$cmd31.TestCaseClass] = Array(
  TestCaseClass(1),
  TestCaseClass(2),
  TestCaseClass(3),
  TestCaseClass(4),
  TestCaseClass(5),
  TestCaseClass(6),
  TestCaseClass(7),
  TestCaseClass(8),
  TestCaseClass(9),
  TestCaseClass(10)
)

**TODO** jupyter-scala: handle fully cross-versioned artifact with a smoother syntax

**TODO** jupyter-scala: interpreter API to add config files to the classpath (e.g. log4j config below)

**TODO** jupyter-scala: is the `@transient` still necessary?

**TODO** jupyter-scala: do we still have to be cautious not to return the SparkConf? It should be ok now that we filter imports.

**TODO** jupyter-spark: do as Spark does with the hostname, and do not use a loopback address