In [0]:
spark

In [0]:
import pyspark.sql.functions as F

# Libraries

In [0]:
import pyspark.sql.types as typ
from numpy import allclose
import math
from pyspark.ml.linalg import Vectors
# for model 
#---------------------------------------------------------------
from pyspark.ml.classification import GBTClassifier,RandomForestClassifier,OneVsRest
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# for feature engineering
#----------------------------------
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler

# Reading

In [0]:
dataset_labels = [
  ('id', typ.StringType()),
  ('date_account_created', typ.StringType()),
  ('timestamp_first_active', typ.LongType()),
  ('date_first_booking', typ.StringType()),
  ('gender', typ.StringType()),
  ('age', typ.DoubleType()),
  ('signup_method', typ.StringType()),
  ('signup_flow', typ.IntegerType()),
  ('language', typ.StringType()),
  ('affiliate_channel', typ.StringType()),
  ('affiliate_provider', typ.StringType()),
  ('first_affiliate_tracked', typ.StringType()),
  ('signup_app', typ.StringType()),
  ('first_device_type', typ.StringType()),
  ('first_browser', typ.StringType()),
  ('country_destination', typ.StringType()),
]

dataset_schema = typ.StructType([
  typ.StructField(e[0], e[1], True)
  for e in dataset_labels
])

In [0]:
dataset_file_location = "/FileStore/tables/Airbnb2/train_users_2.csv"
dataset = spark.read.csv(dataset_file_location,
                         header=True,
                         schema=dataset_schema)

In [0]:
dataset.rdd.getNumPartitions()

Out[262]: 6

In [0]:
dataset = dataset.repartition(8)

In [0]:
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- date_account_created: string (nullable = true)
 |-- timestamp_first_active: long (nullable = true)
 |-- date_first_booking: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- signup_method: string (nullable = true)
 |-- signup_flow: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- affiliate_channel: string (nullable = true)
 |-- affiliate_provider: string (nullable = true)
 |-- first_affiliate_tracked: string (nullable = true)
 |-- signup_app: string (nullable = true)
 |-- first_device_type: string (nullable = true)
 |-- first_browser: string (nullable = true)
 |-- country_destination: string (nullable = true)



In [0]:
#Check for duplicates
print('Count of rows: {0}'.format(dataset.count()))
print('Count of distinct rows: {0}'.format(dataset.distinct().count()))

Count of rows: 213451
Count of distinct rows: 213451


In [0]:
#show first 5 rows in data
dataset.toPandas().head(5)

Unnamed: 0,id,date_account_created,timestamp_first_active,date_first_booking,gender,age,signup_method,signup_flow,language,affiliate_channel,affiliate_provider,first_affiliate_tracked,signup_app,first_device_type,first_browser,country_destination
0,75sld3y378,2011-07-15,20110715182535,2011-07-15,FEMALE,62.0,basic,2,en,sem-non-brand,google,untracked,Web,iPad,Mobile Safari,US
1,4fr4zuwrpj,2012-04-03,20120403003314,,-unknown-,,basic,6,en,direct,other,omg,iOS,Mac Desktop,Safari,NDF
2,3gdx3eb0ce,2012-04-27,20120427183601,,MALE,26.0,facebook,0,en,sem-brand,google,untracked,Web,Other/Unknown,-unknown-,NDF
3,hoqd3tg750,2011-11-13,20111113174650,2011-11-14,-unknown-,,basic,0,en,direct,direct,untracked,Web,Windows Desktop,IE,ES
4,fxieheanyu,2011-10-24,20111024222124,,FEMALE,40.0,facebook,0,en,direct,direct,,Web,Other/Unknown,-unknown-,NDF


There are 16 features used to describe each user in the dataset:

- user id
- the date of account creation
- timestamp of the first activity, note that it can be earlier than
- date of first booking
- gender
- age
- signup_method
- the page a user came to signup up from
- international language preference
- what kind of paid marketing
- where the marketing is e.g. google, craigslist, other
- whats the first marketing the user interacted with before the signing up
- signup_app
- first_device_type
- first_browser

### Preprocessing
1. Replace every unknown values in action_ type and device_type with null to give it smentic meaning.
2. Replace all ages out of the range with the median.
3. Drop unused features like 'timestamp_first_active', 'date_first_booking'.
4. One hot encoding for the device_type and action_type to be better when used in modling phase.
5. Used left join because it give the best outcome.
6. Collecting all the numercial features, collecting all the Categorical features.

In [0]:
#function used to preprocessing the datasets.
def preprocessing(dataset, include_id = True):
    
    [median] = dataset.approxQuantile("age", [0.5], 0.05)
    
    
    # replace all ages out of the range with the median
    dataset  =  dataset.withColumn('age',\
                                   F.when((F.col('age') >= 1) & (F.col('age') <= 100), F.col('age'))\
                                    .otherwise(F.lit(median)))\
                        .withColumn('first_affiliate_tracked',\
                                   F.when(F.col('first_affiliate_tracked').isNull(), F.lit('untracked'))\
                                    .otherwise(F.col('first_affiliate_tracked')))
    #drop all the unused feature in the data
    columns_to_drop = ['date_account_created', 'timestamp_first_active', 'date_first_booking', 'user_id']
    if include_id :
        columns_to_drop.append('id')
    dataset = dataset.drop(*columns_to_drop)
    
    
    return dataset

In [0]:
def count_nulls(dataset):
    
    return dataset.select([
                F.count(F.when(F.isnan(col) | F.col(col).isNull(), col)).alias(col)
                for col,dtype in dataset.dtypes
            ]).take(1)

In [0]:
#check for nulls
count_nulls(dataset)

Out[130]: [Row(id=0, date_account_created=0, timestamp_first_active=0, date_first_booking=124543, gender=0, age=87990, signup_method=0, signup_flow=0, language=0, affiliate_channel=0, affiliate_provider=0, first_affiliate_tracked=6065, signup_app=0, first_device_type=0, first_browser=0, country_destination=0)]

In [0]:
#explor age feature.
dataset.selectExpr('min(age)','max(age)','mean(age)').show()

+--------+--------+-----------------+
|min(age)|max(age)|        mean(age)|
+--------+--------+-----------------+
|     1.0|  2014.0|49.66833517985669|
+--------+--------+-----------------+



In [0]:
# Function to join the 2 dataset togather
def join_with_session_df(dataset, session_df):
    # replace every unknown values in action_ type and device_type with null 
    session_df  =  session_df.withColumn('action_type',\
                                   F.when(F.col('action_type') == '-unknown-', F.lit(None))\
                                    .otherwise(F.col('action_type')))\
                             .withColumn('device_type',\
                                   F.when(F.col('device_type') == '-unknown-', F.lit(None))\
                                    .otherwise(F.col('device_type')))
    
    action_types = session_df.select(F.collect_set('action_type').alias('action_type')).first()['action_type']
    
    device_types = session_df.select(F.collect_set('device_type').alias('device_type')).first()['device_type']
    
    session_df = session_df\
                .groupby("user_id")\
                .agg(F.collect_set("device_type").alias('device_type'),\
                     F.sum("secs_elapsed").alias('secs_elapsed'),\
                     F.collect_set("action_type").alias('action_type'))
    
    # one hot encoding for the device_type and action_type to be better when used in modling phase.
    columns_distict_values_to_convert = {'device_type': device_types, 'action_type':action_types}
    columns_to_convert = ['action_type','device_type']
    for column in columns_to_convert:
        for value in columns_distict_values_to_convert[column]:
            session_df = session_df.withColumn(value, F.array_contains(column, value).cast("int"))
            
    session_df = session_df.drop(*columns_to_convert)
    
    #used left join to join the data because it give the best match between data.
    dataset = dataset.join(session_df,(dataset.id == session_df.user_id ),"left")
    
    dataset = dataset.na.fill(0,[value for column in columns_to_convert for value in columns_distict_values_to_convert[column]])
    
    [median_session] = dataset.approxQuantile("secs_elapsed", [0.5], 0.05)
    
    dataset = dataset.na.fill(median_session,['secs_elapsed'])
    
    return dataset
    

In [0]:
session_labels = [
  ('user_id', typ.StringType()),
  ('action', typ.StringType()),
  ('action_type', typ.StringType()),
  ('action_detail', typ.StringType()),
  ('device_type', typ.StringType()),
  ('secs_elapsed', typ.DoubleType()),
]

session_schema = typ.StructType([
  typ.StructField(e[0], e[1], True)
  for e in session_labels
])

In [0]:
#read session data.
session_file_location = "/FileStore/tables/Airbnb2/session/session_*.csv"
session_df = spark.read.csv(session_file_location,
                         header=True,
                         schema=session_schema)

In [0]:
#join the session dataset with the train_user dataset.
dataset = join_with_session_df(dataset, session_df)

In [0]:
#dataset after joining the 2 dataset
dataset.toPandas().head(5)

Unnamed: 0,id,date_account_created,timestamp_first_active,date_first_booking,gender,age,signup_method,signup_flow,language,affiliate_channel,...,Linux Desktop,iPhone,Windows Phone,Opera Phone,Tablet,Mac Desktop,iPodtouch,Blackberry,Android Phone,Windows Desktop
0,002dfbmaj5,2012-04-25,20120425183527,,FEMALE,26.0,facebook,0,en,direct,...,0,0,0,0,0,0,0,0,0,0
1,0043i3w366,2012-07-20,20120720025648,2012-07-20,FEMALE,55.0,basic,0,en,direct,...,0,0,0,0,0,0,0,0,0,0
2,006b76pgvn,2012-01-28,20120128034234,2012-06-29,FEMALE,44.0,facebook,2,en,other,...,0,0,0,0,0,0,0,0,0,0
3,008ocspyy3,2013-06-20,20130620171414,2013-06-20,-unknown-,,basic,0,en,sem-brand,...,0,0,0,0,0,0,0,0,0,0
4,009phc0f9r,2011-05-02,20110502032902,2011-05-04,-unknown-,,basic,0,en,direct,...,0,0,0,0,0,0,0,0,0,0


In [0]:
#apply the preprocessing function to fill the null and the outliers in age with the median (because the age have extreme outliers (max_age=2014)) and apply the OHC
dataset = preprocessing(dataset)

In [0]:
#reading the data after preprocessing and OHC
dataset.toPandas().head(5)

Unnamed: 0,gender,age,signup_method,signup_flow,language,affiliate_channel,affiliate_provider,first_affiliate_tracked,signup_app,first_device_type,...,Linux Desktop,iPhone,Windows Phone,Opera Phone,Tablet,Mac Desktop,iPodtouch,Blackberry,Android Phone,Windows Desktop
0,FEMALE,26.0,facebook,0,en,direct,direct,untracked,Web,Mac Desktop,...,0,0,0,0,0,0,0,0,0,0
1,FEMALE,55.0,basic,0,en,direct,direct,untracked,Web,Mac Desktop,...,0,0,0,0,0,0,0,0,0,0
2,FEMALE,44.0,facebook,2,en,other,craigslist,omg,Web,Mac Desktop,...,0,0,0,0,0,0,0,0,0,0
3,-unknown-,33.0,basic,0,en,sem-brand,google,omg,Web,Windows Desktop,...,0,0,0,0,0,0,0,0,0,0
4,-unknown-,33.0,basic,0,en,direct,direct,untracked,Web,Other/Unknown,...,0,0,0,0,0,0,0,0,0,0


In [0]:
#check for the nulls
count_nulls(dataset)

Out[139]: [Row(gender=0, age=0, signup_method=0, signup_flow=0, language=0, affiliate_channel=0, affiliate_provider=0, first_affiliate_tracked=0, signup_app=0, first_device_type=0, first_browser=0, country_destination=0, secs_elapsed=0, booking_response=0, data=0, click=0, message_post=0, view=0, modify=0, partner_callback=0, booking_request=0, submit=0, Android App Unknown Phone/Tablet=0, iPad Tablet=0, Chromebook=0, Linux Desktop=0, iPhone=0, Windows Phone=0, Opera Phone=0, Tablet=0, Mac Desktop=0, iPodtouch=0, Blackberry=0, Android Phone=0, Windows Desktop=0)]

In [0]:
target_column = 'country_destination'

In [0]:
# collecting all the numercial features.
numerical_columns = [ col for col,dtype in dataset.dtypes if dtype.startswith('int') or dtype.startswith('double')]

In [0]:
# collecting all the Categorical features.
categorical_columns = [ col for col,dtype in dataset.dtypes if dtype.startswith('string') and col != target_column ]

### Feature Engineering
 1. applay transformation into labels using indexer
 2. applay transformation into indexers categorical ohe
 3. data vectorization
 4. numerical standardization for data

In [0]:
def feature_engineering(dataset, categorical_columns, targetIndexerModel = None, scalerModel = None, pipeline_model = None):

    indexers = [StringIndexer(inputCol =column, outputCol=column+"_INDEX").setHandleInvalid("keep") for column in categorical_columns if column != 'id']
    ohe_encoders = [OneHotEncoder(inputCol=column+"_INDEX", outputCol="{0}_ENCODED".format(column)) for column in categorical_columns if column != 'id']
    if not pipeline_model:
        pipeline = Pipeline(stages = indexers + ohe_encoders)

        pipeline_model = pipeline.fit(dataset)

    df_transformed = pipeline_model.transform(dataset)
    
    
    if not targetIndexerModel:
        
        stringIndexer = StringIndexer(inputCol = target_column, outputCol = target_column + "_INDEX")

        targetIndexerModel = stringIndexer.fit(df_transformed)
    
        df_transformed = targetIndexerModel.transform(df_transformed)
    
    
    
    unneeded_columns = categorical_columns + [column + "_INDEX" for column in categorical_columns] + [target_column]
    
    df_transformed = df_transformed.drop(*unneeded_columns)
    
    
    
    assembler_inputs = [c for c in df_transformed.columns if c != 'country_destination_INDEX' and c != 'id']
    
#     print(assembler_inputs)
#     return None, None, None
    
    vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

    df_transformed = vec_assembler.transform(df_transformed).drop(*assembler_inputs)
    
    if not scalerModel:
        scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                            withStd=True, withMean=False)

        # Compute summary statistics by fitting the StandardScaler
        scalerModel = scaler.fit(df_transformed)

    # Normalize each feature to have unit standard deviation.
    df_transformed = scalerModel.transform(df_transformed)
    
    return df_transformed, targetIndexerModel, scalerModel, pipeline_model

In [0]:
df_transformed, targetIndexerModel, scalerModel, pipeline_model = feature_engineering(dataset, categorical_columns)

In [0]:
# stritified sampling 
seed = 42
fractions = df_transformed.select(target_column + "_INDEX").distinct().withColumn("fraction", F.lit(0.8)).rdd.collectAsMap()

In [0]:
train_df = df_transformed.stat.sampleBy(target_column + "_INDEX", fractions, seed)
test_df = df_transformed.subtract(train_df)

In [0]:
#train_df.count()
#test_df.count()

In [0]:
tot = train_df.count()
train_df.groupBy('country_destination_INDEX')\
       .count()\
       .withColumn('percentage',
                      F.round((F.col('count') / tot) * 100 , 3 )
                  ).show()

+-------------------------+-----+----------+
|country_destination_INDEX|count|percentage|
+-------------------------+-----+----------+
|                      8.0|  833|     0.487|
|                      0.0|99706|    58.349|
|                      7.0| 1148|     0.672|
|                      1.0|49888|    29.195|
|                      4.0| 2253|     1.318|
|                     11.0|  173|     0.101|
|                      3.0| 4048|     2.369|
|                      2.0| 8112|     4.747|
|                     10.0|  428|      0.25|
|                      6.0| 1788|     1.046|
|                      5.0| 1898|     1.111|
|                      9.0|  603|     0.353|
+-------------------------+-----+----------+



### Over Sampling

1. smot didn't work with databricks community pyspark 
so we oversampled randomly.
2. we did the over sampling because most of the features were unbalanced.

In [0]:
combined_df = train_df.filter(F.col("country_destination_INDEX") == 0)
major_size = combined_df.count()
for i in range(1,len(targetIndexerModel.labels)):
    minor_df = train_df.filter(F.col("country_destination_INDEX") == i)
    
    ratio = math.ceil(major_size/minor_df.count())

    a = range(ratio)

    # duplicate the minority rows
    oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in a]))).drop('dummy')

    # combine both oversampled minority rows and previous majority rows 
    combined_df = combined_df.unionAll(oversampled_df)

In [0]:
tot = combined_df.count()
combined_df.groupBy('country_destination_INDEX')\
       .count()\
       .withColumn('percentage',
                      F.round((F.col('count') / tot) * 100 , 3 )
                  ).show()

+-------------------------+------+----------+
|country_destination_INDEX| count|percentage|
+-------------------------+------+----------+
|                      0.0| 99547|     8.254|
|                      1.0| 99686|     8.266|
|                      2.0|104975|     8.705|
|                      3.0|105716|     8.766|
|                      4.0|100100|       8.3|
|                      5.0| 99684|     8.266|
|                      6.0| 99848|     8.279|
|                      7.0| 99680|     8.265|
|                      8.0| 99592|     8.258|
|                      9.0| 97405|     8.077|
|                     10.0| 95172|     7.892|
|                     11.0|104574|     8.671|
+-------------------------+------+----------+



In [0]:
train_df = combined_df

# ONE VS All Gradient Boost Classfier

In [0]:
# instantiate the base classifier.
gpt= GBTClassifier(maxIter=10, featuresCol='scaledFeatures', labelCol='country_destination_INDEX')

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=gpt,featuresCol='scaledFeatures', labelCol='country_destination_INDEX')

# train the multiclass model.
model = ovr.fit(train_df)

In [0]:
# cv = CrossValidator(
#  estimator=ovr,
#  estimatorParamMaps=grid,
#  evaluator=evaluator
# )
# cv.fit(train_df)

# Naive Baise

In [0]:
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",featuresCol='features', labelCol='country_destination_INDEX')

# train the model
model = nb.fit(train_df)

In [0]:
# select example rows to display.
predictions = model.transform(test_df)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="country_destination_INDEX", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-------------------------+--------------------+--------------------+--------------------+--------------------+----------+
|country_destination_INDEX|            features|      scaledFeatures|       rawPrediction|         probability|prediction|
+-------------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                      0.0|(155,[0,1,2,4,5,7...|(155,[0,1,2,4,5,7...|[-942.38712914413...|[0.99997276009051...|       0.0|
|                      0.0|(155,[0,1,2,4,5,7...|(155,[0,1,2,4,5,7...|[-963.54079447141...|[0.99968924596429...|       0.0|
|                      0.0|(155,[0,1,2,25,29...|(155,[0,1,2,25,29...|[-842.49198853571...|[0.99961730353728...|       0.0|
|                      0.0|(155,[0,2,4,5,6,7...|(155,[0,2,4,5,6,7...|[-527.10844699807...|[0.96305560874000...|       0.0|
|                      0.0|(155,[0,2,4,5,6,7...|(155,[0,2,4,5,6,7...|[-819.00680885825...|[0.94363661310047...|       0.0|
|               

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics

preds_and_labels = predictions.select(['prediction','country_destination_INDEX'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(evaluator.confusionMatrix().toArray())

# Sumbit Kaggle

In [0]:
dataset_file_location = "/FileStore/tables/Airbnb2/test_users.csv"
test_dataset = spark.read.csv(dataset_file_location,
                         header=True,
                         schema=dataset_schema[:-1])

In [0]:
test_dataset = join_with_session_df(test_dataset, session_df)

In [0]:
test_dataset = preprocessing(test_dataset, include_id = False)

In [0]:
df_test_transformed, _, _, _ = feature_engineering(test_dataset, categorical_columns, targetIndexerModel, scalerModel, pipeline_model)

In [0]:
df_test_transformed.toPandas().head(5)

Unnamed: 0,id,features,scaledFeatures
0,qxqrm3zij8,"(31.0, 0.0, 520989.0, 0.0, 1.0, 1.0, 1.0, 1.0,...","(3.420554786739142, 0.0, 0.551339210460597, 0...."
1,ab6t9i0zbh,"(31.0, 0.0, 2758326.0, 0.0, 1.0, 1.0, 0.0, 1.0...","(3.420554786739142, 0.0, 2.9190122613585636, 0..."
2,nh4n5oqhg0,"(26.0, 0.0, 109864.0, 0.0, 1.0, 1.0, 0.0, 1.0,...","(2.868852401781216, 0.0, 0.11626412653250459, ..."
3,0vik5hv9lx,"(44.0, 0.0, 546055.0, 0.0, 1.0, 1.0, 0.0, 1.0,...","(4.85498098762975, 0.0, 0.5778654301109262, 0...."
4,at8y572uuv,"(31.0, 0.0, 2910400.0, 0.0, 1.0, 1.0, 1.0, 1.0...","(3.420554786739142, 0.0, 3.0799453311385108, 0..."


In [0]:
predictions_test = model.transform(df_test_transformed)
predictions_test = IndexToString(inputCol="prediction", outputCol=target_column+"_label", labels=targetIndexerModel.labels).transform(predictions_test)

In [0]:
predictions_test.select('id',target_column+'_label').write.format('csv').option('header',True).mode('overwrite').option('sep','|').save('/FileStore/tables/Airbnb2/XGboost_results.csv')