In [1]:
import jax.numpy as np
from jax import random, jit, grad, vmap
from jax.example_libraries import stax, optimizers
from jax.example_libraries.stax import Dense, Relu, Flatten, Softmax, LogSoftmax, Tanh
import jax.scipy.stats.norm as norm
from jax.scipy.stats import multivariate_normal
from jax.nn import sigmoid, log_sigmoid
from jax.nn import one_hot
import itertools
import torch.utils as trch
from torch.utils import data

from tqdm import trange
from functools import partial
import numpy.random as npr
import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
import pymc as pm
import seaborn as sns

import pandas as pd
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
import numpy as npo

In [None]:
# Ground Truth

df = pd.read_csv("/content/sample_data/data.csv", delimiter=",", header='infer', 
                 infer_datetime_format=True, parse_dates=True,
                  dayfirst=True)

headers = df.columns
data = df[headers[5:58]].to_numpy().astype("float32")  # financial ratios
flag = df[headers[67:120]].to_numpy().astype("int32")  # flag 1, if data==0 or NaN
train_df = pd.DataFrame(df)  # make copy

for fin_ratio_ in headers[67:120]:
    train_df = train_df[train_df[fin_ratio_] > 0]  # Eliminate NaN & zeros
data_train = train_df[headers[5:58]].to_numpy().astype("float32")
flag_train = train_df[headers[67:120]].to_numpy().astype("int32")

sortedFullRatings = ['AAA', 'AA+', 'AA', 'AA-', 'A+', 'A', 'A-', 'BBB+', 'BBB', 
                     'BBB-', 'BB+', 'BB', 'BB-',
                     'B+', 'B', 'B-', 'CCC+', 'CCC', 'CCC-', 'D']
sortedMainRatings = ['AAA', 'AA', 'A', 'BBB', 'BB', 'B', 'CCC', 'D']
lessMainRatings = ['As', 'BBB', 'BB', 'B', 'C/Ds']
# lessMainRatings = ['I', 'S']       
# adjust to have # labels equal to mapRatings unique elements
mapRatings = npo.array([0, 0, 0, 1, 2, 3, 4, 4])
Y_full = npo.zeros((train_df.shape[0], len(sortedFullRatings))).astype("int32")
Y_main = npo.zeros((train_df.shape[0], len(sortedMainRatings))).astype("int32")
Y_less = npo.zeros((train_df.shape[0], mapRatings.max() + 1)).astype("int32")
for i in range(Y_full.shape[0]):
    Y_full[i, sortedFullRatings.index(train_df["Rating"][train_df.index[i]])] = 1
    Y_main[i, sortedMainRatings.index(train_df["RatingMain"][train_df.index[i]])] = 1
    Y_less[i, mapRatings[sortedMainRatings.index(train_df["RatingMain"][train_df.index[i]])]] = 1

Y_color = np.argmax(Y_less, axis=1)  # Use Y_less, Y_main, Y_full depending on how many labels

data_train_norm = (data_train - data_train.mean(axis=0)) #/ data_train.std(axis=0)

pca = PCA()  # Initialize with n_components=x parameter to only find the top eigenvectors
z = pca.fit_transform(data_train_norm)
# z = z / z.std(axis=0)   # normalise across companies for each reduced feature
n_pcs = pca.components_.shape[0]

print("1st component explains: {s}% variance".format(s=pca.explained_variance_ratio_[0] * 100))
threshold = np.where(np.cumsum(pca.explained_variance_ratio_) >= .95)[0][0] + 1
print("95% variance explained by {s} components".format(s=threshold))

X = data_train_norm
Y = Y_less
d = X.shape[1]                          # number of features
N = Y_less.shape[0]                     # number observations
k = Y_color.max() + 1                   # number of classifications

1st component explains: 62.83540725708008% variance
95% variance explained by 3 components


In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    
    fmt = '.2f' if normalize else '.0f'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                horizontalalignment="center",
                color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
randomOrder = npr.permutation(N)  # random sampling of observations
s = int(8 * N / 10)  # split training vs. testing 80:20
x_train = X[randomOrder[0:s], :]
y_train = np.array(Y[randomOrder[0:s], :])
x_test = X[randomOrder[s:N], :]
y_test = np.array(Y[randomOrder[s:N], :])   
k = Y_color.max()+1
y_train_flat = Y_color[randomOrder[0:s]]
y_test_flat = Y_color[randomOrder[s:N]] 


In [None]:
# Architecture
def NN(k):
    init, apply = stax.serial(Dense(16),   # hidden Layers
                              Tanh,
                              Dense(k),   #final dense layers =  num_classes
                              Softmax)
    return init, apply

class NNclassifier:
    # Initialize the class
    def __init__(self, d, k, rng_key=random.PRNGKey(0)):
        # MLP init and apply functions
        self.net_init, self.net_apply = NN(k)
        _, params = self.net_init(rng_key, (-1, d)) 

        # Optimizer initialization and update functions
        lr = optimizers.exponential_decay(1e-3, decay_steps=1000, decay_rate=0.999)
        self.opt_init, \
        self.opt_update, \
        self.get_params = optimizers.adam(lr)
        self.opt_state = self.opt_init(params)
        self.d = d
        self.k = k

        # Logger
        self.itercount = itertools.count()
        self.loss_log = []

    def loss(self, params, batch):
        data, labels = batch
        outputs = self.net_apply(params, data)
        loss = -labels*np.log(outputs)
        return np.mean(loss)

    @partial(jit, static_argnums=(0,))
    def step(self, i, opt_state, batch):
        params = self.get_params(opt_state)
        gradients = grad(self.loss)(params, batch)
        return self.opt_update(i, gradients, opt_state)

    def train(self, dataset, nIter = 10):
        data = iter(dataset)
        pbar = trange(nIter)
        # Main training loop
        for it in pbar:
            # Run one gradient descent update
            batch = next(data)
            self.opt_state = self.step(next(self.itercount), self.opt_state, batch)  
            if it % 50 == 0:
                # Logger
                params = self.get_params(self.opt_state)
                loss = self.loss(params, batch)
                self.loss_log.append(loss)
                pbar.set_postfix({'Loss': loss})

    @partial(jit, static_argnums=(0,))
    def predict(self, params, inputs):
        outputs = self.net_apply(params, inputs)
        return outputs



In [None]:
class DataGenerator(trch.data.Dataset):
  
    def __init__(self, images, labels, 
                 batch_size=128, 
                 rng_key=random.PRNGKey(1234)):
        'Initialization'
        self.images = images
        self.labels = labels
        self.N = labels.shape[0]
        self.batch_size = batch_size
        self.key = rng_key

    @partial(jit, static_argnums=(0,))
    def __data_generation(self, key, images, labels):
        'Generates data containing batch_size samples'
        idx = random.choice(key, self.N, (self.batch_size,), replace=False)
        images = images[idx,...]
        labels = labels[idx,...]
        return images, labels

    def __getitem__(self, index):
        'Generate one batch of data'
        self.key, subkey = random.split(self.key)
        images, labels = self.__data_generation(self.key, self.images, self.labels)
        return images, labels

In [None]:
# Create data iteraror
num_classes = k
dim = x_train.shape[1]
train_dataset = DataGenerator(x_train, y_train, batch_size=64)
print('Input Data shape:', x_train.shape, '\nOutput Label shape', y_train.shape)

# Initialize model
model = NNclassifier(dim, num_classes)

# Train model
model.train(train_dataset, nIter=2000)
opt_params = model.get_params(model.opt_state)
pred_train = np.argmax(model.predict(opt_params, x_train),1)

weights = model.get_params(model.opt_state)
print("Input weights to Hidden Layer:\n",weights[0][0].shape)
print("Activation(Hidden Layer (16) weights):\n",weights[0][1].shape)
print("Tanh:\n",weights[1])
print("Hidden Layer to Output Layer weights:\n", weights[2][0].shape)
print("Activation(Output Layer (2) weights):\n", weights[2][1].shape)
print("Softmax:\n",weights[3])

# Plot loss
plt.figure()
plt.plot(model.loss_log, lw=2)
plt.title('Training Accuracy {0:.2f}'.format(np.mean(y_train_flat==pred_train)))
plt.yscale('log')
plt.xlabel('Iter #')
plt.ylabel('Loss')
plt.show()

# Create data iteraror 
test_dataset = DataGenerator(x_test, y_test, batch_size=64)  # run only 2 dimensions to test
test_data = iter(test_dataset)

# Plot the confusion matrix
outputs = model.predict(opt_params, x_test)
pred_class = np.argmax(outputs, 1)
cm = confusion_matrix(y_test_flat, pred_class)
plot_confusion_matrix(cm, lessMainRatings, normalize=False, 
                      title='NN Classifier Confusion matrix \n Testing Accuracy {0:.2f}'.format(np.mean(y_test_flat==pred_class)), cmap=plt.cm.Blues)
plt.show()

print("Test Accuracy: ", np.mean(y_test_flat==pred_class))
