# Step01: Data split

In [None]:
# helper functions form WF MDD (split datasets, VIF, WOE, mono bin)

def mono_bin(temp_df, feature, target, n=10):
    # rho for spearman correlation
    custom_rho = 1
    r = 0
    
    while np.abs(r) < custom_rho and n > 1:
        try:
            #quantile discretizer custs data into equal number of bins
            qds = QuantileDiscretizer(numBuckets = n, inputCol = feature, outputCol = 'buckets',
                                     relativeError = 0.01, handleInvalid = 'error')
            temp_df = qds.setHandleInvalid('keep').fit(temp_df).transform(temp_df)  #keep NAN
            
            #create corr_df is Python implemented
            corr_df = temp_df.groupby('buckets').agg({feature: 'avg', target: 'avg'}).toPandas()
            corr_df.columns = ['buckets', feature, target]
            r,p = stats.spearmanr(corr_df[feature], corr_df[target])
            n = n-1
            
        except Exception as e:
            n = n-1
            
        return temp_df
    

    
# transform in WOE
def fit_woe_on_training(path, df, target, label):
    lst_tbl, lst_iv = list(),list()
    max_bin = 10
    
    count = -1
    for feature in final_vars:
        print("feature is:", feature)
        #execute if feature is not a target col
        if feature != target:
            count = count + 1
            temp_df = df.select([feature,target])   #spark
            
            #perform monotonic binning
            if feature in num_vars:
                temp_df = mono_bin(temp_df,feature,target,n=max_bin)
                #numeric values
                grouped = temp_df.groupby('buckets')
            else:
                #categorical
                grouped = temp_df.groupby(feature)
                
            #count and event value for each group
            count_df = grouped.agg(F.count(target).alias('count')).toPandas()
            event_df = grouped.agg(F.sum(target).alias('event')).toPandas()
            
            #store min/max for variables
            if feature in num_vars:
                min_value = grouped.agg(F.min(feature).alias('min')).toPandas()['min']
                max_value = grouped.agg(F.max(feature).alias('max')).toPandas()['max']
            else:
                min_value = count_df[feature]
                max_value = count_df[feature]
                
                
            #calculate WOE and IV
            temp_woe_df = calculate_woe(count_df, event_df, min_value, max_value, feature)
            
            #sort by min value and keep increasing order
            temp_woe_df.sort_values(by = 'min_value', inplace=True)
            temp_woe_df.reset_index(inplace=True)
            temp_woe_df.drop(['index'], axis=1, inplace=True)
            temp_woe_df.reset_index(inplace=True)
            temp_woe_df.rename(columns={'index':'bin'},inplace=True)
                
            #mapping tavble between bin number and WOE
            temp_woe_df['bin_adjust'] = np.where(temp_woe_df['min_value'].isna(), -999, temp_woe_df['bin'])
            
            #separate IV dataset
            iv = pd.DataFrame({'IV': temp_woe_df.groupby('varname').tot_iv.max()})
            iv['predictive_ind'] = np.where(iv['IV']>= 0.02, 1, 0)
            iv = iv.reset_index()
            
            # ----------------------------------
            # save table for each predictor
            # ----------------------------------
            woe_iv_dict = temp_woe_df.to_dict()
            lst_tbl.append(woe_iv_dict)
            
            iv_dict = iv.to_dict()
            lst_iv.append(iv_dict)
            
        # ---------------------------------
        # save table for each predictor
        # ---------------------------------
        filename = (path + label + '_woe_tbl_FIT')
        pickle.dump(lst_tbl, open(filename,'wb'))
        print("WOE table saved")
        
        filename = (path + label + '_iv_tbl_FIT')
        pickle.dump(lst_iv, open(filename,'wb'))
        print("IV table saved")

## Read Hive

In [None]:
phys = spark.sql(""" select * from table """)
phys = phys.filter( (F.col('week_n') >= 18) )
phys.createOrReplaceTempView('phys')

print("number of obs:", phys.count())
print("number of cols:", len(phys.columns))

stats = phys.groupBy(['week_n']) \ 
            .agg(F.sum(F.col('target')).alias('tot pos'),
                F.count('*').alias('tot rows'),
                (F.sum(F.col('target'))/F.count('*')).alias('target rate')).orderBy('week_n')
                
df = stats.toPandas()

#fill missing
phys = phys.na.fill(value=0)

## Basic feature engineering (using spark window functions)

### List of initial features

In [None]:
pk_lst = ['id']
lst = ['svc_cat','proc_cd']
target = ['target']

full_lst = pk_lst + lst + target
print(full_lst)

#read data by trimming blanks
sdf = sdf.select(*full_lst)\
        .withColumn('svc_cat',trim(F.col('svc_cat')))
sdf.createOrReplaceTempView(sdf)

### target rate

In [None]:
stats = sdf.groupBy(['cohort','target'])\
            .agg(F.countDistinct(F.col('id')).alias('mbrs'))\
            .orderBy('cohort').toPandas()

stats2 = stats.pivot(index='cohort', columns = 'target', values = 'mbrs').reset_index().add_prefix('target_')
stats2['target_rate'] = stats2['target_1']/(stats2['target_0'] + stats2['target_1'])
stats2

### Step01 A: Categorical features

### Get top 5 categories for each ID ranked by total items

In [None]:
#define variable to partition
var_to_agg = ['proc_cd']

#iterate over categorical variables
for k in var_to_agg:
    
    #define variables final ID-level output: count of svc
    window_var = ['cohort','id'] + [k]
    metrics_var = ['dt_cnt']
    modeling_var = ['cohort','id'] + metrics_var + [k]
    
    #define window partition
    windowPartition = Window.partitionBy(window_var)
    
    #to rank order by total svc from high to low
    windowPartition2 = Window.partitionBy('cohort','id').orderBy(F.col('dt_cnt').desc())
    
    #apply window partition
    sdf2 = sdf.withColumn("dt_cnt",approx_count_distinct(F.col('svc_dt')).over(windowPartition))
    #sdf2 = sdf2.withColumn("cost",sum(F.col('cst')).over(windowPartition))
    
    #generate aggregated stats by ID
    agg_sdf = sdf.select(*modeling_var).distinct().orderBy(window_var)
    
    #rank-order total cost by ID
    agg_sdf = agg_sdf.withColumn("cnt_rank",dense_rank().over(windowPartition2))
    
    #select top 5 elements based on cost
    agg_sdf2 = agg_sdf.filter(F.col('cnt_rank') <= 5)
    
    #transform cost amount into 1/0
    if k == 'proc_cd':
        agg_sdf3 = agg_sdf2.withColumn("proc_cd2",when(F.col('dt_cnt')>0,1).otherwise(F.lit(0)))
        k1 = 'proc_cd2'
        
        #pivot to become ID-level
        pivotb = agg_sdf3.groupBy('cohort','id').pivot(k).agg(max(k1))
        
    else:
        #pivot to become Id-level
        pivotb = agg_sdf2.groupBy('cohort','id').pivot(k).agg(max("dt_cnt"))
        
        #get all cols of dataframe into a list
        total_columns = pivotb.columns
        total_columns.remove('id')
        total_columns.remove('cohort')
        print("total cols to rename:", len(total_columns))
        
        #run loop to rename all cols of dataframe with prefix
        for j in range(len(total_columns)):
            pivotb = pivotb.withColumnRenamed(total_columns[j], (k + '_') + total_columns[j])
        print("finished renaming cols")
        
    #fillna
    pivotb2 = pivotb.na.fill(value=0)
    
    #write as parquet
    print("writing to parquet ...........")
    filename = (k + '.parquet')
    pivotb2.write.partitionBy('cohort').mode('overwrite').parquet(path + filename)
    print(filename + " saved as parquet")

### Verify parquet files

In [None]:
var_to_agg = ['proc_cd']

#iterate over categorical variables
for k in var_to_agg:
    filename = (k + '.parquet')
    print("reading filename:", filename)
    sdf = spark.read.parquet(path + filename).cache()
    
    print("number of cols",len(sdf.columns))
    sdf.groupBy('cohort')\
        .agg(F.countDistinct(F.col('id')).alias('mbrs'),
            F.count(F.col('id')).alias('tot rows'))\
        .orderBy('cohort').show(10)
    print()

### Step01 B: Categorical features
### For each svc, get days since last event

In [None]:
var_to_agg = ['svc']

#iterate over categorical variables
for k in var_to_agg:
    
    #define variables final ID-level output: count of svc
    window_var = ['cohort','id'] + [k]
    metrics_var = ['days_since_lst_visit']
    modeling_var = ['cohort','id'] + metrics_var + [k]
    
    #define window partition
    windowPartition = Window.partitionBy(window_var).orderBy(F.col('days_since_lst_visit').asc())
    
    #apply window partition
    sdf2 = sdf.filter(F.col("days_since_lst_visit")>0).withColumn("days_ago_rk",dense_rank().over(windowPartition))
    
    #extract first instance of "days since last visit"
    agg_sdf2 = sdf2.filter(F.col("days_ago_rk") == 1).select(*modeling_var).distinct()
    
    #rename ID
    agg_sdf2 = agg_sdf2.withColumnRenamed('id','id_n')
    
    #append to list of ID
    final_sdf = sdf.select('cohort','id').distinct()\
                    .join(agg_sdf2, (sdf.id == agg_sdf2.id_n), "left").drop(*(['id_n']))
    
    
    #pivot to become ID-level
    pivotb = final_sdf.groupBy('cohort','id').pivot(k).agg(max('days_since_lst_visit'))
    
    #fillna
    pivotb = pivotb.na.fill(value=0)  #defaulting to day zero
    
    #remove extra null col
    pivotb2 = pivotb.drop(pivotb.null)
        
    #get all cols of dataframe into a list
    total_columns = pivotb2.columns
    total_columns.remove('id')
    total_columns.remove('cohort')
    print("total cols to rename:", len(total_columns))
        
    #run loop to rename all cols of dataframe with prefix
    for j in range(len(total_columns)):
        pivotb2 = pivotb2.withColumnRenamed(total_columns[j], (k + '_lst_visit_') + total_columns[j])
    print("finished renaming cols")
        
    #write as parquet
    print("writing to parquet ...........")
    filename = (k + '.parquet')
    pivotb2.write.partitionBy('cohort').mode('overwrite').parquet(path + filename)
    print(filename + " saved as parquet")

### C. One-hot encoding

In [None]:
cols_to_hot_encode = ['proc_cd']
all_cols = ['cohort','id'] + cols_to_hot_encode

df = profile.select(*all_cols).distinct().toPandas()

enc = OneHotEncoder(categories = 'auto', handle_unknown = 'ignore')

#passing instance 
enc_data_array = enc.fit_transform(df[cols_to_hot_encode]).toarray()
feat_labels = enc.categories_
feat_labels = np.array(feat_labels).ravel()
renamed_feats = enc.get_feature_names(cols_to_hot_encode)

#create pandas dataframe for encoded features
df_sklearn_encoded = pd.DataFrame(enc_data_array,columns = renamed_feats)
df_sklearn_encoded = df_sklearn_encoded.astype('int')

#drop original categorical cols once one-hot encoded
drop_raw_cols = df.drop(columns = cols_to_hot_encode,axis=1)
drop_raw_cols.head()

#join to major dataframe
data_out = pd.concat([drop_raw_cols,df_sklearn_encoded],axis=1)
print("dimension:", data_out.shape)
print(data_out.head())

### joining to categorical variables

In [None]:
var_to_agg = ['svc']

#iterate over categorical variables
for k in var_to_agg:
    filename = (k + '.parquet')
    print("reading filename:", filename)
    sdf = spark.read.parquet(path + filename).cache()
    sdf = sdf.withColumnRenamed('id','id_n')
    
    #open individial file and join to existng table
    final_sdf = final_sdf.join(sdf, (final_sdf.id == sdf.id_n), "left").drop(*(['id_n']))
    print("table join completed with variable:", k, "\n")

## Split into segments

In [None]:
# Hospital
df_hosp_manual = phys.filter( (F.col('hospital_ind') == 1) & (F.col('auto_adj') == 0) )
print("Hospital manual")

# save parquet
df_hosp.write.partitionBy('week_n').mode('overwrite').parquet(path + 'segment1')

## Split into train/out-time

In [None]:
seg_lab = ['seg1','seg2','seg3','seg4']

for s in seg_lab:
    print(s)
    seg = spark.read.parquet(path + s + '.parquet')
    train = seg.filter( (F.col('week_n') >= 18) & (F.col('week_n') <= 27) )
    val = seg.filter( (F.col('week_n') >= 28) & (F.col('week_n') <= 30) )
    
    print("training")
    print("number of obs:", train.count())
    print("number of cols:", len(train.columns))
    print()
    
    # save parquet
    train.write.partitionBy('week_n').mode('overwrite').parquet(path + s + 'train_cohort.parquet')
    print("training has been saved")

## Split into in-time train/val

In [None]:
seg_lab = ['seg1','seg2','seg3','seg4']

for s in seg_lab:
    print(s)
    
    #read saved training
    seg = spark.read.parquet(path + s + 'train_cohort.parquet')
    
    split_datasets(sdf_input = seg,
                  path = path,
                  train_size = 0.80,
                  test_size = 0.20,
                  s = s)
    print()

## Read datasets

In [1]:
seg_lab = ['seg1','seg2','seg3','seg4']

for s in seg_lab:
    print("***************************")
    print(s)
    # --------------------------
    # training
    # --------------------------
    trn_sdf = spark.read.parquet(path + s + 'in_smpl_trainset.parquet')
    print("number of obs:", trn_sdf.count())
    print("number of cols:", len(trn_sdf.columns))
    stats = phys.groupBy(['week_n']) \ 
            .agg(F.sum(F.col('target')).alias('tot pos'),
                F.count('*').alias('tot rows'),
                 (F.count('*') - F.sum(F.col("target"))).alias('tot neg'),
                (F.sum(F.col('target'))/F.count('*')).alias('target rate')).orderBy('week_n').toPandas()
    print(stats)
    print()

SyntaxError: unexpected character after line continuation character (4050191432.py, line 12)