In [2]:
from pyspark import SparkConf, SparkContext

In [4]:
# Initializing
# Every cell below needs `sc`, so it must be created here for the notebook to
# run on a fresh kernel. getOrCreate() returns the already-running SparkContext
# if there is one (re-running this cell, or a kernel that pre-defines `sc`)
# instead of raising the "cannot run multiple SparkContexts" error.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster('local').setAppName('RDD')
)

In [5]:
# Parallelized Collections

# numSlices sets how many partitions the collection is split into.
# A typical choice is 2-4 partitions for each CPU in the cluster; when
# numSlices is omitted, Spark tries to pick the number of partitions itself.
data = list(range(1, 6))
distributed_dataset = sc.parallelize(data, numSlices=5)

print(distributed_dataset, '\n', type(distributed_dataset))
print(distributed_dataset.collect())   # collect() returns a list with ALL elements of the RDD
print(f'The number of partitions of RDD is {distributed_dataset.getNumPartitions()}')


ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:194 
 <class 'pyspark.rdd.RDD'>


[1, 2, 3, 4, 5]
The number of partitions of RDD is 5


In [4]:
# External Datasets

# textFile() loads the file as an RDD of its lines.
distributed_file = sc.textFile('./files/README.md')

# first() is an action: each call runs a new job on this (unpersisted) RDD,
# so evaluate it once and reuse the value for both prints.
first_line = distributed_file.first()  # first element in the RDD
print('First element: ', first_line)
print('Number of elements: ', distributed_file.count())  # number of elements in the RDD
print(first_line)

First element:  # Apache Spark
Number of elements:  103
# Apache Spark


In [1]:
# RDD Transformations and Actions
'''
Transformation: Lazily creates a new dataset from an existing one. Returns a new RDD.
Action: Runs a computation on the dataset and returns a non-RDD result (e.g. a number or a list) to the driver program.
'''

'''
Transformation:
1. map()
2. flatMap()
3. filter()
4. distinct()
5. sample(withReplacement, fraction, [seed])
6. union()
7. intersection()
8. subtract()
9. cartesian()
10. repartition()
11. coalesce()
'''

'\nTransformation:\n1. map()\n2. flatMap()\n3. filter()\n4. distinct()\n5. sample(withReplacement, fraction, [seed])\n6. union()\n7. intersection()\n8. subtract()\n9. cartesian()\n10. repartition()\n11. coalesce()\n'

In [1]:
# Two small RDDs shared by the transformation and action examples.
data1 = sc.parallelize([1, 2, 3, 4, 5, 5], numSlices=2)
data2 = sc.parallelize([7, 6, 5, 4, 3])

# Keep both RDDs in RAM because they are used many times.
# In Python, persist() without arguments behaves like cache(): both use the
# "MEMORY_ONLY" level. Pass a storage level to persist() only when a
# different one is required.
data1.persist()
data2.persist()

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:194

In [3]:
# map(): apply the function to every element; one output element per input element.
mapExample = data1.map(lambda value: value + 1)
print('map function example: ', mapExample.collect())

map function example:  [2, 3, 4, 5, 6, 6]


In [4]:
# flatMap(): each element x expands to the values x, x+1, ..., 4 and all the
# expansions are flattened into one RDD (elements >= 5 contribute nothing).
flatMapExample = data1.flatMap(lambda start: range(start, 5))
print('flatMap function example: ', flatMapExample.collect())

flatMap function example:  [1, 2, 3, 4, 2, 3, 4, 3, 4, 4]


In [5]:
# filter(): keep only the elements for which the predicate is true.
filterExample = data1.filter(lambda value: value <= 3)
print('filter function example: ', filterExample.collect())

filter function example:  [1, 2, 3]


In [8]:
# distinct(): drop duplicate elements (the repeated 5 in data1 appears once).
# The recorded output shows the result is not in the input order.
distinctExample = data1.distinct()
print('distinct function example: ', distinctExample.collect())

distinct function example:  [2, 4, 1, 3, 5]


In [7]:
# sample(): randomly pick elements, here without replacement.
# `fraction` is the expected share of elements to keep, not an exact count, so
# the size of the result can vary. The fixed seed makes this cell give the
# same sample on every run.
sampleExample = data1.sample(withReplacement=False, fraction=0.2, seed=42)
print('sample function example: ', sampleExample.collect())

sample function example:  [2, 3, 5]


In [9]:
# union(): all elements of data1 followed by all elements of data2; duplicates are kept.
unionExample = data1.union(data2)
print('union function example: ', unionExample.collect())

union function example:  [1, 2, 3, 4, 5, 5, 7, 6, 5, 4, 3]


In [10]:
# intersection(): elements present in both RDDs; the recorded output contains each value once.
intersectionExample = data1.intersection(data2)
print('intersection function example: ', intersectionExample.collect())


intersection function example:  [3, 4, 5]


In [11]:
# subtract(): elements of data1 that do not appear in data2.
subtractExample = data1.subtract(data2)
print('subtract function example: ', subtractExample.collect())

subtract function example:  [1, 2]


In [12]:
# cartesian(): every pair (a, b) with a from data1 and b from data2 (6 x 5 = 30 pairs here).
cartesianExample = data1.cartesian(data2)
print('cartesian function example: ', cartesianExample.collect())

cartesian function example:  [(1, 7), (2, 7), (3, 7), (1, 6), (2, 6), (3, 6), (1, 5), (2, 5), (3, 5), (1, 4), (2, 4), (3, 4), (1, 3), (2, 3), (3, 3), (4, 7), (5, 7), (5, 7), (4, 6), (5, 6), (5, 6), (4, 5), (5, 5), (5, 5), (4, 4), (5, 4), (5, 4), (4, 3), (5, 3), (5, 3)]


In [13]:
# repartition(): return a new RDD with the requested number of partitions;
# data1 itself keeps its original 2 partitions.
repartitionExample = data1.repartition(5)
print(f'repartition function example: Original partitions: {data1.getNumPartitions()}, New partitions: {repartitionExample.getNumPartitions()}')


repartition function example: Original partitions: 2, New partitions: 5


In [15]:
# coalesce(): return a new RDD with fewer partitions (here 2 -> 1); data1 is unchanged.
coalesceExample = data1.coalesce(1)
print(f'coalesce function example: Original partitions: {data1.getNumPartitions()}, New partitions: {coalesceExample.getNumPartitions()}')

# https://stackoverflow.com/a/31612810
# That answer explains the differences between repartition() and coalesce() well:
# - coalesce() avoids a full shuffle; it only moves data off the partitions
#   being removed, onto the partitions that are kept.
# - repartition() always shuffles all data over the network.


coalesce function example: Original partitions: 2, New partitions: 1


In [16]:
'''
Actions:
1. collect()
2. count()
3. countByValue()
4. take()
5. top()
6. takeOrdered()
7. takeSample(withReplacement, num, [seed])
8. reduce()
9. fold(zeroValue, op)
10. foreach()
11. aggregate()
'''

'\nActions:\n1. collect()\n2. count()\n3. countByValue()\n4. take()\n5. top()\n6. takeOrdered()\n7. takeSample(withReplacement, num, [seed])\n8. reduce()\n9. fold(zeroValue, op)\n10. foreach()\n11. aggregate()\n'

In [17]:
# collect(): return every element of the RDD as a Python list.
collectExample = data1.collect()
print('collect function example: ', collectExample)

collect function example:  [1, 2, 3, 4, 5, 5]


In [18]:
countExample = data1.count()   # number of elements in the RDD (duplicates included)
print('count function example: ', countExample)

count function example:  6


In [19]:
countByValueExample = data1.countByValue()  # returns a dict (a defaultdict) mapping each distinct value to its count
print('countByValue function example: ', countByValueExample)

countByValue function example:  defaultdict(<class 'int'>, {1: 1, 2: 1, 3: 1, 4: 1, 5: 2})


In [20]:
takeExample = data1.take(3)   # list of the first 3 elements of the RDD
print('take function example: ', takeExample)

take function example:  [1, 2, 3]


In [21]:
topExample = data1.top(2)     # list of the 2 largest elements, largest first
print('top function example: ', topExample)


top function example:  [5, 5]


In [22]:
# takeOrdered(): the first `num` elements after sorting by `key`.
# Negating each value sorts in descending order, so this returns the 3 largest elements.
takeOrderedExample = data1.takeOrdered(num=3, key=lambda value: -value)
print('takeOrdered function example: ', takeOrderedExample)

takeOrdered function example:  [5, 5, 4]


In [23]:
# takeSample(): return a list of exactly `num` randomly chosen elements
# (here without replacement). Unlike sample(), this is an action and returns
# a Python list rather than an RDD. The fixed seed makes this cell give the
# same sample on every run.
takeSampleExample = data1.takeSample(withReplacement=False, num=2, seed=42)
print('takeSample function example: ', takeSampleExample)

takeSample function example:  [5, 3]


In [24]:
# reduce(): combine all elements pairwise with the given function.
# With addition this gives the same result as summing the elements.
reduceExample = data1.reduce(lambda total, item: total + item)
print('reduce function example: ', reduceExample)

reduce function example:  20


In [25]:
# fold() is almost the same as reduce(); the difference is that fold() needs a
# pre-defined start value, aka zeroValue. 0 is neutral for addition, so the
# result here matches the reduce() example.
foldExample = data1.fold(zeroValue=0, op=lambda total, item: total + item)
print('fold function example: ', foldExample)

fold function example:  20


In [27]:
foreachExample = data1.foreach(print)
# Both foreach() and map() are designed to iterate over all elements in an RDD.
# The differences are:
# 1. map() is a transformation, foreach() is an action.
# 2. foreach() returns None (void), and map() returns a new RDD.
# NOTE(review): no output is recorded for this cell - presumably print() runs
# in the Spark worker process, so its output goes to the worker's stdout rather
# than to the notebook; confirm.

In [2]:
# aggregate(): build a (sum, count) pair over data1.
#   zeroValue - the starting accumulator: sum 0, count 0
#   seqOp     - folds one element into an accumulator
#   combOp    - merges two accumulators into one
# https://stackoverflow.com/a/38949457
# That answer explains the aggregate function and zeroValue well.
aggregateExample = data1.aggregate(
    zeroValue=(0, 0),
    seqOp=lambda acc, item: (acc[0] + item, acc[1] + 1),
    combOp=lambda left, right: (left[0] + right[0], left[1] + right[1]),
)
print('aggregate function example: ', aggregateExample)

aggregate function example:  (20, 6)
