### Transformations
##### Map Function

In [0]:
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)

In [0]:
# 1. map
print("#####  1. map ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD.")
# Example 1: Multiply each element by 2
simple_map = rdd.map(lambda x: x * 2).collect()
print("01 map example (multiply by 2):", simple_map)

#####  1. map ###
Description: Return a new RDD by applying a function to all elements of this RDD.
01 map example (multiply by 2): [2, 4, 6, 8, 10, 12]


In [0]:
# Example 2: Extract the length of each word in a list of sentences
sentences = ["Hello world", "Apache Spark", "RDD transformations Wide Vs Narrow Spark"]
sentences_rdd = sc.parallelize(sentences)

In [0]:
words_map = sentences_rdd.map(lambda s: len(s.split(" "))).collect()
print("example_map example (word count in sentences):", words_map)

example_map example (word count in sentences): [2, 2, 6]


##### Filter Function

In [0]:
# 01 Example: Filter out even numbers
simple_filter = rdd.filter(lambda x: x % 2 == 0).collect()
print("01 filter example (even numbers):", simple_filter)

01 filter example (even numbers): [2, 4, 6]


In [0]:
# example_Example: Filter sentences containing the word 'Spark'
words_filter = sentences_rdd.filter(lambda sentence: "Spark" in sentence).collect()
print("example_ filter example (sentences with 'Spark'):", words_filter)

example_ filter example (sentences with 'Spark'): ['Apache Spark', 'RDD transformations Wide Vs Narrow Spark']


##### FlatMap Function

In [0]:
# 3. flatMap
simple_flatMap = sentences_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()
print("02 flatMap example (split sentences into words):", simple_flatMap)

02 flatMap example (split sentences into words): ['Hello', 'world', 'Apache', 'Spark', 'RDD', 'transformations', 'Wide', 'Vs', 'Narrow', 'Spark']


In [0]:

# example_Example: Flatten a list of lists
nested_lists = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
nested_rdd = sc.parallelize(nested_lists)
flatten_list = nested_rdd.flatMap(lambda x: x).collect()
print("flatten_list  flatMap example (flatten list of lists):", flatten_list)

flatten_list  flatMap example (flatten list of lists): [1, 2, 3, 4, 5, 6, 7, 8, 9]


##### Reduce Function


In [0]:
# 4. reduce
# 01 Example: Sum of elements
simple_reduce = rdd.reduce(lambda x, y: x + y)
print("01 reduce example (sum of elements):", simple_reduce)


01 reduce example (sum of elements): 21


In [0]:
# 02 Example: Find the longest word in a list of words
words = ["cat", "elephant", "rat", "hippopotamus"]
words_rdd = sc.parallelize(words)
words_rdd_reduced = words_rdd.reduce(lambda x, y: x if len(x) > len(y) else y)
print("reduce example (longest word):", words_rdd_reduced)

reduce example (longest word): hippopotamus


##### groupByKey Function
**Definition**: GroupByKey collects all values associated with the same key into a single list.

**Use Case**: Useful when you need to process or analyze all values for each key separately.

**Efficiency**: Less efficient because it involves shuffling all values across the network and can result in high memory usage.

In [0]:
# 5. groupByKey
print("Description: Group the values for each key in the RDD into a single sequence.")

# 01 Example: Group numbers by even and odd
pairs = [(1, 'a'),(1, 'ali'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)
simple_groupByKey = pairs_rdd.groupByKey().mapValues(list).collect()
print("01 groupByKey example (group numbers):", simple_groupByKey)

Description: Group the values for each key in the RDD into a single sequence.
01 groupByKey example (group numbers): [(1, ['a', 'ali']), (2, ['b']), (3, ['c']), (4, ['d']), (5, ['e'])]


In [0]:
# 02 Example: Group words by their starting letter
words_pairs = [("cat", 1), ("car", 2), ("dog", 3), ("deer", 4), ("elephant", 5),("elephant", 20)]
words_rdd = sc.parallelize(words_pairs)
# mapValues(list) converts the grouped values (which are iterable) into lists.
words_grouped = words_rdd.groupByKey().mapValues(list).collect()
print("words_grouped example (group words by starting letter):", words_grouped)

words_grouped example (group words by starting letter): [('elephant', [5, 20]), ('dog', [3]), ('cat', [1]), ('car', [2]), ('deer', [4])]


##### reduceByKey Function
**Definition**: ReduceByKey combines values associated with the same key using a specified reduction function, resulting in a single value per key.

**Use Case**: Ideal for operations that can aggregate values (like summing) and where intermediate results can be reduced locally before shuffling.

**Efficiency**: More efficient due to local aggregation before shuffling, reducing network and memory overhead.

In [0]:
# 6. reduceByKey
print("Description: Merge the values for each key using an associative and commutative reduce function.")
pairs = [(1, 'a'),(1, '_a'), (2, 'b'), (2, '_b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)

# 01 Example: Sum values with the same key
simple_reduceByKey = pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("01 reduceByKey example (sum values by key):", simple_reduceByKey)

Description: Merge the values for each key using an associative and commutative reduce function.
01 reduceByKey example (sum values by key): [(1, 'a_a'), (2, 'b_b'), (3, 'c'), (4, 'd'), (5, 'e')]


In [0]:
# example_Example: Count the occurrences of each word in a list
word_list = ["cat", "cat", "dog", "elephant", "dog", "dog"]
word_pairs_rdd = sc.parallelize(word_list).map(lambda word: (word, 1))
example__reduceByKey = word_pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("example_ reduceByKey example (word count):", example__reduceByKey)

example_ reduceByKey example (word count): [('elephant', 1), ('dog', 3), ('cat', 2)]
