In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext, HiveContext
from pyspark import SparkConf, SparkContext

spark = (SparkSession.builder.appName("Day 5 RDDS").getOrCreate())
sqlContext = SQLContext(spark.sparkContext)

## Data Preparation

In [2]:
#student data and field description
exam_grade_data = [(12, 'Song Jiang', 25, 'Male', 'Chinese', 50),
                   (12, 'Song Jiang', 25, 'Male', 'Math', 60),
                   (12, 'Song Jiang', 25, 'Male', 'English', 70 ),
                   (12, 'Wu Yong', 20, 'Male', 'Chinese', 50),
                   (12, 'Wu Yong', 20, 'Male', 'English', 50),
                   (12, 'Yang Chun', 19, 'Female', 'Chinese', 70),
                   (12, 'Yang Chun', 19, 'Female', 'English', 70),
                   (13, 'Li Kui', 25, 'Male', 'Chinese', 60),
                   (13, 'Li Kui', 25, 'Male', 'Math', 60),
                   (13, 'Li Kui', 25, 'Male', 'English', 70),
                   (13, 'Lin Chong', 20, 'Male', 'Chinese', 50),
                   (13, 'Lin Chong', 20, 'Male', 'Math', 60),
                   (13, 'Lin Chong', 20, 'Male', 'English', 50),
                   (13, 'Wang Ying', 19, 'Female', 'Chinese', 70),
                   (13, 'Wang Ying', 19, 'Female', 'Math', 80),
                   (13, 'Wang Ying', 19, 'Female', 'English', 70)
                  ]

In [3]:
#create Spark dataframe
df = spark.createDataFrame(exam_grade_data, ["Class", "Name", "Age", "Gender", "Subject", "Grade"])

In [4]:
#convert dataframe to rdd
rdd = df.rdd

In [5]:
rdd.collect()

[Row(Class=12, Name='Song Jiang', Age=25, Gender='Male', Subject='Chinese', Grade=50),
 Row(Class=12, Name='Song Jiang', Age=25, Gender='Male', Subject='Math', Grade=60),
 Row(Class=12, Name='Song Jiang', Age=25, Gender='Male', Subject='English', Grade=70),
 Row(Class=12, Name='Wu Yong', Age=20, Gender='Male', Subject='Chinese', Grade=50),
 Row(Class=12, Name='Wu Yong', Age=20, Gender='Male', Subject='English', Grade=50),
 Row(Class=12, Name='Yang Chun', Age=19, Gender='Female', Subject='Chinese', Grade=70),
 Row(Class=12, Name='Yang Chun', Age=19, Gender='Female', Subject='English', Grade=70),
 Row(Class=13, Name='Li Kui', Age=25, Gender='Male', Subject='Chinese', Grade=60),
 Row(Class=13, Name='Li Kui', Age=25, Gender='Male', Subject='Math', Grade=60),
 Row(Class=13, Name='Li Kui', Age=25, Gender='Male', Subject='English', Grade=70),
 Row(Class=13, Name='Lin Chong', Age=20, Gender='Male', Subject='Chinese', Grade=50),
 Row(Class=13, Name='Lin Chong', Age=20, Gender='Male', Subject='M

### `(1)	How many people under 20 took the exam?`

In [6]:
under20 = rdd.filter(lambda x: x[2] < 20).count()
print("There are ",under20, " people under 20 took the exam.")

There are  5  people under 20 took the exam.


### `(2)	How many 20 years old students took the exam?`

In [7]:
aged20 = rdd.filter(lambda x: x[2] == 20).count()
print("There are ", aged20, " people in their 20 took the exam.")

There are  5  people under 20 took the exam.


### `(3)	How many people over 20 years old students took the exam?`

In [8]:
over20 = rdd.filter(lambda x: x[2] > 20).count()
print("There are ", over20, " people in over 20 took the exam.")

There are  6  people in their 20 took the exam.


### `(4)	How many boys are taking the exam?`

In [9]:
boys_taking_exam = rdd.filter(lambda x: 'Male' in x[3]).count()
print("There are ", boys_taking_exam, " boys taking the exam.")

There are  11  boys taking the exam.


### `(5)	How many girls are there in the exam?`

In [10]:
girls_taking_exam = rdd.filter(lambda x: 'Female' in x[3]).count()
print("There are ", girls_taking_exam, " girls taking the exam.")

There are  5  girls taking the exam.


### `(6)	How many people in the class take the exam?`

In [11]:
take_exams = rdd.map(lambda x: (x,1)).count()
print("There are ", take_exams, "exams.")

There are  16 exams.


In [12]:
subject = []
grade = []
theclass = []
name = []
gender = []
age = []

for rd in rdd.collect():
    subject.append(rd[-2])
    grade.append(rd[-1])
    age.append(rd[2])
    theclass.append(rd[0])
    name.append(rd[1])
    gender.append(rd[-3])

### `(7)	What is the average score of Chinese subject?`

In [13]:
subject_grade = list(zip(subject, grade))
subject_rdd = spark.sparkContext.parallelize(subject_grade)

subject_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: 'Chinese' in x[-2]) \
.collect()

[('Chinese', 58.333333333333336)]

### `(8)	What is the average score of Math subject?`

In [14]:
subject_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: 'Math' in x[-2]) \
.collect()

[('Math', 65.0)]

### `(9)	What is the average score of English subject?`

In [15]:
subject_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: 'English' in x[-2]) \
.collect()

[('English', 63.333333333333336)]

### `(10)	What is the average score per person?`

In [16]:
person_grade = list(zip(name, grade))
person_rdd = spark.sparkContext.parallelize(person_grade)

person_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.collect()

[('Song Jiang', 60.0),
 ('Wang Ying', 73.33333333333333),
 ('Yang Chun', 70.0),
 ('Wu Yong', 50.0),
 ('Lin Chong', 53.333333333333336),
 ('Li Kui', 63.333333333333336)]

### `#(11)	What is the class average?`

In [17]:
class_grade = list(zip(theclass, grade))
class_rdd = spark.sparkContext.parallelize(class_grade)

class_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.collect()

[(12, 60.0), (13, 63.333333333333336)]

### `#(12)	What is the average score of the boys in the class?`

In [18]:
gender_grade = list(zip(gender, grade))
gender_rdd = spark.sparkContext.parallelize(gender_grade)

gender_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: 'Male' in x[0]) \
.collect()

[('Male', 57.27272727272727)]

### `(13)	What is the average score of the girls in the class?`

In [19]:
gender_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: 'Female' in x[0]) \
.collect()

[('Female', 72.0)]

### `(14)	What is the highest score of Chinese in the school?`

In [20]:
subject_rdd.filter(lambda x: 'Chinese' in x[0]) \
.sortBy(lambda x: x[1], ascending=False) \
.top(1)

[('Chinese', 70)]

### `(15)	What is the lowest score in Chinese?`

In [21]:
subject_rdd.filter(lambda x: 'Chinese' in x[0]) \
.sortBy(lambda x: x[1], ascending=True) \
.first()

('Chinese', 50)

### `(16)	What is highest score in math?`

In [22]:
subject_rdd.filter(lambda x: 'Math' in x[0]) \
.sortBy(lambda x: x[1]) \
.top(1)

[('Math', 80)]

### `(17)	How many girls are there in class 12 whose total score is more than 150? find the class, name and gender of female student`

In [23]:
class2_female = list(zip(theclass, name, gender))
class2_female_rdd = spark.sparkContext.parallelize(class2_female) \
.keyBy(lambda x: x[2])

female_rdd = spark.sparkContext.parallelize(gender_grade) \
.keyBy(lambda x: x[0])

In [24]:
#join class2_female with female_rdd using leftOuterJoin
join_classes = class2_female_rdd.leftOuterJoin(female_rdd).distinct()

In [25]:
#filter only female with class 12
join_class_female = join_classes.filter(lambda x: 'Female' in x[0]) \
.filter(lambda x: x[1][0][0] == 12)

In [26]:
name_student = []
score = []

for cf in join_class_female.collect():
    name_student.append(cf[1][0][1])
    score.append(cf[1][1][1])
    
name_score = list(zip(name_student, score))
rdd_over150 = spark.sparkContext.parallelize(name_score)

In [27]:
#perform action and transformation to find out how many female students with a total score of 150
rdd_over150.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0])) \
.filter(lambda x: x[1] > 150) \
.count()

0

### `(18)	What is the average score of students whose total score is more than 150, mathematics score is more than or equal to 70 and age is more than or equal to 19?`

In [28]:
class_val = list(zip(theclass, name, age, gender, subject, grade))
class_val = spark.sparkContext.parallelize(class_val)

In [29]:
#mathematics score is more than or equal to 70 
#age is more than or equal to 19
persons = class_val.filter(lambda x: 'Math' in x[-2]) \
.filter(lambda x: x[-1] >= 70) \
.filter(lambda x: x[2] >= 19) \
.collect()

name = []
for person in persons:
    name.append(person[1])  
print("The name of students whose maths score >= 70 and age >= 19 is ", name)

The name of students whose maths score >= 70 and age >= 19 is  ['Wang Ying']


In [30]:
#returns the name of students whose total score is 150 who scores math >= 70
#and the age is >= 19
def isthereMathAge(x):
    if x in name:
        return x
    
#returns the name of students whose total score is 150 and managed to score math >= 70
#and the age is >= 19
def isScoreOver150(x):
    if x in name_over150:
        return x

In [31]:
#whose total score is more than 150 by filtering the age >= 19 and math score >= 70
person2 = person_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0])) \
.filter(lambda x: x[-1] > 150) \
.filter(lambda x: isthereMathAge(x[0])) \
.collect()
person2

name_over150 = []
for person in person2:
    name_over150.append(person[0])  
print("The student whose score is 150 is", name_over150)    

The student whose score is 150 is ['Wang Ying']


In [35]:
#find the average of students whose total score is 150, math score >= 70 and age 
person_rdd.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.map(lambda x: (x[0], x[1][0]/x[1][1])) \
.filter(lambda x: isScoreOver150(x[0])) \
.collect()

[('Wang Ying', 73.33333333333333)]