<a href="https://colab.research.google.com/github/benjamin-carter/neural_bank/blob/master/A5_churn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Connect with Google Drive in order to upload dataset

In [0]:
from google.colab import drive

drive.mount("/content/gdrive")

View contents of Google Drive

In [0]:
ls "/content/gdrive/My Drive/Current"

Import necessary packages

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from matplotlib.colors import ListedColormap
from IPython.display import clear_output
from time import sleep
from scipy.stats import multivariate_normal as mvn
from scipy.stats import truncnorm as trn 

%matplotlib inline
cmap_bold = ListedColormap(["#FF0000", "#00FF00", "#0000FF"])
cmap_light = ListedColormap(["#FFBBBB", "#BBFFBB", "#BBBBFF"])

Define and code universal functions needed for modeling and measuring effectiveness of models

In [0]:
def linear(H):
  return H

def ReLU(H):
  return H * (H > 0)

def sigmoid(H):
  return 1 / (1 + np.exp(-H))

def softmax(H):
  eH = np.exp(H)
  return eH / eH.sum(axis = 1, keepdims = True)

def cross_entropy(Y, P_hat):
  return -(1 / len(Y))*np.sum(Y * np.log(P_hat))

def OLS(Y, Y_hat):
  return (1 / (2 * len(Y))) * np.sum((Y - Y_hat) ** 2)

def derivative(Z, a):
  if a == linear: return 1
  if a == sigmoid: return Z*(1-Z)
  
  elif a == np.tanh: return 1 - Z*Z 
  elif a == ReLU: return (Z > 0). astype(int)
  else: ValueError("Unkown Activation")

def one_hot_encode(y):
  N = len(y)
  K = len(set(y))
  Y = np.zeros((N, K))

  for i in range(N):
    Y[i,y[i]] = 1
  return Y 

def accuracy(y, y_hat): 
  return np.mean(y == y_hat)

def R2(y, y_hat):
  return sqrt((y - y_hat)**2)

def confusion(y, y_hat):
  confuse = np.zeros((2,2))
  confuse[0,0] = np.sum(y_hat[y == 1])
  confuse[0,1] = np.sum(y_hat[y == 0])
  confuse[1,0] = np.count_nonzero(y_hat[y == 1] == 0)
  confuse[1,1] = np.count_nonzero(y_hat[y == 0] == 0)
  return confuse

def indices_to_one_hot(data, nb_classes):
  #converts on iterable of indices to one hot labels
  targets = np.array(data).reshape(-1)
  #targets.astype(int)
  #nb_classes.astype(int)
  return np.eye(nb_classes)[targets]


Declare and code the class for teh Artificial Neural Network

In [0]:
class ANN():
  def __init__(self, architecture, activations = None, mode = 0):
    self.mode = mode
    self.architecture = architecture
    self.L = len(architecture) + 1
    self.activations = activations

  def fit(self, X, y, eta = 1e-2, epochs = 1e3, show_curve = False, lamb = 1e-2):
    epochs = int(epochs)
    
    if self.mode:
      Y = y
    else: Y = one_hot_encode(y)
   
    N, D = X.shape
    K = Y.shape[1]

    #self.W = {l: np.random.randn(M[0], M[1]) for l, M in enumerate(zip(([D] + self.architecture), (self.architecture + [K])), 1)}
    #self.b = {l: np.random.randn(M) for l, M in enumerate(self.architecture + [K], 1)}
    
    if self.activations is None:
      self.a = {l: ReLU for l in range(1, self.L)}
    else:
      self.a = {l: act for l, act in enumerate(self.activations, 1)}

    if self.mode:
      self.a[self.L] = linear
    else: 
      self.a[self.L] = softmax

    J = np.zeros(epochs)

    for epoch in range(epochs):
      self.forward(X)

      if self.mode:
        J[epoch] = OLS(Y, self.Z[self.L])

      else:
        J[epoch] = cross_entropy(Y, self.Z[self.L])

      dH = (1 / N) * (self.Z[self.L] - Y)

      for l in sorted(self.W.keys(), reverse = True):
        dW = self.Z[l - 1].T @ dH
        db = dH.sum(axis = 0)
        self.W[l] = (1 - lamb) * self.W[l] - eta*dW
        self.b[l] -= self.b[l] - eta*db 
        #self.W[l] = (1 - lamb) * self.W[l] - lamb*np.sign(self.W[l])- eta*dW
        #self.W[l] -= eta*dW - lamb * self.W[l]
        #self.b[l] -= eta*db - lamb * self.b[l]

        if l > 1:
          dZ = dH @ self.W[l].T
          dH = dZ * derivative(self.Z[l-1], self.a[l-1])

    if show_curve:
      plt.figure()
      plt.plot(J)
      plt.xlabel("epochs")
      plt.ylabel("J")
      plt.title("Training Curve")
      plt.show()

  def forward(self, X):
    self.Z = {0:X}

    for l in sorted(self.W.keys()):
      self.Z[l] = self.a[l](self.Z[l-1] @ self.W[l] + self.b[l])

  def predict(self, X):
    self.forward(X)
    
    if self.mode:
      return self.Z[self.L]

    else:
      return self.Z[self.L].argmax(axis = 1)


Upload data from Google Drive

In [0]:
churn_df = pd.read_csv("/content/gdrive/My Drive/Current/churn_df.csv")

#temp = churn_df[['Exited', 'CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary',
 #      'Spain', 'Germany', 'Male']]

temp = churn_df[['Exited', 'HasCrCard', 'IsActiveMember', 'Spain', 'France', 'Female', 'prod1',  'Bal1', 'Bal0', 'Sal3',
       'Sal2', 'Sal1', 'Sal4', 'CS1', 'CS0', 'Ten3', 'Ten2', 'Ten1', 'Ten0', 'Age3', 'Age1', 'Age2']]
churn_np = temp.to_numpy()
churn_np.shape


Run with training set as is, with Geography and Gender categorized.

In [80]:
N,D = churn_np.shape
scale = .8
test_s = int((1 - scale)*N)
np.random.shuffle(churn_np)
X_test = churn_np[:test_s,1:]
y_test = churn_np[:test_s,0].astype(int)
y_train = churn_np[test_s:,0].astype(int)
X_train = churn_np[test_s:,1:]
X_train.shape

(8001, 22)

Normalize Data

In [0]:
N, D = churn_np.shape

for i in range(1,D):
  up = np.max(churn_np[:,i])
  down = np.min(churn_np[:,i])
  if (up - down) != 0:
    for j in range(N):
      #churn_np[j,i] = (churn_np[j,i] - down) / (up - down)
      churn_np[j,i] = 2 * (churn_np[j,i] - down) / (up - down) - 1



Create training and testing sets. Also oversampling, SMOTE, and adding noise to training set as needed

In [0]:
N,D = churn_np.shape
scale = .8
test_s = int((1 - scale)*N)
np.random.shuffle(churn_np)
X_test = churn_np[:test_s,1:]
y_test = churn_np[:test_s,0].astype(int)
#ID_test = churn_np[:test_s, 0]

X_train = churn_np[test_s:,:]
# Oversampling
ones_churn = X_train[X_train[:,0] == 1,:].copy()
X_train = np.concatenate((ones_churn, X_train))
X_train = np.concatenate((ones_churn, X_train))
# SMOTE
smote_churn = ones_churn.copy()
smote_churn[:,2:] = ones_churn[:,2:] + .01 * np.random.randn()
X_train = np.concatenate((smote_churn, X_train))


#X_train = churn_np[test_s:,:]

np.random.shuffle(X_train)
#ID_train = X_train[:,0]
y_train = X_train[:,0].astype(int)
X_train = X_train[:,1:]
# Add noise to enire training set
X_train = X_train + .01 * np.random.randn()
X_train.shape


Run Artificial Neural Network with desired architecture and metrics for model verification.

In [0]:
#ann = ANN([30, 25, 20, 15, 10], [np.tanh, np.tanh, np.tanh, np.tanh, np.tanh])
ann.W = tempw
ann.b = tempb 
ann.fit(X_train, y_train, eta = 1e-3, epochs = 1e4, show_curve = True, lamb = 0.0000)
y_hat = ann.predict(X_test)
y_hat_train = ann.predict(X_train)
print(f"Training Accuracy : {accuracy(y_train, y_hat_train):0.4f}")
print(f"Testing Accuracy : {accuracy(y_test, y_hat):0.4f}")
tempw = ann.W
tempb = ann.b
print(f"Cost : {np.sum(y_hat[y_test == 0]) + 5 * np.count_nonzero(y_hat[y_test == 1] == 0):0.4f}")
print(f"1's Accuracy : {np.sum(y_hat[y_test == 1]) / (np.sum(y_hat[y_test == 1]) + np.count_nonzero(y_hat[y_test == 1] == 0)):0.4f}")
confusion(y_test, y_hat)

Save weights of the neural network when goal is reached.

In [0]:
dfw = pd.DataFrame.from_dict(ann.W, orient='index')
dfw.to_csv("/content/gdrive/My Drive/Current/weights_ann.csv")

dfb = pd.DataFrame.from_dict(ann.b, orient='index')
dfb.to_csv("/content/gdrive/My Drive/Current/bias_ann.csv")

Retrieve weights and bias of passed models as desired.

In [0]:
dfw = pd.read_csv("/content/gdrive/My Drive/Current/weights_ann.csv", index_col=0, squeeze=True).to_dict()

dfb = pd.read_csv("/content/gdrive/My Drive/Current/bias_ann.csv", index_col=0, squeeze=True).to_dict()

Learning subsets of the data

In [0]:
churn_df[['Exited', 'HasCrCard', 'IsActiveMember', 'Spain', 'Germany', 'France', 'Female', 'Male', 'prods4',
          'prods3', 'prods2', 'prod1', 'Bal2', 'Bal1', 'Bal0', 'Sal4', 'Sal3', 'Sal2', 'Sal1', 'Sal0', 
          'CS3', 'CS2', 'CS1', 'CS0', 'Ten4', 'Ten3', 'Ten2', 'Ten1', 'Ten0', 'Age3', 'Age2', 'Age1', 'Age0']]

#credit score
churn_df = pd.read_csv("/content/gdrive/My Drive/Current/churn_df.csv")
credscore = churn_df[['Exited', 'HasCrCard', 'IsActiveMember', 
                      'Spain', 'Germany', 'France', 'Female', 'Male', 'prods4',
                      'prods3', 'prods2', 'prod1', 'Bal2', 'Bal1', 'Bal0', 'Sal4', 'Sal3',
                      'Sal2', 'Sal1', 'Sal0', 'CS3', 'CS2', 'CS1', 'CS0', 'Ten4', 'Ten3',
                      'Ten2', 'Ten1', 'Ten0', 'Age3', 'Age2', 'Age1', 'Age0']]
churn_df.head()

N,D = churn_np.shape
temp = churn_np[churn_np[:,-1] == 0,:]
temp = temp[:,:-1]
churn_np = temp
churn_np.shape

Run General Logistic Regression model and test accuracy and given metrics.

In [0]:
glr = GenLogisticRegression()
glr.fit(X_train, y_train, show_curve = True)
y_hat_glr = glr.predict(X_test)
y_hat_train_glr = glr.predict(X_train)
print(f"Training Accuracy : {accuracy(y_train, y_hat_train_glr):0.4f}")
print(f"Testing Accuracy : {accuracy(y_test, y_hat_glr):0.4f}")
print(f"Cost : {np.sum(y_hat_glr[y_test == 0]) + 5 * np.count_nonzero(y_hat_glr[y_test == 1] == 0):0.4f}")
print(f"1's Accuracy : {np.sum(y_hat_glr[y_test == 1]) / (np.sum(y_hat_glr[y_test == 1]) + np.count_nonzero(y_hat_glr[y_test == 1] == 0)):0.4f}")
confusion(y_test, y_hat_glr)

Run KNN classifier model and test acuracy and given metrics.

In [0]:
knn = KNNClassifier()
knn.fit(X_train, y_train)
y_hat_knn = knn.predict(X_test,15)
y_hat_train_knn = knn.predict(X_train,15)
print(f"Training Accuracy : {accuracy(y_train, y_hat_train_knn):0.4f}")
print(f"Testing Accuracy : {accuracy(y_test, y_hat_knn):0.4f}")
print(f"Cost : {np.sum(y_hat_knn[y_test == 0]) + 5 * np.count_nonzero(y_hat_knn[y_test == 1] == 0):0.4f}")
print(f"1's Accuracy : {np.sum(y_hat_knn[y_test == 1]) / (np.sum(y_hat_knn[y_test == 1]) + np.count_nonzero(y_hat_knn[y_test == 1] == 0)):0.4f}")
confusion(y_test, y_hat_knn)

Run Bayes model and test accuracy and given metrics.

In [0]:
# run either Gauss model with original train / test data
gnb = GaussBayes()
gnb.fit(X_train, y_train)

y_hat_gnb = gnb.predict(X_test)
y_hat_train_gnb = gnb.predict(X_train)
print(f"Training Accuracy : {accuracy(y_train, y_hat_train_gnb):0.4f}")
print(f"Testing Accuracy : {accuracy(y_test, y_hat_gnb):0.4f}")
print(f"Cost : {np.sum(y_hat_gnb[y_test == 0]) + 5 * np.count_nonzero(y_hat_gnb[y_test == 1] == 0):0.4f}")
print(f"1's Accuracy : {np.sum(y_hat_gnb[y_test == 1]) / (np.sum(y_hat_gnb[y_test == 1]) + np.count_nonzero(y_hat_gnb[y_test == 1] == 0)):0.4f}")
confusion(y_test, y_hat_gnb)

Define classes for Logistic Regression, KNN Classifier, and Bayes method.

In [0]:
class GenLogisticRegression():
  
  def __init__(self, thresh = 0.5):
    self.W = None
    self.B = None

  def fit(self, X, y, eta = 1e-3, epochs = 1e4, show_curve = False):
    N,D = X.shape
    epochs = int(epochs)
    K = len(np.unique(y))
    
    self.y_values = np.unique(y, return_index=False)
    Y = indices_to_one_hot(y, K).astype(int)

    self.W = np.random.randn(D, K)
    self.B = np.random.randn(K)

    J = np.zeros(int(epochs))

    for epoch in range(epochs):
      P_hat = self.__forward__(X)
      J[epoch] = cross_entropy(Y, P_hat)
      self.W -= eta * (1/N) * X.T@(P_hat - Y)
      self.B -= eta * (1/N) * np.sum(P_hat - Y, axis = 0)

    if show_curve:
      plt.figure()
      plt.plot(J)
      plt.xlabel("epochs")
      plt.ylabel("J")
      plt.title("Training Curve")
      plt.show()

  def __forward__(self, X):
    return softmax(X @ self.W + self.B)

  def predict(self, X):
    return np.argmax(self.__forward__(X),axis=1)

class KNNClassifier():
  def fit(self, x, y):
    self.x = x
    self.y = y.astype(int)
  def predict(self, x, k, epsilon=1e-3):
    N = len(x)
    y_hat = np.zeros(N)
    for i in range(N):
      dist2 = np.sum((self.x - x[i])**2, axis =1)
      idxt = np.argsort(dist2)[:k]
      gamma_k = (np.sqrt(dist2[idxt] + epsilon))**-1
      y_hat[i] = np.bincount(self.y[idxt], weights = gamma_k).argmax()
    return y_hat

## Bayes with Gaussian distribution
class GaussBayes():
  # fitting model with training data. input data split into features, X, and labels, y
  def fit(self, X, y, epsilon = 1e-3):
    self.likelihoods = dict()
    self.priors = dict()
    self.K = set(y.astype(int))
    for k in self.K:
      X_k = X[y == k, :]
      N_k, D = X_k.shape
      self.likelihoods[k] = {"Mean": X_k.mean(axis=0), "Cov": (1/(N_k - 1)) * np.matmul((X_k - X_k.mean(axis=0)).T, X_k - X_k.mean(axis=0)) 
      + epsilon*np.identity(D)}
      self.priors[k] = len(X_k) / len(X)
  # prediction function with test data. input is only features.
  def predict(self, X):
    N, D = X.shape
    P_hat = np.zeros((N, len(self.K)))
    for k, l in self.likelihoods.items():
      P_hat[:, k] = mvn.logpdf(X, l["Mean"], l["Cov"]) + np.log(self.priors[k])  
    return P_hat.argmax(axis=1)



Run varying models for subsets of data based on categorical variables.

In [0]:
#churn_df[['Exited', 'HasCrCard', 'IsActiveMember', 'Spain', 'Germany', 'France', 'Female', 'Male', 'prods4',
#          'prods3', 'prods2', 'prod1', 'Bal2', 'Bal1', 'Bal0', 'Sal4', 'Sal3', 'Sal2', 'Sal1', 'Sal0', 
#          'CS3', 'CS2', 'CS1', 'CS0', 'Ten4', 'Ten3', 'Ten2', 'Ten1', 'Ten0', 'Age3', 'Age2', 'Age1', 'Age0']]

#credit score
churn_df = pd.read_csv("/content/gdrive/My Drive/Current/churn_df.csv")
german = churn_df[['Exited', 'HasCrCard', 'IsActiveMember', 'Germany', 'France', 'Male', 'prods2', 'Bal1', 'Bal0', 
                    'Sal3', 'Sal2', 'Sal1', 'Sal4', 'CS2', 'CS1', 'CS0', 'Ten3', 'Ten2', 'Ten1', 'Ten0',  'Age3']]  
churn_np = german.to_numpy()

N,D = churn_np.shape
temp = churn_np[churn_np[:,-1] == 1,:]
temp = temp[:,:-1]
churn_np = temp
print(churn_np.shape)
N,D = churn_np.shape
for i in range(1,D):
  up = np.max(churn_np[:,i])
  down = np.min(churn_np[:,i])
  if (up - down) != 0:
    for j in range(N):
      #churn_np[j,i] = (churn_np[j,i] - down) / (up - down)
      churn_np[j,i] = 2 * (churn_np[j,i] - down) / (up - down) - 1

scale = .8
test_s = int((1 - scale)*N)
np.random.shuffle(churn_np)
X_test = churn_np[:test_s,1:]
y_test = churn_np[:test_s,0].astype(int)
#ID_test = churn_np[:test_s, 0]

X_train = churn_np[test_s:,:]
# Oversampling
ones_churn = X_train[X_train[:,0] == 1,:].copy()
X_train = np.concatenate((ones_churn, X_train))
X_train = np.concatenate((ones_churn, X_train))

np.random.shuffle(X_train)
#ID_train = X_train[:,0]
y_train = X_train[:,0].astype(int)
X_train = X_train[:,1:]
# Add noise to enire training set
X_train = X_train + .01 * np.random.randn()
X_train.shape


ann = ANN([35, 25, 20, 15, 10], [np.tanh, np.tanh, np.tanh, np.tanh, np.tanh])
#ann.W = tempw
#ann.b = tempb 
ann.fit(X_train, y_train, eta = 1e-2, epochs = 1e3, show_curve = True, lamb = 0.000)
y_hat = ann.predict(X_test)
y_hat_train = ann.predict(X_train)
print(f"Training Accuracy : {accuracy(y_train, y_hat_train):0.4f}")
print(f"Testing Accuracy : {accuracy(y_test, y_hat):0.4f}")
tempw = ann.W
tempb = ann.b
print(f"Cost : {np.sum(y_hat[y_test == 0]) + 5 * np.count_nonzero(y_hat[y_test == 1] == 0):0.4f}")
print(f"1's Accuracy : {np.sum(y_hat[y_test == 1]) / (np.sum(y_hat[y_test == 1]) + np.count_nonzero(y_hat[y_test == 1] == 0)):0.4f}")
confusion(y_test, y_hat)