In [None]:
import pyspark
import csv

import numpy as np
import pandas as pd
import seaborn as sns
import os

import pyspark.sql.functions as func
from pyspark.sql import SQLContext
import pyspark.sql.types as types

### Data Loading & Cleansing

In [None]:
# Read the raw CSV: the first line is the header, column types are inferred,
# and rows that cannot be parsed are dropped (DROPMALFORMED).
Edu_Dataframe = spark.read.csv(
    "states_all_extended.csv",
    header=True,
    inferSchema=True,
    mode="DROPMALFORMED",
)

In [None]:
# Print the column names and the data types Spark inferred for them.
Edu_Dataframe.printSchema()

In [None]:
# Size of the raw dataframe: number of columns, then number of rows.
print(len(Edu_Dataframe.columns))
print(Edu_Dataframe.count())

A_A_A is the total number of students, G01_A_A is the total number of Grade 1 students, etc. We only want to keep Grade 4 and Grade 8.
`Rems` lists the columns to remove; `Reqs` lists the columns that are required and kept.

PRIMARY_KEY is removed as it is information otherwise duplicated in the columns STATE and YEAR.

ENROLL is removed as the figure includes student numbers for grades other than 4 and 8.

The breakdown of revenue by source (Federal, State, and Local) is also removed, as this analysis only uses the total revenue.

Since we do not have gender breakdowns of results, aside from the results for "All Ethnicities", it is unnecessary to retain the gender split of each race in the enrollment columns. As such, all pairs of "X_M" and "X_F" can be summed into one "X_A" column.

This allows us to reduce 266 columns down to 64 in a "Simplified" dataframe.

In [None]:
# List every column name in the raw dataframe.
print(Edu_Dataframe.columns)

In [None]:
# Columns that are required downstream. This list is not used by the code
# below; it documents what must remain after the removal step.
Reqs = ['STATE', 'YEAR', 'TOTAL_REVENUE', 'TOTAL_EXPENDITURE', 'INSTRUCTION_EXPENDITURE', 'SUPPORT_SERVICES_EXPENDITURE',
        'OTHER_EXPENDITURE', 'CAPITAL_OUTLAY_EXPENDITURE', 'G04_A_A', 'G08_A_A', 'G04_AM_F', 'G04_AM_M', 'G04_AS_F',
        'G04_AS_M', 'G04_BL_F', 'G04_BL_M', 'G04_HI_F', 'G04_HI_M', 'G04_HP_F', 'G04_HP_M', 'G04_TR_F', 'G04_TR_M',
        'G04_WH_F', 'G04_WH_M', 'G08_AM_F', 'G08_AM_M', 'G08_AS_F', 'G08_AS_M', 'G08_BL_F', 'G08_BL_M', 'G08_HI_F',
        'G08_HI_M', 'G08_HP_F', 'G08_HP_M', 'G08_TR_F', 'G08_TR_M', 'G08_WH_F', 'G08_WH_M', 'G04_A_A_READING',
        'G04_A_A_MATHEMATICS', 'G04_A_M_READING', 'G04_A_M_MATHEMATICS', 'G04_A_F_READING', 'G04_A_F_MATHEMATICS',
        'G04_WH_A_READING', 'G04_WH_A_MATHEMATICS', 'G04_BL_A_READING', 'G04_BL_A_MATHEMATICS', 'G04_HI_A_READING',
        'G04_HI_A_MATHEMATICS', 'G04_AS_A_READING', 'G04_AS_A_MATHEMATICS', 'G04_AM_A_READING', 'G04_AM_A_MATHEMATICS',
        'G04_HP_A_READING', 'G04_HP_A_MATHEMATICS', 'G04_TR_A_READING', 'G04_TR_A_MATHEMATICS', 'G08_A_A_READING',
        'G08_A_A_MATHEMATICS', 'G08_A_M_READING', 'G08_A_M_MATHEMATICS', 'G08_A_F_READING', 'G08_A_F_MATHEMATICS',
        'G08_WH_A_READING', 'G08_WH_A_MATHEMATICS', 'G08_BL_A_READING', 'G08_BL_A_MATHEMATICS', 'G08_HI_A_READING',
        'G08_HI_A_MATHEMATICS', 'G08_AS_A_READING', 'G08_AS_A_MATHEMATICS', 'G08_AM_A_READING', 'G08_AM_A_MATHEMATICS',
        'G08_HP_A_READING', 'G08_HP_A_MATHEMATICS', 'G08_TR_A_READING', 'G08_TR_A_MATHEMATICS']


# Columns to remove from the raw dataframe.
Rems = ['PRIMARY_KEY', 'ENROLL', 'FEDERAL_REVENUE', 'STATE_REVENUE', 'LOCAL_REVENUE', 'A_A_A', 'G01_A_A', 'G02_A_A',
        'G03_A_A', 'G05_A_A', 'G06_A_A', 'G07_A_A', 'G09_A_A', 'G10_A_A', 'G11_A_A', 'G12_A_A', 'KG_A_A', 'PK_A_A',
        'G01-G08_A_A', 'G09-G12_A_A', 'G01_AM_F', 'G01_AM_M', 'G01_AS_F', 'G01_AS_M', 'G01_BL_F', 'G01_BL_M', 'G01_HI_F',
        'G01_HI_M', 'G01_HP_F', 'G01_HP_M', 'G01_TR_F', 'G01_TR_M', 'G01_WH_F', 'G01_WH_M', 'G02_AM_F', 'G02_AM_M',
        'G02_AS_F', 'G02_AS_M', 'G02_BL_F', 'G02_BL_M', 'G02_HI_F', 'G02_HI_M', 'G02_HP_F', 'G02_HP_M', 'G02_TR_F',
        'G02_TR_M', 'G02_WH_F', 'G02_WH_M', 'G03_AM_F', 'G03_AM_M', 'G03_AS_F', 'G03_AS_M', 'G03_BL_F', 'G03_BL_M',
        'G03_HI_F', 'G03_HI_M', 'G03_HP_F', 'G03_HP_M', 'G03_TR_F', 'G03_TR_M', 'G03_WH_F', 'G03_WH_M', 'G05_AM_F',
        'G05_AM_M', 'G05_AS_F', 'G05_AS_M', 'G05_BL_F', 'G05_BL_M', 'G05_HI_F', 'G05_HI_M', 'G05_HP_F', 'G05_HP_M',
        'G05_TR_F', 'G05_TR_M', 'G05_WH_F', 'G05_WH_M', 'G06_AM_F', 'G06_AM_M', 'G06_AS_F', 'G06_AS_M', 'G06_BL_F',
        'G06_BL_M', 'G06_HI_F', 'G06_HI_M', 'G06_HP_F', 'G06_HP_M', 'G06_TR_F', 'G06_TR_M', 'G06_WH_F', 'G06_WH_M',
        'G07_AM_F', 'G07_AM_M', 'G07_AS_F', 'G07_AS_M', 'G07_BL_F', 'G07_BL_M', 'G07_HI_F', 'G07_HI_M', 'G07_HP_F',
        'G07_HP_M', 'G07_TR_F', 'G07_TR_M', 'G07_WH_F', 'G07_WH_M', 'G09_AM_F', 'G09_AM_M', 'G09_AS_F', 'G09_AS_M',
        'G09_BL_F', 'G09_BL_M', 'G09_HI_F', 'G09_HI_M', 'G09_HP_F', 'G09_HP_M', 'G09_TR_F', 'G09_TR_M', 'G09_WH_F',
        'G09_WH_M', 'G10_AM_F', 'G10_AM_M', 'G10_AS_F', 'G10_AS_M', 'G10_BL_F', 'G10_BL_M', 'G10_HI_F', 'G10_HI_M',
        'G10_HP_F', 'G10_HP_M', 'G10_TR_F', 'G10_TR_M', 'G10_WH_F', 'G10_WH_M', 'G11_AM_F', 'G11_AM_M', 'G11_AS_F',
        'G11_AS_M', 'G11_BL_F', 'G11_BL_M', 'G11_HI_F', 'G11_HI_M', 'G11_HP_F', 'G11_HP_M', 'G11_TR_F', 'G11_TR_M',
        'G11_WH_F', 'G11_WH_M', 'G12_AM_F', 'G12_AM_M', 'G12_AS_F', 'G12_AS_M', 'G12_BL_F', 'G12_BL_M', 'G12_HI_F',
        'G12_HI_M', 'G12_HP_F', 'G12_HP_M', 'G12_TR_F', 'G12_TR_M', 'G12_WH_F', 'G12_WH_M', 'KG_AM_F', 'KG_AM_M',
        'KG_AS_F', 'KG_AS_M', 'KG_BL_F', 'KG_BL_M', 'KG_HI_F', 'KG_HI_M', 'KG_HP_F', 'KG_HP_M', 'KG_TR_F', 'KG_TR_M',
        'KG_WH_F', 'KG_WH_M', 'PK_AM_F', 'PK_AM_M', 'PK_AS_F', 'PK_AS_M', 'PK_BL_F', 'PK_BL_M', 'PK_HI_F', 'PK_HI_M',
        'PK_HP_F', 'PK_HP_M', 'PK_TR_F', 'PK_TR_M', 'PK_WH_F', 'PK_WH_M']

# Drop every unwanted column in a single call. DataFrame.drop accepts several
# names at once, so the query plan gains one projection rather than one per
# removed column. Edu_Dataframe itself is left untouched.
Sim_Edu_df = Edu_Dataframe.drop(*Rems)

In [None]:
# Column count of the raw dataframe, then of the simplified dataframe.
print(len(Edu_Dataframe.columns))
print(len(Sim_Edu_df.columns))

In [None]:
# Ethnicity codes (with trailing underscore) used in the enrollment column names.
Eth = ["AM_", "AS_", "BL_", "HI_", "HP_", "TR_", "WH_"]

print(len(Sim_Edu_df.columns))

# For each ethnicity and each kept grade, add the male and female enrollment
# into a combined "<grade>_<eth>_A" column, then drop the two gendered columns.
for eth_code in Eth:
    for grade_prefix in ("G04_", "G08_"):
        stem = grade_prefix + eth_code
        combined = func.col(stem + "M") + func.col(stem + "F")
        Sim_Edu_df = (
            Sim_Edu_df
            .withColumn(stem + "A", combined)
            .drop(stem + "M")
            .drop(stem + "F")
        )

print(len(Sim_Edu_df.columns))

In [None]:
# Column names of the simplified dataframe.
print(Sim_Edu_df.columns)

Interestingly, from the below cell we can see that there is no data entered for 2018, even though 2019 seems to be included in the dataset.

In [None]:
# Distinct YEAR values, sorted ascending, gathered into a single array on the driver.
Sorted_Year_Set = func.sort_array(func.collect_set("YEAR")).alias("YEAR")
Year_Rows = Sim_Edu_df.select(Sorted_Year_Set).collect()
Unique_Years = Year_Rows[0]["YEAR"]
print("These are the unique years: " + str(Unique_Years))

print("")

# Same for the distinct STATE values.
Sorted_State_Set = func.sort_array(func.collect_set("STATE")).alias("STATE")
State_Rows = Sim_Edu_df.select(Sorted_State_Set).collect()
Unique_States = State_Rows[0]["STATE"]
print("These are the unique States: " + str(Unique_States))

In [None]:
# Number of distinct years, then number of distinct STATE values.
print(len(Unique_Years))

print(len(Unique_States))

Above we can see that there are 53 entries listed in the STATE column. It appears that DODEA (Department of Defense Education Activity), National, and the District of Columbia have all been included in the list. The District of Columbia is not officially a state under the US Constitution; however, for our purposes it functions identically, and so shall be included.
The number of occurrences of each state in the dataframe:

In [None]:
# Count the rows of every STATE in one aggregation instead of launching a
# separate Spark job per state.
Rows_Per_State = {
    row["STATE"]: row["count"]
    for row in Sim_Edu_df.groupBy("STATE").count().collect()
}

# Print in the same (sorted) order as Unique_States.
for c in Unique_States:
    print(str(c) + ": " + str(Rows_Per_State.get(c, 0)))

There are 33 instances (The number of years the dataset covers) of each "State", besides National and DODEA, which is fortunate as we plan to remove those, using the code below.

In [None]:
# A STATE value is kept only if it has at least this many rows.
Min_Rows_Per_State = 33

# One aggregation gives the row count of every STATE value; the previous
# version ran a separate count job per state and chained one filter per
# removed state.
Incomplete_States = [
    row["STATE"]
    for row in Sim_Edu_df.groupBy("STATE").count().collect()
    if row["STATE"] is not None and row["count"] < Min_Rows_Per_State
]

State_Clean_Edu = Sim_Edu_df
if Incomplete_States:
    # Guarded so that an empty exclusion list leaves the dataframe unfiltered.
    State_Clean_Edu = Sim_Edu_df.where(~Sim_Edu_df["STATE"].isin(Incomplete_States))

In [None]:
# Number of rows in State_Clean_Edu.
State_Clean_Edu.select("STATE").count()

In [None]:
# Distinct YEAR values in State_Clean_Edu, sorted ascending.
Year_Rows_2 = State_Clean_Edu.select(
    func.sort_array(func.collect_set("YEAR")).alias("YEAR")
).collect()
Unique_Years_2 = Year_Rows_2[0]["YEAR"]
print("These are the unique years: " + str(Unique_Years_2))

print("")

# Distinct STATE values in State_Clean_Edu, sorted ascending.
State_Rows_2 = State_Clean_Edu.select(
    func.sort_array(func.collect_set("STATE")).alias("STATE")
).collect()
Unique_States_2 = State_Rows_2[0]["STATE"]
print("These are the unique States: " + str(Unique_States_2))

In [None]:
# Number of distinct years, then number of distinct STATE values, in State_Clean_Edu.
print(len(Unique_Years_2))

print(len(Unique_States_2))

The numbers of null values per column are listed below

In [None]:
# For every column of State_Clean_Edu, count the rows whose value is null,
# NaN, or the empty string.
Null_Counts = {}

for column_name in State_Clean_Edu.columns:
    field = State_Clean_Edu[column_name]
    is_missing = field.isNull() | func.isnan(field) | (field == "")
    Null_Counts[column_name] = State_Clean_Edu.filter(is_missing).count()

print(Null_Counts)


In [None]:
# Column names of State_Clean_Edu.
State_Clean_Edu.columns

The 2019 year data does not contain the Revenue, Expenditure, or Student Counts for each state, only showing the average score for each demographic. As such, there is not enough information to compare this against, and the rows will be purged.

In [None]:
# Subsets of State_Clean_Edu whose enrollment totals or headline financial
# figures are null.
Null_G04_df = State_Clean_Edu.where(State_Clean_Edu["G04_A_A"].isNull())
Null_G08_df = State_Clean_Edu.where(State_Clean_Edu["G08_A_A"].isNull())
Null_Rev_df = State_Clean_Edu.where(State_Clean_Edu["TOTAL_REVENUE"].isNull())
Null_Exp_df = State_Clean_Edu.where(State_Clean_Edu["TOTAL_EXPENDITURE"].isNull())

# Print the distinct years (ascending) found in each subset, in the order
# G04 enrollment, G08 enrollment, revenue, expenditure.
for null_subset in (Null_G04_df, Null_G08_df, Null_Rev_df, Null_Exp_df):
    year_rows = null_subset.select(
        func.sort_array(func.collect_set("YEAR")).alias("YEAR")
    ).collect()
    print(year_rows[0]["YEAR"])

# Keep only the rows that have a Grade 4 enrollment total.
Clean_df = State_Clean_Edu.where(State_Clean_Edu["G04_A_A"].isNotNull())

In [None]:
# Remove the 2019 rows.
Not_2019 = Clean_df["YEAR"] != 2019
Clean_df = Clean_df.filter(Not_2019)

In [None]:
# Total number of null cells in Clean_df, summed over the columns named in
# Null_G04_df.
Num_Null = sum(
    Clean_df.where(Clean_df[column_name].isNull()).count()
    for column_name in Null_G04_df.columns
)

print(Num_Null)

As we can see below, there are 357 null values in the financial columns. This is equal to 51 * 7, so we would expect all states to be missing fiscal data for 7 years. This is found to be correct in the following cells, and the incomplete years removed.

In [None]:
# Null count of each headline financial column, one per line.
for financial_col in (
    "TOTAL_EXPENDITURE",
    "TOTAL_REVENUE",
    "INSTRUCTION_EXPENDITURE",
    "SUPPORT_SERVICES_EXPENDITURE",
):
    missing_rows = Clean_df.where(Clean_df[financial_col].isNull())
    print(missing_rows.count())

In [None]:
# For each financial column, print the sorted distinct years in which it is null.
Financial_Cols = ["TOTAL_REVENUE", "TOTAL_EXPENDITURE", "INSTRUCTION_EXPENDITURE", "SUPPORT_SERVICES_EXPENDITURE"]

for c in Financial_Cols:
    Null_REV_df = Clean_df.where(Clean_df[c].isNull())
    null_year_rows = Null_REV_df.select(
        func.sort_array(func.collect_set("YEAR")).alias("YEAR")
    ).collect()
    print(null_year_rows[0]["YEAR"])

In [None]:
# Years excluded from the analysis: 1986 through 1991, plus 2017.
Excluded_Years = list(range(1986, 1992)) + [2017]
In_Excluded_Year = Clean_df["YEAR"].isin(Excluded_Years)
Clean_df = Clean_df.filter(~In_Excluded_Year)

Clean_df.count()

In [None]:
# Collect Clean_df to the driver as a pandas DataFrame and display it.
CleanPandas = Clean_df.toPandas()

CleanPandas

In [None]:
# Per-column count of null, NaN, or empty-string values in Clean_df.
Null_Counts = {
    name: Clean_df.filter(
        Clean_df[name].isNull() | func.isnan(Clean_df[name]) | (Clean_df[name] == "")
    ).count()
    for name in Clean_df.columns
}

print(Null_Counts)

In [None]:
# Result columns are named <grade>_<demographic>_<subject>.
Result_Col_Names = [
    grade + "_" + demographic + "_" + subject
    for grade in ("G04", "G08")
    for demographic in ("A_A", "A_M", "A_F", "WH_A", "BL_A", "HI_A", "AS_A", "AM_A", "HP_A", "TR_A")
    for subject in ("READING", "MATHEMATICS")
]
print("Num of Cols of Results: " + str(len(Result_Col_Names)))

# Student-number columns are named <grade>_<group>_A, where group "A" is the total.
Student_Col_Names = [
    grade + "_" + group + "_A"
    for group in ("A", "AM", "AS", "BL", "HI", "HP", "TR", "WH")
    for grade in ("G04", "G08")
]
print("Num of Cols of Student Nums: " + str(len(Student_Col_Names)))

In [None]:
# Student-number columns, in the order they are selected.
Stud_Num_Cols = [
    'G04_A_A', 'G08_A_A',
    'G04_AM_A', 'G08_AM_A',
    'G04_AS_A', 'G08_AS_A',
    'G04_BL_A', 'G08_BL_A',
    'G04_HI_A', 'G08_HI_A',
    'G04_HP_A', 'G08_HP_A',
    'G04_TR_A', 'G08_TR_A',
    'G04_WH_A', 'G08_WH_A',
]

# Result columns, in the order they are selected.
Result_Cols = [
    'G04_A_A_READING', 'G04_A_A_MATHEMATICS',
    'G04_A_M_READING', 'G04_A_M_MATHEMATICS',
    'G04_A_F_READING', 'G04_A_F_MATHEMATICS',
    'G04_WH_A_READING', 'G04_WH_A_MATHEMATICS',
    'G04_BL_A_READING', 'G04_BL_A_MATHEMATICS',
    'G04_HI_A_READING', 'G04_HI_A_MATHEMATICS',
    'G04_AS_A_READING', 'G04_AS_A_MATHEMATICS',
    'G04_AM_A_READING', 'G04_AM_A_MATHEMATICS',
    'G04_HP_A_READING', 'G04_HP_A_MATHEMATICS',
    'G04_TR_A_READING', 'G04_TR_A_MATHEMATICS',
    'G08_A_A_READING', 'G08_A_A_MATHEMATICS',
    'G08_A_M_READING', 'G08_A_M_MATHEMATICS',
    'G08_A_F_READING', 'G08_A_F_MATHEMATICS',
    'G08_WH_A_READING', 'G08_WH_A_MATHEMATICS',
    'G08_BL_A_READING', 'G08_BL_A_MATHEMATICS',
    'G08_HI_A_READING', 'G08_HI_A_MATHEMATICS',
    'G08_AS_A_READING', 'G08_AS_A_MATHEMATICS',
    'G08_AM_A_READING', 'G08_AM_A_MATHEMATICS',
    'G08_HP_A_READING', 'G08_HP_A_MATHEMATICS',
    'G08_TR_A_READING', 'G08_TR_A_MATHEMATICS',
]

Stud_Num_df = Clean_df.select(*Stud_Num_Cols)
Result_df = Clean_df.select(*Result_Cols)

# Row-wise null tallies: each column contributes 1 when null and 0 otherwise.
# Python's built-in sum() folds the Column expressions together with "+".
Result_Null_Tally = sum(Result_df[name].isNull().cast('int') for name in Result_df.columns)
Stud_Null_Tally = sum(Stud_Num_df[name].isNull().cast('int') for name in Stud_Num_df.columns)

OnlyNull_Results = Result_df.withColumn('ResultNulls', Result_Null_Tally)

OnlyNull_StudentNo = Stud_Num_df.withColumn('StudNoNulls', Stud_Null_Tally)

# The full cleaned dataframe with both tallies appended.
Total_Nulls = (
    Clean_df
    .withColumn('ResultNulls', Result_Null_Tally)
    .withColumn('StudNoNulls', Stud_Null_Tally)
)

In [None]:
# Collect both null-tally dataframes to the driver as pandas DataFrames.
NullResultPD = OnlyNull_Results.toPandas()
NullStudentNoPD = OnlyNull_StudentNo.toPandas()

When G04 and G08's Mathematics and Reading results are missing, all demographic result data is also absent, so we cannot calculate it from these.

In [None]:
# Rows in which all four headline result columns are null.
Headline_Cols = ["G04_A_A_MATHEMATICS", "G04_A_A_READING", "G08_A_A_MATHEMATICS", "G08_A_A_READING"]

All_Headline_Null = OnlyNull_Results[Headline_Cols[0]].isNull()
for headline_col in Headline_Cols[1:]:
    All_Headline_Null = All_Headline_Null & OnlyNull_Results[headline_col].isNull()

Test = OnlyNull_Results.where(All_Headline_Null)

NullResultPDTest = Test.toPandas()

# Total number of null, NaN, or empty-string cells across every column of Test.
Null_Counter = sum(
    Test.filter(Test[name].isNull() | func.isnan(Test[name]) | (Test[name] == "")).count()
    for name in Test.columns
)

print(Null_Counter)

NullResultPDTest

In [None]:
# 40 matches the number of result columns counted earlier; 711 is presumably
# the number of rows in Test — TODO confirm.
711*40

In [None]:
# Complement of the "all four headline results are null" selection: rows in
# which at least one of the four headline result columns has a value.
Test2 = OnlyNull_Results.where(~(OnlyNull_Results["G04_A_A_MATHEMATICS"].isNull() & OnlyNull_Results["G04_A_A_READING"].isNull() & OnlyNull_Results["G08_A_A_MATHEMATICS"].isNull() & OnlyNull_Results["G08_A_A_READING"].isNull()))

SomeResultPDTest = Test2.toPandas()

# Total number of null, NaN, or empty-string cells across every column of Test2.
Null_Counter = 0

# Iterate over Test2's own columns so this cell does not depend on the
# separate `Test` dataframe having been created first.
for c in Test2.columns:
    Null_Counter += Test2.filter((Test2[c] == "") | Test2[c].isNull() | func.isnan(Test2[c])).count()

print(Null_Counter)

SomeResultPDTest

In [None]:
# Collect Total_Nulls to pandas, print its column names, and display the frame.
TotalNullPD = Total_Nulls.toPandas()
print(Total_Nulls.columns)
TotalNullPD

In [None]:
# Rows with 40 null result values and 14 null student-number values.
# NOTE(review): 40 equals the number of result columns; 14 is presumably the
# number of per-ethnicity student-number columns — confirm.
FilteredPD = Total_Nulls.where((Total_Nulls["ResultNulls"] == 40) & (Total_Nulls["StudNoNulls"] == 14)).toPandas()

FilteredPD

In [None]:
# Averaging results in column

def Means_Of_Cols(df, Incomplete_cols, verbose=False):
    """Return the mean of each requested column of a Spark dataframe.

    Parameters
    ----------
    df : Spark DataFrame to aggregate.
    Incomplete_cols : iterable of column names whose means are wanted.
    verbose : when True, print each aggregate's column name and value.

    Returns
    -------
    A list of ``[column_name, mean]`` pairs in the order of
    ``Incomplete_cols``. The list is empty when no columns are requested.
    """
    Wanted = list(Incomplete_cols)
    if not Wanted:
        # agg() needs at least one expression; nothing to compute.
        return []

    # Compute every average in a single aggregation rather than one job per column.
    Means = df.agg(*[func.avg(df[c]) for c in Wanted])
    Result = Means.collect()[0]

    Col_Means = []
    for Position, c in enumerate(Wanted):
        # Previously referenced an undefined name `result`, which raised
        # NameError whenever verbose=True.
        if verbose:
            print(Means.columns[Position], Result[Position])
        Col_Means.append([c, Result[Position]])
    return Col_Means

In [None]:
# The four headline result columns whose gaps are filled later.
Missing_Res_Data = [
    "G04_A_A_READING",
    "G04_A_A_MATHEMATICS",
    "G08_A_A_READING",
    "G08_A_A_MATHEMATICS",
]

Overall_Means = Means_Of_Cols(Clean_df, Missing_Res_Data)
print("Mean values: ", Overall_Means)

In [None]:
#Replacing Null with mean

def Null_to_mean(df, numeric_cols):
    """Return ``df`` with nulls in ``numeric_cols`` replaced by that column's mean.

    The means are computed over ``df`` itself by ``Means_Of_Cols``; every
    other column is left unchanged.
    """
    filled = df
    for name, average in Means_Of_Cols(df, numeric_cols):
        current = filled[name]
        replacement = func.when(current.isNull(), func.lit(average)).otherwise(current)
        filled = filled.withColumn(name, replacement)
    return filled

In [None]:
# Start from an empty dataframe with Clean_df's schema: the filter compares
# STATE to the placeholder "NotAState".
Clean_By_State = Clean_df.filter(Clean_df["STATE"] == "NotAState")

# Fill the missing headline results state by state, using each state's own
# column means, and stack the filled slices together.
for state_name in Unique_States_2:
    state_rows = Clean_df.filter(Clean_df["STATE"] == state_name)
    filled_rows = Null_to_mean(state_rows, Missing_Res_Data)
    Clean_By_State = Clean_By_State.union(filled_rows)

In [None]:
# Number of rows in Clean_By_State where G04_A_A_MATHEMATICS is still null.
Clean_By_State.select("G04_A_A_MATHEMATICS").where(Clean_By_State["G04_A_A_MATHEMATICS"].isNull()).count()

In [None]:
# Display STATE, YEAR and the four headline result columns as a pandas DataFrame.
Clean_By_State.select("STATE", "YEAR", "G04_A_A_READING", "G04_A_A_MATHEMATICS", "G08_A_A_READING", "G08_A_A_MATHEMATICS").toPandas()

In [None]:
# Per-column count of null, NaN, or empty-string values in Clean_By_State.
Null_Counts = {}

for column_name in Clean_By_State.columns:
    field = Clean_By_State[column_name]
    is_missing = field.isNull() | func.isnan(field) | (field == "")
    Null_Counts[column_name] = Clean_By_State.filter(is_missing).count()

print(Null_Counts)

In [None]:
# Distinct YEAR values in Clean_By_State, sorted ascending.
Clean_Year_Rows = Clean_By_State.select(
    func.sort_array(func.collect_set("YEAR")).alias("YEAR")
).collect()
Years_in_Clean = Clean_Year_Rows[0]["YEAR"]

In [None]:
# For each year, print how many rows have a non-null G04_A_A_MATHEMATICS value.
for y in Years_in_Clean:
    rows_with_score = (
        Clean_By_State
        .where(func.col("YEAR") == y)
        .where(func.col("G04_A_A_MATHEMATICS").isNotNull())
    )
    print(str(y) + ": " + str(rows_with_score.count()))

In [None]:
# Yearly summary: total revenue, total Grade 4 + Grade 8 enrollment, revenue
# per student, and the mean of each headline result column.
#
# The means are taken with func.avg, so the denominator is the number of
# non-null values actually present in each year. The previous version divided
# each sum by a hard-coded 51 via helper dataframes built with `sqlContext`.
ByYear = (
    Clean_By_State
    # Rows without a YEAR never appeared in the summary; keep it that way.
    .where(func.col("YEAR").isNotNull())
    .groupBy("YEAR")
    .agg(
        func.sum("TOTAL_REVENUE").alias("REVENUE_BY_YEAR"),
        (func.sum("G04_A_A") + func.sum("G08_A_A")).alias("STUDENT_BY_YEAR"),
        func.avg("G04_A_A_READING").alias("Average_G04_Reading"),
        func.avg("G08_A_A_READING").alias("Average_G08_Reading"),
        func.avg("G04_A_A_MATHEMATICS").alias("Average_G04_Mathematics"),
        func.avg("G08_A_A_MATHEMATICS").alias("Average_G08_Mathematics"),
    )
    .withColumn("Dollars_Per_Students", func.col("REVENUE_BY_YEAR") / func.col("STUDENT_BY_YEAR"))
    # Fix the column order of the summary.
    .select(
        "YEAR",
        "REVENUE_BY_YEAR",
        "STUDENT_BY_YEAR",
        "Dollars_Per_Students",
        "Average_G04_Reading",
        "Average_G08_Reading",
        "Average_G04_Mathematics",
        "Average_G08_Mathematics",
    )
)

ByYear = ByYear.orderBy("YEAR")
PdByYear = ByYear.toPandas()

PdByYear

In [None]:
# Per-state summary: total revenue, total Grade 4 + Grade 8 enrollment,
# revenue per student, and the mean of each headline result column.
#
# The means are taken with func.avg, so the denominator is the number of
# non-null values actually present for each state. The previous version
# divided each sum by a hard-coded 25 via helper dataframes built with
# `sqlContext`.
ByState = (
    Clean_By_State
    # Rows without a STATE never appeared in the summary; keep it that way.
    .where(func.col("STATE").isNotNull())
    .groupBy("STATE")
    .agg(
        func.sum("TOTAL_REVENUE").alias("REVENUE_BY_STATE"),
        (func.sum("G04_A_A") + func.sum("G08_A_A")).alias("STUDENT_BY_STATE"),
        func.avg("G04_A_A_READING").alias("Average_G04_Reading"),
        func.avg("G08_A_A_READING").alias("Average_G08_Reading"),
        func.avg("G04_A_A_MATHEMATICS").alias("Average_G04_Mathematics"),
        func.avg("G08_A_A_MATHEMATICS").alias("Average_G08_Mathematics"),
    )
    .withColumn("Dollars_Per_Students", func.col("REVENUE_BY_STATE") / func.col("STUDENT_BY_STATE"))
    # Fix the column order of the summary.
    .select(
        "STATE",
        "REVENUE_BY_STATE",
        "STUDENT_BY_STATE",
        "Dollars_Per_Students",
        "Average_G04_Reading",
        "Average_G08_Reading",
        "Average_G04_Mathematics",
        "Average_G08_Mathematics",
    )
)

ByState = ByState.orderBy("STATE")
PdByState = ByState.toPandas()

PdByState

In [None]:
# Write Clean_By_State as a single-partition CSV (with header) into the
# "Cleaned_Data" directory.
(
    Clean_By_State
    .repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("Cleaned_Data")
)

# Hadoop FileSystem handle and Path class, reached through the JVM gateway.
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path("Cleaned_Data/"))

# Find the part file Spark wrote (the last match wins) and rename it.
filePath = "Cleaned_Data/"
for entry in list_status:
    entry_name = entry.getPath().getName()
    if entry_name.startswith('part-'):
        fileName = entry_name

fs.rename(Path(filePath + fileName), Path(filePath + "Cleaned_Data.csv"))

In [None]:
# Write ByYear as a single-partition CSV (with header) into "Yearly_Data".
(
    ByYear
    .repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("Yearly_Data")
)

list_status2 = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path("Yearly_Data/"))

# Find the part file Spark wrote (the last match wins) and rename it.
filePath2 = "Yearly_Data/"
for entry in list_status2:
    entry_name = entry.getPath().getName()
    if entry_name.startswith('part-'):
        fileName2 = entry_name

fs.rename(Path(filePath2 + fileName2), Path(filePath2 + "Yearly_Data.csv"))

In [None]:
# Write ByState as a single-partition CSV (with header) into "Stately_Data".
(
    ByState
    .repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("Stately_Data")
)

list_status3 = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path("Stately_Data/"))

# Find the part file Spark wrote (the last match wins) and rename it.
filePath3 = "Stately_Data/"
for entry in list_status3:
    entry_name = entry.getPath().getName()
    if entry_name.startswith('part-'):
        fileName3 = entry_name

fs.rename(Path(filePath3 + fileName3), Path(filePath3 + "Stately_Data.csv"))

### Data Visualisation

In [None]:
# Show every row and column when pandas frames are displayed.
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

# Pairwise correlations between the numeric columns. Non-numeric columns such
# as STATE are excluded explicitly: pandas 2.0+ raises on them in corr(),
# whereas older versions dropped them silently.
Clean_Corr = Clean_By_State.toPandas().select_dtypes(include=["number"]).corr()

# Variables whose correlation with TOTAL_REVENUE exceeds 0.75.
Clean_Corr2 = Clean_Corr.loc[Clean_Corr["TOTAL_REVENUE"] > 0.75]

# Variables whose correlation with any headline result exceeds 0.75, showing
# only the four headline result columns.
Clean_Corr3 = Clean_Corr.loc[(Clean_Corr["G04_A_A_READING"] > 0.75) | (Clean_Corr["G04_A_A_MATHEMATICS"] > 0.75) | \
            (Clean_Corr["G08_A_A_READING"] > 0.75) | (Clean_Corr["G08_A_A_MATHEMATICS"] > 0.75)] \
            [["G04_A_A_READING", "G04_A_A_MATHEMATICS", "G08_A_A_READING", "G08_A_A_MATHEMATICS"]]

Clean_Corr

In [None]:
# Display the rows of the correlation matrix selected on TOTAL_REVENUE > 0.75.
Clean_Corr2

In [None]:
# Display the rows selected on a headline-result correlation > 0.75.
Clean_Corr3

In [None]:
import seaborn as sns

# Columns shown in the pairwise scatter-plot grid.
Scatter_Cols = [
    "YEAR",
    "TOTAL_REVENUE",
    "G04_A_A",
    "G08_A_A",
    "G04_A_A_READING",
    "G04_A_A_MATHEMATICS",
    "G08_A_A_READING",
    "G08_A_A_MATHEMATICS",
]
Clean_Scatter = Clean_df.select(*Scatter_Cols)

sns.set(style="ticks")

# pairplot works on pandas data, so collect the selection to the driver first.
Scatter_PD = Clean_Scatter.toPandas()
Clean_df_plot = sns.pairplot(Scatter_PD)

### Analysis

In [None]:
from pyspark.ml.feature import VectorAssembler

# Fixed seed so the 70/30 train/test split, and every metric computed from
# it, is the same on each run.
Split_Seed = 42

Rev_vs_Maths = Clean_By_State.select(Clean_By_State["Total_Revenue"],Clean_By_State["G04_A_A_MATHEMATICS"].alias('G04_Maths'))
train, test = Rev_vs_Maths.randomSplit([0.7,0.3], seed=Split_Seed)

#Vectorise Total_Revenue, rename as Revenue
assembler = VectorAssembler().setInputCols(["Total_Revenue",]).setOutputCol("Revenue")
train = assembler.transform(train)

# Keep Result and Vectorised Revenue column, drop original Total_Revenue
train = train.select("Revenue","G04_Maths")
train.show(truncate=False)

In [None]:
from pyspark.ml.regression import LinearRegression

# Linear regression of G04_Maths on the vectorised Revenue feature.
lr = LinearRegression(
    featuresCol='Revenue',
    labelCol='G04_Maths',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
model = lr.fit(train)

# Vectorise the held-out rows the same way, keep feature and label, then
# append the model's predictions.
test = model.transform(assembler.transform(test).select('Revenue', 'G04_Maths'))
test.show(truncate=False)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol = "G04_Maths")

# R squared, mean squared error, and root mean squared error on the test predictions.
for metric_label, metric_name in (("R2", "r2"), ("MSE", "mse"), ("RMSE", "rmse")):
    metric_value = evaluator.evaluate(test, {evaluator.metricName: metric_name})
    print(metric_label + " Test: " + str(metric_value))

## Multivariate Analysis

In [None]:
from pyspark.ml.feature import VectorAssembler

# Fixed seed so the 70/30 train/test split, and every metric computed from
# it, is the same on each run.
Split_Seed = 42

train, test = Clean_By_State.randomSplit([0.7,0.3], seed=Split_Seed)

# Pack the enrollment totals and headline results into one feature vector.
assembler = VectorAssembler().setInputCols(["G04_A_A", "G08_A_A", "G04_A_A_READING", "G04_A_A_MATHEMATICS", \
                                            "G08_A_A_READING", "G08_A_A_MATHEMATICS"]).setOutputCol('features')
train = assembler.transform(train)
train = train.select("features","Total_Revenue")
train.show(truncate=False)

In [None]:
# Bring in the LinearRegression estimator
from pyspark.ml.regression import LinearRegression

# Linear regression of Total_Revenue on the assembled feature vector.
lr = LinearRegression(
    featuresCol='features',
    labelCol='Total_Revenue',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
model = lr.fit(train)

# Vectorise the held-out rows the same way, keep feature and label, then
# append the model's predictions.
test = model.transform(assembler.transform(test).select('features', 'Total_Revenue'))
test.show(truncate=False)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol = "Total_Revenue")

# R squared, mean squared error, and root mean squared error on the test predictions.
for metric_label, metric_name in (("R2", "r2"), ("MSE", "mse"), ("RMSE", "rmse")):
    metric_value = evaluator.evaluate(test, {evaluator.metricName: metric_name})
    print(metric_label + " Test: " + str(metric_value))

In [None]:
# Summary statistics of the test dataframe, including the prediction column.
test.describe().show()

In [None]:
# Pair each input feature name with the coefficient the fitted model gave it.
Feature_Names = [
    "G04_A_A",
    "G08_A_A",
    "G04_A_A_READING",
    "G04_A_A_MATHEMATICS",
    "G08_A_A_READING",
    "G08_A_A_MATHEMATICS",
]
list(zip(Feature_Names, model.coefficients))