In [None]:
import os

os.environ["SPARK_HOME"] = "C:/Program Files/Spark"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "lab"
os.environ["PYSPARK_PYTHON"] = (
    "C:/Users//AppData/Local/Programs/Python/Python310/python.exe"
)

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (
    SparkSession.builder.appName("RDD-Demo").getOrCreate()
)

In [3]:
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers)

In [4]:
# Everything is actions or transformations

# Collect action
rdd.collect()
# Transformation 1: Double each number
doubled_rdd = rdd.map(lambda x: x * 2)
print(doubled_rdd.collect())  # Output: [2, 4, 6, 8, 10]

# Transformation 2: Sum the numbers
sum_result = rdd.reduce(lambda x, y: x + y)
print(sum_result)  # Output: 15

[2, 4, 6, 8, 10]
15


In [5]:
# Create an RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

In [6]:
# Collect action: Retrieve all elements of the RDD
print("All elements of the rdd: ", rdd.collect())

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


RDDs Operation: Actions

In [7]:
# Count action: Count the number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)

The total number of elements in rdd:  4


In [8]:
# First action: Retrieve the first element of the RDD
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [9]:
# Foreach action: Print each element of the RDD
# The print statements are executed on the worker nodes in the cluster, 
# but their outputs are typically not captured by the driver, and you won’t see them in the driver’s console.
rdd.foreach(lambda x: print(x))

RDDs Operation: Transformations

In [10]:
# Map transformation: Convert name to uppercase
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [11]:
# Filter transformation: Filter records where age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [12]:
# ReduceByKey transformation: Calculate the total age for each name
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

[('Charlie', 35), ('Bob', 30), ('Alice', 65)]

In [13]:
# SortBy transformation: Sort the RDD by age in descending order
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

Save RDDs to text file and read RDDs from text file

In [None]:
# Save action: Save the RDD to a text file
rdd.saveAsTextFile("output.txt")

In [None]:
# create rdd from text file
rdd_text = spark.sparkContext.textFile("output.txt")
rdd_text.collect()

Shut down Spark Session

In [None]:
spark.stop()