In [1]:
# Import Libraries
from pyspark.context import SparkContext

In [2]:
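# Create a Spark Context, or reuse the one that is already running
from pyspark import SparkConf

conf = SparkConf().setMaster("local").setAppName("Low Level API")
sc = SparkContext.getOrCreate(conf=conf)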

In [3]:
# Show Spark Context information
sc

### CREATE RDD

In [4]:
# Create an RDD from a Python list
# The most common way is the parallelize() method

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])


In [5]:
# Evaluating the RDD variable only shows its description
# (type, ID and where it was created), not its values
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

## EXAMPLES OF TRANSFORMATIONS AND ACTIONS ON RDDs

Transformations such as filter() or map() are lazy: they only describe a new RDD, and nothing is computed until an action is called. To view the contents of an RDD, we therefore call an action such as collect(), which returns every element of the RDD to the driver program as a Python list. This is only appropriate for small datasets, because the whole result must fit in the driver's memory. For larger datasets, the take(n) action returns just the first n elements as a list.
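To make the lazy/eager split concrete, here is a toy, pure-Python model. The `MiniRDD` class is hypothetical and not part of PySpark: its `map()` and `filter()` only record steps, while `collect()` and `take()` actually run them. Real Spark also distributes the work across partitions, which this sketch ignores.

```python
from itertools import islice


class MiniRDD:
    """Hypothetical toy model of an RDD (not part of PySpark): it stores a
    data source plus a tuple of pending steps and runs them only on demand."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    # Transformations: record a step and return a new MiniRDD
    def map(self, func):
        return MiniRDD(self._source, self._steps + (("map", func),))

    def filter(self, pred):
        return MiniRDD(self._source, self._steps + (("filter", pred),))

    # Internal: chain lazy built-in iterators; nothing is evaluated here yet
    def _iterate(self):
        it = iter(self._source)
        for kind, func in self._steps:
            it = map(func, it) if kind == "map" else filter(func, it)
        return it

    # Actions: pull elements through the pipeline
    def collect(self):
        return list(self._iterate())

    def take(self, n):
        return list(islice(self._iterate(), n))


calls = []

def square(x):
    calls.append(x)  # record every time the function actually runs
    return x * x

squares = MiniRDD(range(1, 11)).map(square)
print(len(calls))       # 0: map() only recorded the step
print(squares.take(3))  # [1, 4, 9]
print(len(calls))       # 3: take() stopped after three elements
```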

In [6]:
# Return the first 5 elements with the take() action
rdd.take(5)

[1, 2, 3, 4, 5]

### Filter

The filter() transformation takes a function (often a lambda) that returns a boolean for each element. Elements for which the function returns True are kept; elements for which it returns False are dropped. The result is a new RDD, and the original RDD is left unchanged.
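Any function that returns a boolean can be passed to filter(), not only a lambda; a named function is easier to read and test when the condition grows. The plain-Python sketch below uses the built-in `filter()` as a local stand-in for the RDD method, and the same `is_even` function (a name chosen for this example) could be passed as `rdd.filter(is_even)`.

```python
def is_even(x):
    """Predicate: True for even numbers, False otherwise."""
    return x % 2 == 0

data = list(range(1, 11))
evens = list(filter(is_even, data))         # keep elements where is_even is True
odds = [x for x in data if not is_even(x)]  # the complement
print(evens)  # [2, 4, 6, 8, 10]
print(odds)   # [1, 3, 5, 7, 9]
```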

In [7]:
# Keep only the even numbers
rdd.filter(lambda x: x%2 == 0).collect()

[2, 4, 6, 8, 10]

### Sample

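The sample() transformation returns a new RDD containing a random sample of the elements. It takes three parameters: withReplacement, which sets whether an element can be picked more than once; fraction, the expected fraction of the RDD to return (not an exact size, so the number of returned elements can vary); and seed, the random seed that makes the sample reproducible. Without replacement, fraction is the probability that each element is chosen and must be between 0 and 1; with replacement, it is the expected number of times each element is chosen and may be greater than 1.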
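Without replacement, this kind of sampling keeps each element independently with probability fraction (Bernoulli sampling), which is why the size of the result varies. The plain-Python sketch below (`bernoulli_sample` is a hypothetical helper, not a PySpark function) shows the idea. It will not reproduce Spark's exact picks for a given seed, because Spark uses its own random number generator.

```python
import random

def bernoulli_sample(data, fraction, seed=None):
    """Toy model of sample(withReplacement=False, ...): each element is kept
    independently with probability `fraction`, preserving input order."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1] without replacement")
    rng = random.Random(seed)  # seeded generator -> reproducible sample
    return [x for x in data if rng.random() < fraction]

data = list(range(1, 11))
for seed in (1, 2, 3):
    picked = bernoulli_sample(data, fraction=0.4, seed=seed)
    print(seed, len(picked), picked)  # the size changes from seed to seed
```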

In [8]:
rdd.sample(withReplacement=False, 
           fraction=0.4,
           seed=23).collect()

[2, 4, 5, 10]

### Map

The map() transformation applies a function (often a lambda) to every element of the RDD and returns a new RDD with the results, exactly one output element per input element. In the example below, each number is turned into a (number, string) tuple.

In [9]:
rdd.map(lambda x: (x, str(x))).take(5)

[(1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5')]

### flatMap

It's important to understand the difference between map() and flatMap(). With map(), the function returns exactly one output element per input element, so a function that returns a tuple produces an RDD of tuples. With flatMap(), the function returns an iterable (a list, tuple, generator, etc.) and its items are flattened into the resulting RDD, so one input element can produce zero, one or many output elements.

In [10]:
rdd.flatMap(lambda x: (x, str(x))).take(6)

[1, '1', 2, '2', 3, '3']
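The two outputs above can be reproduced in plain Python, where a list comprehension plays the role of map() and `itertools.chain.from_iterable` plays the role of flatMap(). The last lines show a common pitfall: a string is itself iterable, so a flatMap-style function that returns a string yields individual characters rather than whole words.

```python
from itertools import chain

data = [1, 2, 3]
to_pair = lambda x: (x, str(x))

# map: exactly one output element per input element (here, a tuple)
mapped = [to_pair(x) for x in data]
# flatMap: the function returns an iterable, and its items are flattened
flat_mapped = list(chain.from_iterable(to_pair(x) for x in data))

print(mapped)       # [(1, '1'), (2, '2'), (3, '3')]
print(flat_mapped)  # [1, '1', 2, '2', 3, '3']

# Pitfall: returning a string flattens it into characters
words = ["ninja", "turtle"]
chars = list(chain.from_iterable(w.upper() for w in words))
print(chars)  # ['N', 'I', 'N', 'J', 'A', 'T', 'U', 'R', 'T', 'L', 'E']
```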

## EXAMPLE USE OF RDDs

In [11]:
# Read a text file
lyrics = sc.textFile('./tmntLyrics.txt')

In [12]:
# Show the first 5 lines
lyrics.take(5)

['Teenage Mutant Ninja Turtles',
 'Teenage Mutant Ninja Turtles',
 'Teenage Mutant Ninja Turtles',
 'Heroes in a Half-shell Turtle Power',
 'Here we go']

In [13]:
# Turn all words to lowercase
lyrics = lyrics.map(lambda x: x.lower())
lyrics.take(5)

['teenage mutant ninja turtles',
 'teenage mutant ninja turtles',
 'teenage mutant ninja turtles',
 'heroes in a half-shell turtle power',
 'here we go']

In [14]:
# Split the lines into words
lyrics = lyrics.flatMap(lambda x: x.split())

In [15]:
# Map each word to a tuple (word, 1)

lyrics = lyrics.map(lambda x: (x, 1))
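To see the shape of the data after each of the three steps above, they can be traced in plain Python on a few of the lyric lines shown earlier (list comprehensions and `itertools.chain` stand in for map() and flatMap()). Note that `split()` with no arguments splits on any run of whitespace, so punctuation stays attached: "half-shell" remains a single word.

```python
from itertools import chain

# A few of the lyric lines shown above
lines = ["Teenage Mutant Ninja Turtles",
         "Heroes in a Half-shell Turtle Power",
         "Here we go"]

lowered = [line.lower() for line in lines]                           # ~ .map(lambda x: x.lower())
words = list(chain.from_iterable(line.split() for line in lowered))  # ~ .flatMap(lambda x: x.split())
pairs = [(word, 1) for word in words]                                # ~ .map(lambda x: (x, 1))

print(words[:6])  # ['teenage', 'mutant', 'ninja', 'turtles', 'heroes', 'in']
print(pairs[:2])  # [('teenage', 1), ('mutant', 1)]
```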

In [16]:
# Reduce by key to count the number of times each word appears
# Here is an example to show the output of reduceByKey
lyrics.reduceByKey(lambda x,y: x+y).takeOrdered(10, key=lambda x: -x[1])

[('the', 13),
 ('ninja', 8),
 ('teenage', 6),
 ('mutant', 6),
 ('turtles', 6),
 ('a', 5),
 ('to', 4),
 ('in', 3),
 ('team', 3),
 ('heroes', 2)]

> **Note:** reduceByKey is one of the trickier transformations, so it is worth understanding exactly how it works.

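reduceByKey() works on an RDD of (key, value) pairs. It merges the values that share a key, two at a time, using the function passed as its first argument (here `lambda x, y: x + y`). Spark first combines values within each partition and then merges those partial results across partitions in no fixed order, so the function must be associative and commutative. The result is a new RDD with one (key, reduced value) pair per distinct key. Like every transformation it is lazy; the takeOrdered() action is what triggers the computation.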
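Spark combines values within each partition before merging the partial results across partitions. The toy model below mimics those two phases in plain Python (`reduce_by_key` is a hypothetical helper, not Spark's implementation). Addition gives the same counts however the pairs are split into partitions, while subtraction, which is neither associative nor commutative, gives different answers for different partitionings.

```python
def reduce_by_key(partitions, func):
    """Toy model of reduceByKey: combine values locally inside each
    partition, then merge the per-partition results key by key."""
    # Phase 1: combine values for the same key within each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Phase 2: merge the partial results for the same key across partitions
    # (real Spark gives no ordering guarantee here, hence commutativity)
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)]
one_partition = [pairs]
two_partitions = [pairs[:2], pairs[2:]]

add = lambda x, y: x + y
print(reduce_by_key(one_partition, add))   # {'a': 3, 'b': 2}
print(reduce_by_key(two_partitions, add))  # {'a': 3, 'b': 2}

sub = lambda x, y: x - y
print(reduce_by_key(one_partition, sub))   # {'a': -1, 'b': 0}
print(reduce_by_key(two_partitions, sub))  # {'a': 1, 'b': 0}
```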

In [17]:
# Assign the output to a new variable
wordCount = lyrics.reduceByKey(lambda x,y: x+y)

In [18]:
# takeOrdered() returns the n smallest elements according to key;
# negating the count sorts the words by frequency in descending order
wordCount.takeOrdered(10, key=lambda x: -x[1])

[('the', 13),
 ('ninja', 8),
 ('teenage', 6),
 ('mutant', 6),
 ('turtles', 6),
 ('a', 5),
 ('to', 4),
 ('in', 3),
 ('team', 3),
 ('heroes', 2)]
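Words with the same count ("teenage", "mutant" and "turtles" above) have no guaranteed relative order, so the ranking should not be relied on for ties. The plain-Python sketch below uses `heapq.nsmallest`, which takes the same `(n, iterable, key)` arguments as takeOrdered(), on a few pairs copied from the output above. Adding the word as a secondary key breaks ties alphabetically; the same key function could be passed to `wordCount.takeOrdered(10, key=...)`.

```python
import heapq

# A few (word, count) pairs from the output above, in shuffled order
counts = [("turtles", 6), ("the", 13), ("mutant", 6),
          ("ninja", 8), ("teenage", 6), ("a", 5)]

# Count descending only: tied words keep whatever order they arrive in
by_count = heapq.nsmallest(4, counts, key=lambda kv: -kv[1])

# Count descending, then word ascending: ties are broken alphabetically
by_count_then_word = heapq.nsmallest(4, counts, key=lambda kv: (-kv[1], kv[0]))

print(by_count)            # [('the', 13), ('ninja', 8), ('turtles', 6), ('mutant', 6)]
print(by_count_then_word)  # [('the', 13), ('ninja', 8), ('mutant', 6), ('teenage', 6)]
```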

For the record, all the steps above, from reading the file to counting words, can be chained into a single expression. Wrapping the chain in parentheses with one transformation per line keeps it readable, but it is usually better to break the pipeline into separate, named steps: each intermediate RDD can then be inspected with take(), which makes debugging much easier.

In [19]:
# An example of word count using chaining transformations

(sc.textFile('./tmntLyrics.txt') # Read text file 
 .map(lambda x: x.lower()) # Lowercase conversion
 .flatMap(lambda x: x.split())  # Split lines into words
 .map(lambda x: (x, 1)) # Map each word to a tuple (word, 1)
 .reduceByKey(lambda x,y: x+y)  # Reduce by key to count
 .takeOrdered(10, key=lambda x: -x[1])) # Take top 10 words

[('the', 13),
 ('ninja', 8),
 ('teenage', 6),
 ('mutant', 6),
 ('turtles', 6),
 ('a', 5),
 ('to', 4),
 ('in', 3),
 ('team', 3),
 ('heroes', 2)]

In [21]:
# Save the result; saveAsTextFile() writes a directory of part files
# (one per partition) and fails if './wordCount' already exists
wordCount.saveAsTextFile('./wordCount')
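saveAsTextFile() writes each element as its string form, so a (word, count) pair is stored as a line such as `('the', 13)`. A minimal sketch of turning such lines back into tuples, assuming this default string format, uses `ast.literal_eval`; on the Spark side the same function could be applied with `sc.textFile('./wordCount').map(ast.literal_eval)`.

```python
import ast

# Lines in the format produced by str() on a (word, count) tuple
saved_lines = ["('the', 13)", "('ninja', 8)", "('a', 5)"]

# literal_eval parses Python literals only, so it cannot run arbitrary code
pairs = [ast.literal_eval(line) for line in saved_lines]
print(pairs)                 # [('the', 13), ('ninja', 8), ('a', 5)]
print(dict(pairs)["ninja"])  # 8
```

If the output is meant for other tools, an alternative is to format each pair explicitly before saving, for example as tab-separated `word<TAB>count` lines.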

## Stop Spark

In [22]:
# Stop Spark Context
sc.stop()