# Introduction

The aim is to predict the country of a new user's first booking. The available data gives a list of users along with their demographics, web session records, and some summary statistics. All users in this dataset are from the USA.

There are 12 possible destination outcomes: 'US', 'FR', 'CA', 'GB', 'ES', 'IT', 'PT', 'NL', 'DE', 'AU', 'NDF' (no destination found), and 'other'. Note that 'NDF' is different from 'other': 'other' means there was a booking, but to a country not included in the list, while 'NDF' means there was no booking.
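The distinction between 'NDF' and 'other' can be written as a small labelling rule. This is a minimal sketch, not part of the Kaggle data or of this notebook's pipeline: `destination_label` is a hypothetical helper, and 'JP' is only an example of a country outside the list.

```python
# The 10 listed countries; together with 'NDF' and 'other' they give 12 outcomes
LISTED_COUNTRIES = {'US', 'FR', 'CA', 'GB', 'ES', 'IT', 'PT', 'NL', 'DE', 'AU'}
OUTCOMES = sorted(LISTED_COUNTRIES | {'NDF', 'other'})


def destination_label(booked_country):
    """Hypothetical helper: map a booked country (None if no booking) to an outcome."""
    if booked_country is None:
        return 'NDF'            # no booking was made
    if booked_country in LISTED_COUNTRIES:
        return booked_country   # one of the 10 listed countries
    return 'other'              # booked, but outside the listed countries


print(len(OUTCOMES))  # 12
print(destination_label(None), destination_label('FR'), destination_label('JP'))  # NDF FR other
```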

The training and test sets we obtained from Kaggle are split by date: the test set contains all new users whose first activity is after 7/1/2014, and we will predict the destination for each of them.
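The date-based split can be sketched with Python's standard library. This sketch assumes `timestamp_first_active` is an integer in `YYYYMMDDhhmmss` form (for example 20090319043255) and treats any first activity on or after 2014-07-01 as test data; `parse_first_active`, `split_by_date`, `CUTOFF` and the toy rows are hypothetical.

```python
from datetime import datetime

CUTOFF = datetime(2014, 7, 1)  # assumed boundary between training and test users


def parse_first_active(ts):
    """Hypothetical helper: parse a YYYYMMDDhhmmss integer into a datetime."""
    return datetime.strptime(str(ts), "%Y%m%d%H%M%S")


def split_by_date(users):
    """Hypothetical helper: split user dicts into (train, test) by first-activity date."""
    train, test = [], []
    for user in users:
        first_active = parse_first_active(user["timestamp_first_active"])
        (test if first_active >= CUTOFF else train).append(user)
    return train, test


users = [
    {"id": "a", "timestamp_first_active": 20140630235959},
    {"id": "b", "timestamp_first_active": 20140701000001},
]
train, test = split_by_date(users)
print([u["id"] for u in train], [u["id"] for u in test])  # ['a'] ['b']
```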

The datasets are:

1. train_users.csv: the training set of users
2. test_users.csv: the test set of users

* id: user id
* date_account_created: the date of account creation
* timestamp_first_active: timestamp of the first activity; note that it can be earlier than date_account_created or date_first_booking because a user can search before signing up
* date_first_booking: date of first booking
* gender
* age
* signup_method
* signup_flow: the page a user came to sign up from
* language: international language preference
* affiliate_channel: the type of paid marketing
* affiliate_provider: where the marketing is, e.g. google, craigslist, other
* first_affiliate_tracked: the first marketing the user interacted with before signing up
* signup_app
* first_device_type
* first_browser
* country_destination: the target variable to predict

We will go through the following steps:
* Fetch Airbnb data
* Merge the test_users.csv and train_users.csv
* Clean the data
* Display the data
* Explore the new Dataframe
* Explore other data sets
* Form assumptions, develop hypotheses, and identify relationships
* Build a predictive model
* Evaluate the model and calculate the deviation of the predicted output

We will also show the different sub-steps that can be taken to reach the presented solution.

As we begin the study, we first need to ensure that the datasets are clean: missing or invalid values must be removed or filled in correctly.

In [2]:
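%matplotlib inline
import numpy as np  # library for working with arrays
import pandas as pd  # makes working with data tables easier
import matplotlib.pyplot as plt  # module for plotting
import seaborn as sns  # statistical plotting built on matplotlib
# Import only the Spark SQL helpers used below; a wildcard import would
# shadow Python built-ins such as sum, min, max, abs and round
from pyspark.sql.functions import col, lit, udf, when
from pyspark.sql.types import IntegerType


from sklearn import ensemble
from sklearn import linear_model
# sklearn.grid_search and sklearn.cross_validation were removed; both now live in model_selection
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn import preprocessing
import sklearn.metrics as metrics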
from collections import Counter


In [3]:
df_train_users = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load("/FileStore/tables/train_users_ru.csv")

display(df_train_users)

In [4]:
df_test_users = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load("/FileStore/tables/test_users_ru.csv")

display(df_test_users)

We checked both train_users and test_users to understand the content of the data.
The results of the analysis are as follows:
1. Apart from the target, both datasets have the same attributes.
2. train_users.csv has one additional column, country_destination (the target), which test_users.csv lacks, so we add an empty country_destination column to the test DataFrame.
3. Both datasets contain many missing or invalid values that need to be resolved.

In [6]:
# Add an empty (null) string target column so both schemas line up for the union
new_test = df_test_users.withColumn("country_destination", lit(None).cast("string"))

# Merging
From the analysis we can deduce that test_users extends the train_users dataset: it has the same columns apart from the target and covers later dates. Therefore we can merge the datasets to streamline the data-cleaning phase.
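Spark's `DataFrame.union` resolves columns by position, not by name, so the merge relies on country_destination being the last column in both frames (it is last in the training file, and `withColumn` appends it last to the test frame). The pure-Python sketch below shows the same merge done by name instead; `merge_users` and the toy rows are hypothetical.

```python
def merge_users(train_rows, test_rows):
    """Hypothetical helper: stack train and test rows using the training column order.

    Columns are matched by name, and any column a test row lacks
    (here the target, country_destination) is filled with None.
    """
    columns = list(train_rows[0].keys())
    return [{c: row.get(c) for c in columns} for row in train_rows + test_rows]


train_rows = [{"id": "u1", "age": 34, "country_destination": "US"}]
test_rows = [{"age": 28, "id": "u2"}]  # different key order, no target column
merged = merge_users(train_rows, test_rows)
print(merged[1])  # {'id': 'u2', 'age': 28, 'country_destination': None}
```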

In [8]:
users = df_train_users.union(new_test)
display(users)

In [9]:
users.count()

# Cleaning Data

Several columns contain many null values. We replace them with placeholder values so that models can be developed on the complete set of users and patterns can be identified more easily as the analysis proceeds.

The values used to replace the nulls are as follows:
* age >> 0
* country_destination >> NDF (this also labels every test-set user as NDF, because their target column is empty)
* date_first_booking >> -unknown-
* first_affiliate_tracked >> untracked
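The fill rule behaves like a per-column lookup: a null is replaced only in columns that have a placeholder, and other nulls (here gender) are left alone. This is a minimal stdlib sketch of that behaviour, not Spark's implementation; `fill_nulls` and the toy row are hypothetical. It also shows how the unknown target of a test-set row becomes 'NDF'.

```python
# Placeholder values from the list above
FILL_VALUES = {
    "age": 0,
    "country_destination": "NDF",
    "date_first_booking": "-unknown-",
    "first_affiliate_tracked": "untracked",
}


def fill_nulls(row, fill_values=FILL_VALUES):
    """Hypothetical helper: replace None only in columns that have a fill value."""
    return {
        name: fill_values[name] if value is None and name in fill_values else value
        for name, value in row.items()
    }


test_user = {"id": "u2", "age": None, "gender": None, "date_first_booking": None,
             "first_affiliate_tracked": "linked", "country_destination": None}
print(fill_nulls(test_user))
```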

In [11]:
cleanedTrainingData = users.na.fill({'age': 0, 'date_first_booking': '-unknown-', 'country_destination': 'NDF', 'first_affiliate_tracked': 'untracked'})
display(cleanedTrainingData)

# Hypothesis

We believe that these user characteristics (age, gender, timestamp of first activity, signup flow, affiliate provider, account creation date, and date of first booking) have a direct influence on the country of travel. We will illustrate these characteristics in various diagrams and then perform a cross-analysis of their possible combinations to see whether they affect the choice of destination country.

In [13]:
sqlContext.registerDataFrameAsTable(cleanedTrainingData,"users_table")
newVal = sqlContext.sql("select age from users_table").collect()
ax = sns.distplot(newVal)
plt.xlabel('Age')
plt.ylabel('Density')

display()

This distribution plot shows how ages are distributed across the total population of users (the y-axis is a density estimate rather than a percentage).

Note: the data still contains inconsistencies, since the age range extends past 1000. We will therefore assume that life expectancy ends at 90 and convert any value above 90 to 0.

In [15]:
new_cleaned = cleanedTrainingData.withColumn("age", when(col("age")>90, 0).otherwise(col("age")) )

display(new_cleaned)

Remember that 0 now represents both NULL entries and ages above 90. Given the large number of zeros and the frequency with which users enter their age before their first booking, the plot suggests that age does not strongly influence the choice of a specific destination.
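Because 0 now mixes missing and implausible ages, summaries of age should exclude it before drawing conclusions. A minimal stdlib sketch of that check, applying the same "null or above 90 becomes 0" rule; the helper `age_summary` and the sample ages are hypothetical.

```python
def age_summary(ages, max_age=90):
    """Hypothetical helper: apply the null/>90 -> 0 rule, then summarize valid ages."""
    cleaned = [0 if (a is None or a > max_age) else a for a in ages]
    valid = [a for a in cleaned if a != 0]          # drop the placeholder zeros
    zero_share = cleaned.count(0) / len(cleaned)    # fraction of users without a usable age
    mean_valid = sum(valid) / len(valid) if valid else None
    return cleaned, zero_share, mean_valid


cleaned, zero_share, mean_valid = age_summary([None, 25, 2014, 40, None, 35])
print(cleaned)     # [0, 25, 0, 40, 0, 35]
print(zero_share)  # 0.5
print(mean_valid)
```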

In [17]:
sqlContext.registerDataFrameAsTable(new_cleaned,"users_table")
newVal = sqlContext.sql("select age from users_table").collect()
sns.distplot(newVal, kde = False)
plt.xlabel('Age')
plt.ylabel('Population Size')
display()

In [18]:
gender_df = new_cleaned.groupby("gender").count()
display(gender_df)

This pie chart shows the distribution of users among female, male, other, and unknown. "Other" indicates users who do not want to identify as male or female, and "unknown" indicates users who have not entered any value for gender.

In [20]:
# Encode the gender column as integers:
# Unknown, OTHER or missing: 0
# Male: 1
# Female: 2
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def gender_id(gender):
    if gender == "MALE":
        return 1
    if gender == "FEMALE":
        return 2
    return 0

# Declare the return type; without it the UDF would return strings
func_udf = udf(gender_id, IntegerType())
next_clean = new_cleaned.withColumn('gender', func_udf(new_cleaned['gender']))
display(next_clean)

Note: from a quick observation, gender is another characteristic that does not appear to influence the choice of destination. However, because unknown values make up 47% of the data, we must also check whether the unknown category changes the result.
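One way to run that check is to compare the destination shares of users with gender code 0 against those with codes 1 and 2. The stdlib sketch below works on hypothetical (gender_code, destination) pairs; with the real data the pairs would come from `next_clean`. Note that code 0 from `gender_id` also covers OTHER and missing values, and `destination_shares` is a hypothetical helper.

```python
from collections import Counter, defaultdict


def destination_shares(rows):
    """Hypothetical helper: per gender group, the share of users per destination."""
    counts = defaultdict(Counter)
    for gender_code, destination in rows:
        group = "unknown" if gender_code == 0 else "male_or_female"
        counts[group][destination] += 1
    return {
        group: {dest: n / sum(c.values()) for dest, n in c.items()}
        for group, c in counts.items()
    }


rows = [(0, "NDF"), (0, "NDF"), (0, "US"), (1, "US"), (2, "US"), (2, "NDF")]
shares = destination_shares(rows)
print(shares["unknown"])
print(shares["male_or_female"])
```

In this toy data the unknown group books NDF twice as often as US, while the known group shows the reverse; a gap like that in the real data would mean the unknown category does change the result.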

# Creation of Decision Tree

In this section, we begin to analyze the details contained within the .csv file and try to determine which countries a user is likely to travel to next. The figure above shows that users primarily book reservations in the US or do not book a reservation at all.

Of the possible destination countries ('US', 'FR', 'CA', 'GB', 'ES', 'IT', 'PT', 'NL', 'DE', 'AU', 'NDF', 'other'), the figure above also shows that travel destinations are distributed almost evenly between male and female users.
Since there is little variation by gender, we will add age as a feature to determine whether it has any significance for the output.

In [23]:
# Import 'tree' from scikit-learn library
from sklearn import tree

In [24]:
# Encode country_destination as integers:
# NDF=0 US=1 FR=2 CA=3 GB=4 ES=5
# IT=6 PT=7 NL=8 DE=9 AU=10 other=11
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

COUNTRY_IDS = {'NDF': 0, 'US': 1, 'FR': 2, 'CA': 3, 'GB': 4, 'ES': 5,
               'IT': 6, 'PT': 7, 'NL': 8, 'DE': 9, 'AU': 10, 'other': 11}

def get_country_id(country):
    # Returns None for any value that is not one of the 12 outcomes
    return COUNTRY_IDS.get(country)

func_udf = udf(get_country_id, IntegerType())
country_vals = new_cleaned.withColumn('country_destination', func_udf(new_cleaned['country_destination']))
display(country_vals)

#country_vals.printSchema()
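The model will predict integer ids, so turning its output back into readable country codes needs the inverse of the mapping above. A minimal stdlib sketch; `ID_TO_COUNTRY` and `decode_predictions` are hypothetical names, and `int(p)` also accepts the numeric strings that an untyped UDF would produce.

```python
COUNTRY_IDS = {
    "NDF": 0, "US": 1, "FR": 2, "CA": 3, "GB": 4, "ES": 5,
    "IT": 6, "PT": 7, "NL": 8, "DE": 9, "AU": 10, "other": 11,
}
# Hypothetical inverse lookup: integer class id -> country label
ID_TO_COUNTRY = {code: country for country, code in COUNTRY_IDS.items()}


def decode_predictions(predictions):
    """Hypothetical helper: map integer (or numeric-string) ids back to country labels."""
    return [ID_TO_COUNTRY[int(p)] for p in predictions]


print(decode_predictions([0, 1, "11"]))  # ['NDF', 'US', 'other']
```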

In [25]:
sqlContext.registerDataFrameAsTable(country_vals,"users_table")
target = sqlContext.sql("select country_destination from users_table").collect()
features_one = sqlContext.sql("select timestamp_first_active,signup_flow,age from users_table").collect()

# Fit your first decision tree: my_tree_one
my_tree_one = tree.DecisionTreeClassifier()
my_tree_one = my_tree_one.fit(features_one, target)

print(my_tree_one.feature_importances_)
print(my_tree_one.score(features_one, target))

# Quick Note

One way to quickly see the result of your decision tree is to look at the importance of the features it uses. This is done by requesting the .feature_importances_ attribute of your tree object. Another quick metric is the mean accuracy, which you can compute with the .score() method, passing features_one and target as arguments. Because the tree is scored on the same rows it was fitted to, this accuracy shows how well the tree fits the training data and will overstate its accuracy on unseen users.

Look at the importance and score of the included features. The importances are printed in the same order as the columns of features_one: timestamp_first_active, signup_flow, age.
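The mean accuracy reported by a classifier's `.score()` is the fraction of rows whose prediction matches the label. The stdlib sketch below computes it by hand and uses a hypothetical memorizing "model" to show why accuracy on the fitting data can be perfect while accuracy on unseen rows is poor; a fully grown decision tree can behave similarly. `mean_accuracy`, `memorizer` and all rows are hypothetical.

```python
def mean_accuracy(y_true, y_pred):
    """Fraction of predictions that equal the true label."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)


# Hypothetical training rows: (timestamp_first_active, signup_flow, age) -> country id
training = {(20140101000000, 0, 30): 1, (20140102000000, 3, 0): 0}


def memorizer(features):
    """Toy 'model': look the row up in the training data, otherwise guess NDF (0)."""
    return training.get(features, 0)


unseen = {(20140103000000, 0, 25): 1, (20140104000000, 0, 45): 2}

train_acc = mean_accuracy(list(training.values()), [memorizer(x) for x in training])
unseen_acc = mean_accuracy(list(unseen.values()), [memorizer(x) for x in unseen])
print(train_acc, unseen_acc)  # 1.0 0.0
```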

# First Prediction

In [28]:
my_prediction = my_tree_one.predict(features_one)
df = pd.DataFrame(my_prediction, columns=["country_destination"])

A = sqlContext.sql("select id from users_table").collect()
sample = pd.DataFrame(A, columns=["id"])

frames = [sample, df]
result = pd.concat(frames, axis=1)

print(result)

In [29]:
spark_df = sqlContext.createDataFrame(result)
display(spark_df)
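Pairing predictions with ids is safest when both come from the same collected rows: SQL queries without an ORDER BY make no promise about row order, so ids and features collected by two separate queries could in principle drift apart. A stdlib sketch of the single-source approach; `predictions_with_ids` and `toy_predict` are hypothetical, with `toy_predict` standing in for `my_tree_one.predict`.

```python
def predictions_with_ids(rows, predict):
    """Hypothetical helper: build (id, prediction) pairs from one list of rows."""
    features = [(r["timestamp_first_active"], r["signup_flow"], r["age"]) for r in rows]
    predictions = predict(features)
    return [(r["id"], p) for r, p in zip(rows, predictions)]


def toy_predict(features):
    """Hypothetical stand-in model: NDF (0) when age is unknown, otherwise US (1)."""
    return [0 if age == 0 else 1 for _, _, age in features]


rows = [
    {"id": "u1", "timestamp_first_active": 20140101000000, "signup_flow": 0, "age": 30},
    {"id": "u2", "timestamp_first_active": 20140102000000, "signup_flow": 3, "age": 0},
]
print(predictions_with_ids(rows, toy_predict))  # [('u1', 1), ('u2', 0)]
```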

# Second Prediction

In [31]:
# Split the encoded users into 80% training and 20% test rows (seeded for reproducibility)
train, test = country_vals.randomSplit([0.8, 0.2], seed=12345)

sqlContext.registerDataFrameAsTable(train, "train_table")
sqlContext.registerDataFrameAsTable(test, "test_table")

# Create the target and features from the training split only
train_rows = sqlContext.sql("select country_destination,timestamp_first_active,signup_flow,age from train_table").collect()
target = [r.country_destination for r in train_rows]
features_one = [(r.timestamp_first_active, r.signup_flow, r.age) for r in train_rows]

# Fit the decision tree on the training split: my_tree_one
my_tree_one = tree.DecisionTreeClassifier()
my_tree_one = my_tree_one.fit(features_one, target)

# Importances follow the column order of features_one; this score is training accuracy
print(my_tree_one.feature_importances_)
print(my_tree_one.score(features_one, target))

# Extract ids, labels and the same features from the held-out test split in one query
test_rows = sqlContext.sql("select id,country_destination,timestamp_first_active,signup_flow,age from test_table").collect()
test_features = [(r.timestamp_first_active, r.signup_flow, r.age) for r in test_rows]
test_target = [r.country_destination for r in test_rows]

# Make your prediction using the test set and report held-out accuracy
my_prediction = my_tree_one.predict(test_features)
print(my_tree_one.score(test_features, test_target))

# Create a data frame with two columns: id & country_destination (the predictions)
result = pd.DataFrame({"id": [r.id for r in test_rows], "country_destination": my_prediction},
                      columns=["id", "country_destination"])
print(result)
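Whatever library performs the split, the held-out rows must never be used for fitting. Spark's `randomSplit` assigns rows randomly, so the split sizes are only approximately 80/20. The stdlib sketch below imitates that with a seeded per-row draw; `random_split` is a hypothetical helper, not Spark's algorithm.

```python
import random


def random_split(rows, train_fraction=0.8, seed=12345):
    """Hypothetical helper: assign each row independently to train or test."""
    rng = random.Random(seed)  # seeded so the split is reproducible
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


ids = list(range(1000))
train, test = random_split(ids)
print(len(train), len(test))  # roughly 800 and 200
```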


In [32]:
#display(features_one)
sqlContext.registerDataFrameAsTable(country_vals,"users_table")
sample = sqlContext.sql("select country_destination,timestamp_first_active,signup_flow,age  from users_table")
sample2 = sqlContext.sql("select timestamp_first_active,signup_flow,age  from users_table")

#sample.printSchema()
sample = sample.select(col('timestamp_first_active'),col('signup_flow'),col('age'),col('country_destination'), sample.country_destination.cast('integer').alias("new_country"))

display(sample2)

In [33]:
display(country_vals)

In [34]:

from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = sample.columns
featuresCols.remove('country_destination')
featuresCols.remove('new_country')

train, test = sample.randomSplit([0.7, 0.3])

# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)
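VectorIndexer decides per feature whether it is categorical: features with at most `maxCategories` distinct values are treated as categorical and their values are replaced by 0-based category indices, while the rest stay continuous. The stdlib sketch is a simplified illustration of that rule that ignores details of Spark's implementation; `index_categorical` and the toy values are hypothetical. In the full dataset, age and timestamp_first_active take far more than 4 distinct values, so they remain continuous.

```python
def index_categorical(columns, max_categories=4):
    """Hypothetical, simplified VectorIndexer rule.

    columns: dict of feature name -> list of numeric values.
    Returns name -> {value: index} for the features treated as categorical.
    """
    maps = {}
    for name, values in columns.items():
        distinct = sorted(set(values))
        if len(distinct) <= max_categories:
            maps[name] = {v: i for i, v in enumerate(distinct)}
    return maps


cols = {
    "signup_flow": [0, 3, 12, 23, 25, 0],  # 5 distinct values -> continuous
    "age": [0, 30, 45, 0],                 # 3 distinct values -> categorical
}
print(index_categorical(cols))  # {'age': {0: 0, 30: 1, 45: 2}}
```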

In [35]:
from pyspark.ml.regression import GBTRegressor
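# Takes the "features" column and learns to predict "new_country" (the integer country code)
# Note: a regressor treats the country codes as continuous numbers, so its predictions are real values rather than country classes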
gbt = GBTRegressor(labelCol="new_country")

In [36]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Define a grid of hyperparameters to test:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
# In this example notebook, we keep these values small.  In practice, to get the highest accuracy, you would likely want to try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()
# We define an evaluation metric.  This tells CrossValidator how well we are doing by comparing the true labels with predictions.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
# Declare the CrossValidator, which runs model tuning for us.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)
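The grid above expands to every combination of its values, and CrossValidator fits one model per combination per fold (its default `numFolds` is 3), then refits the best combination on the whole training set. A stdlib sketch of that count for this grid; the names are illustrative only.

```python
from itertools import product

param_grid = {"maxDepth": [2, 5], "maxIter": [10, 100]}
num_folds = 3  # CrossValidator's default numFolds

# Every combination of grid values, as one dict per parameter map
combinations = [dict(zip(param_grid, values)) for values in product(*param_grid.values())]
fits_during_cv = len(combinations) * num_folds
print(len(combinations), fits_during_cv)  # 4 12
```

With the final refit, tuning this small grid therefore trains 13 GBT ensembles, some of them with 100 trees, which is why the notebook keeps the values small.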

In [37]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [38]:
pipelineModel = pipeline.fit(train)

In [39]:
predictions = pipelineModel.transform(test)

In [40]:
display(predictions.select("new_country", "prediction", *featuresCols))

In [41]:
rmse = evaluator.evaluate(predictions)
print("Deviation of our test set (RMSE): %g" % rmse)
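RMSE is the square root of the mean squared difference between labels and predictions. Because the country ids have no natural order (FR=2 is not "between" US=1 and CA=3), RMSE here measures numeric distance between codes rather than whether the right country was predicted. The stdlib sketch computes RMSE by hand and shows one hypothetical post-processing step, rounding each output to the nearest valid class id; `rmse`, `to_class` and the sample values are illustrative, not part of Spark.

```python
import math


def rmse(y_true, y_pred):
    """Root-mean-square error between true labels and numeric predictions."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def to_class(prediction, n_classes=12):
    """Hypothetical post-processing: round to the nearest id and clamp to 0..11."""
    return min(max(int(round(prediction)), 0), n_classes - 1)


y_true = [0, 1, 1, 11]          # NDF, US, US, other
y_pred = [0.4, 1.2, 0.9, 6.0]   # regression outputs
print(rmse(y_true, y_pred))
print([to_class(p) for p in y_pred])  # [0, 1, 1, 6]
```

The last row shows the problem: the true label 'other' (11) becomes 6, which decodes to 'IT', a country the user never booked.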