In [15]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [16]:
csv = spark.read.csv('./data/my_data.csv', inferSchema=True, header=True)
csv.show(10)

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

In [36]:
data = csv.select("segmentsAirlineCode", "elapsedDays", "isBasicEconomy", "baseFare", "totalFare", "totalTravelDistance", ((col("seatsRemaining") > 0).cast("Int").alias("label")))
data.show(20)

+-------------------+-----------+--------------+--------+---------+-------------------+-----+
|segmentsAirlineCode|elapsedDays|isBasicEconomy|baseFare|totalFare|totalTravelDistance|label|
+-------------------+-----------+--------------+--------+---------+-------------------+-----+
|                 DL|          0|         false|  217.67|    248.6|                947|    1|
|                 DL|          0|         false|  217.67|    248.6|                947|    1|
|                 DL|          0|         false|  217.67|    248.6|                947|    1|
|                 DL|          0|         false|  217.67|    248.6|                947|    1|
|                 DL|          0|         false|  217.67|    248.6|                947|    1|
|                 B6|          0|         false|  217.67|    248.6|                947|    1|
|             AA||AA|          0|         false|  213.02|    251.1|                956|    1|
|             AA||AA|          0|         false|  213.02|   

In [37]:
data.printSchema()

root
 |-- segmentsAirlineCode: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- label: integer (nullable = true)



In [38]:
data = data.withColumn("baseFare", data["baseFare"].cast(IntegerType()))
data = data.withColumn("totalFare", data["totalFare"].cast(IntegerType()))
data = data.withColumn("elapsedDays", data["elapsedDays"].cast(IntegerType()))
data.printSchema()                      

root
 |-- segmentsAirlineCode: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- baseFare: integer (nullable = true)
 |-- totalFare: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- label: integer (nullable = true)



In [42]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 66  Testing Rows: 33


In [52]:
strIdx = StringIndexer(inputCol = "segmentsAirlineCode", outputCol = "segmentsAirlineCodeIdx")
catVect = VectorAssembler(inputCols = ["segmentsAirlineCodeIdx", "elapsedDays"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["seatsRemaining"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline()

In [53]:
piplineModel = pipeline.fit(train)

KeyError: Param(parent='Pipeline_a081243ebe9e', name='stages', doc='a list of pipeline stages')