# Regression analysis in Spark

# Import modules and create spark session

In [4]:
#Import modules

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler


In [5]:
#Create Spark Session
appName = "Regression in Spark"
spark = SparkSession.builder.appName('appName').config("spark.some.config.option","some-value").getOrCreate()

In [6]:
#Create table schema

flightSchema = StructType([
StructField("DayofMonth",IntegerType(),False),
StructField("DayOfWeek",IntegerType(),False),
StructField("Carrier",StringType(),False),
StructField("OrginAirportID",IntegerType(),False),
StructField("DestAirportID",IntegerType(),False),
StructField("DepDelay",IntegerType(),False),
StructField("ArrDelay",IntegerType(),False),
])

In [7]:
flightDataFrame = spark.read.csv('dataset/raw-flight-data.csv',header=True,schema = flightSchema)
flightDataFrame.show(5)


+----------+---------+-------+--------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OrginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+--------------+-------------+--------+--------+
|        19|        5|     DL|         11433|        13303|      -3|       1|
|        19|        5|     DL|         14869|        12478|       0|      -8|
|        19|        5|     DL|         14057|        14869|      -4|     -15|
|        19|        5|     DL|         15016|        11433|      28|      24|
|        19|        5|     DL|         11193|        12892|      -6|     -11|
+----------+---------+-------+--------------+-------------+--------+--------+
only showing top 5 rows



# Select important data for Regression feature

In [11]:
#select related column data for regression input features

data = flightDataFrame.select("DayofMonth","DayOfWeek","OrginAirportID","DestAirportID","DepDelay","ArrDelay")
data.show(5)

+----------+---------+--------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OrginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+--------------+-------------+--------+--------+
|        19|        5|         11433|        13303|      -3|       1|
|        19|        5|         14869|        12478|       0|      -8|
|        19|        5|         14057|        14869|      -4|     -15|
|        19|        5|         15016|        11433|      28|      24|
|        19|        5|         11193|        12892|      -6|     -11|
+----------+---------+--------------+-------------+--------+--------+
only showing top 5 rows



In [12]:
#Divide data into training and testing data

divideData = data.randomSplit([0.7,0.3])
trainingData = divideData[0] #index 0 = data training
testingData = divideData[1] #index 1 = data testing
train_rows = trainingData.count()
testing_rows = testingData.count()

print("Training data rows:",train_rows, ",Testing data rows:", testing_rows)

Training data rows: 340977 ,Testing data rows: 146237


# Preparing training data

In [13]:
#Define an assembler

assembler = VectorAssembler(inputCols = ["DayofMonth","DayOfWeek","OrginAirportID","DestAirportID","DepDelay"],outputCol="features")

In [14]:
#Change features into one column using defined assembler

trainingDataFinal = assembler.transform(trainingData).select(col("features"),(col("ArrDelay").cast("Int").alias("label")))

trainingDataFinal.show(truncate=False,n=3)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0]|-11  |
|[1.0,1.0,10140.0,11292.0,2.0] |-1   |
|[1.0,1.0,10140.0,11298.0,-6.0]|-25  |
+------------------------------+-----+
only showing top 3 rows



# Train our Regression model using training data 

In [15]:
#Call spark linear regression and import before

algoritma = LinearRegression(labelCol = "label",featuresCol = "features",maxIter = 5,regParam = 0.3,elasticNetParam=0.8)



In [16]:
#train the model

model = algoritma.fit(trainingDataFinal)

print("Regression model is trained")

Py4JJavaError: An error occurred while calling o98.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 13.0 failed 1 times, most recent failure: Lost task 3.0 in stage 13.0 (TID 28, localhost, executor driver): scala.MatchError: [null,1.0,[8.0,3.0,10140.0,11298.0,10.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1$$anonfun$8.apply(LinearRegression.scala:325)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1$$anonfun$8.apply(LinearRegression.scala:325)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:105)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:345)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:319)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:319)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:176)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,[8.0,3.0,10140.0,11298.0,10.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1$$anonfun$8.apply(LinearRegression.scala:325)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1$$anonfun$8.apply(LinearRegression.scala:325)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1145)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1146)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# Prepare testing data

In [10]:
#change the feature data into one column using the defined assembler

testingDataFinal = assembler.transform(testingData).select(col("features"),col("ArrivalDelay").cast("Int").alias("trueLabel"))

testingDataFinal.show(truncate=False,n=3)


AnalysisException: "cannot resolve '`ArrivalDelay`' given input columns: [DestAirportID, DayOfWeek, DayofMonth, ArrDelay, features, DepDelay, OrginAirportID];;\n'Project [features#142, cast('ArrivalDelay as int) AS trueLabel#150]\n+- Project [DayofMonth#0, DayOfWeek#1, OrginAirportID#3, DestAirportID#4, DepDelay#5, ArrDelay#6, UDF(named_struct(DayofMonth_double_VectorAssembler_3ed062068c91, cast(DayofMonth#0 as double), DayOfWeek_double_VectorAssembler_3ed062068c91, cast(DayOfWeek#1 as double), OrginAirportID_double_VectorAssembler_3ed062068c91, cast(OrginAirportID#3 as double), DestAirportID_double_VectorAssembler_3ed062068c91, cast(DestAirportID#4 as double), DepDelay_double_VectorAssembler_3ed062068c91, cast(DepDelay#5 as double))) AS features#142]\n   +- Sample 0.7, 1.0, false, 8170379447133904029\n      +- Sort [DayofMonth#0 ASC NULLS FIRST, DayOfWeek#1 ASC NULLS FIRST, OrginAirportID#3 ASC NULLS FIRST, DestAirportID#4 ASC NULLS FIRST, DepDelay#5 ASC NULLS FIRST, ArrDelay#6 ASC NULLS FIRST], false\n         +- Project [DayofMonth#0, DayOfWeek#1, OrginAirportID#3, DestAirportID#4, DepDelay#5, ArrDelay#6]\n            +- Relation[DayofMonth#0,DayOfWeek#1,Carrier#2,OrginAirportID#3,DestAirportID#4,DepDelay#5,ArrDelay#6] csv\n"

# Predict the testing data using trained model

In [None]:
#predict testing data using model
prediction = model.transform(testingDataFinal)
#Display the prediction results
prediction.show(5)

# Calculate model performance

In [None]:
#import evaluator model for regression
from pyspark.ml.evaluation import RegressionEvaluator
    

In [None]:
#define evaluator
evaluator = RegressionEvaluator(labelCol="trueLabel",predictionCol="prediction",metricName="rmse")

In [None]:
#Calculate RSME of trained model

rmse = evaluator.evaluate(prediction)
print("Root means Square Error:" , rmse)