In [43]:
# !pip install awswrangler

In [44]:
import pandas as pd

import boto3
import numpy as np
import io
import awswrangler as wr
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import s3_input, Session
import pickle


import constants as params

In [45]:
def create_columns_types(df_cols, y_name):
    
    ind_cols = list(df_cols[df_cols.str.contains('_ind')])
    pmpm_cols =  list(df_cols[df_cols.str.contains('_pmpm_')])
    score_cols = list(df_cols[df_cols.str.contains('_score')])

    categorical_cols = ind_cols + [y_name]
    numerical_cols = pmpm_cols + score_cols
    ordinal_cols = []
    return (categorical_cols, numerical_cols, ordinal_cols)

In [46]:
def data_type_conversion(df, categorical_cols, numerical_cols):
    df[numerical_cols] = df[numerical_cols].apply(pd.to_numeric, errors='coerce')
    df[categorical_cols] = df[categorical_cols].astype("category")
    return df

#### Creating summation features

In [47]:
def sum_feature_generation(df, categories, categorical_cols, numerical_cols):
    for category in categories:
        new_ind_name = category + "_ind_sum"
        subset_cols_criteria = "_" + category + "_"
        subset_cols = [x for x in df.columns if subset_cols_criteria in x and '_ind' in x]
        df[new_ind_name] = df[subset_cols].apply(pd.to_numeric, errors='coerce').sum(axis=1)
        df[subset_cols] = df[subset_cols].astype("category")
        df[new_ind_name] = df[new_ind_name].astype("category")
        numerical_cols.append(new_ind_name)
        

        new_pmpm_name = category + "_pmpm_sum"
        subset_cols_criteria = "_" + category + "_"
        subset_cols = [x for x in df.columns if subset_cols_criteria in x and '_pmpm' in x]

        df[new_pmpm_name] = df[subset_cols].apply(pd.to_numeric, errors='coerce').sum(axis=1)
        numerical_cols.append(new_pmpm_name)
#         df = add_columns(df=df, col_name=new_ind_name, columns= subset_cols)
    return (df_subset, categorical_cols, numerical_cols)

#### creating a final column to have the count of number of services used by each person using ind values


In [48]:
def total_sum_features(df, numerical_cols, categorical_cols):
    df['total_ind'] = df[df.columns[df.columns.str.contains('_ind_sum')]].sum(axis=1)
    df['total_ind'] = df['total_ind'].apply(pd.to_numeric, errors = 'coerce')
    numerical_cols.append('total_ind')
    
    df['total_pmpm'] = df[df.columns[df.columns.str.contains('_pmpm_sum')]].sum(axis=1)
    df['total_pmpm'] = df['total_pmpm'].apply(pd.to_numeric, errors = 'coerce')
    numerical_cols.append('total_pmpm')

    df['service_bool'] = np.where(df['total_ind'] == 0, 0, 1)
    df['service_bool'] = df['service_bool'].astype("category")
    categorical_cols.append('service_bool')
    return df, numerical_cols, categorical_cols

In [49]:
# # To create a new bucket
# bucket_name = 'bucket-name55422' # new bucket_name
# my_region = boto3.session.Session().region_name

# s3 = boto3.resource('s3')
# try:
#     if my_region == "us-east-1":
#         s3.create_bucket(Bucket = bucket_name)
#         print('S3 bucket created successfully')
# except Exception as e:
#     print('S3 error')

In [50]:
sess = sagemaker.Session()
role = get_execution_role()

bucket = "humana-data"
prefix = 'rawdata/original_pq_files'
prefix_data = "intermediate/condition/data"
prefix_metadata = "intermediate/condition/models/metadata"
prefix_model = "intermediate/condition/models"


conn = boto3.client('s3')
contents = conn.list_objects(Bucket=bucket, Prefix=prefix)['Contents']

In [51]:

condition_file = "Condition.pq"
dependent_file = "dependent.pq"

condition_df = wr.s3.read_parquet(path = f's3://{bucket}/{prefix}/{condition_file}')
condition_df = condition_df.set_index(['person_id_syn'])
condition_df.columns = condition_df.columns.str.lower()

dependent_df = wr.s3.read_parquet(path = f's3://{bucket}/{prefix}/{dependent_file}')
dependent_df = dependent_df.set_index(['person_id_syn'])
dependent_df['transportation_issues'] = dependent_df['transportation_issues'].astype("category")
dependent_df.columns = dependent_df.columns.str.lower()

df_subset = dependent_df.merge(condition_df, how='left', left_index=True, right_index=True)

categorical_cols, numerical_cols, ordinal_cols = create_columns_types(df_cols = df_subset.columns,
                                                                      y_name = params.dependent_variable)

df_subset[categorical_cols] = np.where(df_subset[categorical_cols] != 0, 1,0)

df_subset, categorical_cols, numerical_cols = sum_feature_generation(df_subset, params.categories, 
                                              categorical_cols =categorical_cols,
                                             numerical_cols = numerical_cols)

df_subset, numerical_cols, categorical_cols = total_sum_features(df = df_subset, 
                                                                 numerical_cols=numerical_cols, 
                                                                 categorical_cols=categorical_cols)


df_subset = data_type_conversion(df=df_subset, 
                                 categorical_cols=categorical_cols, 
                                 numerical_cols=numerical_cols)

  df[new_pmpm_name] = df[subset_cols].apply(pd.to_numeric, errors='coerce').sum(axis=1)
  df[new_ind_name] = df[subset_cols].apply(pd.to_numeric, errors='coerce').sum(axis=1)
  df['total_ind'] = df[df.columns[df.columns.str.contains('_ind_sum')]].sum(axis=1)
  df['service_bool'] = np.where(df['total_ind'] == 0, 0, 1)


In [60]:
train_df = df_subset.sample(frac=0.7, random_state=543)
valid_df = df_subset[~(df_subset.index.isin(train_df.index))]

In [61]:
filename_train ='train_fe.pq'
filename_valid ='valid_fe.pq'

wr.s3.to_parquet(train_df, path = f's3://{bucket}/{prefix_data}/{filename_train}', compression='gzip', index=True)
wr.s3.to_parquet(valid_df, path = f's3://{bucket}/{prefix_data}/{filename_valid}', compression='gzip', index=True)

{'paths': ['s3://humana-data/intermediate/condition/data/valid_fe.pq'],
 'partitions_values': {}}

#### Build XgBoost model

In [62]:
filename_train ='train_fe.pq'
filename_valid ='valid_fe.pq'

train_df = wr.s3.read_parquet(path = f's3://{bucket}/{prefix_data}/{filename_train}')
valid_df = wr.s3.read_parquet(path = f's3://{bucket}/{prefix_data}/{filename_valid}')

In [63]:
from sklearn.preprocessing import OneHotEncoder

# one hot encoding
enc = OneHotEncoder(sparse=False, handle_unknown="ignore" )
one_hot_encode_cols = [x for x in categorical_cols if x != params.dependent_variable]
train_df_encoded = pd.DataFrame(enc.fit_transform(train_df[one_hot_encode_cols]), index=train_df.index)
train_df_encoded.columns = enc.get_feature_names_out()

train_model_df = pd.concat([train_df.drop(one_hot_encode_cols, axis=1), train_df_encoded], axis=1, ignore_index=False)

In [64]:
# Save the encoded categorical model
import tempfile
import boto3
import joblib

filename_encoder ='enc.pkl'
path = prefix_model + '/' + filename_encoder

# WRITE
with tempfile.TemporaryFile() as fp:
    joblib.dump(enc, fp)
    fp.seek(0)
    conn.put_object(Body=fp.read(), Bucket=bucket, Key=path)

# READ
# with tempfile.TemporaryFile() as fp:
#     conn.download_fileobj(Fileobj=fp, Bucket=bucket, Key=path)
#     fp.seek(0)
#     enc2 = joblib.load(fp)

# # DELETE
# conn.delete_object(Bucket=bucket, Key=path)

In [65]:
valid_df_encoded = pd.DataFrame(enc.transform(valid_df[one_hot_encode_cols]), index=valid_df.index)
valid_df_encoded.columns = enc.get_feature_names_out()
valid_model_df = pd.concat([valid_df.drop(one_hot_encode_cols, axis=1), valid_df_encoded], axis=1, ignore_index=False)

In [66]:
# # Write the metadata of index
# train_id_index = train_model_df.reset_index()["person_id_syn"]
# train_id_index = train_id_index.reset_index().rename(columns={'index':'index_num'})
# filename_train_id_index = "train_index_id.pq"
# wr.s3.to_parquet(train_id_index, path = 's3://{}/{}/{}'.format(bucket, prefix_metadata, filename_train_id_index),
#                  compression='gzip', index=False)


# valid_id_index = valid_model_df.reset_index()["person_id_syn"]
# valid_id_index = valid_id_index.reset_index().rename(columns={'index':'index_num'})
# filename_valid_id_index = "valid_index_id.pq"
# wr.s3.to_parquet(valid_id_index, path = 's3://{}/{}/{}'.format(bucket, prefix_metadata, filename_valid_id_index),
#                  compression='gzip', index=False)


# # Write the metadata of columns
# train_model_columns = train_model_df.columns
# test_model_columns = [x for x in train_model_columns if x != params.dependent_variable]

In [74]:
# Write the metadata of columns, index of train and valid and test
filename_metadata = prefix_metadata + '/' + 'metadata_dict'

metadata_dict = {}
metadata_dict['train_index'] = dict(zip(list(train_model_df.reset_index()['person_id_syn'].index), train_model_df.reset_index()['person_id_syn']))
metadata_dict['valid_index'] = dict(zip(list(valid_model_df.reset_index()['person_id_syn'].index), valid_model_df.reset_index()['person_id_syn']))
metadata_dict['train_columns'] = list(train_model_df.columns)
metadata_dict['test_columns'] = [x for x in train_model_df.columns if x != params.dependent_variable]
metadata_dict['categorical_cols'] = [x for x in categorical_cols if x != params.dependent_variable]
metadata_dict['numerical_cols'] = numerical_cols.copy()


conn.put_object(Bucket=bucket, Key=filename_metadata, Body=pickle.dumps(metadata_dict))


# # to read the file
# metadata_dict = conn.get_object(Bucket=bucket, Key=filename_metadata)
# serializedObject = metadata_dict['Body'].read()
# metadata_dict = pickle.loads(serializedObject)
# metadata_dict

{'ResponseMetadata': {'RequestId': 'BF0JN46KWCB3FCXN',
  'HostId': '+5G7zi7mTkJWk0HT7kdoX929ATxAczLGnxU/XcJAGHJxCl527WnUSnBpBwGJuxAppYy2hE7pONw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '+5G7zi7mTkJWk0HT7kdoX929ATxAczLGnxU/XcJAGHJxCl527WnUSnBpBwGJuxAppYy2hE7pONw=',
   'x-amz-request-id': 'BF0JN46KWCB3FCXN',
   'date': 'Fri, 11 Nov 2022 21:07:14 GMT',
   'x-amz-version-id': 'AI3fnePi0dg6f5ZaaPtWmVa_UaX2sT4I',
   'etag': '"c47ce33b1e1a100b81e5f57a530bd737"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"c47ce33b1e1a100b81e5f57a530bd737"',
 'VersionId': 'AI3fnePi0dg6f5ZaaPtWmVa_UaX2sT4I'}

In [30]:
import s3fs
prefix_data = "intermediate/condition/data"
filename_train_model ='train_model.csv'
filename_valid_model ='valid_model.csv'

bytes_to_write = train_model_df.to_csv(index=False, header=False).encode()
fs = s3fs.S3FileSystem()
with fs.open('s3://{}/{}/{}'.format(bucket, prefix_data, filename_train_model), 'wb') as f:
    f.write(bytes_to_write)

bytes_to_write = valid_model_df.to_csv(index=False, header=False).encode()
fs = s3fs.S3FileSystem()
with fs.open('s3://{}/{}/{}'.format(bucket, prefix_data, filename_valid_model), 'wb') as f:
    f.write(bytes_to_write)

In [31]:
prefix_data = "intermediate/condition/data"
filename_train_model ='train_model.csv'
filename_valid_model ='valid_model.csv'

## Specifies the path to training and validation in S3
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/{}'.format(bucket, prefix_data, filename_train_model), content_type='csv')
s3_input_valid = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/{}'.format(bucket, prefix_data, filename_valid_model), content_type='csv')


In [32]:
from sagemaker.amazon_estimator import get_image_uri
from sagemaker.session import s3_input, Session



In [33]:
prefix_model

'intermediate/condition/models'

In [34]:
prefix_model = "intermediate/condition/models/xgboost-condition-data"
output_path  = 's3://{}/{}/output'.format(bucket, prefix_model)
output_path

's3://humana-data/intermediate/condition/models/xgboost-condition-data/output'

In [35]:
region_name = boto3.Session().region_name

In [36]:
container = get_image_uri(region_name, 'xgboost', 'latest')

The method get_image_uri has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [37]:
hyperparameters = {
    "num_round":100,
    "max_depth":8,
    "eta":0.2,
    "gamma": 3,
    "objective":"binary:logistic",
}

In [38]:
estimator = sagemaker.estimator.Estimator(image_uri = container,
                                         hyperparameters = hyperparameters,
                                         role = sagemaker.get_execution_role(),
                                         instance_count = 1,
                                         instance_type = 'ml.m5.large',
                                         volume_size = 5,
                                         output_path = output_path,
                                         use_spot_instances = True,
                                          max_run = 300,
                                          max_wait = 600)


In [39]:
estimator.fit({'train':s3_input_train, 'validation':s3_input_valid})

2022-11-11 20:32:41 Starting - Starting the training job...
2022-11-11 20:33:05 Starting - Preparing the instances for trainingProfilerReport-1668198760: InProgress
............
2022-11-11 20:35:05 Downloading - Downloading input data...
2022-11-11 20:35:38 Training - Training image download completed. Training in progress..[34mArguments: train[0m
[34m[2022-11-11:20:35:43:INFO] Running standalone xgboost training.[0m
[34m[2022-11-11:20:35:43:INFO] File size need to be processed in the node: 190.78mb. Available memory size in the node: 383.48mb[0m
[34m[2022-11-11:20:35:43:INFO] Determined delimiter of CSV input is ','[0m
[34m[20:35:43] S3DistributionType set as FullyReplicated[0m
[34m[20:35:44] 48700x669 matrix with 32580300 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,[0m
[34m[2022-11-11:20:35:44:INFO] Determined delimiter of CSV input is ','[0m
[34m[20:35:44] S3DistributionType set as FullyReplicated[0m
[34m[20:35:44] 20872x669 mat

[34m[44]#011train-error:0.111766#011validation-error:0.147374[0m
[34m[20:36:44] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 34 extra nodes, 36 pruned nodes, max_depth=8[0m
[34m[45]#011train-error:0.111622#011validation-error:0.14747[0m
[34m[20:36:45] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 186 extra nodes, 70 pruned nodes, max_depth=8[0m
[34m[46]#011train-error:0.110842#011validation-error:0.147183[0m
[34m[20:36:47] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 214 extra nodes, 92 pruned nodes, max_depth=8[0m
[34m[47]#011train-error:0.109836#011validation-error:0.147422[0m
[34m[20:36:48] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 162 extra nodes, 80 pruned nodes, max_depth=8[0m
[34m[48]#011train-error:0.10924#011validation-error:0.146991[0m
[34m[20:36:49] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 116 extra nodes, 70 pruned nodes, max_depth=8[0m
[34m[49]#011train-error:0.108727#011validation-error

[34m[20:37:44] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 0 extra nodes, 38 pruned nodes, max_depth=0[0m
[34m[93]#011train-error:0.102895#011validation-error:0.147374[0m
[34m[20:37:45] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 0 extra nodes, 38 pruned nodes, max_depth=0[0m
[34m[94]#011train-error:0.102895#011validation-error:0.147374[0m
[34m[20:37:46] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 0 extra nodes, 38 pruned nodes, max_depth=0[0m
[34m[95]#011train-error:0.102895#011validation-error:0.147374[0m
[34m[20:37:48] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 0 extra nodes, 38 pruned nodes, max_depth=0[0m
[34m[96]#011train-error:0.102895#011validation-error:0.147374[0m
[34m[20:37:49] src/tree/updater_prune.cc:74: tree pruning end, 1 roots, 0 extra nodes, 38 pruned nodes, max_depth=0[0m
[34m[97]#011train-error:0.102895#011validation-error:0.147374[0m
[34m[20:37:50] src/tree/updater_prune.cc:74: tree pruning e

### Deploy ML model

In [85]:
xgb_condition_predictor = estimator.deploy(initial_instance_count = 1, instance_type='ml.m5.large')

-----!

### Predicts on Test Data

In [86]:
from sagemaker.predictor import csv_serializer

In [None]:
valid_data_array = valid_model_df.drop(columns= ['transportation_issues']).values
# xgb_condition_predictor.content_type = 'text/csv'
# xgb_condition_predictor.serializer = csv_serializer
predictions = xgb_condition_predictor.predict(valid_data_array).decode('utf-8')
prdictions_array = np.fromstring(predictions[1:], sep=',')
print(predictions_array.shape)
