<a href="https://colab.research.google.com/github/Anyulund/NLP/blob/master/SparkNLPPipeliine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem so files stored there can be
# read by path.
from google.colab import drive

gdrive_mount_point = '/content/gdrive'
drive.mount(gdrive_mount_point)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Task 1 - Install Spark, load the required libraries, set the environment variables, start Spark, and load the data file

In [None]:
# Disabled cleanup snippet, kept as an inert string literal so it never runs
# (the cell only echoes the string back).
# NOTE(review): shutil.rmtree removes directory trees, but the path below names
# a .tgz archive, and its 3.0.0 version differs from the Spark release that the
# install cell downloads -- confirm both before re-enabling this code.
'''
import shutil

shutil.rmtree('/content/spark-3.0.0-bin-hadoop3.2.tgz')
'''

"\nimport shutil\n\nshutil.rmtree('/content/spark-3.0.0-bin-hadoop3.2.tgz')\n"

In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [None]:
# Libraries used by the rest of the notebook.
# findspark.init() is called before the pyspark imports below.
# NOTE(review): presumably it makes pyspark importable from the Spark install
# referenced by SPARK_HOME -- confirm before reordering these lines.
# NOTE(review): `os` is also imported in the install cell.
import os
import findspark
findspark.init()
from google.colab import files
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
# NOTE(review): RegressionEvaluator is already imported a few lines above.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
# Create (or reuse) a Spark session running locally; "local[*]" is the master
# URL passed to the builder. Note that `sc` holds a SparkSession here.
session_builder = SparkSession.builder.master("local[*]")
sc = session_builder.getOrCreate()

In [None]:
#files.upload()

In [None]:
# Load the CSV from the mounted Drive, letting Spark infer the column types and
# treating the first row as the header.
# NOTE(review): the schema printed by the next cell shows column names such as
# "2017" and "24995", which look like data values -- confirm that the file
# really starts with a header row.
csv_reader = sc.read.options(inferSchema=True, header=True)
data = csv_reader.csv('/content/gdrive/MyDrive/MLPipelines/data.csv')

In [None]:
# Print the inferred schema, then display the summary statistics transposed so
# that each DataFrame column becomes one row.
data.printSchema()
summary_stats = data.describe().toPandas()
summary_stats.T

root
 |-- pider: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- premium unleaded (recommended): integer (nullable = true)
 |-- 160: string (nullable = true)
 |-- 4: integer (nullable = true)
 |-- MANUAL: integer (nullable = true)
 |-- rear wheel drive: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- Performance: integer (nullable = true)
 |-- Compact: string (nullable = true)
 |-- Convertible: string (nullable = true)
 |-- 35: string (nullable = true)
 |-- 26: integer (nullable = true)
 |-- 819: integer (nullable = true)
 |-- 24995: integer (nullable = true)



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
pider,11880,,,Acura,Volvo
2017,11880,767.8436781609196,1511.4508757153594,124 Spider,xD
premium unleaded (recommended),11880,2010.4026094276094,7.56446225730375,1990,2017
160,11877,,,diesel,regular unleaded
4,11811,249.46769960206586,109.29410966315454,55,1001
MANUAL,11850,5.628101265822785,1.782882058448863,0,16
rear wheel drive,11880,,,AUTOMATED_MANUAL,UNKNOWN
2,11880,,,all wheel drive,rear wheel drive
Performance,11874,3.4376789624389423,0.8804751303515581,2,4


In [None]:
def replace(column, value):
  """Return `column` with every entry equal to `value` turned into null.

  Entries that differ from `value` are kept unchanged; all other entries
  become null.
  """
  keep_original = column != value
  return when(keep_original, column).otherwise(lit(None))

# Turn the literal "N/A" strings in 'Compact' into real nulls so that Spark's
# null handling (isNull, na.drop) treats them as missing values.
compact_or_null = replace(col("Compact"), "N/A")
data = data.withColumn("Compact", compact_or_null)

In [None]:
# Count, for every column, the rows whose value is NaN or null.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(missing_counts).show()

+-----+----+------------------------------+---+---+------+----------------+---+-----------+-------+-----------+---+---+---+-----+
|pider|2017|premium unleaded (recommended)|160|  4|MANUAL|rear wheel drive|  2|Performance|Compact|Convertible| 35| 26|819|24995|
+-----+----+------------------------------+---+---+------+----------------+---+-----------+-------+-----------+---+---+---+-----+
|    0|   0|                             0|  3| 69|    30|               0|  0|          6|   3742|          0|  0|  0|  0|    0|
+-----+----+------------------------------+---+---+------+----------------+---+-----------+-------+-----------+---+---+---+-----+



In [None]:
# Remove the 'Compact' column, then discard every row that still contains a
# null, and report the remaining row and column counts.
data = data.drop('Compact').dropna()
n_rows, n_cols = data.count(), len(data.columns)
print(n_rows, n_cols)

11778 14


In [None]:
# Show the names of the columns currently in `data`.
data.columns

['pider',
 '2017',
 'premium unleaded (recommended)',
 '160',
 '4',
 'MANUAL',
 'rear wheel drive',
 '2',
 'Performance',
 'Convertible',
 '35',
 '26',
 '819',
 '24995']

In [None]:
# combine many columns into one called \'Attributes' using VectorAssembler 
assembler = VectorAssembler(inputCols = ['premium unleaded (recommended)', '4','MANUAL', '26','819','24995'], outputCol = 'Attributes')

regressor = RandomForestRegressor(featuresCol='Attributes',labelCol='Performance')
pipeline = Pipeline(stages=(assembler,regressor))
pipeline.write().overwrite().save('pipeline') # lets you overwrite and save the pipeline without an error 
!ls


gdrive	  sample_data		     spark-3.1.1-bin-hadoop3.2.tgz
pipeline  spark-3.1.1-bin-hadoop3.2


In [None]:
# Reload the saved pipeline. Pipeline.load returns an unfitted Pipeline, so
# despite its name `pipelineModel` is the estimator that gets cross-validated.
pipelineModel = Pipeline.load('pipeline')

# Build the grid from the regressor stage of the loaded pipeline itself (its
# last stage) rather than from a variable left over from an earlier cell, so
# the tuned parameter always belongs to the estimator being fitted.
loaded_regressor = pipelineModel.getStages()[-1]
paramGrid = ParamGridBuilder().addGrid(loaded_regressor.numTrees, [100, 500]).build()

# 3-fold cross-validation over the grid, scored with the evaluator's default
# metric. The explicit seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=pipelineModel,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='Performance'),
                          numFolds=3,
                          seed=123)

In [None]:
# Split the data 80/20 into train and test sets with a fixed seed, then run the
# cross-validated search on the training part only.
split_weights = [0.8, 0.2]
train, test = data.randomSplit(split_weights, seed=123)
cvModel = crossval.fit(train)

In [None]:
# Print every stage of the best pipeline selected by cross-validation.
bestModel = cvModel.bestModel
for stage in bestModel.stages:
  print(stage)

VectorAssembler_f3c36fc03d8b
RandomForestRegressionModel: uid=RandomForestRegressor_b7e9bf1e264b, numTrees=500, numFeatures=6


In [None]:
# Score the held-out rows with the cross-validated model and preview the label
# next to its prediction.
pred = cvModel.transform(test)
label_vs_prediction = pred.select('Performance', 'prediction')
label_vs_prediction.show()

+-----------+------------------+
|Performance|        prediction|
+-----------+------------------+
|          2|3.4423347532172746|
|          2|3.4779011247962313|
|          4| 3.579515014187246|
|          4|  3.67186498638074|
|          4|  3.67186498638074|
|          4| 3.651011958834196|
|          4| 3.300637574679277|
|          4| 3.300637574679277|
|          2|3.3602956469872116|
|          2|3.3602956469872116|
|          4|3.3602956469872116|
|          4|3.3802705638882062|
|          2|3.3980169512764222|
|          2| 3.375729232220516|
|          2| 3.187295137900144|
|          4|3.2274593488042593|
|          4|3.6100871944872344|
|          4|3.6100871944872344|
|          4| 3.609649590783871|
|          4|3.6100871944872344|
+-----------+------------------+
only showing top 20 rows



In [None]:
# Evaluate the test-set predictions against the 'Performance' label.
# The evaluator is bound to `evaluator` so that Python's built-in eval() is not
# shadowed for the rest of the notebook.
evaluator = RegressionEvaluator(labelCol='Performance')

# Request every metric by name instead of relying on the evaluator's default.
rmse = evaluator.evaluate(pred, {evaluator.metricName: "rmse"})
mse = evaluator.evaluate(pred, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred, {evaluator.metricName: "r2"})

print('RMSE: %.3f' % rmse)
print('MSE: %.3f' % mse)
print('MAE: %.3f' % mae)
print('R2: %.3f' % r2)

RMSE: 0.779
MSE: 0.607
MAE: 0.658
R2: 0.214
