# **RDDs with PySpark**

## **Start Coding with PySpark**

We’ll be working with data about students applying for college. We usually use PySpark for extremely large datasets, but it’s easier to see how functions work when we start with a smaller example.

The data is a list of tuples called `student_data`. Each tuple contains a name, an SAT score (out of 1600), a GPA expressed as a decimal fraction (for example, 0.72 means 72%), and a state abbreviation.

In [1]:
from pyspark.sql import SparkSession

In [2]:
student_data = [("Chris",1523,0.72,"CA"),
                ("Jake", 1555,0.83,"NY"),
                ("Cody", 1439,0.92,"CA"),
                ("Lisa",1442,0.81,"FL"),
                ("Daniel",1600,0.88,"TX"),
                ("Kelvin",1382,0.99,"FL"),
                ("Nancy",1442,0.74,"TX"),
                ("Pavel",1599,0.82,"NY"),
                ("Josh",1482,0.78,"CA"),
                ("Cynthia",1582,0.94,"CA")]
student_data

[('Chris', 1523, 0.72, 'CA'),
 ('Jake', 1555, 0.83, 'NY'),
 ('Cody', 1439, 0.92, 'CA'),
 ('Lisa', 1442, 0.81, 'FL'),
 ('Daniel', 1600, 0.88, 'TX'),
 ('Kelvin', 1382, 0.99, 'FL'),
 ('Nancy', 1442, 0.74, 'TX'),
 ('Pavel', 1599, 0.82, 'NY'),
 ('Josh', 1482, 0.78, 'CA'),
 ('Cynthia', 1582, 0.94, 'CA')]

In [3]:
# start a spark session
spark = SparkSession.builder.getOrCreate()
# confirm the session is built
print(spark)

<pyspark.sql.session.SparkSession object at 0x7facf482d120>


In [5]:
# convert student_data into an RDD with 5 partitions
student_rdd = spark.sparkContext.parallelize(student_data, 5)
# confirm the rdd contains the correct data
student_rdd.collect()

[('Chris', 1523, 0.72, 'CA'),
 ('Jake', 1555, 0.83, 'NY'),
 ('Cody', 1439, 0.92, 'CA'),
 ('Lisa', 1442, 0.81, 'FL'),
 ('Daniel', 1600, 0.88, 'TX'),
 ('Kelvin', 1382, 0.99, 'FL'),
 ('Nancy', 1442, 0.74, 'TX'),
 ('Pavel', 1599, 0.82, 'NY'),
 ('Josh', 1482, 0.78, 'CA'),
 ('Cynthia', 1582, 0.94, 'CA')]

In [6]:
# check the number of partitions
student_rdd.getNumPartitions()

5

## **Transformations**

In [7]:
# convert the GPA (third field) from a decimal fraction to a percentage (multiply by 100) and keep the other fields
rdd_transformation = student_rdd.map(lambda x: (x[0], x[1], x[2]*100, x[3]))
# confirm transformation is correct
rdd_transformation.collect()

[('Chris', 1523, 72.0, 'CA'),
 ('Jake', 1555, 83.0, 'NY'),
 ('Cody', 1439, 92.0, 'CA'),
 ('Lisa', 1442, 81.0, 'FL'),
 ('Daniel', 1600, 88.0, 'TX'),
 ('Kelvin', 1382, 99.0, 'FL'),
 ('Nancy', 1442, 74.0, 'TX'),
 ('Pavel', 1599, 82.0, 'NY'),
 ('Josh', 1482, 78.0, 'CA'),
 ('Cynthia', 1582, 94.0, 'CA')]

In [9]:
# filter to include just the rows with grades above 80
rdd_filtered = rdd_transformation.filter(lambda x: x[2] > 80)
# confirm transformation is correct
rdd_filtered.collect()

[('Jake', 1555, 83.0, 'NY'),
 ('Cody', 1439, 92.0, 'CA'),
 ('Lisa', 1442, 81.0, 'FL'),
 ('Daniel', 1600, 88.0, 'TX'),
 ('Kelvin', 1382, 99.0, 'FL'),
 ('Pavel', 1599, 82.0, 'NY'),
 ('Cynthia', 1582, 94.0, 'CA')]
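
For intuition, the same two steps can be sketched on a plain Python list with the built-in `map()` and `filter()`. This is only an analogy and involves no Spark: the built-ins return lazy iterators, much as RDD transformations are lazy, and `list()` stands in for `collect()`. The names `scaled`, `above_80`, and `names` are illustrative and not part of the notebook.

```python
# Plain-Python analogy of the map() and filter() cells above (no Spark).
student_data = [("Chris", 1523, 0.72, "CA"), ("Jake", 1555, 0.83, "NY"),
                ("Cody", 1439, 0.92, "CA"), ("Lisa", 1442, 0.81, "FL"),
                ("Daniel", 1600, 0.88, "TX"), ("Kelvin", 1382, 0.99, "FL"),
                ("Nancy", 1442, 0.74, "TX"), ("Pavel", 1599, 0.82, "NY"),
                ("Josh", 1482, 0.78, "CA"), ("Cynthia", 1582, 0.94, "CA")]

# map() and filter() only describe the work; nothing is computed yet
scaled = map(lambda x: (x[0], x[1], x[2] * 100, x[3]), student_data)
above_80 = filter(lambda x: x[2] > 80, scaled)

# list() forces the pipeline to run, playing the role of collect()
result = list(above_80)
names = [row[0] for row in result]
print(names)
```

The seven names printed should match the rows returned by `rdd_filtered.collect()`.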

## **Actions**

In [10]:
# view the first five elements of the filtered RDD
rdd_filtered.take(5)

[('Jake', 1555, 83.0, 'NY'),
 ('Cody', 1439, 92.0, 'CA'),
 ('Lisa', 1442, 81.0, 'FL'),
 ('Daniel', 1600, 88.0, 'TX'),
 ('Kelvin', 1382, 99.0, 'FL')]

In [11]:
# sum the grades
sum_gpa = rdd_transformation.map(lambda x: x[2]).reduce(lambda x,y: x+y)
sum_gpa

843.0

In [12]:
# calculate the average grade
sum_gpa / rdd_transformation.count()

84.3
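
The two cells above run two separate actions, `reduce()` and `count()`. One possible alternative is to carry the running sum and the running count together as a pair, so a single reduction yields both. The sketch below uses `functools.reduce` on a plain list as a stand-in for the RDD; `add_pairs` is a hypothetical helper, and the assumption is that the same function could be passed to `reduce()` on an RDD whose elements were first mapped to `(grade, 1)` pairs.

```python
from functools import reduce

# grades copied from the rdd_transformation output above
grades = [72.0, 83.0, 92.0, 81.0, 88.0, 99.0, 74.0, 82.0, 78.0, 94.0]

# pair every grade with a count of 1
pairs = [(g, 1) for g in grades]

def add_pairs(a, b):
    # element-wise addition is both associative and commutative
    return (a[0] + b[0], a[1] + b[1])

total, count = reduce(add_pairs, pairs)
average = total / count
print(total, count, average)
```

Because element-wise addition is associative and commutative, this kind of function is safe to use with a partitioned reduction.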

## **Associative and Commutative Properties**

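`reduce()` only returns a reliable result when the operation is both commutative and associative, because Spark reduces each partition separately and then combines the partial results. Let's see this in practice.

The handy transformation `glom()` groups the elements of each partition into a list, so we can print how the data is partitioned alongside the results of the addition and division operations.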

In [13]:
# notice how the sum never changes as the number of partitions grows
data = [1,2,3,4,5]
for i in range(1,5):
    rdd = spark.sparkContext.parallelize(data, i)
    print('partition: ', rdd.glom().collect())
    print('addition: ', rdd.reduce(lambda a,b: a+b))

partition:  [[1, 2, 3, 4, 5]]
addition:  15
partition:  [[1, 2], [3, 4, 5]]
addition:  15
partition:  [[1], [2, 3], [4, 5]]
addition:  15
partition:  [[1], [2], [3], [4, 5]]
addition:  15


In [14]:
# notice how the final result changes as the number of partitions grows
for i in range(1,5):
    rdd = spark.sparkContext.parallelize(data, i)
    print('partition: ', rdd.glom().collect())
    print('division: ', rdd.reduce(lambda a,b: a/b))

partition:  [[1, 2, 3, 4, 5]]
division:  0.008333333333333333
partition:  [[1, 2], [3, 4, 5]]
division:  3.3333333333333335
partition:  [[1], [2, 3], [4, 5]]
division:  1.875
partition:  [[1], [2], [3], [4, 5]]
division:  0.20833333333333331
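
To see why division gives a different answer for each partition count, here is a small plain-Python model of a partitioned reduce. It is a sketch under two assumptions, not Spark's actual code: the list is cut into contiguous slices by position (which reproduces the `glom()` output above), and the per-partition results are combined in partition order. `split_into_partitions` and `partitioned_reduce` are hypothetical helper names.

```python
from functools import reduce

def split_into_partitions(data, n):
    """Cut data into n contiguous slices by position (a simplifying assumption)."""
    size = len(data)
    return [data[i * size // n:(i + 1) * size // n] for i in range(n)]

def partitioned_reduce(data, n, f):
    """Reduce each non-empty partition, then reduce the partial results in order."""
    parts = split_into_partitions(data, n)
    partials = [reduce(f, part) for part in parts if part]
    return reduce(f, partials)

data = [1, 2, 3, 4, 5]
partitions = [split_into_partitions(data, n) for n in range(1, 5)]
sums = [partitioned_reduce(data, n, lambda a, b: a + b) for n in range(1, 5)]
quotients = [partitioned_reduce(data, n, lambda a, b: a / b) for n in range(1, 5)]
print(sums)
print(quotients)
```

With two partitions, for example, the model computes `(1/2) / ((3/4)/5)` instead of `(((1/2)/3)/4)/5`. Addition is unaffected by this regrouping, while division is not, which is the pattern shown in the two cells above.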


## **Broadcast Variables**

In [15]:
# this dictionary contains the names and abbreviations for the four states
states = {"NY":"New York", "CA":"California", "TX":"Texas", "FL":"Florida"}

# broadcast the states dictionary to the Spark cluster
broadcastStates = spark.sparkContext.broadcast(states)
# confirm type
type(broadcastStates)

pyspark.broadcast.Broadcast

In [19]:
# map the abbreviations to their full names
rdd_broadcast = rdd_transformation.map(lambda x: (x[0], x[1], x[2], broadcastStates.value[x[3]]))
# confirm transformation is correct
rdd_broadcast.collect()

[('Chris', 1523, 72.0, 'California'),
 ('Jake', 1555, 83.0, 'New York'),
 ('Cody', 1439, 92.0, 'California'),
 ('Lisa', 1442, 81.0, 'Florida'),
 ('Daniel', 1600, 88.0, 'Texas'),
 ('Kelvin', 1382, 99.0, 'Florida'),
 ('Nancy', 1442, 74.0, 'Texas'),
 ('Pavel', 1599, 82.0, 'New York'),
 ('Josh', 1482, 78.0, 'California'),
 ('Cynthia', 1582, 94.0, 'California')]
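
The lookup above indexes the dictionary directly, so an abbreviation that is missing from `states` would raise a `KeyError`. If that is a concern, one option is `dict.get()` with a fallback. The sketch below uses a plain dictionary; in the notebook the `lookup` argument would be `broadcastStates.value`. The helper `full_state_name` and the extra row are made up for illustration.

```python
states = {"NY": "New York", "CA": "California", "TX": "Texas", "FL": "Florida"}

def full_state_name(code, lookup):
    # dict.get returns the fallback (here the raw code) instead of raising KeyError
    return lookup.get(code, code)

# the second row is hypothetical: "WA" is deliberately absent from states
rows = [("Chris", 1523, 72.0, "CA"), ("Ana", 1500, 90.0, "WA")]
expanded = [(name, sat, gpa, full_state_name(code, states))
            for name, sat, gpa, code in rows]
print(expanded)
```

Keeping the raw abbreviation is only one possible design choice; returning `None` or a label such as `"Unknown"` would work the same way.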

## **Accumulator Variables**

In [20]:
# create an accumulator variable that starts at 0
sat_1500 = spark.sparkContext.accumulator(0)
# confirm type
type(sat_1500)

pyspark.accumulators.Accumulator

In [21]:
# create a function that increments the accumulator by 1 whenever it encounters a score over 1500
def count_high_sat_score(score):
    if score > 1500:
        sat_1500.add(1)
# confirm the function is defined; nothing is counted until it is applied to an RDD with an action such as foreach()
print(count_high_sat_score)

<function count_high_sat_score at 0x7facd93ea050>


In [None]:
# stop the session
spark.stop()