In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import *
from pyspark.ml.regression import LinearRegression , DecisionTreeRegressor, RandomForestRegressor, GBTRegressionModel, GBTRegressor

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors

from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.context import SparkContext
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.session import SparkSession

In [0]:
#PYSPARK_CLI = True
#if PYSPARK_CLI:
	#sc = SparkContext.getOrCreate()
	#spark = SparkSession(sc)

In [0]:
# File location and type
#file_location = "/FileStore/tables/2016_UniversityOfCalifornia.csv"
path = ['/FileStore/tables/Project/*.csv']
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type).option("inferSchema", infer_schema)\
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(path)


df.show()

In [0]:
print("Whole Data Set")
display(df)

In [0]:
df.printSchema()

# Create a view or table

temp_table_name = "masterTable"

df.createOrReplaceTempView(temp_table_name)

dftemp=df.select('Year','EmployerName','Position','TotalWages', 'RegularPay', 'MaxPositionSalary', 'MinPositionSalary')
#dftemp.show()

# Commented out IPython magic to ensure Python compatibility.
# %sql
#select * from masterTable`

#df.take(10)

#df_sql = spark.sql("select * from masterTable LIMIT 10")
#display(df_sql)

#df.printSchema()

df2 = df.drop("DepartmentOrSubdivision", "ElectedOfficial", "Judicial","OtherPositions", "DefinedBenefitPlanContribution","EmployeesRetirementCostCovered", "DeferredCompensationPlan","HealthDentalVision", "TotalRetirementAndHealthContribution","PensionFormula", "EmployerURL","EmployerPopulation","IncludesUnfundedLiability", "SpecialDistrictActivities"," IncludesUnfundedLiability","SpecialDistrictType")
#df2.printSchema()

print("Dataset with Dropped Columns")
#df2.show()

root
 |-- Year: integer (nullable = true)
 |-- EmployerType: string (nullable = true)
 |-- EmployerName: string (nullable = true)
 |-- DepartmentOrSubdivision: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- ElectedOfficial: boolean (nullable = true)
 |-- Judicial: boolean (nullable = true)
 |-- OtherPositions: string (nullable = true)
 |-- MinPositionSalary: double (nullable = true)
 |-- MaxPositionSalary: double (nullable = true)
 |-- ReportedBaseWage: string (nullable = true)
 |-- RegularPay: double (nullable = true)
 |-- OvertimePay: double (nullable = true)
 |-- LumpSumPay: double (nullable = true)
 |-- OtherPay: double (nullable = true)
 |-- TotalWages: double (nullable = true)
 |-- DefinedBenefitPlanContribution: double (nullable = true)
 |-- EmployeesRetirementCostCovered: double (nullable = true)
 |-- DeferredCompensationPlan: double (nullable = true)
 |-- HealthDentalVision: double (nullable = true)
 |-- TotalRetirementAndHealthContribution: double (null

In [0]:
df3 = df2.na.fill(value=0).na.fill("NA")
print("Dataset with Dropped Columns")
df3.show()

Dataset with Dropped Columns
+----+--------------+------------+--------------------+-----------------+-----------------+----------------+----------+-----------+----------+--------+----------+---------------+--------------+
|Year|  EmployerType|EmployerName|            Position|MinPositionSalary|MaxPositionSalary|ReportedBaseWage|RegularPay|OvertimePay|LumpSumPay|OtherPay|TotalWages|LastUpdatedDate|EmployerCounty|
+----+--------------+------------+--------------------+-----------------+-----------------+----------------+----------+-----------+----------+--------+----------+---------------+--------------+
|2015|K-12 Education| ABC Unified|Assistant Supt-Ac...|         174489.0|         174489.0|              NA|  164375.0|        0.0|       0.0| 25677.0|  190052.0|     09/06/2017|   Los Angeles|
|2015|K-12 Education| ABC Unified|Child Care Specia...|              0.0|              0.0|              NA|    2313.0|        0.0|       0.0|     0.0|    2313.0|     09/06/2017|   Los Angeles|
|

In [0]:
splits = df3.randomSplit([0.7,0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, "Testing Rows:", test_rows)

Training Rows: 6553298 Testing Rows: 2806965


In [0]:
assembler = VectorAssembler(inputCols = ["RegularPay", "TotalWages","MaxPositionSalary", "MinPositionSalary"], outputCol="wages")

In [0]:
training = assembler.transform(train)
print("Displaying results from Training the dataset")
training.show()

Displaying results from Training the dataset
+----+--------------+------------+--------------------+-----------------+-----------------+----------------+----------+-----------+----------+--------+----------+---------------+--------------+--------------------+
|Year|  EmployerType|EmployerName|            Position|MinPositionSalary|MaxPositionSalary|ReportedBaseWage|RegularPay|OvertimePay|LumpSumPay|OtherPay|TotalWages|LastUpdatedDate|EmployerCounty|               wages|
+----+--------------+------------+--------------------+-----------------+-----------------+----------------+----------+-----------+----------+--------+----------+---------------+--------------+--------------------+
|2015|K-12 Education| ABC Unified|Abc Sec School In...|              0.0|              0.0|              NA|     276.0|        0.0|       0.0|     0.0|     276.0|     09/06/2017|   Los Angeles|[276.0,276.0,0.0,...|
|2015|K-12 Education| ABC Unified|Abc Sec School In...|              0.0|              0.0|    

In [0]:
rf = RandomForestRegressor(labelCol="TotalWages", featuresCol="wages", numTrees=10, maxDepth=5)

In [0]:
pipeline = Pipeline(stages=[assembler, rf])
print("Displaying results from Staging the Assembler and RF")
model = pipeline.fit(train)

Displaying results from Staging the Assembler and RF


In [0]:
rfModel = model.stages[-1]
print(rfModel.toDebugString)

#importance column name can be changed

# import pandas as pd
#featureImp = pd.DataFrame(list(zip(assembler.getInputCols(),
#rfModel.featureImportances)),
#columns=["trueWages", "importance"])
#featureImp.sort_values(by="importance", ascending=False)

In [0]:
rf = RandomForestRegressor(labelCol="TotalWages",featuresCol="wages", numTrees=10, maxDepth=5)
# Combine stages into pipeline
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train)

In [0]:
prediction = model.transform(test)
predicted = prediction.select("wages", "prediction", "TotalWages")
print("Displaying results from the Prediction")
predicted.show()

#display(predicted)

Displaying results from the Prediction
+--------------------+------------------+----------+
|               wages|        prediction|TotalWages|
+--------------------+------------------+----------+
|[31897.0,34289.0,...| 32230.43158413585|   34289.0|
|[47347.0,50478.0,...|  51539.4633605598|   50478.0|
|[55969.0,61586.0,...| 60055.20260306975|   61586.0|
|[56294.0,60744.0,...| 60055.20260306975|   60744.0|
|[540.0,540.0,0.0,...| 1943.944408420543|     540.0|
|[38979.0,39626.0,...| 42301.02740924526|   39626.0|
|[44389.0,44840.0,...|46544.566807735435|   44840.0|
|[45575.0,46305.0,...| 49426.14765858212|   46305.0|
|[46973.0,47931.0,...| 49864.32043488817|   47931.0|
|[49471.0,50219.0,...|51942.136885027096|   50219.0|
|[59020.0,59992.0,...| 65917.31575096237|   59992.0|
|[64563.0,65827.0,...|  70290.5658378546|   65827.0|
|[64803.0,65871.0,...|  70290.5658378546|   65871.0|
|[65833.0,66916.0,...|  70290.5658378546|   66916.0|
|[66031.0,67609.0,...|  70290.5658378546|   67609.0|
|[2957.

In [0]:
#from pyspark.ml.evaluation import RegressionEvaluator
rf_evaluator = RegressionEvaluator(predictionCol="prediction", \
                                   labelCol="TotalWages",metricName="r2")
print("R Squared (R2) on test data = %g" % rf_evaluator.evaluate(prediction))
rf_evaluator = RegressionEvaluator(labelCol="TotalWages", predictionCol="prediction", metricName="rmse")
print("RMSE: %f" % rf_evaluator.evaluate(prediction))

#Starting GBT

R Squared (R2) on test data = 0.889008
RMSE: 17253.777949


In [0]:
df3 = df2.na.fill(value=0).na.fill("NA")
#df3.show()

In [0]:
splits = df3.randomSplit([0.7,0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, "Testing Rows:", test_rows)

Training Rows: 6550260 Testing Rows: 2810003


In [0]:
assembler = VectorAssembler(inputCols = ["RegularPay", "TotalWages","MaxPositionSalary", "MinPositionSalary"], outputCol="wages")

In [0]:
training = assembler.transform(train)
#training.show()

In [0]:
gbt = GBTRegressor(labelCol="TotalWages", featuresCol="wages", maxDepth=5)

In [0]:
paramGrid = ParamGridBuilder()\
.addGrid(gbt.maxDepth, [2, 5])\
.addGrid(gbt.maxIter, [10, 20])\
.build()

In [0]:
#gbt_evaluator = RegressionEvaluator(predictionCol="prediction", \ labelCol="TotalWages",metricName="r2")

gbt_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="TotalWages", metricName="r2")

In [0]:
pipeline = Pipeline(stages=[assembler, gbt])

In [0]:
cv = CrossValidator(estimator=pipeline, evaluator= gbt_evaluator, estimatorParamMaps=paramGrid)

In [0]:
print("Displaying results from CrossValidation with the trained dataset")
model = cv.fit(train)


Displaying results from CrossValidation with the trained dataset


In [0]:
prediction = model.transform(test)
predicted = prediction.select("wages", "prediction", "TotalWages")
print("Displaying results from TrainValidationSplit with the Predicted dataset")
predicted.show()

Displaying results from TrainValidationSplit with the Predicted dataset
+--------------------+------------------+----------+
|               wages|        prediction|TotalWages|
+--------------------+------------------+----------+
|[31897.0,34289.0,...|  34572.5588365633|   34289.0|
|[43955.0,46998.0,...| 48045.62950520257|   46998.0|
|[47347.0,49172.0,...| 48045.62950520257|   49172.0|
|[47347.0,50478.0,...|52462.623924493986|   50478.0|
|[55969.0,61586.0,...|61653.309718778386|   61586.0|
|[56294.0,61863.0,...|61653.309718778386|   61863.0|
|[3317.0,12724.0,6...|12939.672514724653|   12724.0|
|[38979.0,39626.0,...| 39336.42602898334|   39626.0|
|[44389.0,44840.0,...| 43703.13219409577|   44840.0|
|[45575.0,46305.0,...| 48051.07997088548|   46305.0|
|[49471.0,50219.0,...|  52468.0743901769|   50219.0|
|[55745.0,56705.0,...| 56893.88048049886|   56705.0|
|[66723.0,67795.0,...| 67897.45906108731|   67795.0|
|[1059.0,1059.0,0....| 915.6912916537415|    1059.0|
|[3233.0,3270.0,0....| 3124

In [0]:
print("R Squared (R2) on test data = %g" % gbt_evaluator.evaluate(prediction))
gbt_evaluator = RegressionEvaluator(labelCol="TotalWages", predictionCol="prediction", metricName="rmse")
print("RMSE: %f" % gbt_evaluator.evaluate(prediction))

R Squared (R2) on test data = 0.899337
RMSE: 16437.748252


In [0]:
cva = TrainValidationSplit(estimator= pipeline, evaluator=gbt_evaluator, estimatorParamMaps=paramGrid, trainRatio=0.8)


example = cva.fit(train)


# Transform the test data and generate predictions by applying the trained model
prediction = example.transform(test)
predicted = prediction.select("wages", "prediction", "TotalWages")
print("Displaying results from TrainValidationSplit with the Predicted dataset")
predicted.show()

Displaying results from TrainValidationSplit with the Predicted dataset
+--------------------+------------------+----------+
|               wages|        prediction|TotalWages|
+--------------------+------------------+----------+
|[31897.0,34289.0,...|  34572.5588365633|   34289.0|
|[43955.0,46998.0,...| 48045.62950520257|   46998.0|
|[47347.0,49172.0,...| 48045.62950520257|   49172.0|
|[47347.0,50478.0,...|52462.623924493986|   50478.0|
|[55969.0,61586.0,...|61653.309718778386|   61586.0|
|[56294.0,61863.0,...|61653.309718778386|   61863.0|
|[3317.0,12724.0,6...|12939.672514724653|   12724.0|
|[38979.0,39626.0,...| 39336.42602898334|   39626.0|
|[44389.0,44840.0,...| 43703.13219409577|   44840.0|
|[45575.0,46305.0,...| 48051.07997088548|   46305.0|
|[49471.0,50219.0,...|  52468.0743901769|   50219.0|
|[55745.0,56705.0,...| 56893.88048049886|   56705.0|
|[66723.0,67795.0,...| 67897.45906108731|   67795.0|
|[1059.0,1059.0,0....| 915.6912916537415|    1059.0|
|[3233.0,3270.0,0....| 3124