# Mengimport Spark SQL, library machine learning dan membuat session

In [1]:
#mengimport modul yang dibutuhkan
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create the SparkSession used by every cell below, or reuse one that
# already exists.
appName = "Regresi di Spark"
spark = (
    SparkSession.builder
    .appName(appName)
    # NOTE(review): this key/value looks like a placeholder rather than a
    # real Spark setting -- confirm before relying on it.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Memuat data dari file

In [2]:
# Declare the CSV schema explicitly: six integer columns plus the string
# carrier code, all declared non-nullable.
flightSchema = (
    StructType()
    .add("DayofMonth", IntegerType(), False)
    .add("DayOfWeek", IntegerType(), False)
    .add("Carrier", StringType(), False)
    .add("OriginAirportID", IntegerType(), False)
    .add("DestAirportID", IntegerType(), False)
    .add("DepDelay", IntegerType(), False)
    .add("ArrDelay", IntegerType(), False)
)
# Load the flights file into a DataFrame; its first line is a header row.
flightDataFrame = (
    spark.read
    .schema(flightSchema)
    .option("header", True)
    .csv("dataset/flights.csv")
)
flightDataFrame.show(n=3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



# Menyiapkan data

In [3]:
# Keep only the columns used for modelling; "Carrier" is left out.
data = flightDataFrame.select([
    "DayofMonth",
    "DayOfWeek",
    "OriginAirportID",
    "DestAirportID",
    "DepDelay",
    "ArrDelay",
])
data.show(n=3)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|        19|        5|          11433|        13303|      -3|       1|
|        19|        5|          14869|        12478|       0|      -8|
|        19|        5|          14057|        14869|      -4|     -15|
+----------+---------+---------------+-------------+--------+--------+
only showing top 3 rows



# Membagi data training dan testing

In [4]:
# Randomly split the rows with weights 0.7 (training) and 0.3 (testing).
# A fixed seed is passed so the split -- and therefore the row counts, the
# fitted model and the RMSE computed later -- can be reproduced across runs.
dataTerpisahkan = data.randomSplit([0.7, 0.3], seed=42)
# Index 0 holds the training split, index 1 the testing split.
trainingData, testingData = dataTerpisahkan
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1890251  Testing Rows: 811967


# Menyiapkan data training

In [5]:
# Assembler that packs the five predictor columns into a single vector
# column named "features". It is reused later for the testing data.
assembler = VectorAssembler(
    inputCols=[
        "DayofMonth",
        "DayOfWeek",
        "OriginAirportID",
        "DestAirportID",
        "DepDelay",
    ],
    outputCol="features",
)
# Training frame: the assembled feature vector plus ArrDelay, cast to
# integer and exposed under the name "label".
trainingDataFinal = assembler.transform(trainingData).select(
    "features",
    col("ArrDelay").cast("Int").alias("label"),
)
trainingDataFinal.show(n=3, truncate=False)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0]|-11  |
|[1.0,1.0,10140.0,10821.0,8.0] |-9   |
|[1.0,1.0,10140.0,11259.0,-2.0]|-14  |
+------------------------------+-----+
only showing top 3 rows



# Training model regresi kita

In [6]:
# Configure Spark's linear regression estimator: at most 10 iterations and
# a regularization parameter of 0.3.
algoritma = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
)
# Fit the estimator on the prepared training frame.
model = algoritma.fit(trainingDataFinal)
print("Model regresi selesai ditraining!")

Model regresi selesai ditraining!


# Menyiapkan data testing

In [7]:
# Run the testing split through the same assembler used for training, and
# keep the actual arrival delay as "trueLabel" so it can be compared with
# the model's prediction.
testingDataFinal = assembler.transform(testingData).select(
    "features",
    col("ArrDelay").cast("Int").alias("trueLabel"),
)
testingDataFinal.show(n=2, truncate=False)

+------------------------------+---------+
|features                      |trueLabel|
+------------------------------+---------+
|[1.0,1.0,10140.0,10397.0,-2.0]|-17      |
|[1.0,1.0,10140.0,11259.0,-3.0]|-11      |
+------------------------------+---------+
only showing top 2 rows



# Memprediksi hasil regresi dengan model yang telah kita training

In [8]:
# Score the prepared testing frame with the trained model.
predisksiMentah = model.transform(testingDataFinal)
# Keep only the columns needed to compare prediction against truth.
prediksiFinal = predisksiMentah.select(
    ["features", "prediction", "trueLabel"]
)
# Display three of the predictions.
prediksiFinal.show(n=3)

+--------------------+------------------+---------+
|            features|        prediction|trueLabel|
+--------------------+------------------+---------+
|[1.0,1.0,10140.0,...|-5.560387187877221|      -17|
|[1.0,1.0,10140.0,...|-6.753304734049848|      -11|
|[1.0,1.0,10140.0,...|-4.758268756369903|      -11|
+--------------------+------------------+---------+
only showing top 3 rows



# Mengukur akurasi dari model regresi kita

In [9]:
# Evaluator class for regression metrics.
from pyspark.ml.evaluation import RegressionEvaluator
# Compare the "prediction" column against "trueLabel" using root mean
# squared error (rmse) as the metric.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="trueLabel",
    predictionCol="prediction",
)
# Compute the RMSE over the predictions made on the testing data.
rmse = evaluator.evaluate(prediksiFinal)
print("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 13.264533049472382


# Memprediksi satu data saja

In [10]:
# Take one row from the testing split (the result is a list of rows).
baris1List = testingData.take(1)
# Turn the list back into a DataFrame through the `spark` session created
# earlier in this file. The previous version used a global `sc`, which this
# file never defines. Passing the testing split's schema keeps the column
# types unchanged and also works when the list is empty, where schema
# inference would fail.
baris1DataFrame = spark.createDataFrame(
    baris1List, schema=testingData.schema)
# Assemble the feature vector with the same assembler used for training
# and keep the actual arrival delay as "trueLabel".
testingBaris1 = assembler.transform(baris1DataFrame).select(
    col("features"),
    col("ArrDelay").cast("Int").alias("trueLabel"),
)
# Score the single row with the trained model and show the result.
prediskiMentah = model.transform(testingBaris1)
prediksiFinal = prediskiMentah.select("features", "prediction", "trueLabel")
prediksiFinal.show()

+--------------------+------------------+---------+
|            features|        prediction|trueLabel|
+--------------------+------------------+---------+
|[1.0,1.0,10140.0,...|-5.560387187877221|      -17|
+--------------------+------------------+---------+

