# Aggregate in RDD

In [1]:
from pyspark import SparkConf, SparkContext

# Create a Spark configuration
conf = SparkConf().setAppName("RDDAggregateExample").setMaster("local[*]")

# Create a Spark context
sc = SparkContext(conf=conf)

24/03/21 07:26:56 WARN Utils: Your hostname, vivoBook resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlan0)
24/03/21 07:26:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 07:26:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Create an RDD of integers
numbers_rdd = sc.parallelize([1, 2, 3, 4, 5])
# print rdd value
numbers_rdd.collect()

                                                                                

[1, 2, 3, 4, 5]

In [3]:
# Define the initial value for aggregation
initial_value = (0, 0)  # (sum, count)


In [4]:
# Define the aggregation function
def seq_op(acc, value):
    sum_, count = acc
    return sum_ + value, count + 1


In [5]:
# Define the combining function
def comb_op(acc1, acc2):
    sum1, count1 = acc1
    sum2, count2 = acc2
    return sum1 + sum2, count1 + count2


In [6]:
# Use the aggregate function to compute sum and count
result = numbers_rdd.aggregate(initial_value, seq_op, comb_op)

# Print the result
print(f"Sum: {result[0]}, Count: {result[1]}")

[Stage 1:>                                                        (0 + 12) / 12]

Sum: 15, Count: 5


                                                                                

# Cache()

In [7]:
# Persist this RDD with the default storage level (MEMORY_ONLY).
# step-1: create a rdd
numbers_rdd= sc.parallelize(range(1,1000))
print(numbers_rdd.collect())
# cache the RDD in memory
numbers_rdd.cache()
# perform some operation in RDD (e.g count and Sum)
count = numbers_rdd.count()
total_sum = numbers_rdd.reduce(lambda x,y:x+y)
print(f"count: {count}")
print(f"sum: {total_sum}")



                                                                                

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 22

                                                                                

count: 999
sum: 499500


                                                                                

In [8]:
# check if the above rdd is cached ?
print(numbers_rdd.is_cached)
# see where this rdd is cached, whether in memory or disk?
print(numbers_rdd.getStorageLevel())

True
Memory Serialized 1x Replicated


# coalesce()
 - `coalesce` method on an RDD to  only **reduce**  the number of partitions. 

In [9]:
# lets create a rdd with 4 partitions
original_rdd = sc.parallelize([1,2,3,4,5,6,7,8],4)
# check the no of partition
print("number of partition",original_rdd.getNumPartitions())
# coalesce the rdd into 2 partitions
coalesce_rdd = original_rdd.coalesce(2)
# check the number of partition after coalesce
print("no of partition after coalesce",coalesce_rdd.getNumPartitions()) 
# collect and print rdd
print(coalesce_rdd.collect())

number of partition 4
no of partition after coalesce 2
[1, 2, 3, 4, 5, 6, 7, 8]


# repartition()
- `repartiton can be used to either increase or decrease the number of partitions in RDD

In [10]:
# Create an RDD with 4 partitions
original_rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 4)
# Get the number of partitions in the original RDD
num_partitions_before = original_rdd.getNumPartitions()
print("Number of partitions before repartition:", num_partitions_before)
# lets increase the no of partiton to 6
increased_partition_rdd = original_rdd.repartition(6)
# check the no of partition in new RDD.
print("new no of partitions",increased_partition_rdd.getNumPartitions())
# lets decrease the no of partiton to 2
decreased_no_partitions = original_rdd.repartition(2)
print("no of partition",decreased_no_partitions.getNumPartitions())

Number of partitions before repartition: 4
new no of partitions 6
no of partition 2


# Count()
- it is used to count the no of elements in the RDD

In [11]:
# Create an RDD with some elements
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Count the number of elements in the RDD using count()
element_count = rdd.count()
# Print the total count of elements in the RDD
print("Total count of elements in the RDD:", element_count)




Total count of elements in the RDD: 5


                                                                                

# countByKey()
- the countByKey() method is used specifically on an RDD of key-value pairs to count the occurrences of each unique key in the RDD.

In [15]:
# Create an RDD of key-value pairs
rdd = sc.parallelize([( 'apple',1), ( 'banana',1), ( 'cherry',1), ( 'apple',2), ( 'orange',4)])
# Count the occurrences of each key in the RDD using countByKey()
key_counts = rdd.countByKey()
# print the count dict
print(dict(key_counts))

{'apple': 2, 'banana': 1, 'cherry': 1, 'orange': 1}


# countByValue()
-  countByValue() method is used to count the occurrences of each unique element in an RDD. This method is typically applied to RDDs containing non-key-value pairs

In [16]:
# Create an RDD with some elements
rdd = sc.parallelize([1, 2, 3, 2, 1, 4, 5, 2, 1])

# Count the occurrences of each unique element in the RDD using countByValue()
element_counts = rdd.countByValue()
# Print the element counts
print(element_counts)

defaultdict(<class 'int'>, {1: 3, 2: 3, 3: 1, 4: 1, 5: 1})


# distinct()
- returns a new rdd with distinct elements from original rdd

In [17]:
# Create an RDD with duplicate elements
rdd = sc.parallelize([1, 2, 3, 2, 1, 4, 5, 2, 1])
# print the original rdd
print(rdd.collect())

# Get the distinct elements from the RDD using distinct()
distinct_rdd = rdd.distinct()

# Collect and print the distinct elements
print("Distinct elements in the RDD:")
print(distinct_rdd.collect())

[1, 2, 3, 2, 1, 4, 5, 2, 1]
Distinct elements in the RDD:




[1, 2, 3, 4, 5]


                                                                                

# filter()
- it is used to create new RDD containing the elements that satisfy the condition . you can relate it to `where` clause fo SQL.


In [18]:
# Create an RDD with some elements
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# print original rdd
print(rdd.collect())
# Filter the RDD to keep only even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)

# Collect and print the filtered elements
print("Filtered elements in the RDD (even numbers):")
print(filtered_rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Filtered elements in the RDD (even numbers):
[2, 4, 6, 8, 10]


                                                                                

# first()
 - returns the first element in the RDD

In [19]:
# create an rdd with some elements
rdd = sc.parallelize([11,45,12,9,34])
# print the complete rdd
print(rdd.collect())
# print the first element of the rdd
print(rdd.first())

[11, 45, 12, 9, 34]
11


# map()
 - it is used to transform each element in an rdd using a specified function.

In [20]:
# Create an RDD with some elements
rdd = sc.parallelize([1, 2, 3, 4, 5])
# print the original rdd
print(rdd.collect())

# Map each element to its square using map()
squared_rdd = rdd.map(lambda x: x * x)
# Collect and print the transformed elements
print("Squared elements in the RDD:")
print(squared_rdd.collect())

# this can be done using function too instead of lambda
def sq_ele(x):
    return x*x
squared_rdd_by_func = rdd.map(sq_ele)
print(squared_rdd_by_func.collect())

[1, 2, 3, 4, 5]
Squared elements in the RDD:


                                                                                

[1, 4, 9, 16, 25]
[1, 4, 9, 16, 25]


# flatmap
- it maps each element in rdd using a specified function. the result is a flattened rdd containing all the transformed elements

In [21]:
# Create an RDD with some words in sentences
rdd = sc.parallelize(["Hello world", "Good morning", "How are you"])

# Apply map() to split each sentence into words
mapped_rdd = rdd.map(lambda line: line.split())

# Apply flatMap() to split each sentence into words and flatten the result
flat_mapped_rdd = rdd.flatMap(lambda line: line.split())

# Collect and print the results
print("Mapped elements (list of words) using map():")
print(mapped_rdd.collect())

print("\nFlattened elements (individual words) using flatMap():")
print(flat_mapped_rdd.collect())


Mapped elements (list of words) using map():
[['Hello', 'world'], ['Good', 'morning'], ['How', 'are', 'you']]

Flattened elements (individual words) using flatMap():
['Hello', 'world', 'Good', 'morning', 'How', 'are', 'you']


# foreach()
- applies a function to each element of the RDD
- unlike `map()` and `flatmap()` (bot of these are transformations), `foreach()` is an action.

In [23]:
# Create an RDD with some elements
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Define a function to print each element
def print_element(element):
    print("Element:", element)

# Apply foreach() to print each element
rdd.foreach(print_element)

Element: 5
Element: 2
Element: 4
Element: 1
                                                                                

# groupBy()
- returns an RDD of grouped items


In [26]:
# create an RDD
rdd = sc.parallelize(["Tom", "Lenevo", "Anvisha",
    "John", "Jimmy", "Jacky",
    "John", "Jimmy", "Jimmy"])
# Using groupBy transformation to group elements based on their length
group_rdd = rdd.groupBy( lambda x: len(x))
# Iterate through the grouped RDD and print the key and grouped elements
print("Grouped elements in the RDD by length:")
for key, group in group_rdd.collect():
    print(f"Length: {key}, Grouped Elements: {list(group)}")


Grouped elements in the RDD by length:
Length: 3, Grouped Elements: ['Tom']
Length: 4, Grouped Elements: ['John', 'John']
Length: 5, Grouped Elements: ['Jimmy', 'Jacky', 'Jimmy', 'Jimmy']
Length: 6, Grouped Elements: ['Lenevo']
Length: 7, Grouped Elements: ['Anvisha']


                                                                                

# groupByKey()
- used to group elements of an RDD by a key. It is similar to the groupBy() method, but specifically designed for RDDs containing key-value pairs. 

In [30]:
# create an key-value rdd
rdd = sc.parallelize([("A",1),("A",3),("B",4),("B",2),("C",5)])
print(rdd.collect())
# lets group it by key
grouped_by_key = rdd.groupByKey()
# iterate through rdd and print the key and grouped elements
for key,group in grouped_by_key.collect():
    print(key,list(group))
# print(rdd.groupByKey().collect())

[('A', 1), ('A', 3), ('B', 4), ('B', 2), ('C', 5)]
B [4, 2]
C [5]
A [1, 3]


# persist()
- Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).

In [32]:
# create a new rdd
rdd = sc.parallelize(["a","b","c"])
# persist it
rdd.persist()
# check if it is cached
print(rdd.is_cached)
# print the storage level of the rdd
print(rdd.getStorageLevel())

# lets unpersis it
rdd.unpersist()
print(rdd.is_cached)

True
Memory Serialized 1x Replicated
False


# cache()
- cache is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY)

# reduceByKey()
-  merge the values for each key using  an associative and cumulative redue fucntion

In [33]:
# Create an RDD with key-value pairs
rdd = sc.parallelize([(1, 10), (2, 20), (1, 30), (3, 40), (2, 50)])

# Reduce values by key using reduceByKey()
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)

# Collect and print the reduced RDD
print("Reduced RDD:")
print(reduced_rdd.collect())


Reduced RDD:
[(1, 40), (2, 70), (3, 40)]


# textFile()
- reads a textfile to create an RDD

In [34]:
# lets read a texfile to create an RDD
rdd = sc.textFile('sample_rdd_text.txt')
# print the content of rdd
print(rdd.collect())

['Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenati']


# saveAsTextFile()
- saves the rdd to texfile


In [37]:
# Create an RDD with some data
data = ["Hello", "World", "Spark", "is", "awesome"]
rdd = sc.parallelize(data)

# lets save the rdd to a textfile
rdd.coalesce(1).saveAsTextFile("rdd_as_textfile")


# sortBy()
- sorts the RDD by given keyFunc


In [38]:
# Create an RDD with some data
data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)

print(rdd.collect())
# Sort the RDD by the first element of each tuple (ascending order)
sorted_rdd_asc = rdd.sortBy(lambda x: x[0])

# Sort the RDD by the second element of each tuple (descending order)
sorted_rdd_desc = rdd.sortBy(lambda x: x[1], ascending=False)

# Collect and print the sorted RDDs
print("Sorted RDD (Ascending):")
print(sorted_rdd_asc.collect())

print("\nSorted RDD (Descending):")
print(sorted_rdd_desc.collect())

[(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
Sorted RDD (Ascending):
[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]

Sorted RDD (Descending):
[(3, 'orange'), (4, 'grape'), (2, 'banana'), (1, 'apple')]


# sortByKey()
- used to sort an RDD containing key-value pairs by their keys. This method is specifically designed for RDDs where each element is a tuple or pair, with the first element of the tuple being the key and the second element being the value

In [40]:
# Create an RDD with key-value pairs
data = [(3, 'orange'), (1, 'apple'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
print("original_rdd",rdd.collect())

# Sort the RDD by keys using sortByKey()
sorted_rdd = rdd.sortByKey(ascending=True)

# Collect and print the sorted RDD
print("Sorted  ascending RDD by key:")
print(sorted_rdd.collect())
#  Sort the RDD by keys using sortByKey()
sorted_rdd = rdd.sortByKey(ascending=False)
print("Sorted  ascending RDD by key:")
print(sorted_rdd.collect())

original_rdd [(3, 'orange'), (1, 'apple'), (2, 'banana'), (4, 'grape')]
Sorted  ascending RDD by key:
[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
Sorted  ascending RDD by key:
[(4, 'grape'), (3, 'orange'), (2, 'banana'), (1, 'apple')]
