In [1]:
import numpy as np
import pandas as pd
from statsmodels.stats.proportion import proportions_ztest
import sys
from datetime import date, timedelta

import bytedtqs
from pytqs import tqs
#import seaborn as sns
import scipy.stats as stats
import matplotlib.pyplot as plt
# from statsmodels.stats.proportion import proportions_ztest
# from statsmodels.stats.proportion import proportions_chisquare
from IPython.display import display, HTML

import logging
logging.basicConfig(level=logging.CRITICAL)
# logging.disable(sys.maxsize)

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('hive_sql_query')

# set cluster name and queue that you have permission to access
conf.set("spark.hadoop.yarn.cluster.name", "default")
conf.set("spark.yarn.queue", "root.default_web.ez")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
def two_proprotions_test(success_a, size_a, success_b, size_b):
    prop_a = success_a / size_a
    prop_b = success_b / size_b
    prop_pooled = (success_a + success_b) / (size_a + size_b)
    var = prop_pooled * (1 - prop_pooled) * (1 / size_a + 1 / size_b)
    zscore = np.abs(prop_b - prop_a) / np.sqrt(var)
    one_side = 1 - stats.norm(loc = 0, scale = 1).cdf(zscore)
    pvalue = one_side * 2
    return zscore, pvalue

In [10]:
# fetch data
# sql for raw data
sql = """
select 
    app_name
    ,install_period
    ,mile_stone_name
    ,source
    ,d0_in_group
    ,count(user_id) as dnu
    ,sum(d0_finish_lessons) as d0_finish_lessons
    ,sum(case when d0_finish_lessons > 0 then 1 else 0 end) as d0_finish_lessons_user
    ,sum(d3_study_duration) as d3_study_duration
    ,sum(case when d3_study_duration >= 600 then 1 else 0 end) as d3_study_duration_user
    ,sum(case when d6_vip_user > 0 then 1 else 0 end) as d6_vip_user
from
(
    select 
        ub.app_name
        ,ub.install_date
        ,ub.install_period
        ,ub.user_id
        ,mile_stone_name
        ,source
        ,case when first_entered_wechat_group_time > 0 then 1 else 0 end as d0_in_group
        ,case when order_cnt > 0 then 1 else 0 end as d6_vip_user
        ,sum(case when datediff(study_dt, ub.install_date) = 3 then study_duration else 0 end) as d3_study_duration
        ,sum(case when datediff(study_dt, ub.install_date) = 0 then finish_lessons else 0 end) as d0_finish_lessons
    from
    (
        select 
            app_name
            ,install_date
            ,case when install_date >= '2020-12-29' then 'Post' else 'Pre' end as install_period
            ,user_id
            ,first_entered_wechat_group_time
        from dm_ky.app_newer_revenue_di
        where date >= '20201201'
        and install_date >= '2020-12-01'
        and gap_days = 0 
        and app_name = 'eo'
        and user_id > 0

        union all 

        select 
            app_name
            ,install_date
            ,'Post' as install_period
            ,user_id
            ,first_entered_wechat_group_time
        from dm_ky.app_newer_revenue_di
        where date between '20201229' and '20210104'
        and install_date between '2020-12-29' and '2021-01-04'
        and gap_days = 0 
        and app_name = 'ez'   
        and user_id > 0
        and user_id in (
        SELECT
            distinct user_id
        FROM
            origin_log.dwd_abtest_vid_log_di
        where
            date >= '20201209'  -- 进入实验的日期
            and app in ('other', 'open_language')
            and version_id in ('2235962')    -- 合并实验 test group
        )

        union all 

        select 
            app_name
            ,install_date
            ,'Post' as install_period
            ,user_id
            ,first_entered_wechat_group_time
        from dm_ky.app_newer_revenue_di
        where date > '20210104'
        and install_date >= '2021-01-04'
        and gap_days = 0 
        and app_name = 'ez'   -- ez app test group
        and user_id > 0
    ) as ub 
    left join 
    (
        select
            user_id
            ,from_unixtime(unix_timestamp(date, 'yyyyMMdd'), 'yyyy-MM-dd') as study_dt
            ,study_duration_video + study_duration_video_review as study_duration
            ,finish_lessons_video + finish_lessons_video_review as finish_lessons
        from dm_ky.app_study_detail_wide_day
        where date >= '20201201'
    ) as st on ub.user_id = st.user_id
    left join
    (
        select 
            app_name
            ,install_date
            ,user_id
            ,order_cnt
        from dm_ky.app_newer_revenue_di
        where date >= '20201201'
        and install_date >= '2020-12-01'
        and gap_days = 6
        and user_id > 0
    ) as d6 on ub.app_name = d6.app_name and ub.install_date = d6.install_date and ub.user_id = d6.user_id
    left join
    (
        select
            user_id
            ,mile_stone_id
            ,source
            ,from_unixtime(unix_timestamp(date, 'yyyyMMdd'), 'yyyy-MM-dd') as level_dt
        from dm_ky.user_level_dict
        where date >= '20201201'
        and user_id > 0
    ) as l on ub.user_id = l.user_id and ub.install_date = l.level_dt
    left join 
    (
        select
            mile_stone_id,
            mile_stone_name,
            level_id,
            level_name
        from
            dm_eo.dim_eo_mile_stone
        where
            date = '${date}'
    ) n on cast(l.mile_stone_id as bigint) = n.mile_stone_id
    group by 
        ub.app_name
        ,ub.install_date
        ,ub.install_period
        ,ub.user_id
        ,mile_stone_name
        ,source
        ,case when first_entered_wechat_group_time > 0 then 1 else 0 end 
        ,case when order_cnt > 0 then 1 else 0 end
) as t 
group by 
    app_name
    ,install_period
    ,mile_stone_name
    ,source
    ,d0_in_group
"""

In [11]:
sql_w_param = sql

# run spark sql
df_raw_spark = spark.sql(sql_w_param)
# create pandas df
df = df_raw_spark.toPandas()

In [7]:
df.head()

Unnamed: 0,app_name,install_period,mile_stone_name,source,d0_in_group,dnu,d0_finish_lessons,d3_study_duration,d6_vip_user
0,ez,Post,,ez_level_test,0,36669,10600,2471907,116
1,eo,Post,,ez_level_test,0,767,973,723268,80
2,ez,Post,,,1,9,32,2314,0
3,ez,Post,,eo_level_test,1,11,11,8041,2
4,eo,Pre,,ez_advanced_test,0,123,90,151637,4


#### Post - finish vs unfinish

In [9]:
df_sub = df.loc[df.install_period == 'Post']
df_sub.groupby(['source']).size()

source
                           4
eo_advanced_test           3
eo_level_default           4
eo_level_test              4
ez_advanced_test           2
ez_level_merged            3
ez_level_test              4
ezo_default_ailab_test     3
ezo_default_damodel        3
ezo_default_less_trtest    3
unknown                    3
dtype: int64