# Imported Spark SQL, machine learning libraries, and created sessions

In [1]:
import findspark
findspark.init()
import pyspark

In [13]:
# mengimport modul yang dibutuhkan
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# membuat SparkSession
appName = "Regresi di Apache Spark"
spark = SparkSession \
.builder \
.appName(appName) \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

sc = spark.sparkContext

# Loading data from files

In [3]:
# membuat skema file
flightSchema = StructType([
StructField("DayofMonth", IntegerType(), False),
StructField("DayOfWeek", IntegerType(), False),
StructField("Carrier", StringType(), False),
StructField("OriginAirportID", IntegerType(), False),
StructField("DestAirportID", IntegerType(), False),
StructField("DepDelay", IntegerType(), False),
StructField("ArrDelay", IntegerType(), False),
])

# membaca data dari file ke DataFrame
flightDataFrame = spark.read.csv('flights.csv', schema=flightSchema, header=True)
flightDataFrame.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



# Prepare data

In [4]:
# memilih kolom data yang terkait
data = flightDataFrame.select("DayofMonth", "DayOfWeek",
"OriginAirportID", "DestAirportID", "DepDelay", "ArrDelay")
data.show(3)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|        19|        5|          11433|        13303|      -3|       1|
|        19|        5|          14869|        12478|       0|      -8|
|        19|        5|          14057|        14869|      -4|     -15|
+----------+---------+---------------+-------------+--------+--------+
only showing top 3 rows



# Sharing training and testing data

In [5]:
# membagi data 70% untuk training, 30% untuk testing
dataTerpisahkan = data.randomSplit([0.7, 0.3])
trainingData = dataTerpisahkan[0] #data training di index 0
testingData = dataTerpisahkan[1] #data testing di index 1
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1890299  Testing Rows: 811919


# Prepare training data

In [6]:
# mendefinisikan assembler
assembler = VectorAssembler(inputCols = [
"DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID",
"DepDelay"], outputCol="features")

# mengubah data kita menjadi feature satu kolom menggunakan assembler
trainingDataFinal = assembler.transform(trainingData).select(
col("features"), (col("ArrDelay").cast("Int").alias("label")))
trainingDataFinal.show(truncate=False , n=3)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0]|-11  |
|[1.0,1.0,10140.0,10397.0,-2.0]|-18  |
|[1.0,1.0,10140.0,10397.0,-2.0]|-17  |
+------------------------------+-----+
only showing top 3 rows



# Training our regression model

In [7]:
# memanggil regresi linear dari Spark untuk algoritma kita
algoritma = LinearRegression(
labelCol="label",featuresCol="features", maxIter=10, regParam=0.3)

# mentraining model kita dengan training data final
model = algoritma.fit(trainingDataFinal)
print("Model regresi selesai ditraining!")

Model regresi selesai ditraining!


# Prepare testing data

In [8]:
# mengubah data ke satu kolom fitur menggunakan assembler
testingDataFinal = assembler.transform(
testingData).select(
col("features"), (col("ArrDelay")).cast("Int").alias("trueLabel"))
testingDataFinal.show(truncate=False, n=2)

+-----------------------------+---------+
|features                     |trueLabel|
+-----------------------------+---------+
|[1.0,1.0,10140.0,10397.0,0.0]|-9       |
|[1.0,1.0,10140.0,10821.0,4.0]|4        |
+-----------------------------+---------+
only showing top 2 rows



# Predict regression results with the model we have trained

In [10]:
# memprediksi data testing final dengan model yg telah kita training
predisksiMentah = model.transform(testingDataFinal)

#memilih kolom tertentu, yang terkait saja
prediksiFinal = predisksiMentah.select("features", "prediction", "trueLabel")

# menampilkan 3 hasil prediksi
prediksiFinal.show(3)

+--------------------+------------------+---------+
|            features|        prediction|trueLabel|
+--------------------+------------------+---------+
|[1.0,1.0,10140.0,...|-3.550650606641696|       -9|
|[1.0,1.0,10140.0,...|0.3439867789976718|        4|
|[1.0,1.0,10140.0,...|-8.740304813274436|      -23|
+--------------------+------------------+---------+
only showing top 3 rows



# Measuring the accuracy of our regression model

In [11]:
# mengimpor modul untuk mengevaluasi akurasi regresi kita
from pyspark.ml.evaluation import RegressionEvaluator

# mendefinisikan evaluator dengan menginputkan kolom dari "label"
# dan "prediksi" data kita, serta matrik evaluasi yg kita inginkan (rmse)
evaluator = RegressionEvaluator(
labelCol="trueLabel", predictionCol="prediction", metricName="rmse")

# menghitung rmse dg evalutor yang telah kita definisikan
rmse = evaluator.evaluate(prediksiFinal)
print("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 13.319806905481526


# Predict just one piece of data

In [14]:
# mengambil 1 baris data DataFrame (output berupa data list)
baris1List = testingData.take(1)

# mengubah list ke RDD, kemudian ke dataFrame
baris1DataFrame = sc.parallelize(baris1List).toDF()

# mengubah data kita ke format fitur data yg telah digunakan sblmnya menggunakan assembler
testingBaris1 = assembler.transform(
baris1DataFrame).select(col("features"),
(col("ArrDelay")).cast("Int").alias("trueLabel"))

# memprediksi data dengan model yg telah kita training
prediskiMentah = model.transform(testingBaris1)
prediksiFinal = prediskiMentah.select("features", "prediction", "trueLabel")

prediksiFinal.show()

+--------------------+------------------+---------+
|            features|        prediction|trueLabel|
+--------------------+------------------+---------+
|[1.0,1.0,10140.0,...|-3.550650606641696|       -9|
+--------------------+------------------+---------+



# Displays 5 rows of training data

In [15]:
trainingData.show(5)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|         1|        1|          10140|        10397|      -4|     -11|
|         1|        1|          10140|        10397|      -2|     -18|
|         1|        1|          10140|        10397|      -2|     -17|
|         1|        1|          10140|        10397|       0|     -12|
|         1|        1|          10140|        10821|       8|      -9|
+----------+---------+---------------+-------------+--------+--------+
only showing top 5 rows



# Displays 4 rows of test data

In [16]:
testingData.show(4)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|         1|        1|          10140|        10397|       0|      -9|
|         1|        1|          10140|        10821|       4|       4|
|         1|        1|          10140|        11259|      -5|     -23|
|         1|        1|          10140|        11259|      -5|     -14|
+----------+---------+---------------+-------------+--------+--------+
only showing top 4 rows



# Displays coefficients and constants

In [17]:
# Print the coefficients and intercept/constant for linear regression
print("Jumlah fitur: " + str(model.numFeatures))
print("Koefisien: " + str(model.coefficients)) # DayofMonth,DayOfWeek,OriginAirportID,DestAirportID,DepDelay
print("Konstanta: " + str(model.intercept))

Jumlah fitur: 5
Koefisien: [0.010972239517679812,-0.1415044049108317,0.00019352715667810667,-0.00023086025472954724,0.998130533411174]
Konstanta: -2.982229741541443


# Changed the number of variables to just two

In [18]:
# membagi data secara random, 70% untuk training, 30% untuk testing
dataTerpisahkan = data.randomSplit([0.7, 0.3])
trainingData2 = dataTerpisahkan[0] #data training di index 0
testingData2 = dataTerpisahkan[1] #data testing di index 1
train_rows2 = trainingData.count()
test_rows2 = testingData.count()

print ("Jumlah data training:", train_rows2, "| Jumlah data testing:", test_rows2)

Jumlah data training: 1890299 | Jumlah data testing: 811919


In [19]:
# mendefinisikan assembler
# hanya menggunakan dua variabel yaitu OriginAirportID & DestAirportID
assembler2 = VectorAssembler(inputCols = ["OriginAirportID", "DestAirportID"], outputCol="features")

#mengubah data kita menjadi feature satu kolom menggunakan assembler yang kita definisikan
trainingDataFinal2 = assembler2.transform(trainingData2).select(col("features"), (col("ArrDelay").cast("Int").alias("label")))
trainingDataFinal2.show(truncate=False, n=3)

+-----------------+-----+
|features         |label|
+-----------------+-----+
|[10140.0,10397.0]|-11  |
|[10140.0,10397.0]|-18  |
|[10140.0,10397.0]|-17  |
+-----------------+-----+
only showing top 3 rows



In [20]:
model2 = algoritma.fit(trainingDataFinal2)
print("Model regresi selesai ditraining!")

Model regresi selesai ditraining!


In [21]:
# mengubah data ke satu kolom fitur menggunakan assembler yg telah kita definisikan sblmnya
testingDataFinal2 = assembler2.transform(testingData2).select(col("features"), (col("ArrDelay")).cast("Int").alias("trueLabel"))
testingDataFinal2.show(truncate=False, n=2)

+-----------------+---------+
|features         |trueLabel|
+-----------------+---------+
|[10140.0,10397.0]|-9       |
|[10140.0,11259.0]|-11      |
+-----------------+---------+
only showing top 2 rows



In [22]:
# memprediksi data testing final dengan model yg telah kita training
prediksiMentah2 = model2.transform(testingDataFinal2)
# memilih kolom tertentu, yang terkait saja
prediksiFinal2 = prediksiMentah2.select("features", "prediction", "trueLabel")
# menampilkan 3 hasil prediksi
prediksiFinal2.show(3)

+-----------------+------------------+---------+
|         features|        prediction|trueLabel|
+-----------------+------------------+---------+
|[10140.0,10397.0]|7.5309839440462465|       -9|
|[10140.0,11259.0]| 7.443073537441167|      -11|
|[10140.0,11259.0]| 7.443073537441167|        5|
+-----------------+------------------+---------+
only showing top 3 rows



In [23]:
# mengimpor modul untuk mengevaluasi akurasi regresi kita
from pyspark.ml.evaluation import RegressionEvaluator

# mendefinisikan evaluator dengan menginputkan kolom dari "label"
# dan "prediksi" data kita, serta matrik evaluasi yg kita inginkan (rmse)
evaluator2 = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")

# menghitung rmse dg evalutor yang telah kita definisikan
rmse = evaluator2.evaluate(prediksiFinal2)
print("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 38.6106863771816
