In [1]:
import findspark

# findspark.init() puts the local Spark installation on sys.path, so it has to
# run before anything is imported from pyspark.
findspark.init()

from pyspark.sql import SparkSession

# Each record is (name, SAT score, grade as a fraction, two-letter state code).
student_data = [
    ("Chris", 1523, 0.72, "CA"),
    ("Jake", 1555, 0.83, "NY"),
    ("Cody", 1439, 0.92, "CA"),
    ("Lisa", 1442, 0.81, "FL"),
    ("Daniel", 1600, 0.88, "TX"),
    ("Kelvin", 1382, 0.99, "FL"),
    ("Nancy", 1442, 0.74, "TX"),
    ("Pavel", 1599, 0.82, "NY"),
    ("Josh", 1482, 0.78, "CA"),
    ("Cynthia", 1582, 0.94, "CA"),
]

Creating SparkSession:

In [2]:
# findspark is already imported and initialised in the first cell, so it is not
# repeated here. getOrCreate() returns the active session if one exists,
# otherwise it starts a new one.
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001B97BE7B950>


In [3]:
# Turn the in-memory student_data list into an RDD through the session's SparkContext.
student_rdd = spark.sparkContext.parallelize(student_data)

Transforming student data: converting each grade from a fraction to a whole-number percentage.

In [4]:
# Convert the grade (index 2) from a fraction to a whole-number percentage.
# round() is used rather than int(): float multiplication can land just below
# the intended integer (0.29 * 100 == 28.999999999999996), which int() would
# truncate to 28.
rdd_transformation = student_rdd.map(
    lambda x: (x[0], x[1], round(x[2] * 100), x[3])
)

Filtering student data with grades > 80.

In [5]:
# Keep only the records whose grade field (index 2) is strictly greater than 80.
rdd_filtered = rdd_transformation.filter(
    lambda student: student[2] > 80
)

Using `take(n)` to show the first five elements of the transformed (unfiltered) RDD:

In [6]:
# NOTE(review): this previews rdd_transformation (the unfiltered RDD); rdd_filtered
# is never displayed anywhere in the visible cells — confirm which one was intended.
rdd_transformation.take(5)

[('Chris', 1523, 72, 'CA'),
 ('Jake', 1555, 83, 'NY'),
 ('Cody', 1439, 92, 'CA'),
 ('Lisa', 1442, 81, 'FL'),
 ('Daniel', 1600, 88, 'TX')]

Calculating average grade

In [7]:
# Pull out the grade field (index 2) of every record, then add the values pairwise.
sum_gpa = (
    rdd_transformation
    .map(lambda student: student[2])
    .reduce(lambda total, grade: total + grade)
)
sum_gpa

843

In [8]:
# Mean grade = sum of the percentage grades (sum_gpa) / number of student records.
avg_grade = sum_gpa / rdd_transformation.count()
avg_grade

84.3

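The function passed to `reduce` must be commutative and associative, because each partition is reduced separately and the partial results are then combined in no guaranteed order. Addition satisfies both properties, so the sum is the same however many partitions the data is split into: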

In [11]:
# Re-create the RDD with 1 to 4 partitions and show that the summed grade is the
# same regardless of how the records are split.
for i in range(1, 5):
    # round() instead of int(): a float product such as 0.29 * 100 evaluates to
    # 28.999999999999996, which int() would truncate to 28.
    student_partitions = (
        spark.sparkContext
        .parallelize(student_data, i)
        .map(lambda x: (x[0], x[1], round(x[2] * 100), x[3]))
    )
    # glom() groups the records of each partition into a list, so the layout is visible.
    print('partition: ', student_partitions.glom().collect())
    print('addition: ', student_partitions.map(lambda x: x[2]).reduce(lambda a, b: a + b))

partition:  [[('Chris', 1523, 72, 'CA'), ('Jake', 1555, 83, 'NY'), ('Cody', 1439, 92, 'CA'), ('Lisa', 1442, 81, 'FL'), ('Daniel', 1600, 88, 'TX'), ('Kelvin', 1382, 99, 'FL'), ('Nancy', 1442, 74, 'TX'), ('Pavel', 1599, 82, 'NY'), ('Josh', 1482, 78, 'CA'), ('Cynthia', 1582, 94, 'CA')]]
addition:  843
partition:  [[('Chris', 1523, 72, 'CA'), ('Jake', 1555, 83, 'NY'), ('Cody', 1439, 92, 'CA'), ('Lisa', 1442, 81, 'FL'), ('Daniel', 1600, 88, 'TX')], [('Kelvin', 1382, 99, 'FL'), ('Nancy', 1442, 74, 'TX'), ('Pavel', 1599, 82, 'NY'), ('Josh', 1482, 78, 'CA'), ('Cynthia', 1582, 94, 'CA')]]
addition:  843
partition:  [[('Chris', 1523, 72, 'CA'), ('Jake', 1555, 83, 'NY'), ('Cody', 1439, 92, 'CA')], [('Lisa', 1442, 81, 'FL'), ('Daniel', 1600, 88, 'TX'), ('Kelvin', 1382, 99, 'FL')], [('Nancy', 1442, 74, 'TX'), ('Pavel', 1599, 82, 'NY'), ('Josh', 1482, 78, 'CA'), ('Cynthia', 1582, 94, 'CA')]]
addition:  843
partition:  [[('Chris', 1523, 72, 'CA'), ('Jake', 1555, 83, 'NY')], [('Cody', 1439, 92, 'CA'),

Sharing variables with broadcast:

In [12]:
# Lookup table from two-letter state code to full state name.
states = {
    'NY': 'New York',
    'CA': 'California',
    'TX': 'Texas',
    'FL': 'Florida',
}
# Wrap the dict in a broadcast variable created from the session's SparkContext.
broadcastStates = spark.sparkContext.broadcast(states)
type(broadcastStates)

pyspark.broadcast.Broadcast

In [15]:
# Replace each record's state code (index 3) with the full name looked up in the
# broadcast dict; the first three fields are passed through unchanged.
rdd_broadcast = rdd_transformation.map(
    lambda student: (
        student[0],
        student[1],
        student[2],
        broadcastStates.value[student[3]],
    )
)
rdd_broadcast.collect()

[('Chris', 1523, 72, 'California'),
 ('Jake', 1555, 83, 'New York'),
 ('Cody', 1439, 92, 'California'),
 ('Lisa', 1442, 81, 'Florida'),
 ('Daniel', 1600, 88, 'Texas'),
 ('Kelvin', 1382, 99, 'Florida'),
 ('Nancy', 1442, 74, 'Texas'),
 ('Pavel', 1599, 82, 'New York'),
 ('Josh', 1482, 78, 'California'),
 ('Cynthia', 1582, 94, 'California')]

Using accumulators.

In [16]:
# Create an accumulator with initial value 0; count_high_sat_score adds to it.
sat_1500 = spark.sparkContext.accumulator(0)
type(sat_1500)

pyspark.accumulators.Accumulator

In [17]:
def count_high_sat_score(x):
    """Add 1 to the ``sat_1500`` accumulator when the record's second field exceeds 1500.

    Only ``x[1]`` is read from the record. Nothing is returned; the effect is
    the update to the module-level ``sat_1500`` accumulator.
    """
    score = x[1]
    if score > 1500:
        sat_1500.add(1)


print(count_high_sat_score)

<function count_high_sat_score at 0x000001B97BECF600>


In [18]:
# Reset the counter first so that re-running this cell does not add to the
# total left over from an earlier run.
sat_1500.value = 0
# The function is passed directly; wrapping it in a lambda adds nothing.
rdd_broadcast.foreach(count_high_sat_score)
print(sat_1500)

5
