### Note: Outputs are not in order due to a Toree bug!

Spark is a distributed computing engine and its main abstraction is a resilient distributed dataset (RDD), which can be
viewed as a distributed collection. Basically, an RDD's elements are partitioned across the nodes of the cluster, but Spark
abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.

Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others),
your transformation code (closure) is:

1. Serialized on the driver node,
2. Shipped to the appropriate nodes in the cluster,
3. Deserialized, and
4. Finally executed on the nodes

You can of course run this locally, but all those phases (apart from shipping over the network) still
occur. This lets you catch such serialization bugs even before deploying to production.

Links: http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou


In [1]:
import org.apache.spark.SparkContext

In [10]:
// Baseline: the same map/foreach pipeline as the classes below, run directly in a notebook cell.
// NOTE(review): `sc` is not defined in this notebook; presumably the Toree kernel provides it.
// The second argument to parallelize asks for the list to be split into 2 slices.
val listRDD = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
// Function called from inside the map closure below.
def increment(x: Int) = x + 1
println("simpleRDDManipulation")
// Apply increment to every element, then print each resulting element.
val incrementedListRDD = listRDD.map(increment(_))
incrementedListRDD.foreach(println)

simpleRDDManipulation


In [2]:
/**
 * Version 1: a plain class that does not extend Serializable.
 *
 * In the run recorded in this notebook, calling `listIncrement` on an instance
 * fails with "Task not serializable", caused by a
 * java.io.NotSerializableException naming this class.
 */
class ClassRDDManupulation_Ver1 {

  /**
   * Builds an RDD of the numbers 1 to 10 in two slices, adds 1 to every
   * element and prints each result.
   *
   * @param sc Spark context used to create the RDD
   */
  def listIncrement(sc: org.apache.spark.SparkContext) = {
    // Deliberately a method-local def: the map closure below calls it.
    def increment(x: Int) = x + 1

    val numbers = sc.parallelize((1 to 10).toList, 2)
    println("ClassRDDManupulation_Ver1")
    numbers.map(n => increment(n)).foreach(println)
  }
}

In [3]:
// So why did this crash? Neither the class nor the function is serializable
// (the output below reports java.io.NotSerializableException: ClassRDDManupulation_Ver1).
val classRDDManupulation_Ver1 = new ClassRDDManupulation_Ver1
classRDDManupulation_Ver1.listIncrement(sc)

ClassRDDManupulation_Ver1


Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace:   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2290)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at ClassRDDManupulation_Ver1.listIncrement(<console>:22)
  ... 42 elided
Caused by: java.io.NotSerializableException: ClassRDDManupulation_Ver1
Serialization stack:
	- obj

2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
2
3
4
7
5
8
6
9
10
11


In [4]:
/**
 * Version 2: same pipeline as Version 1, but the class extends
 * java.io.Serializable so that an instance can be shipped with the closure.
 *
 * The output recorded in this notebook shows a Spark stage starting for this
 * version rather than a serialization error.
 */
class ClassRDDManupulation_Ver2 extends java.io.Serializable {

  /**
   * Builds an RDD of the numbers 1 to 10 in two slices, adds 10 to every
   * element and prints each result.
   *
   * @param sc Spark context used to create the RDD
   */
  def listIncrement(sc: org.apache.spark.SparkContext) = {
    // Still a method-local def, exactly as in Version 1.
    def increment(x: Int) = x + 10

    val numbers = sc.parallelize((1 to 10).toList, 2)
    println("ClassRDDManupulation_Ver2")
    numbers.map(n => increment(n)).foreach(println)
  }
}

In [5]:
// Run the Serializable variant of the class against the notebook's Spark context.
val classRDDManupulation_Ver2 = new ClassRDDManupulation_Ver2
classRDDManupulation_Ver2.listIncrement(sc)

ClassRDDManupulation_Ver2
[Stage 0:>                                                          (0 + 2) / 2]

In [6]:
/**
 * Version 3: the class is again not Serializable, but the increment logic is
 * a function value (a first-class object) instead of a method-local def.
 *
 * Per the notebook author, only that anonymous function is serialized by
 * Spark here; no serialization error is recorded for this version.
 */
class ClassRDDManupulation_Ver3 {

  /**
   * Builds an RDD of the numbers 1 to 10 in two slices, adds 100 to every
   * element and prints each result.
   *
   * @param sc Spark context used to create the RDD
   */
  def listIncrement(sc: org.apache.spark.SparkContext) = {
    val numbers = sc.parallelize((1 to 10).toList, 2)
    // A function value held in a local val, not a def.
    val addHundred = (x: Int) => x + 100
    println("ClassRDDManupulation_Ver3")
    numbers.map(n => addHundred(n)).foreach(println)
  }
}

In [7]:
// Run the function-value variant of the class against the notebook's Spark context.
val classRDDManupulation_Ver3 = new ClassRDDManupulation_Ver3
classRDDManupulation_Ver3.listIncrement(sc)

ClassRDDManupulation_Ver3


In [8]:
/**
 * Version 4: the same pipeline hosted in a singleton object that does not
 * extend Serializable.
 *
 * Per the notebook author, the whole object is serialized by default. The
 * recorded run fails with "Task not serializable", caused by a
 * java.io.NotSerializableException naming `ClassRDDManupulation_Ver4$`.
 */
object ClassRDDManupulation_Ver4 {

  /**
   * Builds an RDD of the numbers 1 to 10 in two slices, adds 1000 to every
   * element and prints each result.
   *
   * @param sc Spark context used to create the RDD
   */
  def listIncrement(sc: org.apache.spark.SparkContext) = {
    // Deliberately a method-local def: the map closure below calls it.
    def increment(x: Int) = x + 1000

    val numbers = sc.parallelize((1 to 10).toList, 2)
    println("ClassRDDManupulation_Ver4")
    numbers.map(n => increment(n)).foreach(println)
  }
}

In [9]:
// Run the singleton-object variant; the output below records a "Task not serializable" failure.
ClassRDDManupulation_Ver4.listIncrement(sc)

ClassRDDManupulation_Ver4


Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace:   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2290)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at ClassRDDManupulation_Ver4$.listIncrement(<console>:35)
  ... 42 elided
Caused by: java.io.NotSerializableException: ClassRDDManupulation_Ver4$
Serialization stack:
	- o