In [1]:
# https://stackoverflow.com/questions/42991198/how-do-i-read-a-parquet-in-pyspark-written-from-spark
from pyspark.sql import SparkSession
# initialise sparkContext
spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .getOrCreate()

sc = spark.sparkContext

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
import pyspark
import pyarrow
import pandas as pd

In [3]:
dataset= spark.read.load("H1full.csv", format="csv", header="true", inferSchema=True)
dataset
cols = dataset.columns

In [4]:
type(dataset)

pyspark.sql.dataframe.DataFrame

In [5]:
dataset.printSchema()

root
 |-- IsCanceled: integer (nullable = true)
 |-- LeadTime: integer (nullable = true)
 |-- ArrivalDateYear: integer (nullable = true)
 |-- ArrivalDateMonth: string (nullable = true)
 |-- ArrivalDateWeekNumber: integer (nullable = true)
 |-- ArrivalDateDayOfMonth: integer (nullable = true)
 |-- StaysInWeekendNights: integer (nullable = true)
 |-- StaysInWeekNights: integer (nullable = true)
 |-- Adults: integer (nullable = true)
 |-- Children: integer (nullable = true)
 |-- Babies: integer (nullable = true)
 |-- Meal: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- MarketSegment: string (nullable = true)
 |-- DistributionChannel: string (nullable = true)
 |-- IsRepeatedGuest: double (nullable = true)
 |-- PreviousCancellations: integer (nullable = true)
 |-- PreviousBookingsNotCanceled: integer (nullable = true)
 |-- ReservedRoomType: string (nullable = true)
 |-- AssignedRoomType: string (nullable = true)
 |-- BookingChanges: integer (nullable = true)
 |-- Depos

In [6]:
# https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa
# https://docs.databricks.com/applications/machine-learning/mllib/binary-classification-mllib-pipelines.html

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ["Country", "MarketSegment"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [7]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="IsCanceled", outputCol="label")
stages += [label_stringIdx]

In [8]:
# Transform all features into a vector using VectorAssembler
numericCols = ["LeadTime", "ADR"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [9]:
from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [10]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

LogisticRegressionModel: uid = LogisticRegression_87a6375f3a0b, numClasses = 2, numFeatures = 132

DataFrame[IsCanceled: int, LeadTime: int, ArrivalDateYear: int, ArrivalDateMonth: string, ArrivalDateWeekNumber: int, ArrivalDateDayOfMonth: int, StaysInWeekendNights: int, StaysInWeekNights: int, Adults: int, Children: int, Babies: int, Meal: string, Country: string, MarketSegment: string, DistributionChannel: string, IsRepeatedGuest: double, PreviousCancellations: int, PreviousBookingsNotCanceled: int, ReservedRoomType: string, AssignedRoomType: string, BookingChanges: int, DepositType: string, Agent: string, Company: string, DaysInWaitingList: int, CustomerType: string, ADR: double, RequiredCarParkingSpaces: int, TotalOfSpecialRequests: int, ReservationStatus: string, ReservationStatusDate: timestamp, CountryIndex: double, CountryclassVec: vector, MarketSegmentIndex: double, MarketSegmentclassVec: vector, label: double, features: vector]

'ROC'

In [11]:
display(lrModel, preppedDataDF)

LogisticRegressionModel: uid = LogisticRegression_87a6375f3a0b, numClasses = 2, numFeatures = 132

DataFrame[IsCanceled: int, LeadTime: int, ArrivalDateYear: int, ArrivalDateMonth: string, ArrivalDateWeekNumber: int, ArrivalDateDayOfMonth: int, StaysInWeekendNights: int, StaysInWeekNights: int, Adults: int, Children: int, Babies: int, Meal: string, Country: string, MarketSegment: string, DistributionChannel: string, IsRepeatedGuest: double, PreviousCancellations: int, PreviousBookingsNotCanceled: int, ReservedRoomType: string, AssignedRoomType: string, BookingChanges: int, DepositType: string, Agent: string, Company: string, DaysInWaitingList: int, CustomerType: string, ADR: double, RequiredCarParkingSpaces: int, TotalOfSpecialRequests: int, ReservationStatus: string, ReservationStatusDate: timestamp, CountryIndex: double, CountryclassVec: vector, MarketSegmentIndex: double, MarketSegmentclassVec: vector, label: double, features: vector]

In [12]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

DataFrame[label: double, features: vector, IsCanceled: int, LeadTime: int, ArrivalDateYear: int, ArrivalDateMonth: string, ArrivalDateWeekNumber: int, ArrivalDateDayOfMonth: int, StaysInWeekendNights: int, StaysInWeekNights: int, Adults: int, Children: int, Babies: int, Meal: string, Country: string, MarketSegment: string, DistributionChannel: string, IsRepeatedGuest: double, PreviousCancellations: int, PreviousBookingsNotCanceled: int, ReservedRoomType: string, AssignedRoomType: string, BookingChanges: int, DepositType: string, Agent: string, Company: string, DaysInWaitingList: int, CustomerType: string, ADR: double, RequiredCarParkingSpaces: int, TotalOfSpecialRequests: int, ReservationStatus: string, ReservationStatusDate: timestamp]

In [13]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

28056
12004


In [14]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [15]:
predictions = lrModel.transform(testData)
predictions

DataFrame[label: double, features: vector, IsCanceled: int, LeadTime: int, ArrivalDateYear: int, ArrivalDateMonth: string, ArrivalDateWeekNumber: int, ArrivalDateDayOfMonth: int, StaysInWeekendNights: int, StaysInWeekNights: int, Adults: int, Children: int, Babies: int, Meal: string, Country: string, MarketSegment: string, DistributionChannel: string, IsRepeatedGuest: double, PreviousCancellations: int, PreviousBookingsNotCanceled: int, ReservedRoomType: string, AssignedRoomType: string, BookingChanges: int, DepositType: string, Agent: string, Company: string, DaysInWaitingList: int, CustomerType: string, ADR: double, RequiredCarParkingSpaces: int, TotalOfSpecialRequests: int, ReservationStatus: string, ReservationStatusDate: timestamp, rawPrediction: vector, probability: vector, prediction: double]

In [16]:
selected = predictions.select("label", "prediction", "probability")
display(selected)

DataFrame[label: double, prediction: double, probability: vector]

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8071113029053274

In [18]:
evaluator.getMetricName()

'areaUnderROC'

In [19]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [20]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [21]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [22]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [23]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.8077333437748949

In [24]:
print('Model Intercept: ', cvModel.bestModel.intercept)

Model Intercept:  -1.7694283151982177


In [25]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

DataFrame[Feature Weight: double]

In [26]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)

DataFrame[label: double, prediction: double, probability: vector]

In [27]:
type(selected)

pyspark.sql.dataframe.DataFrame

In [28]:
selected.toPandas().to_csv('predictions.csv')