In [2]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

from pyspark.sql.types import *

import seaborn as sns
import matplotlib.pyplot as plt
# sns.set(style="ticks", color_codes=True)
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")



## User Attributes

In [3]:
# gets partners in preferred partner program
preferredPartner = (spark.table('default.xxx_partners')
                    .select(f.col('id').alias('hotel_id'),'preferred')
                    .where(f.col('preferred') == 1)
                    ).cache()

# filters out test hotels
noTestHotel = (spark.table('xxx_reporting.customer_attributes')
               .where(f.col('status_id')!=  11)
               .select('hotel_id'))

# gets pool of partners with key attributes, excluding test hotels 
customerAttributes = (spark.table('xxx_reporting.customer_attributes')
                      .where(f.col('status_id')!=  11)
                      .select('hotel_id','country','nr_rooms','is_xml','pulse_os')
                      .join(preferredPartner, 'hotel_id','left')
                     ).cache()

### Opinion Leadership
<b>Q:</b> am I wiling to refer solution to others? <b>Proxy:</b> # of partner referrals

In [4]:
# takes the number of referrals per partner in a given year
referrer = (spark.table('default.acquisition_partnerincentiverewardslog')
           .where(f.col('promised_date').between('2017-01-01','2017-12-31'))
           .groupby(f.col('hotel_id_or_property_id').alias('hotel_id'))
           .agg(f.countDistinct('id').alias('nr_referrals'))
           .join(noTestHotel, 'hotel_id','inner')
          ).cache()

### Risk Tolerance
<b>Q:</b> am I open to buy a product in the solutions store? <b>Proxy:</b> # implemented opportunities in `Pricing` pillar, done through self-service centre & whether partner is part of the preferred program

In [5]:
# looks at the number of bought solutions in self-service centre
implementedOpps = (spark.table('reporting.kpi_initiative_performance')
                   .where(f.col('pillar_id').isin([1,2,3,4]))
                   .where(f.col('opportunity_type')=='global')
                   .where(f.col('implemented_opps_oc') == 1)
                   .where(f.col('yyyy_mm_dd').between('2017-01-01','2017-12-31'))
                   .join(noTestHotel, 'hotel_id','inner')
                   .groupBy('hotel_id').pivot('pillar')
                   .agg(f.round(f.sum('implemented_value_oc')/f.sum('implemented_opps_oc'),2)).fillna(0)
                  ).cache()

### Financial Resources
<b>Q:</b> can I spend disposable income on buying products? <b>Proxy:</b> wallet share (%) of annual budget for external apps from partner revenue on platform

In [6]:
# median price of products currently sold in solutions store
medianProductPrice = 52 * 12
medianProductPrice

624

In [7]:
# revenue generated by a partner in a given year for stayed reservations
revenue = (spark.table('default.reservation_flatter_without_user_ids')
            .where(f.col('status')=='ok')
            .where(f.col('checkout').between('2017-01-01','2017-12-31'))
            .join(noTestHotel, 'hotel_id','inner')
            .groupBy('hotel_id')
            .agg(f.round(f.sum('price_euro'),0).alias('revenue'))
            .withColumn('budget_share', f.round(medianProductPrice / f.col('revenue'),3))
         ).cache()

### Degree of Skepticism
<b>Q:</b> can the solutions store help my business?
<b>Proxy:</b> # partners who have ever had any B2B product & review score

In [8]:
# has user ever been a user of any first-party app?
internalAppUser = (spark.table('xxx_reporting.daily_customer_product_status_changes')
                   .where(f.col('live') == 1)
                   .where(f.col('product_family').isin(['web','rate']))
                   .join(noTestHotel, 'hotel_id','inner')
                   .groupBy('hotel_id')
                   .agg(f.sum('live').alias('nr_xxx_products'))
                ).cache()

In [9]:
# # users with a higher review score are usually looking to maintain or improve their scores
reviewScore = (spark.table('reporting.property_splits_scores')
               .select('hotel_id','review_score')
               ).cache()

## Point System and Weights

In [10]:
# weights per adoption category
opinionLeadership = 0.1
riskTolerance = 0.35
finResources = 0.25
degreeSkepticism = 0.3

In [11]:
def opinionleadership_score(nr_referrals):
    if nr_referrals is None or nr_referrals == 0:
        return 0
    elif nr_referrals == 1:
        return 1
    elif nr_referrals > 1:
        return 2

opinionleadership_score_udf = f.udf(opinionleadership_score, t.IntegerType())

In [12]:
def risktolerance_score(imp_opp, preferred):
    if (imp_opp is None or imp_opp == 0) and preferred == 0:
        return 0
    elif (imp_opp >= 1 and imp_opp <= 70) and preferred == 0:
        return 1
    elif (imp_opp > 70 and imp_opp <= 200) and preferred == 0:
        return 2
    elif imp_opp >= 200 and preferred == 0:
        return 3
    elif preferred >= 1:
        return 4

risktolerance_score_udf = f.udf(risktolerance_score, t.IntegerType())

In [13]:
def finresources_score(wallet):
    if wallet is None or wallet > 0.1:
        return 0
    elif wallet > 0.05:
        return 1
    else:
        return 2

finresources_score_udf = f.udf(finresources_score, t.IntegerType())

In [14]:
def skepticism_score(suite_app, review, pulse):
    if (suite_app is None or suite_app == 0) and review < 7.5 and pulse is None:
        return 0
    elif suite_app > 0 and review < 7.5 and pulse is None:
        return 1
    elif suite_app == 0 and review >= 7.5 and pulse is not None:
        return 2
    elif suite_app > 0 and review >= 7.5 and pulse is not None:
        return 3
    else:
        return 1

skepticism_score_udf = f.udf(skepticism_score, t.IntegerType())

In [15]:
adoptionCurve = (customerAttributes
                 .join(referrer,'hotel_id','left')
                 .join(revenue,'hotel_id','left')
                 .join(internalAppUser,'hotel_id','left')
                 .join(implementedOpps,'hotel_id','left')
                 .join(reviewScore,'hotel_id','left')
                 .drop('Availability','Content','Extra Features')
                 .fillna({'preferred':0,
                         'nr_referrals' : 0,
                         'revenue':0,
                         'budget_share': 0,
                         'nr_suite_products': 0,
                         'review_score': 0,
                         'Pricing': 0 
                         })
                ).cache()

for col_name in ['Pricing']:
    adoptionCurve = adoptionCurve.withColumnRenamed(col_name, 'value_per_opp')

## Technology Adoption Score

In [16]:
adoptionCurve = (adoptionCurve
                .withColumn('op_score_1', opinionleadership_score_udf('nr_referrals')*opinionLeadership/2)
                .withColumn('rt_score_2', risktolerance_score_udf('value_per_opp','preferred')*riskTolerance/4)
                .withColumn('fr_score_3', finresources_score_udf('budget_share')*finResources/2)
                .withColumn('sk_score_4', skepticism_score_udf('nr_suite_products','review_score','pulse_os')*degreeSkepticism/3)
                .withColumn('adoption_score', f.col('op_score_1') + f.col('rt_score_2') + f.col('fr_score_3')
                            + f.col('sk_score_4'))
                ).cache()