In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# spark = (
#     SparkSession.builder
#     .appName("TestApp")
#     .master("spark://localhost:7077") # point to your Spark Master
#     .getOrCreate()
# )




spark = SparkSession.builder.master("local[*]").appName("TestApp").getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


df = spark.read.csv("indian_flights_2025_comprehensive.csv", header=True)
df.show(5)


# spark.stop() 


+------+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|number| airline| flight|source_city|departure_time|stops| arrival_time|destination_city| classx|duration|days_left|price|
+------+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|     0|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|
|     1|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|
|     2| AirAsia| I5-764|      Delhi| Early_Morning| zero|Early_Morning|          Mumbai|Economy|    2.17|        1| 5956|
|     3| Vistara| UK-995|      Delhi|       Morning| zero|    Afternoon|          Mumbai|Economy|    2.25|        1| 5955|
|     4| Vistara| UK-963|      Delhi|       Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5955|
+------+--------

In [16]:
import os
print(os.path.exists("test.csv"))

True


In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, first
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import FMRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as F

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, max, min, stddev, count
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import FMRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("FlightPriceMultiplierModel_V2") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("=== Loading Dataset ===")
# Load the dataset
df = spark.read.csv("test.csv", header=True, inferSchema=True)

print(f"Total records: {df.count()}")
print("Sample data:")
df.show(10, truncate=False)

print("\n=== Data Analysis ===")
# Let's understand the data better
print("Unique flights:")
df.select("flight").distinct().show()

print("Price statistics by days_left:")
df.groupBy("days_left").agg(
    F.avg("price").alias("avg_price"),
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price"),
    F.count("price").alias("count")
).orderBy("days_left").show()

print("Price statistics by class:")
df.groupBy("classx").agg(
    F.avg("price").alias("avg_price"),
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price"),
    F.count("price").alias("count")
).orderBy("classx").show()

print("Price statistics by flight and days_left:")
df.groupBy("flight", "days_left").agg(
    F.avg("price").alias("avg_price"),
    F.count("price").alias("count")
).orderBy("flight", "days_left").show()

print("\n=== Creating Baseline Prices ===")
# For each unique route (flight), find the reference price
# Let's use the price at 1 day left as the baseline (last minute price)
baseline_df = df.filter(col("days_left") == 1).groupBy("flight").agg(
    F.avg("price").alias("baseline_price")
)

print("Baseline prices (at 1 day left):")
baseline_df.show()

# Join with original data
df_with_baseline = df.join(baseline_df, on="flight", how="inner")

# Calculate price ratio (this should be <= 1.0 for advance bookings)
df_with_ratio = df_with_baseline.withColumn(
    "price_ratio", 
    col("price") / col("baseline_price")
)

print("Data with price ratios:")
df_with_ratio.select("flight", "days_left", "classx", "price", "baseline_price", "price_ratio").show()

print("\n=== Price Ratio Analysis ===")
# Analyze the ratios
df_with_ratio.groupBy("days_left", "classx").agg(
    F.avg("price_ratio").alias("avg_ratio"),
    F.min("price_ratio").alias("min_ratio"),
    F.max("price_ratio").alias("max_ratio"),
    F.count("price_ratio").alias("count")
).orderBy("days_left", "classx").show()

print("\n=== Feature Engineering ===")
# Simple feature engineering
# Convert class to numeric (0 = Economy, 1 = Business)
df_features = df_with_ratio.withColumn(
    "class_numeric", 
    when(col("classx") == "Economy", 0.0).otherwise(1.0)
)

# Create features vector
assembler = VectorAssembler(
    inputCols=["days_left", "class_numeric"],
    outputCol="features"
)

# Apply feature engineering
df_final = assembler.transform(df_features)

print("Final feature data:")
df_final.select("days_left", "classx", "class_numeric", "price_ratio", "features").show()

print("\n=== Train/Test Split ===")
# Split data (90% train, 10% test)
train_data, test_data = df_final.randomSplit([0.9, 0.1], seed=42)

print(f"Training records: {train_data.count()}")
print(f"Test records: {test_data.count()}")

# If test data is too small, let's use a different split or all data for training
if test_data.count() < 2:
    print("Test data too small, using 80-20 split")
    train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)
    if test_data.count() < 2:
        print("Still too small, using all data for both training and testing")
        train_data = df_final
        test_data = df_final

print("\n=== Training FMRegressor ===")
# Create and configure FMRegressor
fm_regressor = FMRegressor(
    featuresCol="features",
    labelCol="price_ratio",
    predictionCol="predicted_ratio",
    factorSize=4,          # Smaller factor size for simple data
    fitIntercept=True,
    fitLinear=True,
    regParam=0.1,          # Higher regularization to prevent overfitting
    maxIter=50,            # Fewer iterations
    stepSize=0.1,          # Smaller step size
    miniBatchFraction=1.0
)

print("Training model...")
model = fm_regressor.fit(train_data)

print("\n=== Making Predictions ===")
# Make predictions
predictions = model.transform(test_data)

print("Predictions:")
predictions.select(
    "flight", "days_left", "classx", "price", "baseline_price", 
    "price_ratio", "predicted_ratio"
).show()

print("\n=== Model Evaluation ===")
evaluator = RegressionEvaluator(
    labelCol="price_ratio",
    predictionCol="predicted_ratio"
)

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

print("\n=== Simple Prediction Function ===")
def predict_price_multiplier(days_left, ticket_class):
    """
    Predict price multiplier for given inputs
    """
    from pyspark.sql import Row
    
    # Convert class to numeric
    class_numeric = 0.0 if ticket_class == "Economy" else 1.0
    
    # Create input DataFrame
    input_data = spark.createDataFrame([
        Row(days_left=float(days_left), class_numeric=class_numeric)
    ])
    
    # Add features
    input_with_features = assembler.transform(input_data)
    
    # Predict
    prediction = model.transform(input_with_features)
    multiplier = prediction.select("predicted_ratio").collect()[0][0]
    
    # Ensure multiplier is reasonable (between 0.3 and 1.5) - use Python built-ins correctly
    if multiplier < 0.3:
        multiplier = 0.3
    elif multiplier > 1.5:
        multiplier = 1.5
    
    return multiplier


=== Loading Dataset ===
Total records: 300153
Sample data:
+------+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|number|airline |flight |source_city|departure_time|stops|arrival_time |destination_city|classx |duration|days_left|price|
+------+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|0     |SpiceJet|SG-8709|Delhi      |Evening       |zero |Night        |Mumbai          |Economy|2.17    |1        |5953 |
|1     |SpiceJet|SG-8157|Delhi      |Early_Morning |zero |Morning      |Mumbai          |Economy|2.33    |1        |5953 |
|2     |AirAsia |I5-764 |Delhi      |Early_Morning |zero |Early_Morning|Mumbai          |Economy|2.17    |1        |5956 |
|3     |Vistara |UK-995 |Delhi      |Morning       |zero |Afternoon    |Mumbai          |Economy|2.25    |1        |5955 |
|4     |Vistara |UK-963 |Delhi      |Morning       |zero |Morning      |Mumbai  

In [28]:
print("\n=== Example Predictions ===")
test_scenarios = [
    (1, "Economy"),
    (5, "Economy"), 
    (10, "Economy"),
    (20, "Economy"),
    (1, "Business"),
    (5, "Business"),
    (10, "Business"),
    (20, "Business")
]

print("Price multiplier predictions:")
for days, class_type in test_scenarios:
    try:
        multiplier = predict_price_multiplier(days, class_type)
        base_price = 10000
        final_price = int(base_price * (multiplier))
        print(f"Days: {days:2d}, Class: {class_type:8s} -> Multiplier: {multiplier:.3f}, Price: ₹{final_price:,}")
    except Exception as e:
        print(f"Error for {days} days {class_type}: {e}")

print("\n=== Model Summary ===")
print(f"""
Model trained successfully!
- Input features: days_left, class (Economy/Business)  
- Output: price_multiplier (ratio of current price to baseline)
- Training RMSE: {rmse:.4f}
- Training R²: {r2:.4f}

Usage:
multiplier = predict_price_multiplier(days_left, "Economy" or "Business")
final_price = base_price * multiplier
""")




=== Example Predictions ===
Price multiplier predictions:
Days:  1, Class: Economy  -> Multiplier: 0.486, Price: ₹4,863
Days:  5, Class: Economy  -> Multiplier: 0.476, Price: ₹4,755
Days: 10, Class: Economy  -> Multiplier: 0.462, Price: ₹4,619
Days: 20, Class: Economy  -> Multiplier: 0.435, Price: ₹4,348
Days:  1, Class: Business -> Multiplier: 1.119, Price: ₹11,188
Days:  5, Class: Business -> Multiplier: 1.341, Price: ₹13,406
Days: 10, Class: Business -> Multiplier: 1.500, Price: ₹15,000
Days: 20, Class: Business -> Multiplier: 1.500, Price: ₹15,000

=== Model Summary ===

Model trained successfully!
- Input features: days_left, class (Economy/Business)  
- Output: price_multiplier (ratio of current price to baseline)
- Training RMSE: 1.0131
- Training R²: 0.5137

Usage:
multiplier = predict_price_multiplier(days_left, "Economy" or "Business")
final_price = base_price * multiplier



In [29]:
print("\n=== Model Training Complete! ===")
print(f"""
Summary:
- Model Type: Factorization Machine Regressor
- Training Records: {train_df.count()}
- Test Records: {test_df.count()}
- RMSE: {rmse:.4f}
- MAE: {mae:.4f}
- R²: {r2:.4f}

The model can now predict price multipliers based on:
- Days left until departure
- Class (Economy/Business)

Usage: multiplier = predict_price_multiplier(days_left, class)
Final price = base_price * multiplier
""")


=== Model Training Complete! ===

Summary:
- Model Type: Factorization Machine Regressor
- Training Records: 270138
- Test Records: 30015
- RMSE: 1.0131
- MAE: 0.5947
- R²: 0.5137

The model can now predict price multipliers based on:
- Days left until departure
- Class (Economy/Business)

Usage: multiplier = predict_price_multiplier(days_left, class)
Final price = base_price * multiplier

