In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 57 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 55.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=5d22c8e97cc2c71ae0ab18c7c6f4e90855651e64e004beb3b878a600e8035295
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName('Spark Prac').setMaster('local')
sc = SparkContext(conf = conf)

In [None]:
from google.colab import files
uploaded = files.upload()

Saving Admission_Prediction.csv to Admission_Prediction.csv


In [None]:
import io
data1 = io.BytesIO(uploaded['Admission_Prediction.csv'])    

In [None]:
data = sc.textFile('Admission_Prediction.csv')

#### Import PySpark ML Libraries

In [None]:
from __future__ import print_function
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
spark = SparkSession.builder.appName("Spark ML Algo").getOrCreate()

In [None]:
dataframe = spark.read.csv("Admission_Prediction.csv", header=True)

In [None]:
type(dataframe)

pyspark.sql.dataframe.DataFrame

In [None]:
dataframe.show()

+---------+-----------+-----------------+----+----+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating| SOP| LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+----+----+----+--------+---------------+
|   337.00|     118.00|                4|4.50|4.50|9.65|    1.00|           0.92|
|   324.00|     107.00|                4|4.00|4.50|8.87|    1.00|           0.76|
|     null|     104.00|                3|3.00|3.50|8.00|    1.00|           0.72|
|   322.00|     110.00|                3|3.50|2.50|8.67|    1.00|           0.80|
|   314.00|     103.00|                2|2.00|3.00|8.21|    0.00|           0.65|
|   330.00|     115.00|                5|4.50|3.00|9.34|    1.00|           0.90|
|   321.00|     109.00|             null|3.00|4.00|8.20|    1.00|           0.75|
|   308.00|     101.00|                2|3.00|4.00|7.90|    0.00|           0.68|
|   302.00|     102.00|                1|2.00|1.50|8.00|    0.00|           0.50|
|   323.00|     

In [None]:
dataframe.printSchema()

root
 |-- GRE Score: string (nullable = true)
 |-- TOEFL Score: string (nullable = true)
 |-- University Rating: string (nullable = true)
 |-- SOP: string (nullable = true)
 |-- LOR: string (nullable = true)
 |-- CGPA: string (nullable = true)
 |-- Research: string (nullable = true)
 |-- Chance of Admit: string (nullable = true)



In [None]:
from pyspark.sql.functions import col
new_dataframe = dataframe.select(*(col(c).cast("float").alias(c) for c in dataframe.columns))

In [None]:
new_dataframe.printSchema()

root
 |-- GRE Score: float (nullable = true)
 |-- TOEFL Score: float (nullable = true)
 |-- University Rating: float (nullable = true)
 |-- SOP: float (nullable = true)
 |-- LOR: float (nullable = true)
 |-- CGPA: float (nullable = true)
 |-- Research: float (nullable = true)
 |-- Chance of Admit: float (nullable = true)



In [None]:
from pyspark.sql.functions import col, count, isnan, when

In [None]:
for c in new_dataframe.columns:
  print(c)

GRE Score
TOEFL Score
University Rating
SOP
LOR
CGPA
Research
Chance of Admit


In [None]:
### Checking for null ir nan type values in out columns
new_dataframe.select([count(when(col(c).isNull(), c)).alias(c) for c in new_dataframe.columns]).show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|       15|         10|               15|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [None]:
from pyspark.ml.feature import Imputer

In [None]:
imputer = Imputer(inputCols=["GRE Score", "TOEFL Score","University Rating"], 
                  outputCols=["GRE Score", "TOEFL Score","University Rating"])
model = imputer.fit(new_dataframe)

imputed_data = model.transform(new_dataframe)

In [None]:
imputed_data

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float, Chance of Admit: float]

In [None]:
#checking for null ir nan type values in our columns
imputed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in imputed_data.columns]).show()

+---------+-----------+-----------------+---+---+----+--------+---------------+
|GRE Score|TOEFL Score|University Rating|SOP|LOR|CGPA|Research|Chance of Admit|
+---------+-----------+-----------------+---+---+----+--------+---------------+
|        0|          0|                0|  0|  0|   0|       0|              0|
+---------+-----------+-----------------+---+---+----+--------+---------------+



In [None]:
features = imputed_data.drop('Chance of Admit')

In [None]:
features

DataFrame[GRE Score: float, TOEFL Score: float, University Rating: float, SOP: float, LOR: float, CGPA: float, Research: float]

In [None]:
#let's assemble our features together using vectorAssembler
assembler = VectorAssembler( inputCols=features.columns,outputCol="features")

In [None]:
output = assembler.transform(imputed_data)

#### Linear Regressor

In [None]:
output= output.select("features", "Chance of Admit")

In [None]:
output = assembler.transform(imputed_data)

In [None]:
output= output.select("features", "Chance of Admit")

In [None]:
train_df,test_df = output.randomSplit([0.7, 0.3])

In [None]:
train_df.show()
test_df.show()

+--------------------+---------------+
|            features|Chance of Admit|
+--------------------+---------------+
|[290.0,100.0,1.0,...|           0.47|
|[290.0,104.0,4.0,...|           0.45|
|[293.0,97.0,2.0,2...|           0.64|
|[294.0,95.0,1.0,1...|           0.49|
|[295.0,96.0,2.0,1...|           0.47|
|[295.0,99.0,2.0,2...|           0.57|
|[295.0,101.0,2.0,...|           0.69|
|[296.0,97.0,2.0,1...|           0.49|
|[296.0,99.0,2.0,2...|           0.61|
|[296.0,101.0,1.0,...|            0.6|
|[297.0,96.0,2.0,2...|           0.43|
|[297.0,96.0,2.0,2...|           0.34|
|[297.0,98.0,2.0,2...|           0.59|
|[297.0,99.0,4.0,3...|           0.54|
|[297.0,100.0,1.0,...|           0.52|
|[297.0,101.0,3.0,...|           0.57|
|[298.0,97.0,3.121...|           0.45|
|[298.0,98.0,2.0,1...|           0.44|
|[298.0,98.0,2.0,4...|           0.34|
|[298.0,99.0,1.0,1...|           0.53|
+--------------------+---------------+
only showing top 20 rows

+--------------------+---------------+

In [None]:
lin_reg = LinearRegression(featuresCol = 'features', labelCol='Chance of Admit')
linear_model = lin_reg.fit(train_df)

In [None]:
print("Coefficients: " + str(linear_model.coefficients))
print("Intercept: " + str(linear_model.intercept))

Coefficients: [0.0015330494946342618,0.0023519563543600987,0.0076305691461127645,-0.000640853503839564,0.018731808306761612,0.12635412086451406,0.025797081114095]
Intercept: -1.2012592480061368


In [None]:
trainSummary = linear_model.summary
print("RMSE: %f" % trainSummary.rootMeanSquaredError)
print("r2: %f" % trainSummary.r2)

RMSE: 0.059358
r2: 0.823672


In [None]:
# prediction

predictions = linear_model.transform(test_df)
predictions.select("prediction","Chance of Admit","features").show()

+-------------------+---------------+--------------------+
|         prediction|Chance of Admit|            features|
+-------------------+---------------+--------------------+
|0.44228849630862066|           0.46|[294.0,93.0,1.0,1...|
|0.42328441874278155|           0.46|[295.0,93.0,1.0,2...|
| 0.4747813232256899|           0.37|[295.0,99.0,1.0,2...|
| 0.5052685980765481|           0.44|[296.0,95.0,2.0,3...|
| 0.4841250147454388|           0.47|[296.0,99.0,2.0,3...|
| 0.5114524516204115|           0.51|[298.0,92.0,1.0,2...|
| 0.6771850593675526|           0.69|[298.0,105.0,3.0,...|
| 0.5383836327973122|           0.38|[299.0,97.0,3.0,5...|
|  0.672084228922913|           0.56|[299.0,102.0,3.0,...|
| 0.5879557326497913|           0.62|[300.0,95.0,2.0,3...|
| 0.6916804096917735|           0.64|[300.0,100.0,3.0,...|
|  0.544725539922505|           0.56|[300.0,102.0,2.0,...|
| 0.5983465665738099|           0.63|[300.0,102.0,3.0,...|
| 0.6111528133071027|           0.71|[300.0,104.0,3.0,..

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
pred_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Chance of Admit",metricName="r2")
print("R Squared (R2) on test data =", pred_evaluator.evaluate(predictions))

R Squared (R2) on test data = 0.8110789276193163
