### Big Data Analytics

**Project 02**

**Zakaria Alsahfi**

## Part A: Set up Environment
In this part of the project, we import the required tools from pyspark and create a SparkSession object.

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.types
import pyspark.sql.functions as F
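# Import only the names used below; a star import would shadow Python built-ins such as round and sum.
from pyspark.sql.functions import col, expr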

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Part B: Load the Data
In this project, we work with a database of course records from a (fictional) university called Imaginary U, which was founded in Fall 2000.

In [0]:
alumni_schema = 'SID INT'
alumni = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/alumni.txt', header='True', schema=alumni_schema)

courses_schema = 'dept STRING, course STRING, prereq STRING, credits INT'
courses = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/courses.txt', header='True', schema=courses_schema)

expelled_schema = 'SID INT'
expelled = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/expelled.txt', header='True', schema=expelled_schema)

faculty_schema = 'FID INT, firstName STRING, lastName STRING, dept STRING'
faculty = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/faculty.txt', header='True', schema=faculty_schema)

grades_schema = 'termID STRING, SID INT, course STRING, FID INT, grade STRING'
grades = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/grades.txt', header='True', schema=grades_schema)

students_schema = 'firstTerm STRING, SID INT, firstName STRING, lastName STRING, major STRING'
students = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/students.txt', header='True', schema=students_schema)

transferred_schema = 'SID INT'
transferred = spark.read.option('delimiter', '\t').csv('/FileStore/tables/univ/transferred.txt', header='True', schema=transferred_schema)

In [0]:
print('The number of records in alumni is '+str(alumni.count())+'.')
print('The number of records in courses is '+str(courses.count())+'.')
print('The number of records in expelled is '+str(expelled.count())+'.')
print('The number of records in faculty is '+str(faculty.count())+'.')
print('The number of records in grades is '+str(grades.count())+'.')
print('The number of records in students is '+str(students.count())+'.')
print('The number of records in transferred is '+str(transferred.count())+'.')

## Part C: Student Count by Status
In this part, we will count the number of students in each of the following groups: students who have been accepted, students who actually enrolled, current students, former students, alumni, students who transferred, and students who were expelled.

In [0]:
print('Number of accepted students:',students.count())
enrolled = students.join(grades, 'SID', 'left_semi')
print('Number of enrolled students:',enrolled.count())

current = enrolled.join(alumni, 'SID', 'left_anti').\
                   join(expelled, 'SID', 'left_anti').\
                   join(transferred, 'SID', 'left_anti')
print('Number of current students: ',current.count())

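former = alumni.join(expelled,'SID', 'outer').\
               join(transferred,'SID', 'outer')
print('Number of former students:  ',former.count())
print('Number of alumni:           ',alumni.count())
print('Number of transferred students:',transferred.count())
print('Number of expelled students:',expelled.count())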
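The join types used above can be read as set operations on the student IDs. The following is a minimal plain-Python sketch of that reading, not the Spark implementation. The IDs are hypothetical toy values, and it assumes each table lists an SID at most once.

```python
# Toy student IDs (hypothetical; not taken from the project data).
accepted_ids = {1, 2, 3, 4, 5, 6}
graded_ids = {1, 2, 3, 4, 5}
alumni_ids = {1}
expelled_ids = {2}
transferred_ids = {3}

# left_semi: keep accepted students that have at least one grade record.
enrolled_ids = accepted_ids & graded_ids

# Full outer join on SID: one row per distinct SID, i.e. a union of the keys.
former_ids = alumni_ids | expelled_ids | transferred_ids

# Chained left_anti joins: drop enrolled students found in any former group.
current_ids = enrolled_ids - alumni_ids - expelled_ids - transferred_ids

print(sorted(enrolled_ids))
print(sorted(former_ids))
print(sorted(current_ids))
```

In this toy data every enrolled student is either current or former, so the two counts add up to the enrolled count.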

## Part D: Distribution of Students by Major
In this part, we will determine the number of students currently in each major, as well as the proportion of all current students in each major.

In [0]:
st_by_major = current.groupBy('major').\
                      agg(expr('COUNT(SID) AS n_students')).\
                      withColumn('prop', F.round(col('n_students')/current.count(), 4)).\
                      sort('n_students', ascending=False)
st_by_major.show()

## Part E: Course Enrollments by Department
In this part, we will determine the number of students enrolled in courses offered by each department during the Fall 2020 term.

In [0]:
c_enrol_by_dept = courses.join(grades, 'course', 'outer').\
                          filter(expr('termID == "2020B"')).\
                          groupBy('dept').\
                          agg(expr('COUNT(SID) AS n_students')).\
                          withColumn('prop', F.round(col('n_students')/current.count(), 4)).\
                          sort('n_students', ascending=False)

c_enrol_by_dept.show()

## Part F: Graduation Rates by Major
In this part, we will determine the graduation rates for each major. We will perform this analysis in steps. First, we will create a DataFrame containing the number of former students in each major. Then we will create a DataFrame containing the number of alumni for each major. We will then combine these DataFrames to determine the graduation rate.

In [0]:
former_by_major = students.join(former,'SID','inner').\
                           groupBy('major').\
                           agg(expr('COUNT(SID) AS n_former')).\
                           sort('major', ascending=True)
former_by_major.show()

In [0]:
alumni_by_major = students.join(alumni, 'SID', 'inner').\
                          groupBy('major').\
                          agg(expr('COUNT(SID) AS n_alumni')).\
                          sort('major', ascending=True)
alumni_by_major.show()

In [0]:
grad_rate_by_major = alumni_by_major.join(former_by_major, 'major', 'outer').\
                                    withColumn('grad_rate',F.round(col('n_alumni')/col('n_former'), 4))

grad_rate_by_major.show()

## Part G: Number of Terms Required for Graduation
In this part, we will find a frequency distribution for the number of terms that alumni required for graduation.

In [0]:
n_term_req_grad = grades.groupBy('SID').\
                         agg(expr('COUNT(DISTINCT termID) AS n_terms')).\
                         join(alumni, 'SID', 'left_semi').\
                         groupBy('n_terms').\
                         agg(expr('COUNT(n_terms) AS count')).\
                         sort('n_terms', ascending=True)

n_term_req_grad.show()
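The aggregation above has two stages: count the distinct terms per student, then count how many alumni share each term count. A small plain-Python sketch of the same idea follows. The records and IDs are hypothetical, and `Counter` stands in for the second `groupBy`.

```python
from collections import Counter

# Hypothetical (SID, termID) pairs, one per grade record.
grade_rows = [
    (1, '2016B'), (1, '2016B'), (1, '2017A'),
    (2, '2016B'), (2, '2017A'), (2, '2017B'),
    (3, '2018A'),
]
alumni_ids = {1, 2}  # student 3 has not graduated

# Stage 1: distinct terms per student (COUNT(DISTINCT termID)).
terms_by_sid = {}
for sid, term in grade_rows:
    terms_by_sid.setdefault(sid, set()).add(term)

# Stage 2: frequency of each term count, restricted to alumni.
distribution = Counter(
    len(terms) for sid, terms in terms_by_sid.items() if sid in alumni_ids
)
print(sorted(distribution.items()))
```

Student 1 has two grade records in the same term, which is why the distinct count matters.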

## Part H: Number of Current Students by Major and Terms
In this part, we will determine the number of current students in each major who are in their first term, second term, and so on.

In [0]:
ncs_by_major_term = grades.groupBy('SID').\
                           agg(expr('COUNT(DISTINCT termID) AS n_terms')).\
                           join(current, 'SID', 'inner').\
                           groupBy('major').\
                           agg(expr('SUM(CASE WHEN n_terms == 1 THEN 1 ELSE 0 END) AS t1'),
                               expr('SUM(CASE WHEN n_terms == 2 THEN 1 ELSE 0 END) AS t2'),
                               expr('SUM(CASE WHEN n_terms == 3 THEN 1 ELSE 0 END) AS t3'),
                               expr('SUM(CASE WHEN n_terms == 4 THEN 1 ELSE 0 END) AS t4'),
                               expr('SUM(CASE WHEN n_terms == 5 THEN 1 ELSE 0 END) AS t5'),
                               expr('SUM(CASE WHEN n_terms == 6 THEN 1 ELSE 0 END) AS t6'),
                               expr('SUM(CASE WHEN n_terms == 7 THEN 1 ELSE 0 END) AS t7'),
                               expr('SUM(CASE WHEN n_terms == 8 THEN 1 ELSE 0 END) AS t8'),
                               expr('SUM(CASE WHEN n_terms == 9 THEN 1 ELSE 0 END) AS t9'),
                               expr('SUM(CASE WHEN n_terms == 10 THEN 1 ELSE 0 END) AS t10'),
                               expr('SUM(CASE WHEN n_terms == 11 THEN 1 ELSE 0 END) AS t11'),
                               expr('SUM(CASE WHEN n_terms >= 12 THEN 1 ELSE 0 END) AS t12plus')).\
                         sort('major', ascending=True)

ncs_by_major_term.show()

## Part I: Student GPAs
In this section, we will calculate the GPA of each student who has enrolled at IU, and will analyze the results.

In [0]:
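def convert_letter_grade(letter):
    # Map a letter grade to grade points; any other value returns None (null in Spark).
    if letter == 'A':
        return 4
    if letter == 'B':
        return 3
    if letter == 'C':
        return 2
    if letter == 'D':
        return 1
    if letter == 'F':
        return 0

# Declare the return type so the UDF yields integers rather than the default strings.
spark.udf.register('convert_letter_grade', convert_letter_grade, 'int')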

In [0]:
gpa = grades.filter(expr('termID < "2020B"')).\
            join(courses,'course', 'inner').\
            withColumn('grad_points', expr('convert_letter_grade(grade) * credits')).\
            groupBy('SID').\
            agg(F.round(F.sum(col('grad_points'))/F.sum(col('credits')), 2).alias('GPA'))
gpa.persist()
gpa.show(5)
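The GPA computed above is credit-weighted: the sum of grade points times credits, divided by the sum of credits. As a worked example under assumed inputs, the plain-Python sketch below applies the same formula to one hypothetical student. The `POINTS` table and `credit_weighted_gpa` helper are illustrative names, not part of the notebook.

```python
# Grade points per letter, matching convert_letter_grade.
POINTS = {'A': 4, 'B': 3, 'C': 2, 'D': 1, 'F': 0}

def credit_weighted_gpa(records):
    """records: list of (letter_grade, credits) pairs for one student."""
    total_points = sum(POINTS[grade] * credits for grade, credits in records)
    total_credits = sum(credits for _, credits in records)
    return round(total_points / total_credits, 2)

# Hypothetical transcript: an A and a B in 3-credit courses, a C in a 4-credit course.
example = [('A', 3), ('C', 4), ('B', 3)]
print(credit_weighted_gpa(example))  # (12 + 8 + 9) / 10 = 2.9
```

The 4-credit C pulls the result below the unweighted mean of 3.0, which is the effect of weighting by credits.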

In [0]:
ten_lowest_GPA = gpa.join(current, 'SID', 'inner').\
                     select('firstName', 'lastName', 'SID', 'major', 'GPA').\
                     sort('GPA', ascending=True)

ten_lowest_GPA.show(10)

In [0]:
cs_w_perfect_GPAs = gpa.filter(expr('GPA == 4')).\
                        join(current, 'SID', 'left_semi')

cs_w_perfect_GPAs.count()

## Part J: Student GPA by Major
In this section, we will calculate the (non-weighted) average GPA for current students in each major.

In [0]:
# Reuse the per-student gpa DataFrame from Part I instead of recomputing it.
st_gpa_by_major = gpa.join(current, 'SID', 'inner').\
            groupBy('major').\
            agg(F.round(F.avg(col('GPA')), 2).alias('avg_GPA')).\
            sort('major', ascending=False)
st_gpa_by_major.show()

## Part K: GPA by Faculty
In this section, we calculate for each instructor the credit-weighted grade point average for all grades that have ever been assigned by that instructor.

In [0]:
gpa_by_faculty = grades.filter(expr('termID < "2020B"')).\
                     join(faculty,'FID', 'left_semi').\
                     join(courses, 'course', 'inner').\
                     withColumn('grad_points', expr('convert_letter_grade(grade) * credits')).\
                     groupBy('FID').\
                     agg(F.round(F.sum(col('grad_points'))/F.sum(col('credits')), 2).alias('GPA'))
gpa_by_faculty.show()

In [0]:
faculty_ten_lowest_gpa = gpa_by_faculty.join(faculty, 'FID', 'inner').\
                                        select('firstName', 'lastName', 'FID', 'dept', 'GPA').\
                                        sort('GPA', ascending=True)
faculty_ten_lowest_gpa.show(10)

In [0]:
ten_highest_gpa_faculty = gpa_by_faculty.join(faculty, 'FID', 'inner').\
                                         select('firstName', 'lastName', 'FID', 'dept', 'GPA').\
                                         sort('GPA', ascending=False)
ten_highest_gpa_faculty.show(10)

## Part L: First Term GPA
In this section, we calculate the first-term GPA for each student who has enrolled at IU.

In [0]:
first_term_gpa = grades.filter(expr('termID < "2020B"')).\
                        join(courses,'course', 'inner').\
                        join(students, 'SID', 'inner').\
                        filter(expr('termID == firstTerm')).\
                        withColumn('grad_points', expr('convert_letter_grade(grade) * credits')).\
                        groupBy('SID').\
                        agg(F.round(F.sum(col('grad_points'))/ F.sum(col('credits')), 2).alias('term1_gpa'))

first_term_gpa.persist()
first_term_gpa.show(5)

## Part M: Graduation Rates and First Term GPA
In this section, we will calculate graduation rates for various ranges of first term GPAs.

In [0]:
def gpa_bin(gpa):
  # Return None for a missing GPA instead of failing on the comparison.
  if gpa is None:
    return None
  if gpa >= 0 and gpa < 1:
    return '[0, 1)'
  if gpa >= 1 and gpa < 2:
    return '[1, 2)'
  if gpa >= 2 and gpa < 3:
    return '[2, 3)'
  if gpa >= 3 and gpa <= 4:
    return '[3, 4]'

spark.udf.register('gpa_bin', gpa_bin)

In [0]:
alumni_ft_gpa = first_term_gpa.join(alumni, 'SID', 'left_semi').\
                              withColumn('gpa_bin', expr('gpa_bin(term1_gpa)')).\
                              groupBy('gpa_bin').\
                              agg(expr('count(gpa_bin) AS n_alumni'))
alumni_ft_gpa.persist()
alumni_ft_gpa.show()

In [0]:
former_ft_gpa = first_term_gpa.join(former, 'SID', 'left_semi').\
                              withColumn('gpa_bin', expr('gpa_bin(term1_gpa)')).\
                              groupBy('gpa_bin').\
                              agg(expr('count(gpa_bin) AS n_former'))
former_ft_gpa.persist()
former_ft_gpa.show()

In [0]:
grad_rate_first_term_gpa = alumni_ft_gpa.join(former_ft_gpa, 'gpa_bin', 'inner').\
                                        withColumn('grad_rate', F.round(col('n_alumni')/col('n_former'), 4))
                  
grad_rate_first_term_gpa.show()
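To make the per-bin arithmetic concrete, here is a plain-Python sketch of the same calculation on hypothetical data. One difference is deliberate and worth noting: the inner join above drops any bin that has no alumni, while this sketch keeps such bins with a rate of 0.0.

```python
from collections import Counter

def gpa_bin(gpa):
    """Map a GPA in [0, 4] to a labeled bin; anything else maps to None."""
    if gpa is None or gpa < 0 or gpa > 4:
        return None
    if gpa < 1:
        return '[0, 1)'
    if gpa < 2:
        return '[1, 2)'
    if gpa < 3:
        return '[2, 3)'
    return '[3, 4]'

# Hypothetical first-term GPAs keyed by SID.
first_term_gpa = {1: 3.5, 2: 3.2, 3: 2.4, 4: 0.8, 5: 2.9}
former_ids = {1, 2, 3, 4, 5}
alumni_ids = {1, 2, 3}

# Count former students and alumni per bin (the two groupBy steps).
n_former = Counter(gpa_bin(first_term_gpa[sid]) for sid in former_ids)
n_alumni = Counter(gpa_bin(first_term_gpa[sid]) for sid in alumni_ids)

# Graduation rate per bin; a Counter returns 0 for bins with no alumni.
grad_rate = {b: round(n_alumni[b] / n_former[b], 4) for b in sorted(n_former)}
print(grad_rate)
```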