In [35]:
from pyspark.sql import SparkSession
import numpy as np 
import pandas as pd
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor


In [2]:
def start_spark_session():
    """Return the "demo2" SparkSession, creating it if none is active.

    The session is configured with the Kafka SQL connector package and with
    the local SQLite JDBC driver jar (registered both as a Spark jar and on
    the driver's extra class path).
    """
    sqlite_jdbc_jar = "./jars/sqlite-jdbc-3.47.0.0.jar"
    settings = (
        ("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3"),
        ("spark.jars", sqlite_jdbc_jar),
        ("spark.driver.extraClassPath", sqlite_jdbc_jar),
    )

    builder = SparkSession.builder.appName("demo2")
    # Apply the settings in the same order as they are listed above.
    for key, value in settings:
        builder = builder.config(key, value)
    return builder.getOrCreate()

In [3]:
# Create (or reuse) the notebook-wide SparkSession; the data-loading cells below pass it to read_data().
spark = start_spark_session()

In [4]:
def read_data(spark, filepath):
    """Read a CSV file through ``spark`` and return the resulting DataFrame.

    Args:
        spark: Object exposing ``read.csv`` (the active SparkSession).
        filepath: Location of the CSV file to load.

    Returns:
        Whatever ``spark.read.csv`` returns for ``filepath`` when called with
        ``header=True`` and ``inferSchema=True``.
    """
    csv_options = {"header": True, "inferSchema": True}
    return spark.read.csv(filepath, **csv_options)

In [5]:
# features_cols=['airline',  'source_city', 'departure_time','stops', 'arrival_time',
#             'destination_city', 'class', 'duration','days_left', ]
# target_col='price'


In [6]:
# train_data = read_data(spark,"./docs/train.csv")
# test_data = read_data(spark,"./docs/test.csv")

In [13]:
class pre_process:
    """Builds and fits the Spark ML preprocessing stages for a training DataFrame.

    Typical use::

        pre_processer = pre_process(train_data)
        pre_processer.split_features()
        pipeline_model = pre_processer.get_pipeline_model()

    The fitted pipeline adds ``<col>_index`` / ``<col>_encoded`` columns for
    every categorical column and a ``numerical_features_scaled`` vector;
    ``get_final_assembler`` combines those into a single ``features`` vector.
    """

    # Class-level defaults; every instance overwrites them in __init__ /
    # split_features, so nothing is actually shared between instances.
    train_data = None
    train_numerical_features = None
    train_categorical_features = None

    def __init__(self, train_data):
        """Store the training DataFrame the stages will be built from and fitted on."""
        self.train_data = train_data

    def split_features(self, numerical_features=None):
        """Record which columns are treated as numerical and which as categorical.

        Args:
            numerical_features: Optional list of numerical column names.
                Defaults to ``['duration', 'days_left']``.

        Categorical columns are all columns whose Spark type is ``string``.
        They are read from the DataFrame schema (``DataFrame.dtypes``), so no
        rows are collected to the driver just to inspect column types.
        """
        if numerical_features is None:
            numerical_features = ['duration', 'days_left']
        self.train_numerical_features = list(numerical_features)
        self.train_categorical_features = [
            column_name
            for column_name, spark_type in self.train_data.dtypes
            if spark_type == 'string'
        ]

    def _ensure_features_split(self):
        """Run split_features() with its defaults if the feature lists are not set yet."""
        if self.train_categorical_features is None or self.train_numerical_features is None:
            self.split_features()

    # Convert categorical features into nominal features
    def get_indexers(self):
        """Return one unfitted StringIndexer per categorical column (``<col>`` -> ``<col>_index``).

        ``handleInvalid='keep'`` is passed so invalid/unseen labels are kept
        instead of raising (see the Spark StringIndexer documentation).
        """
        self._ensure_features_split()
        return [
            StringIndexer(inputCol=feature, outputCol=feature + "_index", handleInvalid='keep')
            for feature in self.train_categorical_features
        ]

    # One hot encode categorical features
    def get_encoders(self):
        """Return one unfitted OneHotEncoder per categorical column (``<col>_index`` -> ``<col>_encoded``)."""
        self._ensure_features_split()
        return [
            OneHotEncoder(inputCols=[feature + "_index"], outputCols=[feature + "_encoded"], handleInvalid='keep')
            for feature in self.train_categorical_features
        ]

    # Scale numerical features
    def get_numerical_f_assembler_scaler(self):
        """Return ``(assembler, scaler)`` for the numerical columns.

        The VectorAssembler packs the numerical columns into
        ``numerical_features``; the MinMaxScaler reads that column and writes
        ``numerical_features_scaled``.
        """
        self._ensure_features_split()
        numerical_assembler = VectorAssembler(
            inputCols=self.train_numerical_features,
            outputCol="numerical_features",
        )
        scaler = MinMaxScaler(inputCol="numerical_features", outputCol="numerical_features_scaled")
        return numerical_assembler, scaler

    def get_pipeline_model(self):
        """Fit the preprocessing pipeline on the training data and return the fitted model.

        Stage order: all indexers, then all encoders, then the numerical
        assembler and its scaler. Each encoder reads the ``_index`` column
        written by the matching indexer, so indexers must come first.
        """
        indexers = self.get_indexers()
        encoders = self.get_encoders()
        numerical_assembler, scaler = self.get_numerical_f_assembler_scaler()

        preprocessing_pipeline = Pipeline(stages=indexers + encoders + [numerical_assembler, scaler])
        return preprocessing_pipeline.fit(self.train_data)

    def save_fitted_pipeline_model(self, pipeline_model, path_string, overwrite=False):
        """Persist a fitted pipeline model, reporting (not raising) save failures.

        Args:
            pipeline_model: Fitted model exposing ``save(path)`` (and
                ``write()`` when ``overwrite`` is used).
            path_string: Destination path.
            overwrite: When True, save through ``write().overwrite()`` so an
                existing model at ``path_string`` is replaced. Defaults to
                False, which is a plain ``save(path_string)``.

        Saving stays best-effort: a failure is printed together with its
        cause. Only ``Exception`` is caught, so KeyboardInterrupt and
        SystemExit still propagate.
        """
        try:
            if overwrite:
                pipeline_model.write().overwrite().save(path_string)
            else:
                pipeline_model.save(path_string)
        except Exception as exc:
            print("Error Saving Pipeline Model:", exc)

    # Assemble vectorized features (categorical and numerical)
    def get_final_assembler(self):
        """Return a VectorAssembler combining every ``<col>_encoded`` column and
        ``numerical_features_scaled`` into a single ``features`` column."""
        self._ensure_features_split()
        encoded_feature_cols = [feature + "_encoded" for feature in self.train_categorical_features]

        return VectorAssembler(
            inputCols=encoded_feature_cols + ["numerical_features_scaled"],
            outputCol="features",
        )

In [14]:
# Load the raw training CSV, then derive the frame used downstream with
# `price` cast to double.
train_raw = read_data(spark, "./docs/train.csv")
price_as_double = train_raw["price"].cast(DoubleType())
train_data = train_raw.withColumn("price", price_as_double)

# Preview the rows of the prepared frame (all columns).
train_data.show()

+------+---------+-------+-----------+--------------+-----------+-------------+----------------+--------+--------+---------+-------+
|    id|  airline| flight|source_city|departure_time|      stops| arrival_time|destination_city|   class|duration|days_left|  price|
+------+---------+-------+-----------+--------------+-----------+-------------+----------------+--------+--------+---------+-------+
| 25807| GO_FIRST| G8-334|      Delhi| Early_Morning|        one|        Night|         Kolkata| Economy|   15.58|       33| 6395.0|
| 55971|Air_India| AI-637|     Mumbai|       Evening|two_or_more|    Afternoon|       Bangalore| Economy|   17.92|       19|10022.0|
| 43120| GO_FIRST| G8-339|     Mumbai|       Evening|       zero|        Night|           Delhi| Economy|    2.17|        2| 5942.0|
|170381|   Indigo|6E-6007|  Hyderabad|         Night|        one|Early_Morning|       Bangalore| Economy|    7.08|       47| 1755.0|
| 76546| GO_FIRST|G8-7536|     Mumbai|     Afternoon|        one|    

In [15]:
# Build the preprocessing helper around the training DataFrame.
pre_processer = pre_process(train_data)
# Populate the numerical / categorical column lists that the stage builders read.
pre_processer.split_features()
# Fit the indexer -> encoder -> assembler -> scaler pipeline on the training data.
pipeline_model = pre_processer.get_pipeline_model()
# Persist the fitted pipeline; the helper prints an error message instead of raising when saving fails.
# NOTE(review): Spark's save() normally refuses to overwrite an existing path — confirm that
# re-running this cell does not silently leave a stale model in ./docs/pipeline_model_backup.
pre_processer.save_fitted_pipeline_model(pipeline_model, "./docs/pipeline_model_backup")

In [None]:

# test_data = read_data(spark,"./docs/test.csv")
# test_data = test_data.withColumn("price", test_data["price"].cast(DoubleType()))
# test_data.select("*").show()
# fitted_pipeline_model = PipelineModel.load("./docs/pipeline_model_backup")
# prepared_data = fitted_pipeline_model.transform(test_data)

# final_assembler = pre_processer.get_final_assembler()
# prepared_test_data = final_assembler.transform(prepared_data)
# prepared_test_data = prepared_test_data.select("features","price")
# lr = LinearRegression(featuresCol="features",labelCol="price")
# lrModel = lr.fit(prepared_test_data)

# predictions = lrModel.transform(prepared_test_data)

# evaluator = RegressionEvaluator(labelCol="price", predictionCol='prediction', metricName='r2')
# r2 = evaluator.evaluate(predictions)
# print("R2 score:", r2)
# random_forest = RandomForestRegressor(featuresCol="features",labelCol="price")
# random_forest_model = random_forest.fit(prepared_test_data)
# predictions = random_forest_model.transform(prepared_test_data)
# evaluator = RegressionEvaluator(labelCol="price", predictionCol='prediction', metricName='r2')
# r2 = evaluator.evaluate(predictions)
# print("R2 score:", r2)

+------+---------+-------+-----------+--------------+-----------+-------------+----------------+--------+--------+---------+-------+
|    id|  airline| flight|source_city|departure_time|      stops| arrival_time|destination_city|   class|duration|days_left|  price|
+------+---------+-------+-----------+--------------+-----------+-------------+----------------+--------+--------+---------+-------+
| 43836|Air_India|AI-9657|     Mumbai|       Evening|        one|      Evening|           Delhi| Economy|   21.67|        6|12978.0|
|251938|Air_India| AI-808|  Bangalore|         Night|        one|    Afternoon|          Mumbai|Business|   15.83|       39|54684.0|
| 13548|Air_India| AI-762|      Delhi|         Night|        one|   Late_Night|       Bangalore| Economy|    5.58|       19| 6367.0|
|105927|  Vistara| UK-858|  Bangalore| Early_Morning|        one|      Evening|         Kolkata| Economy|   10.17|       24| 7683.0|
|115887|  Vistara| UK-854|  Bangalore|       Evening|        one|    

In [22]:

# Stop Spark session
# spark.stop()