In [2]:
import datetime
from datetime import date
import pandas as pd
import numpy as np

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.session import SparkSession


from pyspark.sql import DataFrameStatFunctions as statFunc

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import IntegerType

## 1 Collect search data at hotel level

Collect hotel data from hdfs. Rate amentity and commission for each hotel are collected as array. The reason that I  collected amentity and commission separatly was that for commission, GDS rates all NULL. Therefore, when doing group by at the rate level to collect amnentity, GDS rates would be lost. I added one more step for commission collection, that was to first fill NULL and then doing the group by at rate level and hotel level.

### 1.1 Collect amentity data

In [None]:
# 1) group by at rate level to collect amentity discription for each rate
# 2) group by at the hotel level to collect amentities for each hotel
'''
use eandev;
drop table if exists eandev.yixli_src_amentity_2019;
create table eandev.yixli_src_amentity_2019 as
select
 a.message_id
,a.message_date
,a.check_in_date
,a.check_out_date
,a.tuid
,a.hotel_id
,a.hotel_result_index
,a.city
,a.star_rating
,a.filter_want_in_policy_rates_only
,array_contains(a.filter_rate_amenities,"EligibleForLoyalty") as  filter_eligible_for_loyalty
,array_contains(a.filter_rate_amenities,"FreeBreakfast") as  filter_free_breakfast
,array_contains(a.filter_rate_amenities,"FreeWifi") as  filter_free_wifi
,array_contains(a.filter_rate_amenities,"FreeParking") as  filter_free_parking
,concat_ws("|",collect_list(cast(a.rate_index as string))) as rate_index
,concat_ws("|",collect_list(cast(a.rate_amount as string))) as rate_amount
,concat_ws("|",collect_list(cast(a.rate_currency as string))) as rate_currency
,concat_ws("|",collect_list(a.rate_type)) as rate_type
,concat_ws("|",collect_list(cast(array_contains(a.amnts,"EligibleForLoyalty") as string))) as eligible_for_loyalty
,concat_ws("|",collect_list(cast(array_contains(a.amnts,"FreeBreakfast") as string))) as free_breakfast
,concat_ws("|",collect_list(cast(array_contains(a.amnts,"FreeWifi") as string))) as free_wifi
,concat_ws("|",collect_list(cast(array_contains(a.amnts,"FreeParking") as string))) as free_parking
,concat_ws("|",collect_list(cast(array_contains(a.amnts,"Refundable") as string))) as refundable
from (
select
 message_id
,message_timestamp as message_date
,check_in_date
,check_out_date
,filter_rate_amenities
,filter_want_in_policy_rates_only
,tuid
,v1.hotel_id
,v1.hotel_result_index as hotel_result_index
,CONCAT(lower(v1.city), '-',lower(v1.country_code))  as city
,v1.star_rating
,v2.rate_index
,v2.price.amount as rate_amount
,v2.price.currency as rate_currency
,(case when v2.rate_type = 'GDS_CHAIN_NEGOTIATED' then 'NEG'
  when v2.rate_type = 'GDS_CONSORTIA' then 'GDS'
  when v2.rate_type = 'GDS_NEGOTIATED' then 'NEG'
  when v2.rate_type = 'GDS_PUBLISHED' then 'GDS'
  when v2.rate_type = 'EPR_MERCHANT' then 'EPRM'
  when v2.rate_type = 'ESR_DIRECT_AGENCY' then 'ESRA'
  when v2.rate_type = 'EPR_DIRECT_AGENCY' then 'EPRA'
  when v2.rate_type = 'ESR_MERCHANT' then 'ESRM'
  else v2.rate_type end) as rate_type
,collect_list(v3.amenity_description) as amnts
from eandev.HotelMessages
lateral view explode(travelers) travl_tbl as tuid
lateral view explode(hotels) hotel_tbl as v1
lateral view explode(v1.prices) rate_tbl as v2
lateral view explode(v2.rate_amenities) amt_tbl as v3
where load_date between '20191001' and '20200101'
and message_id is not null
and message_id <> ''
and v2.selected = True
and type = 'HSR'
group by message_id, message_timestamp, check_in_date, check_out_date, filter_rate_amenities,
filter_want_in_policy_rates_only, tuid, v1.hotel_id, v1.hotel_result_index,v1.city, v1.country_code, v1.star_rating, v2.rate_index,v2.price.amount,v2.price.currency,
v2.rate_type
) as a
group by a.message_id, a.message_date, a.check_in_date, a.check_out_date,
a.filter_rate_amenities, a.filter_want_in_policy_rates_only, a.tuid, a.hotel_id,a.hotel_result_index, a.city, a.star_rating
'''

### 1.2 Collect commission data

In [None]:
# 1) fill the NULL for commission data
# 2) group by at rate level to obtain average commission for each rate
# 3) group by at the hotel level to collect commission for each hotel
'''
use eandev;
drop table if exists eandev.yixli_search_commission_2019_v3;
create table eandev.yixli_search_commission_2019_v3 as   
select
a.message_id
,a.message_date
,a.check_in_date
,a.check_out_date
,a.tuid
,a.hotel_id
,a.hotel_result_index
,concat_ws("|",collect_list(cast(a.rate_index as string))) as rate_index
,concat_ws("|",collect_list(cast(NVL(a.commission_base, 0.0) as string))) as commission_base
,concat_ws("|",collect_list(cast(NVL(a.commission_tax, 0.0) as string))) as commission_tax
,concat_ws("|",collect_list(cast(NVL(a.commission_currency, 'USD') as string))) as commission_currency
from
(
select
t.message_id
,t.message_date
,t.check_in_date
,t.check_out_date
,t.tuid
,t.hotel_id
,t.hotel_result_index
,t.rate_index
,t.rate_type
,avg(commission_base) as commission_base
,avg(commission_tax) as commission_tax
,t.commission_currency
from
(select 
message_id
,message_timestamp as message_date
,check_in_date
,check_out_date
,tuid
,v1.hotel_id
,v1.hotel_result_index
,v2.rate_index
,(case when v2.rate_type = 'GDS_CHAIN_NEGOTIATED' then 'NEG'
  when v2.rate_type = 'GDS_CONSORTIA' then 'GDS'
  when v2.rate_type = 'GDS_NEGOTIATED' then 'NEG'
  when v2.rate_type = 'GDS_PUBLISHED' then 'GDS'
  when v2.rate_type = 'EPR_MERCHANT' then 'EPRM'
  when v2.rate_type = 'ESR_DIRECT_AGENCY' then 'ESRA'
  when v2.rate_type = 'EPR_DIRECT_AGENCY' then 'EPRA'
  when v2.rate_type = 'ESR_MERCHANT' then 'ESRM'
  else v2.rate_type end) as rate_type
,NVL(v3.commission_base, 0.0) as commission_base
,NVL(v3.commission_tax, 0.0) as commission_tax
,NVL(v3.currency, 'USD') as commission_currency
from eandev.HotelMessages
lateral view explode(travelers) travl_tbl as tuid
lateral view explode(hotels) hotel_tbl as v1
lateral view explode(v1.prices) rate_tbl as v2
lateral view outer explode(v2.commission_list) cms_tbl as v3 
where load_date between '20191001' and '20200101'
and message_id is not null 
and message_id <> ''
and v2.selected = True
and type = 'HSR') as t
group by message_id, message_date, check_in_date, check_out_date,tuid, hotel_id,hotel_result_index,rate_index,rate_type,commission_currency) as a
group by message_id, message_date, check_in_date, check_out_date,tuid, hotel_id,hotel_result_index
'''

### 1.3 Filter hotels below PL

In [None]:
# find hotel result index of the PL
'''
use eandev; 
drop table if exists eandev.yixli_last_idx_peterman_line_2019;
create table eandev.yixli_last_idx_peterman_line_2019 as    
select  
  a.message_id,
  IF(b.message_id is null, 0, b.last_idx_peterman_line) as last_idx_peterman_line,
  IF(c.message_id is null, 0, c.last_hotel_result_index) as last_hotel_result_index
from eandev.HotelMessages as a 
left join (
  select  
   message_id
  ,MAX(v1.hotel_result_index) as last_idx_peterman_line
  from eandev.HotelMessages 
  lateral view explode(hotels) hotel_tbl as v1
  where load_date between '20191001' and '20200101'
  and ((v1.property_level_neg = true) or (v1.chain_level_neg = true) or (v1.preferred = true))
  and message_id is not null 
  and message_id <> ''
  and ((original_message_id is null) or (original_message_id == '')) 
  and type = 'HSR'  
  group by message_id
) as b ON a.message_id = b.message_id
left join (
  select  
   message_id
  ,MAX(v1.hotel_result_index) as last_hotel_result_index
  from eandev.HotelMessages 
  lateral view explode(hotels) hotel_tbl as v1
  where load_date between '20191001' and '20200101'
  and message_id is not null 
  and message_id <> ''
  and ((original_message_id is null) or (original_message_id == '')) 
  and type = 'HSR'  
  group by message_id
) as c ON a.message_id = c.message_id
where a.load_date between '20191001' and '20200101'
and a.message_id is not null 
and a.message_id <> ''
and ((original_message_id is null) or (original_message_id == '')) 
and a.type = 'HSR'
'''

In [None]:
# filter hotels below PL for amentity df
'''
use eandev; 
drop table if exists eandev.yixli_src_amentity_2019_pl;
create table eandev.yixli_src_amentity_2019_pl as  
  select b.*
  from yixli_src_amentity_2019 as b
join yixli_last_idx_peterman_line_2019 as c on (b.message_id=c.message_id)
where (b.hotel_result_index>c.last_idx_peterman_line)
order by b.message_id, b.hotel_result_index
'''

In [None]:
# filter hotels below PL for commission df
'''
use eandev; 
drop table if exists eandev.yixli_search_commission_2019_pl;
create table eandev.yixli_search_commission_2019_pl as  
  select b.*
  from yixli_search_commission_2019_v3 as b
join yixli_last_idx_peterman_line_2019 as c on (b.message_id=c.message_id)
where (b.hotel_result_index>c.last_idx_peterman_line)
order by b.message_id, b.hotel_result_index
'''

### 1.4 Filter last search

In [None]:
# filter only the last search data for amentity df
# count = 75682358
'''
use eandev; 
drop table if exists eandev.yixli_last_src_amentity_2019_pl;
create table eandev.yixli_last_src_amentity_2019_pl as  
select b.*
from
(select a.*
,row_number() over(partition by hotel_id,check_in_date,check_out_date,tuid order by message_date desc) as rn
from yixli_src_amentity_2019_pl as a) as b
where rn=1
'''

In [None]:
# filter only the last search data for commission df
# count = 75881436
'''
use eandev; 
drop table if exists eandev.yixli_last_src_commission_2019_pl;
create table eandev.yixli_last_src_commission_2019_pl as  
select b.*
from
(select a.*
,row_number() over(partition by hotel_id,check_in_date,check_out_date,tuid order by message_date desc) as rn
from yixli_search_commission_2019_pl as a) as b
where rn=1
'''

In [None]:
# transfer from hdfs to s3
'''
ssh Chwxedwhdc002.datawarehouse.expecn.com

hive -e "use eandev; INSERT OVERWRITE LOCAL DIRECTORY '/home/yixli/temp' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' select * from yixli_last_src_commission_2019_pl1;"&

cd temp
ls | xargs -I '{}' -n1 sed -i 's/\\N//g' '{}' &
cd ..
cat temp/* >last_src_commission_2019_pl.csv


~/.linuxbrew/bin/aws_key_gen login

aws s3 cp last_src_commission_2019_pl.csv s3://ege-ds-workshops-corp/yixli/data_preparation/last_src_commission_2019_pl.csv

'''

'''
ssh Chwxedwhdc002.datawarehouse.expecn.com

hive -e "use eandev; INSERT OVERWRITE LOCAL DIRECTORY '/home/yixli/temp' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' select * from yixli_last_src_amentity_2019_pl;"&

cd temp
ls | xargs -I '{}' -n1 sed -i 's/\\N//g' '{}' &
cd ..
cat temp/* >last_src_amentity_2019_pl.csv


~/.linuxbrew/bin/aws_key_gen login

aws s3 cp last_src_amentity_2019_pl.csv s3://ege-ds-workshops-corp/yixli/data_preparation/last_src_amentity_2019_pl.csv

'''

## 2 Find hotel booking index

### 2.1 Hotel df 

In [None]:
# inner join hotels from amentity df and commission df to obtain hotel df
# hotels below PL in last search: count = 75669163
'''
use eandev; 
drop table if exists eandev.hotels_2019_pl;
create table eandev.hotels_2019_pl as
select distinct *
from(
(select distinct message_id, tuid, hotel_id,check_in_date, check_out_date,hotel_result_index,message_date
from yixli_last_src_commission_2019_pl) as a
inner join (select distinct message_id, tuid, hotel_id,check_in_date, check_out_date,hotel_result_index,message_date
from yixli_last_src_amentity_2019_pl) as b 
on (a.message_id=b.message_id 
    and  a.tuid=b.tuid
    and a.hotel_id=b.hotel_id
    and a.check_in_date=b.check_in_date
    and a.check_out_date=b.check_out_date
    and a.hotel_result_index=b.hotel_result_index
    and a.message_date=b.message_date))
'''

In [None]:
# transfer from hdfs to s3
'''
ssh Chwxedwhdc002.datawarehouse.expecn.com

hive -e "use eandev; INSERT OVERWRITE LOCAL DIRECTORY '/home/yixli/temp' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' select * from hotels_2019_pl;"&

cd temp
ls | xargs -I '{}' -n1 sed -i 's/\\N//g' '{}' &
cd ..
cat temp/* >hotels_2019_pl.csv


~/.linuxbrew/bin/aws_key_gen login

aws s3 cp hotels_2019_pl.csv s3://ege-ds-workshops-corp/yixli/data_preparation/hotels_2019_pl.csv

'''

In [11]:
# Read hotel df
search_schema = T.StructType([
    T.StructField("message_id", T.StringType(), True),    
     T.StructField("tuid", T.IntegerType(), True),   
     T.StructField("hotel_id", T.IntegerType(), True),                                                                           
    T.StructField("check_in_date", T.DateType(), True),                                 
    T.StructField("check_out_date", T.DateType(), True),                                
     T.StructField("hotel_result_index", T.IntegerType(), True),    
     T.StructField("message_date", T.TimestampType(), True)])

file_loc = "s3://ege-ds-workshops-corp/yixli/data_preparation/hotels_2019_pl.csv"
hotel_df = sqlContext.read.format('csv').\
            options(header='false', inferSchema='false', delimiter=',').\
            schema(search_schema).load(file_loc)

# load_date between '20191001' and '20200101'

print(hotel_df.count())

In [13]:
hotel_df.show(50)

+--------------------+--------+--------+-------------+--------------+------------------+--------------------+
|          message_id|    tuid|hotel_id|check_in_date|check_out_date|hotel_result_index|        message_date|
+--------------------+--------+--------+-------------+--------------+------------------+--------------------+
|00000ba1-bb93-41f...|20554265|15980970|   2019-11-05|    2019-11-06|                12|2019-11-01 10:24:...|
|00001dd6-89d5-49f...| 1398050|    9364|   2019-12-09|    2019-12-11|                11|2019-11-21 08:58:...|
|00001f75-62c8-462...|17921103|  799712|   2019-11-18|    2019-11-22|                14|2019-11-14 02:29:...|
|00002bac-aab1-471...|20387317|  598773|   2019-10-28|    2019-10-29|                 9|2019-10-18 06:34:...|
|00002bde-1b21-4d0...|20015413|  283121|   2019-11-19|    2019-11-21|                12|2019-11-18 16:15:...|
|000034e7-421a-423...| 8330241| 3773290|   2019-12-16|    2019-12-17|                 4|2019-12-12 00:45:...|
|00003c1e-

In [14]:
# create hotel index  
hotel_df = hotel_df.withColumn('hotel_index',row_number().over(Window.partitionBy("message_id", "tuid").\
                                                        orderBy(F.asc("hotel_result_index"))))

### 2.2 Booking data

In [5]:
# collect booking data from Redshift
sqlBookings = """SELECT 
      a.hotelfactid,
      a.hotelid as hotel_id,
     trunc(a.issuedate) as issue_date,
     trunc(a.traveldatestart) as check_in_date,
     trunc(a.traveldateend) as check_out_date,
     hsp.quarter as quarter,
      (case when a.customersystemid=1 then a.TUIDTraveler else g.TUIDInternal end) as tuid,      
      a.travelproductid AS travel_product_id,
      a.GroupAccountID AS group_account_id,
      hsp.rate_type AS hotel_rate_type,
      (case  
        when a.hotelrateTypeSupplyid IN (1,2,3,4,5,10,11,14,16,17,18,20,25,26,28,29,30,32) then 'GDS'
        when a.hotelrateTypeSupplyid IN (12,13,15,19,21,22,23,27,31,33,34) then 'NEG'
        when a.hotelrateTypeSupplyid = 6 then 'ESRA'
        when a.hotelrateTypeSupplyid = 7 then 'ESRM'
        when a.hotelrateTypeSupplyid = 8 then 'EPRA'
        when a.hotelrateTypeSupplyid = 9 then 'EPRM'
        when a.hotelrateTypeSupplyid IN (24,35) then 'HOTMIP'
        else 'UNK'
        end) as bk_rate_type,
      CASE WHEN a.OnlineBool = 1 THEN 'online' ELSE 'offline' END AS booking_type,
      a.bookingamtgross::DOUBLE precision*COALESCE(ex.exchangerate,1::DOUBLE precision) AS gross_booking_value_usd,
      a.bookingamtcommissionest::DOUBLE precision*COALESCE(ex.exchangerate,1::DOUBLE precision)
        + a.bookingamtgross::DOUBLE precision*COALESCE(ex.exchangerate,1::DOUBLE precision)*NVL(hsp.commission,0.00) AS total_commission,
      a.bookingamtmargin::DOUBLE precision*COALESCE(ex.exchangerate,1::DOUBLE precision) AS total_markup,
      4.3 AS total_gds,
      NVL(total_commission,0.00) + NVL(total_markup,0.00) + NVL(total_gds,0.00) AS total_supply_revenue
FROM public.HotelFact a
  JOIN public.ExchangeRateDailySubset ex
    ON a.IssueDateTimeID = ex.TimeID AND a.CurrencyCode = ex.FromCurrencyCode AND ex.ToCurrencyCode = 'USD'
  JOIN TravelerAccountDim g 
    ON a.TUIDTraveler = g.TUID AND a.customersystemid=g.customersystemid 
  JOIN public.GroupAccountdim d 
    ON a.GroupAccountID = d.GroupAccountID 
  JOIN public.hotel_supply hsp 
    ON hsp.year = EXTRACT (year FROM a.IssueDate)
    AND hsp.quarter = EXTRACT (quarter FROM a.IssueDate)
    AND hsp.rate_type = CASE
         WHEN a.HotelRateTypeSupplyID IN (7,9,24) THEN 'Expedia Collect'
         WHEN a.HotelRateTypeSupplyID IN (6,8,35) THEN 'Hotel Collect'
         WHEN a.HotelRateTypeSupplyID IN (12,13,15,19,21,22,23,27,31,33,34) THEN 'Negotiated'
         ELSE 'Published & GDS' END
    AND hsp.travel_product_id = a.TravelProductID
WHERE a.CustomerSystemID IN (1,2)
AND   (d.groupaccountinternaltypeid = 1 OR d.groupaccountinternaltypeid = 3)
AND a.BookingTypeID in (1,3) 
and a.issuedate>=to_date('20191001','YYYYMMDD') 
and a.issuedate<=to_date('20200101','YYYYMMDD')""".replace('\n',' ')

connection_string = "jdbc:redshift://egencia-reporting.czjkedodj6lc.us-west-2.redshift.amazonaws.com:5439/egedatamart?user=ds_rpt_user&password=Jan2019!";

bookings_df = sqlContext.read.\
    format("com.databricks.spark.redshift").\
    option("url", connection_string).\
    option("query", sqlBookings).\
    option("tempdir", "s3a://ege-ds-workshops-corp/yixli/").\
    load()

bookings_df.cache()
bookings_df.count()

1999891

In [None]:
# combine rate type
bookings_df = bookings_df.withColumn('rate_type',F.when(F.col('bk_rate_type')=='HOTMIP','ESRM').otherwise(F.col('bk_rate_type')))

# adjust rate amount and supply revenue by the duration of stay
bookings_df = bookings_df.\
                     withColumn('duration', F.datediff(F.col("check_out_date"),F.col( "check_in_date")).cast(IntegerType()))

bookings_df = bookings_df.\
                     withColumn('avg_book_rate_amount_usd',F.col('gross_booking_value_usd')/F.col( "duration")).\
                     withColumn('bk_avg_commission_usd',F.col('total_commission')/F.col( "duration")).\
                     withColumn('bk_avg_markup_usd',F.col('total_markup')/F.col( "duration")).\
                     withColumn('bk_avg_revenue_usd',F.col('total_supply_revenue')/F.col( "duration"))

# filter outliers
bookings_df = bookings_df.filter(F.col('total_supply_revenue')>0).filter(F.col('total_supply_revenue')<1000)

### 2.3 Match last search with bookings

In [28]:
# find bk_hotel_index
book_hotel_idx =hotel_df.join(bookings_df.select("hotel_id","check_in_date", "check_out_date","tuid","issue_date")
                                       ,["hotel_id","check_in_date", "check_out_date","tuid"],how='inner').\
                                       withColumnRenamed("hotel_index","bk_hotel_index").\
                                        select("message_id","tuid","bk_hotel_index").dropDuplicates()
# Add bk_hotel_index back to hotel df
hotel_df_with_bk_idx = hotel_df.join(book_hotel_idx,["message_id","tuid"])
hotel_df_with_bk_idx.count()

709268

In [32]:
# save hotel df with bk_hotel_index
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'

datestamp = datetime.datetime.now().strftime('%m-%d-%Y')
hotel_df_with_bk_idx.repartition(1).write.mode('overwrite').parquet(dir+'hotel_df_with_bk_idx')

In [3]:
hotel_df_with_bk_idx = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/data_preparation/hotel_df_with_bk_idx')

## 3 Add hotel sorting scores

The hotel data from 10/01/2019 to 01/01/2020 doesn't include score. So I added the score from API.

In [None]:
'''
sudo pip-3.6 install git+https://github.expedia.biz/caldanarosenbe/datascience-fast-hotel-search-data.git@revenue_sort
'''

In [5]:
from fast_hotel_sort.utils.SearchDataUtils import SearchDataUtils
from datetime import datetime, timedelta, date

In [6]:
# collect hotel scores from API
cols = ["message_id"
        ,"timestamp"
        ,"search_date"
        ,"check_in_date"
        ,"check_out_date"
        ,"tuid"]
cols2 = ["message_id"
        ,"timestamp"
        ,"hotel_id"
        ,"star_rating"
        ,"city"
        ,"property_level_neg"
        ,"chain_level_neg"
        ,"preferred"
        ,"score_0"
        ,"score_1"
        ,"hotel_result_index"
        ,"country_code"]
q = SearchDataUtils()
sd = date(2019,10,1)
sd2 = date(2020,1,1)
dispHotelsDF = q.displayed_hotels(sd, sd2,cols, cols2)

In [7]:
dispHotelsDF.count()

149291990

In [8]:
# join hotel df with hotel scores
hotel_df_with_bk_idx_score = hotel_df_with_bk_idx.join(dispHotelsDF.select("message_id","tuid","hotel_id","check_in_date", "check_out_date",'score_1',"hotel_result_index"),
                                    ["message_id","tuid","hotel_id","check_in_date", "check_out_date","hotel_result_index"])

In [9]:
hotel_df_with_bk_idx_score.count()

10750007

In [11]:
# save hotel_df with bk_hotel_index and score
dir = 's3://ege-ds-workshops-corp/yixli/'

datestamp = datetime.now().strftime('%m-%d-%Y')
hotel_df_with_bk_idx_score.repartition(1).write.mode('overwrite').parquet(dir+'hotel_df_with_bk_idx_score')

In [7]:
hotel_df_with_bk_idx_score = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/hotel_df_with_bk_idx_score')

## 4 Convert hotel level df to rate level df

Expode the rate amentity and commission array for each hotel to obtain rate level data.

### 4.1 Expode rate amentity

In [2]:
# read amentity df
search_schema = T.StructType([
    T.StructField("message_id", T.StringType(), True),                                  #1                
    T.StructField("message_date", T.TimestampType(), True),                                  #2                               #1                
    T.StructField("check_in_date", T.DateType(), True),                                 #3
    T.StructField("check_out_date", T.DateType(), True),                                #4
    T.StructField("tuid", T.IntegerType(), True),                                       #5
    T.StructField("hotel_id", T.IntegerType(), True),                                   #6 
     T.StructField("hotel_result_index", T.IntegerType(), True),                                   #6 
     T.StructField("city", T.StringType(), True),           #9                               
     T.StructField("star_rating", T.StringType(), True),           #9                               
    T.StructField("filter_want_in_policy_rates_only", T.BooleanType(), True),           #9    
    T.StructField("filter_eligible_for_loyalty", T.BooleanType(), True),                #10 
    T.StructField("filter_free_breakfast", T.BooleanType(), True),                      #11
    T.StructField("filter_free_wifi", T.BooleanType(), True),                           #12    
    T.StructField("filter_free_parking", T.BooleanType(), True),                        #13
    T.StructField("rate_index", T.StringType(), True),                                      #14
    T.StructField("rate_amount", T.StringType(), True),                                     #15
    T.StructField("rate_currency", T.StringType(), True),                                   #16
    T.StructField("rate_type", T.StringType(), True),                                       #17
    T.StructField("eligible_for_loyalty", T.StringType(), True),                            #20
    T.StructField("free_breakfast", T.StringType(), True),                                  #21
    T.StructField("free_wifi", T.StringType(), True),                                       #22
    T.StructField("free_parking", T.StringType(), True),                                    #23
    T.StructField("refundable", T.StringType(), True)                                       #24
])                    

file_loc = "s3://ege-ds-workshops-corp/yixli/data_preparation/last_src_amentity_2019_pl.csv"
search_hotel_df = sqlContext.read.format('csv').\
            options(header='false', inferSchema='false', delimiter=',').\
            schema(search_schema).load(file_loc)

# load_date between '20191001' and '20200101'

search_hotel_df = search_hotel_df.\
            filter(F.col("hotel_id")>0).\
            filter(F.col('rate_amount').isNotNull())
print(search_hotel_df.count())

75638124

In [4]:
search_hotel_df.show(50)

+--------------------+--------------------+-------------+--------------+--------+--------+------------------+--------------------+-----------+--------------------------------+---------------------------+---------------------+----------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          message_id|        message_date|check_in_date|check_out_date|    tuid|hotel_id|hotel_result_index|                city|star_rating|filter_want_in_policy_rates_only|filter_eligible_for_loyalty|filter_free_breakfast|filter_free_wifi|filter_free_parking|          rate_index|         rate_amount|       rate_currency|           rate_type|eligible_for_loyalty|      free_breakfast|           free_wifi|        free_parking|          refundable|
+--------------------+--------------------+-------------+--------------+--------+--------+------

In [3]:
# expode rate amentities for each hotel
combine = F.udf(lambda x1,x2,x3,x4,x5,x6,x7,x8,x9: list(zip(x1,x2,x3,x4,x5,x6,x7,x8,x9)),             
              T.ArrayType(T.StructType([T.StructField("rate_index_a", T.StringType()),                                      #1
                                    T.StructField("rate_amount_a", T.StringType()),                                         #2
                                    T.StructField("rate_currency_a", T.StringType()),                                   #3
                                    T.StructField("rate_type_a", T.StringType()),                                       #4
                                   # T.StructField("hot_mip_rate_a", T.StringType()),                                       #4
                                    T.StructField("eligible_for_loyalty_a", T.StringType()),                            #7
                                    T.StructField("free_breakfast_a", T.StringType()),                                  #8
                                    T.StructField("free_wifi_a", T.StringType()),                                       #9
                                    T.StructField("free_parking_a", T.StringType()),                                    #10
                                    T.StructField("refundable_a", T.StringType()),                                    #10
                                    ])))

search_df1= search_hotel_df.\
    withColumn("rate_index_a", F.split(F.col("rate_index"),"\|")).\
    withColumn("rate_amount_a", F.split(F.col("rate_amount"),"\|")).\
    withColumn("rate_currency_a", F.split(F.col("rate_currency"),"\|")).\
    withColumn("rate_type_a", F.split(F.col("rate_type"),"\|")).\
    withColumn("eligible_for_loyalty_a", F.split(F.col("eligible_for_loyalty"),"\|")).\
    withColumn("free_breakfast_a", F.split(F.col("free_breakfast"),"\|")).\
    withColumn("free_wifi_a", F.split(F.col("free_wifi"),"\|")).\
    withColumn("free_parking_a", F.split(F.col("free_parking"),"\|")).\
    withColumn("refundable_a", F.split(F.col("refundable"),"\|")).\
    withColumn("e", combine("rate_index_a", "rate_amount_a","rate_currency_a","rate_type_a",\
                "eligible_for_loyalty_a","free_breakfast_a","free_wifi_a","free_parking_a","refundable_a")).\
    withColumn("e",F.explode("e")).\
    select("message_id","message_date","check_in_date","check_out_date","hotel_id","hotel_result_index","tuid","city","star_rating", "filter_want_in_policy_rates_only",\
            "filter_eligible_for_loyalty", "filter_free_breakfast", "filter_free_wifi", "filter_free_parking",\
        F.col("e.rate_index_a").alias("rate_index"),\
        F.col("e.rate_amount_a").alias("rate_amount"),\
        F.col("e.rate_currency_a").alias("rate_currency"),\
        F.col("e.rate_type_a").alias("rate_type"),\
        F.col("e.eligible_for_loyalty_a").alias("eligible_for_loyalty"),\
        F.col("e.free_breakfast_a").alias("free_breakfast"),\
        F.col("e.free_wifi_a").alias("free_wifi"),\
        F.col("e.free_parking_a").alias("free_parking"),\
        F.col("e.refundable_a").alias("refundable"))

In [6]:
# convert column types
search_df1 = search_df1.\
            withColumn("rate_index", search_df1.rate_index.cast("integer")).\
            withColumn("rate_amount", search_df1.rate_amount.cast("float")).\
            withColumn("eligible_for_loyalty", search_df1.eligible_for_loyalty.cast("boolean")).\
            withColumn("free_breakfast", search_df1.free_breakfast.cast("boolean")).\
            withColumn("free_wifi", search_df1.free_wifi.cast("boolean")).\
            withColumn("free_parking", search_df1.free_parking.cast("boolean")).\
            withColumn("refundable", search_df1.refundable.cast("boolean"))

In [7]:
search_df1.count()

504487549

In [10]:
# save rate level amentity df
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'

datestamp = datetime.datetime.now().strftime('%m-%d-%Y')
search_df1.repartition(1).write.mode('overwrite').parquet(dir+'rate_amentity_2019')

In [8]:
search_df1 = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/data_preparation/rate_amentity_2019')

In [13]:
search_df1.show()

+--------------------+--------------------+-------------+--------------+--------+------------------+--------+---------------+-----------+--------------------------------+---------------------------+---------------------+----------------+-------------------+----------+-----------+-------------+---------+--------------------+--------------+---------+------------+----------+
|          message_id|        message_date|check_in_date|check_out_date|hotel_id|hotel_result_index|    tuid|           city|star_rating|filter_want_in_policy_rates_only|filter_eligible_for_loyalty|filter_free_breakfast|filter_free_wifi|filter_free_parking|rate_index|rate_amount|rate_currency|rate_type|eligible_for_loyalty|free_breakfast|free_wifi|free_parking|refundable|
+--------------------+--------------------+-------------+--------------+--------+------------------+--------+---------------+-----------+--------------------------------+---------------------------+---------------------+----------------+-------------

### 4.2 Expode rate commission

In [14]:
# read rate commission df
search_schema = T.StructType([
    T.StructField("message_id", T.StringType(), True),                  #1
    T.StructField("message_date", T.TimestampType(), True),             #2
    T.StructField("check_in_date", T.DateType(), True),                 #3
    T.StructField("check_out_date", T.DateType(), True),                #4
    T.StructField("tuid", T.IntegerType(), True),                       #5
    T.StructField("hotel_id", T.IntegerType(), True),                   #6 
    T.StructField("hotel_result_index", T.IntegerType(), True),         #7
    T.StructField("rate_index", T.StringType(), True),                 #8 
    T.StructField("commission_base", T.StringType(), True),              #12
    T.StructField("commission_tax", T.StringType(), True),               #14
    T.StructField("commission_currency", T.StringType(), True)          #16
])

file_loc = "s3://ege-ds-workshops-corp/yixli/data_preparation/last_src_commission_2019_pl.csv"
print("Collecting search the data...")
search_commission_df = sqlContext.read.format('csv').\
            options(header='false', inferSchema='false', delimiter=',').\
            schema(search_schema).load(file_loc)

# load_date between '20191001' and '20200101'

search_commission_df = search_commission_df.\
            filter(F.col("hotel_id")>0)
print(search_commission_df.count())

Collecting search the data...
75669163

In [15]:
# expode rate commission for each hotel
combine = F.udf(lambda x1,x2,x3,x4: list(zip(x1,x2,x3,x4)),             
              T.ArrayType(T.StructType([T.StructField("rate_index_a", T.StringType()),                                      #1
                                    T.StructField("commission_base_a", T.StringType()),                                         #2
                                    T.StructField("commission_tax_a", T.StringType()),                                   #3
                                    T.StructField("commission_currency_a", T.StringType())                                    
                                    ])))

search_df2= search_commission_df.\
    withColumn("rate_index_a", F.split(F.col("rate_index"),"\|")).\
    withColumn("commission_base_a", F.split(F.col("commission_base"),"\|")).\
    withColumn("commission_tax_a", F.split(F.col("commission_tax"),"\|")).\
    withColumn("commission_currency_a", F.split(F.col("commission_currency"),"\|")).\
    withColumn("e", combine("rate_index_a", "commission_base_a","commission_tax_a","commission_currency_a")).\
    withColumn("e",F.explode("e")).\
    select("message_id","check_in_date","check_out_date","hotel_id","tuid",\
        F.col("e.rate_index_a").alias("rate_index"),\
        F.col("e.commission_base_a").alias("commission_base"),\
        F.col("e.commission_tax_a").alias("commission_tax"),\
        F.col("e.commission_currency_a").alias("commission_currency"))

In [17]:
# convert column types
search_df2 = search_df2.\
            withColumn("rate_index", search_df2.rate_index.cast("integer")).\
            withColumn("commission_base", search_df2.commission_base.cast("float")).\
             withColumn("commission_tax", search_df2.commission_tax.cast("float"))
          

In [18]:
search_df2.count()

508210906

In [19]:
# save rate level commission df
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'

datestamp = datetime.datetime.now().strftime('%m-%d-%Y')
search_df2.repartition(1).write.mode('overwrite').parquet(dir+'rate_commission_2019')

In [9]:
search_df2 = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/data_preparation/rate_commission_2019')

In [22]:
search_df2.show()

+--------------------+-------------+--------------+--------+--------+----------+---------------+--------------+-------------------+
|          message_id|check_in_date|check_out_date|hotel_id|    tuid|rate_index|commission_base|commission_tax|commission_currency|
+--------------------+-------------+--------------+--------+--------+----------+---------------+--------------+-------------------+
|70a7986d-d662-4e5...|   2019-11-05|    2019-11-06|   12653|13172894|         0|          13.25|          5.39|                USD|
|70a7986d-d662-4e5...|   2019-11-05|    2019-11-06|   12653|13172894|         1|          13.25|          5.39|                USD|
|70a7986d-d662-4e5...|   2019-11-05|    2019-11-06|   12653|13172894|         2|          13.25|          5.39|                USD|
|70a7986d-d662-4e5...|   2019-11-05|    2019-11-06|   12653|13172894|         3|          13.25|          5.39|                USD|
|70a7986d-d662-4e5...|   2019-11-05|    2019-11-06|   12653|13172894|       

### 4.3 Join hotel level info back to rate data

join hotel df with rate amentity

In [10]:
# join hotel df with booking index and score with rate data to obtain the complete data at rate level
# 1) join with rate amentity
rate_df_with_bk_idx_score = hotel_df_with_bk_idx_score.select('message_id', 'tuid', 'hotel_id', 'hotel_result_index', 'hotel_index', 'bk_hotel_index', 'score_1').\
                    join(search_df1,
                  ['message_id', 'tuid', 'hotel_id', 'hotel_result_index',],how='inner')

# 2) join with rate commission
rate_df_with_bk_idx_score2 = hotel_df_with_bk_idx_score.select('message_id', 'tuid', 'hotel_id', 'hotel_index', 'bk_hotel_index', 'score_1').\
                    join(search_df2,
                  ['message_id', 'tuid', 'hotel_id'],how='inner')

# 3) join together to obtain the complete data of rate 
rate_df_all = rate_df_with_bk_idx_score.\
             join(rate_df_with_bk_idx_score2.select("commission_base","commission_tax","commission_currency","message_id","hotel_id","tuid","rate_index"),
                  ["message_id","hotel_id","tuid","rate_index"],how='inner')

In [15]:
rate_df_all.count()

76169602

In [16]:
# save rate level df
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'
rate_df_all.repartition(1).write.mode('overwrite').parquet(dir+'rate_all_2019')

In [1]:
rate_df_all = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/data_preparation/rate_all_2019')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
31,application_1600892099892_0032,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [4]:
rate_df_all.groupBy('rate_type').agg(F.count(F.col('message_id'))).show()

+---------+-----------------+
|rate_type|count(message_id)|
+---------+-----------------+
|     EPRA|          4305873|
|      NEG|             1748|
|     EPRM|          8715573|
|     ESRM|         37108626|
|     ESRA|         16447560|
|      GDS|          9590222|
+---------+-----------------+

In [24]:
# There were small number of NEG rates and I excluded them
rate_df_all = rate_df_all.filter(F.col('rate_type')!='NEG')

## 5 Further data cleaning for rate level df

### 5.1 Exchange rate currency to USD

In [25]:
# obtain exchange cuurency df from Redshift
sqlCurrency = """select DISTINCT
                 a.exchangeratedate as timestamp,
                 trunc(a.exchangeratedate) as exch_rate_date,
                 a.fromcurrencycode,
                 a.exchangerate
                 from public.exchangeratedailyfull a
                 where trunc(a.exchangeratedate)>=to_date('20191001','YYYYMMDD') 
                 and trunc(a.exchangeratedate)<=to_date('20200101','YYYYMMDD') 
                 and tocurrencycode = 'USD' """
connection_string = "jdbc:redshift://egencia-reporting.czjkedodj6lc.us-west-2.redshift.amazonaws.com:5439/egedatamart?user=ds_rpt_user&password=Jan2019!";

currency_df = sqlContext.read.\
    format("com.databricks.spark.redshift").\
    option("url", connection_string).\
    option("query", sqlCurrency).\
    option("tempdir", "s3a://ege-ds-workshops-corp/yixli/").\
    load().\
    dropDuplicates().\
    orderBy(["exch_rate_date","fromcurrencycode"],ascending=True)

In [26]:
# join rate level df with currency df to exchange currency to USD
rate_df_all.registerTempTable("rate_df_all")
currency_df.registerTempTable("currency_df")

rate_all_df = sqlContext.sql("select\
                             a.*,\
                             a.rate_amount*COALESCE(ex1.exchangerate,1) AS src_rate_amount_usd\
                             from rate_df_all a\
                             left join currency_df ex1 on to_date(a.message_date)=ex1.exch_rate_date and a.rate_currency=ex1.FromCurrencyCode").\
                             dropDuplicates()

### 5.2 Exchange comission currency to USD (Local rates)

In [28]:
# filter local rates
local_below_peterman_df = rate_all_df.filter(F.col('rate_type')!='GDS').filter(F.col('commission_base').isNotNull())
local_below_peterman_df.registerTempTable("local_below_peterman_df")
currency_df.registerTempTable("currency_df")

In [29]:
# convert commission to USD
local_below_peterman_usd_df = sqlContext.sql("select\
                             a.*,\
                             a.commission_base*COALESCE(ex2.exchangerate,1) AS src_commission_base_usd,\
                             a.commission_tax*COALESCE(ex2.exchangerate,1) AS src_commission_tax_usd\
                             from local_below_peterman_df a\
                             left join currency_df ex2 on to_date(a.message_date)=ex2.exch_rate_date and a.commission_currency=ex2.FromCurrencyCode").\
                             dropDuplicates()

In [30]:
# calculate supply revenue for local rates
local_below_peterman_usd_df = local_below_peterman_usd_df.\
withColumn('src_supply_revenue_usd',F.col('src_commission_base_usd')+F.col('src_commission_tax_usd'))

In [31]:
# filter outliers
local_below_peterman_usd_df = local_below_peterman_usd_df.filter(F.col('src_supply_revenue_usd')<1000)

In [32]:
local_below_peterman_usd_df.groupBy('rate_type').agg(F.count(F.col('message_id'))).show()

+---------+-----------------+
|rate_type|count(message_id)|
+---------+-----------------+
|     EPRA|          4212267|
|     EPRM|          8519699|
|     ESRM|         36264412|
|     ESRA|         16081946|
+---------+-----------------+

### 5.3 Obtain supply revenue for GDS rates

In [33]:
# collect supply revenue for GDS rates from Redshift
sqlGDS =  """SELECT 
      a.hotelid as hotel_id,
     trunc(a.issuedate) as issue_date,
     trunc(a.traveldatestart) as check_in_date,
     trunc(a.traveldateend) as check_out_date,
      (case when a.customersystemid=1 then a.TUIDTraveler else g.TUIDInternal end) as tuid,      
      a.travelproductid AS travel_product_id,
      a.GroupAccountID AS group_account_id,
      hsp.rate_type as hotel_rate_type,
      (case  
        when a.hotelrateTypeSupplyid IN (1,2,3,4,5,10,11,14,16,17,18,20,25,26,28,29,30,32) then 'GDS'
        when a.hotelrateTypeSupplyid IN (12,13,15,19,21,22,23,27,31,33,34) then 'NEG'
        when a.hotelrateTypeSupplyid = 6 then 'ESRA'
        when a.hotelrateTypeSupplyid = 7 then 'ESRM'
        when a.hotelrateTypeSupplyid = 8 then 'EPRA'
        when a.hotelrateTypeSupplyid = 9 then 'EPRM'
        when a.hotelrateTypeSupplyid IN (24,35) then 'HOTMIP'
        else 'UNK'
        end) as bk_rate_type,
      NVL(hsp.commission,0.00) As commission_rate,
      4.30 AS gds
FROM public.HotelFact a
  JOIN TravelerAccountDim g 
    ON a.TUIDTraveler = g.TUID AND a.customersystemid=g.customersystemid 
  JOIN public.GroupAccountdim d 
    ON a.GroupAccountID = d.GroupAccountID 
  JOIN public.hotel_supply hsp 
    ON hsp.year = EXTRACT (year FROM a.IssueDate)
    AND hsp.quarter = EXTRACT (quarter FROM a.IssueDate)
    AND hsp.rate_type = CASE
         WHEN a.HotelRateTypeSupplyID IN (7,9,24) THEN 'Expedia Collect'
         WHEN a.HotelRateTypeSupplyID IN (6,8,35) THEN 'Hotel Collect'
         WHEN a.HotelRateTypeSupplyID IN (12,13,15,19,21,22,23,27,31,33,34) THEN 'Negotiated'
         ELSE 'Published & GDS' END
    AND hsp.travel_product_id = a.TravelProductID
WHERE a.CustomerSystemID IN (1,2)
AND   (d.groupaccountinternaltypeid = 1 OR d.groupaccountinternaltypeid = 3)
AND a.BookingTypeID in (1,3) 
and hsp.year=2019
and hsp.quarter=4""".replace('\n',' ')
            
connection_string = "jdbc:redshift://egencia-reporting.czjkedodj6lc.us-west-2.redshift.amazonaws.com:5439/egedatamart?user=ds_rpt_user&password=Jan2019!";

gds_df = sqlContext.read.\
    format("com.databricks.spark.redshift").\
    option("url", connection_string).\
    option("query", sqlGDS).\
    option("tempdir", "s3a://ege-ds-workshops-corp/yixli/").\
    load().\
    filter(F.col("hotel_rate_type")=='Published & GDS').\
    filter(F.col('commission_rate').isNotNull()).\
    filter(F.col('gds').isNotNull()).\
    dropDuplicates()

In [34]:
# group by hotel_id to obtain hotel level GDS revenue
gds_df = gds_df.groupBy('hotel_id').agg(F.avg('commission_rate').alias('commission_rate'),
                                       F.avg('gds').alias('gds'))

In [36]:
# filter GDS rates
gds_below_peterman_df = rate_all_df.filter(F.col('rate_type')=='GDS')
gds_below_peterman_df.count()

9376998

In [37]:
# join the gds_df to obtain the supply revenue for GDS rates
gds_below_peterman_df = gds_below_peterman_df.\
join(gds_df.select('hotel_id','gds','commission_rate'),['hotel_id'],how='left').\
withColumn('src_supply_revenue_usd',F.col('src_rate_amount_usd')*F.col('commission_rate')+F.col('gds'))

In [38]:
# filter outliers
gds_below_peterman_df = gds_below_peterman_df.filter(F.col('src_supply_revenue_usd')<1000)

In [40]:
# add column: src_commission_base_usd = 0 to be consistent as local rates
gds_below_peterman_df = gds_below_peterman_df.withColumn('src_commission_base_usd',F.lit(0.0))

### 5.4 Union local rates df and GDS rates df together

In [42]:
# union local rates with GDS rates
search_revenue_below_peterman_usd_df = local_below_peterman_usd_df.select('message_id', 'hotel_id', 'check_in_date', 'check_out_date', 'tuid', 'rate_index', 'message_date', 'hotel_result_index','hotel_index', 'bk_hotel_index', 'score_1', 'city', 'star_rating', 'filter_want_in_policy_rates_only', 'filter_eligible_for_loyalty', 'filter_free_breakfast', 'filter_free_wifi', 'filter_free_parking',  'rate_type', 'eligible_for_loyalty', 'free_breakfast', 'free_wifi', 'free_parking', 'refundable',  'src_rate_amount_usd', 'src_commission_base_usd', 'src_supply_revenue_usd').\
                            union(gds_below_peterman_df.select('message_id', 'hotel_id', 'check_in_date', 'check_out_date', 'tuid', 'rate_index', 'message_date', 'hotel_result_index', 'hotel_index', 'bk_hotel_index', 'score_1','city', 'star_rating', 'filter_want_in_policy_rates_only', 'filter_eligible_for_loyalty', 'filter_free_breakfast', 'filter_free_wifi', 'filter_free_parking',  'rate_type', 'eligible_for_loyalty', 'free_breakfast', 'free_wifi', 'free_parking', 'refundable',  'src_rate_amount_usd', 'src_commission_base_usd', 'src_supply_revenue_usd'))

In [43]:
search_revenue_below_peterman_usd_df.count()

72740122

In [29]:
# An average of supply revenue per night by rate type
search_revenue_below_peterman_usd_df.groupBy('rate_type').agg(F.avg('src_supply_revenue_usd')).show()

+---------+---------------------------+
|rate_type|avg(src_supply_revenue_usd)|
+---------+---------------------------+
|     EPRA|         24.959300473259983|
|     EPRM|         33.338528055600676|
|     ESRM|         30.794078574840178|
|     ESRA|          22.62902275998205|
|      GDS|         19.094136118922332|
+---------+---------------------------+

In [44]:
# save rate df
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'
search_revenue_below_peterman_usd_df.repartition(1).write.mode('overwrite').parquet(dir+'rate_all_usd_2019')

In [3]:
rate_all_usd_df = sqlContext.read.parquet('s3://ege-ds-workshops-corp/yixli/data_preparation/rate_all_usd_2019')

## 6 Get labels for modeling

To prepare the data for modeling, we need further get labels for rate df

### 6.1 Use rate level data: filter last search,hotels below PL, hotels have been booked

In [4]:
# the rate df is already filtered for the last search, hotels below PL
# so we only need to filter booked hotels here
bk_rate_all_usd_df = rate_all_usd_df.filter(F.col('hotel_index')==F.col('bk_hotel_index'))
bk_rate_all_usd_df.count()

3881004

### 6.2 Match with booking data

In [12]:
bookings_df = bookings_df.withColumn('rate_type',F.when(F.col('bk_rate_type')=='HOTMIP','ESRM').otherwise(F.col('bk_rate_type')))

In [13]:
# Step1: match by rate_type
book_rate_id =bk_rate_all_usd_df.join(bookings_df,["hotel_id","check_in_date", "check_out_date","tuid",'rate_type']).\
select("message_id","hotel_id","check_in_date", "check_out_date","tuid",'rate_type','rate_index','avg_book_rate_amount_usd','src_rate_amount_usd')

# Step2: match by rate_amount
book_rate_id = book_rate_id.\
                         withColumn('rate_diff',F.abs(F.col('avg_book_rate_amount_usd')-F.col('src_rate_amount_usd')))

book_rate_id = book_rate_id.filter((F.col('rate_diff')<1.0))

1953843

In [16]:
# For matched rates, fill label == 1
book_rate_id = book_rate_id.withColumn('label',F.lit(1))

In [17]:
# Add labels back to rate level df
bk_rate_all_usd_df_with_label = bk_rate_all_usd_df.join(book_rate_id.select("message_id","hotel_id","check_in_date", "check_out_date","tuid",'rate_type','rate_index','label'),
                                        ["message_id","hotel_id","check_in_date", "check_out_date","tuid",'rate_type','rate_index'],how='left')

In [18]:
# For not matched rates, fill label == 0
bk_rate_all_usd_df_with_label = bk_rate_all_usd_df_with_label.fillna({'label':0})

In [19]:
bk_rate_all_usd_df_with_label.filter(F.col('label')==1).count()

445989

In [20]:
bk_rate_all_usd_df_with_label.filter(F.col('label')==0).count()

3441975

In [21]:
# save rate level df with label
dir = 's3://ege-ds-workshops-corp/yixli/data_preparation/'
bk_rate_all_usd_df_with_label.repartition(1).write.mode('overwrite').parquet(dir+'bk_rate_all_usd_df_with_label')