In [None]:
from pyspark import SparkContext

# Reuse the active SparkContext if one already exists. Constructing a second
# SparkContext in the same process raises an error, so getOrCreate() keeps
# this cell safe to re-run in a live notebook kernel.
sc = SparkContext.getOrCreate()

## CREATING RDDs USING PYTHON COLLECTIONS

In [None]:
# Distribute a small Python list of words as an RDD (split() yields
# ['this', 'is', 'an', 'example']).
sc.parallelize('this is an example'.split())

In [None]:
# Distribute the integers 1..99 (range's upper bound is exclusive).
range_rdd = sc.parallelize(range(1, 100))
range_rdd.collect()  # Alternatively, range_rdd.count() returns only the size.

## CREATING RDDs USING TEXT FILES

In [None]:
# Create an RDD whose elements are the lines of the text file at the
# relative path data/HP.txt.
rdd_hp = sc.textFile("data/HP.txt")

In [None]:
# Display the RDD object itself (its repr shows the RDD type), not its contents.
rdd_hp

In [None]:
# Inspect the contents: collect() is an action that returns every line of
# the RDD to the driver as a Python list.
rdd_hp.collect()

In [None]:
# Count the total number of elements in the RDD

## SPARK TRANSFORMATIONS & ACTIONS

#### map(func) : Return a new distributed dataset formed by passing each element of the source through a function func. 

In [None]:
# range's upper bound is exclusive: range(1, 11) yields the ten integers 1..10.
ten_range_rdd = sc.parallelize(range(1, 11))  # [1, 2, ..., 10]
ten_range_rdd_mapped = ten_range_rdd.map(lambda x: (x, x))  # RDD created from an RDD
ten_range_rdd_mapped.collect()  # collect is an action!

In [None]:
# Double the first component of every pair. collect() ends the chain, so the
# result bound here is a plain Python list, not an RDD.
ten_range_rdd_mapped_again = (
    ten_range_rdd_mapped
    .map(lambda pair: (pair[0] * 2, pair[1]))
    .collect()
)
print(ten_range_rdd_mapped_again)

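#### flatMap(func) : Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single item), and the results are flattened into one dataset.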

In [None]:
# Each (a, b) pair is expanded into two separate elements, a and b + 1, so the
# resulting RDD is a flat sequence of numbers instead of tuples.
ten_range_rdd_flatmapped = ten_range_rdd_mapped.flatMap(
    lambda pair: (pair[0], pair[1] + 1)
)
ten_range_rdd_flatmapped.collect()

#### filter(func) : Return a new dataset formed by selecting those elements of the source on which func returns true.

In [None]:
# Revisit the lines of the text-file RDD created earlier.
rdd_hp.collect()

In [None]:
# Keep only the lines that begin with "If" (case-sensitive prefix match).
rdd_hp_filtered = rdd_hp.filter(lambda text: text.startswith("If"))
rdd_hp_filtered.collect()

##### USER EXERCISE

In [None]:
# Convert all the lines to Uppercase

In [None]:
# Remove the lines that contain the word "HIDDEN"

In [None]:
# Create a new RDD with elements that are tuples (x, y), where x represents a line and y is the length of that line

In [None]:
# Remove lines that are longer than 30 characters

In [None]:
# Arrange all the words in a single list and count the total number of words

In [None]:
# Pair each word with its length in a tuple, e.g. ("word", 4)

#### groupByKey() : When called on a dataset of `(K, V)` pairs, returns a dataset of `(K, Iterable<V>)` pairs.

In [None]:
# A pair RDD of (country, city) tuples; several countries occur more than once.
rdd_places = sc.parallelize([
    ("Finland", "Helsinki"),
    ("Norway", "Oslo"),
    ("Sweden", "Stockholm"),
    ("Denmark", "Copenhagen"),
    ("Norway", "Bergen"),
    ("Finland", "Tampere"),
    ("Denmark", "Aarhus"),
    ("Finland", "Turku"),
])
rdd_places.collect()

In [None]:
# Gather all cities under their country key. The grouped values come back as
# iterable objects rather than plain lists.
rdd_places_grouped = rdd_places.groupByKey()
rdd_places_grouped.collect()

In [None]:
# Display the grouped result in a human-readable format

#### join(otherDataset, [numTasks]) : When called on datasets of type `(K, V)` and `(K, W)`, returns a dataset of `(K, (V, W))` pairs with all pairs of elements for each key. Outer joins are supported through `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`.

In [None]:
# A second (country, city) pair RDD to join against rdd_places.
rdd_places_other = sc.parallelize([
    ("Finland", "Espoo"),
    ("Norway", "Stavanger"),
    ("Sweden", "Gothenburg"),
    ("Finland", "Vantaa"),
])
rdd_places_other.collect()

In [None]:
# Inner join on country: only keys present in both RDDs survive, with one
# (city, other_city) pair per matching combination.
(
    rdd_places
    .join(rdd_places_other)
    .collect()
)

In [None]:
# Turn the grouped iterables into lists first so the joined output is readable.
(
    rdd_places_grouped
    .mapValues(list)
    .join(rdd_places_other)
    .collect()
)

#### cogroup(otherDataset, [numTasks]) : When called on datasets of type `(K, V)` and `(K, W)`, returns a dataset of `(K, (Iterable<V>, Iterable<W>))` tuples.

In [None]:
# For every country found in either RDD, pair the group of cities from
# rdd_places with the group of cities from rdd_places_other.
rdd_places_other_cogrouped = rdd_places.cogroup(
    rdd_places_other
)
rdd_places_other_cogrouped.collect()

In [None]:
# Materialize both iterables of each cogrouped value as lists so the result is
# readable. mapValues is the idiomatic choice because only the value changes
# (the key passes through untouched); unlike map, it also retains the RDD's
# partitioning.
rdd_places_other_cogrouped.mapValues(
    lambda groups: (list(groups[0]), list(groups[1]))
).collect()

#### sortByKey([ascending], [numTasks]) : When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

In [None]:
# Sort the (country, city) pairs by key; ascending order is sortByKey's default.
rdd_places.sortByKey(ascending=True).collect()

#### reduceByKey(func, [numTasks]) : When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V.

In [None]:
# Sum the numeric values that share the same name key.
rdd_reduce_example = sc.parallelize([
    ("Messi", 2),
    ("Suarez", 2),
    ("Ronaldo", 0),
    ("Ronaldo", 3),
    ("Messi", 2),
])
rdd_reduce_example.reduceByKey(lambda left, right: left + right).collect()

In [None]:
# Sort by value

## SOME RDD ACTIONS

In [None]:
# Show the (country, city) pair RDD used by the action exercises that follow.
rdd_places.collect()

In [None]:
# Count the total number of elements in rdd_places

In [None]:
# Output the first element in the rdd

In [None]:
# Output a list with the first two elements

In [None]:
# Output a sample of 2 random elements with replacement

In [None]:
# Output the first 2 elements by using their natural ordering

In [None]:
# Output the total number of elements for each key