In [1]:
# PySpark Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Start Spark Session
spark = SparkSession.builder \
    .appName("FlightFarePrediction") \
    .getOrCreate()

In [3]:
# Load Dataset
df = spark.read.csv("/kaggle/input/flight-price-prediction/Clean_Dataset.csv", header=True, inferSchema=True)
df.show(5)

+---+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|_c0| airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+---+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|  0|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|  1|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|  2| AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|  3| Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|  4| Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
+---+--------+-------+-----------+------

In [4]:
# Drop 'Unnamed: 0' and 'flight'
df = df.drop("Unnamed: 0", "flight")

# Map 'stops'
df = df.withColumn("stops", 
                   when(col("stops") == "zero", 0)
                   .when(col("stops") == "one", 1)
                   .otherwise(2))

# Map 'class'
df = df.withColumn("class", when(col("class") == "Economy", 0).otherwise(1))

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: integer (nullable = false)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: integer (nullable = false)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)



In [5]:
categorical_cols = ["airline", "source_city", "departure_time", "arrival_time", "destination_city"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_vec") for col in categorical_cols]

numeric_cols = ["stops", "class", "duration", "days_left"]
feature_cols = [col+"_vec" for col in categorical_cols] + numeric_cols

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [6]:
gbt = GBTRegressor(featuresCol="features", labelCol="price", maxIter=100, seed=42)

# Build pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, gbt])

In [7]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)

# Predictions
predictions = model.transform(test_data)
predictions.select("price", "prediction").show(5)

+-----+------------------+
|price|        prediction|
+-----+------------------+
| 5956| 5272.939848837153|
| 6060| 8713.385272428532|
| 5954| 6779.234371475977|
| 5955| 7835.190452307589|
| 5949|10809.077027218733|
+-----+------------------+
only showing top 5 rows



In [8]:
evaluator_rmse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"PySpark Model Performance")
print(f"RMSE: {rmse:.2f}")
print(f"MAE: {mae:.2f}")
print(f"R² Score: {r2:.4f}")

PySpark Model Performance
RMSE: 4123.92
MAE: 2393.94
R² Score: 0.9672
