<a href="https://colab.research.google.com/github/alfredwisana/big-data_project/blob/main/project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('local[*]').appName('myApp').getOrCreate()
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Read the CSV file containing Titanic dataset into Spark's DataFrame and show it

df = spark.read.format('csv').options(header=True, inferSchema=True).load('Clean_Dataset.csv')
df.show(30,truncate=50)

+---+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|_c0|  airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+---+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|  0| SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|  1| SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|  2|  AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|  3|  Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|  4|  Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
|  5|  Vistara| UK-945|      Del

In [3]:
# Try to print the schema

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
df.select([count(when(isnan(c) | isnull(c),1)).alias(c) for c in df.columns]).show()

+---+-------+------+-----------+--------------+-----+------------+----------------+-----+--------+---------+-----+
|_c0|airline|flight|source_city|departure_time|stops|arrival_time|destination_city|class|duration|days_left|price|
+---+-------+------+-----------+--------------+-----+------------+----------------+-----+--------+---------+-----+
|  0|      0|     0|          0|             0|    0|           0|               0|    0|       0|        0|    0|
+---+-------+------+-----------+--------------+-----+------------+----------------+-----+--------+---------+-----+



<h1>Data Preprocessing

In [6]:
from pyspark.ml.feature import Normalizer,VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

In [7]:
from pyspark.sql.functions import col, max as spark_max, abs as spark_abs

max_abs_value = df.select(spark_max(spark_abs(col("price"))).alias("max_abs_value")).collect()[0]["max_abs_value"]


In [8]:
print(max_abs_value)
len(str(int(max_abs_value)))

123071


6

In [9]:
scaling_factor = 10 ** (len(str(int(max_abs_value)))-1)
df = df.withColumn("normalized_price", col("price") / scaling_factor)
df.show(30,truncate=50)

+---+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+----------------+
|_c0|  airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|normalized_price|
+---+---------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+----------------+
|  0| SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|         0.05953|
|  1| SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|         0.05953|
|  2|  AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|         0.05956|
|  3|  Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|         0.05955|
|  4|  Vistara| UK-963|      Delhi

In [10]:
column_name = df.columns

In [11]:
column_name.remove('_c0')
column_name.remove('price')
column_name.remove('flight')
print(column_name)

['airline', 'source_city', 'departure_time', 'stops', 'arrival_time', 'destination_city', 'class', 'duration', 'days_left', 'normalized_price']


In [12]:
final_df = df.select(column_name)


indexers = [StringIndexer(inputCol=col, outputCol=col+'_idx').fit(final_df) for col in ['airline', 'stops','class']]
pipeline = Pipeline(stages=indexers)
final_df = pipeline.fit(final_df).transform(final_df)
final_df.show()

+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+
|  airline|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|normalized_price|airline_idx|stops_idx|class_idx|
+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+
| SpiceJet|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1|         0.05953|        5.0|      1.0|      0.0|
| SpiceJet|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1|         0.05953|        5.0|      1.0|      0.0|
|  AirAsia|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1|         0.05956|        4.0|      1.0|      0.0|
|  Vistara|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2

In [13]:
# from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=col, outputCol=col+'_idx', stringOrderType="alphabetAsc").fit(final_df) for col in ['source_city', "departure_time",'destination_city','arrival_time']]
# indexers = StringIndexer(inputCol="departure_time", outputCol="category_index", stringOrderType="alphabetAsc")
pipeline = Pipeline(stages=indexers)
final_df = pipeline.fit(final_df).transform(final_df)
final_df.show()

+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+
|  airline|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|normalized_price|airline_idx|stops_idx|class_idx|source_city_idx|departure_time_idx|destination_city_idx|arrival_time_idx|
+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+
| SpiceJet|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1|         0.05953|        5.0|      1.0|      0.0|            2.0|               2.0|                 5.0|             5.0|
| SpiceJet|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        

In [14]:
assembler = VectorAssembler(inputCols=['airline_idx', 'source_city_idx', 'departure_time_idx', 'stops_idx', 'arrival_time_idx', 'destination_city_idx', 'class_idx','duration', 'days_left'], outputCol="features")
final_df = assembler.transform(final_df)
final_df.show(30,truncate=50)

+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+---------------------------------------+
|  airline|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|normalized_price|airline_idx|stops_idx|class_idx|source_city_idx|departure_time_idx|destination_city_idx|arrival_time_idx|                               features|
+---------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+---------------------------------------+
| SpiceJet|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1|         0.05953|        5.0|      1.0|      0.0|            2.0|               2.0|                 5.

In [15]:
# Initialize the StandardScaler
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True)

# Fit the scaler to the data
scaler_model = scaler.fit(final_df)

# Transform the data
final_df = scaler_model.transform(final_df)

final_df.show(10,truncate=10)

+--------+-----------+--------------+-----+------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+----------+--------------+
| airline|source_city|departure_time|stops|arrival_time|destination_city|  class|duration|days_left|normalized_price|airline_idx|stops_idx|class_idx|source_city_idx|departure_time_idx|destination_city_idx|arrival_time_idx|  features|scaledFeatures|
+--------+-----------+--------------+-----+------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+----------+--------------+
|SpiceJet|      Delhi|       Evening| zero|       Night|          Mumbai|Economy|    2.17|        1|         0.05953|        5.0|      1.0|      0.0|            2.0|               2.0|                 5.0|             5.0|[5.0,2....|    [2.8530...|
|Spi

In [16]:
from pyspark.ml.feature import PCA


pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcafeatures")
model = pca.fit(final_df)

# Step 5: Transform Data
final_df = model.transform(final_df)


In [17]:
final_df.show(10, truncate=50)

+--------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+--------------------------------------+--------------------------------------------------+----------------------------------------+
| airline|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|normalized_price|airline_idx|stops_idx|class_idx|source_city_idx|departure_time_idx|destination_city_idx|arrival_time_idx|                              features|                                    scaledFeatures|                             pcafeatures|
+--------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+----------------+-----------+---------+---------+---------------+------------------+--------------------+----------------+--------------------------------------+----------------------------

In [18]:
feature_vector_df = final_df.select(['features','pcafeatures', 'normalized_price'])
feature_vector_df.show(truncate=False)

+---------------------------------------+----------------------------------------+----------------+
|features                               |pcafeatures                             |normalized_price|
+---------------------------------------+----------------------------------------+----------------+
|[5.0,2.0,2.0,1.0,5.0,5.0,0.0,2.17,1.0] |[-3.1227313170319326,0.8195523333926981]|0.05953         |
|[5.0,2.0,1.0,1.0,4.0,5.0,0.0,2.33,1.0] |[-3.2441211351582537,0.9389984415144089]|0.05953         |
|[4.0,2.0,1.0,1.0,1.0,5.0,0.0,2.17,1.0] |[-2.909630238064748,1.3489209830782212] |0.05956         |
|[0.0,2.0,4.0,1.0,0.0,5.0,0.0,2.25,1.0] |[-0.9312843519479208,1.4337267897901353]|0.05955         |
|[0.0,2.0,4.0,1.0,4.0,5.0,0.0,2.33,1.0] |[-0.8280395935471081,0.8337296928415222]|0.05955         |
|[0.0,2.0,4.0,1.0,0.0,5.0,0.0,2.33,1.0] |[-0.9259210191354426,1.4339920327685218]|0.05955         |
|[0.0,2.0,4.0,1.0,4.0,5.0,0.0,2.08,1.0] |[-0.8448000085861026,0.8329008085340645]|0.0606          |


<h3> Regression Model

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

In [20]:
(trainData, testData) = feature_vector_df.randomSplit([0.8,0.2],seed = 2)

<h5> XGBoost

In [21]:
from pyspark.ml.regression import GBTRegressor
import xgboost as xg
import sklearn
import numpy as np

In [22]:
xgb_r = xg.XGBRegressor(objective ='reg:linear',n_estimators = 10, seed = 123).fit(np.vstack(trainData.select("pcafeatures").collect()), np.vstack(trainData.select("normalized_price").collect()))



In [23]:
pred = xgb_r.predict(np.vstack(testData.select("pcafeatures").collect()))
pred

array([0.156964  , 0.1186531 , 0.1186531 , ..., 0.13934778, 0.13934778,
       0.13934778], dtype=float32)

In [24]:
from pyspark.sql.functions import monotonically_increasing_id

test_pred = spark.createDataFrame(pred)

testData_pred = testData.withColumn('key', monotonically_increasing_id())
test_pred = test_pred.withColumn('key', monotonically_increasing_id())

test_pred = test_pred.withColumn("prediction", test_pred["value"])
# Perform the join on the key column
test_with_pred = testData_pred.join(test_pred, on='key')


test_with_pred = test_with_pred.drop('key')

test_with_pred.show(10, truncate=50)

+----------------------------------+-------------------------------------------+----------------+-----------+-----------+
|                          features|                                pcafeatures|normalized_price|      value| prediction|
+----------------------------------+-------------------------------------------+----------------+-----------+-----------+
|(9,[0,1,7,8],[1.0,2.0,25.83,32.0])|  [0.41392440114449125,-0.2835078796124625]|         0.05504|   0.156964|   0.156964|
|  (9,[1,2,7,8],[2.0,4.0,4.83,8.0])|[-0.06280998827713538,-0.28395208578893105]|         0.24225|0.118653096|0.118653096|
| (9,[1,2,7,8],[2.0,4.0,4.83,20.0])| [-0.10377365454085037,-0.2761150616926321]|         0.13003|0.118653096|0.118653096|
| (9,[1,2,7,8],[2.0,4.0,4.83,23.0])|[-0.11401457110677912,-0.27415580566855735]|         0.10273|0.118653096|0.118653096|
| (9,[1,2,7,8],[2.0,4.0,4.83,26.0])|[-0.12425548767270786,-0.27219654964448264]|         0.10798|0.118653096|0.118653096|
| (9,[1,2,7,8],[2.0,4.0,

In [25]:
evaluator = RegressionEvaluator(labelCol='normalized_price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(test_with_pred)
print ("Root Mean Square of XGBoost model is %g"%rmse)

Root Mean Square of XGBoost model is 0.275372


<h5> Random Forest Regressor

In [26]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol='features',labelCol='normalized_price').fit(trainData)
prediction = rf.transform(testData)

In [27]:
prediction.show(10, truncate=50)

+----------------------------------+-------------------------------------------+----------------+-------------------+
|                          features|                                pcafeatures|normalized_price|         prediction|
+----------------------------------+-------------------------------------------+----------------+-------------------+
|(9,[0,1,7,8],[1.0,2.0,25.83,32.0])|  [0.41392440114449125,-0.2835078796124625]|         0.05504| 0.1015390227579031|
|  (9,[1,2,7,8],[2.0,4.0,4.83,8.0])|[-0.06280998827713538,-0.28395208578893105]|         0.24225|0.14803961312439845|
| (9,[1,2,7,8],[2.0,4.0,4.83,20.0])| [-0.10377365454085037,-0.2761150616926321]|         0.13003|0.10153763330402718|
| (9,[1,2,7,8],[2.0,4.0,4.83,23.0])|[-0.11401457110677912,-0.27415580566855735]|         0.10273|0.10153763330402718|
| (9,[1,2,7,8],[2.0,4.0,4.83,26.0])|[-0.12425548767270786,-0.27219654964448264]|         0.10798|0.10153763330402718|
| (9,[1,2,7,8],[2.0,4.0,4.83,29.0])|  [-0.13449640423863

In [28]:
evaluator = RegressionEvaluator(labelCol='normalized_price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square of Random Forest model is %g"%rmse)

Root Mean Square of Random Forest model is 0.0620803


With PCA ed Features

In [29]:
rf = RandomForestRegressor(featuresCol='pcafeatures',labelCol='normalized_price').fit(trainData)
prediction = rf.transform(testData)

In [30]:
prediction.show(10, truncate=50)

+----------------------------------+-------------------------------------------+----------------+-------------------+
|                          features|                                pcafeatures|normalized_price|         prediction|
+----------------------------------+-------------------------------------------+----------------+-------------------+
|(9,[0,1,7,8],[1.0,2.0,25.83,32.0])|  [0.41392440114449125,-0.2835078796124625]|         0.05504|0.12411490720503404|
|  (9,[1,2,7,8],[2.0,4.0,4.83,8.0])|[-0.06280998827713538,-0.28395208578893105]|         0.24225|0.12331327429914984|
| (9,[1,2,7,8],[2.0,4.0,4.83,20.0])| [-0.10377365454085037,-0.2761150616926321]|         0.13003|0.12331327429914984|
| (9,[1,2,7,8],[2.0,4.0,4.83,23.0])|[-0.11401457110677912,-0.27415580566855735]|         0.10273|0.12331327429914984|
| (9,[1,2,7,8],[2.0,4.0,4.83,26.0])|[-0.12425548767270786,-0.27219654964448264]|         0.10798|0.12331327429914984|
| (9,[1,2,7,8],[2.0,4.0,4.83,29.0])|  [-0.13449640423863

In [31]:
evaluator = RegressionEvaluator(labelCol='normalized_price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square of Random Forest model is %g"%rmse)

Root Mean Square of Random Forest model is 0.134258


<h5> Support Vector Regressor

In [32]:
from sklearn.svm import LinearSVR

In [33]:
svr = LinearSVR(random_state=0,tol=1e-4,epsilon=0.1,dual=True)

In [44]:
svr_model = svr.fit(np.vstack(trainData.select("pcafeatures").collect()),np.vstack(trainData.select("normalized_price").collect()))

  y = column_or_1d(y, warn=True)


In [45]:
y_pred = svr.predict(np.vstack(testData.select("pcafeatures").collect()))

In [46]:
from pyspark.sql.functions import monotonically_increasing_id

test_pred = spark.createDataFrame(y_pred)

testData_pred = testData.withColumn('key', monotonically_increasing_id())
test_pred = test_pred.withColumn('key', monotonically_increasing_id())

test_pred = test_pred.withColumn("prediction", test_pred["value"])
# Perform the join on the key column
test_with_pred = testData_pred.join(test_pred, on='key')


test_with_pred = test_with_pred.drop('key')
test_with_pred = test_with_pred.drop('value')

test_with_pred.show(10, truncate=50)

+----------------------------------+-------------------------------------------+----------------+-------------------+
|                          features|                                pcafeatures|normalized_price|         prediction|
+----------------------------------+-------------------------------------------+----------------+-------------------+
|(9,[0,1,7,8],[1.0,2.0,25.83,32.0])|  [0.41392440114449125,-0.2835078796124625]|         0.05504| 0.2555560452891819|
|  (9,[1,2,7,8],[2.0,4.0,4.83,8.0])|[-0.06280998827713538,-0.28395208578893105]|         0.24225|0.19807070181172298|
| (9,[1,2,7,8],[2.0,4.0,4.83,20.0])| [-0.10377365454085037,-0.2761150616926321]|         0.13003|0.19304870063409196|
| (9,[1,2,7,8],[2.0,4.0,4.83,23.0])|[-0.11401457110677912,-0.27415580566855735]|         0.10273| 0.1917932003396842|
| (9,[1,2,7,8],[2.0,4.0,4.83,26.0])|[-0.12425548767270786,-0.27219654964448264]|         0.10798|0.19053770004527643|
| (9,[1,2,7,8],[2.0,4.0,4.83,29.0])|  [-0.13449640423863

In [47]:
evaluator = RegressionEvaluator(labelCol='normalized_price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(test_with_pred)
print ("Root Mean Square of SVR model is %g"%rmse)

Root Mean Square of SVR model is 0.264315


In [38]:
# spark.stop