### Linear Regression

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.appName("Linear Regression").getOrCreate()
data = spark.read.csv("..\Data\Car_Sales.csv", header = True, inferSchema = True)
data.show()

+-------------+------+---------+-------+-------+-----------+------------+----+----------------+
|        Brand| Price|     Body|Mileage|EngineV|Engine Type|Registration|Year|           Model|
+-------------+------+---------+-------+-------+-----------+------------+----+----------------+
|          BMW|  4200|    sedan|    277|      2|     Petrol|         yes|1991|             320|
|Mercedes-Benz|  7900|      van|    427|    2.9|     Diesel|         yes|1999|    Sprinter 212|
|Mercedes-Benz| 13300|    sedan|    358|      5|        Gas|         yes|2003|           S 500|
|         Audi| 23000|crossover|    240|    4.2|     Petrol|         yes|2007|              Q7|
|       Toyota| 18300|crossover|    120|      2|     Petrol|         yes|2011|           Rav 4|
|Mercedes-Benz|199999|crossover|      0|    5.5|     Petrol|         yes|2016|          GLS 63|
|          BMW|  6100|    sedan|    438|      2|        Gas|         yes|1997|             320|
|         Audi| 14200|    vagon|    200|

In [3]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
def OneHotEncoding(df, col_name = ""):
    ###Gather the distinct values 
    distinct_values = list(df.select(col_name).distinct().toPandas()[col_name])
    # for each of the gathered values create a new column
#For Body
    for distinct_value in distinct_values:
        function = udf(lambda item: 1 if item == distinct_value else 0, IntegerType())
        new_column_name = col_name+'_'+distinct_value
        df = df.withColumn(new_column_name, function(col(col_name)))
    
    return df

In [4]:
data.columns

['Brand',
 'Price',
 'Body',
 'Mileage',
 'EngineV',
 'Engine Type',
 'Registration',
 'Year',
 'Model']

In [5]:
#Encodin gBrand
data = OneHotEncoding(data, col_name = "Brand")

In [6]:
#Encoding Engine Type Column
data = OneHotEncoding(data, col_name = "Engine Type")

In [7]:
#Encoding Body and Registration Columns
data = OneHotEncoding(data, col_name = "Body")
data = OneHotEncoding(data, col_name = "Registration")
data.columns

['Brand',
 'Price',
 'Body',
 'Mileage',
 'EngineV',
 'Engine Type',
 'Registration',
 'Year',
 'Model',
 'Brand_Volkswagen',
 'Brand_Mitsubishi',
 'Brand_Audi',
 'Brand_Mercedes-Benz',
 'Brand_Renault',
 'Brand_BMW',
 'Brand_Toyota',
 'Engine Type_Diesel',
 'Engine Type_Other',
 'Engine Type_Gas',
 'Engine Type_Petrol',
 'Body_van',
 'Body_crossover',
 'Body_other',
 'Body_sedan',
 'Body_hatch',
 'Body_vagon',
 'Registration_no',
 'Registration_yes']

In [8]:
data.printSchema()

root
 |-- Brand: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- EngineV: string (nullable = true)
 |-- Engine Type: string (nullable = true)
 |-- Registration: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Brand_Volkswagen: integer (nullable = true)
 |-- Brand_Mitsubishi: integer (nullable = true)
 |-- Brand_Audi: integer (nullable = true)
 |-- Brand_Mercedes-Benz: integer (nullable = true)
 |-- Brand_Renault: integer (nullable = true)
 |-- Brand_BMW: integer (nullable = true)
 |-- Brand_Toyota: integer (nullable = true)
 |-- Engine Type_Diesel: integer (nullable = true)
 |-- Engine Type_Other: integer (nullable = true)
 |-- Engine Type_Gas: integer (nullable = true)
 |-- Engine Type_Petrol: integer (nullable = true)
 |-- Body_van: integer (nullable = true)
 |-- Body_crossover: integer (nullable = true)
 |-- Body_other: integer (nulla

In [9]:
data = data.drop( "Engine Type", "Body", "Registration", "Brand")


In [10]:
data.columns

['Price',
 'Mileage',
 'EngineV',
 'Year',
 'Model',
 'Brand_Volkswagen',
 'Brand_Mitsubishi',
 'Brand_Audi',
 'Brand_Mercedes-Benz',
 'Brand_Renault',
 'Brand_BMW',
 'Brand_Toyota',
 'Engine Type_Diesel',
 'Engine Type_Other',
 'Engine Type_Gas',
 'Engine Type_Petrol',
 'Body_van',
 'Body_crossover',
 'Body_other',
 'Body_sedan',
 'Body_hatch',
 'Body_vagon',
 'Registration_no',
 'Registration_yes']

In [11]:
#Checking the number of Columns in the our dataset
len(data.columns)

24

In [12]:
#Converting String Columns into Float
from pyspark.sql.types import DoubleType
data = data.withColumn("Price", data['Price'].cast("double"))
data = data.withColumn("EngineV", data['EngineV'].cast("double"))
data.printSchema()


root
 |-- Price: double (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- EngineV: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Brand_Volkswagen: integer (nullable = true)
 |-- Brand_Mitsubishi: integer (nullable = true)
 |-- Brand_Audi: integer (nullable = true)
 |-- Brand_Mercedes-Benz: integer (nullable = true)
 |-- Brand_Renault: integer (nullable = true)
 |-- Brand_BMW: integer (nullable = true)
 |-- Brand_Toyota: integer (nullable = true)
 |-- Engine Type_Diesel: integer (nullable = true)
 |-- Engine Type_Other: integer (nullable = true)
 |-- Engine Type_Gas: integer (nullable = true)
 |-- Engine Type_Petrol: integer (nullable = true)
 |-- Body_van: integer (nullable = true)
 |-- Body_crossover: integer (nullable = true)
 |-- Body_other: integer (nullable = true)
 |-- Body_sedan: integer (nullable = true)
 |-- Body_hatch: integer (nullable = true)
 |-- Body_vagon: integer (nullable = true)
 |-- Registration_n

##### Feature Assembling

In [13]:
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(inputCols = ['Price', 'Mileage', 'EngineV', 'Year', 'Brand_Volkswagen', 'Brand_Mitsubishi', 'Brand_Audi', 'Brand_Mercedes-Benz', 'Brand_Renault', 'Brand_BMW', 'Brand_Toyota', 'Engine Type_Diesel',
 'Engine Type_Other',
 'Engine Type_Gas',
 'Engine Type_Petrol',
 'Body_van',
 'Body_crossover',
 'Body_other',
 'Body_sedan',
 'Body_hatch',
 'Body_vagon',
 'Registration_no',
 'Registration_yes'], outputCol = 'features')

output = featureassembler.transform(data)

In [14]:
output.columns


['Price',
 'Mileage',
 'EngineV',
 'Year',
 'Model',
 'Brand_Volkswagen',
 'Brand_Mitsubishi',
 'Brand_Audi',
 'Brand_Mercedes-Benz',
 'Brand_Renault',
 'Brand_BMW',
 'Brand_Toyota',
 'Engine Type_Diesel',
 'Engine Type_Other',
 'Engine Type_Gas',
 'Engine Type_Petrol',
 'Body_van',
 'Body_crossover',
 'Body_other',
 'Body_sedan',
 'Body_hatch',
 'Body_vagon',
 'Registration_no',
 'Registration_yes',
 'features']

In [15]:
(train, test) = output.randomSplit([0.7, 0.3], 123445)

In [16]:
lr  = LinearRegression(featuresCol = "features", labelCol = "Price",maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)
lrModel = lr.fit(train)

Py4JJavaError: An error occurred while calling o359.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 11) (Abdul-Majeed executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$3413/282444629.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.RDD$$Lambda$2605/881382808.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.execution.SQLExecutionRDD$$Lambda$2614/28698895.apply(Unknown Source)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2383/1660185319.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 42 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3494/527461330.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3492/479375520.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDD$$Lambda$3408/2094857765.apply(Unknown Source)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDD$$Lambda$3404/1594814844.apply(Unknown Source)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:451)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:345)
	at org.apache.spark.ml.regression.LinearRegression$$Lambda$3302/914813876.apply(Unknown Source)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at org.apache.spark.ml.util.Instrumentation$$$Lambda$3303/1040663620.apply(Unknown Source)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:327)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:184)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$3413/282444629.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.RDD$$Lambda$2605/881382808.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.execution.SQLExecutionRDD$$Lambda$2614/28698895.apply(Unknown Source)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2383/1660185319.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 42 more


In [None]:
#Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % train.totalIterations)

In [None]:
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

In [None]:
trainingSummary.residuals.show()

In [None]:
print("RMSE: %F " % trainingSummary.rootMeanSquaredError)

In [None]:
print("r2: %f" % trainingSummary.r2)

In [None]:
prediction = lrModel.transform(test)
prediction.show()