In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
# Build the application configuration first, then obtain the SparkContext
# through getOrCreate so an already-running context is reused if present.
conf = SparkConf()
conf.setAppName("StuApp")
sc = SparkContext.getOrCreate(conf)

In [None]:
# Load the student CSV as an RDD of raw text lines and preview the first ten.
# The header line has not been removed yet, so it is part of the preview.
raw_stu_data = sc.textFile('/FileStore/tables/StudentData.csv')
first_ten_lines = raw_stu_data.take(10)
first_ten_lines

Out[2]: ['age,gender,name,course,roll,marks,email',
 '28,Female,Hubert Oliveras,DB,02984,59,Annika Hoffman_Naoma Fritts@OOP.com',
 '29,Female,Toshiko Hillyard,Cloud,12899,62,Margene Moores_Marylee Capasso@DB.com',
 '28,Male,Celeste Lollis,PF,21267,45,Jeannetta Golden_Jenna Montague@DSA.com',
 '29,Female,Elenore Choy,DB,32877,29,Billi Clore_Mitzi Seldon@DB.com',
 '28,Male,Sheryll Towler,DSA,41487,41,Claude Panos_Judie Chipps@OOP.com',
 '28,Male,Margene Moores,MVC,52771,32,Toshiko Hillyard_Clementina Menke@MVC.com',
 '28,Male,Neda Briski,OOP,61973,69,Alberta Freund_Elenore Choy@DB.com',
 '28,Female,Claude Panos,Cloud,72409,85,Sheryll Towler_Alberta Freund@Cloud.com',
 '28,Male,Celeste Lollis,MVC,81492,64,Nicole Harwood_Claude Panos@MVC.com']

In [None]:
# Prepare the parsed rows that every later job reads:
#   1. re-read the file, so this cell can be re-run on its own,
#   2. take the first line as the header,
#   3. drop every line equal to the header text,
#   4. split each remaining line on ',' into a list of string fields.
stu_lines = sc.textFile('/FileStore/tables/StudentData.csv')
header = stu_lines.first()
raw_stu_data = (
    stu_lines
    .filter(lambda line: line != header)
    .map(lambda line: line.split(','))
)
# Persist the parsed rows so the jobs that follow can reuse them.
raw_stu_data.persist(StorageLevel.MEMORY_AND_DISK_DESER)

Out[5]: PythonRDD[167] at RDD at PythonRDD.scala:58

In [None]:
##### number of unique students in file #####
# Key each row by the student name (field 2) with a placeholder value.
# reduceByKey then collapses repeated names into a single (name, None) pair,
# so counting the pairs gives the number of distinct names.
stu_names = (
    raw_stu_data
    .map(lambda fields: (fields[2], None))
    .reduceByKey(lambda _first, _second: None)
)
stu_names.count()

Out[55]: 50

In [None]:
##### total marks by male/female students #####
# Pair each row's gender (field 1) with its integer mark (field 5),
# then sum the marks per gender.
stu_marks = (
    raw_stu_data
    .map(lambda fields: (fields[1], int(fields[5])))
    .reduceByKey(lambda running_total, mark: running_total + mark)
)
stu_marks.collect()

Out[56]: [('Female', 29636), ('Male', 30461)]

In [None]:
##### total number of passed/failed marks (one result per row) #####
# Each row contributes exactly one "pass" or "fail", so the totals count rows
# in the file, not distinct students.
# A mark strictly greater than PASS_MARK is a pass; a mark equal to it is a fail.
PASS_MARK = 50

# Label each row from its integer mark (field 5), then count rows per label.
stu_passed = (
    raw_stu_data
    .map(lambda fields: ("pass" if int(fields[5]) > PASS_MARK else "fail", 1))
    .reduceByKey(lambda count_a, count_b: count_a + count_b)
)
stu_passed.collect()

Out[57]: [('pass', 630), ('fail', 370)]

In [None]:
##### total enrollments per course #####
# Emit (course, 1) for every row (course is field 3) and add up the ones
# per course to get the enrollment count.
course_enroll = (
    raw_stu_data
    .map(lambda fields: (fields[3], 1))
    .reduceByKey(lambda count_a, count_b: count_a + count_b)
)
course_enroll.collect()

Out[58]: [('DB', 157),
 ('Cloud', 192),
 ('PF', 166),
 ('MVC', 157),
 ('OOP', 152),
 ('DSA', 176)]

In [None]:
##### total marks per course #####
# Pair each row's course (field 3) with its integer mark (field 5),
# then sum the marks per course.
course_marks = (
    raw_stu_data
    .map(lambda fields: (fields[3], int(fields[5])))
    .reduceByKey(lambda running_total, mark: running_total + mark)
)
course_marks.collect()

Out[59]: [('DB', 9270),
 ('Cloud', 11443),
 ('PF', 9933),
 ('MVC', 9585),
 ('OOP', 8916),
 ('DSA', 10950)]

In [None]:
##### average marks per course #####
# Carry a (mark_sum, row_count) pair per course so the sum and the count are
# accumulated in the same reduce, then divide to get the mean.
course_ave = (
    raw_stu_data
    .map(lambda fields: (fields[3], (int(fields[5]), 1)))
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .mapValues(lambda sum_and_count: sum_and_count[0]/sum_and_count[1])
)
course_ave.collect()

Out[60]: [('DB', 59.044585987261144),
 ('Cloud', 59.598958333333336),
 ('PF', 59.83734939759036),
 ('MVC', 61.05095541401274),
 ('OOP', 58.6578947368421),
 ('DSA', 62.21590909090909)]

In [None]:
##### max marks per course #####
# all_marks holds (course, mark) pairs built from fields 3 and 5; it is kept
# as its own name because the min-marks cell reads it as well.
all_marks = raw_stu_data.map(lambda fields: (fields[3], int(fields[5])))
# Keep the larger of the two marks at each reduce step.
max_marks = all_marks.reduceByKey(lambda mark_a, mark_b: max(mark_a, mark_b))
max_marks.collect()

Out[61]: [('DB', 98), ('Cloud', 99), ('PF', 99), ('MVC', 99), ('OOP', 99), ('DSA', 99)]

In [None]:
##### min marks per course #####
# Reads all_marks, the (course, mark) pair RDD defined in the max-marks cell,
# and keeps the smaller of the two marks at each reduce step.
min_marks = all_marks.reduceByKey(lambda mark_a, mark_b: min(mark_a, mark_b))
min_marks.collect()

Out[62]: [('DB', 20), ('Cloud', 20), ('PF', 20), ('MVC', 22), ('OOP', 20), ('DSA', 20)]

In [None]:
##### average age of male and female students #####
# Carry an (age_sum, row_count) pair per gender (age is field 0, gender is
# field 1), then divide the sum by the count to get the mean age.
ave_age = (
    raw_stu_data
    .map(lambda fields: (fields[1], (int(fields[0]), 1)))
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .mapValues(lambda sum_and_count: float(sum_and_count[0])/sum_and_count[1])
)
ave_age.collect()

Out[63]: [('Female', 28.489021956087825), ('Male', 28.52304609218437)]