# Q2) Create two RDDs (rdd1 and rdd2), perform the following transformations, and call collect() to see the output after each transformation:

## map(), filter(), distinct(), sample(), union(), intersection(), subtract(), cartesian() 

In [2]:
from pyspark import SparkContext

# Stop a SparkContext left over from an earlier run of this cell, if any.
# On a fresh kernel `sc` is not bound yet, so an unguarded sc.stop() would raise
# NameError and break Restart & Run All.
try:
    sc.stop()
except NameError:
    pass

# Create a SparkContext (only once per application)
sc = SparkContext("local", "RDD transformations and actions")

# Create RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# Create RDD2
rdd2 = sc.parallelize([4, 5, 6, 7, 8])

# Show the starting contents of both RDDs before any transformation is applied
print("Initially RDD1 and RDD2\n")
print("RDD1 Elements: ", rdd1.collect())
print("RDD2 Elements: ", rdd2.collect())

Initially RDD1 and RDD2

RDD1 Elements:  [1, 2, 3, 4, 5]
RDD2 Elements:  [4, 5, 6, 7, 8]


In [3]:
# Transformations
# map(): build a new RDD in which every element of rdd1 is replaced by its square
rdd1_squared = rdd1.map(lambda value: value ** 2)
print(
    "RDD1 Squared: ",
    rdd1_squared.collect(),
)

RDD1 Squared:  [1, 4, 9, 16, 25]


In [4]:
# filter(): retain only the elements of rdd1 that are divisible by two
rdd1_even = rdd1.filter(lambda number: number % 2 == 0)
print(
    "RDD1 Even Numbers: ",
    rdd1_even.collect(),
)

RDD1 Even Numbers:  [2, 4]


In [5]:
# distinct(): collapse repeated values in rdd2 so each value appears once
rdd2_distinct = rdd2.distinct()
print(
    "RDD2 Distinct: ",
    rdd2_distinct.collect(),
)

RDD2 Distinct:  [4, 5, 6, 7, 8]


In [6]:
# sample(): draw a random sample from rdd1, allowing an element to be picked
# more than once; the fixed seed keeps the draw repeatable between runs
rdd1_sampled = rdd1.sample(
    withReplacement=True,
    fraction=0.5,
    seed=42,
)
print(
    "RDD1 Sampled: ",
    rdd1_sampled.collect(),
)

RDD1 Sampled:  []


In [7]:
# union(): concatenate rdd1 and rdd2 into a single RDD (duplicates are kept)
rdd_union = rdd1.union(rdd2)
print(
    "Union of RDD1 and RDD2: ",
    rdd_union.collect(),
)

Union of RDD1 and RDD2:  [1, 2, 3, 4, 5, 4, 5, 6, 7, 8]


In [8]:
# intersection(): keep only the values that occur in both rdd1 and rdd2
rdd_intersection = rdd1.intersection(rdd2)
print(
    "Intersection of RDD1 and RDD2: ",
    rdd_intersection.collect(),
)

Intersection of RDD1 and RDD2:  [4, 5]


In [9]:
# subtract(): keep the values of rdd1 that do not also appear in rdd2
rdd_subtracted = rdd1.subtract(rdd2)
print(
    "RDD1 - RDD2: ",
    rdd_subtracted.collect(),
)

RDD1 - RDD2:  [2, 1, 3]


In [10]:
# cartesian(): pair every element of rdd1 with every element of rdd2
rdd_cartesian = rdd1.cartesian(rdd2)
print(
    "Cartesian product of RDD1 and RDD2: ",
    rdd_cartesian.collect(),
)

Cartesian product of RDD1 and RDD2:  [(1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (3, 4), (3, 5), (3, 6), (3, 7), (3, 8), (4, 4), (4, 5), (4, 6), (4, 7), (4, 8), (5, 4), (5, 5), (5, 6), (5, 7), (5, 8)]


# Perform the following actions and print the result of each one (actions return their results to the driver, so no separate collect() call is needed):

## collect(), count(), countByValue(), aggregate(), foreach(), take(), top(), takeOrdered(num, key), reduce().

In [11]:
# collect(): bring every element of each RDD back to the driver as a Python list
print(
    "RDD1 Elements: ",
    rdd1.collect(),
)
print(
    "RDD2 Elements: ",
    rdd2.collect(),
)

RDD1 Elements:  [1, 2, 3, 4, 5]
RDD2 Elements:  [4, 5, 6, 7, 8]


In [12]:
# count(): report how many elements rdd1 holds
print(
    "Count of RDD1: ",
    rdd1.count(),
)

Count of RDD1:  5


In [13]:
# countByValue(): tally how many times each distinct value appears in rdd1
print(
    "Count by value in RDD1: ",
    rdd1.countByValue(),
)

Count by value in RDD1:  defaultdict(<class 'int'>, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})


In [14]:
# aggregate(): fold rdd1 into a single value, starting from 0
aggregated_value = rdd1.aggregate(
    0,
    lambda running_total, element: running_total + element,  # applied to each element
    lambda left_total, right_total: left_total + right_total,  # merges partial totals
)
print(
    "Aggregated value of RDD1: ",
    aggregated_value,
)

Aggregated value of RDD1:  15


In [17]:
# foreach(): Apply a function to each element (prints each element in this example)
# NOTE(review): no output was captured under this cell — presumably p() runs inside
# the Spark worker, so the prints go to the worker/console stdout rather than the
# notebook; confirm by checking the terminal that launched Jupyter.
def p(x): 
    """Print a single RDD element; passed to foreach() as the per-element callback."""
    print(x)
rdd1.foreach(p)

In [19]:
# take(): fetch just the leading n elements of rdd1 (n = 2 here)
print(
    "First two elements of RDD1: ",
    rdd1.take(2),
)

First two elements of RDD1:  [1, 2]


In [20]:
# top(): fetch the n largest elements of rdd1, largest first (n = 2 here)
print(
    "Top two elements of RDD1: ",
    rdd1.top(2),
)

Top two elements of RDD1:  [5, 4]


In [21]:
# takeOrdered(): Retrieve the first n elements in ascending order.
# With the default ordering this yields the *smallest* values (the mirror image
# of top()), so the label says "Smallest" rather than "Top".
print("Smallest three elements of RDD1 (ordered): ", rdd1.takeOrdered(3))

Top three elements of RDD1 (ordered):  [1, 2, 3]


In [22]:
# reduce(): combine the elements of rdd1 pairwise with addition to get their sum
reduced_value = rdd1.reduce(lambda left, right: left + right)
print(
    "Reduced value of RDD1: ",
    reduced_value,
)

Reduced value of RDD1:  15
