In [13]:
import os
os.environ["SPARK_HOME"] = "/opt/spark"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "lab"
os.environ["PYSPARK_PYTHON"] = "python"

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDD-Demo").getOrCreate()

## Create RDDS

In [15]:
numbers = [1, 2, 3, 4, 5, 6, 7]
rdd = spark.sparkContext.parallelize(numbers)

In [16]:
rdd.collect() # get all elements from RDD

[1, 2, 3, 4, 5, 6, 7]

In [17]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

In [18]:
rdd.collect()

[('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]

## RDDs Operations: Actions

In [19]:
count = rdd.count()
print("Total number of elemenets: ", count)



Total number of elemenets:  4


                                                                                

In [20]:
first = rdd.first()

first

('Alice', 25)

In [22]:
taken_elements = rdd.take(2)

taken_elements

[('Alice', 25), ('Bob', 30)]

In [27]:
rdd.foreach(lambda x: print(x[0].upper()))

CHARLIE
BOB
ALICE
ALICE


## RDDs Operations: Transformations

In [28]:
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))


In [29]:
result = mapped_rdd.collect()

result

[('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]

In [30]:
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [31]:
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

[('Alice', 65), ('Bob', 30), ('Charlie', 35)]

In [32]:
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

## Save RDDs to text file and read RDDs from text file

In [33]:
rdd.saveAsTextFile("output.txt")

In [35]:
rdd_text = spark.sparkContext.textFile("output.txt")
rdd_text.collect()

["('Alice', 40)", "('Alice', 25)", "('Bob', 30)", "('Charlie', 35)"]

In [36]:
spark.stop()