# 0. Imports

## 0.1. Libraries

In [31]:
# data manipulation
from pyspark.sql import SparkSession

# preprocessing
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer
from pyspark.sql.functions import col, count

## 0.2. Data acquisition

In [2]:
MAX_MEMORY = '6g'

# Session settings, applied to the builder in exactly this order.
# NOTE(review): with master 'local[*]' all tasks run inside the driver process, so the
# deployMode and executor.* entries are presumably ignored — confirm before tuning them.
# The SparkConf warning printed by this cell also says spark.local.dir may be overridden.
spark_conf = {
    'spark.local.dir': '/tmp',
    'spark.submit.deployMode': 'client',
    'spark.executor.instances': '16',
    'spark.driver.memory': MAX_MEMORY,
    'spark.executor.memory': MAX_MEMORY,
    'spark.executor.memoryOverhead': MAX_MEMORY,
    'spark.sql.debug.maxToStringFields': '100',
}

# create the session (getOrCreate returns an already running one if present)
builder = SparkSession.builder.master('local[*]')
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.appName('airbnb-first-booking-prediction').getOrCreate()

22/06/08 16:39:35 WARN Utils: Your hostname, irish resolves to a loopback address: 127.0.1.1; using 192.168.0.17 instead (on interface enp3s0)
22/06/08 16:39:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/08 16:39:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/08 16:39:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
# load the interim checkpoint (a parquet dataset on disk)
df = spark.read.format('parquet').load('../data/interim/processed')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

# 1. Data preprocessing

## 1.1. Data split

In [4]:
# hold out roughly 20% of the rows for validation; the fixed seed makes the split
# repeatable for the same input data and partitioning (split sizes are approximate)
train, val = df.randomSplit(weights=[0.8, 0.2], seed=0)

In [5]:
# features: every column of each split except the target
X_train, X_val = (split.drop('country_destination') for split in (train, val))

# labels: the target as a single-column DataFrame per split
# NOTE(review): Spark DataFrames carry no row index, so y_* cannot be re-aligned with the
# transformed X_* later unless a key column is kept in both — confirm how they are rejoined
y_train, y_val = (split.select('country_destination') for split in (train, val))

In [6]:
# define categorical (string-typed) and numerical (everything else) columns of the train split
# NOTE(review): this inspects `train`, not X_train, so the target 'country_destination'
# is classified too (into cat_cols if it is a string column) — exclude it before using
# cat_cols as feature columns
cat_cols = [name for name, dtype in train.dtypes if dtype == 'string']

# keep the DataFrame's column order: a set difference orders names by string hash,
# which changes between Python processes (PYTHONHASHSEED) and would make any feature
# vector assembled from num_cols non-reproducible across kernel restarts
num_cols = [name for name in train.columns if name not in cat_cols]

## 1.2. Rescale

### Min-max scaler

In [7]:
# MinMaxScaler consumes a vector column, so wrap the single numeric feature in a 1-d vector
# NOTE(review): VectorAssembler's default handleInvalid is 'error', so NULL secs_elapsed
# values would fail at evaluation time — confirm the checkpoint has none
va_num = VectorAssembler(inputCols=['secs_elapsed'], outputCol='secs_elapsed_vect')

# apply the same assembler to both splits
X_train_vect, X_val_vect = (va_num.transform(split) for split in (X_train, X_val))

In [8]:
# fit the scaler on the training split only, then persist the fitted model
scaler = MinMaxScaler(inputCol='secs_elapsed_vect', outputCol='secs_elapsed_scaled')
mms = scaler.fit(X_train_vect)
mms.write().overwrite().save('../parameters/mms')

# rescale both splits and discard the raw value plus its temporary vector wrapper
temp_cols = ('secs_elapsed', 'secs_elapsed_vect')
X_train = mms.transform(X_train_vect).drop(*temp_cols)
X_val = mms.transform(X_val_vect).drop(*temp_cols)

                                                                                

## 1.3. Encoding

### Label encoder

In [9]:
# columns to encode as ordinal indices (StringIndexer assigns 0.0 to the most frequent label by default)
label_cols = ['gender', 'first_device_type', 'signup_method', 'signup_app']

# the loop variable is not named `col` so it does not shadow pyspark.sql.functions.col
for column_name in label_cols:
    # fit one indexer per column on the training split only and persist it;
    # handleInvalid='keep' maps labels unseen in train (and NULLs) to an extra index
    # instead of raising when X_val is evaluated — the default 'error' fails on rare
    # categories that the random split happened to place only in validation
    si = StringIndexer(inputCol=column_name, outputCol=column_name + '_encoded',
                       handleInvalid='keep').fit(X_train)
    si.write().overwrite().save('../parameters/si_' + column_name)

    # encode and drop the raw column
    # NOTE: not idempotent — re-running this cell fails because the raw columns are gone
    X_train = si.transform(X_train).drop(column_name)
    X_val = si.transform(X_val).drop(column_name)

                                                                                

### Frequency encoding

In [40]:
# row count of each split; X_train_length turns category counts into relative frequencies below
X_train_length, X_val_length = (split.count() for split in (X_train, X_val))

                                                                                

In [43]:
# relative frequency of each language in the training split
# (count('language') counts non-null values, so a NULL-language group would show 0)
language_freq = X_train.groupBy('language').agg(
    (count('language') / X_train_length).alias('frequency')
)
language_freq.show()



+--------+--------------------+
|language|           frequency|
+--------+--------------------+
|      en|  0.9634236100927227|
|      pl|1.541754790173585...|
|      pt|0.001579392565340...|
|      ko|0.006378627096633176|
|      cs|1.025977871218588...|
|      tr|1.750853541101286...|
|      de|0.002505281834448434|
|      is|1.505511006679450...|
|      es|0.003971761074288047|
|      el|1.781521357904016...|
|      it|0.001649649745652...|
|      sv|5.634514341665128E-4|
|      nl|3.515646998931087E-4|
|      hu|2.899502679530793...|
|      ca|9.757941709959402E-6|
|      ru|0.001656898502351...|
|      th|7.025718031170769E-5|
|      no|3.038901846815928E-5|
|      zh|0.011316703198541772|
|      fr|0.004262547737244837|
+--------+--------------------+
only showing top 20 rows





In [57]:
# columns to encode
# NOTE(review): despite the section title, StringIndexer produces ordinal (rank) indices,
# not frequency values — replace with a train-fitted frequency map if true frequency
# encoding is intended
# 'first_device_type' is omitted: the label-encoding cell already encoded it and dropped
# the raw column, so fitting an indexer on it here would fail
frequency_cols = ['signup_flow', 'language', 'affiliate_channel', 'affiliate_provider',
                  'first_affiliate_tracked', 'first_browser',
                  'action', 'action_type', 'action_detail', 'device_type']

# the loop variable is not named `col` so it does not shadow pyspark.sql.functions.col
for column_name in frequency_cols:
    # fit one indexer per column on the training split only and persist it;
    # handleInvalid='keep' maps categories unseen in train (and NULLs) to an extra index
    # instead of raising when X_val is evaluated
    si = StringIndexer(inputCol=column_name, outputCol=column_name + '_encoded',
                       handleInvalid='keep').fit(X_train)
    si.write().overwrite().save('../parameters/si_' + column_name)

    # encode, then drop only the column just encoded: DataFrame.drop takes names as
    # separate arguments (a list raises TypeError), and dropping every column of
    # frequency_cols here would remove the inputs of the remaining iterations
    X_train = si.transform(X_train).drop(column_name)
    X_val = si.transform(X_val).drop(column_name)

## 1.4. Transformation

In [None]:
# columns for the transformation step (this cell only defines the list)
transform_cols = [
    'week_of_year_account_created',
    'timestamp_first_active_week_of_year',
    'day_of_week_first_booking',
    'timestamp_first_active_day_of_week',
    'week_of_year_first_booking',
    'day_of_week_account_created',
    'month_account_created',
    'day_account_created',
    'timestamp_first_active_month',
    'timestamp_first_active_day',
    'month_first_booking',
    'day_first_booking',
]

# 2. Feature selection

## 2.1. Feature importance

## 2.2. Select columns