# Installing necessary libraries

In [1]:
# Mount Google Drive so the project files under `root` can be read.
from google.colab import drive

drive.mount("/content/drive", force_remount=False)

# Folder holding the Airbnb CSV files used below.
root = "/content/drive/My Drive/Bigdata/project3/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"


spark-2.4.5-bin-hadoop2.7/
spark-2.4.5-bin-hadoop2.7/licenses/
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-jtransforms.html
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd-jni.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-xmlenc.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-vis.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-spire.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sorttable.js.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-slf4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scopt.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scala.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sbt-launch-lib.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-respond.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-reflectasm.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pyrolite.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-py4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-protobuf.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pmml-model

In [3]:
# findspark must run before any pyspark import: it puts the Spark install
# named by SPARK_HOME onto sys.path.
import findspark
findspark.init()

import json
import sys

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Start (or reuse) the session for this notebook and keep a handle to its
# underlying SparkContext.
spark = (
    SparkSession.builder
    .appName("Airbnb")
    .getOrCreate()
)
sc = spark.sparkContext
sc


In [0]:
# Load the training users; the first line of the CSV holds the column names.
train_users = spark.read.csv(root + "train_users_2.csv", header=True)

In [5]:
# Preview the raw data: first 20 rows, with long cell values truncated.
train_users.show(n=20, truncate=True)

+----------+--------------------+----------------------+------------------+---------+----+-------------+-----------+--------+-----------------+------------------+-----------------------+----------+-----------------+-------------+-------------------+
|        id|date_account_created|timestamp_first_active|date_first_booking|   gender| age|signup_method|signup_flow|language|affiliate_channel|affiliate_provider|first_affiliate_tracked|signup_app|first_device_type|first_browser|country_destination|
+----------+--------------------+----------------------+------------------+---------+----+-------------+-----------+--------+-----------------+------------------+-----------------------+----------+-----------------+-------------+-------------------+
|gxn3p5htnn|          2010-06-28|        20090319043255|              null|-unknown-|null|     facebook|          0|      en|           direct|            direct|              untracked|       Web|      Mac Desktop|       Chrome|                NDF|


In [0]:
# The user id and the raw date/timestamp columns are not used as features.
unused_columns = ['id', 'date_account_created', 'timestamp_first_active', 'date_first_booking']
train_users = train_users.drop(*unused_columns)

# Casting age to float and signup_flow to integer

In [0]:
# While casting we also rearrange the columns: the two numeric columns come
# first, then the string columns, with the target (country_destination) last.

from pyspark.sql.functions import col

string_columns = ['gender',
                  'signup_method',
                  'language',
                  'affiliate_channel',
                  'affiliate_provider',
                  'first_affiliate_tracked',
                  'signup_app',
                  'first_device_type',
                  'first_browser',
                  'country_destination']
numeric_casts = [col('age').cast('float'), col('signup_flow').cast('int')]
dataset = train_users.select(numeric_casts + [col(name) for name in string_columns])

# Dropping the unused first_affiliate_tracked column

In [8]:
# first_affiliate_tracked is not used as a feature; remove it and confirm
# the resulting column types.
columns_to_drop = ['first_affiliate_tracked']
dataset = dataset.drop(*columns_to_drop)
dataset.printSchema()

root
 |-- age: float (nullable = true)
 |-- signup_flow: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- signup_method: string (nullable = true)
 |-- language: string (nullable = true)
 |-- affiliate_channel: string (nullable = true)
 |-- affiliate_provider: string (nullable = true)
 |-- signup_app: string (nullable = true)
 |-- first_device_type: string (nullable = true)
 |-- first_browser: string (nullable = true)
 |-- country_destination: string (nullable = true)



# Counting missing (NaN/null) values in each column

In [9]:
import pyspark.sql.functions as F


def _is_missing(column_name):
    """Return a boolean Column that is true where `column_name` looks missing.

    Missing means: SQL null, NaN, or exactly one of the placeholder strings
    'NA' / 'NULL'. Exact matching (isin) is used rather than substring
    matching (contains), which would also flag any value that merely
    contains "NA" or "NULL" somewhere inside it.
    """
    column = F.col(column_name)
    return F.isnan(column_name) | column.isin('NA', 'NULL') | column.isNull()


# One row with a missing-value count per column.
# NOTE(review): placeholders such as '-unknown-' (seen in `gender`) are not
# counted as missing here.
nan_values_each_column = dataset.select(
    [F.count(F.when(_is_missing(name), name)).alias(name) for name in dataset.columns]
)
nan_values_each_column.show()

+-----+-----------+------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+
|  age|signup_flow|gender|signup_method|language|affiliate_channel|affiliate_provider|signup_app|first_device_type|first_browser|country_destination|
+-----+-----------+------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+
|87990|          0|     0|            0|       0|                0|                 0|         0|                0|            0|                  0|
+-----+-----------+------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+



# Determining categorical and numerical columns

In [0]:
# Categorical features are the string-typed columns, excluding the target
# (country_destination). Selecting by type instead of by position
# (columns[2:-1]) keeps this correct if columns are added, removed or
# reordered upstream.
categoricalColumns = [name for name, dtype in dataset.dtypes
                      if dtype == 'string' and name != 'country_destination']

In [11]:
list(categoricalColumns)

['gender',
 'signup_method',
 'language',
 'affiliate_channel',
 'affiliate_provider',
 'signup_app',
 'first_device_type',
 'first_browser']

In [0]:
# Numeric features go into the assembler as-is (no encoding); `cols` keeps the
# current column order so it can be re-selected after the pipeline runs.
numericCols = ['age', 'signup_flow']
cols = list(dataset.columns)

# Filling missing age values with 50
Normally the mean would be used. However, I found that filling with a higher age improves the model's accuracy. A plausible explanation is that older customers are less likely to enter their age.

In [0]:
# Missing ages are imputed with a fixed value instead of the mean; 50 was
# chosen empirically because higher fill values gave better model accuracy.
AGE_FILL_VALUE = 50.0
indexed_df = dataset.fillna({'age': AGE_FILL_VALUE})

# Applying StringIndexer, one-hot encoding and VectorAssembler

In [0]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

# For every categorical column: a StringIndexer (string -> numeric index)
# followed by a OneHotEncoderEstimator (index -> one-hot vector).
stages = []
for name in categoricalColumns:
    indexer = StringIndexer(inputCol=name, outputCol=name + 'Index')
    stages.append(indexer)
    stages.append(OneHotEncoderEstimator(inputCols=[indexer.getOutputCol()],
                                         outputCols=[name + "classVec"]))

# The target country becomes the numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='country_destination', outputCol='label')
stages.append(label_stringIdx)

# Concatenate all one-hot vectors plus the raw numeric columns into 'features'.
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [15]:
list(stages)

[StringIndexer_baed7b54bb87,
 OneHotEncoderEstimator_d089adb1ef39,
 StringIndexer_ba503394f711,
 OneHotEncoderEstimator_b026d0df3065,
 StringIndexer_1849cd307e3b,
 OneHotEncoderEstimator_f50d2a63df41,
 StringIndexer_3e468eb3f5c5,
 OneHotEncoderEstimator_1902ffcba10f,
 StringIndexer_4f0358b5ea18,
 OneHotEncoderEstimator_a33c104f0881,
 StringIndexer_fa4d4789243a,
 OneHotEncoderEstimator_f5673453291b,
 StringIndexer_d8b4e1f556af,
 OneHotEncoderEstimator_ca2826212605,
 StringIndexer_55e60947ce39,
 OneHotEncoderEstimator_9f59253cce5b,
 StringIndexer_a61f1a8a588e,
 VectorAssembler_94b7e056eac1]

# Running a pipeline through all of the above-mentioned stages

In [16]:
from pyspark.ml import Pipeline

# Fit all indexers/encoders/assembler in one pass, then featurize the data and
# keep label + features alongside the original columns.
# NOTE(review): the pipeline is fit on the full dataset before the train/test
# split below, so category vocabularies and index order also see test rows;
# consider fitting on the training split only.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(indexed_df)
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(indexed_df).select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: float (nullable = false)
 |-- signup_flow: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- signup_method: string (nullable = true)
 |-- language: string (nullable = true)
 |-- affiliate_channel: string (nullable = true)
 |-- affiliate_provider: string (nullable = true)
 |-- signup_app: string (nullable = true)
 |-- first_device_type: string (nullable = true)
 |-- first_browser: string (nullable = true)
 |-- country_destination: string (nullable = true)



# Splitting the data

In [17]:
# 70/30 random split; the fixed seed makes it repeatable for the same input.
train, test = df.randomSplit([0.7, 0.3], seed=2018)
n_train = train.count()
n_test = test.count()
print("Training Dataset Count: " + str(n_train))
print("Test Dataset Count: " + str(n_test))

Training Dataset Count: 149274
Test Dataset Count: 64177


# Applying logistic regression

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled feature vector.
# NOTE(review): maxIter=10 keeps training short but may stop before convergence.
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
lrModel = lr.fit(train)
trainingSummary = lrModel.summary


In [0]:
# Score both splits so training and test accuracy can be compared.
predict_train, predict_test = [lrModel.transform(split) for split in (train, test)]


In [21]:
# First 20 scored training rows, long values truncated.
predict_train.show(n=20, truncate=True)

+-----+--------------------+----+-----------+---------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+--------------------+--------------------+----------+
|label|            features| age|signup_flow|   gender|signup_method|language|affiliate_channel|affiliate_provider|signup_app|first_device_type|first_browser|country_destination|       rawPrediction|         probability|prediction|
+-----+--------------------+----+-----------+---------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+--------------------+--------------------+----------+
|  0.0|(117,[0,3,5,29,36...|17.0|          0|-unknown-|        basic|      en|           direct|            direct|       Web|      Mac Desktop|       Chrome|                NDF|[3.70726041571593...|[0.59809587516187...|       0.0|
|  0.0|(117,[0,3,5,29,36...|18.0|          0|-unknown-|        basic|   

In [22]:
# First 20 scored test rows, long values truncated.
predict_test.show(n=20, truncate=True)

+-----+--------------------+----+-----------+---------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+--------------------+--------------------+----------+
|label|            features| age|signup_flow|   gender|signup_method|language|affiliate_channel|affiliate_provider|signup_app|first_device_type|first_browser|country_destination|       rawPrediction|         probability|prediction|
+-----+--------------------+----+-----------+---------+-------------+--------+-----------------+------------------+----------+-----------------+-------------+-------------------+--------------------+--------------------+----------+
|  0.0|(117,[0,3,5,29,36...|19.0|          0|-unknown-|        basic|      en|           direct|            direct|       Web|      Mac Desktop|       Chrome|                NDF|[3.70767500710513...|[0.59825724909184...|       0.0|
|  0.0|(117,[0,3,5,29,36...|19.0|          0|-unknown-|        basic|   

# Evaluate the model (getting the accuracy)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy = fraction of rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy_train = evaluator.evaluate(predict_train)
accuracy_test = evaluator.evaluate(predict_test)

In [24]:
# Test-set accuracy and its complement, the test misclassification rate.
print("Accuracy in test is ", accuracy_test)
print("Test Error = %g" % (1.0 - accuracy_test))


Accuracy in test is  0.6021939324056905
Test Error Test = 0.397806


In [25]:
# Training-set accuracy and its complement, the training misclassification
# rate (previously mislabeled as a test error).
print("Accuracy in train is ", accuracy_train)
print("Train Error = %g" % (1.0 - accuracy_train))


Accuracy in train is  0.6029449200798531
Test Error Test = 0.397055
