In [1]:
from pyspark import SparkContext

# Create a SparkContext instance
sc = SparkContext("local", "ExampleRDDApp")

In [2]:
# Example RDD

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

sentences = ["Hello World", "Apache Spark", "RDD Transformations Wide Vs Narrow in Spark"]
sentences_rdd = sc.parallelize(sentences)

# Transformations

## `Map` Function

In [3]:
# 1. map
print("### 1. map ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD.")

### 1. map ###
Description: Return a new RDD by applying a function to all elements of this RDD.


In [4]:
# Example 1: Multiply each element by 2
simple_map = rdd.map(lambda x: x * 2).collect()
print(f"01 map example (multiply by 2): {simple_map}")

01 map example (multiply by 2): [2, 4, 6, 8, 10]


In [5]:
# Example 2: Extract the length of each word in a list of sentences
word_map = sentences_rdd.map(lambda sentence: len(sentence.split(" "))).collect()
print(f"02 word length map example (word count in sentences): {word_map}")

02 word length map example (word count in sentences): [2, 2, 7]


## `Filter` Function

In [6]:
# 2. filter
print("### 2. filter ###")
print("Description: Return a new RDD containing only the elements that satisfy a predicate.")

### 2. filter ###
Description: Return a new RDD containing only the elements that satisfy a predicate.


In [7]:
# 01 Example: Filter out the even numbers
simple_filter = rdd.filter(lambda x: x % 2 == 0).collect()
print(f"01 filter example (even numbers): {simple_filter}")

01 filter example (even numbers): [2, 4]


In [8]:
# 02 Example: Filter sentences containing the word 'Spark'
words_filter = sentences_rdd.filter(lambda sentence: "Spark" in sentence).collect()
print(f"02 word example (sentences with 'Spark'): {words_filter}")

02 word example (sentences with 'Spark'): ['Apache Spark', 'RDD Transformations Wide Vs Narrow in Spark']


## `FlatMap` Function

In [9]:
# 3. flatMap
print("### 3. flatMap ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD and then flattening the results.")

### 3. flatMap ###
Description: Return a new RDD by applying a function to all elements of this RDD and then flattening the results.


In [10]:
# 01 Example: Split sentences into words
sentences_mapped = sentences_rdd.map(lambda sentence: sentence.split(" ")).collect()
print(f"01 sentences mapped: {sentences_mapped}")

simple_flatMap = sentences_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()
print(f"02 flatMap example (split sentences into words): {simple_flatMap}")

01 sentences mapped: [['Hello', 'World'], ['Apache', 'Spark'], ['RDD', 'Transformations', 'Wide', 'Vs', 'Narrow', 'in', 'Spark']]
02 flatMap example (split sentences into words): ['Hello', 'World', 'Apache', 'Spark', 'RDD', 'Transformations', 'Wide', 'Vs', 'Narrow', 'in', 'Spark']


In [11]:
# example_Example: Flatten a list of lists
nested_lists = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
nested_rdd = sc.parallelize(nested_lists)
flatten_list = nested_rdd.flatMap(lambda x: x).collect()
print("flatten_list  flatMap example (flatten list of lists):", flatten_list)

flatten_list  flatMap example (flatten list of lists): [1, 2, 3, 4, 5, 6, 7, 8, 9]


## `Reduce` Function

In [12]:
# 4. reduce
print("\n#####  4. reduce ###")
print("Description: Reduces the elements of this RDD using the specified commutative and associative binary operator.")


#####  4. reduce ###
Description: Reduces the elements of this RDD using the specified commutative and associative binary operator.


In [13]:
# 01 Example: Sum of elements
simple_reduce = rdd.reduce(lambda x, y: x + y)
print("01 reduce example (sum of elements):", simple_reduce)

01 reduce example (sum of elements): 15


In [14]:
# example_Example: Find the longest word in a list of words
words = ["cat", "elephant", "rat", "hippopotamus"]
words_rdd = sc.parallelize(words)
words_rdd_reduced = words_rdd.reduce(lambda x, y: x if len(x) > len(y) else y)
print("reduce example (longest word):", words_rdd_reduced)

reduce example (longest word): hippopotamus


## `groupByKey` Function

In [15]:
# 5. groupByKey
print("\n#####  5. groupByKey ###")
print("Description: Group the values for each key in the RDD into a single sequence.")


#####  5. groupByKey ###
Description: Group the values for each key in the RDD into a single sequence.


In [16]:
# 01 Example: Group numbers by even and odd
pairs = [(1, 'a'),(1, 'ali'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)
simple_groupByKey = pairs_rdd.groupByKey().mapValues(list).collect()
print("01 groupByKey example (group numbers):", simple_groupByKey)

01 groupByKey example (group numbers): [(1, ['a', 'ali']), (2, ['b']), (3, ['c']), (4, ['d']), (5, ['e'])]


In [17]:
# example_Example: Group words by their starting letter
words_pairs = [("cat", 1), ("car", 2), ("dog", 3), ("deer", 4), ("elephant", 5),("elephant", 20)]
words_rdd = sc.parallelize(words_pairs)
# mapValues(list) converts the grouped values (which are iterable) into lists.
words_grouped = words_rdd.groupByKey().mapValues(list).collect()
print("words_grouped example (group words by starting letter):", words_grouped)

words_grouped example (group words by starting letter): [('cat', [1]), ('car', [2]), ('dog', [3]), ('deer', [4]), ('elephant', [5, 20])]


## `reduceByKey` Function

In [18]:
# 6. reduceByKey
print("\n#####  6. reduceByKey ###")
print("Description: Merge the values for each key using an associative and commutative reduce function.")

pairs = [(1, 'a'),(1, '_a'), (2, 'b'), (2, '_b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)


#####  6. reduceByKey ###
Description: Merge the values for each key using an associative and commutative reduce function.


In [19]:
# 01 Example: Sum values with the same key
simple_reduceByKey = pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("01 reduceByKey example (sum values by key):", simple_reduceByKey)

01 reduceByKey example (sum values by key): [(1, 'a_a'), (2, 'b_b'), (3, 'c'), (4, 'd'), (5, 'e')]


In [20]:
# example_Example: Count the occurrences of each word in a list
word_list = ["cat", "cat", "dog", "elephant", "dog", "dog"]
word_pairs_rdd = sc.parallelize(word_list).map(lambda word: (word, 1))
example__reduceByKey = word_pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("example_ reduceByKey example (word count):", example__reduceByKey)

example_ reduceByKey example (word count): [('cat', 2), ('dog', 3), ('elephant', 1)]


## `join` Function

In [21]:
# 7. join
print("\n#####  7. join ###")
print("Description: Perform an inner join of this RDD and another one.")


#####  7. join ###
Description: Perform an inner join of this RDD and another one.


In [22]:
# 01 Example: Join two RDDs by key
fruits = sc.parallelize([(1, "apple"), (2, "banana")])
colors = sc.parallelize([(1, "red"), (2, "yellow")])
fruits_color_join = fruits.join(colors).collect()
print("01 join fruits_color_join (join two RDDs):", fruits_color_join)

01 join fruits_color_join (join two RDDs): [(2, ('banana', 'yellow')), (1, ('apple', 'red'))]


In [23]:
# example_Example: Join employee data with department data
employees = sc.parallelize([(1, "John"), (2, "Jane"), (3, "Joe")])
departments = sc.parallelize([(1, "HR"), (2, "Finance")])
employees_department_join = employees.join(departments).collect()
print("join example (employee-department join):", employees_department_join)

join example (employee-department join): [(2, ('Jane', 'Finance')), (1, ('John', 'HR'))]


## `cogroup` Function

In [24]:
# 8. cogroup
# The cogroup function in PySpark is used to group data from two RDDs that share the same key. 
# It combines the values of matching keys from both RDDs into a tuple of lists.
print("\n#####  8. cogroup ###")
print("Description: Group data from two RDDs sharing the same key.")


#####  8. cogroup ###
Description: Group data from two RDDs sharing the same key.


In [25]:
# 01 Example: Cogroup two RDDs
fruits_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange")])
colors_rdd = sc.parallelize([(1, "red"), (2, "yellow")])
cogrouped_fruits_colors = fruits_rdd.cogroup(colors_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("01 cogroup example (group two RDDs):", cogrouped_fruits_colors)

01 cogroup example (group two RDDs): [(2, (['banana'], ['yellow'])), (1, (['apple'], ['red'])), (3, (['orange'], []))]


In [26]:
# example_Example: Cogroup sales data with target data
sales_rdd = sc.parallelize([("store1", 100), ("store2", 200)])
targets_rdd = sc.parallelize([("store1", 150), ("store3", 250)])
cogrouped_sales_targets = sales_rdd.cogroup(targets_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("example_cogroup example (sales-targets cogroup):", cogrouped_sales_targets)

example_cogroup example (sales-targets cogroup): [('store1', ([100], [150])), ('store2', ([200], [])), ('store3', ([], [250]))]


## `distinct` Function

In [27]:
# 9. distinct
print("\n#####  9. distinct ###")
print("Description: Return a new RDD containing the distinct elements in this RDD.")


#####  9. distinct ###
Description: Return a new RDD containing the distinct elements in this RDD.


In [28]:
# example_Example: Unique words from a list of words
words = ["cat", "dog", "cat", "elephant", "dog"]
words_rdd = sc.parallelize(words)
example__distinct = words_rdd.distinct().collect()
print("example_distinct example (unique words):", example__distinct)

example_distinct example (unique words): ['cat', 'dog', 'elephant']


# `repartition` Vs. `coalesce` Function

In [29]:
from pyspark.sql.functions import col, expr
from pyspark.sql import SparkSession
import random
import string
from datetime import datetime, timedelta

# Create SparkSession
spark = SparkSession.builder.master("local").appName("ExampleRepartitionApp").getOrCreate()

In [30]:
# Function to generate random log entry
def generate_log_entry():
  user_id = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
  action = random.choice(["login", "logout", "purchase", "click", "view"])
  item_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
  timestamp = (datetime.now() - timedelta(seconds=random.randint(0, 2592000))).strftime("%Y-%m-%d %H:%M:%S")
  return (user_id, action, item_id, timestamp)

# Generate synthetic data
log_entries = [generate_log_entry() for _ in range(1000000)]

In [31]:
# Create DataFrame
columns = ["user_id", "action", "item_id", "timestamp"]
log_df = spark.createDataFrame(log_entries, columns)

# Show sample data
log_df.show(10, truncate=False)

# Save to a CSV file in the DBFS (Databricks File System)
log_df.write.csv("/tmp/user_logs", header=True, mode="overwrite")

+--------+--------+-------+-------------------+
|user_id |action  |item_id|timestamp          |
+--------+--------+-------+-------------------+
|fc7whgyp|click   |WLBUP  |2025-02-24 03:26:10|
|y469l9hj|click   |ZA3NH  |2025-02-17 23:10:13|
|84fnr2n7|logout  |ISN1Z  |2025-03-04 21:02:32|
|7ar2tfru|purchase|5W457  |2025-02-25 02:59:44|
|c206wjye|click   |N8UFV  |2025-03-04 02:18:55|
|7ts0eeoz|purchase|DLBPQ  |2025-02-21 04:53:16|
|ohk520m1|logout  |U86YZ  |2025-03-09 17:38:08|
|z3gk6mty|view    |VKY9M  |2025-03-01 17:35:08|
|6nfe46hf|login   |BOZ8L  |2025-03-03 17:42:15|
|uxv1jeoh|click   |XVV9K  |2025-03-04 14:29:58|
+--------+--------+-------+-------------------+
only showing top 10 rows



## `repartition` Function

In [32]:
# 10. repartition
#https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L480
print("\n### 10. repartition ###")
print("Description: Return a new RDD that has exactly numPartitions partitions.")


### 10. repartition ###
Description: Return a new RDD that has exactly numPartitions partitions.


In [33]:
logs_rdd = sc.textFile("/tmp/user_logs")

# Initial number of partitions
initial_partitions = logs_rdd.getNumPartitions()
print(f"Initial Partitions: {initial_partitions}")

Initial Partitions: 2


In [34]:
# Repartition to 100 partitions
repartitioned_rdd = logs_rdd.repartition(100)
new_partitions = repartitioned_rdd.getNumPartitions()
print(f"New Partitions after Repartition: {new_partitions}")

New Partitions after Repartition: 100


## `coalesce` Function

In [35]:
# 11. coalesce
print("\n### 11. coalesce ###")
print("Description: Return a new RDD that is reduced into numPartitions partitions.")
#https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L506
#https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala


### 11. coalesce ###
Description: Return a new RDD that is reduced into numPartitions partitions.


In [36]:
# Initial number of partitions
initial_partitions = logs_rdd.getNumPartitions()
print(f"Initial Partitions: {initial_partitions}")

Initial Partitions: 2


In [37]:
# Coalesce to 4 partitions
coalesced_rdd_1 = logs_rdd.coalesce(1)
new_partitions_1 = coalesced_rdd_1.getNumPartitions()
print(f"new_partitions_1 after Coalesce: {new_partitions_1}")

new_partitions_1 after Coalesce: 1


In [38]:
# Coalesce to 50 partitions
coalesced_rdd_2 = logs_rdd.coalesce(10)
new_partitions_2 = coalesced_rdd_2.getNumPartitions()
print(f"new_partitions_2 after Coalesce: {new_partitions_2}")

new_partitions_2 after Coalesce: 2
