In [None]:
# import modules
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

ModuleNotFoundError: ignored

In [None]:
# plot graphs inline in jupyter notebook
%matplotlib inline
# plt.style.use('ggplot')  # unnecessary ink in the style

In [None]:
# create a SparkSession app object with the name "Ops"
spark_session = SparkSession.builder.appName("Ops").getOrCreate()

NameError: ignored

In [None]:
# load CSV Data into df
df = spark_session.read.csv("StudentsPerformance_tr.csv", inferSchema=True, header=True)
print(type(df))

In [None]:
# view schema
df.printSchema()

## Explore Data

In [None]:
# preview data, showing first 10 rows
df.show(10)

### Quick Visualization for Exploration

In [None]:
# convert spark df to pandas df
pd_df = df.toPandas()
# display score histagram by subject 
pd_df.plot(kind = 'hist', subplots = True, title = "Score Distribution")
plt.show()

In [None]:
# pandas boxplot grouped by lunch type
pd_df.boxplot(by="lunch", figsize = (12,8))
plt.show()

In [None]:
# boxplot by ethnic group
def bp (df, gp):
    dff = df.filter(df["race_ethnicity"] == "group "+ gp).toPandas()
    dff.plot(kind = 'hist', subplots = True, figsize = (5.5, 6))
    plt.gcf().text(.45, .9, "Group " + gp, fontsize = 16)
    dff.boxplot(by="lunch", figsize = (12,6))
    plt.show()
bp(df, "A")
bp(df, "B")
bp(df, "C")
bp(df, "D")
bp(df, "E")

### Calculate Quantiles and IQR & filter for outliers

In [None]:
# define function to find outliers
def print_outliers (df, col):
    quantiles = df.stat.approxQuantile("math_score", [0.25, 0.75], 0.0)
    Q1 = quantiles[0]
    Q3 = quantiles[1]
    IQR = Q3 - Q1
    lowerRange = Q1 - 1.5*IQR
    upperRange = Q3+ 1.5*IQR
    outliers = df.filter("%s < %d or %s > %d" % (col, lowerRange, col, upperRange))
    outliers.show()

In [None]:
print_outliers(df, "math_score")

In [None]:
print_outliers(df, "reading_score")

In [None]:
print_outliers(df, "writing_score")

### The above outliers are mostly receiving free/reduced lunch and have not completed test preparation course.

### Spark df operations (filter, groupBy, orderBy) and asc function:

In [None]:
# average score for students who receive free/reduced lunch
print ("Average Scores of Students who Receive Free/Reduced Lunch:")
df.filter(df['lunch'] == "free/reduced").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).show()

In [None]:
# average score for students do not receive free/reduced lunch
print ("Average Scores of Students who Receive Standard Lunch:")
df.filter( df['lunch'] != "free/reduced").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).show()

In [None]:
# average scores groupby race ethnicity and lunch type
df.groupBy(["lunch","race_ethnicity"]).mean().orderBy(F.asc("race_ethnicity"), F.asc("lunch")).show()

### More Data Visualization

### Students who receive free/reduced lunch on average score lower in all subjects across all ethnic groups.

In [None]:
# convert spark df toPandas for graphing boxplots
# This is a quite way to explore and visualize
fig = df.groupBy(["lunch", "race_ethnicity"]).mean().orderBy(F.asc("race_ethnicity"),F.asc("lunch")).toPandas()\
  .plot(x="lunch", kind = "bar", subplots = True, figsize = (10,9), legend = None)
fig[0].set_title("Average Math Score", fontsize = 14)
fig[1].set_title("Average Reading Score", fontsize = 14)
fig[2].set_title("Average Writng Score", fontsize = 14)
plt.tight_layout(pad = 2, h_pad = 1)
plt.gcf().text(.12, -.005, "Group A", fontsize = 14)
plt.gcf().text(.30, -.005, "Group B", fontsize = 14)
plt.gcf().text(.48, -.005, "Group C", fontsize = 14)
plt.gcf().text(.67, -.005, "Group D", fontsize = 14)
plt.gcf().text(.85, -.005, "Group E", fontsize = 14)
plt.show()

### I like the following graphs better.  But it takes more lines of codes.

In [None]:
df.filter(df["race_ethnicity"] == "group A").groupBy("lunch").mean().orderBy(F.asc("lunch")).show()

In [None]:
# spark df operations (filter, groupBy, orderBy) and asc function
df.filter(df["race_ethnicity"] == "group A").groupBy("lunch").mean().orderBy(F.asc("lunch"))\
          .toPandas().plot(x = "lunch", kind="bar", figsize = (6, 6), \
          title = "Group A Average Scores by Lunch Type", fontsize = 12)
plt.legend(['Math', 'Reading', 'Writing'], bbox_to_anchor=(1.3, .9))
plt.ylabel("Average Scores", fontsize = 12)
plt.show()

In [None]:
# for big data and to avoid shuffling of data using multiple keys, data may be filtered by a single key first and send to one 
#   worker node for processing: groupBy, mean, orderBy, toPandas
fig, axes = plt.subplots(nrows=1, ncols=5)
df.filter(df["race_ethnicity"] == "group A").groupBy("lunch").mean().orderBy(F.asc("lunch")).toPandas()\
          .plot(x = "lunch", ax=axes[0], kind = "bar", sharey = True, figsize = (18, 6), legend = 0, title = "Group A")
df.filter(df["race_ethnicity"] == "group B").groupBy("lunch").mean().orderBy(F.asc("lunch")).toPandas()\
          .plot(x = "lunch", ax=axes[1], kind = "bar", sharey = True, figsize = (18, 6), legend = 0, title = "Group B")
df.filter(df["race_ethnicity"] == "group C").groupBy("lunch").mean().orderBy(F.asc("lunch")).toPandas()\
          .plot(x = "lunch", ax=axes[2], kind = "bar", sharey = True, figsize = (18, 6), legend = 0, title = "Group C")
df.filter(df["race_ethnicity"] == "group D").groupBy("lunch").mean().orderBy(F.asc("lunch")).toPandas()\
          .plot(x = "lunch", ax=axes[3], kind = "bar", sharey = True, figsize = (18, 6), legend = 0, title = "Group D")
df.filter(df["race_ethnicity"] == "group E").groupBy("lunch").mean().orderBy(F.asc("lunch")).toPandas()\
          .plot(x = "lunch", ax=axes[4], kind = "bar", sharey = True, figsize = (18, 6), legend = 0, title = "Group E")
#axes.flatten()[-2].legend(loc='upper center', bbox_to_anchor=(0.5, -0.6), ncol=5)  
fig.legend(['Math', 'Reading', 'Writing'], loc="upper left", bbox_to_anchor=(.03, 1.09))
plt.tight_layout(pad = 2, h_pad = 1)
axes[0].set_ylabel("Average Scores")
remarks = "Students who receive free/reduced lunch on average score lower in all subjects across all ethnic groups."
plt.gcf().text(.03, 1.17, remarks, fontsize = 16)
plt.gcf().text(.01, 1.15, " ", fontsize = 12)
plt.gcf().text(.35, 1.09, "Average Scores Per Ethnic Group By Lunch Type", fontsize = 16)
plt.show()    

### Exploring Gender and Ethnicity

In [None]:
# Spaark groupby feature and get mean of numeric cols
print("Average Scores By Ethnic Group")
df.groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).show()

In [None]:
# Spark filter by gender, then grougBY race and get avg scores
print("++++++++++++++++ MALE STUFENTSc++++++++++++++++")
print("Average Scores of Male Students By Ethnic Group")
df.filter(df["gender"] == "male").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).show()

In [None]:
# filter by gender, then grougBY race and get avg scores
print("++++++++++++++++ FEMALE STUFENTS ++++++++++++++++")
print("Average Scores of emale Students By Ethnic Group")
df.filter(df["gender"] == "female").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).show()

In [None]:
# convert to pandas DataFrame
df_F_avg_scores = df.filter(df["gender"] == "female").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).toPandas()
df_M_avg_scores = df.filter(df["gender"] == "male").groupBy("race_ethnicity").mean().orderBy(F.asc("race_ethnicity")).toPandas()

In [None]:
# define function to plot avg scores 
def plot_by_ethnicity(df, plot_title, remarks):
    fig = df.plot(x = "race_ethnicity", kind = "bar", figsize = (10, 8), fontsize = 12, legend = 4)
    plt.title(plot_title, fontsize = 16)
    plt.tight_layout(h_pad = 20)
    fig.legend(['Math Score', 'Reading  Score', 'Writing  Score'])
    plt.xlabel("Race Ethnicity", fontsize = 14)
    plt.ylabel("Average Scores", fontsize = 14)
    plt.gcf().text(.02, 1.05, remarks, fontsize = 15)
    plt.show()    

In [None]:
# plot average scores per gender
data = df_F_avg_scores
title = "Female Students Average Scores by Ethnic Group"
remarks = "Math scores of female students are consistently lower than other subjects across all ethnic groups.\
 There is an upward trend for all scores from Group A through Group E."
plot_by_ethnicity(data, title, remarks)

In [None]:
# plot average scores per gender
data = df_M_avg_scores
title = "Male Students Average Scores by Ethnic Group"
remarks = "Math scores of female students are consistently lower than other subjects across all ethnic groups.\
 There is an upward trend for all scores from Group A through Group E."
plot_by_ethnicity(data, title, remarks)

In [None]:
df.groupBy(["gender", "race_ethnicity"]).count().orderBy(F.asc("gender"), F.asc("race_ethnicity")).show()

In [None]:
# groupby counts
fig, axes = plt.subplots(nrows=1, ncols=2)
df.filter(df["gender"]=="female").groupBy("race_ethnicity").count().orderBy(F.asc("race_ethnicity")).toPandas()\
           .plot(x="race_ethnicity", ax=axes[0], kind="bar", legend=0, title="Female",\
            fontsize = 12, sharey = True, figsize = (12, 6), color = "g", alpha = .9)
axes[0].set_ylabel("Number of Students", fontsize = 12)
df.filter(df["gender"]=="male").groupBy("race_ethnicity").count().orderBy(F.asc("race_ethnicity")).toPandas()\
           .plot(x="race_ethnicity", ax = axes[1], kind="bar", legend=0, title="Male",\
            fontsize = 12, sharey = True, figsize = (12, 6), color = "b", alpha = .9)
plt.show()

## SQL Queries

In [None]:
# make a Temp Table
df.createOrReplaceTempView("STUDENT_PERFORMANCE")

In [None]:
# Access data with spark session SQL
spark_session.sql("""
    SELECT 
        gender, test_preparation_course, math_score, reading_score, writing_score
    FROM 
        STUDENT_PERFORMANCE
    WHERE
        gender = "male"      
    """).toPandas().boxplot(by="test_preparation_course",fontsize = 12, figsize = (12, 9))
plt.gcf().text(.37, 1.02, "Male Students Average Scores", fontsize = 15)
plt.show()

spark_session.sql("""
    SELECT 
        gender, test_preparation_course, math_score, reading_score, writing_score
    FROM 
        STUDENT_PERFORMANCE
    WHERE
        gender = "female"        
    """).toPandas().boxplot(by="test_preparation_course",fontsize = 12, figsize = (12, 9))
plt.gcf().text(.37, 1.02, "Female Students Average Scores", fontsize = 15)
plt.show()

In [None]:
spark_session.sql("""
    SELECT 
        test_preparation_course
    FROM 
        STUDENT_PERFORMANCE   
    """).groupBy("test_preparation_course").count().orderBy(F.asc("test_preparation_course"))\
        .toPandas().plot(x="test_preparation_course", kind="bar", legend = None, figsize = (8,4))
plt.gcf().text(0.01, 1.005, "Number of Students Who Completed Test Preparation Course Verse Those Who Have None", fontsize = 13)
plt.show()

In [None]:
spark_session.sql("""
    SELECT 
        gender, test_preparation_course
    FROM 
        STUDENT_PERFORMANCE   
    """).groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course")).show()

In [None]:
fig, axes = plt.subplots(nrows=1, ncols=2)
spark_session.sql("""
    SELECT 
        gender, test_preparation_course
    FROM 
        STUDENT_PERFORMANCE   
    WHERE
        gender = "male"
    """).groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course"))\
        .toPandas().plot(x = "test_preparation_course", kind = "bar", ax = axes[0], sharey = True, title = "Male", \
         figsize = (10, 5), legend = None)

spark_session.sql("""
    SELECT 
        gender, test_preparation_course
    FROM 
        STUDENT_PERFORMANCE   
    WHERE
        gender = "female"
    """).groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course"))\
        .toPandas().plot(x = "test_preparation_course", kind = "bar", ax = axes[1], sharey = True, title = "Female", \
         figsize = (10, 5), legend = None)

plt.gcf().text(0.1, 1.01, "Number of Students Who Completed Test Preparation Course Verse Those Who Have None", fontsize = 14)

plt.show()

### Spark Functions

In [None]:
df.groupBy("gender").mean().show()

In [None]:
print(df.agg({"math_score": "mean"}).collect())
print(type(df.agg({"math_score": "mean"}).collect()[0]))
print(df.agg({"math_score": "mean"}).collect()[0])
print(type(df.agg({"math_score": "mean"}).collect()[0]))

print(df.agg({"math_score": "mean"}).collect()[0][0])
print(df.agg({"reading_score": "mean"}).collect()[0][0])
print(df.agg({"writing_score": "mean"}).collect()[0][0])

In [None]:
avg_math_score = str(df.agg({"math_score": "mean"}).collect()[0][0])
avg_reading_score = str(df.agg({"reading_score": "mean"}).collect()[0][0])
avg_writing_score = str(df.agg({"writing_score": "mean"}).collect()[0][0])

In [None]:
print("Number of students with above average MATH score: ", avg_math_score)

df_math_above_av = spark_session.sql("""
    SELECT 
        gender, test_preparation_course, math_score
    FROM 
        STUDENT_PERFORMANCE
    WHERE 
        writing_score > """ + avg_math_score
)
df_math_above_av.groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course")).show()
df_math_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course")).show()

In [None]:
df_math_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course"))\
                .toPandas().plot(x="test_preparation_course", kind="bar", legend = None, \
                 title = "Number of Students who Scored MATH Above Average")
plt.show()

In [None]:
print("Number of students with above average READING score: ", avg_reading_score)

df_reading_above_av = spark_session.sql("""
    SELECT 
        gender, test_preparation_course, reading_score
    FROM 
        STUDENT_PERFORMANCE
    WHERE 
        writing_score > """ + avg_reading_score
)
df_reading_above_av.groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course")).show()
df_reading_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course")).show()

In [None]:
df_reading_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course"))\
                .toPandas().plot(x="test_preparation_course", kind="bar", \
                 title = "Number of Students who Scored READING Above Average", legend = None)
plt.show()

In [None]:
print("Number of students with above average WRITING score: ", avg_writing_score)

df_writing_above_av = spark_session.sql("""
    SELECT 
        gender, test_preparation_course, writing_score
    FROM 
        STUDENT_PERFORMANCE
    WHERE 
        writing_score >  """ + avg_writing_score
)
df_writing_above_av.groupBy(["gender", "test_preparation_course"]).count().orderBy(F.asc("gender"), F.asc("test_preparation_course")).show()
df_writing_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course")).show()

In [None]:
fig, axes = plt.subplots(nrows=1, ncols=3)
df_math_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course"))\
                .toPandas().plot(x="test_preparation_course", kind="bar", ax = axes[0], title = "MATH", \
                 legend = 0, sharey = True, figsize = (12, 5), color = "r")

df_reading_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course"))\
                .toPandas().plot(x="test_preparation_course", kind="bar", ax = axes[1], title = "READING", \
                 legend = 0, sharey = True, figsize = (12, 5), color = "g")

df_writing_above_av.groupBy("test_preparation_course").count().orderBy( F.asc("test_preparation_course"))\
                .toPandas().plot(x="test_preparation_course", kind="bar", ax = axes[2], title = "WRITING", \
                 legend = 0, sharey = True, figsize = (12, 5), color = "b")

plt.gcf().text(.3, 1.08, "Number of Students who Scored Above Average", fontsize =14)
plt.gcf().text(.31, 1.01, "By Completed Test Preparation Course / None", fontsize =14)
axes[0].set_ylabel("Number of Students")

plt.show()