In [1]:
# Spark Transformation - map
# map() produces exactly one output element for every input element;
# here each letter is paired with the count 1.
x = sc.parallelize(["b", "a", "c"])
y = x.map(lambda letter: (letter, 1))

print(x.collect())
print(y.collect())

['b', 'a', 'c']
[('b', 1), ('a', 1), ('c', 1)]


In [1]:
// %scala
// map: one output element per input element -- pair each letter with 1.
val x = sc.parallelize(Seq("b", "a", "c"))
val y = x.map((_, 1))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

b, a, c
(b,1), (a,1), (c,1)


x = ParallelCollectionRDD[0] at parallelize at <console>:28
y = MapPartitionsRDD[1] at map at <console>:29


MapPartitionsRDD[1] at map at <console>:29

In [2]:
# Spark Transformation - flatMap
# flatMap() lets each input element expand into several output elements;
# the per-element sequences are concatenated into one flat RDD.
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda n: [n, n * 100, 42])  # n, n*100 and a constant per input

print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 42, 2, 200, 42, 3, 300, 42]


In [3]:
// %scala
// flatMap: every input expands to three outputs, concatenated into one RDD.
val x = sc.parallelize(Seq(1, 2, 3))
val y = x.flatMap(n => Seq(n, n * 100, 42))
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

1, 2, 3
1, 100, 42, 2, 200, 42, 3, 300, 42


x = ParallelCollectionRDD[0] at parallelize at <console>:28
y = MapPartitionsRDD[1] at flatMap at <console>:29


MapPartitionsRDD[1] at flatMap at <console>:29

In [3]:
# Spark Transformation - filter
# filter() keeps only the elements for which the predicate is true.
x = sc.parallelize([1, 2, 3])
y = x.filter(lambda n: n % 2 != 0)  # keep odd values

print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]


In [4]:
// %scala
// filter: keep only the odd values.
val x = sc.parallelize(Seq(1, 2, 3))
val y = x.filter(_ % 2 == 1)
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

1, 2, 3
1, 3


x = ParallelCollectionRDD[2] at parallelize at <console>:31
y = MapPartitionsRDD[3] at filter at <console>:32


MapPartitionsRDD[3] at filter at <console>:32

In [4]:
# Spark Transformation - groupBy
# groupBy() computes a key for every element (here: the first letter) and
# gathers the elements sharing a key; the grouped values are iterables, so
# they are materialised with list() for printing.
x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.groupBy(lambda name: name[0])
print([(initial, list(names)) for initial, names in y.collect()])

[('F', ['Fred']), ('J', ['John', 'James']), ('A', ['Anna'])]


In [None]:
// %scala
// The marker above is commented out like in the other Scala cells: a bare
// `%scala` line is not valid Scala source.
// groupBy: key each name by its first character and group names sharing it.
val x = sc.parallelize(Array("John", "Fred", "Anna", "James"))
val y = x.groupBy(w => w.charAt(0))
println(y.collect().mkString(", "))

In [5]:
# Spark Transformation - groupByKey
# groupByKey() works on (key, value) pairs and gathers all values that share
# a key; each group is an iterable, converted with list() for printing.
x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])
y = x.groupByKey()

print(x.collect())
print([(key, list(values)) for key, values in y.collect()])

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('B', [5, 4]), ('A', [3, 2, 1])]


In [None]:
// %scala
// groupByKey: gather all values sharing a key into one Iterable per key.
val x = sc.parallelize(Seq(('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)))
val y = x.groupByKey()
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

In [None]:
// ReduceByKey VS groupByKey
// (`//`, not `#`: this is a Scala cell and `#` does not start a comment here.)
// %scala
// Both pipelines compute word counts. Per the Spark programming guide,
// reduceByKey merges the values for a key within each partition before the
// shuffle, whereas groupByKey shuffles every (word, 1) pair and sums later.
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

println(wordCountsWithReduce.mkString(", "))
println(wordCountsWithGroup.mkString(", "))
// Both approaches must agree on the counts (element order may differ).
assert(wordCountsWithReduce.toMap == wordCountsWithGroup.toMap)

In [7]:
# Spark Transformation - mapPartitions()
# mapPartitions() calls f with an iterator over one partition's elements;
# everything f yields becomes that partition's output.

x = sc.parallelize([1, 2, 3], 2)


def f(iterator):
    """Yield the sum of one partition's elements, then the constant 42."""
    yield sum(iterator)
    yield 42


y = x.mapPartitions(f)
# glom() gathers the elements of each partition into a list (it does not
# flatten), so the output shows how the data is split across partitions.
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[1, 42], [5, 42]]


In [None]:
// %scala
// mapPartitions: f receives an iterator over one partition's elements and
// returns that partition's output -- here the partition sum followed by 42.
val x = sc.parallelize(Array(1, 2, 3), 2)
// Iterator(...) keeps the element type Int; the former
// `(i.sum, 42).productIterator` produced Iterator[Any] and thus RDD[Any].
def f(i: Iterator[Int]): Iterator[Int] = Iterator(i.sum, 42)
val y = x.mapPartitions(f)
// glom() gathers the elements of each partition into an array (it does not
// flatten), exposing the per-partition layout.
val xOut = x.glom().collect()
val yOut = y.glom().collect()

In [8]:
# Spark Transformation - mapPartitionsWithIndex
# Like mapPartitions(), but f also receives the index of the partition.

x = sc.parallelize([1, 2, 3], 2)


def f(partitionIndex, iterator):
    """Yield a single (partition index, sum of the partition's elements) pair."""
    yield (partitionIndex, sum(iterator))


y = x.mapPartitionsWithIndex(f)
# glom() gathers the elements of each partition into a list (it does not
# flatten), so the output shows how the data is split across partitions.
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]


In [None]:
// %scala
// mapPartitionsWithIndex: like mapPartitions, but f also gets the partition index.
val x = sc.parallelize(Array(1, 2, 3), 2)
// Emit ONE (index, sum) tuple per partition, matching the PySpark cell.
// The former `(partitionIndex, i.sum).productIterator` split the pair into
// two separate untyped elements per partition.
def f(partitionIndex: Int, i: Iterator[Int]): Iterator[(Int, Int)] =
  Iterator((partitionIndex, i.sum))
val y = x.mapPartitionsWithIndex(f)
// glom() gathers the elements of each partition into an array (it does not
// flatten), exposing the per-partition layout.
val xOut = x.glom().collect()
val yOut = y.glom().collect()

In [9]:
# Spark Transformation - sample
# sample() draws a random subset; `fraction` is the expected share of the
# elements, so the exact size can vary. A fixed seed makes it reproducible.
x = sc.parallelize([1, 2, 3, 4, 5])
y = x.sample(withReplacement=False, fraction=0.4, seed=42)

print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4, 5]


In [None]:
// %scala
// sample: draw a random subset without replacement; `fraction` is the
// expected share of elements, so the sample size can vary.
val x = sc.parallelize(Array(1, 2, 3, 4, 5))
// A fixed seed makes the sample reproducible across runs (omit it for a fresh
// sample each time). The result may still differ from the PySpark cell's
// sample with the same seed -- not guaranteed to match across languages.
val y = x.sample(false, 0.4, 42L)
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))

In [10]:
# Spark Transformation - union
# union() concatenates two RDDs: duplicates are kept (3 appears twice) and
# the partitions of both inputs are kept, giving 2 + 1 = 3 partitions here.
x = sc.parallelize([1, 2, 3], numSlices=2)
y = sc.parallelize([3, 4], numSlices=1)
z = x.union(y)

print(z.glom().collect())

[[1], [2, 3], [3, 4]]


In [1]:
// %scala
// union: concatenate x (2 partitions) and y (1 partition); duplicates are kept.
val x = sc.parallelize(Seq(1, 2, 3), numSlices = 2)
val y = sc.parallelize(Seq(3, 4), numSlices = 1)
val z = x.union(y)
val zOut = z.glom().collect()

Name: Error parsing magics!
Message: Magics [scala] do not exist!
StackTrace: 