In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook can run on other machines; fall back to the author's original local path.
SPARK_HOME = os.environ.get(
    "SPARK_HOME",
    "/Users/imadelhanafi/anaconda/lib/python2.7/site-packages/pyspark",
)
findspark.init(SPARK_HOME)

# pyspark must be imported after findspark.init(), which puts Spark on sys.path.
import pyspark

In [3]:
from pyspark import SparkConf, SparkContext

# Create the SparkContext (the driver's entry point to Spark), running locally.
# getOrCreate() reuses an already-running context, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once". Note: if a context already
# exists, this conf is not applied to it.
conf = SparkConf().setMaster("local").setAppName("exercices")
sc = SparkContext.getOrCreate(conf=conf)

# Create the data: the integers 1..10000 (xrange's upper bound is exclusive).
data = xrange(1, 10001)

# parallelize() splits the data into 8 partitions; each partition holds a unique
# subset of the entries. Spark calls datasets that it stores this way
# "Resilient Distributed Datasets" (RDDs).
xrangeRDD = sc.parallelize(data, 8)
print('xrangeRDD id: {0}'.format(xrangeRDD.id()))
xrangeRDD.setName('My first RDD')


xrangeRDD id: 1


In [9]:
# Display the lineage of xrangeRDD: the chain of RDDs (transformations) it was built from.
print(xrangeRDD.toDebugString())


(8) My first RDD PythonRDD[1] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []


In [10]:
# How many partitions the RDD is split into (8 were requested in parallelize()).
# When processing one entry does not affect the processing of any other entry,
# the work can be spread across all partitions in parallel.

xrangeRDD.getNumPartitions()


8

In [12]:
######## map() transformation
# map() applies a function to each item of the parent RDD, producing exactly one
# element in the new RDD per input item.

def add(value):
    """Return value + 1.

    Used with RDD.map() to increment every element of an RDD.
    """
    return value + 1

# Transform xrangeRDD with a map() transformation using the add function.
# Transformations are lazy: this only records the step, no tasks run yet.
addRDD = xrangeRDD.map(add)

# Inspect the transformation hierarchy: the new PythonRDD sits on top of the
# original ParallelCollectionRDD.
print(addRDD.toDebugString())

(8) PythonRDD[5] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []


In [21]:
##### Use actions to view results
# collect() would return every element to the driver; take(5) fetches only the
# first five, which is enough to check the map() step.

print(addRDD.take(5))
# count() is an action returning the number of elements in the RDD.
print(xrangeRDD.count())


[2, 3, 4, 5, 6]
10000


In [25]:
###### Apply filters

# Predicate for RDD.filter(): keeps only the elements below five.
def five(value):
    """Return True if value is smaller than five, False otherwise."""
    return value < 5

# filter() is a transformation: defining filteredRDD does not run any Spark tasks.
filteredRDD = addRDD.filter(five)

# collect() is an action: calling it runs the pending map() and filter() steps
# and returns the surviving elements to the driver as a list.

print(filteredRDD.collect())

[2, 3, 4]


In [27]:
#### Another way: pass a lambda instead of a named function

# Keep the odd values of filteredRDD (which only holds values below 5).
oddRDD = filteredRDD.filter(lambda x: x % 2 != 0)
oddRDD.collect()

[3]

In [34]:
#### Advanced actions and transformations

In [35]:
# takeOrdered(n): the n smallest elements, in ascending order.
print(filteredRDD.takeOrdered(3))
# top(n): up to n largest elements, in descending order (fewer if the RDD is smaller).
print(filteredRDD.top(5))


[2, 3, 4]
[4, 3, 2]


In [36]:
# reduce() is an action that collapses the elements of an RDD into a single value
# by repeatedly applying a two-argument function; here that function is addition.

print(filteredRDD.reduce(lambda left, right: left + right))


9


In [37]:
# A fresh base RDD containing repeated values, to demonstrate countByValue().
repRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
# countByValue() is an action mapping each distinct value to its number of occurrences.
print(repRDD.countByValue())

defaultdict(<type 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})


In [39]:
# map() vs flatMap() -- both are RDD transformations

# A new base RDD to work from.
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList)

# map(): one output element per input word -- a (singular, plural) tuple.
singularAndPluralWordsRDDMap = wordsRDD.map(lambda word: (word, word + 's'))
# flatMap(): each returned tuple is flattened, so every word yields two elements.
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda word: (word, word + 's'))

# Compare the two results side by side.
print(singularAndPluralWordsRDDMap.collect())
print(singularAndPluralWordsRDD.collect())

[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']


In [43]:
# groupByKey() gathers all values that share a key into one iterable per key.
pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])

# Turn each group's iterable into a list so the grouped values are visible.
print(pairRDD.groupByKey().mapValues(list).collect())

# Aggregate each group by summing its values.
print(pairRDD.groupByKey().mapValues(sum).collect())


[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
