## Read Tables

In [2]:
# Load the three input tables. Each file is read with its first row treated
# as the header and with column types inferred from the data.
courseDF = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('/FileStore/tables/course_name_credits.csv', sep=',')
)

# The student file is tab-separated, unlike the two comma-separated files.
studentDF = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('/FileStore/tables/student_name.tsv', sep='\t')
)

relationDF = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('/FileStore/tables/student_course_grade.csv', sep=',')
)

# Print the first rows of every table as a quick sanity check.
courseDF.show()
studentDF.show()
relationDF.show()

## Join the three tables

In [4]:
# Joining on an explicit condition, e.g.
#   relationDF.join(courseDF, courseDF.course == relationDF.course)
# or, equivalently,
#   relationDF.join(courseDF, courseDF['course'] == relationDF['course'])
# keeps the join column from both tables in the result, so you may want to
# remove one of the two copies afterwards with
#   dataframe_name.drop('column_you_want_to_drop')
#
# When the join column has the same name in both tables, the column name can
# be passed instead of a condition, as done here for "course" and "student".
joinedDF = (
    relationDF
    .join(courseDF, on="course")
    .join(studentDF, on="student")
)
joinedDF.show()

## Aggregate Functions

In [6]:
from pyspark.sql.functions import *

# Average grade per student, grouped by the 'name' column.
# This can also be written as .agg(avg('grade')).
student_avg_grade = (
    joinedDF
    .groupBy('name')
    .avg('grade')
)
student_avg_grade.show()

# Highest grade per course, grouped by the 'course_name' column.
# This can also be written as .agg(max('grade')).
course_max_grade = (
    joinedDF
    .groupBy('course_name')
    .max('grade')
)
course_max_grade.show()

In [7]:
# A column can be referenced as joinedDF['grade'], col('grade') or
# joinedDF.grade; col() is used here so the steps can be chained.
finalDF = (
    joinedDF
    # Derived column: the grade lowered by 4 points.
    .withColumn('grade_minus_4', col('grade') - 4)
    # 'PASS' when the lowered grade is still at least 18, 'FAIL' otherwise.
    .withColumn(
        'done',
        when(col('grade_minus_4') >= 18, 'PASS').otherwise('FAIL'),
    )
    # Keep only the columns needed for the report.
    .select('course_name', 'name', 'done')
)
finalDF.show()

## Weighted Average

In [7]:
# Weighted average per student: multiply grade by credits for every row in the
# group, sum those products, then divide by the sum of the credits in the
# same group.
weighted_avg = (
    joinedDF
    .groupBy('name')
    .agg(
        sum(col('grade') * col('credits')) / sum(col('credits'))
    )
)
weighted_avg.show()

## Save Output

In [9]:
# Each write produces n files, where n is the number of partitions of the
# DataFrame being saved. To get a single file instead, write
# dataframe_name.coalesce(1) rather than the DataFrame itself.
student_avg_grade.write.format('csv').save('/FileStore/tables/student_avg_grade.csv')
course_max_grade.write.format('json').save('/FileStore/tables/course_max_grade.json')
finalDF.write.format('parquet').save('/FileStore/tables/finalDF.parquet')