In [100]:
from IPython.display import Image
from IPython.core.display import HTML 
# Hotelbeds Group logo rendered at the top of the notebook.
logo_url = "https://www.tuigroup.com/damfiles/default/tuigroup-15/en/media/images-press-releases/logo_hotelbeds_group-98bdb34cb55c8a3138b8e0aa614e2c30.jpg"
Image(url=logo_url, width=300, height=300)

In [103]:
from IPython.display import HTML
# Hide the code-input cells when the notebook loads and render a link that toggles them.
# The injected script relies on jQuery (`$`) and on cells carrying the `div.input` class.
toggle_code_html = '''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
The raw code for this IPython notebook is by default hidden for easier reading.
To toggle on/off the raw code, click <a href="javascript:code_toggle()">here</a>.'''
HTML(data=toggle_code_html)

In [102]:
from IPython.display import HTML
# CSS for the pink callout boxes (<div class="warn">) used by the markdown cells below.
style = ("<style>div.warn { background-color: #fcf2f2;border-color: #dFb5b4; "
         "border-left: 5px solid #dfb5b4; padding: 0.5em;}</style>")
HTML(data=style)

# Datathon - Working File
<div class="warn">
*Institution:* IE HST - Master Business Analytics <br />
*Author:* Group F, Lead: Shaurya Rawat <br />
*Date:* May/2017 <br />
</div>

## Table of contents | Summary
<a href='#gr'>1. General Remarks</a> <br>
<a href='#lib'>2. Libraries and UDFs</a> <br>
<a href='#3'>3. Load and Pre-Process Table: Clients</a> <br>
<a href='#4'>4. Load, Pre-Process and Join Tables: Clients and 1st Booking</a> <br>
<a href='#5'>5. Load and Pre-Process Data: Bookings Transfer</a> <br>
<a href='#6'>6. Load and Pre-Process Incidents | Join Bookings</a> <br>
<a href='#7'>7. Aggregate Bookings_Transfer and Join: Clients_1</a> <br>
<a href='#8'>8. Load and Pre-Process Data: Bookings Activities</a> <br>
<a href='#9'>9. Join Incidents, Promotional Deals and Bundles to booking_act</a> <br>
<a href='#10'>10. Load, Pre-Process and Join Booking_Act: Products</a> <br>
<a href='#11'>11. Store booking_act_2 to HIVE</a> <br>
<a href='#12'>12. Aggregate Bookings_Activities and Join: Clients_2</a> <br>
<a href='#13'>13. Load, Pre-Process and Join Clients_3: Demand Transfer</a> <br>
<a href='#14'>14. Load, Pre-Process and Join Clients_4: Demand Activities</a> <br>
<a href='#15'>15. Consolidate Data for Model</a> <br>
<a href='#16'>16. High-level validation</a> <br>
<a href='#17'>17. Code - Backlog</a> <br>

## 1. General Remarks <a id='gr'></a>
<div class="warn">
Data is loaded from the cluster and processed as DataFrames with PySpark. Although I am more used to (and faster at) programming in pandas, I avoided converting to pandas DataFrames in order to allocate resources better. In the following chunks data is loaded, cleaned and combined. Some outputs are intermediate, some are for EDA, and some are finally used for modelling the customer segmentation. Libraries and UDFs are loaded / defined right at the beginning.
</div>

## 2. Libraries and UDFs <a id='lib'></a>

In [3]:
#####################################  IMPORT LIBRARIES ######################################
import pandas as pd
import matplotlib.pyplot as plt; plt.rcdefaults()
import numpy as np
import scipy.stats as stats

#for date transformation
from datetime import datetime
from pyspark.sql.functions import col, udf, substring, desc, concat, lit, date_format, dayofmonth, regexp_replace, trim, col, lower
from pyspark.sql import functions as F


#for data type transformation
from pyspark.sql.types import DateType,DoubleType, IntegerType, FloatType
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import *
import datetime


# this allows plots to appear directly in the notebook
%matplotlib inline



#####################################  USER DEFINED FUNCTIONS ######################################
# cast a 'DDMMYYYY' string column to DateType
# NOTE: the imports cell runs `import datetime` after `from datetime import datetime`, so the
# global name `datetime` is the *module* here; calling `datetime.strptime` on it raises
# AttributeError. The parser therefore imports the class locally under its own name.
def _parse_ddmmyyyy(x):
    """Parse a 'DDMMYYYY' string into a ``datetime.date``; a None input stays None (SQL null)."""
    from datetime import datetime as _datetime_cls
    if x is None:
        return None
    return _datetime_cls.strptime(x, '%d%m%Y').date()


func = udf(_parse_ddmmyyyy, DateType())


# count missing data
def count_nulls(df, skip_strings=True):
    """Count null values per column of a Spark DataFrame.

    Args:
        df: DataFrame to inspect; only ``df.dtypes``, ``df[name]`` and ``df.where`` are used.
        skip_strings: when True (the historical default) columns whose type is 'string' are
            skipped to save one Spark job per column. String columns CAN hold nulls, though,
            and every column read with ``spark.read.csv`` without a schema is a string, so
            pass ``skip_strings=False`` to inspect such tables.

    Returns:
        List of ``(column_name, null_count)`` tuples in column order.
    """
    null_counts = []
    # dtypes pairs look like ('C0', 'bigint'); unpacking avoids shadowing pyspark's `col`
    for cname, ctype in df.dtypes:
        if skip_strings and ctype == 'string':
            continue
        nulls = df.where(df[cname].isNull()).count()   # one Spark job per inspected column
        null_counts.append((cname, nulls))
    return null_counts


# to add up columns
def increase(old, new):
    """Return ``old + new`` (numeric sum or concatenation, depending on the operand types)."""
    total = old + new
    return total
# Spark UDF wrapper around `increase`. The return type is spelled out: it is StringType
# (the pyspark default), so results come back as strings even for numeric inputs.
udfIncrease = udf(increase, StringType())

<div class="warn">
All libraries and UDFs are defined here at the beginning; please add new libraries only in this cell.
</div>

## 3. Load and Pre-Process Table: Clients <a id='3'></a>

In [4]:
# Load the client master table and normalise headers:
# spaces -> '_', parentheses removed, lower-case.
clients = spark.read.csv("/datathon/IE_Challenge_3.Clients v2.txt", header=True, sep="#")
for old_name in clients.schema.names:
    new_name = old_name.replace(' ', '_').replace('(', '').replace(')', '').lower()
    clients = clients.withColumnRenamed(old_name, new_name)
clients = (clients
           .withColumnRenamed('database_target_formula', 'lifecycle')
           .withColumnRenamed('country_commercial', 'country_home')
           .withColumnRenamed('market_country', 'country_market'))

# Client key = atlas_number || branch_number (no separator). It must be built exactly like the
# fk_* keys of the booking / first-booking / demand tables so the joins line up.
# NOTE(review): concatenating without a separator can make distinct pairs collide
# (e.g. '12'+'3' == '1'+'23'); changing that requires changing every key together.
# 'count' is a constant 1 used as a simple counter.
clients_0 = (clients
             .withColumn("pk_client", F.concat(col("atlas_number"), lit(""), col("branch_number")))
             .withColumn('count', lit(1)))

# Keep only the first character of the lifecycle code as the segmentation feature.
clients_0 = clients_0.withColumn("lifecycle", F.substring(clients_0.lifecycle, 0, 1))

# Drop key parts already folded into pk_client plus fields not needed downstream
# (atlas_code_distribution_type_details is poorly maintained), then remove exact duplicate
# rows; the booking tables are later joined one-to-many onto these clients.
drop_list = ['atlas_code_distribution_type_details','atlas_number','branch_number','commercial_stage','atlas_status','homeworker','inactive_reason','inactive_date','acquisition_channel_type','acquisition_channel_description','customer_acquisition_date','city_commercial','post_code_commercial']
kept_columns = [c for c in clients_0.columns if c not in drop_list]
clients_0 = clients_0.select(kept_columns).dropDuplicates()

<div class="warn">
With the above chunk we loaded and cleaned all data from the client table.
This dataframe contains all keys needed to connect other information.  
Output is the df __clients_0__.
</div>

## 4. Load, Pre-Process and Join tables: Clients and 1st Booking <a id='4'></a>

In [41]:
# Load the first-booking table and normalise headers (lower-case, spaces -> '_').
first_bookings = spark.read.csv("/datathon/IE_Challenge_5.ClientDate1stBooking v2.txt", header=True, sep="#")
for old_name in first_bookings.schema.names:
    first_bookings = first_bookings.withColumnRenamed(old_name, old_name.lower().replace(' ', '_'))
first_bookings = (first_bookings
                  .withColumnRenamed('product', 'first_product')   # 'product' alone is ambiguous
                  .withColumnRenamed('first_booking', 'first_booking_date'))

# Key to join clients: client || branch (same convention as clients' pk_client).
first_bookings_0 = first_bookings.withColumn('fk_first_booking', F.concat(col("client"), lit(""), col("branch")))

# customer_since = days between the first booking date and the fixed reference date `today`
# (first 10 characters of the raw timestamp, parsed as dd-MM-yyyy).
timeFmt = "dd-MM-yyyy"
today = "02-07-2017"
first_bookings_0 = first_bookings_0.withColumn("today", F.lit(today))
first_bookings_0 = first_bookings_0.withColumn("first_booking_date_clean", F.substring(first_bookings_0.first_booking_date, 0, 10))
timeDiff = (F.unix_timestamp('today', format=timeFmt) - F.unix_timestamp('first_booking_date_clean', format=timeFmt)) / 24 / 3600
first_bookings_0 = first_bookings_0.withColumn("customer_since", timeDiff).dropDuplicates()

# EDA copy without the raw key parts and last_booking, persisted to Hive for Tableau.
drop_list = ['client', 'branch', 'last_booking']
first_bookings_EDA = first_bookings_0.drop(*drop_list)
first_bookings_EDA.write.mode("overwrite").saveAsTable("coyote.first_bookings_EDA")

In [42]:
# Collapse first bookings to one row per client key; a client is expected to have a single
# first-booking date. Output columns: fk_first_booking, min_customer_since.
# NOTE(review): if a client has several rows (e.g. one per first_product), `min` of the
# days-since value selects the MOST RECENT first booking; `max` would give the earliest.
first_bookings_1 = (first_bookings_0
                    .groupBy("fk_first_booking")
                    .agg(F.min("customer_since").alias("min_customer_since")))

In [43]:
# Attach the first-booking tenure to the clients.
# NOTE(review): this is a *right* join, so every first-booking key is kept (with null client
# columns when there is no client row) and clients without a first booking are dropped;
# confirm that is intended.
clients_0 = clients_0.alias('clients_0')
first_bookings_1 = first_bookings_1.alias('first_bookings_1')
first_booking_match = clients_0.pk_client == first_bookings_1.fk_first_booking
clients_1 = clients_0.join(first_bookings_1, on=first_booking_match, how="right")

<div class="warn">
With the two chunks above we loaded and cleaned all data from the __1st Booking__ table.<br>
Furthermore, the relevant information was added to the former clients_0 table.<br>
Output of these two steps is the table __clients_1__.
</div>

## 5. Load and Pre-Process Data: Bookings Transfer <a id='5'></a>

In [106]:
# Load the transfer bookings and lower-case the headers.
booking_transfer = spark.read.csv("/datathon/Q2_Transfers_Bookings.txt", header=True, sep="|")
for name in booking_transfer.schema.names:
    booking_transfer = booking_transfer.withColumnRenamed(name, name.lower())
booking_transfer_0 = booking_transfer


# Keys: pk_incident (office || booking || service order) matches incidents' fk_incidents and
# doubles as a unique booking id; fk_client (client || branch) matches clients' pk_client.
booking_transfer_0 = booking_transfer_0.withColumn("pk_incident", F.concat(col("incoming_office"), lit(""), col("booking"), lit(""), col("service_order")))
booking_transfer_0 = booking_transfer_0.withColumn("fk_client", F.concat(col("client"), lit(""), col("branch")))


# New features: a constant row counter plus day offsets. The first 10 characters of each raw
# timestamp string are used as the date (casting the raw strings straight to DateType did not
# work); datediff gives the days between booking / cancellation and the service date.
booking_transfer_0 = booking_transfer_0.withColumn('count', lit(1))
booking_transfer_0 = booking_transfer_0.withColumn("booking_date_clean", F.substring(booking_transfer_0.booking_date, 0, 10))
booking_transfer_0 = booking_transfer_0.withColumn("service_date_clean", F.substring(booking_transfer_0.service_date, 0, 10))
booking_transfer_0 = booking_transfer_0.withColumn("booking_ahead", F.datediff(booking_transfer_0.service_date_clean, booking_transfer_0.booking_date_clean))
booking_transfer_0 = booking_transfer_0.withColumn("cancellation_date_clean", F.substring(booking_transfer_0.cancellation_date, 0, 10))
booking_transfer_0 = booking_transfer_0.withColumn("cancel_ahead", F.datediff(booking_transfer_0.service_date_clean, booking_transfer_0.cancellation_date_clean))


# Reduce dimensionality: *_clean columns are readable labels for EDA, the "1"/"0" columns are
# flags that are summed per client for the k-means model.
booking_transfer_0 = booking_transfer_0.withColumn("interface_clean",
                                F.when(booking_transfer_0.interface_desc.like("%HOTEL%"), "Hotelbeds")
                                .when(booking_transfer_0.interface_desc.like("%BED%"), "Bedsonline")
                                .otherwise("other"))

booking_transfer_0 = booking_transfer_0.withColumn("interface_hotelbeds",
                                F.when(booking_transfer_0.interface_desc.like("%HOTEL%"), "1")
                                .otherwise("0"))

booking_transfer_0 = booking_transfer_0.withColumn("with_children",
                                F.when(booking_transfer_0.children > 0, "1")
                                .when(booking_transfer_0.infants > 0, "1")
                                .otherwise("0"))

# NOTE(review): '%app%' is mapped to "xml" -- confirm that is intended.
booking_transfer_0 = booking_transfer_0.withColumn("application_clean",
                                F.when(booking_transfer_0.application.like("%lution%"), "web")
                                .when(booking_transfer_0.application.like("%xml%"), "xml")
                                .when(booking_transfer_0.application.like("%app%"), "xml")
                                .when(booking_transfer_0.application.like("%api%"), "api")
                                .otherwise("irrelevant"))

booking_transfer_0 = booking_transfer_0.withColumn("application_web",
                                F.when(booking_transfer_0.application.like("%lution%"), "1")
                                .otherwise("0"))

booking_transfer_0 = booking_transfer_0.withColumn("application_api",
                                F.when(booking_transfer_0.application.like("%api%"), "1")
                                .otherwise("0"))

# Every branch tests sales_method: the CROSS / OFF patterns classify the sales method, so
# matching them against `application` (as a copy-paste did) never produced "cross"/"offline".
booking_transfer_0 = booking_transfer_0.withColumn("sales_method_clean",
                                F.when(booking_transfer_0.sales_method.like("%widget%"), "widget")
                                .when(booking_transfer_0.sales_method.like("%CROSS%"), "cross")
                                .when(booking_transfer_0.sales_method.like("%Cross%"), "cross")
                                .when(booking_transfer_0.sales_method.like("%OFF%"), "offline")
                                .otherwise("other"))

booking_transfer_0 = booking_transfer_0.withColumn("master_service_type_clean",
                                F.when(booking_transfer_0.master_service_type_desc.like("%Shared%"), "shared")
                                .when(booking_transfer_0.master_service_type_desc.like("%Private%"), "private")
                                .otherwise("other"))

booking_transfer_0 = booking_transfer_0.withColumn("product_luxury",
                                F.when(booking_transfer_0.master_product_type_desc.like("%Luxury%"), "1")
                                .when(booking_transfer_0.master_product_type_desc.like("%Premium%"), "1")
                                .otherwise("0"))

# International = destination country differs from the market country (1, else null; nulls
# are ignored by the per-client sum). Equality would flag *domestic* bookings instead.
# NOTE(review): assumes both columns use the same country coding.
booking_transfer_0 = booking_transfer_0.withColumn('international_booking', F.when(booking_transfer_0.destination_country != booking_transfer_0.country_market, lit(1)))


# Instantiate numeric types and rename features. pct_commission is cast to Double rather than
# Integer so fractional commission rates survive until they are averaged per client.
booking_transfer_0 = booking_transfer_0.withColumn("ttv_eur", booking_transfer_0["ttv_eur"].cast(DoubleType()))
booking_transfer_0 = booking_transfer_0.withColumn("pct_commission", booking_transfer_0["pct_commission"].cast(DoubleType()))
booking_transfer_0 = booking_transfer_0.withColumnRenamed('destination_country_desc', 'country_destination')


# Basic data-quality step (also checked iteratively in Tableau): remove bookings that cannot
# be matched to any client (according to the documentation).
booking_transfer_0 = booking_transfer_0.na.drop(subset=["fk_client"])

# EDA extract, stored to Hive for Tableau.
keep_list = ['fk_client','destination_country','interface_clean','country_destination','booking_date_clean','ttv_eur','booking_ahead','application_clean']
booking_transfer_EDA = booking_transfer_0.select([column for column in booking_transfer_0.columns if column in keep_list])
booking_transfer_EDA.write.mode("overwrite").saveAsTable("coyote.booking_transfer_EDA")


# Modelling input: keys plus the numeric / flag features aggregated per client later.
keep_list = ['international_booking','ttv_eur','pct_commission','pk_incident','fk_client','count','booking_ahead','interface_hotelbeds','with_children','application_web','application_api','product_luxury']
booking_transfer_1 = booking_transfer_0.select([column for column in booking_transfer_0.columns if column in keep_list])

<div class="warn">
The above chunk has two outputs:  
1. Hive Table __booking_transfer_EDA__  for EDA in Tableau <br>
2. Dataframe __booking_transfer_1__ for modelling
</div>

## 6. Load and Pre-Process Incidents | Join Bookings <a id='6'></a>

In [46]:
# Load incidents / claims and normalise headers (lower-case, spaces -> '_').
incidents = spark.read.csv("/datathon/IE_Challenge_4.IncidentsClaims v2.txt", header=True, sep="#")
for name in incidents.schema.names:
    incidents = incidents.withColumnRenamed(name, name.lower().replace(' ', '_'))


# fk_incidents: office || booking || service -- same layout as the bookings' pk_incident.
# 'count' is a constant 1 used to count incidents.
incidents_0 = incidents.withColumn('fk_incidents', F.concat(col("office_number"), lit(""), col("booking_number"), lit(""), col("service_id")))
incidents_0 = incidents_0.withColumn('count', lit(1))


# Remove pre-arrival complaints since they are not always negative; rename the timestamp.
# NOTE(review): rows with a null life_cycle are dropped by this filter as well.
incidents_0 = incidents_0.filter(~col('life_cycle').isin(["Pre Arrival"]))
incidents_0 = incidents_0.withColumnRenamed('date/time_opened', 'incident_date')


# EDA extract stored to Hive. 'incident_date' is the renamed timestamp above; listing only
# 'date_time' silently left the incident date out of the export.
keep_list = ['fk_incidents','incident_date','date_time','life_cycle','service_type','count']
incidents_EDA = incidents_0.select([column for column in incidents_0.columns if column in keep_list])
incidents_EDA.write.mode("overwrite").saveAsTable("coyote.incidents_EDA")


# incidents_1: number of incidents per booking key (joined onto the booking tables and used
# as a segmentation input). Rows without a key are removed before aggregating.
incidents_1 = (incidents_0
               .na.drop(subset=["fk_incidents"])
               .groupBy("fk_incidents")
               .agg(F.sum("count").alias("no_incidents")))

In [47]:
# Attach the per-booking incident count to each transfer booking. Left join: bookings without
# incidents keep a null no_incidents. The condition references the frame actually being
# joined (booking_transfer_1) rather than its parent booking_transfer_0.
incidents_1 = incidents_1.alias('incidents_1')
booking_transfer_1 = booking_transfer_1.alias('booking_transfer_1')
booking_transfer_2 = booking_transfer_1.join(incidents_1, booking_transfer_1.pk_incident == incidents_1.fk_incidents, "left")

# run basic checks
# booking_transfer_1.filter(col('no_incidents').isin(["1","2","3"])).select("fk_client","pk_incident","fk_incidents","no_incidents").groupBy("fk_client","fk_incidents").agg({"no_incidents":"sum"}).show()

<div class="warn">
Incidents and Claims were loaded and cleaned. In some cases there was more than one incident per booking; in those cases the number of incidents was aggregated. Furthermore, only negative incidents were considered (pre-arrival incidents were removed), since the number of incidents serves as a KPI for the customer segmentation and indicates the effort for individual handling. <br>
<br>
__Number of incidents__ was added to table booking_transfer_1.<br>
Output is the dataframe __booking_transfer_2__.
</div>

##  7. Aggregate Bookings_Transfer and Join: Clients_1 <a id='7'></a>

In [48]:
# One row per client with transfer-booking aggregates. Every column gets the prefix
# 'booking_transfer_' (stating its origin) and 'agg(col)' is flattened to 'agg_col',
# e.g. 'sum(ttv_eur)' -> 'booking_transfer_sum_ttv_eur'.
transfer_aggregations = {"with_children": "sum",
                         "pct_commission": "avg",
                         "booking_ahead": "avg",
                         "ttv_eur": "sum",
                         "no_incidents": "sum",
                         "application_web": "sum",
                         "application_api": "sum",
                         "product_luxury": "sum",
                         "count": "sum",
                         "international_booking": "sum"}
booking_transfer_3 = booking_transfer_2.groupby("fk_client").agg(transfer_aggregations)

for old_name in booking_transfer_3.schema.names:
    flat_name = "booking_transfer_" + old_name.replace('(', '_').replace(')', '')
    booking_transfer_3 = booking_transfer_3.withColumnRenamed(old_name, flat_name)

In [49]:
# Attach the per-client transfer aggregates to clients_1.
# NOTE(review): a *right* join keeps every client with transfer bookings (client columns are
# null when the key is missing from clients_1) and drops clients without transfers.
booking_transfer_3 = booking_transfer_3.alias('booking_transfer_3')
clients_1 = clients_1.alias('clients_1')
clients_2 = clients_1.join(booking_transfer_3,
                           on=clients_1.pk_client == booking_transfer_3.booking_transfer_fk_client,
                           how="right")

# Flatten any remaining 'agg(col)' style names to 'agg_col'.
for old_name in clients_2.schema.names:
    clients_2 = clients_2.withColumnRenamed(old_name, old_name.replace('(', '_').replace(')', ''))

# Manual cross-check: total ttv should be unchanged by the join.
# NOTE(review): the transfer total is now named booking_transfer_sum_ttv_eur.
#clients_2.agg({"sum_ttv_eur_trans": "sum"}).show()

<div class="warn">
In the two chunks above, __booking_transfer_2__ was grouped by client (= __booking_transfer_3__) and then merged with the client dataframe.<br>
Output is the dataframe __clients_2__.
</div>

## 8. Load and Pre-Process Data: Bookings Activities <a id='8'></a>

In [50]:
# Load activity bookings and normalise headers (lower-case, spaces -> '_').
booking_act = spark.read.csv("/datathon/Q1_Activities_Bookings.txt", header=True, sep="|")
for name in booking_act.schema.names:
    booking_act = booking_act.withColumnRenamed(name, name.lower().replace(' ', '_'))
booking_act = booking_act.withColumnRenamed('destination_country_desc', 'country_destination')


# Keys: fk_client (client || branch) matches clients' pk_client; pk_incident
# (office || booking || service order) matches incidents' fk_incidents; fk_product matches
# the product factsheet's pk_product. Promotion / bundle keys are not built (see section 9).
booking_act_0 = booking_act.withColumn("fk_client", F.concat(col("client"), lit(""), col("branch")))
booking_act_0 = booking_act_0.withColumn("pk_incident", F.concat(col("incoming_office"), lit(""), col("booking"), lit(""), col("service_order")))
booking_act_0 = booking_act_0.withColumnRenamed('content_factsheet_code', 'fk_product')

# New features: a constant row counter, day offsets computed from the first 10 characters of
# each timestamp string, and the international flag (1 when the destination country differs
# from the market country, null otherwise; equality would flag *domestic* bookings).
# NOTE(review): assumes destination_country and country_market use the same country coding.
booking_act_0 = booking_act_0.withColumn('count', lit(1))
booking_act_0 = booking_act_0.withColumn("booking_date_clean", F.substring(booking_act_0.booking_date, 0, 10))
booking_act_0 = booking_act_0.withColumn("service_date_clean", F.substring(booking_act_0.service_date_from, 0, 10))
booking_act_0 = booking_act_0.withColumn("booking_ahead", F.datediff(booking_act_0.service_date_clean, booking_act_0.booking_date_clean))
booking_act_0 = booking_act_0.withColumn("cancellation_date_clean", F.substring(booking_act_0.cancellation_date, 0, 10))
booking_act_0 = booking_act_0.withColumn("cancel_ahead", F.datediff(booking_act_0.service_date_clean, booking_act_0.cancellation_date_clean))
booking_act_0 = booking_act_0.withColumn('international_booking', F.when(booking_act_0.destination_country != booking_act_0.country_market, lit(1)))


# Reduce dimensionality: *_clean columns are readable labels for EDA; the flag columns use
# "1"/"0" (as in the transfer bookings) so that their per-client sums are numeric. A non-numeric
# fallback such as "NO"/"other" becomes null when summed, giving null instead of 0.
booking_act_0 = booking_act_0.withColumn("interface_clean",
                                F.when(booking_act_0.interface_desc.like("%HOTEL%"), "Hotelbeds")
                                .when(booking_act_0.interface_desc.like("%BED%"), "Bedsonline")
                                .otherwise("other"))

booking_act_0 = booking_act_0.withColumn("interface_hotelbeds",
                                F.when(booking_act_0.interface_desc.like("%HOTEL%"), "1")
                                .otherwise("0"))

booking_act_0 = booking_act_0.withColumn("children_clean",
                                F.when(booking_act_0.children > 0, "YES")
                                .when(booking_act_0.infants > 0, "YES")
                                .otherwise("NO"))

booking_act_0 = booking_act_0.withColumn("with_children",
                                F.when(booking_act_0.children > 0, "1")
                                .when(booking_act_0.infants > 0, "1")
                                .otherwise("0"))

# NOTE(review): '%app%' is mapped to "xml" -- confirm that is intended.
booking_act_0 = booking_act_0.withColumn("application_clean",
                                F.when(booking_act_0.application.like("%lution%"), "web")
                                .when(booking_act_0.application.like("%xml%"), "xml")
                                .when(booking_act_0.application.like("%app%"), "xml")
                                .when(booking_act_0.application.like("%api%"), "api")
                                .otherwise("irrelevant"))

booking_act_0 = booking_act_0.withColumn("application_web",
                                F.when(booking_act_0.application.like("%lution%"), "1")
                                .otherwise("0"))

# '%api%' (contains), matching application_clean above and the transfer bookings.
booking_act_0 = booking_act_0.withColumn("application_api",
                                F.when(booking_act_0.application.like("%api%"), "1")
                                .otherwise("0"))

# Every branch tests sales_method (matching CROSS / OFF against `application` never fired).
booking_act_0 = booking_act_0.withColumn("sales_method_clean",
                                F.when(booking_act_0.sales_method.like("%widget%"), "widget")
                                .when(booking_act_0.sales_method.like("%CROSS%"), "cross")
                                .when(booking_act_0.sales_method.like("%Cross%"), "cross")
                                .when(booking_act_0.sales_method.like("%OFF%"), "offline")
                                .otherwise("other"))


# Drop bookings without an assigned customer and instantiate numeric types (Double, as for
# the transfer bookings, so summed ttv keeps full precision).
booking_act_0 = booking_act_0.na.drop(subset=["fk_client"])
booking_act_0 = booking_act_0.withColumn("ttv_eur", booking_act_0["ttv_eur"].cast(DoubleType()))
booking_act_0 = booking_act_0.withColumn("pct_commission", booking_act_0["pct_commission"].cast(DoubleType()))

<div class="warn">
Booking Activities is loaded and cleaned. Output:  
1. Output: __booking_act_0__ -  intermediate for further processing before EDA and Modelling
</div>

## 9. Join Incidents, Promotional Deals and Bundles to booking_act <a id='9'></a>

In [51]:
# Attach per-booking incident counts (incidents_1 is already cleaned and aggregated).
# Left join: activity bookings without incidents keep a null no_incidents.
incident_match = booking_act_0.pk_incident == incidents_1.fk_incidents
booking_act_1 = booking_act_0.join(incidents_1, on=incident_match, how="left")

In [52]:
# NOTE: based on key fk_promo promotions are supposed to be joined - since there are very few hits the field is redundant
# Kept for reference only (nothing below executes): fk_promo is the last 4 characters of
# `service`, and the check counts how many bookings carry one of the listed promo codes.
#check = booking_act.withColumn("fk_promo", substring(booking_act["service"],-4,4)).select("fk_promo")
#check = check.withColumn('count', lit(1))
#check.filter(col('fk_promo').isin(["OFEB","OFNR","OFFC", "OFLD"])).count()
#check.filter(col('fk_promo').isin(["OFEB","OFNR","OFFC", "OFLD"])).count()

In [53]:
# NOTE: based on key fk_promo bundles are supposed to be joined - since there are very few hits the field is redundant
# Kept for reference only (nothing below executes); it reuses `check` from the commented-out
# promotion check in the cell above and matches fk_promo against the bundle codes.
#bundles = spark.read.csv("/datathon/IE_Challenge_7.TicketBundles_service code prefix v2.txt", header=True, sep="#")
#codes_promo = bundles.select("Code").rdd.flatMap(lambda x: x).collect()
#check.filter(col('fk_promo').isin(codes_promo)).count()

<div class="warn"> 
Incident data has been added.  
Output: __booking_act_1__.
</div>

## 10. Load, Pre-Process and Join Booking_Act: Products <a id='10'></a>

In [54]:
# Load the activity product factsheet and give the key / duration columns short names.
products = spark.read.csv("/datathon/IE_Challenge_2.ActivitiesContentFactsheet v2.txt", header=True, sep="#")
products = products.withColumnRenamed('Factsheet ID', 'pk_product').withColumnRenamed('Segmentation - duration', 'duration')

# Dummy features for the model (1 when the pattern matches, null otherwise; nulls are skipped
# by the per-client sums). They can serve as EDA dimensions as well.
products_0 = (products
              .withColumn('target_couples', F.when(col('Segmentation').like("%Couples%"), lit(1)))
              .withColumn('target_families', F.when(col('Segmentation').like("%Fami%"), lit(1)))
              .withColumn('target_seniors', F.when(col('Segmentation').like("%Sen%"), lit(1)))
              .withColumn('target_youth', F.when(col('Segmentation').like("%You%"), lit(1)))
              .withColumn('full_day', F.when(col('duration').like("%Full day%"), lit(1))))

# Drop the raw text columns now that the flags exist. products_0 is the frame joined onto the
# bookings, so the drop has to be applied to it; `products` is trimmed the same way.
# NOTE(review): if pk_product is not unique, the left join onto bookings duplicates booking
# rows (and inflates ttv sums) -- worth asserting uniqueness.
drop_list = ['Concepts', 'duration', 'Segmentation']
products = products.select([column for column in products.columns if column not in drop_list])
products_0 = products_0.select([column for column in products_0.columns if column not in drop_list])

In [55]:
# Attach product flags to each activity booking; the left join keeps bookings whose factsheet
# code is not found in the product table.
products_0 = products_0.alias('products_0')
booking_act_1 = booking_act_1.alias('booking_act_1')
product_match = booking_act_1.fk_product == products_0.pk_product
booking_act_2 = booking_act_1.join(products_0, on=product_match, how="left")

<div class="warn"> 
Products were added.  
Output: __booking_act_2__. 
</div>

## 11. Store booking_act_2 to HIVE <a id='11'></a>

In [108]:
# Keep only the columns needed for the Tableau EDA and persist them to Hive.
keep_list = ['fk_client','destination_country','interface_clean','country_destination','booking_date_clean','ttv_eur','booking_ahead','application_clean','full_day','target_families']
eda_columns = [c for c in booking_act_2.columns if c in keep_list]
booking_act_EDA = booking_act_2.select(*eda_columns)
booking_act_EDA.write.mode("overwrite").saveAsTable("coyote.booking_act_EDA")

<div class="warn"> 
DF stored to HIVE for further processing in Tableau. Relevant features were selected.  
Output: __booking_act_EDA__
</div>

## 12. Aggregate Bookings_Activities and Join: Clients_2 <a id='12'></a>

In [57]:
# One row per client with activity-booking aggregates. 'agg(col)' names are flattened to
# 'agg_col' and prefixed with 'booking_act_' to state their origin,
# e.g. 'sum(ttv_eur)' -> 'booking_act_sum_ttv_eur'.
activity_aggregations = {"ttv_eur": "sum",
                         "pct_commission": "avg",
                         "with_children": "sum",
                         "no_incidents": "sum",
                         "booking_ahead": "avg",
                         "target_couples": "sum",
                         "target_families": "sum",
                         "target_seniors": "sum",
                         "target_youth": "sum",
                         "full_day": "sum",
                         "application_web": "sum",
                         "application_api": "sum",
                         "international_booking": "sum",
                         "count": "sum"}
booking_act_3 = booking_act_2.groupby("fk_client").agg(activity_aggregations)

for old_name in booking_act_3.schema.names:
    flat_name = "booking_act_" + old_name.replace('(', '_').replace(')', '')
    booking_act_3 = booking_act_3.withColumnRenamed(old_name, flat_name)

In [58]:
# Full outer join so no activity sales are lost: clients_2 only holds clients that have
# transfer sales. For activity-only clients pk_client is null and the key lives in
# booking_act_fk_client.
booking_act_3 = booking_act_3.alias('booking_act_3')
clients_2 = clients_2.alias('clients_2')
clients_3 = clients_2.join(booking_act_3,
                           on=clients_2.pk_client == booking_act_3.booking_act_fk_client,
                           how="full")

# Flatten any remaining 'agg(col)' style names to 'agg_col'.
for old_name in clients_3.schema.names:
    clients_3 = clients_3.withColumnRenamed(old_name, old_name.replace('(', '_').replace(')', ''))

In [59]:
#### cross-check positive - total ttv didnt change
# Manual sanity check (commented out): per-source ttv totals in clients_3 should match the raw
# booking totals. NOTE(review): the aggregated columns are now named
# booking_transfer_sum_ttv_eur / booking_act_sum_ttv_eur -- update the names before re-enabling.
#clients_3.agg({"sum_ttv_eur_trans": "sum"}).show()
#booking_act.agg({"ttv_eur": "sum"}).show()
#clients_3.agg({"sum_ttv_eur_act": "sum"}).show()

<div class="warn"> 
Booking Activities was added to clients_2 <br>
Output: __clients_3__
</div>

## 13. Load, Pre-Process and Join Clients_3: Demand Transfer <a id='13'></a>

In [60]:
# Load the transfer demand extract. The file is pipe-separated without a header row, so Spark
# names the columns _c0, _c1, ...; they are renamed positionally from the list below.
transfer_demand_columns = ['request_date', 'request_hour', 'customer_id', 'customer_branch',
                           'des_interface', 'booking_gate', 'pick_up_date', 'pick_up_hour',
                           'status', 'pick_up_type', 'pick_up_hotel', 'pick_up_terminal',
                           'pick_up_zone', 'drop_off_type', 'drop_off_hotel', 'drop_off_terminal',
                           'drop_off_zone', 'pick_up_zone2', 'drop_off_zone2', 'num_adults',
                           'num_children', 'num_requests', 'avg_products_returned', 'sales_origin']
demand_transfer = spark.read.csv("/datathon/IE_Challenge_0.Transfers_Demand.txt", header=False, sep="|")
for position, column_name in enumerate(transfer_demand_columns):
    demand_transfer = demand_transfer.withColumnRenamed('_c%d' % position, column_name)

# Client key: customer_id immediately followed by customer_branch. This is the value matched
# against clients.pk_client when the demand data is joined onto the client table.
# NOTE(review): with an empty separator ("1","23") and ("12","3") give the same key; this is
# only safe if pk_client was built the same way - confirm.
demand_transfer_0 = demand_transfer.withColumn(
    "fk_client", F.concat(col("customer_id"), lit(""), col("customer_branch")))

# Interface description -> brand; the first matching pattern wins, anything else is "other".
demand_transfer_0 = demand_transfer_0.withColumn(
    "interface_clean",
    F.when(demand_transfer_0["des_interface"].like("%HOTEL%"), "Hotelbeds")
     .when(demand_transfer_0["des_interface"].like("%BED%"), "Bedsonline")
     .otherwise("other"))

# Booking gate -> sales channel (per the original note, mapped the same way as for bookings).
# First matching pattern wins; unmatched gates are "irrelevant".
# NOTE(review): "%app%" is labelled "xml" as well - confirm this is intended.
demand_transfer_0 = demand_transfer_0.withColumn(
    "application_clean",
    F.when(demand_transfer_0["booking_gate"].like("%lution%"), "web")
     .when(demand_transfer_0["booking_gate"].like("%xml%"), "xml")
     .when(demand_transfer_0["booking_gate"].like("%app%"), "xml")
     .when(demand_transfer_0["booking_gate"].like("%ACT%"), "api")
     .otherwise("irrelevant"))

# The CSV is read as strings; make the two aggregated measures numeric.
# request_date is left as a string (a date conversion was sketched but never enabled).
for column_name, spark_type in (("avg_products_returned", DoubleType()),
                                ("num_requests", IntegerType())):
    demand_transfer_0 = demand_transfer_0.withColumn(column_name, demand_transfer_0[column_name].cast(spark_type))

In [61]:
# Persist the columns needed for exploratory analysis to the Hive table coyote.demand_transfer_eda.
# Columns keep the order they have in demand_transfer_0.
keep_list = ['fk_client','request_date','interface_clean','application_clean','status','num_requests','avg_products_returned']
demand_transfer_eda = demand_transfer_0.select(*[c for c in demand_transfer_0.columns if c in keep_list])
(demand_transfer_eda.write
    .mode("overwrite")
    .saveAsTable("coyote.demand_transfer_eda"))

In [62]:
# Per-client transfer demand features for the segmentation: total requests and mean number of
# products returned (the booking gate is already covered by the actual bookings).
demand_transfer_1 = (demand_transfer_0
                     .groupBy("fk_client")
                     .agg({"num_requests":"sum","avg_products_returned":"avg"}))

# Prefix every column (key and aggregates) so they remain identifiable after the join.
for agg_column in demand_transfer_1.schema.names:
    demand_transfer_1 = demand_transfer_1.withColumnRenamed(agg_column, "demand_transfer_" + agg_column)

# Left join: every client in clients_3 is kept, with or without transfer demand.
same_client = clients_3["pk_client"] == demand_transfer_1["demand_transfer_fk_client"]
clients_4 = clients_3.join(demand_transfer_1, same_client, "left")

# Make column names identifier-friendly: first "(" -> "_", then drop ")".
for old_text, new_text in (('(', '_'), (')', '')):
    for column_name in clients_4.schema.names:
        clients_4 = clients_4.withColumnRenamed(column_name, column_name.replace(old_text, new_text))

<div class="warn"> 
Demand_Transfer was added to clients_3 <br>
Output: __clients_4__
</div>

## 14. Load, Pre-Process and Join Clients_4: Demand Activities <a id='14'></a>

In [63]:
# Load the activities demand extract. The file is pipe-separated without a header row, so Spark
# names the columns _c0, _c1, ...; they are renamed positionally from the list below.
# NOTE(review): column 11 is named 'num_adult' here but 'num_adults' in the transfer demand.
activity_demand_columns = ['request_date', 'request_hour', 'customer_id', 'customer_branch',
                           'des_interface', 'booking_gate', 'valid_from', 'valid_to',
                           'status', 'pick_up_zone', 'pick_up_hotel', 'num_adult',
                           'num_children', 'num_requests', 'avg_products_returned', 'sales_origin']
demand_act = spark.read.csv("/datathon/IE_Challenge_0.Activities_Demand.txt", header=False, sep="|")
for position, column_name in enumerate(activity_demand_columns):
    demand_act = demand_act.withColumnRenamed('_c%d' % position, column_name)

# Client key: customer_id immediately followed by customer_branch. This is the value matched
# against clients.pk_client when the demand data is joined onto the client table.
# NOTE(review): with an empty separator ("1","23") and ("12","3") give the same key; this is
# only safe if pk_client was built the same way - confirm.
demand_act_0 = demand_act.withColumn(
    "fk_client", F.concat(col("customer_id"), lit(""), col("customer_branch")))

# Interface description -> brand; the first matching pattern wins, anything else is "other".
demand_act_0 = demand_act_0.withColumn(
    "interface_clean",
    F.when(demand_act_0["des_interface"].like("%HOTEL%"), "Hotelbeds")
     .when(demand_act_0["des_interface"].like("%BED%"), "Bedsonline")
     .otherwise("other"))

# Booking gate -> sales channel (per the original note, mapped the same way as for bookings).
# First matching pattern wins; unmatched gates are "irrelevant".
# NOTE(review): "%app%" is labelled "xml" as well - confirm this is intended.
demand_act_0 = demand_act_0.withColumn(
    "application_clean",
    F.when(demand_act_0["booking_gate"].like("%lution%"), "web")
     .when(demand_act_0["booking_gate"].like("%xml%"), "xml")
     .when(demand_act_0["booking_gate"].like("%app%"), "xml")
     .when(demand_act_0["booking_gate"].like("%ACT%"), "api")
     .otherwise("irrelevant"))

# The CSV is read as strings; make the two aggregated measures numeric.
# request_date is left as a string (a date conversion was sketched but never enabled).
for column_name, spark_type in (("avg_products_returned", DoubleType()),
                                ("num_requests", IntegerType())):
    demand_act_0 = demand_act_0.withColumn(column_name, demand_act_0[column_name].cast(spark_type))

In [64]:
# Persist the columns needed for exploratory analysis to the Hive table coyote.demand_act_eda.
# Columns keep the order they have in demand_act_0.
keep_list_demand_act = ['fk_client','request_date','interface_clean','application_clean','status','num_requests','avg_products_returned']
demand_act_eda = demand_act_0.select(*[c for c in demand_act_0.columns if c in keep_list_demand_act])
(demand_act_eda.write
    .mode("overwrite")
    .saveAsTable("coyote.demand_act_eda"))

In [65]:
# Per-client activity demand features for the segmentation: total requests and mean number of
# products returned (the booking gate is already covered by the actual bookings).
# The source must be the *activity* demand (demand_act_0). Aggregating demand_transfer_0 here
# would make every demand_act_* feature a silent duplicate of the transfer demand features.
demand_act_1 = demand_act_0.groupBy("fk_client").agg({"num_requests":"sum","avg_products_returned":"avg"})
for name in demand_act_1.schema.names: demand_act_1 = demand_act_1.withColumnRenamed(name, "demand_act_"+name)

# Add the activity demand information to the clients table (left join keeps every client).
clients_5 = clients_4.join(demand_act_1, clients_4.pk_client == demand_act_1.demand_act_fk_client, "left")
# Make column names identifier-friendly: "(" -> "_", then drop ")".
for name in clients_5.schema.names: clients_5 = clients_5.withColumnRenamed(name, name.replace('(', '_'))
for name in clients_5.schema.names: clients_5 = clients_5.withColumnRenamed(name, name.replace(')', ''))

<div class="warn"> 
Demand Activities was added to clients_4 <br>
Output: __clients_5__
</div>

## 15. Consolidate Data for Model <a id='15'></a>

In [87]:
# Cast the aggregated client features to explicit numeric types before the feature engineering:
# counts, sums of flags and day values become IntegerType; money and ratio-like values FloatType.
# NOTE(review): the *_avg_booking_ahead averages are truncated to whole numbers by the IntegerType cast.
as_int = IntegerType()
as_float = FloatType()
feature_types = [
    ("min_customer_since", as_int),

    ("booking_transfer_sum_count", as_int),
    ("booking_transfer_sum_ttv_eur", as_float),
    ("booking_transfer_sum_product_luxury", as_int),
    ("booking_transfer_avg_pct_commission", as_float),
    ("booking_transfer_sum_with_children", as_int),
    ("booking_transfer_avg_booking_ahead", as_int),
    ("booking_transfer_sum_no_incidents", as_int),
    ("booking_transfer_sum_application_api", as_int),
    ("booking_transfer_sum_application_web", as_int),

    ("booking_act_sum_count", as_int),
    ("booking_act_sum_ttv_eur", as_float),
    ("booking_act_avg_pct_commission", as_float),
    ("booking_act_sum_with_children", as_int),
    ("booking_act_avg_booking_ahead", as_int),
    ("booking_act_sum_target_youth", as_int),
    ("booking_act_sum_international_booking", as_int),
    ("booking_act_sum_target_seniors", as_int),
    ("booking_act_sum_target_families", as_int),
    ("booking_act_sum_full_day", as_int),
    ("booking_act_sum_no_incidents", as_int),
    ("booking_act_sum_target_couples", as_int),

    ("demand_transfer_sum_num_requests", as_int),
    ("demand_transfer_avg_avg_products_returned", as_float),

    ("demand_act_sum_num_requests", as_int),
    ("demand_act_avg_avg_products_returned", as_float),
]

clients_final = clients_5
for feature_name, feature_type in feature_types:
    clients_final = clients_final.withColumn(feature_name, clients_final[feature_name].cast(feature_type))

# Replace nulls left by the outer/left joins (clients without activity bookings or demand rows)
# with 0; fill() with a numeric value only touches numeric columns.
clients_final = clients_final.na.fill(0)

In [88]:
# Derive client-level totals and per-booking / per-day ratios.
# Each step is (new column, columns added together, optional divisor column). Steps run in order,
# so later steps can use totals created by earlier ones, and new columns are appended in this order.
# NOTE(review): with Spark's default (non-ANSI) arithmetic a zero divisor (e.g. total_bookings_count
# or min_customer_since of 0) yields null, and these nulls are not re-filled after the earlier
# na.fill(0) - confirm this is acceptable for the EDA/model.
feature_steps = [
    # totals from ttv and count plus their ratios
    ("total_ttv",                     ["booking_transfer_sum_ttv_eur", "booking_act_sum_ttv_eur"], None),
    ("share_transfer_ttv",            ["booking_transfer_sum_ttv_eur"], "total_ttv"),
    ("total_bookings_count",          ["booking_transfer_sum_count", "booking_act_sum_count"], None),
    ("bookings_per_day",              ["total_bookings_count"], "min_customer_since"),
    # average ttv per booking
    ("ttv_per_booking",               ["total_ttv"], "total_bookings_count"),
    # incidents: total and per booking
    ("total_incidents_count",         ["booking_transfer_sum_no_incidents", "booking_act_sum_no_incidents"], None),
    ("incidents_per_booking",         ["total_incidents_count"], "total_bookings_count"),
    # requests, to identify highly demanding customers
    ("total_requests_count",          ["demand_transfer_sum_num_requests", "demand_act_sum_num_requests"], None),
    ("requests_per_booking",          ["total_requests_count"], "total_bookings_count"),
    ("requests_since_first_booking",  ["total_requests_count"], "min_customer_since"),
    # booking channel as share of all bookings (subcategories are dropped later)
    ("share_booking_api",             ["booking_transfer_sum_application_api", "booking_act_sum_application_api"], "total_bookings_count"),
    ("share_booking_web",             ["booking_transfer_sum_application_web", "booking_act_sum_application_web"], "total_bookings_count"),
    # international bookings as share of all bookings (subcategories are dropped later)
    ("share_international_booking",   ["booking_act_sum_international_booking", "booking_transfer_sum_international_booking"], "total_bookings_count"),
    # activity-only features, as share of all bookings
    ("share_booking_children",        ["booking_act_sum_with_children"], "total_bookings_count"),
    ("share_booking_full_day",        ["booking_act_sum_full_day"], "total_bookings_count"),
    ("share_booking_target_couple",   ["booking_act_sum_target_couples"], "total_bookings_count"),
    ("share_booking_target_families", ["booking_act_sum_target_families"], "total_bookings_count"),
    ("share_booking_target_youth",    ["booking_act_sum_target_youth"], "total_bookings_count"),
    ("share_booking_target_seniors",  ["booking_act_sum_target_seniors"], "total_bookings_count"),
    # transfer-only feature, as share of all bookings
    ("share_booking_transfer_luxury", ["booking_transfer_sum_product_luxury"], "total_bookings_count"),
]

for new_column, addends, divisor in feature_steps:
    feature = clients_final[addends[0]]
    for addend in addends[1:]:
        feature = feature + clients_final[addend]
    if divisor is not None:
        feature = feature / clients_final[divisor]
    clients_final = clients_final.withColumn(new_column, feature)

# Deliberately NOT combined:
# - booking_ahead averages: a client booking only one category would skew a combined average
#   (booking_act_avg_booking_ahead, booking_transfer_avg_booking_ahead)
# - commission averages: transfers and activities are different businesses
#   (booking_transfer_avg_pct_commission, booking_act_avg_pct_commission)

In [None]:
# Remove raw per-category features that should not reach the model table, plus the join keys of
# the joined tables (feature selection was done iteratively with EDA in Tableau).
# The international-booking pair lists both the activity AND the transfer column; previously the
# activity column was listed twice, so booking_transfer_sum_international_booking leaked into
# coyote.clients_final even though it is already folded into share_international_booking.
drop_list = ['booking_transfer_sum_no_incidents','booking_act_sum_no_incidents',
             'demand_act_sum_num_requests','demand_transfer_sum_num_requests',
             'demand_transfer_avg_avg_products_returned','demand_act_avg_avg_products_returned',
             'booking_act_sum_with_children','booking_act_sum_full_day',
             'booking_transfer_sum_application_api','booking_act_sum_application_api',
             'booking_transfer_sum_application_web','booking_act_sum_application_web',
             'booking_act_sum_international_booking','booking_transfer_sum_international_booking',
             'booking_act_sum_target_couples','booking_act_sum_target_families',
             'booking_act_sum_target_youth','booking_act_sum_target_seniors',
             'booking_transfer_sum_with_children','booking_transfer_sum_product_luxury',
             'booking_act_fk_client','demand_transfer_fk_client','demand_act_fk_client']

# Keep the remaining columns in their current order and persist the model input to Hive.
clients_final = clients_final.select([column for column in clients_final.columns if column not in drop_list])
clients_final.write.mode("overwrite").saveAsTable("coyote.clients_final")

<div class="warn"> 
Features were selected, consolidated and combined <br>
Output: __clients_final__
</div>

## 16. High-level validation <a id='16'></a>

In [69]:
## all relevant ttv from booking_transfer is covered
#clients_5.select("booking_transfer_sum_ttv_eur").agg({"booking_transfer_sum_ttv_eur":"sum"}).show()
#booking_transfer_0.select("ttv_eur").agg({"ttv_eur":"sum"}).show()

In [70]:
## all relevant ttv from booking_act is covered
#clients_5.select("booking_act_sum_ttv_eur").agg({"booking_act_sum_ttv_eur":"sum"}).show()
#booking_act_0.select("ttv_eur").agg({"ttv_eur":"sum"}).show()

In [71]:
## final ttv equals sum of transfer and activity
#clients_final.select("total_ttv").agg({"total_ttv":"sum"}).show()
#clients_final.select("booking_transfer_sum_ttv_eur","booking_act_sum_ttv_eur","total_ttv").show(100)

<div class="warn"> 
All checks passed; further checks are run inline within the code. </div>

## 17. Code - Backlog <a id='17'></a>

<div class="warn"> 
Please skip this section; it is for my personal trial and error only. </div>

In [72]:
# Within this chunk total TTV per customer by Activity vs. Transfer is calculates
# in a second step client data is joined

#clients = clients.alias('clients')
#total_transfer = booking_transfer.select("CLIENT","TTV_TRANS").groupby("CLIENT").agg({"TTV_TRANS":"sum"}).withColumnRenamed('sum(TTV_TRANS)','total_trans')
#total_act = booking_act.select("CLIENT","TTV_ACT").groupby("CLIENT").agg({"TTV_ACT":"sum"}).withColumnRenamed('sum(TTV_ACT)','total_act')                            

In [73]:
#act_totalTTV = booking_act.select("CLIENT","TTV").groupBy("CLIENT").agg({"TTV":"sum"}).orderBy('sum(TTV)', ascending=False).toPandas()
#act_totalTTV.boxplot(column=None, by=None, ax=None, fontsize=None, rot=0, grid=True, figsize=None, layout=None, return_type=None)

#act_cumTTV = act_totalTTV.cumsum(axis=None, dtype=None, out=None, skipna=True)
#act_cumTTV.plot(x=None, y=None, kind='line', ax=None, subplots=True)

In [74]:
#Duration = products.select("duration_clean").groupby("duration_clean").agg({"duration_clean":"count"}).toPandas()

#fig = plt.figure()
#ax = fig.add_subplot(111)

#objects = ['Other', 'Half Day', 'Full Day']
#x = list(range(len(objects)))
#y = Duration['count(duration_clean)'].tolist()

#plt.bar(x, y, align='center', alpha=0.5)
#plt.xticks(y_pos, objects)
#plt.ylabel('Number of Bookings')
#plt.title('Distribution of Duration')
#plt.show()

In [75]:
#demand_act.registerTempTable("demand_act")
#spark.sql("Select mean(num_children) from demand_act").show()

In [76]:
#%matplotlib inline
#import matplotlib.pyplot as plt

#bycountry = booking_act.groupby("COUNTRY_MARKET_DESC").agg({"PCT_COMMISSION":"mean"}).toPandas().sort_values("avg(PCT_COMMISSION)", axis=0, ascending=False).head(5)

#plt.style.use('ggplot') #looks better -compare R Library
#bycountry.plot(kind='bar')


### Segmenting Customers (Redundant – the model is run in Tableau: more convenient, better visualizations, faster)

We have three options/libraries to run an ML model:
1. `spark.ml`, built around DataFrames – we use Spark 2.0 and process the data as Spark SQL DataFrames (preferred)
2. `spark.mllib`, built around RDDs
3. scikit-learn with pandas

Using spark.ml is recommended because with DataFrames the API is more versatile and flexible. But we will keep supporting spark.mllib along with the development of spark.ml. Users should be comfortable using spark.mllib features and expect more features coming. Developers should contribute new algorithms to spark.ml if they fit the ML pipeline concept well, e.g., feature extractors and transformers.

In [77]:
#from pyspark.ml.clustering import KMeans
#from pyspark.ml.feature import VectorAssembler

#df = forK #only numeric input for kMeans
#cols = df.columns
#vectorAss = VectorAssembler(inputCols=cols, outputCol="features") #inputCols=["CHILDREN","ADULTS"]
#vdf = vectorAss.transform(df)
#vdf = vdf.select('features')
#vdf = vdf.limit(10)

#kmeans = KMeans(k=2, maxIter=5, seed=1)
#kmm = kmeans.fit(vdf)
#centers = kmm.clusterCenters()