In [1]:
# Phcli Jupyter Python Template
#
# User guide:
# 1. Define global variables in the first input cell.
# 2. Phcli automatically initializes the Spark Session in the second input cell.
# 3. After `phcli maxauto dag`, every print is automatically converted to a logger.debug() call.
# 4. Start coding from the third input cell; after `phcli maxauto dag`, all of that code is grouped into a single method.


# Config defined in here
# NOTE(review): the marker lines below look like they are consumed by phcli;
# keep them and the assignments between them unchanged in shape — confirm
# against the phcli template parser before reformatting.

############## == config == ###################
job_name = "default"
job_runtime = "python3"
job_command = "submit"
job_timeout = 720.0
############## == config == ###################


# Variables defined in here
# Every input arg is a string; "Empty" is the placeholder default.
# Later cells parse them:
#   universe_choice -> "market:universe,..." pairs, turned into a dict
#   all_models      -> comma-separated market names, turned into a list
#   weight_upper    -> converted with float()
#   job_choice      -> must equal "weight_default" or the job aborts
#   max_path / project_name -> concatenated into the project's S3 path

############## == input args == ###################
max_path = "s3a://ph-max-auto/v0.0.1-2020-06-08/"
project_name = "Empty"
universe_choice = "Empty"
all_models = "Empty"
weight_upper = "1.25"
job_choice = "Empty"
############## == input args == ###################

# Placeholder output args; nothing in the visible part of the notebook reads them.
############## == output args == ###################
c = 'abc'
d = 'def'
############## == output args == ###################

In [138]:
# Initialize the Spark Session
# YARN URL: http://161.189.223.227:8088/cluster
import os
from pyspark.sql import SparkSession, functions as F

# prepare
spark = SparkSession.builder \
    .master("yarn") \
    .appName("ywyuan write weight.default in jupyter using python3") \
    .config("spark.driver.cores", "1") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.instances", "1") \
    .config('spark.sql.codegen.wholeStage', False) \
    .enableHiveSupport() \
    .getOrCreate()

# S3A credentials are read from the environment only. Never put literal
# fallback keys here: a notebook is shared and version-controlled, so any
# embedded secret is effectively public.
# NOTE(review): any key pair that was ever committed in this cell should be
# treated as compromised and rotated.
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

hadoop_conf = spark._jsc.hadoopConfiguration()

# Connector/endpoint settings do not depend on which credentials are used,
# so they are applied unconditionally.
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

# Only set explicit keys when both halves are present; otherwise leave
# credential resolution to S3A's own provider chain (e.g. an instance role).
if access_key and secret_key:
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)

In [139]:
from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, StructField
from pyspark.sql import functions as func
import os
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, udf

from scipy.stats import ranksums, mannwhitneyu
import pandas as pd
import numpy as np

In [153]:
# Example argument set for an interactive run. It is a bare string literal, so
# it assigns nothing; copy the lines out of the string to actually use them.
'''
max_path = "s3a://ph-max-auto/v0.0.1-2020-06-08/"
project_name = "Gilead"
universe_choice = "乙肝:universe_传染,乙肝_2:universe_传染,乙肝_3:universe_传染,安必素:universe_传染"
all_models = "乙肝,乙肝_2,乙肝_3,安必素"
weight_upper = "1.25"
job_choice = "weight_default"
'''

In [154]:
# Whether to run this job: any job_choice other than "weight_default" aborts here.
if job_choice != "weight_default":
    raise ValueError('不运行weight_default')

# ---- Inputs ----
# universe_choice is either "Empty" or "market:universe,market:universe,...".
# It is parsed into {market_name: universe_name}. Names are trimmed at both
# ends only, the same way all_models is trimmed below, so that a market name
# looks identical in both places and the later dictionary lookup can match.
universe_choice_dict = {}
if universe_choice != "Empty":
    for each in universe_choice.split(","):
        each = each.strip()
        if not each:
            # Tolerate stray or trailing commas.
            continue
        parts = each.split(":")
        if len(parts) < 2:
            raise ValueError(
                "universe_choice entries must look like 'market:universe', got %r" % each)
        market_name = parts[0].strip()
        universe_name = parts[1].strip()
        universe_choice_dict[market_name] = universe_name

# all_models arrives as a comma-separated string. The isinstance guard makes
# this cell safe to re-run after all_models has already been turned into a list.
if isinstance(all_models, str):
    all_models = [name.strip() for name in all_models.split(",") if name.strip()]

weight_upper = float(weight_upper)
if weight_upper <= 0:
    # 1 / weight_upper is used as the lower weight bound later on.
    raise ValueError("weight_upper must be a positive number, got %r" % weight_upper)

# ---- Outputs ----
# NOTE: max_path as configured already ends with "/", so these paths contain a
# doubled slash; the concatenation is kept exactly as before.
project_path = max_path + "/" + project_name
weight_default_path = project_path + '/PHA_weight_default'

In [155]:
# ====  Data analysis  ====

# Output schema of the grouped-map UDF: one row per (Panel_ID, City, Province).
# Neither the schema nor the UDF depends on the market, so both are built once
# here instead of on every loop iteration.
schema = StructType([
    StructField("Panel_ID", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Province", StringType(), True),
    StructField("pvalue", DoubleType(), True)
    ])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def wilcoxtest(pdf):
    """Rank-sum test for one (Panel_ID, City, Province) group.

    Compares the de-duplicated panel-side income values
    (``Est_DrugIncome_RMB_x``) with the non-panel income values
    (``Est_DrugIncome_RMB``) of the group and returns a one-row frame
    ``[Panel_ID, City, Province, pvalue]`` with the two-sided p-value
    rounded to 6 decimals.
    """
    # The grouping keys are constant within a group; positional access avoids
    # depending on the index labels of the frame handed to the UDF.
    Panel_ID = pdf['Panel_ID'].iloc[0]
    City = pdf['City'].iloc[0]
    Province = pdf['Province'].iloc[0]
    a = pdf['Est_DrugIncome_RMB_x'].drop_duplicates().values.astype(float)
    b = pdf['Est_DrugIncome_RMB'].values.astype(float)
    # Counterpart of wilcox.test() in R.
    pvalue = round(mannwhitneyu(a, b, alternative="two-sided")[1], 6)
    return pd.DataFrame([[Panel_ID, City, Province, pvalue]],
                        columns=["Panel_ID", "City", "Province", "pvalue"])


# Weights are scaled into [1 / weight_upper, weight_upper].
weight_lower = 1 / weight_upper

for index, market in enumerate(all_models):
    # Markets without an explicit universe fall back to the base universe.
    universe_path = project_path + '/' + universe_choice_dict.get(market, 'universe_base')

    universe = spark.read.parquet(universe_path)
    universe = universe.fillna(0, 'Est_DrugIncome_RMB')

    # Split the universe into panel and non-panel hospitals.
    universe_panel = universe.where(col('PANEL') == 1).select('Panel_ID', 'Est_DrugIncome_RMB', 'Seg')
    universe_non_panel = universe.where(col('PANEL') == 0).select('Est_DrugIncome_RMB', 'Seg', 'City', 'Province')

    # Segments that cover more than one distinct (City, Province) pair.
    seg_multi_cities = universe.select('Seg', 'City', 'Province').distinct() \
                            .groupby('Seg').count()
    seg_multi_cities = seg_multi_cities.where(col('count') > 1).select('Seg').toPandas()['Seg'].tolist()

    # Pair every panel hospital in such a segment with the non-panel hospitals
    # of the same segment; the panel-side income gets the "_x" suffix.
    universe_m = universe_panel.where(col('Seg').isin(seg_multi_cities)) \
                                .withColumnRenamed('Est_DrugIncome_RMB', 'Est_DrugIncome_RMB_x') \
                                .join(universe_non_panel, on='Seg', how='inner')

    # Rank-sum test p-value per (Panel_ID, City, Province). The result feeds
    # both the aggregate and the join below, so cache it to avoid running the
    # pandas UDF twice.
    universe_m_wilcox = universe_m.groupby('Panel_ID', 'City', 'Province') \
                                .apply(wilcoxtest) \
                                .cache()

    universe_m_maxmin = universe_m_wilcox.groupby('Panel_ID') \
                                        .agg(func.min('pvalue').alias('min'), func.max('pvalue').alias('max'))

    # Min-max scale the p-value within each Panel_ID into
    # [weight_lower, weight_upper].
    universe_m_weight = universe_m_wilcox.join(universe_m_maxmin, on='Panel_ID', how='left') \
                                        .withColumn('Weight',
                    (col('pvalue') - col('min')) / (col('max') - col('min')) * (weight_upper - weight_lower) + weight_lower)

    # When max == min the scaling has no defined result; such rows get the
    # neutral weight 1.
    universe_m_weight = universe_m_weight.fillna(1, 'Weight')

    weight_out = universe_m_weight.withColumn('DOI', func.lit(market)) \
                                .withColumnRenamed('Panel_ID', 'PHA') \
                                .select('Province', 'City', 'DOI', 'Weight', 'PHA')

    # Output: the first market replaces any previous result, later markets
    # append to it.
    write_mode = "overwrite" if index == 0 else "append"
    weight_out = weight_out.repartition(1)
    weight_out.write.format("parquet") \
        .mode(write_mode).save(weight_default_path)

    # Release the cached p-values before moving on to the next market.
    universe_m_wilcox.unpersist()
        