In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

# # Import from scikit-learn
# # sudo pip install scikit-learn
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Perceptron
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

# Import from NeuPy
from neupy import algorithms, utils, init

# Import from datetime
from datetime import datetime

# Import prettier output
from rich import print
from rich.console import Console
from rich.traceback import install
# Install rich's traceback handler; show_locals=True asks it to include the
# local variables of each frame in the rendered traceback.
install(show_locals=True)
# Console object from rich.console, kept at module level for later cells.
console = Console()

In [2]:
# Load the credit-card table from disk and report its dimensions.
cc_df = pd.read_csv('./data/creditcard.csv')

# DataFrame.shape is (rows, columns); the column count covers every column of
# the file, including the 'Class' column that later cells use as the target.
n_samples, n_features = cc_df.shape[0], cc_df.shape[1]

print(f"n_samples: {n_samples}, n_features: {n_features}")

In [4]:
# Report every pair of columns whose correlation is very strong.
data_corr = cc_df.corr()

# Absolute correlation above which a pair of columns is reported.
CORR_THRESHOLD = 0.88

# Walk the strict lower triangle so each unordered pair is visited once and the
# diagonal (self-correlation) is skipped. The loop bounds are taken from
# data_corr itself rather than from cc_df, so the indices are always valid for
# the matrix that is actually being indexed.
corr_values = data_corr.to_numpy()
corr_names = data_corr.columns
for i in range(len(corr_names)):
    for j in range(i):
        if abs(corr_values[i, j]) > CORR_THRESHOLD:
            print(corr_names[i], corr_names[j], corr_values[i, j])

In [None]:
from traceback import print_tb
from imblearn.combine import SMOTETomek

# Separate the label column from the predictors.
TARGET = cc_df['Class']
FEATURES = cc_df.drop(['Class'], axis=1)

# 50 % train / 25 % validation / 25 % test.
# Both splits are stratified on the label so that every partition keeps the
# class proportions of the data it was cut from; without it, the class that the
# resampling below is meant to boost can end up unevenly spread across the
# partitions, which makes the validation and test metrics unstable.
features_train, features_aux, target_train, target_aux = train_test_split(
    FEATURES, TARGET, test_size=0.5, random_state=42, stratify=TARGET
)
features_valid, features_test, target_valid, target_test = train_test_split(
    features_aux, target_aux, test_size=0.5, random_state=42, stratify=target_aux
)

# Balance the training data with SMOTETomek. Only the training partition is
# resampled; validation and test are left exactly as they were split.
smote_tomek = SMOTETomek(random_state=42)
features_train, target_train = smote_tomek.fit_resample(features_train, target_train)

print(f'features_train: {features_train.shape}')
print(f'features_valid: {features_valid.shape}')
print(f'features_test: {features_test.shape}')
print(f'target_train: {target_train.shape}')
print(f'target_valid: {target_valid.shape}')
print(f'target_test: {target_test.shape}')

In [9]:
# Modeling SOM
# ------------
# Hyperparameters are kept as module-level names because the architecture
# search cell further down builds its SOMs from the same values.
max_iter_som = 25
grid_height, grid_width = 20, 20
distance = 'euclid'
learning_radius = 5
step = 0.5
std = 1.0
# Both "reduce ... after" options use the same value: max_iter_som - 5.
reduce_step_after = reduce_std_after = max_iter_som - 5
weight = init.Normal()

print("Learning %dx%d SOM with %d maximum number of iterations and ..." % (grid_height, grid_width, max_iter_som))

now = datetime.now()
# Random generator seed for NeuPy (currently disabled)
# utils.reproducible(0)

# Constructor options gathered in one place, then expanded into SOFM(...).
sofm_options = dict(
    n_inputs=features_train.shape[1],
    features_grid=(grid_height, grid_width),
    distance=distance,
    weight=weight,
    learning_radius=learning_radius,
    reduce_radius_after=max_iter_som // learning_radius,  # 0 radius at end
    step=step,
    reduce_step_after=reduce_step_after,
    std=std,
    reduce_std_after=reduce_std_after,
    shuffle_data=False,
    verbose=True,
)
sofm = algorithms.SOFM(**sofm_options)

sofm.train(features_train, epochs=max_iter_som)
# Encode the training and validation partitions with the trained SOM.
sofm_output_train, sofm_output_valid = (
    sofm.predict(partition) for partition in (features_train, features_valid)
)
# The reported time covers training plus the two predict calls above.
elapsed = datetime.now() - now
print("Number of seconds for training: %d" % elapsed.total_seconds())

# Show results
print("Visualizing the Mean Absolute Error Trajectory")
# NOTE(review): an earlier variant of this cell read sofm.errors.train instead
# of sofm.errors — confirm which attribute the installed NeuPy version exposes.
epoch_numbers = range(1, len(sofm.errors) + 1)
plt.plot(epoch_numbers, sofm.errors)
plt.xlabel('number of iterations')
plt.ylabel('MAE')
plt.show()


Main information

[ALGORITHM] SOFM

[OPTION] verbose = True
[OPTION] epoch_end_signal = None
[OPTION] show_epoch = 1
[OPTION] shuffle_data = False
[OPTION] step = 0.5
[OPTION] train_end_signal = None
[OPTION] n_inputs = 30
[OPTION] distance = euclid
[OPTION] features_grid = [20, 20]
[OPTION] grid_type = rect
[OPTION] learning_radius = 5
[OPTION] n_outputs = None
[OPTION] reduce_radius_after = 5
[OPTION] reduce_std_after = 20
[OPTION] reduce_step_after = 20
[OPTION] std = 1.0
[OPTION] weight = Normal(mean=0, std=0.01)


Start training

[TRAINING DATA] shapes: (282854, 30)
[TRAINING] Total epochs: 25

---------------------------------------------------------
|    Epoch    |  Train err  |  Valid err  |    Time     |
---------------------------------------------------------
|           1 |      423.45 |           - |       01:08 |


In [None]:
# Prototypes visualization
# ------------
# have a look at the grid
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

def plot_prototypes_grid(grid_height, grid_width, prototypes, labels=None):
    """
    Draw every SOM prototype as a small bar chart placed on the SOM grid.

    Parameters
    ----------
    grid_height, grid_width : int
        Number of rows and columns of the grid.
    prototypes : array-like, 2-D
        One prototype per grid unit, shape
        (grid_height * grid_width, n_features), units in row-major grid order.
        A matrix laid out the other way round, shape
        (n_features, grid_height * grid_width), is also accepted and is
        transposed first. A square matrix is used as given.
    labels : sequence, optional
        One label per grid unit; each is written in red on its cell. Nothing
        is written when the argument is omitted or empty.

    Raises
    ------
    ValueError
        If `prototypes` is not 2-D or neither of its axes has
        grid_height * grid_width entries.
    """
    print("Building visualization of prototypes grid ...")
    n_units = grid_height * grid_width

    # Validate up front so a wrong layout fails with a clear message instead
    # of an IndexError half-way through the drawing loop.
    prototypes = np.asarray(prototypes)
    if prototypes.ndim != 2 or n_units not in prototypes.shape:
        raise ValueError(
            f"prototypes must be a 2-D array with {n_units} units along one axis "
            f"(grid {grid_height}x{grid_width}); got shape {prototypes.shape}"
        )
    if prototypes.shape[0] != n_units:
        # Units are along the second axis: flip to one row per unit.
        prototypes = prototypes.T

    # `labels` may be a list or an array, so test its length rather than truthiness.
    has_labels = labels is not None and len(labels) > 0

    grid = gridspec.GridSpec(grid_height, grid_width)
    grid.update(wspace=0, hspace=0)
    for row_id in range(grid_height):
        print("Progress: {:.2%}".format(row_id / grid_height))
        for col_id in range(grid_width):
            # Row-major position of this cell, used for both grid and data.
            index = row_id * grid_width + col_id
            prototype = prototypes[index]
            ax = plt.subplot(grid[index])
            ax.bar(range(len(prototype)), prototype)
            if has_labels:
                ax.text(0, 0, labels[index], color='r', fontsize=8)
            ax.axis('off')
    plt.show()


# NeuPy stores SOFM weights as (n_inputs, n_outputs); transpose so that every
# row is the prototype of one grid unit, which is how plot_prototypes_grid
# indexes its `prototypes` argument.
plot_prototypes_grid(grid_height, grid_width, sofm.weight.T)

In [None]:
# Modeling Perceptron
# ------------
# The Perceptron is trained on the SOM output (a grid-sized array of zeros
# except for the winning unit). Together they form a Counterpropagation
# Network (CPN).
max_iter_per = 30

print("Learning a Perceptron with %d maximum number of iterations and ..." % max_iter_per)

# Samples are presented in their stored order (shuffle=False).
perceptron_options = dict(max_iter=max_iter_per, shuffle=False, verbose=True)
per = Perceptron(**perceptron_options)
per.fit(sofm_output_train, target_train)

In [None]:
# Initial results
# ------------
print("Printing initial results")

# Classify the SOM codes of the training and validation partitions.
predict_train, predict_valid = (
    per.predict(codes) for codes in (sofm_output_train, sofm_output_valid)
)

# Accuracy, as a percentage.
train_accuracy = accuracy_score(target_train, predict_train)
valid_accuracy = accuracy_score(target_valid, predict_valid)
print("Train accuracy: %.3f%%" % (train_accuracy * 100))
print("Valid accuracy: %.3f%%" % (valid_accuracy * 100))

# Confusion matrices.
train_confusion = confusion_matrix(target_train, predict_train)
valid_confusion = confusion_matrix(target_valid, predict_valid)
print("Train confusion matrix:")
print(train_confusion)
print("Valid confusion matrix:")
print(valid_confusion)

# Per-class precision / recall / F1 reports.
train_report = classification_report(target_train, predict_train)
valid_report = classification_report(target_valid, predict_valid)
print("Train classification report:")
print(train_report)
print("Valid classification report:")
print(valid_report)

In [None]:
# Labels visualization
# ------------
# Row i of the identity matrix is the SOM code in which only unit i is active,
# so predicting on it gives the class the Perceptron assigns to unit i.
n_units = grid_height * grid_width
sofm_output_labels = np.eye(n_units, dtype=int)
predict_labels = per.predict(sofm_output_labels)

# NeuPy stores SOFM weights as (n_inputs, n_outputs); transpose so that every
# row is the prototype of one grid unit, matching the order of predict_labels.
plot_prototypes_grid(grid_height, grid_width, sofm.weight.T, predict_labels)

In [None]:

# Architecture optimization
# ------------
print("Architecture optimization")

# Try square SOM grids of several sizes, training each size n_reps times.
tests_grid_side = [5, 10, 15, 20, 25, 30, 35]
n_reps = 5

now = datetime.now()
# Best (SOM, Perceptron) pair over the whole search, chosen by validation
# accuracy. They stay None until some candidate scores above 0.
best_sofm = None
best_per = None
best_acc = 0.0
# One entry per grid size: the accuracies of that size's best repetition.
accs_train = []
accs_valid = []
for grid_side in tests_grid_side:
    max_acc_train = max_acc_valid = 0.0
    for random_state in range(n_reps):
        # Seed NumPy's global generator so that the repetition reported below
        # as "Seed = n" really is reproducible from that seed.
        # NOTE(review): assumes NeuPy draws the initial SOFM weights from
        # NumPy's global RNG — confirm for the installed NeuPy version.
        np.random.seed(random_state)
        sofm = algorithms.SOFM(
            n_inputs = features_train.shape[1],
            features_grid = (grid_side, grid_side),
            distance = distance,
            weight = weight,
            learning_radius = learning_radius,
            reduce_radius_after = max_iter_som // learning_radius,
            step = step,
            reduce_step_after = reduce_step_after,
            std = std,
            reduce_std_after = reduce_std_after,
            shuffle_data = False,
            verbose = False
        )
        sofm.train(features_train, epochs=max_iter_som)
        sofm_output_train = sofm.predict(features_train)
        sofm_output_valid = sofm.predict(features_valid)
        per = Perceptron(max_iter=max_iter_per, shuffle=False, verbose=False)
        _ = per.fit(sofm_output_train, target_train)
        acc_train = accuracy_score(target_train, per.predict(sofm_output_train))
        acc_valid = accuracy_score(target_valid, per.predict(sofm_output_valid))
        print("Seed = %d, train acc = %.8f, valid acc = %.8f" % (random_state, acc_train, acc_valid))
        # Keep the repetition with the highest validation accuracy for this
        # grid size, and separately the best model over all grid sizes.
        if (max_acc_valid < acc_valid):
            max_acc_valid = acc_valid
            max_acc_train = acc_train
            if (acc_valid > best_acc):
                best_acc = acc_valid
                best_per = per
                best_sofm = sofm
    accs_train.append(max_acc_train)
    accs_valid.append(max_acc_valid)
    print("Grid size = %ix%i, train acc = %.8f, max valid acc = %.8f" % (grid_side, grid_side, max_acc_train, max_acc_valid))

print("Number of seconds for training: %d" % (datetime.now() - now).total_seconds())
print("Best CPN valid accuracy: %.8f%%" % (best_acc * 100))
print("Best SOM: ", best_sofm)
print("Best Perceptron: ", best_per)

In [None]:
# Show results
# Side-by-side bars per grid size: training error of the selected repetition
# (shifted one bar width to the left) and its validation error.
width = 2
grid_sides = np.array(tests_grid_side)
train_error_pct = 100 * (1 - np.array(accs_train))
valid_error_pct = 100 * (1 - np.array(accs_valid))

plt.bar(grid_sides - width, train_error_pct, color='g', width=width, label='Train error')
plt.bar(grid_sides, valid_error_pct, width=width, label='Min valid error')
plt.xlabel('grid side')
plt.ylabel('error (%)')
plt.xticks(grid_sides, tests_grid_side)
plt.legend(loc='upper right')
plt.show()

In [None]:
# Final results of best CPN
# ------------
print("Printing final results")

# Encode the three partitions with the best SOM ...
sofm_output_train, sofm_output_valid, sofm_output_test = (
    best_sofm.predict(partition)
    for partition in (features_train, features_valid, features_test)
)
# ... and classify the resulting codes with the best Perceptron.
predict_train, predict_valid, predict_test = (
    best_per.predict(codes)
    for codes in (sofm_output_train, sofm_output_valid, sofm_output_test)
)

# Accuracy, as a percentage.
train_accuracy = accuracy_score(target_train, predict_train)
valid_accuracy = accuracy_score(target_valid, predict_valid)
test_accuracy = accuracy_score(target_test, predict_test)
print("Train accuracy: %.3f%%" % (train_accuracy * 100))
print("Valid accuracy: %.3f%%" % (valid_accuracy * 100))
print("Test accuracy: %.3f%%" % (test_accuracy * 100))

# Confusion matrices.
train_confusion = confusion_matrix(target_train, predict_train)
valid_confusion = confusion_matrix(target_valid, predict_valid)
test_confusion = confusion_matrix(target_test, predict_test)
print("Train confusion matrix:")
print(train_confusion)
print("Valid confusion matrix:")
print(valid_confusion)
print("Test confusion matrix:")
print(test_confusion)

# Per-class precision / recall / F1 reports.
train_report = classification_report(target_train, predict_train)
valid_report = classification_report(target_valid, predict_valid)
test_report = classification_report(target_test, predict_test)
print("Train classification report:")
print(train_report)
print("Valid classification report:")
print(valid_report)
print("Test classification report:")
print(test_report)

In [None]:
# ROC curves of test set
# decision_function returns a 1-D array of scores for a two-class problem
# (positive scores favour the second class) and an (n_samples, n_classes)
# matrix otherwise. Normalise to one score column per class so the loops below
# work in both cases; for two classes the score of the first class is the
# negated score of the second.
per_probs = np.asarray(best_per.decision_function(sofm_output_test))
if per_probs.ndim == 1:
    per_probs = np.column_stack([-per_probs, per_probs])

# Sorted class labels; column k of per_probs belongs to classes[k].
classes = np.unique(target_train)
per_auc = []
per_fpr = []
per_tpr = []
# One-vs-rest ROC per class. Lists are filled and read by column position,
# not by class label, so the labels do not have to be 0..k-1.
for col, cla in enumerate(classes):
    is_class = target_test == cla
    class_scores = per_probs[:, col]
    per_auc.append(roc_auc_score(is_class, class_scores))
    fpr, tpr, _ = roc_curve(is_class, class_scores)
    per_fpr.append(fpr)
    per_tpr.append(tpr)

print("Printing ROC curves of test set")
# plot the roc curve of every class
for col, cla in enumerate(classes):
    _ = plt.plot(per_fpr[col], per_tpr[col], marker='.', label='Class %d (AUC: %.5f)' % (cla, per_auc[col]))

# diagonal = ROC of a classifier with no skill, drawn as a reference
plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
# axis labels
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
# show the legend
plt.legend()
# show the plot
plt.show()

In [None]:
# Show grid prototypes and labels
(grid_height, grid_width) = best_sofm.features_grid
# Row i of the identity matrix is the SOM code in which only unit i is active,
# so predicting on it gives the class the Perceptron assigns to unit i.
n_units = grid_height * grid_width
sofm_output_labels = np.eye(n_units, dtype=int)
predict_labels = best_per.predict(sofm_output_labels)

# NeuPy stores SOFM weights as (n_inputs, n_outputs); transpose so that every
# row is the prototype of one grid unit, matching the order of predict_labels.
plot_prototypes_grid(grid_height, grid_width, best_sofm.weight.T, predict_labels)

In [None]:
# Show errors on real data
# Positions (0-based, within the test partition) of the misclassified rows.
# The comparison is done against a plain NumPy copy of the targets so that all
# selection below is positional and does not depend on the index labels that
# target_test inherited from the original DataFrame.
target_test_values = np.asarray(target_test)
indxs_err = np.flatnonzero(predict_test != target_test_values)
preds_err = predict_test[indxs_err]

# count[c] = number of misclassified rows that were predicted as class c
# (two slots, for the labels 0 and 1).
count = [0, 0]

for idx, pred, real in zip(indxs_err, preds_err, target_test_values[indxs_err]):
    count[pred] += 1
    print("Fila: %d Predicción: %d Real: %d" % (idx, pred, real))

In [None]:
# Plot the feature vectors of (at most) the first 10 misclassified test rows,
# one small bar chart per row, titled with the real and the predicted class.
_, axes = plt.subplots(2, 5, figsize=(10, 5))
flat_axes = axes.ravel()

# The rows are tabular feature vectors (the SOM above is built with
# n_inputs = features_train.shape[1]), not 8x8 pixel grids, so they cannot be
# reshaped into images; each row is drawn as a bar chart instead.
err_rows = features_test.iloc[indxs_err].values
err_reals = target_test.iloc[indxs_err]

# Hide the frame of every panel, including those left empty when there are
# fewer than 10 misclassified rows.
for ax in flat_axes:
    ax.set_axis_off()

for ax, (row, label1, label2) in zip(flat_axes, zip(err_rows, err_reals, preds_err)):
    ax.bar(range(len(row)), row)
    ax.set_title('real:%i pred:%i' % (label1, label2))

plt.show()