In [1]:
import shutil

from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a SparkSession; its sparkContext is the entry point for RDDs.
spark = (
    SparkSession.builder
    .appName("RDD-demo")
    .getOrCreate()
)

### How to create RDDs

In [3]:
# A plain Python list of integers to distribute.
numbers = list(range(1, 6))
# parallelize() turns the local list into an RDD split across partitions.
rdd = spark.sparkContext.parallelize(numbers)


[1, 2, 3, 4, 5]

In [4]:
# collect() is an action: it runs the job and returns every element to the
# driver as a Python list (only sensible for small RDDs like this one).
rdd.collect()

[1, 2, 3, 4, 5]

In [20]:
# An RDD of (name, age) tuples. "Alice" appears twice, which the
# reduceByKey example further down aggregates into a single total.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("Alice", 40),
]
rdd = spark.sparkContext.parallelize(data)
rdd.collect()

[('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]

### RDD Operations: Actions

In [8]:
# count() is an action: it triggers a job and returns the element count.
count = rdd.count()
message = f"The number of elements in rdd: {count}"
print(message)

The number of elements in rdd: 4


In [9]:
# first() is an action returning just the first element of the RDD.
first_element = rdd.first()
message = f"The first element in the RDD is: {first_element}"
print(message)

The first element in the RDD is: ('Alice', 25)


In [10]:
# Take action: retrieve the first n elements of the RDD as a Python list.
n = 2
taken_elements = rdd.take(n)
print(f"The first {n} elements of the RDD are: {taken_elements}")

The first 2 elements of the RDD is: [('Alice', 25), ('Bob', 30)]


In [12]:
# ForEach action: apply a function to every element of the RDD.
# NOTE: foreach runs the function on the executors, so print() output goes to
# the executor/worker stdout (e.g. the terminal or Spark logs), NOT to this
# notebook. foreach returns None and is meant for side effects only.
rdd.foreach(print)

# To actually see the elements here, bring them back to the driver first
# (collect() is fine for a small demo RDD like this one).
for element in rdd.collect():
    print(element)

### RDD Operations: Transformations

In [16]:
# map() is a lazy transformation: upper-case the name, keep the age as is.
def _uppercase_name(record):
    return record[0].upper(), record[1]

mapped_rdd = rdd.map(_uppercase_name)
results = mapped_rdd.collect()
print(f"RDD with uppercase names {results}")

RDD with uppercase names [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [21]:
# Filter transformation: keep only the records whose age is greater than 30.
min_age = 30
filtered_rdd = rdd.filter(lambda record: record[1] > min_age)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [22]:
# ReduceByKey transformation: merge the values of each key with the given
# function; here it calculates the total age per name.
reduced_rdd = rdd.reduceByKey(lambda total, age: total + age)
reduced_rdd.collect()

[('Charlie', 35), ('Alice', 65), ('Bob', 30)]

In [24]:
# SortBy transformation: order the records by age; ascending=False gives
# descending order.
sorted_rdd = rdd.sortBy(keyfunc=lambda record: record[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

### Save RDDs to a text file

In [28]:
# Save the RDD as text. saveAsTextFile writes a *directory* named "output.txt"
# containing one part-file per partition, each element written as its string
# form, e.g. "('Alice', 25)".
# Spark refuses to overwrite an existing output directory, so delete any output
# from a previous run first; this keeps the cell re-runnable without hiding
# real errors behind a bare `except`. (Assumes a local filesystem path.)
shutil.rmtree("output.txt", ignore_errors=True)
rdd.saveAsTextFile("output.txt")


### Read RDDs from a text file

In [None]:
# Create an RDD from the text file(s) in "output.txt".
# textFile() is lazy: a missing or unreadable path is only reported when an
# action such as collect() runs, so wrapping textFile() itself in try/except
# catches nothing useful.
# Each element comes back as a plain string (e.g. "('Alice', 25)"); to recover
# tuples, map the lines through ast.literal_eval.
rdd_text = spark.sparkContext.textFile("output.txt")
rdd_text.collect()

In [31]:
# Shut down the SparkSession (and its underlying SparkContext) now that the demo is done.
spark.stop()