In [1]:
# Set the PySpark environment variables.
# These must be in place before the SparkSession below is created.
# setdefault() only fills in variables that are not already set, so a value
# exported by the user's shell (e.g. a different SPARK_HOME) takes precedence
# instead of being silently overwritten.
import os
import sys
from pathlib import Path

# Default to ~/Desktop/Spark instead of a hardcoded /Users/<name>/... path so
# the notebook is portable across machines and users.
os.environ.setdefault("SPARK_HOME", str(Path.home() / "Desktop" / "Spark"))
# Driver settings used when the notebook server is launched via the `pyspark`
# launcher (Jupyter Lab as the driver front-end).
os.environ.setdefault("PYSPARK_DRIVER_PYTHON", "jupyter")
os.environ.setdefault("PYSPARK_DRIVER_PYTHON_OPTS", "lab")
# Run the Python workers on the same interpreter as this kernel; a bare
# "python" can resolve to a different installation/version on PATH.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

In [2]:
from pyspark.sql import SparkSession


In [3]:
# Build the SparkSession used by every later cell (or reuse an existing one
# if this cell is re-run).
spark = (
    SparkSession.builder
    .appName("RDD-Demo")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/14 13:25:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Advantages of RDDs 

1. The backbone of data processing in Spark.
2. A distributed, fault-tolerant, parallelizable data structure.
3. Efficiently processes large datasets across a cluster.
4. Key characteristics: immutable, distributed, resilient (fault-tolerant), and lazily evaluated.

In [4]:
# The integers 1 through 5, distributed across the cluster as an RDD.
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

In [5]:
# Collect action: Retrieve all elements of the RDD.
# collect() brings every element back to the driver as a Python list. That is
# fine for this five-element RDD; on large RDDs prefer take(n).
rdd.collect()

[1, 2, 3, 4, 5]

In [6]:
# Create an RDD of (name, age) tuples. This rebinds `rdd`, replacing the
# integer RDD above; every later cell works on this tuple RDD.
# "Alice" appears twice on purpose so the reduceByKey example has a key to combine.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("Alice", 40),
]
rdd = spark.sparkContext.parallelize(data)

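## RDD Operations: Actions

1. Return a result to the driver (or write data to external storage), triggering execution of the RDD's pending transformations.
2. Evaluated eagerly: calling an action immediately runs the computation and any data movement it requires.
3. Examples: collect, count, first, take, saveAsTextFile, foreach.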

In [7]:
# count() is an action: it runs a job over every partition and returns
# the number of elements to the driver as a plain int.
count = rdd.count()
print(
    "The total number of elements in rdd: ",
    count,
)

The total number of elements in rdd:  4


[Stage 1:>                                                          (0 + 8) / 8]                                                                                

In [8]:
# first() is an action that returns just the first element of the RDD.
first_element = rdd.first()
print(
    "The first element of the rdd: ",
    first_element,
)

The first element of the rdd:  ('Alice', 25)


In [9]:
# take(num) is an action returning the first `num` elements as a list,
# without collecting the whole RDD to the driver.
taken_elements = rdd.take(num=2)
print(
    "The first two elements of the rdd: ",
    taken_elements,
)

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]


In [10]:
# Foreach action: apply a function to every element for its side effect
# (here, printing). foreach() returns nothing to the driver.
# The function runs inside the tasks, not in this notebook's process:
#  - the print order follows task scheduling, not RDD order (see the output);
#  - on a real cluster the lines go to executor logs, not this notebook.
# To print on the driver, use collect() or take(n) instead.
rdd.foreach(print)

('Alice', 25)
('Charlie', 35)
('Bob', 30)
('Alice', 40)


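## RDD Operations: Transformations

1. Create a new RDD by applying a computation or manipulation to an existing one; the source RDD itself is never modified.
2. Lazily evaluated: Spark only records each transformation in the RDD's lineage graph and runs nothing until an action is called.
3. Examples: map, filter, flatMap, reduceByKey, sortBy, join.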

In [11]:
# Map transformation: upper-case the name and keep the age unchanged.
# map() is lazy: it only records the step in the lineage, and nothing executes
# until an action such as collect() runs in the next cell.
mapped_rdd = rdd.map(lambda record: (record[0].upper(), record[1]))

In [12]:
# collect() is the action that actually executes the map above and returns
# every element to the driver (fine for this four-element demo RDD).
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercease name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [13]:
# Filter transformation: keep only people older than 30. The age is the
# second tuple field. filter() is lazy; the trailing collect() runs it.
filtered_rdd = rdd.filter(lambda record: record[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [14]:
# ReduceByKey transformation: sum the ages of all records that share a name
# (the two "Alice" records are combined into one).
reduced_rdd = rdd.reduceByKey(lambda left, right: left + right)
reduced_rdd.collect()

[('Charlie', 35), ('Alice', 65), ('Bob', 30)]

In [15]:
# SortBy transformation: order the records by age (the second tuple field),
# oldest first. collect() returns the sorted elements to the driver.
sorted_rdd = rdd.sortBy(lambda record: record[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

## Save an RDD to a text file and read it back


In [16]:
# Save action: write the RDD out as text, one line per element (each line is
# the element's string form, as the read-back cell below shows).
# Despite the ".txt" name, saveAsTextFile creates a *directory* of part files,
# and it refuses to write to a path that already exists. Clear any previous
# output first so this cell survives Restart Kernel -> Run All.
# NOTE(review): the cleanup assumes a local-filesystem path (presumably local
# mode here); it will not clear output on HDFS/S3.
import shutil
from pathlib import Path

output_path = Path("output.txt")
if output_path.is_dir():
    shutil.rmtree(output_path)
elif output_path.exists():
    output_path.unlink()
rdd.saveAsTextFile(str(output_path))

In [17]:
# Create an RDD by reading the saved output back. Every line becomes one
# *string* element (e.g. "('Bob', 30)"), not the original (name, age) tuple.
rdd_text = spark.sparkContext.textFile(name="output.txt")
rdd_text.collect()

["('Charlie', 35)", "('Bob', 30)", "('Alice', 25)", "('Alice', 40)"]

## Shut down the SparkSession


In [18]:
# Stop the SparkSession (and its underlying SparkContext) to release resources.
# Run this last: the `spark` session and the RDDs above cannot be used afterwards.
spark.stop()