In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
日规划
"""
import time
import datetime
from datetime import timedelta
import math
import sys
import numpy as np
# # old_time放在程序运行开始的地方
# old_time = time.time()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, MapType
from pyspark.sql.functions import col, lit, array
from pyspark.sql.functions import udf

# import os
# os.environ['PYSPARK_PYTHON'] = "/usr/local/anaconda3/bin/python3.6"


# Hive UDF jar that provides the hash_sub function registered below.
HIVE_UDF_JAR = "hdfs://ns1009/user/mart_jypt/mart_jypt_usr_grow/liuyang266/hiveudf-1.0-SNAPSHOT-jar-with-dependencies.jar"

spark = (
    SparkSession.builder
    .appName("LTM_new_user")
    .enableHiveSupport()
    .config("spark.sql.shuffle.partitions", "1000")
    .getOrCreate()
)

spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
# Allow INSERT ... PARTITION with dynamically computed partition values.
spark.conf.set("hive.exec.dynamic.partition", "true")
# Hive defines only "strict" and "nonstrict" for this key; the previous value
# "true" is not a valid mode. "nonstrict" lets every partition column be dynamic.
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.sql("ADD JAR {}".format(HIVE_UDF_JAR))
spark.sql("""CREATE TEMPORARY FUNCTION hash_sub AS 'com.jd.bdptools.ly.HashMonthly'""")

# applicationId: unique identifier of this Spark application; its format
# depends on the scheduler implementation.
app_id = spark.sparkContext.applicationId
print(app_id)
print(spark.version)

import datetime


application_5938451104802_6408319
2.4.7.online-JD2.4.7.22.1-20220622-173211


In [2]:
# Reference month for the year-over-year daily distribution (the same month
# one year before the planning snapshot) and the snapshot partition that holds
# the month-to-date channel counts used as the monthly plan.
DIST_START_DT = '2021-05-01'
DIST_END_DT = '2021-05-31'
PLAN_SNAPSHOT_DT = '2022-05-31'
# A user is kept when ceil(rand * SAMPLE_DENOMINATOR) <= 1, i.e. roughly one
# user in SAMPLE_DENOMINATOR.
SAMPLE_DENOMINATOR = 100000
# Seed for rand(): makes the sample repeatable for a fixed input partitioning.
SAMPLE_SEED = 42

# Year-over-year daily channel probability distribution, one row per dt.
# SELECT DISTINCT replaces the former GROUP BY over every selected column
# (with no aggregates), which only de-duplicated rows.
sql_1 = """
    SELECT DISTINCT
        dt,
        rate_natural_cnt,
        rate_com_cnt,
        rate_free_cnt,
        rate_strong_ctrl_cnt,
        rate_normal_ctrl_cnt,
        rate_no_ctrl_cnt
    FROM
        dev.dev_yhzz_day_plan_distribution
    WHERE
        dt >= '{start_dt}'
        AND dt <= '{end_dt}'
    ORDER BY
        dt ASC
    """.format(start_dt=DIST_START_DT, end_dt=DIST_END_DT)
data_channel_tb = spark.sql(sql_1)


# Monthly per-channel plan for a random sample of users, taken from the
# month-to-date login counts. Counts below -100 are clamped to 0; the unused
# priority_type / login_cnt_mtd columns are no longer selected.
# NOTE(review): counts in [-100, 0) pass through as negatives; the day-plan
# code only acts on counts > 0.
sql_2 = """
        SELECT
            user_log_acct,
            natural_visit_cnt_month,
            com_visit_cnt_month,
            free_visit_cnt_month,
            jd_strong_ctrl_visit_cnt_month,
            jd_normal_ctrl_visit_cnt_month,
            jd_no_ctrl_visit_cnt_month
        FROM
            (
                SELECT
                    a.user_log_acct,
                    natural_login_cnt_mtd natural_visit_cnt_month,
                    com_login_cnt_mtd com_visit_cnt_month,
                    free_login_cnt_mtd free_visit_cnt_month,
                    strong_ctrl_login_cnt_mtd jd_strong_ctrl_visit_cnt_month,
                    normal_ctrl_login_cnt_mtd jd_normal_ctrl_visit_cnt_month,
                    no_ctrl_login_cnt_mtd jd_no_ctrl_visit_cnt_month,
                    ceil(rand({sample_seed}) * {sample_denominator}) AS rd
                FROM
                    (
                        --7.9亿
                        SELECT
                            user_log_acct,
                            CASE
                                WHEN natural_login_cnt_mtd < - 100
                                THEN 0
                                ELSE natural_login_cnt_mtd
                            END AS natural_login_cnt_mtd,
                            CASE
                                WHEN com_login_cnt_mtd < - 100
                                THEN 0
                                ELSE com_login_cnt_mtd
                            END AS com_login_cnt_mtd,
                            CASE
                                WHEN free_login_cnt_mtd < - 100
                                THEN 0
                                ELSE free_login_cnt_mtd
                            END AS free_login_cnt_mtd,
                            CASE
                                WHEN strong_ctrl_login_cnt_mtd < - 100
                                THEN 0
                                ELSE strong_ctrl_login_cnt_mtd
                            END AS strong_ctrl_login_cnt_mtd,
                            CASE
                                WHEN normal_ctrl_login_cnt_mtd < - 100
                                THEN 0
                                ELSE normal_ctrl_login_cnt_mtd
                            END AS normal_ctrl_login_cnt_mtd,
                            CASE
                                WHEN no_ctrl_login_cnt_mtd < - 100
                                THEN 0
                                ELSE no_ctrl_login_cnt_mtd
                            END AS no_ctrl_login_cnt_mtd
                        FROM
                            app.app_yhzz_umc_unit_user
                        WHERE
                            dt = '{snapshot_dt}'
                    )
                    a
            )
            t1
        WHERE
            rd <= 1
    """.format(
    sample_seed=SAMPLE_SEED,
    sample_denominator=SAMPLE_DENOMINATOR,
    snapshot_dt=PLAN_SNAPSHOT_DT,
)
data_month_plan = spark.sql(sql_2)

In [17]:
import pandas as pd
# Collect the (small) year-over-year channel distribution to the driver so it
# can be used as a plain numpy input by the day-plan code; the month plan
# itself stays a Spark DataFrame and is meant to be processed through a UDF.
pd_data_channel_tb = data_channel_tb.toPandas()
# The day-plan code hard-codes a 31-day x 6-channel grid and reads rows by
# position, so stop here if the reference month is incomplete or duplicated.
assert pd_data_channel_tb['dt'].is_unique, 'duplicate dt rows in day-plan distribution'
assert pd_data_channel_tb.shape == (31, 7), \
    'expected 31 days x (dt + 6 channel rates), got {}'.format(pd_data_channel_tb.shape)



In [21]:
# Fine-tune the assignment result: make duplicated days unique.
def create_uniques(arr, n_days=31):
    """Make the day numbers in ``arr`` unique by re-drawing duplicated ones.

    Every element whose value occurs more than once (all of its occurrences,
    not only the extras) is replaced by a random day that is not already used
    by the non-duplicated elements.

    Parameters
    ----------
    arr : numpy.ndarray of int
        1-based day numbers (1..n_days); 0 may mark "no day found".
        Modified in place.
    n_days : int, default 31
        Number of days in the month; replacement days are drawn from 1..n_days.

    Returns
    -------
    numpy.ndarray
        ``arr`` itself, with the duplicated entries replaced.

    Raises
    ------
    ValueError
        If there are not enough free days to give every duplicate its own day.
    """
    values, counts = np.unique(arr, return_counts=True)
    is_dup = np.isin(arr, values[counts > 1])

    # Days are 1-based (callers store index + 1 and later subtract 1), so the
    # pool is 1..n_days. The previous np.arange(31) offered day 0, which callers
    # turn into row -1 (the last day, possibly colliding with a real day 31),
    # and never offered day 31 itself.
    free_days = np.setdiff1d(np.arange(1, n_days + 1), arr[~is_dup])
    np.random.shuffle(free_days)

    n_dup = int(np.count_nonzero(is_dup))
    if n_dup > free_days.size:
        raise ValueError(
            "cannot make {} duplicated entries unique with only {} free days"
            .format(n_dup, free_days.size))
    arr[is_dup] = free_days[:n_dup]
    return arr


if __name__ == "__main__":
    
    # Initial parameters. NOTE(review): these module-level day_select_* values
    # appear unused -- the UDF below assigns its own local variables with the
    # same names.
    day_select_natural = 0.0
    day_select_com = 0.0
    day_select_free = 0.0
    day_select_strong_ctrl = 0.0
    day_select_normal_ctrl = 0.0
    day_select_no_ctrl = 0.0

    # Day-plan grid filled in by the UDF: 31 days (rows) x 6 channels (columns).
    # NOTE(review): it is created once here and written by tb_day_plan without
    # ever being reset, so marks from earlier rows handled by the same Python
    # worker would likely carry over into later rows -- TODO allocate it
    # inside the UDF.
    day_plan_list = np.zeros((31, 6))


    # Year-over-year daily probability per channel (dt column dropped).
    cum_probability = pd_data_channel_tb.drop(['dt'],axis=1).values

    # Cumulative distribution per channel: row i holds the sum of rows 0..i.
    cum_probability_2 = np.zeros((31,6))
    for j in range(6):
        for i in range(31):
            cum_probability_2[i,j] = np.sum(cum_probability[:i+1,j])
    print('累计概率分布', cum_probability_2)

    
    # Day-plan UDF.
    # NOTE(review): @udf is used without a returnType; in PySpark that defaults
    # to StringType -- confirm that is intended for a dict result (MapType is
    # already imported at the top of the notebook).
    @udf 
    def tb_day_plan(natural_visit_cnt_month, com_visit_cnt_month, free_visit_cnt_month, jd_strong_ctrl_visit_cnt_month, jd_normal_ctrl_visit_cnt_month, jd_no_ctrl_visit_cnt_month, cum_probability_2):
        """Spread one user's monthly per-channel visit counts over the days of the month.

        For each of the six channels with a positive count, that many uniform
        random numbers are matched against the channel's cumulative daily
        distribution to pick days; duplicated days are re-drawn with
        create_uniques and the chosen days are marked with 1 in day_plan_list.

        NOTE(review): draft code -- cum_probability_2 is declared as a UDF
        argument (so it would have to be a column) and shadows the array of the
        same name computed above; the returned `res` is a placeholder that does
        not yet use day_plan_list.
        """
        
        # Monthly visit count per channel.
        natural_cnt = natural_visit_cnt_month
        com_cnt = com_visit_cnt_month
        free_cnt = free_visit_cnt_month
        strong_ctrl_cnt = jd_strong_ctrl_visit_cnt_month
        normal_ctrl_cnt = jd_normal_ctrl_visit_cnt_month
        no_ctrl_cnt = jd_no_ctrl_visit_cnt_month
    #         print('月初规划', natural_cnt,com_cnt,free_cnt,strong_ctrl_cnt,normal_ctrl_cnt,no_ctrl_cnt)

        # Build the initial assignment (channel column 0: natural).
        if natural_cnt > 0:
            day_select_natural = np.zeros((1, natural_cnt))
            # Draw natural_cnt uniform random numbers used to pick the days.
            rand_seed_natural = np.random.rand(1,natural_cnt)
#             print('rand_seed_natural:', rand_seed_natural)
            # Decide which day each draw falls on.
            if natural_cnt >= 30:
                # TODO (original author): for evaluation convenience every day is
                # treated as having 1 visit, since that does not change the DAU
                # metric, but some days really have more than 1 -- needs fixing.
                # NOTE(review): as written, every selected "day" equals 1, so only
                # day 1 (row 0) gets marked below rather than every day. The month
                # has 31 days, and the offline evaluation uses a threshold of 31.
                day_select_natural = np.ones((1, 30))
            else:
                for i in range(natural_cnt):
                    # Compare the draw with the cumulative distribution in order:
                    # days with a larger probability own a wider interval, so they
                    # are picked more often. Stored as a 1-based day number.
                    for j in cum_probability_2[:,0]:
                        if j >=  rand_seed_natural[0,i]: 
                            day_select_natural[0,i] = cum_probability_2[:,0].tolist().index(j)+1
                            break
                # Fine-tune: if some day received more than one draw, re-scatter
                # those draws with create_uniques.
                day_select_natural = create_uniques(np.array(day_select_natural[0]).astype('int64'))
            # 1-based day numbers as integer indices.
            idx_natural = np.stack(day_select_natural.astype('int64'))
            # Mark those days: row = day - 1 (a day left at 0 lands on row -1,
            # i.e. the last row).
            day_plan_list[:,0][idx_natural-1] = 1


        # Same procedure for channel column 1 (com).
        if com_cnt > 0 :
            day_select_com = np.zeros((1, com_cnt))
            rand_seed_com = np.random.rand(1,com_cnt)
            # Decide which day each draw falls on.
            if com_cnt >= 30:
                day_select_com = np.ones((1, 30))
            else:
                for i in range(com_cnt):
                    for j in cum_probability_2[:,1]:
                        if j >=  rand_seed_com[0,i]: 
                            day_select_com[0,i] = cum_probability_2[:,1].tolist().index(j)+1
                            break
                day_select_com = create_uniques(np.array(day_select_com[0]).astype('int64'))
            idx_com =  np.stack(day_select_com.astype('int64'))
            day_plan_list[:,1][idx_com-1] = 1

        # Same procedure for channel column 2 (free).
        if free_cnt > 0:
            day_select_free = np.zeros((1, free_cnt))
            rand_seed_free = np.random.rand(1,free_cnt)
            # Decide which day each draw falls on.
            if free_cnt >= 30:
                day_select_free = np.ones((1, 30))
            else:
                for i in range(free_cnt):
                    for j in cum_probability_2[:,2]:
                        if j >=  rand_seed_free[0,i]: 
                            day_select_free[0,i] = cum_probability_2[:,2].tolist().index(j)+1
                            break
                day_select_free = create_uniques(np.array(day_select_free[0]).astype('int64'))
            idx_free =  np.stack(day_select_free.astype('int64'))
            day_plan_list[:,2][idx_free-1] = 1

        # Same procedure for channel column 3 (strong ctrl).
        if strong_ctrl_cnt > 0:
            day_select_strong_ctrl = np.zeros((1, strong_ctrl_cnt))
            rand_seed_strong_ctrl = np.random.rand(1,strong_ctrl_cnt)
            # Decide which day each draw falls on.
            if strong_ctrl_cnt >= 30:
                day_select_strong_ctrl = np.ones((1, 30))
            else:
                for i in range(strong_ctrl_cnt):
                    for j in cum_probability_2[:,3]:
                        if j >=  rand_seed_strong_ctrl[0,i]: 
                            day_select_strong_ctrl[0,i] = cum_probability_2[:,3].tolist().index(j)+1
                            break
                day_select_strong_ctrl = create_uniques(np.array(day_select_strong_ctrl[0]).astype('int64'))
            idx_strong_ctrl =  np.stack(day_select_strong_ctrl.astype('int64'))
            day_plan_list[:,3][idx_strong_ctrl-1] = 1

        # Same procedure for channel column 4 (normal ctrl).
        if normal_ctrl_cnt > 0:
            day_select_normal_ctrl = np.zeros((1, normal_ctrl_cnt))
            rand_seed_normal_ctrl = np.random.rand(1,normal_ctrl_cnt)
            # Decide which day each draw falls on.
            if normal_ctrl_cnt >= 30:
                day_select_normal_ctrl = np.ones((1, 30))
            else:
                for i in range(normal_ctrl_cnt):
                    for j in cum_probability_2[:,4]:
                        if j >=  rand_seed_normal_ctrl[0,i]: 
                            day_select_normal_ctrl[0,i] = cum_probability_2[:,4].tolist().index(j)+1
                            break
                day_select_normal_ctrl = create_uniques(np.array(day_select_normal_ctrl[0]).astype('int64'))
            idx_normal_ctrl =  np.stack(day_select_normal_ctrl.astype('int64'))
            day_plan_list[:,4][idx_normal_ctrl-1] = 1

        # Same procedure for channel column 5 (no ctrl).
        if no_ctrl_cnt > 0:
            day_select_no_ctrl = np.zeros((1, no_ctrl_cnt))
            rand_seed_no_ctrl = np.random.rand(1,no_ctrl_cnt)
#             print('rand_seed_no_ctrl:', rand_seed_no_ctrl)
            # Decide which day each draw falls on.
            if no_ctrl_cnt >= 30:
                day_select_no_ctrl = np.ones((1, 30))
            else:
                for i in range(no_ctrl_cnt):
                    for j in cum_probability_2[:,5]:
                        if j >=  rand_seed_no_ctrl[0,i]: 
                            day_select_no_ctrl[0,i] = cum_probability_2[:,5].tolist().index(j)+1
                            break
                day_select_no_ctrl = create_uniques(np.array(day_select_no_ctrl[0]).astype('int64'))
#             print('微调前：', day_select_no_ctrl)
            idx_no_ctrl =  np.stack(day_select_no_ctrl.astype('int64'))
            day_plan_list[:,5][idx_no_ctrl-1] = 1
        

        # Final result: the day plan of all 6 channels for one pin (user).
        # NOTE(review): placeholder / pseudo-code -- the `......` entry is not
        # valid Python; TODO build these strings from day_plan_list.
        res = {
            'c1':'1,2,3,...,30',
            'c2':'1,2,3,30',
            ......
            'c6':'12,3,4..30'
        }
        return res

    @udf
    def parse_res(x):
        """Parse the day-plan dict produced by tb_day_plan.

        NOTE(review): stub -- currently ignores x and always returns 1.
        """
        return 1

    
    # Per-pin (user) daily assignment result.
    # NOTE(review): `c1,c2,c3....` is pseudo-code; TODO pass the six
    # *_visit_cnt_month columns.
    data_month_plan = data_month_plan.withColumn('final_res',tb_day_plan(c1,c2,c3....))
    
    # Per-channel breakdown (optional).
    # NOTE(review): `data` is not defined anywhere visible -- presumably
    # data_month_plan; parse_res is also called without its argument.
    data = data.withColumn('final_com_assign_cnt',parse_res())\
                .withColumn('final_jd_strong_assign_cnt',parse_res())
    
    # Per-date breakdown; write the assignment result to an intermediate table.
    # NOTE(review): `resigterTempTable` is misspelled (registerTempTable /
    # createOrReplaceTempView), and the SQL below is incomplete (`insert over`
    # has no target table, trailing comma after `*`).
    data.resigterTempTable('temp_t')
    spark.sql("""
    insert over 
    select
        *,

    from
        temp_t
    
    """)

累计概率分布 [[0.10335037 0.03985507 0.17425432 0.         0.07294118 0.138833  ]
 [0.18270869 0.07246377 0.27629513 0.         0.13176471 0.17303823]
 [0.2423339  0.09299517 0.34850863 0.66666667 0.17529412 0.19315895]
 [0.29386712 0.12801932 0.42543171 0.66666667 0.21882353 0.21830986]
 [0.33333333 0.14009662 0.44270016 0.66666667 0.24235294 0.25251509]
 [0.36825667 0.15942029 0.48508634 0.66666667 0.27176471 0.2806841 ]
 [0.39664963 0.1884058  0.52904239 0.66666667 0.29882353 0.31187123]
 [0.42887564 0.20652174 0.57299843 0.66666667 0.32352941 0.34104628]
 [0.45528109 0.24275362 0.58712716 0.66666667 0.35882353 0.36217304]
 [0.48495173 0.26570048 0.59654631 0.66666667 0.39529412 0.37927565]
 [0.50695627 0.29227053 0.60753532 0.66666667 0.43058824 0.40744467]
 [0.5330778  0.32850242 0.62166405 0.66666667 0.45294118 0.42454728]
 [0.5552243  0.34057971 0.64207221 0.66666667 0.49058824 0.44567404]
 [0.57637706 0.36594203 0.67503925 0.66666667 0.50941176 0.4668008 ]
 [0.59923339 0.39492754 0.6

In [4]:
# 之前离线评估的代码
import numpy as np
import pandas as pd

# Fine-tune the assignment result: make duplicated days unique.
def create_uniques(arr, n_days=31):
    """Make the day numbers in ``arr`` unique by re-drawing duplicated ones.

    Every element whose value occurs more than once (all of its occurrences,
    not only the extras) is replaced by a random day that is not already used
    by the non-duplicated elements.

    Parameters
    ----------
    arr : numpy.ndarray of int
        1-based day numbers (1..n_days); 0 may mark "no day found".
        Modified in place.
    n_days : int, default 31
        Number of days in the month; replacement days are drawn from 1..n_days.

    Returns
    -------
    numpy.ndarray
        ``arr`` itself, with the duplicated entries replaced.

    Raises
    ------
    ValueError
        If there are not enough free days to give every duplicate its own day.
    """
    values, counts = np.unique(arr, return_counts=True)
    is_dup = np.isin(arr, values[counts > 1])

    # Days are 1-based (callers store index + 1 and later subtract 1), so the
    # pool is 1..n_days. The previous np.arange(31) offered day 0, which callers
    # turn into row -1 (the last day, possibly colliding with a real day 31),
    # and never offered day 31 itself.
    free_days = np.setdiff1d(np.arange(1, n_days + 1), arr[~is_dup])
    np.random.shuffle(free_days)

    n_dup = int(np.count_nonzero(is_dup))
    if n_dup > free_days.size:
        raise ValueError(
            "cannot make {} duplicated entries unique with only {} free days"
            .format(n_dup, free_days.size))
    arr[is_dup] = free_days[:n_dup]
    return arr


if __name__ == "__main__":
    
    s = 0
#     for k in range(100000):
    for k in range(pd_data_month_plan.shape[0]):
        # 获取各渠道的月规划次数
        natural_cnt = pd_data_month_plan.loc[k,'natural_visit_cnt_month']
        com_cnt = pd_data_month_plan.loc[k,'com_visit_cnt_month']
        free_cnt = pd_data_month_plan.loc[k,'free_visit_cnt_month']
        strong_ctrl_cnt = pd_data_month_plan.loc[k,'jd_strong_ctrl_visit_cnt_month']
        normal_ctrl_cnt = pd_data_month_plan.loc[k,'jd_normal_ctrl_visit_cnt_month']
        no_ctrl_cnt = pd_data_month_plan.loc[k,'jd_no_ctrl_visit_cnt_month']
#         print('月初规划', natural_cnt,com_cnt,free_cnt,strong_ctrl_cnt,normal_ctrl_cnt,no_ctrl_cnt)

        # day_select
        day_select_natural = 0.0
        day_select_com = 0.0
        day_select_free = 0.0
        day_select_strong_ctrl = 0.0
        day_select_normal_ctrl = 0.0 
        day_select_no_ctrl = 0.0

        # 初始化最终生成的日规划list
        day_plan_list = np.zeros((31, 6))

        # 同比概率分布  
        cum_probability = pd_data_channel_tb.drop(['dt'],axis=1).values

        # 累计概率分布
        cum_probability_2 = np.zeros((31,6))
        for j in range(6):
            for i in range(31):
                cum_probability_2[i,j] = np.sum(cum_probability[:i+1,j])


        # 生成初始解
#         if natural_cnt > 0:
#             day_select_natural = np.zeros((1, natural_cnt))
#             # 随机数 用来判断拆到哪天
#             rand_seed_natural = np.random.rand(1,natural_cnt)
# #             print('rand_seed_natural:', rand_seed_natural)
#             # 判断是选哪一天
#             if natural_cnt >= 31:
#                 day_select_natural = np.ones((1, 31))
#             else:
#                 for i in range(natural_cnt):
#                     for j in cum_probability_2[:,0]:
#                         if j >=  rand_seed_natural[0,i]: 
#                             day_select_natural[0,i] = cum_probability_2[:,0].tolist().index(j)+1
#                             break
# #                 print('微调前：', day_select_natural)
#                 day_select_natural = create_uniques(np.array(day_select_natural[0]).astype('int64'))
# #                 print('微调后：', day_select_natural)
#             idx_natural = np.stack(day_select_natural.astype('int64'))
# #             print('idx_natural',idx_natural)
#             day_plan_list[:,0][idx_natural-1] = 1
# #             print('day_plan_list',day_plan_list)
            
# #             print('idx_natural',idx_natural)
# #             print('微调前：', day_select_natural)
# #             print('微调后：', day_select_natural_new)

#         if com_cnt > 0 :
#             day_select_com = np.zeros((1, com_cnt))
#             rand_seed_com = np.random.rand(1,com_cnt)
#             # 判断是选哪一天
#             if com_cnt >= 31:
#                 day_select_com = np.ones((1, 31))
#             else:
#                 for i in range(com_cnt):
#                     for j in cum_probability_2[:,1]:
#                         if j >=  rand_seed_com[0,i]: 
#                             day_select_com[0,i] = cum_probability_2[:,1].tolist().index(j)+1
#                             break
#                 day_select_com = create_uniques(np.array(day_select_com[0]).astype('int64'))
#             idx_com =  np.stack(day_select_com.astype('int64'))
#             day_plan_list[:,1][idx_com-1] = 1

#         if free_cnt > 0:
#             day_select_free = np.zeros((1, free_cnt))
#             rand_seed_free = np.random.rand(1,free_cnt)
#             # 判断是选哪一天
#             if free_cnt >= 31:
#                 day_select_free = np.ones((1, 31))
#             else:
#                 for i in range(free_cnt):
#                     for j in cum_probability_2[:,2]:
#                         if j >=  rand_seed_free[0,i]: 
#                             day_select_free[0,i] = cum_probability_2[:,2].tolist().index(j)+1
#                             break
#                 day_select_free = create_uniques(np.array(day_select_free[0]).astype('int64'))
#             idx_free =  np.stack(day_select_free.astype('int64'))
#             day_plan_list[:,2][idx_free-1] = 1

#         if strong_ctrl_cnt > 0:
#             day_select_strong_ctrl = np.zeros((1, strong_ctrl_cnt))
#             rand_seed_strong_ctrl = np.random.rand(1,strong_ctrl_cnt)
#             # 判断是选哪一天
#             if strong_ctrl_cnt >= 31:
#                 day_select_strong_ctrl = np.ones((1, 31))
#             else:
#                 for i in range(strong_ctrl_cnt):
#                     for j in cum_probability_2[:,3]:
#                         if j >=  rand_seed_strong_ctrl[0,i]: 
#                             day_select_strong_ctrl[0,i] = cum_probability_2[:,3].tolist().index(j)+1
#                             break
#                 day_select_strong_ctrl = create_uniques(np.array(day_select_strong_ctrl[0]).astype('int64'))
#             idx_strong_ctrl =  np.stack(day_select_strong_ctrl.astype('int64'))
#             day_plan_list[:,3][idx_strong_ctrl-1] = 1

        if normal_ctrl_cnt > 0:
            day_select_normal_ctrl = np.zeros((1, normal_ctrl_cnt))
            rand_seed_normal_ctrl = np.random.rand(1,normal_ctrl_cnt)
            # 判断是选哪一天
            if normal_ctrl_cnt >= 31:
                day_select_normal_ctrl = np.ones((1, 31))
            else:
                for i in range(normal_ctrl_cnt):
                    for j in cum_probability_2[:,4]:
                        if j >=  rand_seed_normal_ctrl[0,i]: 
                            day_select_normal_ctrl[0,i] = cum_probability_2[:,4].tolist().index(j)+1
                            break
                day_select_normal_ctrl = create_uniques(np.array(day_select_normal_ctrl[0]).astype('int64'))
            idx_normal_ctrl =  np.stack(day_select_normal_ctrl.astype('int64'))
            day_plan_list[:,4][idx_normal_ctrl-1] = 1

#         if no_ctrl_cnt > 0:
#             day_select_no_ctrl = np.zeros((1, no_ctrl_cnt))
#             rand_seed_no_ctrl = np.random.rand(1,no_ctrl_cnt)
# #             print('rand_seed_no_ctrl:', rand_seed_no_ctrl)
#             # 判断是选哪一天
#             if no_ctrl_cnt >= 31:
#                 day_select_no_ctrl = np.ones((1, 31))
#             else:
#                 for i in range(no_ctrl_cnt):
#                     for j in cum_probability_2[:,5]:
#                         if j >=  rand_seed_no_ctrl[0,i]: 
#                             day_select_no_ctrl[0,i] = cum_probability_2[:,5].tolist().index(j)+1
#                             break
#                 day_select_no_ctrl = create_uniques(np.array(day_select_no_ctrl[0]).astype('int64'))
# #             print('微调前：', day_select_no_ctrl)
#             idx_no_ctrl =  np.stack(day_select_no_ctrl.astype('int64'))
#             day_plan_list[:,5][idx_no_ctrl-1] = 1
            
#             print('微调前：', day_select_no_ctrl)
            

        # 日规划格子
#         print('day_plan_list:', day_plan_list)
        
        # 计算dau
        for i in range(31):
            if np.sum(day_plan_list[i,:]) >= 1:
                s += 1
#         print('s', s)
                
    print('s,k:', s, k+1)
    print('mean:', s/(k+1))
        

s,k: 399899 791260
mean: 0.5053951924778202
