# Airbnb Models

In [1]:
# Imports
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import pyspark.sql.functions as fn

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt

In [3]:
# Obtain the Spark session: reuses the active one if it exists, otherwise creates it
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

### Reading in Parquet Files
*Note: No explicit schema is needed because a Parquet file stores its own schema, which is applied automatically when the file is loaded.*

In [4]:
# Read the preprocessed train and test splits from Parquet.
# Parquet files carry their own schema, so neither a schema nor the CSV-only
# "header" option is needed here.
train = spark.read.parquet("./data_preprocessed/train_data.parquet")
test = spark.read.parquet("./data_preprocessed/test_data.parquet")

### Creating One-Hot-Encoded Matrices for the Categorical Features

In [5]:
# One-hot encoding for gender, signup method, language, signup app and first device type.
# Each categorical column is mapped to a numeric index (StringIndexer) and the
# index is then expanded into an indicator vector (OneHotEncoder).
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# StringIndexer defaults to handleInvalid="error", which raises at transform time
# when a category was not present in the data the indexer was fitted on (e.g. a rare
# language that only appears in a validation fold or in the test set).
# "keep" routes such values to an additional index instead of failing.
UNSEEN_CATEGORY_POLICY = "keep"

# Gender
gender_idx = StringIndexer(inputCol="gender", outputCol="gender_idx",
                           handleInvalid=UNSEEN_CATEGORY_POLICY)
gender_ohe = OneHotEncoder(inputCol="gender_idx", outputCol="gender_vec")

# Signup method
signup_method_idx = StringIndexer(inputCol="signup_method", outputCol="signup_method_idx",
                                  handleInvalid=UNSEEN_CATEGORY_POLICY)
signup_method_ohe = OneHotEncoder(inputCol="signup_method_idx", outputCol="signup_method_vec")

# Language
language_idx = StringIndexer(inputCol="language", outputCol="language_idx",
                             handleInvalid=UNSEEN_CATEGORY_POLICY)
language_ohe = OneHotEncoder(inputCol="language_idx", outputCol="language_vec")

# Signup app
signup_app_idx = StringIndexer(inputCol="signup_app", outputCol="signup_app_idx",
                               handleInvalid=UNSEEN_CATEGORY_POLICY)
signup_app_ohe = OneHotEncoder(inputCol="signup_app_idx", outputCol="signup_app_vec")

# First device type - people browsing on a computer may be more serious about
# booking than people browsing casually on a phone
device_idx = StringIndexer(inputCol="first_device_type", outputCol="first_device_type_idx",
                           handleInvalid=UNSEEN_CATEGORY_POLICY)
device_ohe = OneHotEncoder(inputCol="first_device_type_idx", outputCol="first_device_type_vec")

### Imputing Missing Age and Online Activity Data

In [6]:
# Fill missing numeric values with the column median
from pyspark.ml.feature import Imputer

# Age
imputer_age = Imputer(
    strategy="median",
    inputCol='age_new',
    outputCol='age_new_imputed',
)

# Total time elapsed
imputer_total_elapsed = Imputer(
    strategy="median",
    inputCol='total_time_elapsed',
    outputCol='total_time_elapsed_imputed',
)

# Total number of actions
imputer_total_num_actions = Imputer(
    strategy="median",
    inputCol='total_num_actions',
    outputCol='total_num_actions_imputed',
)

### Selecting and Scaling Features

In [7]:
from pyspark.ml.feature import VectorAssembler

# Columns that make up the model input. The order here is the order in which
# they are concatenated into the assembled vector, so it must stay stable.
features = [
    "age_new_imputed", "age_missing",
    "gender_vec", "signup_method_vec", "language_vec", "signup_app_vec",
    "total_time_elapsed_imputed", 'total_num_actions_imputed',
    "first_device_type_vec",
]

# Concatenate the selected columns into a single vector column named "fts"
assembler = VectorAssembler(outputCol="fts", inputCols=features)

In [8]:
from pyspark.ml.feature import MaxAbsScaler

# Rescale the assembled vector "fts" into the final "features" column.
# MaxAbsScaler is chosen because some of the one-hot-encoded features are sparse.
scaler = MaxAbsScaler(outputCol="features", inputCol="fts")

### Building the First Pipeline: Lasso (L1-Regularized) Logistic Regression

In [9]:
# Set up an L1-regularised (lasso) logistic regression model
import time

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

max_iterations = 10

# regParam defaults to 0.0, in which case no penalty is applied at all and
# elasticNetParam has no effect. A non-zero value is needed for the L1 penalty
# to actually be used. 0.01 is only a starting point and should be tuned.
reg_param = 0.01

lasso = LogisticRegression(maxIter=max_iterations,
                           featuresCol='features',
                           labelCol='label',
                           regParam=reg_param,
                           elasticNetParam=1   # 1 -> pure L1 penalty
                           )

# Configure pipeline: index + one-hot encode the categoricals, impute the
# numerics, assemble and scale the feature vector, then fit the classifier
pipeline_lasso = Pipeline(stages=[
    gender_idx,
    gender_ohe,
    signup_method_idx,
    signup_method_ohe,
    signup_app_idx,
    signup_app_ohe,
    device_idx,
    device_ohe,
    language_idx,
    language_ohe,
    imputer_age,
    imputer_total_elapsed,
    imputer_total_num_actions,
    assembler,
    scaler,
    lasso])

# Fit on the training data and report wall-clock training time
t0 = time.time()
model_lasso = pipeline_lasso.fit(train)
print("train time:", time.time() - t0)

train time: 12.152521133422852


### Ridge (L2-Regularized) Logistic Regression

In [10]:
# Set up an L2-regularised (ridge) logistic regression model
import time

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

max_iterations = 10

# regParam defaults to 0.0, in which case no penalty is applied at all and
# elasticNetParam has no effect. A non-zero value is needed for the L2 penalty
# to actually be used. 0.01 is only a starting point and should be tuned.
reg_param = 0.01

ridge = LogisticRegression(maxIter=max_iterations,
                           featuresCol='features',
                           labelCol='label',
                           regParam=reg_param,
                           elasticNetParam=0   # 0 -> pure L2 penalty
                           )

# Configure pipeline: same preprocessing stages as the other models.
# The final stage must be `ridge`; it was previously `lasso`, so the model
# saved as "ridge" was just a second fit of the lasso estimator.
pipeline_ridge = Pipeline(stages=[
    gender_idx,
    gender_ohe,
    signup_method_idx,
    signup_method_ohe,
    signup_app_idx,
    signup_app_ohe,
    device_idx,
    device_ohe,
    language_idx,
    language_ohe,
    imputer_age,
    imputer_total_elapsed,
    imputer_total_num_actions,
    assembler,
    scaler,
    ridge])

# Fit on the training data and report wall-clock training time
t0 = time.time()
model_ridge = pipeline_ridge.fit(train)
print("train time:", time.time() - t0)

train time: 5.351033687591553


### Random Forest Classifier

In [11]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random forest classifier; numTrees and maxDepth are tuned by the
# cross-validation grid, everything else stays at its default
rf = RandomForestClassifier(labelCol='label', featuresCol='features')

In [12]:
# Preprocessing stages followed by the random forest estimator
pipeline_rf = Pipeline(stages=[
    gender_idx, gender_ohe,
    signup_method_idx, signup_method_ohe,
    signup_app_idx, signup_app_ohe,
    device_idx, device_ohe,
    language_idx, language_ohe,
    imputer_age, imputer_total_elapsed, imputer_total_num_actions,
    assembler, scaler,
    rf,
])

# Hyperparameter grid: 3 values of numTrees x 3 values of maxDepth
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [5, 20, 50])
    .addGrid(rf.maxDepth, [3, 5, 6])
    .build()
)

print('-'*30)
print('paramGrid', paramGrid, '\n')
print('len(paramGrid): {}'.format(len(paramGrid)))
print('-'*30)

# 3-fold cross-validation over the whole pipeline, scored with the
# default BinaryClassificationEvaluator metric
crossval = CrossValidator(
    estimator=pipeline_rf,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3,
    seed=1,
)

# Fit, then report wall-clock time and the fold-averaged metric per grid point
t0 = time.time()
cv_model_rf = crossval.fit(train)
print("train time:", time.time() - t0)
print('-'*30)
print(cv_model_rf.avgMetrics)

------------------------------
paramGrid [{Param(parent='RandomForestClassifier_7872cbbe10d4', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_7872cbbe10d4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 3}, {Param(parent='RandomForestClassifier_7872cbbe10d4', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_7872cbbe10d4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5}, {Param(parent='RandomForestClassifier_7872cbbe10d4', name='numTrees', doc='Number of trees to train (>= 1).'): 5, Param(parent='RandomForestClassifier_7872cbbe10d4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 6}, {Param(parent='RandomForestClassif

### Naive Bayes Classifier

In [13]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes classifier; the smoothing parameter is tuned by the grid below
nb = NaiveBayes(labelCol='label', featuresCol='features')

# Preprocessing stages followed by the Naive Bayes estimator
pipeline_bayes = Pipeline(stages=[
    gender_idx, gender_ohe,
    signup_method_idx, signup_method_ohe,
    signup_app_idx, signup_app_ohe,
    device_idx, device_ohe,
    language_idx, language_ohe,
    imputer_age, imputer_total_elapsed, imputer_total_num_actions,
    assembler, scaler,
    nb,
])

# Hyperparameter grid: four smoothing values
paramGrid = (
    ParamGridBuilder()
    .addGrid(nb.smoothing, [0, 0.5, 1, 5])
    .build()
)

print('-'*30)
print('paramGrid', paramGrid, '\n')
print('len(paramGrid): {}'.format(len(paramGrid)))
print('-'*30)

# 3-fold cross-validation over the whole pipeline, scored with the
# default BinaryClassificationEvaluator metric
crossval = CrossValidator(
    estimator=pipeline_bayes,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3,
    seed=1,
)

# Fit, then report wall-clock time and the fold-averaged metric per grid point
t0 = time.time()
cv_model_bayes = crossval.fit(train)
print("train time:", time.time() - t0)
print('-'*30)
print(cv_model_bayes.avgMetrics)

------------------------------
paramGrid [{Param(parent='NaiveBayes_9d6189695c22', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.0}, {Param(parent='NaiveBayes_9d6189695c22', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0.5}, {Param(parent='NaiveBayes_9d6189695c22', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 1.0}, {Param(parent='NaiveBayes_9d6189695c22', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 5.0}] 

len(paramGrid): 4
------------------------------
train time: 36.54611682891846
------------------------------
[0.6742347141291045, 0.6742311507788784, 0.6742266566009952, 0.6742417471759576]


### Gradient Boosted Tree Classifier

In [14]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees limited to 5 boosting iterations; maxDepth and
# minWeightFractionPerNode are tuned by the grid below
gbt = GBTClassifier(maxIter=5, labelCol='label', featuresCol='features')

# Preprocessing stages followed by the GBT estimator
pipeline_gbt = Pipeline(stages=[
    gender_idx, gender_ohe,
    signup_method_idx, signup_method_ohe,
    signup_app_idx, signup_app_ohe,
    device_idx, device_ohe,
    language_idx, language_ohe,
    imputer_age, imputer_total_elapsed, imputer_total_num_actions,
    assembler, scaler,
    gbt,
])

# Hyperparameter grid: 3 values of maxDepth x 3 values of minWeightFractionPerNode
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [3, 5, 6])
    .addGrid(gbt.minWeightFractionPerNode, [0, 0.01, 0.1])
    .build()
)

print('-'*30)
print('paramGrid', paramGrid, '\n')
print('len(paramGrid): {}'.format(len(paramGrid)))
print('-'*30)

# 3-fold cross-validation over the whole pipeline, scored with the
# default BinaryClassificationEvaluator metric
crossval = CrossValidator(
    estimator=pipeline_gbt,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3,
    seed=1,
)

# Fit, then report wall-clock time and the fold-averaged metric per grid point
t0 = time.time()
cv_model_gbt = crossval.fit(train)
print("train time:", time.time() - t0)
print('-'*30)
print(cv_model_gbt.avgMetrics)

------------------------------
paramGrid [{Param(parent='GBTClassifier_3fbadd496b77', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 3, Param(parent='GBTClassifier_3fbadd496b77', name='minWeightFractionPerNode', doc='Minimum fraction of the weighted sample count that each child must have after split. If a split causes the fraction of the total weight in the left or right child to be less than minWeightFractionPerNode, the split will be discarded as invalid. Should be in interval [0.0, 0.5).'): 0.0}, {Param(parent='GBTClassifier_3fbadd496b77', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 3, Param(parent='GBTClassifier_3fbadd496b77', name='minWeightFractionPerNode', doc='Minimum fraction of the weighted sample count that each child must have after split. If a split causes the fraction of the total weight in

### Saving Models

In [15]:
# Persist the fitted pipeline models. For the cross-validated models only the
# best pipeline found by the grid search is saved.
# A plain .save() raises if the target path already exists, which breaks any
# second run of the notebook; write().overwrite() replaces the previous model.
model_ridge.write().overwrite().save('./models/ridge.model')
model_lasso.write().overwrite().save('./models/lasso.model')
cv_model_bayes.bestModel.write().overwrite().save('./models/naive_bayes.model')
cv_model_rf.bestModel.write().overwrite().save('./models/random_forest.model')
cv_model_gbt.bestModel.write().overwrite().save('./models/gbt.model')