In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import trim

from pyspark.sql import SQLContext

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer
from pyspark.ml.clustering import KMeansModel
from pyspark.ml import Pipeline

import pandas as pd

hc = HiveContext(sc)

In [2]:
data = hc.table("cust_exp_enc.n369087_cp_segmentation_Sub_base_v3")

In [4]:
data.columns

['individual_id',
 'individual_analytics_identifier',
 'proxy_id',
 'funding_category',
 'customer_sub_segment_code',
 'medical_plan_type',
 'gender',
 'age',
 'covered_dependents_count',
 'vision_product',
 'dental_product',
 'rx_ind',
 'pas_12_months_login',
 'action_atts',
 'call_count',
 'hdhp_flag',
 'high_deductible_flag',
 'pas_12_claims',
 'pas_12_med_cost',
 'pas_12_med_cost_inn',
 'pas_12_med_cost_oon',
 'pas_12_inn_visits',
 'pas_12_oon_visits',
 'pas_12_pcp_visits',
 'urbsubr',
 'employment_index',
 'a_hh_median_income',
 'higher_education_index',
 'physical_inactivity_index',
 'hpd_at_risk',
 'hpd_child_chronic',
 'hpd_behavioral_health',
 'hpd_specialty_chronic',
 'hpd_at_polychronic',
 'hpd_others',
 'hpd_category_count',
 'disease_count',
 'eng_ind',
 'soe_active_not_engaged',
 'complaint_count',
 'appeal_count',
 'claim_tenure',
 'member_tenure',
 'new_ind']

In [3]:
cols_to_keep = ['individual_id',
 'individual_analytics_identifier',
 'proxy_id',
 'funding_category',
 'customer_sub_segment_code',
 'medical_plan_type',
 'gender',
 'age',
 'covered_dependents_count',
 'vision_product',
 'dental_product',
 'rx_ind',
 'hdhp_flag',
 'high_deductible_flag',
 'urbsubr',
 'employment_index',
 'a_hh_median_income',
 'higher_education_index',
 'physical_inactivity_index',
 'new_ind'
]

In [39]:
data_clssfctn = data[cols_to_keep]

In [40]:
data_clssfctn.groupby("new_ind").agg(F.count("individual_analytics_identifier").alias("t_cnt"),\
                                    F.countDistinct("individual_analytics_identifier").alias("uni_cnt")).collect()

[Row(new_ind=None, t_cnt=65847, uni_cnt=65847),
 Row(new_ind=1, t_cnt=2524685, uni_cnt=2524685),
 Row(new_ind=0, t_cnt=5369927, uni_cnt=5369927)]

In [41]:
#4_clusters
data_clustrng = hc.table("cust_exp_enc.n275675_cp_segmentation_base_prd_V1")

In [42]:
data_clssfctn_1 = data_clssfctn.join(data_clustrng.select("individual_analytics_identifier", "prediction"),\
                                     "individual_analytics_identifier", "inner")

In [14]:
data_clssfctn_1.show()

+-------------------------------+----------------+------------+----------------+-------------------------+------+---+------------------------+--------------+--------------+------+---------+--------------------+-------+----------------+------------------+----------------------+-------------------------+-------+----------+
|individual_analytics_identifier|   individual_id|    proxy_id|funding_category|customer_sub_segment_code|gender|age|covered_dependents_count|vision_product|dental_product|rx_ind|hdhp_flag|high_deductible_flag|urbsubr|employment_index|a_hh_median_income|higher_education_index|physical_inactivity_index|new_ind|prediction|
+-------------------------------+----------------+------------+----------------+-------------------------+------+---+------------------------+--------------+--------------+------+---------+--------------------+-------+----------------+------------------+----------------------+-------------------------+-------+----------+
|                       1000010

In [10]:
data_clssfctn_1.count()#7,828,256

2501368

In [43]:
data_clssfctn_1.write.option("sep","|").format('orc').mode("overwrite").saveAsTable("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv1_1")

In [44]:
inp_data = hc.table("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv1_1")

In [45]:
def drop_multiple_values(df, col_list):
    for item in col_list:
        df = df.filter(~(F.col(item).like('%/%')))
    return df

In [46]:
#Updating urban value in urban suburban indicator :
inp_data = inp_data.withColumn("urbsubr", when(trim(col("urbsubr")) == "U", "Urb").otherwise(col("urbsubr")))

In [47]:
def drop_unknown_n_nulls(df,col_list):
    for item in col_list:
        
        df = df.withColumn(item, when(F.lower(F.col(item)) == 'u', None).otherwise(F.col(item)))
        df = df.withColumn(item, when(trim(col(item)) == '',None).otherwise(col(item)))
    df = df.na.drop()
    return df

def string_to_num(df, col_list):
    for item in col_list:
        categories = df.select(item).distinct().rdd.flatMap(lambda x: x).collect()

        print categories
        exprs = [(F.when(F.col(item) == category, 1).otherwise(0)).alias(item+"_"+category.replace(" ","").\
                                                                         replace("(","").replace(")","")) for category in categories]

        df = df.select('*', *exprs)
        
    return df

In [48]:
def preprocess_part1(data, cols_to_keep):
    data_1 = drop_multiple_values(data, cols_to_keep)
    #print data_1.count()
    
    data_2 = drop_unknown_n_nulls(data_1, cols_to_keep)
    #print data_2.count()

    
    
    #data_3 = string_to_num(data_2,cat_list)
    #print data_3.count()
    
    return data_2

In [49]:
cat_list = ['funding_category',
                 'customer_sub_segment_code',
                 'gender',
                 'urbsubr',
           'medical_plan_type']

In [50]:
data_pre_processed = preprocess_part1(inp_data,cols_to_keep )
data_pre_processed.show(5)

+-------------------------------+-------------+------------+----------------+-------------------------+-----------------+------+---+------------------------+--------------+--------------+------+---------+--------------------+-------+----------------+------------------+----------------------+-------------------------+-------+----------+
|individual_analytics_identifier|individual_id|    proxy_id|funding_category|customer_sub_segment_code|medical_plan_type|gender|age|covered_dependents_count|vision_product|dental_product|rx_ind|hdhp_flag|high_deductible_flag|urbsubr|employment_index|a_hh_median_income|higher_education_index|physical_inactivity_index|new_ind|prediction|
+-------------------------------+-------------+------------+----------------+-------------------------+-----------------+------+---+------------------------+--------------+--------------+------+---------+--------------------+-------+----------------+------------------+----------------------+-------------------------+------

In [51]:
data_pre_processed.write.option("sep","|").format('orc').mode("overwrite").saveAsTable("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv2_2")

In [2]:
post_pre_process = hc.table("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv2_2")

In [3]:
to_scale_features = ['age',
             'covered_dependents_count',
                     'a_hh_median_income']

In [4]:
_scaled_pre_process = post_pre_process
for item in to_scale_features:
    vec_assembler = VectorAssembler(inputCols= [item] , outputCol = "vec"+item)
    scaler = StandardScaler(inputCol="vec"+item, outputCol= 'scaled_'+item)
    vec_scaler_pipeline = Pipeline(stages=[vec_assembler, scaler])
    scaled = vec_scaler_pipeline.fit(_scaled_pre_process)
    _scaled_pre_process = scaled.transform(_scaled_pre_process)
    

In [5]:
data_scaled = _scaled_pre_process.drop('age',
                                       'covered_dependents_count',
                                       'a_hh_median_income',
                                       'vecage',
                                       'veccovered_dependents_count',
                                       'veca_hh_median_income'
                                      )
for item in to_scale_features:
    
    data_scaled = data_scaled.withColumnRenamed("scaled_"+item, item)

In [6]:
data_scaled.columns

['individual_analytics_identifier',
 'individual_id',
 'proxy_id',
 'funding_category',
 'customer_sub_segment_code',
 'medical_plan_type',
 'gender',
 'vision_product',
 'dental_product',
 'rx_ind',
 'hdhp_flag',
 'high_deductible_flag',
 'urbsubr',
 'employment_index',
 'higher_education_index',
 'physical_inactivity_index',
 'new_ind',
 'prediction',
 'age',
 'covered_dependents_count',
 'a_hh_median_income']

In [8]:
cat_list = ['funding_category',
                 'customer_sub_segment_code',
                 'gender',
                 'urbsubr',
           'medical_plan_type']

In [9]:
con_list = ['vision_product',
            'dental_product',
            'rx_ind',
            'hdhp_flag',
            'high_deductible_flag',
            'employment_index',
            'higher_education_index',
            'physical_inactivity_index',
            'new_ind',
            'age',
            'covered_dependents_count',
            'a_hh_median_income'
           ]
labelcol = 'prediction'
indexcol = 'individual_analytics_identifier'

In [10]:
con_list_wo_income = [x for x in con_list if x != 'a_hh_median_income']

In [11]:
def string_to_num(df, col_list):
    for item in col_list:
        categories = df.select(item).distinct().rdd.flatMap(lambda x: x).collect()

        print categories
        exprs = [(F.when(F.col(item) == categories[i], 1).otherwise(0)).\
                 alias(item+"_"+categories[i].replace(" ","").\
                 replace("(","").replace(")","")) for i in range(0, len(categories)-1)]

        df = df.select('*', *exprs)
        
    return df

In [12]:
data_scaled_1 = string_to_num(data_scaled, cat_list)

[u'B', u'C', u'A']
[u'NA', u'SEL', u'BOA', u'SG', u'KEY', u'NAG', u'FED']
[u'F', u'M']
[u'S', u'R', u'Urb']
[u'Managed Choice', u'Indemnity Medical', u'HMO (ACAS)', u'QPOS (ACAS)', u'PPO Medical']


In [14]:
data_scaled_1.columns

['individual_analytics_identifier',
 'individual_id',
 'proxy_id',
 'funding_category',
 'customer_sub_segment_code',
 'medical_plan_type',
 'gender',
 'vision_product',
 'dental_product',
 'rx_ind',
 'hdhp_flag',
 'high_deductible_flag',
 'urbsubr',
 'employment_index',
 'higher_education_index',
 'physical_inactivity_index',
 'new_ind',
 'prediction',
 'age',
 'covered_dependents_count',
 'a_hh_median_income',
 'funding_category_B',
 'funding_category_C',
 'customer_sub_segment_code_NA',
 'customer_sub_segment_code_SEL',
 'customer_sub_segment_code_BOA',
 'customer_sub_segment_code_SG',
 'customer_sub_segment_code_KEY',
 'customer_sub_segment_code_NAG',
 'gender_F',
 'urbsubr_S',
 'urbsubr_R',
 'medical_plan_type_ManagedChoice',
 'medical_plan_type_IndemnityMedical',
 'medical_plan_type_HMOACAS',
 'medical_plan_type_QPOSACAS']

In [15]:
cat_list_split = ['funding_category_B',
 'funding_category_C',
 'customer_sub_segment_code_NA',
 'customer_sub_segment_code_SEL',
 'customer_sub_segment_code_BOA',
 'customer_sub_segment_code_SG',
 'customer_sub_segment_code_KEY',
 'customer_sub_segment_code_NAG',
 'gender_F',
 'urbsubr_S',
 'urbsubr_R',
 'medical_plan_type_ManagedChoice',
 'medical_plan_type_IndemnityMedical',
 'medical_plan_type_HMOACAS',
 'medical_plan_type_QPOSACAS']

### Creating features vector with categorical variables to do chi square test

In [30]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    
    
    assembler = VectorAssembler(inputCols= categoricalCols
                                , outputCol="features")

    pipeline = Pipeline(stages=[assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    data = data.withColumn('label',col(labelCol))

    return data

In [31]:
model_input = get_dummy(data_scaled_1,indexcol,cat_list_split,con_list_wo_income,labelcol)

In [34]:
model_input.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(15,[0,2,8,9,11],...|
|(15,[0,2,8,11],[1...|
|(15,[0,6,8],[1.0,...|
|(15,[0,2,13],[1.0...|
|(15,[0,2,8,10,11]...|
+--------------------+
only showing top 5 rows



#### Chi Square Tests for Binary and nominal

In [35]:
# Feature selection using chisquareSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='features',outputCol='Aspect',labelCol='label',fpr=0.1)
css_fit = css.fit(model_input)
model_input_1 = css_fit.transform(model_input)

#model_input_1.select("Aspect").show(5,truncate=False)

In [36]:
css_fit.selectedFeatures

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

In [38]:
model_input.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(15,[0,2,8,9,11],...|
|(15,[0,2,8,11],[1...|
|(15,[0,6,8],[1.0,...|
|(15,[0,2,13],[1.0...|
|(15,[0,2,8,10,11]...|
+--------------------+
only showing top 5 rows



In [40]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

In [41]:
to_labeled_point = lambda x: LabeledPoint(x[0], Vectors.dense(x[1].toArray()))

In [42]:
obs = (
    model_input
    .select('label', 'features')
    .rdd
    .map(to_labeled_point)
)

In [43]:
feature_test_results = Statistics.chiSqTest(obs)

chi_sq_data = []

for idx, result in enumerate(feature_test_results):
    row = {
        'feature_index': idx,
        'p_value': result.pValue,
        'statistic': result.statistic,
        'degrees_of_freedom': result.degreesOfFreedom
    }
    chi_sq_data.append(row)

In [44]:
chi_sq_data

[{'degrees_of_freedom': 3,
  'feature_index': 0,
  'p_value': 0.0,
  'statistic': 178320.53763035915},
 {'degrees_of_freedom': 3,
  'feature_index': 1,
  'p_value': 0.0,
  'statistic': 9896.189132675601},
 {'degrees_of_freedom': 3,
  'feature_index': 2,
  'p_value': 0.0,
  'statistic': 32564.321958920526},
 {'degrees_of_freedom': 3,
  'feature_index': 3,
  'p_value': 0.0,
  'statistic': 73873.9153762902},
 {'degrees_of_freedom': 3,
  'feature_index': 4,
  'p_value': 0.0,
  'statistic': 8237.96881328679},
 {'degrees_of_freedom': 3,
  'feature_index': 5,
  'p_value': 0.0,
  'statistic': 89708.4277285771},
 {'degrees_of_freedom': 3,
  'feature_index': 6,
  'p_value': 0.0,
  'statistic': 50838.460692831555},
 {'degrees_of_freedom': 3,
  'feature_index': 7,
  'p_value': 0.0,
  'statistic': 119108.73584925431},
 {'degrees_of_freedom': 3,
  'feature_index': 8,
  'p_value': 0.0,
  'statistic': 2694.8435616040883},
 {'degrees_of_freedom': 3,
  'feature_index': 9,
  'p_value': 0.0,
  'statistic'

## Creating Model Input vector

In [45]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    
    
    assembler = VectorAssembler(inputCols= categoricalCols +continuousCols
                                , outputCol="features")

    pipeline = Pipeline(stages=[assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    data = data.withColumn('label',col(labelCol))

    return data

In [46]:
model_input_fn = get_dummy(data_scaled_1,indexcol,cat_list_split,con_list,labelcol)

In [47]:
model_input_fn_1 = model_input_fn.select("individual_analytics_identifier","new_ind","features", "label")

In [48]:
model_input_fn_1.write.option("sep","|").format('orc').mode("overwrite").saveAsTable("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv3_3")

In [49]:
model_inp = hc.table("cust_exp_enc.n201366_segmentation_2020_nw_mbrs_clssfctn_inpv3_3")

In [50]:
train, test = model_inp.randomSplit([0.7, 0.3], seed = 777)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 5441106
Test Dataset Count: 2333834


In [52]:
from pyspark.ml.classification import RandomForestClassifier

In [53]:
rf = RandomForestClassifier(labelCol="label", seed=42)
rfModel = rf.fit(train)

In [54]:
rfModel.featureImportances

SparseVector(27, {0: 0.0114, 2: 0.0008, 3: 0.0007, 5: 0.0039, 6: 0.0002, 7: 0.0066, 8: 0.0002, 9: 0.0002, 10: 0.0019, 11: 0.0012, 12: 0.0, 13: 0.0003, 14: 0.0001, 15: 0.0001, 16: 0.0022, 17: 0.0029, 18: 0.0006, 19: 0.0033, 20: 0.0032, 21: 0.0007, 22: 0.0008, 23: 0.4168, 24: 0.5264, 25: 0.0142, 26: 0.0014})

In [83]:
for i in train.schema["features"].metadata["ml_attr"]["attrs"]:
    print train.schema["features"].metadata["ml_attr"]["attrs"][i]

[{u'name': u'funding_category_B', u'idx': 0}, {u'name': u'funding_category_C', u'idx': 1}, {u'name': u'customer_sub_segment_code_NA', u'idx': 2}, {u'name': u'customer_sub_segment_code_SEL', u'idx': 3}, {u'name': u'customer_sub_segment_code_BOA', u'idx': 4}, {u'name': u'customer_sub_segment_code_SG', u'idx': 5}, {u'name': u'customer_sub_segment_code_KEY', u'idx': 6}, {u'name': u'customer_sub_segment_code_NAG', u'idx': 7}, {u'name': u'gender_F', u'idx': 8}, {u'name': u'urbsubr_S', u'idx': 9}, {u'name': u'urbsubr_R', u'idx': 10}, {u'name': u'medical_plan_type_ManagedChoice', u'idx': 11}, {u'name': u'medical_plan_type_IndemnityMedical', u'idx': 12}, {u'name': u'medical_plan_type_HMOACAS', u'idx': 13}, {u'name': u'medical_plan_type_QPOSACAS', u'idx': 14}, {u'name': u'vision_product', u'idx': 15}, {u'name': u'dental_product', u'idx': 16}, {u'name': u'rx_ind', u'idx': 17}, {u'name': u'hdhp_flag', u'idx': 18}, {u'name': u'high_deductible_flag', u'idx': 19}, {u'name': u'employment_index', u'idx

In [57]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [60]:
ExtractFeatureImp(rfModel.featureImportances, train, "features")

Unnamed: 0,idx,name,score
24,24,age_0,0.526369
23,23,new_ind,0.416823
25,25,covered_dependents_count_0,0.014202
0,0,funding_category_B,0.011353
7,7,customer_sub_segment_code_NAG,0.006622
5,5,customer_sub_segment_code_SG,0.003901
19,19,high_deductible_flag,0.00332
20,20,employment_index,0.003182
17,17,rx_ind,0.002861
16,16,dental_product,0.002246


In [61]:
train_predictions  = rfModel.transform(train)

In [62]:
test_predictions  = rfModel.transform(test)

In [65]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as MCE
evaluator = MCE()

In [66]:
print evaluator.evaluate(train_predictions)
print evaluator.evaluate(test_predictions)

0.849629599072
0.849633782225


In [69]:
rf.explainParams()

'cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto)\nfeaturesCol: features column name. (default: features)\nimpurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)\nlabelCol: label column name. (default: label, current: label)\nmaxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >

In [70]:
train_crschck = train_predictions.groupby("new_ind","label","prediction").count().toPandas()

In [71]:
train_crschck.sort_values(by = ["label"])

Unnamed: 0,new_ind,label,prediction,count
4,1,0,0.0,1040412
6,0,0,3.0,410193
9,1,0,2.0,3883
1,0,1,3.0,5082
5,0,1,1.0,1446332
0,0,2,3.0,85941
3,1,2,2.0,688730
7,0,2,1.0,262942
10,1,2,0.0,8410
2,0,3,1.0,19358


In [72]:

test_crschck = test_predictions.groupby("new_ind","label","prediction").count().toPandas()
test_crschck.sort_values(by = ["label"])

Unnamed: 0,new_ind,label,prediction,count
4,1,0,0.0,447008
6,0,0,3.0,175918
9,1,0,2.0,1717
1,0,1,3.0,2184
5,0,1,1.0,620093
0,0,2,3.0,36865
3,1,2,2.0,295720
7,0,2,1.0,112823
10,1,2,0.0,3569
2,0,3,1.0,8114


In [92]:
rfModel.getNumTrees, rfModel.numFeatures, rfModel.totalNumNodes, rfModel.extractParamMap

(20,
 27,
 1224,
 <bound method RandomForestClassificationModel.extractParamMap of RandomForestClassificationModel (uid=rfc_21c2ddbc87c6) with 20 trees>)