In [0]:
from pyspark.sql.types import IntegerType
from itertools import chain
import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, DoubleType, FloatType, StringType
from pyspark.sql.window import Window
import numpy as np
import pandas as pd
from datetime import datetime,timedelta
from typing import List
# Switch on the Databricks IO cache setting for this Spark session.
spark.conf.set("spark.databricks.io.cache.enabled", "true")

In [0]:

# Source views. Only the claim columns used downstream are read from the Rx
# fact view, and SERVICE_DATE is parsed from its yyyyMMdd form into a date.
rx = spark.sql(
    "select SERVICE_DATE,PATIENT_ID,NDC_CODE,DAYS_SUPPLY_COUNT,PATIENT_ZIP3,HCP_ID,PRIMARY_PAY_TYPE_DESC from ADM_VW.FACT_APLD_RX_V"
).withColumn(
    "SERVICE_DATE",
    f.to_date(f.col("SERVICE_DATE").cast("string"), 'yyyyMMdd'),
)
dim_ndc = spark.sql("select * from ADM_VW.DIM_NDC_V ")
activity = spark.sql("select * from ADM_VW.DIM_APLD_PATIENT_ACTIVITY_IND_V")

In [0]:
########  Filtering for 3 yrs data  ##########
# Claims dated on or before this timestamp are discarded.
rx_startdate = '2019-09-30 00:00:00'
_rx_lower_bound = unix_timestamp(lit(rx_startdate)).cast('timestamp')
rx = rx.where(col("SERVICE_DATE") > _rx_lower_bound)

# Brands in scope; matched against BRAND_DESC of the NDC dimension.
brand_list = [
    'TRELEGY ELLIPTA',
    'BREZTRI AEROSPHERE',
]
#'ANORO ELLIPTA','SPIRIVA HANDIHALER','SPIRIVA RESPIMAT','SYMBICORT','BREZTRI AEROSPHERE','TRELEGY ELLIPTA'

##########  DOS Imputation buckets  ###########
# Bucket edges sit every 30 days from 45 to 285; bucket i is imputed to
# 30 * (i + 1) days of supply.
splits = [-float("inf")] + list(range(45, 286, 30)) + [float("Inf")]
simple_dict = {bucket: 30 * (bucket + 1) for bucket in range(10)}

######  Cohort Month Start  ############
# Only anchor dates after this timestamp enter the cohorts.
cohort_start = '2020-09-30 00:00:00'

####  Persistence Buckets #########
# Edges (in days) used to bucket persistence duration into 13 buckets (0..12).
splits_pers = [
    -float("inf"),
    31, 61, 91, 121, 151, 181, 211, 241, 271, 301, 331, 365,
    float("Inf"),
]

####  Current Year for age #######
current_year = 2022

In [0]:
# Keep the NDC rows whose BRAND_DESC matches any brand in brand_list (regex
# alternation), then restrict the claims to those NDC codes.
_brand_regex = "|".join("(" + brand + ")" for brand in brand_list)
ndc = dim_ndc.where(dim_ndc['BRAND_DESC'].rlike(_brand_regex))
rx_control = rx.join(ndc.select('NDC_CODE', 'BRAND_DESC'), ['NDC_CODE'], 'inner')

# Both Spiriva presentations are reported under the single label 'SPIRIVA'.
_is_spiriva = col('BRAND_DESC').isin('SPIRIVA HANDIHALER', 'SPIRIVA RESPIMAT')
rx_control = rx_control.withColumn(
    'BRAND_DESC',
    when(_is_spiriva, 'SPIRIVA').otherwise(col('BRAND_DESC')),
)

In [0]:
# Drop the notebook-level references to the inputs; rx_control carries on from here.
del rx,dim_ndc,ndc

In [0]:
# rx_control = rx_control.drop_duplicates(['PATIENT_ID','BRAND_DESC','SERVICE_DATE','DAYS_SUPPLY_COUNT'])
# rx_control = rx_control.groupby('PATIENT_ID','BRAND_DESC','SERVICE_DATE').agg(sum('DAYS_SUPPLY_COUNT').alias('DAYS_SUPPLY_COUNT'))


# Keep exactly one claim per patient / brand / service date: the one with the
# largest DAYS_SUPPLY_COUNT. The generated 'tiebreak' id makes the ordering
# total, so rank 1 is always a single row.
window = (
    Window.partitionBy("PATIENT_ID", "BRAND_DESC", "SERVICE_DATE")
    .orderBy(f.col("DAYS_SUPPLY_COUNT").desc(), f.col("tiebreak").asc())
)
rx_control = (
    rx_control
    .withColumn('tiebreak', f.monotonically_increasing_id())
    .withColumn('rank', f.rank().over(window))
    .where(f.col('rank') == 1)
    .drop('rank', 'tiebreak')
)

In [0]:
from pyspark.ml.feature import Bucketizer

# Bucket the raw days supply with the `splits` edges, then translate each
# bucket index into its imputed days-supply value through a literal map
# column built from `simple_dict` (key, value, key, value, ...).
buck = Bucketizer(
    splits=splits,
    inputCol="DAYS_SUPPLY_COUNT",
    outputCol="DAYS_SUPPLY_Bucket",
)
mapping_expr = create_map([lit(item) for pair in simple_dict.items() for item in pair])

rx_control = (
    buck.transform(rx_control)
    .withColumn('DAYS_SUPPLY_IMPUTED', mapping_expr[col('DAYS_SUPPLY_Bucket')])
    .drop("DAYS_SUPPLY_COUNT", 'DAYS_SUPPLY_Bucket')
)

In [0]:
#activity_rx = activity.filter(col('RX_DX_IND') == 'RX')
# First and last activity month (MONTH_ID) per patient.
activity_rx = activity.groupby("PATIENT_ID").agg(
    f.min('MONTH_ID').alias('ELIGIBILITY_START_DATE'),
    f.max('MONTH_ID').alias('ELIGIBILITY_END_DATE'),
)

# MONTH_ID is parsed as yyyyMM; the end of eligibility is moved to the last
# day of its month.
_elig_start = to_date(col("ELIGIBILITY_START_DATE").cast("string"), 'yyyyMM')
_elig_end = last_day(to_date(col("ELIGIBILITY_END_DATE").cast("string"), 'yyyyMM'))

rx_control = (
    rx_control
    .join(activity_rx, ['PATIENT_ID'], 'left')
    .withColumn("ELIGIBILITY_START_DATE", _elig_start)
    .withColumn("ELIGIBILITY_END_DATE", _elig_end)
    .withColumn("monthgap_before", months_between(col("service_date"), col("ELIGIBILITY_START_DATE")))
    .withColumn("monthgap_after", months_between(col("ELIGIBILITY_END_DATE"), col("service_date")))
    # Whole months between the claim and the end of eligibility.
    .withColumn("Max_Eligb", floor(col("monthgap_after")))
)

######   Eligb. flags dropped for fast execution   ###############

# rx_control=rx_control.withColumn("3month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 3),1).otherwise(0))
# #4
# rx_control=rx_control.withColumn("6month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 6),1).otherwise(0))
# #7
# rx_control=rx_control.withColumn("9month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 9),1).otherwise(0))
# #10
# rx_control=rx_control.withColumn("12month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 12),1).otherwise(0))
# #13
# rx_control=rx_control.withColumn("18month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 18),1).otherwise(0))
# #19
# rx_control=rx_control.withColumn("23month_eligb",when((col("monthgap_before")>= 0) & (col("monthgap_after")>= 23),1).otherwise(0))
# #24

In [0]:
# The eligibility inputs are no longer referenced after the join above.
del activity, activity_rx

In [0]:
# Source-of-business (SoB) per claim, from the gap to the previous fill of the
# same brand by the same patient:
#   gap > 365 days -> RESTART, gap <= 365 days -> CONTINUE,
#   no previous fill (null gap) -> NEW TO BRAND.
_brand_history = Window.partitionBy('PATIENT_ID', 'BRAND_DESC').orderBy('SERVICE_DATE')
rx_control = (
    rx_control
    .withColumn('prev_SD', lag(col('SERVICE_DATE'), 1).over(_brand_history))
    .withColumn('daysdiff', datediff(col("SERVICE_DATE"), col("prev_SD")))
    .withColumn(
        "SoB_temp",
        when(col("daysdiff") > 365, 'RESTART')
        .when(col("daysdiff") < 366, 'CONTINUE')
        .otherwise('NEW TO BRAND'),
    )
)


########  To remove NTB for initial records, where 365 days haven't passed since data start date  ##########

#rx_startdate = rx_control.agg(min('SERVICE_DATE')).collect()[0][0]
# Claims less than 365 days after (rx_startdate + 1 day) cannot be classified
# reliably, so they are labelled "Insufficient_lookback" instead.
_has_short_lookback = datediff(col("SERVICE_DATE"), col("min_SD")) < 365
rx_control = (
    rx_control
    .withColumn("min_SD", date_add(lit(rx_startdate), 1))
    .withColumn('SoB', when(_has_short_lookback, "Insufficient_lookback").otherwise(col('SoB_temp')))
    .drop(
        "prev_SD", 'daysdiff', 'SoB_temp', 'min_SD',
        'ELIGIBILITY_START_DATE', 'ELIGIBILITY_END_DATE',
        'monthgap_before', 'monthgap_after',
    )
)

In [0]:
# Give every patient/brand pair a numeric ID and order the claims by that ID
# and then by service date.
_pair_ids = (
    rx_control
    .groupBy('PATIENT_ID', 'BRAND_DESC')
    .count()
    .withColumn("ID", monotonically_increasing_id())
    .select('PATIENT_ID', 'BRAND_DESC', 'ID')
)
rx_control = (
    rx_control
    .join(_pair_ids, ['PATIENT_ID', 'BRAND_DESC'], 'left')
    .orderBy(col("ID").asc(), col("SERVICE_DATE").asc())
)
del _pair_ids

In [0]:
# Express each fill date and the following fill date of the same
# patient/brand as day counts since 1970-01-01 (SDD / NSDD). The last fill of
# a group has no successor, so its NSDD is filled with the sentinel 100000.
_next_fill = lead(col('SERVICE_DATE'), 1).over(
    Window.partitionBy('PATIENT_ID', 'BRAND_DESC').orderBy('SERVICE_DATE')
)
rx_control = (
    rx_control
    .withColumn('next_SD', _next_fill)
    .withColumn("AD", lit('1970-01-01'))
    .withColumn("SDD", datediff(col('SERVICE_DATE'), col('AD')))
    .withColumn("NSDD", datediff(col('next_SD'), col('AD')))
    .fillna(100000, subset=["NSDD"])
    .sort(asc('ID'), asc('SERVICE_DATE'))
)

In [0]:
# Running state shared between consecutive dos_adj calls.
prev_cf_dos = 0   # days of supply carried over from the previous call
prev_group = -1   # gid seen by the previous call

def dos_adj(SDD,NSDD,DOS,gid):
  """Return the usable days of supply (UDOS) for one fill.

  The supply available is DOS plus whatever was carried over from the
  previous call with the same gid. At most NSDD - SDD days (the gap to the
  next fill) can be used; any excess becomes the carry-over for the next
  call. The carry-over is reset to 0 whenever gid changes.

  The function is stateful: it only gives meaningful results when called
  once per fill, group by group, in chronological order.
  """
  global prev_cf_dos, prev_group
  if gid != prev_group:
    prev_group = gid
    prev_cf_dos = 0

  available = DOS + prev_cf_dos
  gap_to_next_fill = NSDD - SDD
  # Conditional expression rather than min(): in this notebook `min` is
  # shadowed by pyspark.sql.functions.min via the wildcard import.
  UDOS = available if gap_to_next_fill > available else gap_to_next_fill
  prev_cf_dos = available - UDOS
  return UDOS
  
# Kept so existing references to `dos_adj1` still resolve. It is no longer used
# to build UDOS: the UDF keeps its carry-over in Python globals and therefore
# depends on rows reaching a single Python worker in sorted order, which Spark
# does not guarantee (a sorted group can straddle a partition boundary, and the
# carry-over is then silently lost).
dos_adj1 = udf(dos_adj, IntegerType())

# Deterministic equivalent of dos_adj using window functions.
# Per fill, surplus = DOS - (gap to next fill). The carry-over recursion
#   carry_n = max(0, carry_{n-1} + surplus_n)
# has the closed form
#   carry_n = S_n - min(0, S_1, ..., S_n),  S_n = running sum of surplus,
# and the usable supply is UDOS_n = DOS_n + carry_{n-1} - carry_n.
# The last fill of a group has NSDD = 100000, so its carry is 0 and it uses
# all the supply it has (own DOS plus carry-over).
_fills = Window.partitionBy('PATIENT_ID', 'BRAND_DESC').orderBy('SERVICE_DATE')
_fills_so_far = _fills.rowsBetween(Window.unboundedPreceding, Window.currentRow)

rx_control = (
    rx_control
    .withColumn('_surplus', col('DAYS_SUPPLY_IMPUTED') - (col('NSDD') - col('SDD')))
    .withColumn('_cum_surplus', f.sum('_surplus').over(_fills_so_far))
    .withColumn('_carry', col('_cum_surplus') - least(lit(0), f.min('_cum_surplus').over(_fills_so_far)))
    .withColumn('_prev_carry', coalesce(lag(col('_carry'), 1).over(_fills), lit(0)))
    .withColumn('UDOS', (col('DAYS_SUPPLY_IMPUTED') + col('_prev_carry') - col('_carry')).cast(IntegerType()))
    .drop('_surplus', '_cum_surplus', '_carry', '_prev_carry')
)
  

In [0]:
rx_control = rx_control.select('PATIENT_ID','BRAND_DESC','SERVICE_DATE','DAYS_SUPPLY_IMPUTED','Max_eligb','SoB','UDOS','PATIENT_ZIP3','HCP_ID','PRIMARY_PAY_TYPE_DESC')

# Self-join: every claim becomes an anchor (Anchor_Date) and is paired with
# all claims of the same patient/brand dated 0..365 days after it.
_anchors = rx_control.withColumnRenamed('SERVICE_DATE','Anchor_Date').drop('DAYS_SUPPLY_IMPUTED','UDOS')
_claims = rx_control.select('PATIENT_ID','BRAND_DESC','SERVICE_DATE','DAYS_SUPPLY_IMPUTED','UDOS')
_days_after_anchor = datediff(col('SERVICE_DATE'), col('Anchor_Date'))
rx_control = (
    _anchors
    .join(_claims, ['PATIENT_ID','BRAND_DESC'], 'left')
    .filter((_days_after_anchor >= 0) & (_days_after_anchor <= 365))
)

# Only anchors after the cohort start are kept.
rx_control = (
    rx_control
    .filter(col("Anchor_Date") > unix_timestamp(lit(cohort_start)).cast('timestamp'))
    .sort(asc("PATIENT_ID"), asc("BRAND_DESC"), asc('Anchor_Date'), asc('SERVICE_DATE'))
)

# 12-month view: ED_minus_SD is the number of days left between the claim and
# anchor + 365; DOS_Cap caps the usable supply at that value.
_anchor_minus_claim = datediff(col('Anchor_Date'), col('SERVICE_DATE'))
rx_control = (
    rx_control
    .withColumn('ED_minus_SD', _anchor_minus_claim + 365)
    .withColumn('DOS_Cap', least('UDOS', 'ED_minus_SD'))
    .withColumn(
        'COHORT_MONTH',
        concat(
            year(col('Anchor_Date')).cast('string'),
            date_format(col('Anchor_Date'), "MM").cast('string'),
        ),
    )
)

# Same three columns for the 3-, 6- and 9-month horizons: a flag for claims
# inside the horizon, the days left to its end, and the capped supply.
for _months, _horizon_days in ((3, 90), (6, 180), (9, 270)):
    _flag_col = 'comp_flag_%d' % _months
    _left_col = 'ED_minus_SD_%d' % _months
    _cap_col = 'DOS_Cap_%d' % _months
    rx_control = (
        rx_control
        .withColumn(_flag_col, when(datediff(col('SERVICE_DATE'), col('Anchor_Date')) <= _horizon_days, 1).otherwise(0))
        .withColumn(_left_col, datediff(col('Anchor_Date'), col('SERVICE_DATE')) + _horizon_days)
        .withColumn(_cap_col, least('UDOS', _left_col))
    )

In [0]:
# Persist the anchor/claim base table (replacing data and schema) so the
# metric cells read from the saved table.
(
    rx_control.write
    .mode("overwrite")
    .option("overwriteSchema", "True")
    .saveAsTable("default.Spark_INTM_Resp_Base_Table")
)
del rx_control

In [0]:
# Reload the saved anchor/claim base table; every metric below starts from it.
base_table_allmetrics = spark.sql("select * from default.Spark_INTM_Resp_Base_Table")

In [0]:
# Ranking window shared by the metric cells: within a patient / brand / cohort
# month, the earliest anchor wins ('tiebreak' makes the order total).
window = Window.partitionBy("PATIENT_ID","BRAND_DESC","COHORT_MONTH").orderBy(col("Anchor_Date").asc(),col("tiebreak").asc())

_COMP_GROUP_COLS = ['PATIENT_ID','BRAND_DESC','Anchor_date','COHORT_MONTH','SoB','PATIENT_ZIP3','HCP_ID','PRIMARY_PAY_TYPE_DESC','Max_Eligb']
_COMP_JOIN_COLS = ['PATIENT_ID','BRAND_DESC','COHORT_MONTH','Anchor_date','SoB','PATIENT_ZIP3','HCP_ID','PRIMARY_PAY_TYPE_DESC','Max_Eligb']


def _compliance(claims, min_eligb, dos_col, total_col, ratio_col, horizon_days):
    """Compliance over one horizon.

    Keeps anchors with Max_Eligb > min_eligb, sums `dos_col` per anchor into
    `total_col`, divides by `horizon_days` into `ratio_col`, and keeps the
    rank-1 row of `window` (first anchor per patient / brand / cohort month).
    """
    return (
        claims
        .filter(col('Max_Eligb') > min_eligb)
        .groupby(*_COMP_GROUP_COLS)
        .agg(f.sum(dos_col).alias(total_col))
        .withColumn(ratio_col, col(total_col) / horizon_days)
        .withColumn('tiebreak', monotonically_increasing_id())
        .withColumn('rank', rank().over(window))
        .filter(col('rank') == 1)
        .drop('rank', 'tiebreak')
    )


Comp12 = _compliance(base_table_allmetrics, 11, 'DOS_Cap', 'Total_DOS', 'Compliance_12', 365)
Comp3 = _compliance(base_table_allmetrics.filter(col('comp_flag_3')==1), 2, 'DOS_Cap_3', 'Total_DOS_3', 'Compliance_3', 90)
Comp6 = _compliance(base_table_allmetrics.filter(col('comp_flag_6')==1), 5, 'DOS_Cap_6', 'Total_DOS_6', 'Compliance_6', 180)
Comp9 = _compliance(base_table_allmetrics.filter(col('comp_flag_9')==1), 8, 'DOS_Cap_9', 'Total_DOS_9', 'Compliance_9', 270)

# Full outer joins so an anchor present in any horizon is retained.
Comp = Comp12
for _horizon_table in (Comp3, Comp6, Comp9):
    Comp = Comp.join(_horizon_table, _COMP_JOIN_COLS, 'outer')

In [0]:
# The per-horizon tables have been merged into Comp.
del Comp3,Comp6,Comp9,Comp12

In [0]:
# Annuity: imputed days of supply summed per anchor and divided by 30, for
# anchors with Max_Eligb > 11; the rank-1 row of `window` is kept.
_supply_per_anchor = (
    base_table_allmetrics
    .filter(col('Max_Eligb') > 11)
    .groupby('PATIENT_ID','BRAND_DESC','Anchor_date','COHORT_MONTH',"SoB",'PATIENT_ZIP3','HCP_ID','PRIMARY_PAY_TYPE_DESC')
    .sum('DAYS_SUPPLY_IMPUTED')
)
Ann = (
    _supply_per_anchor
    .withColumn('Annuity', col('sum(DAYS_SUPPLY_IMPUTED)') / 30)
    .withColumn('tiebreak', monotonically_increasing_id())
    .withColumn('rank', rank().over(window))
    .filter(col('rank') == 1)
    .drop('rank', 'tiebreak', 'sum(DAYS_SUPPLY_IMPUTED)')
)

In [0]:
# Persistence input: anchors with Max_Eligb > 11; each claim's coverage ends
# at SERVICE_DATE + DOS_Cap (Claim_End).
pers = (
    base_table_allmetrics
    .filter(col('Max_Eligb') > 11)
    .withColumn('Claim_End', expr("date_add(SERVICE_DATE,DOS_Cap)"))
)

# Numeric ID per patient / brand / anchor.
id = (
    pers.groupBy('PATIENT_ID','BRAND_DESC','Anchor_Date')
    .count()
    .withColumn("ID", monotonically_increasing_id())
    .select('PATIENT_ID','BRAND_DESC','Anchor_Date','ID')
)
pers = pers.join(id, ['PATIENT_ID','BRAND_DESC','Anchor_Date'], 'left').sort(asc("ID"), asc("SERVICE_DATE"))


# aggflag_temp is 0 when a claim starts more than 30 days after the previous
# claim's coverage ended, and 1 otherwise (including the first claim, whose
# deltaPED is null).
_prev_claim_end = lag(col('Claim_End'), 1).over(
    Window.partitionBy('PATIENT_ID','BRAND_DESC','Anchor_Date').orderBy('Claim_End')
)
pers = (
    pers
    .withColumn('prev_CED', _prev_claim_end)
    .withColumn('deltaPED', datediff(col('SERVICE_DATE'), col('prev_CED')))
    .withColumn('aggflag_temp', when(col('deltaPED') > 30, 0).otherwise(1))
    .sort(asc("ID"), asc('SERVICE_DATE'))
)



# Running state shared between consecutive flagging calls. Both globals are
# initialised here (mirroring dos_adj) so the first call can never hit an
# undefined prev_flag, even if its gid equals the -1 sentinel.
prev_gid = -1
prev_flag = 1

def flagging(flag_temp,gid):
  """Return the running product of `flag_temp` within the current gid.

  The result stays 1 until the first `flag_temp` of 0 is seen for a gid and
  is 0 for every later call with that gid. The running value is reset to 1
  whenever gid changes.

  The function is stateful: it only gives meaningful results when called
  once per row, group by group, in the intended order.
  """
  global prev_flag, prev_gid
  if gid != prev_gid:
    prev_flag = 1
    prev_gid = gid
  flag = prev_flag * flag_temp
  prev_flag = flag
  return flag

# Kept so existing references to `flag1` still resolve. It is no longer used to
# build aggflag: the UDF keeps its running value in Python globals and
# therefore depends on rows reaching a single Python worker in sorted order,
# which Spark does not guarantee.
flag1 = udf(flagging, IntegerType())

# Deterministic equivalent: the running product of 0/1 flags equals their
# running minimum, taken over the claims of one patient / brand / anchor in
# service-date order. aggflag is 1 up to the first gap of more than 30 days
# and 0 from then on.
_claims_so_far = (
    Window.partitionBy('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date')
    .orderBy('SERVICE_DATE')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
pers = pers.withColumn('aggflag', f.min('aggflag_temp').over(_claims_so_far).cast(IntegerType()))



# Persistence duration per anchor: days from the anchor to the latest
# Claim_End among the claims still flagged as continuous (aggflag == 1).
# The result is written to DAYS_SUPPLY_IMPUTED; the rank-1 row of `window`
# is kept.
_continuous_claims = pers.filter(col('aggflag') == 1)
pers = (
    _continuous_claims
    .groupby('PATIENT_ID','BRAND_DESC','Anchor_Date','COHORT_MONTH','SoB','PATIENT_ZIP3','HCP_ID','PRIMARY_PAY_TYPE_DESC')
    .agg(f.max('Claim_End').alias('Claim_End'))
    .withColumn('DAYS_SUPPLY_IMPUTED', datediff(col('Claim_End'), col('Anchor_Date')))
    .withColumn('tiebreak', monotonically_increasing_id())
    .withColumn('rank', rank().over(window))
    .filter(col('rank') == 1)
    .drop('rank', 'tiebreak')
)



from pyspark.ml.feature import Bucketizer

# Bucket the persistence duration with the `splits_pers` edges into Pers_Month.
buck = Bucketizer(
    splits=splits_pers,
    inputCol="DAYS_SUPPLY_IMPUTED",
    outputCol="Pers_Month",
)
pers = buck.transform(pers)


In [0]:
# Rx claims again, this time with DIAGNOSIS_CODE; SERVICE_DATE parsed from
# yyyyMMdd. rx_3yrs is the subset after rx_startdate.
rx = spark.sql(
    "select SERVICE_DATE,PATIENT_ID,NDC_CODE,DAYS_SUPPLY_COUNT,DIAGNOSIS_CODE from ADM_VW.FACT_APLD_RX_V"
).withColumn("SERVICE_DATE", f.to_date(f.col("SERVICE_DATE").cast("string"), 'yyyyMMdd'))
rx_3yrs = rx.where(f.col("SERVICE_DATE") > f.unix_timestamp(f.lit(rx_startdate)).cast('timestamp'))

# Diagnosis claims, with the same date parsing.
dx = spark.sql(
    "select SERVICE_DATE,PATIENT_ID,DIAGNOSIS_CODE from ADM_VW.FACT_APLD_DX_V"
).withColumn("SERVICE_DATE", f.to_date(f.col("SERVICE_DATE").cast("string"), 'yyyyMMdd'))

dim_ndc = spark.sql("select * from ADM_VW.DIM_NDC_V ")


#brand_list = ['BREZTRI AEROSPHERE','TRELEGY ELLIPTA'] 

#'ANORO ELLIPTA','SPIRIVA HANDIHALER','SPIRIVA RESPIMAT','SYMBICORT','BREZTRI AEROSPHERE','TRELEGY ELLIPTA'

# Claims of the in-scope brands within the 3-year window, and the distinct
# patients who have them.
_brand_regex = "|".join("(" + brand + ")" for brand in brand_list)
ndc = dim_ndc.where(dim_ndc['BRAND_DESC'].rlike(_brand_regex))
rx_control = rx_3yrs.join(ndc.select('NDC_CODE', 'BRAND_DESC'), ['NDC_CODE'], 'inner')
rx_control = rx_control.withColumn(
    'BRAND_DESC',
    when(col('BRAND_DESC').isin('SPIRIVA HANDIHALER', 'SPIRIVA RESPIMAT'), 'SPIRIVA')
    .otherwise(col('BRAND_DESC')),
)

# NOTE(review): `pat` is not read again in the visible notebook before it is
# reassigned from DIM_PATIENT_V - confirm whether it is still needed.
pat = rx_control.select('PATIENT_ID').distinct()



# Product list read from a CSV on DBFS (header row, inferred schema), mapped
# to NDC codes through BRAND_DESC.
gen_brands = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("sep", ",")
    .load("dbfs:/FileStore/Resp_BR_Initial_Controller_Product_List_V2.csv")
)
gen_ndc = gen_brands.select('BRAND_DESC').join(
    dim_ndc.select('NDC_CODE', 'BRAND_DESC'), ['BRAND_DESC'], 'inner'
)

# Diagnosis-code patterns of interest; J45 / J82 are labelled Asthma, every
# other match COPD.
J_codes = ['J45','J82','J41','J42','J43','J44','J982']
#,'J40'
_jcode_regex = "|".join("(" + code + ")" for code in J_codes)
_indic_cols = ('PATIENT_ID', 'SERVICE_DATE', 'DIAGNOSIS_CODE')

# Matching rows from the Rx and Dx claims, tagged with their source.
rx_indic = (
    rx.select(*_indic_cols)
    .where(col('DIAGNOSIS_CODE').rlike(_jcode_regex))
    .withColumn('Source', lit("Rx"))
)

dx_indic = (
    dx.select(*_indic_cols)
    .where(col('DIAGNOSIS_CODE').rlike(_jcode_regex))
    .withColumn('Source', lit("Dx"))
)

rx_dx_indic = rx_indic.union(dx_indic).withColumn(
    'Indication',
    when(col('DIAGNOSIS_CODE').rlike("J45|J82"), 'Asthma').otherwise('COPD'),
)

# One row per patient: count rows per indication, then label the patient
# COPD (no Asthma rows), Asthma (no COPD rows) or Both.
rx_dx_indic = (
    rx_dx_indic
    .groupby('PATIENT_ID')
    .pivot('Indication')
    .count()
    .withColumn(
        'Indication',
        when(col('Asthma').isNull(), "COPD")
        .when(col('COPD').isNull(), "Asthma")
        .otherwise('Both'),
    )
)


# Only rx_3yrs, gen_ndc and rx_dx_indic are used by the following cells.
del rx, dx,rx_control,rx_indic,dx_indic

In [0]:
# New-to-brand anchors.
ntb_pat = (
    base_table_allmetrics
    .filter(col('SoB') == 'NEW TO BRAND')
    .select('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date')
    .distinct()
)

# Anchors with at least one claim for an NDC in gen_ndc dated 0..364 days
# before (or on) the anchor are marked SWITCH/ADD-ON.
_prior_use = (
    ntb_pat
    .join(rx_3yrs, ['PATIENT_ID'], 'inner')
    .join(gen_ndc.select('NDC_CODE'), ['NDC_CODE'], 'inner')
    .withColumn('delta_days', datediff(col('Anchor_Date'), col('SERVICE_DATE')))
    .filter((col('delta_days') < 365) & (col('delta_days') >= 0))
    .withColumn('Initial_therapy_flag', lit('SWITCH/ADD-ON'))
    .select('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date', 'Initial_therapy_flag')
    .drop_duplicates(['PATIENT_ID', 'BRAND_DESC', 'Anchor_Date'])
)

# Every other new-to-brand anchor is marked IMT.
Initial_therapy_flag = (
    ntb_pat
    .join(_prior_use, ['PATIENT_ID', 'BRAND_DESC', 'Anchor_Date'], 'left')
    .fillna('IMT', subset=["Initial_therapy_flag"])
)

del rx_3yrs

In [0]:
# One-and-done flag for new-to-brand anchors. min_gap is the smallest
# ED_minus_SD over the anchor's claims:
#   Max_eligb < 12 -> 'N/A', min_gap < 365 -> 'N', otherwise 'Y'.
_ntb_min_gap = (
    base_table_allmetrics
    .filter(col('SoB') == 'NEW TO BRAND')
    .groupby('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date', 'Max_eligb')
    .agg(f.min('ED_minus_SD').alias('min_gap'))
)
OND_flag = (
    _ntb_min_gap
    .withColumn(
        'One_n_done_flag',
        when(col('Max_eligb') < 12, 'N/A')
        .when(col('min_gap') < 365, 'N')
        .otherwise('Y'),
    )
    .select('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date', 'Max_eligb', 'One_n_done_flag')
)

In [0]:
# Patient dimension restricted to DATA_SOURCE == 'APLD', one row per patient.
pat = (
    spark.sql("select * from ADM_VW.DIM_PATIENT_V")
    .filter(col('DATA_SOURCE') == 'APLD')
    .dropDuplicates(['PATIENT_ID'])
)

# Age is current_year minus the birth year.
df_age_gender = (
    base_table_allmetrics
    .join(pat.select('PATIENT_ID', 'PAT_BIRTH_YEAR', 'GENDER'), ['PATIENT_ID'], 'left')
    .withColumn('Age', current_year - col('PAT_BIRTH_YEAR'))
    .select('PATIENT_ID', 'Age', 'GENDER')
    .distinct()
)

del pat

In [0]:
# Anchor-level flags (one-and-done + initial therapy) and patient-level
# attributes (indication + age/gender), both combined with full outer joins.
_anchor_key = ['PATIENT_ID', 'BRAND_DESC', 'Anchor_Date']
patient_flags = (
    OND_flag.select('PATIENT_ID', 'BRAND_DESC', 'Anchor_Date', 'One_n_done_flag')
    .join(Initial_therapy_flag, _anchor_key, 'outer')
)
patient_info = (
    rx_dx_indic.select('PATIENT_ID', 'Indication')
    .join(df_age_gender, ['PATIENT_ID'], 'outer')
)
del OND_flag,Initial_therapy_flag,rx_dx_indic,df_age_gender

In [0]:
# HCP dimension: state and specialty, one row per HCP_ID, excluding the
# placeholder id 7777777777.
df_hcp = spark.sql("select * from ADM_VW.DIM_HCP_V")
df_hcp = (
    df_hcp.select('HCP_ID','STATE','SPECIALTY_CODE','SPECIALTY_DESC')
    .filter(df_hcp['HCP_ID'] != 7777777777)
    .drop_duplicates(['HCP_ID'])
)

# NOTE(review): this cell used to build a "latest Rx claim per patient" frame
# and call df_hcp.join(rx, ['HCP_ID'], 'left') without assigning the result,
# so it never affected df_hcp. That dead pipeline is removed here: it had no
# effect on any output and it rebound the shared `window` variable used by the
# metric cells. Assigning the join instead would add PATIENT_ID / SERVICE_DATE
# to df_hcp, multiply its rows per HCP and make the later joins on Patient_Id
# ambiguous. If a patient-to-latest-HCP mapping is wanted, build it as a
# separate frame.

In [0]:
def _enrich(metric_df):
    """Attach HCP attributes, patient attributes and anchor-level flags to a
    metric table, then drop Anchor_Date."""
    return (
        metric_df
        .join(df_hcp, ['HCP_ID'], 'left')
        .join(patient_info, ['Patient_Id'], 'left')
        .join(patient_flags, ['PATIENT_ID','BRAND_DESC','Anchor_Date'], 'left')
        .drop('Anchor_Date')
    )


Comp = _enrich(Comp)
Ann = _enrich(Ann)
pers = _enrich(pers)

del df_hcp,patient_info,patient_flags

# Persist the three enriched metric tables (replacing data and schema).
for _metric_df, _table_name in (
    (Comp, "default.Spark_INTM_DOS_adj_comp"),
    (Ann, "default.Spark_INTM_DOS_adj_ann"),
    (pers, "default.Spark_INTM_DOS_adj_pers"),
):
    _metric_df.write.mode("overwrite").option("overwriteSchema", "True").saveAsTable(_table_name)
del _metric_df, _table_name

del Comp,Ann,pers

In [0]:
# 12-month compliance reports, restricted to rows with Max_Eligb >= 12.
Comp12 = spark.sql("select * from default.Spark_INTM_DOS_adj_comp").filter(col('Max_Eligb')>=12)

_mean_compliance = f.mean('Compliance_12').alias("Compliance")
_patient_months = f.countDistinct('PATIENT_ID','COHORT_MONTH').alias("patient_count")

# By cohort month and brand, one column pair per SoB value.
comp_tab = (
    Comp12.groupby("COHORT_MONTH", 'BRAND_DESC')
    .pivot("SoB")
    .agg(f.countDistinct('PATIENT_ID').alias("patient_count"), _mean_compliance)
    .sort(asc("COHORT_MONTH"))
)
comp_tab.display()

# By indication and brand, split by SoB.
comp_SoB = Comp12.groupby('Indication', 'BRAND_DESC').pivot('SoB').agg(_patient_months, _mean_compliance)
comp_SoB.display()

# By indication and brand, all SoB values together.
comp_SoB = Comp12.groupby('Indication', 'BRAND_DESC').agg(_patient_months, _mean_compliance)
comp_SoB.display()

# By brand only.
comp_agg = Comp12.groupby('BRAND_DESC').agg(_patient_months, _mean_compliance)
comp_agg.display()

del Comp12, comp_tab,comp_SoB,comp_agg

COHORT_MONTH,BRAND_DESC,CONTINUE_patient_count,CONTINUE_Compliance,NEW TO BRAND_patient_count,NEW TO BRAND_Compliance,RESTART_patient_count,RESTART_Compliance
202010,TRELEGY ELLIPTA,172142,0.7446409725771176,27165,0.5141899452609485,136.0,0.4284045124899276
202010,BREZTRI AEROSPHERE,3,0.6182648401826485,1913,0.4116520705483031,,
202011,TRELEGY ELLIPTA,174462,0.7394440803767316,26912,0.5034704689459699,345.0,0.4390152868771095
202011,BREZTRI AEROSPHERE,620,0.5493857711003095,3413,0.4230536747087085,,
202012,BREZTRI AEROSPHERE,1990,0.5632091966682731,4506,0.420280417586293,,
202012,TRELEGY ELLIPTA,188834,0.7289672700861727,32388,0.4840526086949151,643.0,0.4261573531604849
202101,BREZTRI AEROSPHERE,3271,0.5841462750698333,5063,0.4246683567866798,,
202101,TRELEGY ELLIPTA,188238,0.7385117383458181,34982,0.5236021969811462,985.0,0.5063514359223975
202102,TRELEGY ELLIPTA,182326,0.7419425749361502,34067,0.5268662760048576,948.0,0.5059823131610889
202102,BREZTRI AEROSPHERE,4558,0.604038661513401,7143,0.4133768283538437,,


Indication,BRAND_DESC,CONTINUE_patient_count,CONTINUE_Compliance,NEW TO BRAND_patient_count,NEW TO BRAND_Compliance,RESTART_patient_count,RESTART_Compliance
Both,TRELEGY ELLIPTA,497526,0.7282556775858551,74486,0.5318719818268678,2889.0,0.4705358539950781
Both,BREZTRI AEROSPHERE,26827,0.6108669909838323,17640,0.4568163886559183,,
COPD,TRELEGY ELLIPTA,1450361,0.7502506551203527,184153,0.5472171331030704,7611.0,0.4771964874199743
Asthma,TRELEGY ELLIPTA,208882,0.6672964207490578,68992,0.4767454475563544,767.0,0.4164883642013895
,BREZTRI AEROSPHERE,24598,0.5573559271440928,22841,0.3526334823284002,,
Asthma,BREZTRI AEROSPHERE,17989,0.5459738333243219,13710,0.3875457370382578,,
,TRELEGY ELLIPTA,418249,0.7099228397634484,105416,0.4581477325061026,2076.0,0.4512603267611582
COPD,BREZTRI AEROSPHERE,60417,0.6307204653684279,40011,0.4695234837816842,,


Indication,BRAND_DESC,patient_count,Compliance
Both,TRELEGY ELLIPTA,574901,0.7015164850419986
Both,BREZTRI AEROSPHERE,44467,0.5497553210923518
COPD,TRELEGY ELLIPTA,1642125,0.7262162804835888
Asthma,TRELEGY ELLIPTA,278641,0.6194252800271912
,BREZTRI AEROSPHERE,47439,0.458785861121721
Asthma,BREZTRI AEROSPHERE,31699,0.4774527695657819
,TRELEGY ELLIPTA,525741,0.6584181861723633
COPD,BREZTRI AEROSPHERE,100428,0.5664988097517991


BRAND_DESC,patient_count,Compliance
BREZTRI AEROSPHERE,224033,0.527767918730666
TRELEGY ELLIPTA,3021408,0.6998707733033219


In [0]:
# Annuity reports: total Annuity divided by the number of distinct
# patient / cohort-month pairs.
Ann = spark.sql("select * from default.Spark_INTM_DOS_adj_ann")

_patient_months = f.countDistinct('PATIENT_ID','COHORT_MONTH').alias("patient_count")
_claim_sum = f.sum('Annuity').alias("claim_sum")


def _with_annuity(summary):
    """Add Annuity = claim_sum / patient_count to an aggregated table."""
    return summary.withColumn("Annuity", summary['claim_sum'] / summary['patient_count'])


ann_tab = (
    Ann.groupby("COHORT_MONTH", 'BRAND_DESC')
    .pivot("SoB")
    .agg(f.countDistinct('PATIENT_ID').alias("patient_count"), _claim_sum)
)

# By SoB, indication and brand.
ann_SoB = _with_annuity(Ann.groupby("SoB", 'Indication', 'BRAND_DESC').agg(_patient_months, _claim_sum))
ann_SoB.display()

# By indication and brand.
ann_SoB = _with_annuity(Ann.groupby('Indication', 'BRAND_DESC').agg(_patient_months, _claim_sum))
ann_SoB.display()

# By brand only.
ann_agg = _with_annuity(Ann.groupby('BRAND_DESC').agg(_patient_months, _claim_sum))
ann_agg.display()

del Ann, ann_tab,ann_SoB,ann_agg

SoB,Indication,BRAND_DESC,patient_count,claim_sum,Annuity
NEW TO BRAND,,TRELEGY ELLIPTA,105416,628892.0,5.965811641496547
CONTINUE,,BREZTRI AEROSPHERE,24598,174879.0,7.10948044556468
NEW TO BRAND,Asthma,BREZTRI AEROSPHERE,13710,68125.0,4.969000729394603
RESTART,Both,TRELEGY ELLIPTA,2889,17899.0,6.195569401176877
NEW TO BRAND,Both,TRELEGY ELLIPTA,74486,518524.0,6.961361866659506
NEW TO BRAND,COPD,TRELEGY ELLIPTA,184153,1320922.0,7.1729594413341085
RESTART,,TRELEGY ELLIPTA,2076,12286.0,5.918111753371869
NEW TO BRAND,Asthma,TRELEGY ELLIPTA,68992,425937.0,6.173715793135436
CONTINUE,Asthma,BREZTRI AEROSPHERE,17989,124967.0,6.946856412251932
RESTART,COPD,TRELEGY ELLIPTA,7611,47716.0,6.269346997766391


Indication,BRAND_DESC,patient_count,claim_sum,Annuity
Both,TRELEGY ELLIPTA,574901,5229105.0,9.095661687838428
Both,BREZTRI AEROSPHERE,44467,316056.0,7.107652866170419
COPD,TRELEGY ELLIPTA,1642125,15469061.0,9.420148283474155
Asthma,TRELEGY ELLIPTA,278641,2219816.0,7.966580653959754
,BREZTRI AEROSPHERE,47439,278053.0,5.86127447880436
Asthma,BREZTRI AEROSPHERE,31699,193092.0,6.091422442348339
,TRELEGY ELLIPTA,525741,4476739.0,8.515103444471707
COPD,BREZTRI AEROSPHERE,100428,736116.0,7.329788505197754


BRAND_DESC,patient_count,claim_sum,Annuity
BREZTRI AEROSPHERE,224033,1523317.0,6.799520606339245
TRELEGY ELLIPTA,3021408,27394721.0,9.066872464758152


In [0]:
# Persistence reports. Missing indications are labelled 'None' so they form
# their own group.
pers = spark.sql("select * from default.Spark_INTM_DOS_adj_pers").fillna('None', subset=["INDICATION"])

_cohort_cols = ['COHORT_MONTH', 'BRAND_DESC', 'SoB', 'indication']
_cell_cols = ["Pers_Month"] + _cohort_cols

# Patients per persistence bucket within each cohort.
pers_agg = pers.groupby(*_cell_cols).agg(countDistinct('Patient_ID').alias('Pat_count'))

# Expand every cohort to all 13 buckets (0..12) so empty buckets show up with
# a count of 0.
pers_exp = pers.select(*_cohort_cols).distinct()
Months = spark.createDataFrame([{'Pers_Month': month} for month in range(13)])
pers_exp = pers_exp.crossJoin(Months)

pers_agg = pers_exp.join(pers_agg, _cell_cols, 'left').fillna(0, subset=["Pat_count"])

del pers_exp,Months

####  Cumulative   ###########
# cum_pat_count at bucket m = patients whose bucket is m or later.
windowval = (
    Window.partitionBy(*_cohort_cols)
    .orderBy('Pers_Month')
    .rangeBetween(0, Window.unboundedFollowing)
)
pers_cum = pers_agg.withColumn('cum_pat_count', f.sum('Pat_count').over(windowval))

#######   Dividing by M0 #########
# M0_count is the cumulative count at bucket 0, i.e. the whole cohort.
denom = (
    pers_cum.filter(col('Pers_Month') == 0)
    .groupby(*_cohort_cols)
    .agg(f.sum('cum_pat_count').alias('M0_count'))
)


pers_cum = pers_cum.join(denom, _cohort_cols, 'outer')


#######  Pivot   #########

# Bucket-12 share of the cohort, by indication and brand: first split by SoB,
# then overall.
_month12 = pers_cum.filter(col('Pers_Month') == 12).groupby('Indication', 'Pers_Month', 'BRAND_DESC')
_retention_aggs = (
    f.sum('M0_count').alias('Patient_Count'),
    f.sum('cum_pat_count') / f.sum('M0_count'),
)
_month12.pivot('SoB').agg(*_retention_aggs).sort(asc('Pers_Month')).display()
_month12.agg(*_retention_aggs).sort(asc('Pers_Month')).display()

del pers_cum,pers_agg,pers

Indication,Pers_Month,BRAND_DESC,CONTINUE_Patient_Count,CONTINUE_(sum(cum_pat_count) / sum(M0_count)),NEW TO BRAND_Patient_Count,NEW TO BRAND_(sum(cum_pat_count) / sum(M0_count)),RESTART_Patient_Count,RESTART_(sum(cum_pat_count) / sum(M0_count))
Both,12,TRELEGY ELLIPTA,497526,0.4373841769073375,74486,0.229371962516446,2889.0,0.1467635860159224
,12,TRELEGY ELLIPTA,418249,0.4346047450203109,105416,0.1861292403430219,2076.0,0.1440269749518304
COPD,12,BREZTRI AEROSPHERE,60417,0.3149775725375308,40011,0.1713028917047812,,
,12,BREZTRI AEROSPHERE,24598,0.2335962273355557,22841,0.0960991200035024,,
Both,12,BREZTRI AEROSPHERE,26827,0.2857196108398255,17640,0.1528344671201814,,
Asthma,12,TRELEGY ELLIPTA,208882,0.366101435260099,68992,0.1848040352504638,767.0,0.1134289439374185
COPD,12,TRELEGY ELLIPTA,1450361,0.4726416388747353,184153,0.2473160904248098,7611.0,0.157009591380896
Asthma,12,BREZTRI AEROSPHERE,17989,0.2029017733059091,13710,0.0964989059080962,,


Indication,Pers_Month,BRAND_DESC,Patient_Count,(sum(cum_pat_count) / sum(M0_count))
Both,12,TRELEGY ELLIPTA,574901,0.4089730231813825
,12,TRELEGY ELLIPTA,525741,0.3836356685135836
COPD,12,BREZTRI AEROSPHERE,100428,0.2577368861273748
,12,BREZTRI AEROSPHERE,47439,0.1673939163979004
Both,12,BREZTRI AEROSPHERE,44467,0.2330042503429509
Asthma,12,TRELEGY ELLIPTA,278641,0.3205163633492559
COPD,12,TRELEGY ELLIPTA,1642125,0.4459100251198903
Asthma,12,BREZTRI AEROSPHERE,31699,0.1568819205653175
