In [5]:
from __future__ import print_function

# findspark has to be imported and initialised *before* pyspark is imported:
# init() puts the local Spark installation on sys.path so that the pyspark
# imports below resolve on a fresh kernel.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, VectorIndexer   # feature transformers
from pyspark.ml.regression import LinearRegression               # DataFrame-based API lives in pyspark.ml
from pyspark.ml.regression import RandomForestRegressor          # (the RDD-based API is pyspark.mllib)
from pyspark.ml.evaluation import RegressionEvaluator

In [6]:
# Locate the local Spark installation and make it importable from this kernel.
import findspark
findspark.init()
# Last expression of the cell: displays the Spark home directory that was found.
findspark.find()

'C:\\spark-3.0.2-bin-hadoop3.2'

In [7]:
# Entry point for all DataFrame work in this notebook.
spark = SparkSession.builder.appName("Pyspark ML Algorithms").getOrCreate() # getOrCreate() reuses an existing session if there is one, otherwise builds a new one

In [8]:
# header=True takes the column names from the first line of the file.
# No schema is supplied or inferred here, so every column is loaded as a string
# (see the printSchema() output below).
dataframe = spark.read.csv("Admission_Prediction.csv",header=True)

In [9]:
# Confirm that the loader returned a Spark DataFrame.
type(dataframe)

pyspark.sql.dataframe.DataFrame

In [10]:
# Preview the raw data; show() prints the first 20 rows by default.
dataframe.show()

+---------+-----------+-----------------+----+----+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating| SOP| LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+----+----+----+--------+---------------+
|   337.00|     118.00|                4|4.50|4.50|9.65|    1.00|           0.92|
|   324.00|     107.00|                4|4.00|4.50|8.87|    1.00|           0.76|
|     null|     104.00|                3|3.00|3.50|8.00|    1.00|           0.72|
|   322.00|     110.00|                3|3.50|2.50|8.67|    1.00|           0.80|
|   314.00|     103.00|                2|2.00|3.00|8.21|    0.00|           0.65|
|   330.00|     115.00|                5|4.50|3.00|9.34|    1.00|           0.90|
|   321.00|     109.00|             null|3.00|4.00|8.20|    1.00|           0.75|
|   308.00|     101.00|                2|3.00|4.00|7.90|    0.00|           0.68|
|   302.00|     102.00|                1|2.00|1.50|8.00|    0.00|           0.50|
|   323.00|     

In [11]:
# Inspect the column types as loaded from the CSV.
dataframe.printSchema()

root
 |-- GRE Score: string (nullable = true)
 |-- TOEFL Score: string (nullable = true)
 |-- University Rating: string (nullable = true)
 |-- SOP: string (nullable = true)
 |-- LOR: string (nullable = true)
 |-- CGPA: string (nullable = true)
 |-- Research: string (nullable = true)
 |-- Chance of Admit: string (nullable = true)



In [12]:
from pyspark.sql.functions import col

# Re-type every column as float while keeping the original column names.
float_columns = [col(name).cast("float").alias(name) for name in dataframe.columns]
new_dataframe = dataframe.select(*float_columns)

In [13]:
# Verify that the cast produced float columns.
new_dataframe.printSchema()

root
 |-- GRE Score: float (nullable = true)
 |-- TOEFL Score: float (nullable = true)
 |-- University Rating: float (nullable = true)
 |-- SOP: float (nullable = true)
 |-- LOR: float (nullable = true)
 |-- CGPA: float (nullable = true)
 |-- Research: float (nullable = true)
 |-- Chance of Admit: float (nullable = true)



In [14]:
from pyspark.sql.functions import col, count, isnan, when

In [15]:
# Print the column names, one per line.
for column_name in new_dataframe.columns:
    print(column_name)

GRE Score
TOEFL Score
University Rating
SOP
LOR
CGPA
Research
Chance of Admit


In [16]:
# Count the missing entries in every column.
# A value counts as missing when it is null OR NaN: the columns are floats, so
# both can occur, and checking only isNull() would silently skip NaN entries.
# when(...) yields a non-null value only for missing rows, and count() counts
# the non-null results.
missing_counts = [
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in new_dataframe.columns
]
new_dataframe.select(missing_counts).show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|       15|         10|               15|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [17]:
from pyspark.ml.feature import Imputer

In [18]:
# The three columns that contain missing values. Using the same names for the
# input and output columns overwrites them in place instead of adding new ones.
columns_to_impute = ["GRE Score", "TOEFL Score", "University Rating"]
imputer = Imputer(inputCols=columns_to_impute, outputCols=columns_to_impute)

# fit() learns the replacement value per column; transform() fills the gaps.
model = imputer.fit(new_dataframe)
imputed_data = model.transform(new_dataframe)

In [19]:
# Display the schema summary of the imputed DataFrame (no rows are computed).
imputed_data

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float, Chance of Admit: float]

In [20]:
# Re-count the missing entries after imputation; every column should report 0.
# A value counts as missing when it is null OR NaN: the columns are floats, so
# both can occur, and checking only isNull() would silently skip NaN entries.
remaining_missing = [
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    for c in imputed_data.columns
]
imputed_data.select(remaining_missing).show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|        0|          0|                0|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [21]:
# Everything except the label column is a predictor.
features = imputed_data.drop('Chance of Admit')

In [22]:
# Display the predictor columns and their types.
features

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float]

In [23]:
# Assemble the predictor columns into one vector column using VectorAssembler.
assembler = VectorAssembler( inputCols=features.columns,outputCol="features") # "features" is the name of the new output column
# It takes all the input columns and outputs a single vector column.

In [24]:
# Append the assembled "features" vector column to the imputed data.
output = assembler.transform(imputed_data)
# Display the resulting schema.
output

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float, Chance of Admit: float, features: vector]

In [25]:
# Preview the rows together with the new vector column.
output.show()

+---------+-----------+-----------------+---+---+----+--------+---------------+--------------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|            features|
+---------+-----------+-----------------+---+---+----+--------+---------------+--------------------+
|    337.0|      118.0|              4.0|4.5|4.5|9.65|     1.0|           0.92|[337.0,118.0,4.0,...|
|    324.0|      107.0|              4.0|4.0|4.5|8.87|     1.0|           0.76|[324.0,107.0,4.0,...|
|316.55878|      104.0|              3.0|3.0|3.5| 8.0|     1.0|           0.72|[316.558776855468...|
|    322.0|      110.0|              3.0|3.5|2.5|8.67|     1.0|            0.8|[322.0,110.0,3.0,...|
|    314.0|      103.0|              2.0|2.0|3.0|8.21|     0.0|           0.65|[314.0,103.0,2.0,...|
|    330.0|      115.0|              5.0|4.5|3.0|9.34|     1.0|            0.9|[330.0,115.0,5.0,...|
|    321.0|      109.0|        3.1216495|3.0|4.0| 8.2|     1.0|           0.75|[321.0,109.0

## Linear Regressor

In [26]:
# Keep only what the models need: the feature vector and the label.
# NOTE(review): this rebinds `output`, so the wide frame shown above is no longer
# reachable under that name after this cell has run.
output= output.select("features", "Chance of Admit")
output

DataFrame[features: vector, Chance of Admit: float]

In [27]:
# Collect the feature vectors into a pandas DataFrame for a quick visual check.
output.select("features").toPandas()
# Alternative: append .values to get the underlying NumPy array instead:
#output.select("features").toPandas().values

Unnamed: 0,features
0,"[337.0, 118.0, 4.0, 4.5, 4.5, 9.64999961853027..."
1,"[324.0, 107.0, 4.0, 4.0, 4.5, 8.86999988555908..."
2,"[316.55877685546875, 104.0, 3.0, 3.0, 3.5, 8.0..."
3,"[322.0, 110.0, 3.0, 3.5, 2.5, 8.67000007629394..."
4,"[314.0, 103.0, 2.0, 2.0, 3.0, 8.21000003814697..."
...,...
495,"[332.0, 108.0, 5.0, 4.5, 4.0, 9.02000045776367..."
496,"[337.0, 117.0, 5.0, 5.0, 5.0, 9.86999988555908..."
497,"[330.0, 120.0, 5.0, 4.5, 5.0, 9.5600004196167,..."
498,"[312.0, 103.0, 4.0, 4.0, 5.0, 8.43000030517578..."


In [28]:
# 70/30 train/test split. The seed is fixed so that the split, and therefore
# every metric reported below, is reproducible across kernel restarts.
train_df, test_df = output.randomSplit([0.7, 0.3], seed=42)

In [29]:
# Preview both splits (first 20 rows of each).
train_df.show()
test_df.show()

+--------------------+---------------+
|            features|Chance of Admit|
+--------------------+---------------+
|[290.0,100.0,1.0,...|           0.47|
|[290.0,104.0,4.0,...|           0.45|
|[293.0,97.0,2.0,2...|           0.64|
|[294.0,95.0,1.0,1...|           0.49|
|[295.0,96.0,2.0,1...|           0.47|
|[295.0,99.0,2.0,2...|           0.57|
|[295.0,101.0,2.0,...|           0.69|
|[296.0,97.0,2.0,1...|           0.49|
|[296.0,99.0,2.0,2...|           0.61|
|[296.0,99.0,2.0,3...|           0.47|
|[297.0,96.0,2.0,2...|           0.43|
|[297.0,96.0,2.0,2...|           0.34|
|[297.0,100.0,1.0,...|           0.52|
|[297.0,101.0,3.0,...|           0.57|
|[298.0,92.0,1.0,2...|           0.51|
|[298.0,97.0,3.121...|           0.45|
|[298.0,98.0,2.0,4...|           0.34|
|[298.0,100.0,3.0,...|           0.58|
|[298.0,101.0,2.0,...|           0.54|
|[298.0,101.0,4.0,...|           0.53|
+--------------------+---------------+
only showing top 20 rows

+--------------------+---------------+

In [30]:
# Configure the estimator, then fit it on the training split only.
lin_reg = LinearRegression(
    featuresCol="features",
    labelCol="Chance of Admit",
)
linear_model = lin_reg.fit(train_df)

In [31]:
# Fitted parameters of y = m·x + c: one slope per feature, plus the intercept.
print("Coefficients:", linear_model.coefficients)  # m
print("Intercept:", linear_model.intercept)        # c

Coefficients: [0.0021591926873072444,0.0023899862115994196,0.008208385949399653,-0.0009551495983607279,0.0162794086072186,0.12238570813726027,0.018245507265415475]
Intercept: -1.3576043915348561


In [32]:
# Goodness of fit on the data the model was trained on.
trainSummary = linear_model.summary
print("RMSE: {:f}".format(trainSummary.rootMeanSquaredError))
print("r2: {:f}".format(trainSummary.r2))

RMSE: 0.059080
r2: 0.827074


In [33]:
# Score the held-out test split with the fitted linear model.

predictions = linear_model.transform(test_df)
predictions.select("prediction","Chance of Admit","features").show()

+-------------------+---------------+--------------------+
|         prediction|Chance of Admit|            features|
+-------------------+---------------+--------------------+
|  0.439560283209006|           0.46|[294.0,93.0,1.0,1...|
|0.42166014811169483|           0.46|[295.0,93.0,1.0,2...|
| 0.4731431174405889|           0.37|[295.0,99.0,1.0,2...|
|0.49570921627990083|           0.44|[296.0,95.0,2.0,3...|
|  0.517486206540043|            0.6|[296.0,101.0,1.0,...|
| 0.5194599998067666|           0.59|[297.0,98.0,2.0,2...|
| 0.5630628702205558|           0.54|[297.0,99.0,4.0,3...|
| 0.5118745653336181|           0.44|[298.0,98.0,2.0,1...|
|0.49105493897716523|           0.53|[298.0,99.0,1.0,1...|
| 0.6683572930070112|           0.69|[298.0,105.0,3.0,...|
|0.43449668722068036|           0.42|[299.0,94.0,1.0,1...|
| 0.5066478913188448|           0.42|[299.0,100.0,3.0,...|
| 0.5794286016080119|           0.62|[300.0,95.0,2.0,3...|
| 0.5550233576660655|           0.58|[300.0,99.0,1.0,1..

In [34]:
# R² (coefficient of determination) on the test split. This is a regression
# goodness-of-fit measure, not a classification accuracy.
# RegressionEvaluator is already imported in the notebook's import cell.
pred_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Chance of Admit",
    metricName="r2",
)
print("R Squared (R2) on test data =", pred_evaluator.evaluate(predictions))

R Squared (R2) on test data = 0.803883005325898


## Random Forest Regressor

VectorIndexer:

    is used to index categorical predictors in a featuresCol column. Remember that featuresCol is a single column of vectors (see featuresCol and labelCol). Each row is a vector that contains the values of all the predictors.
    If you have string-typed predictors, you first need to index those columns with StringIndexer: featuresCol contains vectors, and vectors cannot contain string values.


In [35]:
featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(output)

In [36]:
featureIndexer = featureIndexer.transform(output)

In [37]:
new_indexed_data = featureIndexer.select("indexedFeatures", "Chance of Admit")

In [38]:
# 70/30 train/test split for the random forest. The seed is fixed so that the
# split, and the metrics computed from it, are reproducible across runs.
training, test = new_indexed_data.randomSplit([0.7, 0.3], seed=42)

In [39]:
# Preview the training split (first 20 rows).
training.show()

+--------------------+---------------+
|     indexedFeatures|Chance of Admit|
+--------------------+---------------+
|[290.0,104.0,4.0,...|           0.45|
|[293.0,97.0,2.0,2...|           0.64|
|[294.0,93.0,1.0,1...|           0.46|
|[294.0,95.0,1.0,1...|           0.49|
|[295.0,93.0,1.0,2...|           0.46|
|[295.0,96.0,2.0,1...|           0.47|
|[295.0,99.0,1.0,2...|           0.37|
|[295.0,99.0,2.0,2...|           0.57|
|[295.0,101.0,2.0,...|           0.69|
|[296.0,95.0,2.0,3...|           0.44|
|[296.0,97.0,2.0,1...|           0.49|
|[296.0,99.0,2.0,2...|           0.61|
|[296.0,99.0,2.0,3...|           0.47|
|[297.0,96.0,2.0,2...|           0.34|
|[297.0,98.0,2.0,2...|           0.59|
|[297.0,100.0,1.0,...|           0.52|
|[298.0,97.0,3.121...|           0.45|
|[298.0,98.0,2.0,4...|           0.34|
|[298.0,100.0,3.0,...|           0.58|
|[298.0,101.0,4.0,...|           0.53|
+--------------------+---------------+
only showing top 20 rows



In [40]:
# Preview the test split (first 20 rows).
test.show()

+--------------------+---------------+
|     indexedFeatures|Chance of Admit|
+--------------------+---------------+
|[290.0,100.0,1.0,...|           0.47|
|[296.0,101.0,1.0,...|            0.6|
|[297.0,96.0,2.0,2...|           0.43|
|[297.0,99.0,4.0,3...|           0.54|
|[297.0,101.0,3.0,...|           0.57|
|[298.0,92.0,1.0,2...|           0.51|
|[298.0,98.0,2.0,1...|           0.44|
|[298.0,99.0,1.0,1...|           0.53|
|[298.0,101.0,2.0,...|           0.54|
|[299.0,94.0,1.0,1...|           0.42|
|[299.0,97.0,3.0,5...|           0.38|
|[299.0,100.0,3.0,...|           0.63|
|[300.0,98.0,1.0,2...|           0.61|
|[300.0,100.0,3.0,...|           0.64|
|[300.0,100.0,3.12...|           0.62|
|[300.0,101.0,3.0,...|           0.59|
|[300.0,102.0,3.0,...|           0.63|
|[300.0,104.0,3.0,...|           0.71|
|[300.0,105.0,1.0,...|           0.58|
|[302.0,99.0,1.0,2...|           0.57|
+--------------------+---------------+
only showing top 20 rows



In [41]:
# Random forests are trained with random row/feature sub-sampling; fixing the
# seed makes the fitted model and its reported metrics reproducible.
random_forest_reg = RandomForestRegressor(
    featuresCol="indexedFeatures",
    labelCol="Chance of Admit",
    seed=42,
)

In [42]:
# Train the random forest on the training split. The features were already
# indexed in an earlier cell; fit() here does not run the indexer.
model = random_forest_reg.fit(training)

In [43]:
# Make predictions on the held-out test split.
# NOTE(review): this rebinds `predictions`, which earlier held the linear
# regression output.
predictions = model.transform(test)

In [44]:
# Compare the label with the predicted value, row by row.
predictions.show()

+--------------------+---------------+-------------------+
|     indexedFeatures|Chance of Admit|         prediction|
+--------------------+---------------+-------------------+
|[290.0,100.0,1.0,...|           0.47|0.46985061628006913|
|[296.0,101.0,1.0,...|            0.6| 0.5147199358954799|
|[297.0,96.0,2.0,2...|           0.43|0.49979474238504445|
|[297.0,99.0,4.0,3...|           0.54| 0.5288219662740945|
|[297.0,101.0,3.0,...|           0.57| 0.5453666267105651|
|[298.0,92.0,1.0,2...|           0.51| 0.4693042278654591|
|[298.0,98.0,2.0,1...|           0.44|0.48720948452272567|
|[298.0,99.0,1.0,1...|           0.53|0.48193329818301545|
|[298.0,101.0,2.0,...|           0.54| 0.5203645224893086|
|[299.0,94.0,1.0,1...|           0.42|0.47033379569724965|
|[299.0,97.0,3.0,5...|           0.38| 0.5485203484757015|
|[299.0,100.0,3.0,...|           0.63| 0.5631625186986213|
|[300.0,98.0,1.0,2...|           0.61| 0.5628900508869175|
|[300.0,100.0,3.0,...|           0.64|  0.67689512393091

In [45]:
# Root mean squared error of the random forest on the test split.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Chance of Admit",
    metricName="rmse",
)
rmse_on_test = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = ", rmse_on_test)

Root Mean Squared Error (RMSE) on test data =  0.06244640475332184


In [46]:
# R² (coefficient of determination) of the random forest on the test split.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Chance of Admit",
    metricName="r2",
)
r2_on_test = evaluator.evaluate(predictions)
print("R Squared (R2) on test data =", r2_on_test)

R Squared (R2) on test data = 0.8068730920049179
