In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import xgboost as xgb
from google.cloud import bigquery
from google.cloud import bigquery_storage

In [None]:
df = pd.read_csv('./BankChurners.csv')
df = df.drop(columns=['CLIENTNUM'])  # row identifier, not a feature

# 'Unknown' marks a missing label in these columns. Turning it into NaN lets XGBoost's
# native missing-value handling deal with it, instead of dropping rows or guessing values
# (XGBoost also performs well on tabular data in general).
# np.nan rather than np.NaN: the NaN alias was removed in NumPy 2.0.
columns_with_missing_labels = ['Education_Level', 'Marital_Status', 'Income_Category']
df[columns_with_missing_labels] = df[columns_with_missing_labels].replace('Unknown', np.nan)

# NOTE(review): judging by their names, NB1/NB2 are outputs of a Naive Bayes classifier
# of Attrition_Flag; if so they leak the target into the features - confirm and
# consider dropping them.
df = df.rename(columns={
    'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_1': 'NB1',
    'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_2': 'NB2'
})

numerical_columns = ['Customer_Age', 'Dependent_count', 'Months_on_book', 'Total_Relationship_Count', "Months_Inactive_12_mon","Contacts_Count_12_mon","Credit_Limit","Total_Revolving_Bal","Avg_Open_To_Buy","Total_Amt_Chng_Q4_Q1","Total_Trans_Amt","Total_Trans_Ct","Total_Ct_Chng_Q4_Q1","Avg_Utilization_Ratio","NB1","NB2"]
scaler = StandardScaler()
# TODO: persist the fitted scaler (per-column mean/std) so new input at serving time
# is scaled identically.
# Assigned positionally (ndarray), so the result lines up with df's rows whatever its index.
df[numerical_columns] = scaler.fit_transform(df[numerical_columns])

categorical_columns = ['Education_Level', 'Marital_Status', 'Income_Category', 'Gender', 'Card_Category']
for col in categorical_columns:
  # Sorted integer codes, with missing values left as NaN (factorize marks them -1).
  # LabelEncoder would instead give NaN its own integer code, undoing the
  # 'Unknown' -> NaN step above.
  codes = pd.Series(pd.factorize(df[col], sort=True)[0], index=df.index)
  df[col] = codes.where(codes >= 0)
df['Attrition_Flag'] = df['Attrition_Flag'].map({'Existing Customer': 0, 'Attrited Customer': 1})

df.head()

In [None]:
y = df['Attrition_Flag']
x = df.drop(columns=['Attrition_Flag'])
# Stratify so train and test keep the same attrition rate (attrition is presumably the
# minority class), and fix the seed so the split - and every metric below - is reproducible.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.3, shuffle=True, stratify=y, random_state=42
)
x_train.shape, x_test.shape, y_train.shape, y_test.shape

In [None]:
# XGBoost uses the LAST eval_set entry for early stopping. Monitoring the test set there
# would let early stopping tune on it and inflate the test AUROC reported later, so hold
# out a validation split from the training rows instead; the test rows stay unseen.
x_fit, x_val, y_fit, y_val = train_test_split(
    x_train, y_train, test_size=0.2, shuffle=True, stratify=y_train, random_state=42
)

xgb_tree = xgb.XGBClassifier(eval_metric='auc', early_stopping_rounds=50)
xgb_tree.fit(x_fit, y_fit, eval_set=[(x_fit, y_fit), (x_val, y_val)], verbose=True)
print(f"Best iteration: {xgb_tree.best_iteration}")

In [None]:
# AUROC is a ranking metric, so it needs the positive-class probability. Feeding it the
# argmax hard labels collapses the ROC curve to a single operating point.
y_pred = xgb_tree.predict_proba(x_test)
predictions = np.argmax(y_pred, axis=1)  # hard labels, equivalent to a 0.5 threshold
auroc = roc_auc_score(y_test, y_pred[:, 1])  # column 1 = P(Attrition_Flag == 1)
print(f"Evaluation completed with test AUROC: {auroc:.4f}")

In [None]:
PROJECT_ID = 'totemic-guild-419402'
try:
    from google.colab import auth
    auth.authenticate_user(project_id = PROJECT_ID)
    print('Colab authorized to GCP')
    !gcloud config get-value account
    bq = bigquery.Client(project = PROJECT_ID)
    bqstorage_client = bigquery_storage.BigQueryReadClient()
except Exception:
    print('Not a Colab Environment')
    pass

# BigQuery Parameters
REGION = 'us-central1'
BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'attrition_prevention'
BQ_TABLE = 'bank_churners'
BQ_REGION = REGION[0:2]

In [None]:
def clean_and_normalize(df: pd.DataFrame) -> pd.DataFrame:
  """Clean, scale and encode one batch of BankChurners rows read from BigQuery.

  Steps:
    1. drop the ``entity_id`` column;
    2. replace the 'Unknown' placeholder in Education_Level, Marital_Status and
       Income_Category with NaN, so XGBoost's native missing-value handling deals
       with it instead of dropping rows or guessing values;
    3. rename the two long Naive Bayes columns to NB1 / NB2;
    4. standardize the numerical columns with a StandardScaler fitted on this batch;
    5. integer-encode the categorical columns in sorted label order, keeping missing
       values as NaN;
    6. map Attrition_Flag to 0 ('Existing Customer') / 1 ('Attrited Customer').

  Args:
    df: raw batch containing ``entity_id``, ``Attrition_Flag`` and the feature
      columns listed below. It is not modified in place.

  Returns:
    A new DataFrame with the same index as ``df``.

  NOTE(review): the scaler and the category codes are fitted on this batch alone, so
  two batches (or serving-time input) can be transformed differently - e.g. a batch
  without one Card_Category value shifts the codes of the others. Persist the fitted
  statistics / vocabularies if batches must be encoded consistently.
  """
  df = df.drop(columns=['entity_id'])

  # np.nan rather than np.NaN: the NaN alias was removed in NumPy 2.0.
  columns_with_missing_labels = ['Education_Level', 'Marital_Status', 'Income_Category']
  df[columns_with_missing_labels] = df[columns_with_missing_labels].replace('Unknown', np.nan)

  # NOTE(review): judging by their names, NB1/NB2 are outputs of a Naive Bayes
  # classifier of Attrition_Flag; if so they leak the target - confirm and consider
  # dropping them.
  df = df.rename(columns={
      'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_1': 'NB1',
      'Naive_Bayes_Classifier_Attrition_Flag_Card_Category_Contacts_Count_12_mon_Dependent_count_Education_Level_Months_Inactive_12_mon_2': 'NB2'
  })

  numerical_columns = ['Customer_Age', 'Dependent_count', 'Months_on_book', 'Total_Relationship_Count', "Months_Inactive_12_mon","Contacts_Count_12_mon","Credit_Limit","Total_Revolving_Bal","Avg_Open_To_Buy","Total_Amt_Chng_Q4_Q1","Total_Trans_Amt","Total_Trans_Ct","Total_Ct_Chng_Q4_Q1","Avg_Utilization_Ratio","NB1","NB2"]
  scaler = StandardScaler()
  # Assigned positionally (ndarray), so the scaled values line up with df's rows
  # whatever its index is; a DataFrame with a fresh RangeIndex would be index-aligned
  # and produce NaN for a non-default index.
  df[numerical_columns] = scaler.fit_transform(df[numerical_columns])

  categorical_columns = ['Education_Level', 'Marital_Status', 'Income_Category', 'Gender', 'Card_Category']
  for col in categorical_columns:
    # Sorted integer codes, with missing values left as NaN (factorize marks them -1).
    # LabelEncoder would instead give NaN its own integer code, undoing the
    # 'Unknown' -> NaN step above.
    codes = pd.Series(pd.factorize(df[col], sort=True)[0], index=df.index)
    df[col] = codes.where(codes >= 0)
  df['Attrition_Flag'] = df['Attrition_Flag'].map({'Existing Customer': 0, 'Attrited Customer': 1})
  return df

In [None]:
# Reading from BigQuery and training incrementally, one page at a time.

# Pagination is key with Big Data: only one page of rows is materialized at a time.
query = f"SELECT * FROM {BQ_DATASET}.{BQ_TABLE}"
job = bq.query(query)
iterator = job.result(page_size=5_100)

xgb_tree = xgb.XGBClassifier(
    eval_metric='auc',
    early_stopping_rounds=20
)

count = 0  # pages trained so far
for page in iterator.pages:
  df = pd.DataFrame([dict(row.items()) for row in page])
  if df.empty:
    continue

  df = clean_and_normalize(df)

  y = df['Attrition_Flag']
  x = df.drop(columns=['Attrition_Flag'])
  x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, shuffle=True)
  # Early stopping watches the LAST eval_set entry; use a validation split carved out of
  # the training rows so the test rows stay unseen until evaluation.
  x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, shuffle=True)

  # Continue boosting from the model saved after the previous page; without xgb_model
  # every .fit() call would start a fresh ensemble.
  xgb_tree.fit(
      x_fit, y_fit,
      eval_set=[(x_fit, y_fit), (x_val, y_val)],
      verbose=True,
      xgb_model='model.ubj' if count else None,
  )
  xgb_tree.save_model('model.ubj')
  print(f"Best iteration: {xgb_tree.best_iteration}")

  # AUROC needs the positive-class probability (column 1), not argmax hard labels.
  y_pred = xgb_tree.predict_proba(x_test)
  auroc = roc_auc_score(y_test, y_pred[:, 1])
  print(f"Evaluation completed with test AUROC: {auroc:.4f}")
  count += 1

print(f"Total pages processed: {count}")

In [None]:
# Syncing your data:

# Ideally, new data would trigger a Feature View sync, which would in turn
# trigger the logic in this notebook.
# I don't see a way to hook into the sync scheduler built into the Feature View,
# so this uses pull semantics with a manual trigger; we should look into push semantics.

# Paste the command below into Cloud Shell to trigger a manual sync of a Feature View:
# https://cloud.google.com/vertex-ai/docs/featurestore/latest/sync-data#curl
# The feature is still pre-GA, so there is no stable (v1) client API for it yet;
# the sync method only exists in the v1beta1 API:
# https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1beta1#google.cloud.aiplatform.v1beta1.FeaturestoreService

# curl -X POST \
#     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
#     -H "Content-Type: application/json; charset=utf-8" \
#     -d "" \
#     "https://us-central1-aiplatform.googleapis.com/v1beta1/projects/totemic-guild-419402/locations/us-central1/featureOnlineStores/attrition_bigtable_store/featureViews/attrition_view:sync"

In [None]:
# Reading from Feature Store: resolve the online store through the admin (control-plane) API.
from google.cloud import aiplatform
import google.auth

FEATURE_VIEW_NAME = 'attrition_view'
FEATURE_ONLINE_STORE_NAME = 'attrition_bigtable_store'
proj_id = ''

creds, _ = google.auth.default()
feature_store_client = aiplatform.gapic.FeatureOnlineStoreAdminServiceClient(
    credentials=creds,
    client_options={'api_endpoint': f'{REGION}-aiplatform.googleapis.com'},
)

online_store_path = f'projects/{PROJECT_ID}/locations/{REGION}/featureOnlineStores/{FEATURE_ONLINE_STORE_NAME}'
online_store = feature_store_client.get_feature_online_store(name=online_store_path)

online_store.name

In [None]:
# Look up the feature view nested under the online store resolved above.
feature_view_path = f'{online_store.name}/featureViews/{FEATURE_VIEW_NAME}'
feature_view = feature_store_client.get_feature_view(name=feature_view_path)
feature_view.name

In [None]:
# Data-plane client for reading from the online store, pinned to the regional endpoint.
online_serve_client = aiplatform.gapic.FeatureOnlineStoreServiceClient(
    client_options={'api_endpoint': f'{REGION}-aiplatform.googleapis.com'}
)

In [None]:
# Fetch one entity's features from the online store.
# Open question: the entity id below is hardcoded, so how would a prediction service know
# which ids to refresh and pull? Presumably it doesn't, and the ids have to be pushed to
# the prediction endpoint instead (e.g. an Airflow job that triggers a sync, collects the
# ids, then calls the endpoint) - TODO decide.
# Also, this reads one row per request, so it is a prediction path, not a training one.
from google.cloud.aiplatform_v1 import FeatureOnlineStoreServiceClient
from google.cloud.aiplatform_v1.types import feature_online_store_service as feature_online_store_service_pb2

ENTITY_ID = '826077033'

data_client = FeatureOnlineStoreServiceClient(
  client_options={"api_endpoint": f'{REGION}-aiplatform.googleapis.com'}
)
fetch_request = feature_online_store_service_pb2.FetchFeatureValuesRequest(
  feature_view=feature_view.name,
  data_key=feature_online_store_service_pb2.FeatureViewDataKey(key=ENTITY_ID),
  data_format=feature_online_store_service_pb2.FeatureViewDataFormat.PROTO_STRUCT,
)
fetch_response = data_client.fetch_feature_values(request=fetch_request)
dict(fetch_response.proto_struct)

In [None]:
from google.cloud.aiplatform.featurestore import Featurestore

# Want to see a typo in a production google api? Uncomment the line below.
# It has to stay commented out: run as-is it presumably raises (the error message is
# where the typo shows up), which would stop the cell before the lookup that follows.
# featurestore = Featurestore(featurestore_name=online_store.name)

featurestore = Featurestore(featurestore_name='projects/614098673135/locations/us-central1/featurestores/attrition_bigtable_store')
featurestore