In [1]:
import pyspark.sql

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = (
    SparkSession.builder
    .appName("RDD_Functions_Demo")
    .master("local[*]")  # run locally, with as many worker threads as logical cores
    .getOrCreate()       # reuse an existing session if one is already running
)

In [4]:
# Handle to the session's underlying SparkContext; every RDD below is created through it
sc = spark.sparkContext

In [13]:
# Sample (name, age) pairs. Names repeat, and ("Alice", 30) occurs twice.
data = [
    ("Alice", 23),
    ("Bob", 45),
    ("Cathy", 29),
    ("Alice", 30),
    ("Bob", 32),
    ("Alice", 30),
]

In [14]:
# Step 1: Distribute the local sample list as an RDD, then fetch it back to inspect it
rdd = sc.parallelize(data)
original_records = rdd.collect()
print("Original RDD:")
print(original_records)

Original RDD:
[('Alice', 23), ('Bob', 45), ('Cathy', 29), ('Alice', 30), ('Bob', 32), ('Alice', 30)]


In [15]:
# Create an RDD (repeats the assignment from Step 1: `rdd` is rebuilt from the same `data`)
rdd = sc.parallelize(data)

In [16]:
# 1. map(): apply a function to every element, producing one output per input.
# Each (name, age) pair is rebuilt with the age increased by 5.
mapped_rdd = rdd.map(lambda person: (person[0], person[1] + 5))
print("Mapped RDD:", mapped_rdd.collect())

Mapped RDD: [('Alice', 28), ('Bob', 50), ('Cathy', 34), ('Alice', 35), ('Bob', 37), ('Alice', 35)]


In [17]:
# 2. filter(): keep only the elements for which the predicate is true.
# The comparison is strict, so pairs whose age is exactly 30 are dropped.
filtered_rdd = rdd.filter(lambda person: 30 < person[1])
print("Filtered RDD:", filtered_rdd.collect())

Filtered RDD: [('Bob', 45), ('Bob', 32)]


In [18]:
# 3. flatMap(): map each element to a sequence and concatenate all the sequences.
# Every (name, age) pair contributes its name and its age as two separate items.
flat_mapped_rdd = rdd.flatMap(lambda person: [person[0], person[1]])
print("FlatMapped RDD:", flat_mapped_rdd.collect())

FlatMapped RDD: ['Alice', 23, 'Bob', 45, 'Cathy', 29, 'Alice', 30, 'Bob', 32, 'Alice', 30]


In [19]:
# 4. distinct(): Remove duplicates
# Elements are compared as whole (name, age) tuples, so only the repeated
# ('Alice', 30) collapses; repeated names with different ages are kept.
distinct_rdd = rdd.distinct()
print("Distinct RDD:", distinct_rdd.collect())


Distinct RDD: [('Alice', 23), ('Bob', 45), ('Cathy', 29), ('Alice', 30), ('Bob', 32)]


In [20]:
# 5. groupByKey(): gather all values that share a key.
# mapValues(list) turns each group's values into a plain list so they print readably.
grouped_rdd = (
    rdd
    .groupByKey()
    .mapValues(list)
)
print("Grouped by Key RDD:", grouped_rdd.collect())

Grouped by Key RDD: [('Alice', [23, 30, 30]), ('Bob', [45, 32]), ('Cathy', [29])]


In [21]:
# 6. reduceByKey(): merge the values of each key with a binary function.
# Addition yields the sum of the ages recorded under each name.
reduced_rdd = rdd.reduceByKey(lambda running_total, age: running_total + age)
print("Reduced by Key RDD:", reduced_rdd.collect())

Reduced by Key RDD: [('Alice', 83), ('Bob', 77), ('Cathy', 29)]


In [22]:
# 7. sortByKey(): order a pair RDD by its keys.
# The pairs are flipped to (age, name) first so that the age acts as the sort key.
sorted_rdd = (
    rdd
    .map(lambda person: (person[1], person[0]))
    .sortByKey()
)
print("Sorted by Key RDD:", sorted_rdd.collect())

Sorted by Key RDD: [(23, 'Alice'), (29, 'Cathy'), (30, 'Alice'), (30, 'Alice'), (32, 'Bob'), (45, 'Bob')]


In [23]:
# 8. union(): concatenate two RDDs; elements that occur more than once are kept as-is.
rdd2 = sc.parallelize([
    ("Eve", 28),
    ("Frank", 33),
])
union_rdd = rdd.union(rdd2)
print("Union RDD:", union_rdd.collect())

Union RDD: [('Alice', 23), ('Bob', 45), ('Cathy', 29), ('Alice', 30), ('Bob', 32), ('Alice', 30), ('Eve', 28), ('Frank', 33)]


In [26]:
# 9. intersection(): elements present in both RDDs.
# Of the two pairs in rdd3, only ("Cathy", 29) also occurs in `rdd`.
rdd3 = sc.parallelize([
    ("Jake", 30),
    ("Cathy", 29),
])
intersection_rdd = rdd.intersection(rdd3)
print("Intersection RDD:", intersection_rdd.collect())

Intersection RDD: [('Cathy', 29)]


In [27]:
# 10. subtract(): Elements in RDD1 but not in RDD2
# Uses `rdd2` from the union() cell. `rdd` and `rdd2` share no element, so every
# element of `rdd` is kept; the result order may differ from the input order.
subtract_rdd = rdd.subtract(rdd2)
print("Subtract RDD:", subtract_rdd.collect())

Subtract RDD: [('Alice', 23), ('Bob', 45), ('Cathy', 29), ('Alice', 30), ('Alice', 30), ('Bob', 32)]


In [28]:
# 11. cartesian(): Cartesian product of two RDDs
# Pairs every element of `rdd` with every element of `rdd2` (from the union() cell):
# 6 x 2 = 12 pairs for the sample data.
cartesian_rdd = rdd.cartesian(rdd2)
print("Cartesian RDD:", cartesian_rdd.collect())

Cartesian RDD: [(('Alice', 23), ('Eve', 28)), (('Bob', 45), ('Eve', 28)), (('Cathy', 29), ('Eve', 28)), (('Alice', 23), ('Frank', 33)), (('Bob', 45), ('Frank', 33)), (('Cathy', 29), ('Frank', 33)), (('Alice', 30), ('Eve', 28)), (('Bob', 32), ('Eve', 28)), (('Alice', 30), ('Eve', 28)), (('Alice', 30), ('Frank', 33)), (('Bob', 32), ('Frank', 33)), (('Alice', 30), ('Frank', 33))]


In [29]:
# 12. coalesce(): Reduce the number of partitions
# Here all of the data is merged into a single partition.
coalesced_rdd = rdd.coalesce(1)
print("Number of Partitions after Coalesce:", coalesced_rdd.getNumPartitions())

Number of Partitions after Coalesce: 1


In [30]:
# 13. repartition(): Redistribute the data into exactly the requested number of partitions
# This can increase or decrease the count, depending on how many partitions `rdd`
# started with (under local[*] that follows the machine's core count).
repartitioned_rdd = rdd.repartition(4)
print("Number of Partitions after Repartition:", repartitioned_rdd.getNumPartitions())


Number of Partitions after Repartition: 4


# **Actions**

In [31]:
# 14. collect(): Return all elements as a list
# Everything is brought back to the driver, so use it only on small RDDs such as this one.
print("Collected RDD:", rdd.collect())

Collected RDD: [('Alice', 23), ('Bob', 45), ('Cathy', 29), ('Alice', 30), ('Bob', 32), ('Alice', 30)]


In [32]:
# 15. count(): Count the number of elements (duplicates included)
print("Count of elements:", rdd.count())

Count of elements: 6


In [33]:
# 16. first(): Get the first element
# Returns the element itself, not a one-item list.
print("First element:", rdd.first())


First element: ('Alice', 23)


In [34]:
# 17. take(): Get the first n elements
# Returns them to the driver as a list.
print("Take first 3 elements:", rdd.take(3))



Take first 3 elements: [('Alice', 23), ('Bob', 45), ('Cathy', 29)]


In [35]:
# 18. reduce(): fold all elements into a single value with a binary function.
# The ages are projected out of the pairs first and then added together.
sum_of_ages = (
    rdd
    .map(lambda person: person[1])
    .reduce(lambda running_total, age: running_total + age)
)
print("Sum of ages:", sum_of_ages)

Sum of ages: 189


In [40]:
# Helper that prints one element per line
def print_element(x):
    """Print a single element on its own line."""
    print(x)

# Fetch every element back to the driver, then print them one by one
rdd_contents = rdd.collect()
for record in rdd_contents:
    print_element(record)

('Alice', 23)
('Bob', 45)
('Cathy', 29)
('Alice', 30)
('Bob', 32)
('Alice', 30)


In [42]:
# Stop the SparkSession. This also stops its underlying SparkContext (`sc`), and
# unlike calling sc.stop() directly it tears down the session created at the top as well.
spark.stop()