In [26]:
sc

<SparkContext master=yarn appName=livy-session-0>

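### Transformation Operations

Transformations return a new RDD and are evaluated lazily: Spark does not compute them until an action (such as collect()) asks for a result.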

- **Map function** - Applies a function to each element of an RDD and returns a new RDD containing the results.

In [27]:
# Return a new RDD by applying a function to each element of this RDD.

# This version uses a lambda (anonymous) function
rdd = sc.parallelize(["b", "a", "c"])
rdd_upper = rdd.map(lambda x: x.upper())
rdd_upper.collect()
    

['B', 'A', 'C']

In [28]:
# map() using a regular (named) function
def upper_case(v):
    return v.upper()

rdd = sc.parallelize(["b", "a", "c"])
rdd.map(upper_case).collect()

['B', 'A', 'C']

- **FlatMap function** - Applies a function that returns an iterable (zero or more values) for each element of an RDD, then flattens all of those iterables into a single new RDD.

In [29]:
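# x.upper() returns a string, and a string is an iterable of characters,
# so flatMap flattens each one-letter result into that single letter.
rdd = sc.parallelize(["b", "a", "c"])
rdd_upper = rdd.flatMap(lambda x: x.upper())
rdd_upper.collect()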
    

['B', 'A', 'C']
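
To make the difference between map and flatMap concrete, here is a plain-Python model that uses ordinary lists instead of RDDs. It is a sketch of the semantics only, not Spark's implementation, and `model_map` / `model_flat_map` are hypothetical helpers. It also shows why the flatMap cell above returns ['B', 'A', 'C']: a Python string is itself iterable, so each one-letter result is flattened into its single character. With longer strings the two operations diverge.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_map(data: List[T], f: Callable[[T], U]) -> List[U]:
    """Hypothetical helper: exactly one output element per input, like RDD.map."""
    return [f(x) for x in data]


def model_flat_map(data: List[T], f: Callable[[T], Iterable[U]]) -> List[U]:
    """Hypothetical helper: f returns an iterable per input and all the
    iterables are concatenated into one flat list, like RDD.flatMap."""
    return [y for x in data for y in f(x)]


letters = ["b", "a", "c"]
print(model_map(letters, str.upper))       # ['B', 'A', 'C']
print(model_flat_map(letters, str.upper))  # ['B', 'A', 'C'], each 1-char string is iterated

pairs = ["ab", "cd"]
print(model_map(pairs, str.upper))                   # ['AB', 'CD']
print(model_flat_map(pairs, str.upper))              # ['A', 'B', 'C', 'D']
print(model_flat_map(pairs, lambda s: [s.upper()]))  # ['AB', 'CD'], wrap to keep whole strings
```

If a flatMap function should emit whole strings, returning them inside a list (or another iterable) is what keeps them intact.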

- **Filter function** - Returns a new RDD containing only the elements for which a given predicate (condition) returns True.

In [36]:
# Return a new RDD containing only the elements that satisfy a predicate (here: even numbers)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda x: x % 2 == 0)
rdd2.collect()

[2, 4]

- **Distinct function** - Returns a new RDD containing each unique element of the source RDD exactly once.

In [37]:
# distinct(): return a new RDD containing the distinct elements in this RDD.
sc.parallelize([1, 1, 2, 3]).distinct().collect()

[1, 2, 3]

In [38]:
# distinct() involves a shuffle, so its result is not guaranteed to be in sorted
# (or any fixed) order. To get sorted output, sort the collected list on the driver:
sorted(sc.parallelize([1, 4, 2, 3, 2]).distinct().collect())

[1, 2, 3, 4]
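
One way to see why distinct() gives no ordering guarantee: in PySpark it is implemented with a shuffle (each element becomes a key and the data is reduced by key), so elements come back grouped by partition rather than in input order. The plain-Python sketch below models that idea with a hypothetical `model_distinct` helper and a simple `hash(x) % num_partitions` bucketing rule; that rule is an assumption standing in for Spark's partitioner, and Spark's actual output order may differ.

```python
from typing import Hashable, List


def model_distinct(data: List[Hashable], num_partitions: int = 2) -> List[Hashable]:
    """Hypothetical model of a shuffle-based distinct.

    Each element is routed to a bucket by hash (standing in for Spark's
    partitioner), duplicates collapse inside the bucket, and the buckets
    are concatenated. The result order follows the buckets, not the input.
    """
    buckets: List[dict] = [{} for _ in range(num_partitions)]
    for x in data:
        # dict keys deduplicate; insertion order is kept within a bucket
        buckets[hash(x) % num_partitions].setdefault(x, None)
    return [key for bucket in buckets for key in bucket]


result = model_distinct([1, 4, 2, 3, 2], num_partitions=2)
print(result)          # [4, 2, 1, 3]: grouped by bucket, not sorted
print(sorted(result))  # [1, 2, 3, 4]
```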

- **Applied example for Map, flatMap, Filter and Distinct transformations**

In [39]:
document1 = "Apache Spark is a unified analytics engine for large-scale data processing."
document2 = ""
document3 = "It is an open-source distributed-computing engine."
document4 = "Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries."

# Keep only the non-empty documents
rdd1 = sc.parallelize([document1, document2, document3, document4])
lines = rdd1.filter(lambda x: x != "")
lines.collect()

['Apache Spark is a unified analytics engine for large-scale data processing.', 'It is an open-source distributed-computing engine.', 'Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries.']

In [40]:
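# `lines` is the RDD of non-empty documents built in the filter example above.
# map() returns exactly one element per line, so the result is a list of word lists.

words = lines.map(lambda x: x.split(" "))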

words.collect()

[['Apache', 'Spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'large-scale', 'data', 'processing.'], ['It', 'is', 'an', 'open-source', 'distributed-computing', 'engine.'], ['Spark', 'provides', 'a', 'productive', 'environment', 'for', 'data', 'analysis', 'because', 'of', 'its', 'lightning', 'speed', 'and', 'support', 'for', 'various', 'libraries.']]

In [41]:
# flatMap() splits each line and flattens the word lists into a single RDD of words.

words = lines.flatMap(lambda x: x.split(" "))

words.collect()

['Apache', 'Spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'large-scale', 'data', 'processing.', 'It', 'is', 'an', 'open-source', 'distributed-computing', 'engine.', 'Spark', 'provides', 'a', 'productive', 'environment', 'for', 'data', 'analysis', 'because', 'of', 'its', 'lightning', 'speed', 'and', 'support', 'for', 'various', 'libraries.']

In [42]:
# distinct() returns a new RDD with the duplicate words removed; count() returns how many remain.
words = lines.flatMap(lambda x: x.split(" "))

words.distinct().count()

29
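
As a cross-check, the same filter, flatMap and distinct pipeline can be reproduced with plain Python lists (no Spark involved, so this only makes sense for small data). The variable names below are illustrative and chosen not to clash with the notebook's RDDs. The sketch also makes visible that splitting on a single space keeps punctuation attached, so `engine` and `engine.` are counted as two different words.

```python
documents = [
    "Apache Spark is a unified analytics engine for large-scale data processing.",
    "",
    "It is an open-source distributed-computing engine.",
    "Spark provides a productive environment for data analysis because of its "
    "lightning speed and support for various libraries.",
]

# filter: drop empty documents
non_empty = [doc for doc in documents if doc != ""]

# flatMap: split every document and flatten into one list of tokens
tokens = [token for doc in non_empty for token in doc.split(" ")]

# distinct + count: a set keeps one copy of each token
distinct_tokens = set(tokens)
print(len(tokens), len(distinct_tokens))  # 35 29

# Punctuation stays attached to the token, so these are different words:
print("engine" in distinct_tokens, "engine." in distinct_tokens)  # True True
```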

- **Sorted function** - sorted() is Python's built-in function, not an RDD operation: it sorts an ordinary Python list on the driver, such as the list returned by collect().

In [43]:
# sorted() sorts the Python list returned by collect(); this runs on the driver
rdd1 = sc.parallelize([1, 5, 1, 3, 2, 3, 5])
sorted(rdd1.distinct().collect())


[1, 2, 3, 5]

- **Union function** - Works on two RDDs and returns a new RDD containing every element of both; duplicates are kept (no deduplication is performed).

In [44]:
# union(): return the union of this RDD and another one (duplicates are kept).
rdd = sc.parallelize([1, 1, 2, 3])
rdd_union = rdd.union(rdd)
rdd_union.collect()

[1, 1, 2, 3, 1, 1, 2, 3]

- **Intersection function** - Works on two RDDs and returns only the elements present in both, without duplicates.

In [45]:
# intersection()
# Return the intersection of this RDD and another one.
# The output will not contain any duplicate elements, even if the input RDDs did.
#
# NOTE: This method performs a shuffle internally.
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
sorted(rdd1.intersection(rdd2).collect())

[1, 2, 3]

- **Subtract function** - Works on two RDDs and returns the elements of the calling RDD that do not appear in the other RDD.

In [46]:
# subtract()
# Return each value in self (x) that is not contained in other (y).
x = sc.parallelize([1, 2, 3, 4, 5])
y = sc.parallelize([2,3,4])
sorted(x.subtract(y).collect())

[1, 5]

- **Cartesian function** - Works on two RDDs and returns every pair (a, b) with a from the first RDD and b from the second, so the result has (size of rdd1) * (size of rdd2) elements.

In [47]:
# cartesian()
# Return the Cartesian product of this RDD and another one, that is,
# the RDD of all pairs of elements (a, b) where a is in self and b is in other.
rdd = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
sorted(rdd.cartesian(rdd2).collect())

[(1, 3), (1, 4), (2, 3), (2, 4)]
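
The plain-Python sketch below summarises how the four two-RDD operations treat duplicates, using ordinary lists in place of RDDs. It models the documented behavior (union keeps duplicates, intersection removes them) and, as an assumption based on PySpark's key-based implementation of subtract, that subtract keeps repeated elements of the left side. All `model_*` functions are hypothetical helpers. Spark itself returns these results in no guaranteed order, which is why the cells above wrap them in sorted().

```python
from itertools import product
from typing import Hashable, List, Tuple


def model_union(a: List, b: List) -> List:
    # union simply concatenates; duplicates are kept
    return a + b


def model_intersection(a: List[Hashable], b: List[Hashable]) -> List[Hashable]:
    # elements present in both inputs, duplicates removed
    # (sorted here only to give a stable output)
    return sorted(set(a) & set(b))


def model_subtract(a: List[Hashable], b: List[Hashable]) -> List[Hashable]:
    # drop every element of a that appears in b; repeats in a survive
    exclude = set(b)
    return [v for v in a if v not in exclude]


def model_cartesian(a: List, b: List) -> List[Tuple]:
    # every pair (p, q) with p from a and q from b: len(a) * len(b) pairs
    return list(product(a, b))


print(model_union([1, 1, 2, 3], [1, 1, 2, 3]))                     # [1, 1, 2, 3, 1, 1, 2, 3]
print(model_intersection([1, 10, 2, 3, 4, 5], [1, 6, 2, 3, 7, 8]))  # [1, 2, 3]
print(model_subtract([1, 2, 3, 4, 5, 5], [2, 3, 4]))                # [1, 5, 5]
print(model_cartesian([1, 2], [3, 4]))                              # [(1, 3), (1, 4), (2, 3), (2, 4)]
```

Because the Cartesian product grows multiplicatively, cartesian() on two large RDDs can be very expensive.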

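### Action Operations

Actions trigger the evaluation of the transformations defined on an RDD and return a result to the driver program (or, as with foreach, run a function on each element for its side effects).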

- **Collect function** - Return a list that contains all of the elements in this RDD.

In [48]:
# NOTE: This method should only be used if the resulting list is expected to be small,
# as all the data is loaded into the driver’s memory.

rdd = sc.parallelize([1, 2, 3, 4])
rdd.collect()

[1, 2, 3, 4]

- **Count function** - Return the number of elements in this RDD.

In [49]:
sc.parallelize([2, 3, 4]).count()

3

- **countByValue function** - Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [50]:

# The second argument to parallelize() is the number of partitions (here 2)
sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()

dict_items([(1, 2), (2, 3)])

In [51]:
# countByValue() on the `words` RDD from the applied example above:
# return the count of each unique value in this RDD
# as a dictionary of (value, count) pairs.
words.countByValue().items()

dict_items([('Apache', 1), ('Spark', 2), ('is', 2), ('a', 2), ('unified', 1), ('analytics', 1), ('engine', 1), ('for', 3), ('large-scale', 1), ('data', 2), ('processing.', 1), ('It', 1), ('an', 1), ('open-source', 1), ('distributed-computing', 1), ('engine.', 1), ('provides', 1), ('productive', 1), ('environment', 1), ('analysis', 1), ('because', 1), ('of', 1), ('its', 1), ('lightning', 1), ('speed', 1), ('and', 1), ('support', 1), ('various', 1), ('libraries.', 1)])
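
countByValue() is an action: the values are counted within each partition and the per-partition counts are merged into a single dictionary on the driver, so it suits RDDs with a modest number of distinct values. A plain-Python sketch of that two-step idea follows, with `collections.Counter` standing in for the per-partition dictionaries; `model_count_by_value` is a hypothetical helper and the two-partition split is an assumed example.

```python
from collections import Counter
from functools import reduce
from typing import Dict, Hashable, List


def model_count_by_value(partitions: List[List[Hashable]]) -> Dict[Hashable, int]:
    """Hypothetical model: count each partition, then merge the counts."""
    per_partition = [Counter(part) for part in partitions]    # done on executors in Spark
    merged = reduce(lambda c1, c2: c1 + c2, per_partition, Counter())  # merged on the driver
    return dict(merged)


# A possible two-partition split of [1, 2, 1, 2, 2]
print(model_count_by_value([[1, 2], [1, 2, 2]]))  # {1: 2, 2: 3}
```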

- **take(num) function** - Take the first num elements of the RDD.

In [52]:
# take() works by first scanning one partition and using the results from
# that partition to estimate the number of additional partitions needed
# to satisfy the limit.
# (Translated from the Scala implementation in RDD#take().)

# NOTE: This method should only be used if the resulting list is expected to be small,
# as all the data is loaded into the driver’s memory.

sc.parallelize([2, 3, 4, 5, 6]).take(4)


[2, 3, 4, 5]
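
The comment in the cell above explains why take() does not need to read the whole RDD: it scans partitions incrementally and stops once it has num elements. The sketch below is a deliberately simplified, hypothetical model (`model_take`, with lists standing in for partitions) that scans one partition at a time in order; the real implementation instead uses the first partition's results to estimate how many partitions to scan in the next round.

```python
from typing import List, Tuple, TypeVar

T = TypeVar("T")


def model_take(partitions: List[List[T]], num: int) -> Tuple[List[T], int]:
    """Hypothetical model of take(num): return the first num elements and
    how many partitions had to be scanned to get them."""
    taken: List[T] = []
    scanned = 0
    for part in partitions:
        if len(taken) >= num:
            break  # enough elements; the remaining partitions are never read
        scanned += 1
        taken.extend(part[: num - len(taken)])
    return taken, scanned


print(model_take([[2, 3], [4, 5], [6]], 4))  # ([2, 3, 4, 5], 2)
print(model_take([[2, 3], [4, 5], [6]], 1))  # ([2], 1)
```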

- **top(num) function** - Returns the num largest elements of an RDD, in descending order.

In [53]:
# NOTE: This method should only be used if the resulting list is
# expected to be small, as all the data is loaded into the driver’s memory.

# NOTE: It returns the list sorted in descending order.

sc.parallelize([2, 3, 4, 5, 6], 2).top(2)

[6, 5]
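
top(num) can be computed without sorting the whole RDD: each partition keeps only its num largest elements, and those short lists are then merged. PySpark's top follows this pattern using Python's heapq.nlargest; the plain-Python sketch below (hypothetical helper `model_top`, lists standing in for partitions) illustrates the idea rather than reproducing Spark's code.

```python
import heapq
from functools import reduce
from typing import List


def model_top(partitions: List[List[int]], num: int) -> List[int]:
    """Hypothetical model of top(num): per-partition nlargest, then merge."""
    partial = [heapq.nlargest(num, part) for part in partitions]     # per partition
    return reduce(lambda a, b: heapq.nlargest(num, a + b), partial)  # merge step


print(model_top([[2, 3], [4, 5, 6]], 2))  # [6, 5], in descending order
```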

- **reduce function** - Reduces the elements of this RDD using the specified commutative and associative binary operator. 

In [54]:
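# reduce() reduces each partition locally, then combines the partial results on the driver.
# Any two-argument function can be passed, but it should be commutative and associative.
# Caution: division is neither, so in general the result can depend on how the data
# is partitioned (for this particular input it is 3.0 either way).
sc.parallelize([9, 3, 1]).reduce(lambda x, y: x / y)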

3.0
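
Why the operator should be commutative and associative: reduce() first reduces each partition on its own and then reduces the per-partition results, so how the operations are grouped depends on how the data is partitioned. The plain-Python sketch below (hypothetical `model_reduce`, lists standing in for partitions, `operator.truediv` standing in for `lambda x, y: x / y`) shows that addition gives the same answer for two different splits of the same data while division does not.

```python
from functools import reduce
from operator import add, truediv
from typing import Callable, List


def model_reduce(partitions: List[List[float]], op: Callable[[float, float], float]) -> float:
    """Hypothetical model of RDD.reduce: reduce each non-empty partition,
    then reduce the partial results in partition order."""
    partial = [reduce(op, part) for part in partitions if part]
    return reduce(op, partial)


# Same data, two different partitionings
one_partition = [[8, 4, 2]]
two_partitions = [[8], [4, 2]]

print(model_reduce(one_partition, add), model_reduce(two_partitions, add))          # 14 14
print(model_reduce(one_partition, truediv), model_reduce(two_partitions, truediv))  # 1.0 4.0
```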

- **fold function** - Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”

In [55]:
# The function op(t1, t2) is allowed to modify t1 and return it as its result value 
# to avoid object allocation; however, it should not modify t2.

from operator import add

sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)



15

In [56]:
# For multiplication the neutral zero value is 1 (0 would make the product 0)
sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x, y: x * y)

120
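
In PySpark, fold() starts every partition from the zero value and starts the final merge of the partition results from the zero value again. That is harmless when the zero value is neutral for the operator (0 for addition, 1 for multiplication, as in the two cells above), but a non-neutral zero value makes the result depend on the number of partitions. A plain-Python sketch with a hypothetical `model_fold` helper and an assumed two-partition split:

```python
from functools import reduce
from operator import add, mul
from typing import Callable, List


def model_fold(partitions: List[List[int]], zero: int, op: Callable[[int, int], int]) -> int:
    """Hypothetical model of RDD.fold: start every partition from `zero`,
    then merge the partition results starting from `zero` again."""
    partial = [reduce(op, part, zero) for part in partitions]  # empty partitions yield `zero`
    return reduce(op, partial, zero)


data_in_two = [[1, 2], [3, 4, 5]]
print(model_fold(data_in_two, 0, add))  # 15
print(model_fold(data_in_two, 1, mul))  # 120
print(model_fold(data_in_two, 1, add))  # 18: the non-neutral zero value 1 was added three times
```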

- **Aggregate function** - Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral “zero value.”

In [57]:
# Both functions are allowed to modify their first argument and return it as
# their result value to avoid object allocation; however, they should not modify the second.

# The first function (seqOp) can return a different result type, U,
# than the element type of this RDD, T. Thus, we need one operation for merging
# a T into a U (seqOp) and one operation for merging two U values (combOp).
rdd = sc.parallelize([1, 2, 3, 4])
# seqOp folds each element into a running (sum, count) tuple within a partition,
# e.g. (0, 0) -> (1, 1) -> (3, 2) for a partition holding [1, 2]
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
# combOp merges the (sum, count) tuples of different partitions
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
rdd.aggregate((0, 0), seqOp, combOp)



(10, 4)
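
To make the roles of seqOp and combOp concrete, the plain-Python sketch below replays the (sum, count) example with the four numbers split into two partitions (an assumed split; Spark chooses its own). seq_op folds each element into a running tuple within a partition, comb_op merges the per-partition tuples, and the mean is then sum / count. `model_aggregate`, `seq_op` and `comb_op` are hypothetical names, not PySpark functions. As with fold, the zero value may be applied more than once, so it should be neutral.

```python
from functools import reduce
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_aggregate(
    partitions: List[List[T]],
    zero: U,
    seq_op: Callable[[U, T], U],
    comb_op: Callable[[U, U], U],
) -> U:
    """Hypothetical model of RDD.aggregate."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]  # seqOp within partitions
    return reduce(comb_op, per_partition, zero)                          # combOp across partitions


def seq_op(acc: Tuple[int, int], value: int) -> Tuple[int, int]:
    return acc[0] + value, acc[1] + 1  # add the value, bump the count


def comb_op(a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
    return a[0] + b[0], a[1] + b[1]  # add sums and counts


parts = [[1, 2], [3, 4]]
print([reduce(seq_op, p, (0, 0)) for p in parts])  # [(3, 2), (7, 2)]
total, count = model_aggregate(parts, (0, 0), seq_op, comb_op)
print((total, count), total / count)               # (10, 4) 2.5
```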

- **Foreach function** - Applies a function to all elements of this RDD for its side effects; nothing is returned to the driver.

In [58]:
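# foreach(): f runs on the executors, not on the driver, so the print() output
# goes to the executor logs (stdout) rather than to this notebook.
def f(x):
    print(x)

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(f)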

