In [None]:
%%cleanup -f

In [None]:
%%configure -f
{"driverMemory": "48G", "executorMemory": "48G", "executorCores": 6, "numExecutors": 20}

In [None]:
# Turn on case-sensitive identifier resolution for Spark SQL in this session.
spark.sql("set spark.sql.caseSensitive=true")

In [None]:
%%info

In [None]:
# Notebook parameters; they are empty here and must be filled in before running.
# source_dir: directory whose contents are read as parquet input.
source_dir = ''
# feature_set_name: suffix appended to every output path written by this notebook.
feature_set_name = ''
# features_dir: directory under which all outputs are written.
features_dir = ''

In [None]:
from pyspark.sql.functions import col, when

# Read everything under source_dir, keep only conversion events, drop rows that
# are exact duplicates and replace empty strings with the marker 'undefined'.
ddf = spark.read.parquet('%s/*' % source_dir)
conversions_ddf = (
    ddf.where(ddf['interactionType'] == 'conversion')
    .dropDuplicates()
    .replace('', 'undefined')
)

In [None]:
# Inspect the column names and types of the filtered conversion events.
conversions_ddf.printSchema()

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# SQL context handed to the helper functions below, which use it to register Python UDFs.
sqlC = SQLContext(spark.sparkContext)

In [None]:
# Maps raw conversion column names (keys) to the names used by the feature pipeline (values).
# Columns prefixed 'item_' describe the booking, columns prefixed 'user_' describe the customer;
# a '_raw' / '_1' / '_2' suffix marks a column that is cleaned or merged by a later transformer.
column_map = {
    'trackId': 'user_id',
    'conversion_variables_destination': 'item_destination_raw',
    'conversion_variables_origin': 'item_origin_raw',
    'conversion_variables_booking_timestamp': 'item_booking_timestamp',
    'conversion_variables_dd': 'item_departure_date_1',
    'conversion_variables_rd': 'item_return_date_1',
    'conversion_variables_returnDate': 'item_return_date_2',
    'conversion_variables_travelClass': 'item_travel_class_raw',
    'conversion_variables_pax': 'item_pax_raw',
    'conversion_variables_pos': 'user_location',
    'conversion_variables_CustomerID': 'user_customer_id',
    'conversion_variables_SocialAge': 'user_age_bracket',
    'conversion_variables_AwardMiles': 'user_loyalty_points',
    'conversion_variables_curr': 'user_currency',
    'conversion_variables_country': 'user_country',
    'conversion_variables_device': 'user_device',
    'conversion_variables_language': 'user_language'
}

In [None]:
from pyspark.sql.functions import col

# Keep only the mapped columns, renamed to their pipeline names, then fill nulls with 0.
# NOTE(review): fillna(0) with an integer presumably only affects numeric columns,
# leaving nulls in string columns untouched - confirm against the input schema.
renamed_columns = [
    col(raw_name).alias(pipeline_name)
    for raw_name, pipeline_name in column_map.items()
]
base_ddf = conversions_ddf.select(renamed_columns).fillna(0)

In [None]:
# conversions_ddf.show(3, truncate = False)

In [None]:
# conversions_ddf.count()

In [None]:
from pyspark.ml.feature import SQLTransformer
from pyspark.sql.functions import pandas_udf, udf


def simple_map_operation(sql_context, f, columns_names_list, output_col_name, output_spark_type):
    """Wrap a plain Python function as a pipeline stage that adds one column.

    Args:
        sql_context: context used to register the UDF (passed on to udf_to_transformer).
        f: Python function applied row by row.
        columns_names_list: names of the input columns, in the order `f` takes them.
        output_col_name: name of the column the stage appends.
        output_spark_type: Spark SQL type of the value returned by `f`.

    Returns:
        The SQLTransformer built by udf_to_transformer.
    """
    spark_udf = udf(f, output_spark_type)
    return udf_to_transformer(sql_context, spark_udf, columns_names_list, output_col_name)


def udf_to_transformer(sql_context, f, columns_names_list, output_col_name):
    """Register a UDF and return a SQLTransformer that appends its result as a column.

    The UDF is registered under ``f.__name__ + '_udf_version'``. Registering two
    different functions that share a ``__name__`` therefore makes the later one
    replace the earlier one under that SQL name.

    Args:
        sql_context: context whose ``udf.register`` is used for the registration.
        f: the UDF to register; must expose ``__name__``.
        columns_names_list: input column names, in the order the UDF takes them.
        output_col_name: name of the appended column.

    Returns:
        A SQLTransformer running
        ``SELECT *, <udf>(<columns>) AS <output_col_name> FROM __THIS__``.
    """
    function_registered_name = f.__name__ + '_udf_version'
    sql_context.udf.register(function_registered_name, f)

    # Build the argument list explicitly. The previous version massaged
    # str(tuple(...)) and, for a single column, stripped every comma in the call.
    arguments = ', '.join(str(column_name) for column_name in columns_names_list)
    function_call = '%s(%s)' % (function_registered_name, arguments)

    sql_request = 'SELECT *, ' + function_call + ' AS ' + output_col_name + ' FROM __THIS__'
    return SQLTransformer(statement=sql_request)

### Data cleaning

In [None]:
import re

def airport_cleaner(airport):
    """Strip non-word characters from an airport code, or return 'UNK'.

    Args:
        airport: raw airport string, or None.

    Returns:
        The input with every run of non-word characters removed when at least
        three characters remain, otherwise the placeholder 'UNK'.
    """
    if airport is None:
        return 'UNK'
    cleaned = re.sub(r'\W+', '', airport)
    # Check the length AFTER cleaning: checking the raw input let values such as
    # '---' or 'A-B' through, producing empty or too-short codes.
    if len(cleaned) >= 3:
        return cleaned
    return 'UNK'

In [None]:
# Stage: item_origin_raw -> item_origin (string) via airport_cleaner.
origin_cleaner = simple_map_operation(
    sql_context=sqlC,
    f=airport_cleaner,
    columns_names_list=['item_origin_raw'],
    output_col_name='item_origin',
    output_spark_type=StringType(),
)

In [None]:
# Stage: item_destination_raw -> item_destination (string) via airport_cleaner.
destination_cleaner = simple_map_operation(
    sql_context=sqlC,
    f=airport_cleaner,
    columns_names_list=['item_destination_raw'],
    output_col_name='item_destination',
    output_spark_type=StringType(),
)

In [None]:
def business_or_economy(travelClass):
    """Classify a travel-class value as 'business' or 'economy'.

    The value is iterated element by element (character by character for a
    string). If any element is one of J, C, D, I, Z or O the result is
    'business', otherwise 'economy'.

    Returns:
        'business', 'economy', or None when the value cannot be iterated
        (for example None).
    """
    business_codes = ['J', 'C', 'D', 'I', 'Z', 'O']
    try:
        for code in travelClass:
            if code in business_codes:
                return 'business'
        return 'economy'
    except TypeError:
        # Non-iterable input (e.g. a null column value) yields a null result.
        return None

In [None]:
# Stage: item_travel_class_raw -> item_travel_class (string) via business_or_economy.
class_transformer = simple_map_operation(
    sql_context=sqlC,
    f=business_or_economy,
    columns_names_list=['item_travel_class_raw'],
    output_col_name='item_travel_class',
    output_spark_type=StringType(),
)

In [None]:
def item_assembler(origin, destination):
    """Build the item identifier '<origin>-<destination>'.

    Returns 'undefined' when either part is None.
    """
    if origin is None or destination is None:
        return 'undefined'
    return origin + '-' + destination

In [None]:
# Stage: (item_origin, item_destination) -> item_id (string) via item_assembler.
item_transformer = simple_map_operation(
    sql_context=sqlC,
    f=item_assembler,
    columns_names_list=['item_origin', 'item_destination'],
    output_col_name='item_id',
    output_spark_type=StringType(),
)

In [None]:
def pax_cleaner(item_pax_raw):
    """Convert the raw passenger count to an int, using 0 for missing values.

    Args:
        item_pax_raw: raw passenger count (string or number), None, or the
            marker 'undefined'.

    Returns:
        The count as an int; 0 when the value is None, 'undefined' or cannot be
        parsed as an integer.
    """
    if item_pax_raw is None or item_pax_raw == 'undefined':
        return 0
    try:
        return int(item_pax_raw)
    except (TypeError, ValueError):
        # An unparseable value would otherwise raise inside the UDF and fail the
        # whole job; treat it like the other "missing" markers.
        return 0

In [None]:
# Stage: item_pax_raw -> item_pax (integer) via pax_cleaner.
pax_transformer = simple_map_operation(
    sql_context=sqlC,
    f=pax_cleaner,
    columns_names_list=['item_pax_raw'],
    output_col_name='item_pax',
    output_spark_type=IntegerType(),
)

In [None]:
from datetime import datetime as dt

def merge_date_columns(primary_column, secondary_column):
    """Parse the first usable 'YYYY-MM-DD' string among the two columns.

    A value is usable when it is truthy and not the marker 'undefined'. The
    primary column wins over the secondary one.

    Returns:
        The parsed datetime, or None when neither column is usable.

    Raises:
        ValueError: if the chosen value does not match '%Y-%m-%d'.
    """
    for candidate in (primary_column, secondary_column):
        if candidate and candidate != 'undefined':
            return dt.strptime(candidate, '%Y-%m-%d')
    return None

In [None]:
from datetime import datetime as dt

def merge_timestamp_columns(primary_column):
    """Parse a timestamp string of the form 'YYYY-MM-DDTHH:MM:SS.ffffffZ'.

    Returns:
        The parsed datetime, or None when the value is falsy or 'undefined'.

    Raises:
        ValueError: if the value does not match '%Y-%m-%dT%H:%M:%S.%fZ'.
    """
    is_missing = (not primary_column) or primary_column == 'undefined'
    return None if is_missing else dt.strptime(primary_column, '%Y-%m-%dT%H:%M:%S.%fZ')

In [None]:
def date_columns(primary_column):
    """Parse a 'YYYY-MM-DD' string.

    Returns:
        The parsed datetime, or None when the value is falsy or 'undefined'.

    Raises:
        ValueError: if the value does not match '%Y-%m-%d'.
    """
    if not primary_column or primary_column == 'undefined':
        return None
    return dt.strptime(primary_column, '%Y-%m-%d')

In [None]:
def airline_membership(primary_column):
    """Flag whether a customer holds loyalty points.

    Args:
        primary_column: loyalty-points value; may be a number, a numeric
            string, None, or a non-numeric marker such as 'undefined'.

    Returns:
        1 when the value is a number greater than zero, otherwise 0.
    """
    if primary_column is None:
        return 0
    try:
        points = float(primary_column)
    except (TypeError, ValueError):
        # Non-numeric markers (e.g. 'undefined') used to raise TypeError on the
        # `> 0` comparison under Python 3; treat them as "no points".
        return 0
    return 1 if points > 0 else 0

In [None]:
# Stage: item_departure_date_1 -> item_departure_date (date) via date_columns.
departure_date_transformer = simple_map_operation(
    sql_context=sqlC,
    f=date_columns,
    columns_names_list=['item_departure_date_1'],
    output_col_name='item_departure_date',
    output_spark_type=DateType(),
)

In [None]:
# Stage: (item_return_date_1, item_return_date_2) -> item_return_date (date);
# merge_date_columns takes the first usable of the two raw columns.
return_date_transformer = simple_map_operation(
    sql_context=sqlC,
    f=merge_date_columns,
    columns_names_list=['item_return_date_1', 'item_return_date_2'],
    output_col_name='item_return_date',
    output_spark_type=DateType(),
)

In [None]:
# Stage: item_booking_timestamp -> item_booking_date via merge_timestamp_columns.
# NOTE(review): the UDF returns a full datetime but the column is declared DateType,
# so presumably only the calendar date is kept - confirm this is intended.
booking_timestamp_transformer = simple_map_operation(
    sql_context=sqlC,
    f=merge_timestamp_columns,
    columns_names_list=['item_booking_timestamp'],
    output_col_name='item_booking_date',
    output_spark_type=DateType(),
)

In [None]:
# Stage: user_loyalty_points -> user_member (integer flag) via airline_membership.
membership_transformer = simple_map_operation(
    sql_context=sqlC,
    f=airline_membership,
    columns_names_list=['user_loyalty_points'],
    output_col_name='user_member',
    output_spark_type=IntegerType(),
)

In [None]:
# Cleaning stages, in execution order. item_transformer must come after the two
# airport cleaners because it reads the item_origin / item_destination columns they add.
airline_transformers = [
    origin_cleaner,
    destination_cleaner,
    class_transformer,
    item_transformer,
    pax_transformer,
    departure_date_transformer,
    return_date_transformer,
    booking_timestamp_transformer,
    membership_transformer,
]

### Generic destination-recommendation features

In [None]:
from datetime import date

def days_between(start_date, end_date, no_end_date=1):
    """Return the number of whole days from start_date to end_date.

    Args:
        start_date: date-like value or None.
        end_date: date-like value or None.
        no_end_date: value returned when either date is None (default 1).

    Returns:
        ``(end_date - start_date).days``; negative when end_date precedes
        start_date. ``no_end_date`` when either argument is None.
    """
    if start_date is None or end_date is None:
        return no_end_date
    elapsed = end_date - start_date
    return elapsed.days

In [None]:
# Stage: length of stay, item_los = item_return_date - item_departure_date in days
# (days_between returns its default of 1 when either date is null).
los_transformer = simple_map_operation(
    sql_context=sqlC,
    f=days_between,
    columns_names_list=['item_departure_date', 'item_return_date'],
    output_col_name='item_los',
    output_spark_type=IntegerType(),
)

In [None]:
# Stage: user_dbd = item_booking_date - item_departure_date in days.
# NOTE(review): with this argument order the value is negative whenever the booking
# precedes the departure - confirm the sign is what downstream consumers expect.
dbd_transformer = simple_map_operation(
    sql_context=sqlC,
    f=days_between,
    columns_names_list=['item_departure_date', 'item_booking_date'],
    output_col_name='user_dbd',
    output_spark_type=IntegerType(),
)

In [None]:
import datetime as dt
def holiday_date_mapper(departure_date):
    """Label a departure date with the holiday period it falls into.

    Checks are applied in this order:
      * ISO week 51 or 52      -> 'christmas'
      * ISO week 1             -> 'new years'
      * month is July/August   -> 'summer'
      * date is Easter Sunday  -> 'easter'
      * anything else          -> 'no holiday'

    Args:
        departure_date: a date-like object exposing ``isocalendar()``,
            ``month`` and ``year``; None or the marker 'undefined' is accepted.

    Returns:
        One of 'christmas', 'new years', 'summer', 'easter', 'no holiday'.
    """
    # Imported locally so the function does not depend on the notebook-level
    # name `dt`, which is bound to the datetime *class* by some cells and to the
    # datetime *module* by another.
    import datetime as _datetime

    def calc_easter(year):
        "Returns Easter as a date object."
        a = year % 19
        b = year // 100
        c = year % 100
        d = (19 * a + b - b // 4 - ((b - (b + 8) // 25 + 1) // 3) + 15) % 30
        e = (32 + 2 * (b % 4) + 2 * (c // 4) - d - (c % 4)) % 7
        f = d + e - 7 * ((a + 11 * d + 22 * e) // 451) + 114
        month = f // 31
        day = f % 31 + 1
        return _datetime.date(year, month, day)

    if departure_date is None or departure_date == 'undefined':
        return 'no holiday'

    iso_week = departure_date.isocalendar()[1]
    if iso_week in (51, 52):
        return 'christmas'
    if iso_week == 1:
        return 'new years'
    # Bug fix: the original `month == (7 or 8)` evaluated to `month == 7`,
    # so August departures were never labelled as summer.
    if departure_date.month in (7, 8):
        return 'summer'
    if departure_date == calc_easter(departure_date.year):
        return 'easter'
    return 'no holiday'

In [None]:
# Stage: item_departure_date -> item_holiday (string label) via holiday_date_mapper.
holiday_transformer = simple_map_operation(
    sql_context=sqlC,
    f=holiday_date_mapper,
    columns_names_list=['item_departure_date'],
    output_col_name='item_holiday',
    output_spark_type=StringType(),
)

In [None]:
# Derived-feature stages; they read the date columns produced by the cleaning stages.
drec_transformers = [
    los_transformer,
    dbd_transformer,
    holiday_transformer,
]

In [None]:
# Feature columns, grouped by how they are encoded.
numerical_features = ['item_pax', 'item_los', 'user_dbd']
categorical_features = [
    'item_travel_class', 'item_holiday', 'item_origin', 'item_destination',
    'user_member', 'user_age_bracket', 'user_country', 'user_language', 'user_currency',
]

# '<name>_idx' holds the string-indexed value, '<name>_' the one-hot encoded vector.
indexed_cat_cols = [name + '_idx' for name in categorical_features]
ohe_cat_cols = [name.replace('_idx', '_') for name in indexed_cat_cols]

# Split the model inputs into user-side and item-side columns by name prefix.
_model_inputs = ohe_cat_cols + numerical_features
user_features = [name for name in _model_inputs if name.startswith('user_')]
item_features = [name for name in _model_inputs if name.startswith('item_')]

In [None]:
from pyspark.ml.feature import StringIndexer, SQLTransformer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline

# Map user and item identifiers to numeric indices; handleInvalid='keep' is set on every indexer.
user_indexer = StringIndexer(inputCol='user_id', outputCol='user_idx', handleInvalid='keep')
item_indexer = StringIndexer(inputCol='item_id', outputCol='item_idx', handleInvalid='keep')

# One indexer per categorical feature. The loop variable is deliberately not named
# `col`: that would shadow the imported pyspark `col` function (and, on Python 2,
# leak out of the comprehension and break later `col(...)` calls).
feature_indexers = [
    StringIndexer(inputCol=feature, outputCol=feature + '_idx', handleInvalid='keep')
    for feature in categorical_features
]
onehotencoders = OneHotEncoderEstimator(inputCols=indexed_cat_cols, outputCols=ohe_cat_cols, handleInvalid='keep', dropLast=False)
item_feature_assembler = VectorAssembler(inputCols=item_features, outputCol='item_features')
user_feature_assembler = VectorAssembler(inputCols=user_features, outputCol='user_features')

# Stage order matters: cleaning -> derived features -> id indexers -> categorical
# indexers -> one-hot encoding -> vector assembly.
features_pipeline = Pipeline(stages=(
    airline_transformers
    + drec_transformers
    + [user_indexer, item_indexer]
    + feature_indexers
    + [onehotencoders, item_feature_assembler, user_feature_assembler]
))

In [None]:
# Fit every estimator stage of the pipeline (string indexers, one-hot encoder) on the base data.
feature_model = features_pipeline.fit(base_ddf)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# UDF that converts a vector value (anything exposing toArray()) into a plain array<double>.
array_udf = udf(lambda value: value.toArray().tolist(), ArrayType(DoubleType()))

In [None]:
# Apply the fitted pipeline and keep one row per booking with integer user/item
# indices, the two dates, and the assembled feature vectors as plain arrays.
feature_ddf = feature_model.transform(base_ddf).select(
    col('user_idx').cast('int').alias('user'),
    col('item_idx').cast('int').alias('item'),
    'item_booking_date',
    'item_departure_date',
    array_udf('item_features').alias('single_item_features'),
    array_udf('user_features').alias('single_user_features'),
)

In [None]:
# Bug fix: the StringIndexer models are NOT stages 1 and 2 of the fitted pipeline.
# The pipeline is built as airline_transformers + drec_transformers + [user_indexer]
# + [item_indexer] + ..., so the two indexers sit right after those SQL transformers;
# stages[1] and stages[2] are SQL transformers from the cleaning step.
indexer_offset = len(airline_transformers) + len(drec_transformers)
user_index_model = feature_model.stages[indexer_offset]
item_index_model = feature_model.stages[indexer_offset + 1]

# Guard against a future reordering of the pipeline silently saving the wrong stage.
for index_model in (user_index_model, item_index_model):
    assert type(index_model).__name__ == 'StringIndexerModel', \
        'expected a StringIndexerModel, found %s' % type(index_model).__name__

user_index_model.write().overwrite().save('%s/user_index_%s' % (features_dir, feature_set_name))
item_index_model.write().overwrite().save('%s/item_index_%s' % (features_dir, feature_set_name))

In [None]:
# Inspect the column names and types of the per-booking feature rows.
feature_ddf.printSchema()

In [None]:
# Preview the first rows of the per-booking feature table.
feature_ddf.show()

In [None]:
# Report how many distinct user and item indices occur; each count triggers its own Spark job.
print('unique users: %s; unique items: %s' % (feature_ddf.select('user').distinct().count(), feature_ddf.select('item').distinct().count()))

In [None]:
# departures_ddf = feature_ddf.groupBy('item_departure_date').count().withColumnRenamed('count', 'departures')
# conversions_ddf = feature_ddf.groupBy('item_booking_date').count().withColumnRenamed('count', 'conversions')
# display(departures_ddf.join(conversions_ddf, departures_ddf.item_departure_date == conversions_ddf.item_booking_date).withColumnRenamed('item_booking_date', 'date').select('date', 'conversions', 'departures').orderBy(col('date').asc()))

In [None]:
# Split the bookings with weights 0.9 / 0.1 into training and evaluation sets, with a fixed seed.
# NOTE(review): feature_ddf is not cached, so every action recomputes the split from
# the source; consider caching if the splits must be guaranteed disjoint - confirm.
train_ddf, test_ddf = feature_ddf.randomSplit([0.9, 0.1], seed = 93101)

In [None]:
# Report the number of rows in each split.
print('train observations: %s; evaluation observations: %s' % (train_ddf.count(), test_ddf.count()))

In [None]:
from pyspark.sql.functions import size

# Read the length of both feature arrays from a single row. One select replaces
# the two separate Spark jobs that each evaluated the pipeline for one number.
feature_sizes = feature_ddf.select(
    size('single_item_features').alias('nif'),
    size('single_user_features').alias('nuf'),
).first()
num_item_features = feature_sizes.nif
num_user_features = feature_sizes.nuf
print('item features: %s\nuser features: %s' % (num_item_features, num_user_features))

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import  StringType

# UDF that renders a sequence of numbers as one ', '-separated string for the TSV output.
vec_to_string_udf = udf(lambda value: ', '.join(str(d) for d in value), StringType())

In [None]:
from pyspark.sql.functions import col, array, sum, count

# Evaluation split, one row per (user, item): element-wise sum of the feature arrays
# over all bookings of the pair, plus the number of bookings. `sum` and `count` are
# the pyspark.sql.functions aggregates imported in this cell, not the builtins.
agg_test_ddf = (
    test_ddf.groupBy('user', 'item')
    .agg(
        array(*[sum(col('single_item_features')[position]) for position in range(num_item_features)]).alias('item_features_vector'),
        array(*[sum(col('single_user_features')[position]) for position in range(num_user_features)]).alias('user_features_vector'),
        count(col('item')).alias('num_bookings'),
    )
    .select(
        'user',
        'item',
        'num_bookings',
        vec_to_string_udf('item_features_vector').alias('item_features'),
        vec_to_string_udf('user_features_vector').alias('user_features'),
    )
)

In [None]:
# Training split, one row per (user, item): element-wise sum of the feature arrays
# over all bookings of the pair, plus the number of bookings.
agg_train_ddf = (
    train_ddf.groupBy('user', 'item')
    .agg(
        array(*[sum(col('single_item_features')[position]) for position in range(num_item_features)]).alias('item_features_vector'),
        array(*[sum(col('single_user_features')[position]) for position in range(num_user_features)]).alias('user_features_vector'),
        count(col('item')).alias('num_bookings'),
    )
    .select(
        'user',
        'item',
        'num_bookings',
        vec_to_string_udf('item_features_vector').alias('item_features'),
        vec_to_string_udf('user_features_vector').alias('user_features'),
    )
)

In [None]:
# Full data set (before splitting), one row per (user, item): element-wise sum of
# the feature arrays over all bookings of the pair, plus the number of bookings.
agg_full_ddf = (
    feature_ddf.groupBy('user', 'item')
    .agg(
        array(*[sum(col('single_item_features')[position]) for position in range(num_item_features)]).alias('item_features_vector'),
        array(*[sum(col('single_user_features')[position]) for position in range(num_user_features)]).alias('user_features_vector'),
        count(col('item')).alias('num_bookings'),
    )
    .select(
        'user',
        'item',
        'num_bookings',
        vec_to_string_udf('item_features_vector').alias('item_features'),
        vec_to_string_udf('user_features_vector').alias('user_features'),
    )
)

In [None]:
# Preview the aggregated evaluation rows.
agg_test_ddf.show()

In [None]:
# Inspect the column names and types of the aggregated evaluation rows.
agg_test_ddf.printSchema()

In [None]:
# Carve a development sample out of the aggregated training set (weights 0.05 / 0.95, seed 411).
# rest_ddf is not referenced again in the visible cells.
dev_ddf, rest_ddf = agg_train_ddf.randomSplit([0.05, 0.95], 411)
# Write the sample from a single partition as tab-separated text with a header row,
# replacing any previous output at that path.
dev_ddf.coalesce(1).write.mode('overwrite').csv('%s/dev_%s' % (features_dir, feature_set_name), sep='\t', header='true')

In [None]:
# Write the aggregated evaluation set from a single partition as tab-separated text
# with a header row, replacing any previous output at that path.
agg_test_ddf.coalesce(1).write.mode('overwrite').csv('%s/evaluation_%s' % (features_dir, feature_set_name), sep='\t', header='true')

In [None]:
# Write the aggregated training set from a single partition as tab-separated text
# with a header row, replacing any previous output at that path.
agg_train_ddf.coalesce(1).write.mode('overwrite').csv('%s/train_%s' % (features_dir, feature_set_name), sep='\t', header='true')

In [None]:
# Write the aggregated full data set from a single partition as tab-separated text
# with a header row, replacing any previous output at that path.
agg_full_ddf.coalesce(1).write.mode('overwrite').csv('%s/full_%s' % (features_dir, feature_set_name), sep='\t', header='true')

### Historical bookings in the training data

In [None]:
from pyspark.sql.functions import concat_ws, collect_list

# One row per user with a comma-separated list of the item indices booked in the
# training split. The integer `item` column is cast to string explicitly because
# concat_ws expects string (or array-of-string) arguments; relying on an implicit
# array<int> -> array<string> cast is not portable across Spark versions.
# NOTE(review): collect_list gives no ordering guarantee, so the list is presumably
# not chronological - confirm whether consumers rely on order.
history_ddf = (
    train_ddf.groupby('user')
    .agg(concat_ws(',', collect_list(col('item').cast('string'))).alias('historical_destinations'))
)

In [None]:
# Preview the per-user booking history without truncating the list column.
history_ddf.show(truncate = False)

In [None]:
# Write the per-user history from a single partition as tab-separated text with a
# header row, replacing any previous output at that path.
history_ddf.coalesce(1).write.mode('overwrite').csv('%s/train_user_history_%s' % (features_dir, feature_set_name), sep='\t', header='true')