In [1]:
import shutil
import warnings
warnings.filterwarnings("ignore")

from pyspark.sql import SparkSession

In [2]:
# Obtain the notebook's SparkSession: reuse a running one or start a new one
# under a descriptive application name.
spark = (
    SparkSession.builder
    .appName("RDD_creation_and_operations")
    .getOrCreate()
)

24/03/12 21:42:55 WARN Utils: Your hostname, dhiraj resolves to a loopback address: 127.0.1.1; using 192.168.10.66 instead (on interface wlo1)
24/03/12 21:42:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/12 21:42:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## RDD Creation

In [3]:
# Turn a small local Python list into an RDD via the session's SparkContext.
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

#### collect() action: Retrieve all elements of the RDD

In [4]:
# Bare last expression: the notebook displays the collected elements as a Python list.
rdd.collect()

[1, 2, 3, 4, 5]

In [5]:
# Build an RDD of (name, age) pairs; "Alice" appears twice on purpose so that
# key-based operations have something to combine.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("Alice", 40),
]
rdd = spark.sparkContext.parallelize(data)

#### collect() action: Retrieve all elements of the RDD

In [6]:
# Fetch every (name, age) pair first, then report them.
all_elements = rdd.collect()
print("All elements of the rdd: ", all_elements)

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


## RDD Operations: `Actions`

#### count() action: Count the number of elements in the RDD

In [7]:
# count() is an action: it triggers a Spark job and returns the number of elements.
count = rdd.count()
print("The total number of elements in rdd: ", count)

[Stage 2:>                                                          (0 + 4) / 4]

The total number of elements in rdd:  4


                                                                                

#### first() action: Retrieve the first element of the RDD

In [8]:
# first() returns a single element (here a (name, age) tuple), not a list.
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


#### take() action: Retrieve the first n elements of the RDD

In [9]:
# take(n) returns the first n elements as a list; three are requested here,
# so the message must say "three" (it previously claimed "two").
taken_elements = rdd.take(3)

print("The first three elements of the rdd: ", taken_elements)

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35)]


#### foreach() action: apply a function to each element of the RDD (elements are processed in parallel, so the output order is not guaranteed)

In [10]:
# Apply print to every element; the function runs once per element inside Spark's
# tasks, so the order of the printed lines is not guaranteed.
rdd.foreach(print)

('Alice', 40)
('Bob', 30)
('Alice', 25)
('Charlie', 35)


## RDD Operations: `Transformations`

#### map() transformation: create a new RDD with uppercased names from the existing RDD

In [11]:
# Derive a new RDD whose names are upper-cased; ages pass through unchanged.
mapped_rdd = rdd.map(lambda record: (record[0].upper(), record[1]))

In [12]:
# Materialise the mapped RDD as a list and show it
# (the message previously misspelled "uppercase").
result = mapped_rdd.collect()

print("rdd with uppercase name: ", result)

rdd with uppercease name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


#### filter() transformation: Filtering records where age is greater than 30

In [13]:
# Keep only the (name, age) pairs whose age is strictly greater than 30.
filtered_rdd = rdd.filter(lambda record: record[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

#### reduceByKey() transformation: aggregate the values of each key with a reducing function

#### calculating the total age for each key/name

In [14]:
# Sum the ages that share the same name (the first tuple field is the key).
reduced_rdd = rdd.reduceByKey(lambda total, age: total + age)
reduced_rdd.collect()

                                                                                

[('Bob', 30), ('Charlie', 35), ('Alice', 65)]

#### sortBy() transformation: 

#### sorting the RDD by age in descending order

In [15]:
# Order the pairs by age (second tuple field), largest first.
sorted_rdd = rdd.sortBy(lambda record: record[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

### Save an RDD to a text file

In [16]:
# saveAsTextFile() fails if the target path already exists, which breaks
# "Restart & Run All" on every run after the first. Remove leftover output first.
# NOTE(review): assumes Spark's default filesystem is the local one (local-mode
# session), so the relative path refers to a local directory — confirm.
saved_rdd_path = "./saved/saved_RDD.txt"
shutil.rmtree(saved_rdd_path, ignore_errors=True)

# Despite the ".txt" suffix, Spark creates a directory of part files at this path.
rdd.saveAsTextFile(saved_rdd_path)

### Load RDD from a text file

In [17]:
# Read the saved output back as an RDD of text lines.
# Each element is the string form of a tuple (e.g. "('Bob', 30)"), not the tuple
# itself, and the line order may differ from the original RDD.
rdd_text = spark.sparkContext.textFile("./saved/saved_RDD.txt")

rdd_text.collect()

["('Alice', 40)", "('Alice', 25)", "('Bob', 30)", "('Charlie', 35)"]

In [18]:
# Stop the SparkSession now that the notebook is finished with it.
spark.stop()