In [1]:
import pandas as pd
import numpy as np
import random
import copy
from itertools import combinations
from scipy.special import factorial, comb
from IPython.display import display, HTML
from datetime import date, timedelta, datetime
from typing import List
from collections.abc import Callable
from pprint import pprint

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('hive_sql_query')

# set cluster name and queue that you have permission to access
conf.set("spark.hadoop.yarn.cluster.name", "default")
conf.set("spark.yarn.queue", "root.mouse_tiktok_ug_data")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Shapley Value 分析算法

In [3]:
class ShapAnalysis:
    
    def __init__(self, df, date_base, date_curr, dim, var, metric, weight, func):
        self.df = df
        self.date_base = date_base
        self.date_curr = date_curr
        self.dim = dim
        self.var = var
        self.metric = metric
        self.weight = weight
        self.func = func
        # placeholders
        self.metric_base = 0
        self.metric_curr = 0
        self.metric_delta = 0
    
    def process_data(self):
        # generate df
        df_base = custom_results(self.df, self.dim, self.date_base)
        df_curr = custom_results(self.df, self.dim, self.date_curr)

        # fill NA
        df_base[self.dim] = df_base[self.dim].astype(str).fillna('_')
        df_curr[self.dim] = df_curr[self.dim].astype(str).fillna('_')
        df_base[self.var] = df_base[self.var].fillna(0)
        df_curr[self.var] = df_curr[self.var].fillna(0)

        # select required columns
        df_base = df_base[self.dim + self.var]
        df_curr = df_curr[self.dim + self.var]

        # combine dimensions into a tuple
        self.new_dim_col = '_dim'
        df_base[self.new_dim_col] = df_base[self.dim].apply(tuple, axis=1)
        df_curr[self.new_dim_col] = df_curr[self.dim].apply(tuple, axis=1)

        # drop old dim cols
        df_base, df_curr = df_base.drop(self.dim, axis=1), df_curr.drop(self.dim, axis=1)

        # find the set of all dim values
        self.dim_uniq = pd.concat([df_base[self.new_dim_col], df_curr[self.new_dim_col]]).unique()

        # make sure both dataframes have records for all dim values
        for d in self.dim_uniq:
            new_row = dict()
            new_row[self.new_dim_col] = d
            for v in self.var:
                new_row[v] = 0
            # tuple in set
            if d not in set(df_base[self.new_dim_col].values):
                df_base = df_base.append(new_row, ignore_index=True)
            if d not in set(df_curr[self.new_dim_col].values):
                df_curr = df_curr.append(new_row, ignore_index=True)
                
        self.df_base = df_base
        self.df_curr = df_curr
        
        # calc overall metrics
        self.metric_base, self.metric_curr = \
            self.func(self.df_base), self.func(self.df_curr)
        self.metric_delta = self.metric_curr - self.metric_base

        
    def analysis(self, sample_size=2):
        # players: dim x variable
        players = [(i, j) for i in range(len(self.dim_uniq)) for j in range(len(self.var))]

        # sample
        N = len(players)
        sample_size = min(sample_size, factorial(N))
        seq_list = list()
        random.seed(666)
        
        for _ in range(sample_size):
            seq = list(range(N))
            random.shuffle(seq)
            seq_list.append(seq)
            
        self.phi = dict()
        
        # reuse the same set of sequences for all players        
        for seq in seq_list:
            # make of copy of ctl
            df_s = self.df_base.copy()
            # current utility
            v_current = self.func(df_s)

            for i in range(N):
                # select player p
                p = players[seq[i]]
                # select dim and variable
                d, v = self.dim_uniq[p[0]], self.var[p[1]]
                # update df_s
                df_s.loc[lambda x: x[self.new_dim_col]==d, v] = \
                    self.df_curr.loc[lambda x: x[self.new_dim_col]==d, v].values
                # calculate marginal utility
                v_si = self.func(df_s)
                phi_i = v_si - v_current
                # update current utility
                v_current = v_si

                # add utility for player p
                if p in self.phi:
                    self.phi[p] += phi_i
                else:
                    self.phi[p] = phi_i
        
    
    def process_results(self):
        # standardize (because of sampling)
        phi_std = {k:1.0*self.metric_delta*v/sum(self.phi.values()) for k, v in self.phi.items()}

        # save contribution of each player
        self.con = \
        [{'维度':self.dim_uniq[k[0]], 
          '变量':self.var[k[1]],
          '贡献': v,
         } for k, v in phi_std.items()]

            
    def display_contribution(self):
        # contribution by dim combination
        con_by_dim = pd.DataFrame(self.con)
        
        # contribution by var
        con_by_var = con_by_dim.\
            groupby('变量')['贡献'].sum().reset_index()
        con_by_var['贡献权重'] = con_by_var['贡献']/self.metric_delta
        print("每个变量的整体贡献:")
        display(
            con_by_var.style.hide_index().\
            background_gradient(
                subset=pd.IndexSlice[:,['贡献权重']],
                # cmap='plasma',
                cmap='viridis',
                    ).\
            format({
                    '贡献':'{:.4f}',
                    '贡献权重':'{:.2%}',
                })
        )
        
        # split dim tuple into separate dims
        df_con_split = pd.concat(
            [
                pd.DataFrame(
                    con_by_dim['维度'].tolist(), 
                    columns=self.dim
                ), 
                con_by_dim
            ], 
            axis=1,
        )
        print("每个变量贡献，按不同维度分解:")
        
        for d in self.dim:
            print(f"\n{'-'*10} 维度: {d}")
            # aggregate by dimension to provide metrics
            df_base_d = custom_results(self.df, d, self.date_base)
            df_curr_d = custom_results(self.df, d, self.date_curr)
            df_con_by_var = list()
            
            for v in self.var:
                # print(f"{'-'*2} 指标: {v}. 累计贡献 {con_by_var[lambda x: x['变量']==v]['贡献'].values[0] :.4f}")
                df_con_grouped = \
                    df_con_split[lambda x: x['变量']==v].\
                    groupby(d)[['贡献']].sum().\
                    merge(df_base_d[[d, self.weight]], on=[d]).rename(columns={self.weight: '群体比重'}).\
                    merge(df_base_d[[d, v]], on=[d]).rename(columns={v: '基期'}).\
                    merge(df_curr_d[[d, v]], on=[d]).rename(columns={v: '现期'})
                # df_con_grouped['贡献权重'] = df_con_grouped['贡献']/df_con_grouped['贡献'].sum()
                df_con_grouped['群体比重'] = df_con_grouped['群体比重']/df_con_grouped['群体比重'].sum()
                # df_con_grouped['重要度'] = np.abs(df_con_grouped['贡献权重'])/df_con_grouped['群体权重']
                df_con_grouped['指标'] = v
                # append to the list
                df_con_by_var.append(df_con_grouped)
            
            # combine df for all vars and sort by contribution
            df_con_all_vars = pd.concat(df_con_by_var, axis=0).\
                reset_index(drop=True).\
                sort_values(by='贡献',ascending=True if self.metric_delta<0 else False)
            # add percentage metrics
            df_con_all_vars['贡献比重'] = df_con_all_vars['贡献']/self.metric_delta
            # add contribution per percent
            df_con_all_vars['单位贡献'] = np.abs(df_con_all_vars['贡献'])/df_con_all_vars['群体比重']
            df_con_all_vars['单位贡献'] = np.where(df_con_all_vars['群体比重'] >= 0.05, df_con_all_vars['单位贡献'], 0)
            # sort columns manually
            df_con_all_vars = df_con_all_vars[[d,'指标','基期','现期','贡献','贡献比重','群体比重','单位贡献']]
            # print(df_con_all_vars)
            
            # display
            display(
                df_con_all_vars.style.hide_index().\
                    background_gradient(
                        subset=pd.IndexSlice[:,['贡献比重','群体比重','单位贡献']],
                        cmap='viridis',
                ).\
                    format({
                    '基期':'{:.4f}',
                    '现期':'{:.4f}',
                    '贡献':'{:.4f}',
                    '贡献比重':'{:.2%}',
                    '群体比重':'{:.2%}',
                    '单位贡献':'{:.4f}'
                })
            )

## 数据源配置

提前为分析场景配置的，一般不需要修改

默认一次加载过去 DAYS_BACK 的数据，减少 DB 读取，后续都通过 Python 处理

In [6]:
# date range for raw data
DAYS_BACK = 10

data_date_start = date.today() - timedelta(days=DAYS_BACK+2)
data_date_end = date.today() - timedelta(days=2)

start_date = data_date_start.strftime('%Y%m%d')
end_date = data_date_end.strftime('%Y%m%d')

In [9]:
# start_date

每日分维度的基础指标的 SQL。注意：维度如果包含 NULL 值，pandas 聚合结果就不准群。

In [10]:
# sql for raw data
sql = """
select 
        from_unixtime(unix_timestamp(date, 'yyyyMMdd'), 'yyyy-MM-dd') as base_dt
        , coalesce(region_group, 'unknown') as region_group
        , coalesce(os, 'unknown') as os
        , CASE WHEN app_id in ('1340','1339') THEN 'Lite' WHEN app_id in ('1180','1233') THEN 'TTmain' ELSE 'Others' END as app_name
        , coalesce(media_type, 'unknown') as media_type
        , case
            when overall_score >= 11 then 'high++(>=11)'
            when overall_score >= 9.5 then 'high+(9.5~11)'
            when overall_score >= 8 then 'high(8~9.5)'
            when overall_score >= 6.5 then 'mid(6.5~8)'
            when overall_score >= 5 then 'low(5~6.5)'
            when overall_score > 0 then 'extremely_low(<5)'
            else 'other'
        end as overall_score_range,
       count(distinct device_id) as dnu,
       count(distinct case when event in ('login_notify') then device_id else null end) as login_notify_uv,
       count(distinct case when event in ('login_submit') and sequence_trans like '%login_notify%login_submit%' then device_id else null end) as login_submit_uv,
       count(distinct case when event in ('login_success') and sequence_trans like '%login_notify%login_submit%login_success%' then device_id else null end) as login_success_uv,
       count(distinct case when event in ('show_interest_selection','exit_interest_selection') and sequence_trans like '%login_notify%login_submit%login_success%interest_selection%' then device_id else null end) as interest_uv,
       count(distinct case when event in ('welcomescreen_show','exit_welcomescreen') and sequence_trans like '%login_notify%login_submit%login_success%interest_selection%welcomescreen%' then device_id else null end) as welcomescreen_uv,
       count(distinct case when event in ('video_play') and sequence_trans like '%login_notify%login_submit%login_success%interest_selection%welcomescreen%video_play%' then device_id else null end) as video_play_uv
from
(
    select *,concat_ws(',',event_sequence_list) as sequence_trans
    from ug_dw_int.dw_ug_new_user_onboarding_event_daily
    where date between '{start_date}' and '{end_date}'
    and app_id in ('1180','1233') -- For TT main only first
) as tt 
left join tt_ug_da.media_source_mapping ms on tt.new_active_media_source = ms.media_source
left join 
(
    select 
        distinct 
        lower(country_code) as country,
        region_group
    from musically.mds_dim_geo_country
    where date between '{start_date}' and '{end_date}'
) as ci on tt.country = ci.country
group by
        from_unixtime(unix_timestamp(date, 'yyyyMMdd'), 'yyyy-MM-dd')
        , coalesce(region_group, 'unknown') 
        , coalesce(os, 'unknown')
        , CASE WHEN app_id in ('1340','1339') THEN 'Lite' WHEN app_id in ('1180','1233') THEN 'TTmain' ELSE 'Others' END 
        , coalesce(media_type, 'unknown') 
        , case
            when overall_score >= 11 then 'high++(>=11)'
            when overall_score >= 9.5 then 'high+(9.5~11)'
            when overall_score >= 8 then 'high(8~9.5)'
            when overall_score >= 6.5 then 'mid(6.5~8)'
            when overall_score >= 5 then 'low(5~6.5)'
            when overall_score > 0 then 'extremely_low(<5)'
            else 'other'
        end 

"""

In [11]:
sql_w_param = sql.format(**{'start_date': start_date, 'end_date': end_date})

# run spark sql
df_raw_spark = spark.sql(sql_w_param)
# create pandas df
df_raw = df_raw_spark.toPandas()
# df_raw.head()

In [11]:
# df_raw.head()

In [21]:
# df_raw.groupby('base_dt').size()

后续按不同日期和维度在 Python 内聚合，提前写好聚合函数

In [12]:
def custom_aggregate(x):
    s = {
        '新增': x['dnu'].sum(),
        '登录通知': x['login_notify_uv'].sum(),
        '登录提交': x['login_submit_uv'].sum(),
        '登录成功': x['login_success_uv'].sum(),
        '兴趣选择': x['interest_uv'].sum(),
        '欢迎页展示': x['welcomescreen_uv'].sum(),
        '播放视频': x['video_play_uv'].sum(),
    

        '新增到播放视频': x['video_play_uv'].sum()/x['dnu'].sum(),
        
        '新增到登录通知': x['login_notify_uv'].sum()/x['dnu'].sum(),
        '登录通知到登录提交': 0 if x['login_notify_uv'].sum() == 0 else x['login_submit_uv'].sum()/x['login_notify_uv'].sum(),
        '登录提交到登录成功': 0 if x['login_submit_uv'].sum() == 0 else x['login_success_uv'].sum()/x['login_submit_uv'].sum(),
        '登录成功到兴趣选择': 0 if x['login_success_uv'].sum() == 0 else x['interest_uv'].sum()/x['login_success_uv'].sum(),
        '兴趣选择到欢迎页展示': 0 if x['interest_uv'].sum() == 0 else x['welcomescreen_uv'].sum()/x['interest_uv'].sum(),
        '欢迎页展示到播放视频': 0 if x['welcomescreen_uv'].sum() == 0 else x['video_play_uv'].sum()/x['welcomescreen_uv'].sum(),


    }
    
    return pd.Series(s, index=s.keys())


def custom_results(df, dim_str_list, date_range_list):
    temp = df[lambda x: x['base_dt'].between(*date_range_list)].\
        groupby(dim_str_list).apply(custom_aggregate).reset_index()
    temp['总新增'] = temp['新增'].sum()
    temp['总登录通知'] = temp['登录通知'].sum()
    temp['总登录提交'] = temp['登录提交'].sum()
    temp['总登录成功'] = temp['登录成功'].sum()
    temp['总兴趣选择'] = temp['兴趣选择'].sum()
    temp['总欢迎页展示'] = temp['欢迎页展示'].sum()
    temp['总播放视频'] = temp['播放视频'].sum()
    
    temp['新增占比'] = temp['新增']/temp['总新增']
    temp['登录通知占比'] = np.where(temp['总登录通知'] == 0, 0, temp['登录通知']/temp['总登录通知'])
    temp['登录提交占比'] = np.where(temp['总登录提交'] == 0, 0, temp['登录提交']/temp['总登录提交'])
    temp['登录成功占比'] = np.where(temp['总登录成功'] == 0, 0, temp['登录成功']/temp['总登录成功'])
    temp['兴趣选择占比'] = np.where(temp['总兴趣选择'] == 0, 0, temp['兴趣选择']/temp['总兴趣选择'])
    temp['欢迎页展示占比'] = np.where(temp['总欢迎页展示'] == 0, 0, temp['欢迎页展示']/temp['总欢迎页展示'])
    temp['播放视频占比'] = np.where(temp['总播放视频'] == 0, 0, temp['播放视频']/temp['总播放视频'])
    return temp

## 分析过程

每项配置对应一个分析任务。配置说明：

- date_base 参照数据起止日期
- date_curr 当前数据起止日期
- dim 分析维度
- var 用于计算指标的变量
- metric 指标名称，可以随便取
- weight 权重变量，通常选取用户数或用户占比
- func 从 var 计算 metric 的 Python 函数

配置可自行增改。

In [22]:
# common dates
date_list = df_raw.base_dt.unique()
latest_date = datetime.strptime(max(date_list), "%Y-%m-%d")

# YESTERDAY = (date.today() - timedelta(days=4)).strftime('%Y-%m-%d')

YESTERDAY = (latest_date - timedelta(days=0)).strftime('%Y-%m-%d')
DOD = (latest_date - timedelta(days=1)).strftime('%Y-%m-%d')
WOW = (latest_date - timedelta(days=7)).strftime('%Y-%m-%d')

# compare dates
wow = [WOW, WOW]
dod = [DOD, DOD]
ytd = [YESTERDAY, YESTERDAY]

In [23]:
latest_date

datetime.datetime(2022, 1, 25, 0, 0)

In [24]:
# config

# 新增入群率
转化率_dod = {
    'date_base': dod,
    'date_curr': ytd,
    'dim': ['region_group', 'media_type', 'os'],
    'var': ['新增占比', '新增到登录通知', '登录通知到登录提交', '登录提交到登录成功', '登录成功到兴趣选择', '兴趣选择到欢迎页展示', '欢迎页展示到播放视频'],
    'metric': '新增到播放视频',
    'weight': '新增占比',
    'func': lambda x: sum(x['新增占比']*x['新增到登录通知']*x['登录通知到登录提交']*x['登录提交到登录成功']*x['登录成功到兴趣选择']*x['兴趣选择到欢迎页展示']*x['欢迎页展示到播放视频']),
}

转化率_wow = {
    'date_base': wow,
    'date_curr': ytd,
    'dim': ['region_group', 'media_type', 'os'],
    'var': ['新增占比', '新增到登录通知', '登录通知到登录提交', '登录提交到登录成功', '登录成功到兴趣选择', '兴趣选择到欢迎页展示', '欢迎页展示到播放视频'],
    'metric': '新增到播放视频',
    'weight': '新增占比',
    'func': lambda x: sum(x['新增占比']*x['新增到登录通知']*x['登录通知到登录提交']*x['登录提交到登录成功']*x['登录成功到兴趣选择']*x['兴趣选择到欢迎页展示']*x['欢迎页展示到播放视频']),
}

In [25]:
# 顺序分析
for app_name in (['TTmain']):
    
    for conf in [
        转化率_dod,
    ]:
        print(f"{'-'*20} {(app_name.upper())} 的 {conf['metric']} 指标 {'-'*20}")
        s = ShapAnalysis(df_raw[lambda x: x['app_name'] == app_name], **conf)
        s.process_data()
        print(
            f"日期从 {conf['date_base']} 到 {conf['date_curr']}\n指标从 {s.metric_base: .4f} 到 {s.metric_curr: .4f}，" 
            f"绝对变化: {s.metric_delta :.4f} 相对变化: {s.metric_delta/s.metric_base :.2%}"
        )
        
        # 相对变化到达一定值才分析
        if abs(s.metric_curr/s.metric_base-1) >= 0.001:
            s.analysis(sample_size=20)
            s.process_results()
            s.display_contribution()

-------------------- TTMAIN 的 新增到播放视频 指标 --------------------


KeyError: '新增'

In [None]:
# 顺序分析
for app_name in (['TTmain']):
    
    for conf in [
        转化率_wow,
    ]:
        print(f"{'-'*20} {(app_name.upper())} 的 {conf['metric']} 指标 {'-'*20}")
        s = ShapAnalysis(df_raw[lambda x: x['app_name'] == app_name], **conf)
        s.process_data()
        print(
            f"日期从 {conf['date_base']} 到 {conf['date_curr']}\n指标从 {s.metric_base: .4f} 到 {s.metric_curr: .4f}，" 
            f"绝对变化: {s.metric_delta :.4f} 相对变化: {s.metric_delta/s.metric_base :.2%}"
        )
        
        # 相对变化到达一定值才分析
        if abs(s.metric_curr/s.metric_base-1) >= 0.001:
            s.analysis(sample_size=20)
            s.process_results()
            s.display_contribution()