In [33]:
from helper import *

# To plot pretty figures
%matplotlib widget

## Half the data is held out as the validation set; 10% of the remaining half is used as the test set

In [18]:

def get_split_index(features, labels, test_size=0.1):
    """Return index arrays for a single stratified shuffle split.

    Parameters
    ----------
    features : array-like
        Sample matrix; converted to a NumPy array before splitting.
    labels : array-like
        Labels the split is stratified on.
    test_size : float, default 0.1
        Forwarded to ``StratifiedShuffleSplit`` as the test portion.

    Returns
    -------
    list
        A list holding one ``[train_index, test_index]`` pair (``n_splits=1``),
        so callers take element ``[0]``.
    """
    feature_matrix = np.array(features)
    # Fixed seed so the same rows are selected on every run
    splitter = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=42)
    # split() yields lazily; materialise each (train, test) pair as a two-item list
    return [list(index_pair) for index_pair in splitter.split(feature_matrix, labels)]

def split_valid(features, original_labels, training_labels):
    """Split the data 50/50 into a working part and a validation hold-out.

    The split is stratified on ``original_labels``. All three inputs are
    indexed positionally with ``.iloc``, so they must be pandas objects
    with the same row order.

    Returns
    -------
    tuple
        ``(X_train, y_train, y_original, X_valid, y_valid, y_valid_original)``
        where the ``y_*`` values come from ``training_labels`` and the
        ``*_original`` values come from ``original_labels``.
    """
    kept_rows, held_out_rows = get_split_index(features, original_labels, test_size=0.5)[0]

    # Same column order for both halves: features, training labels, original labels
    sources = (features, training_labels, original_labels)
    working_part = tuple(source.iloc[kept_rows] for source in sources)
    validation_part = tuple(source.iloc[held_out_rows] for source in sources)

    return working_part + validation_part

def get_train_test_val(features, original_labels, training_labels):
    """Produce train, test and validation partitions in two stratified steps.

    Step one (``split_valid``) holds out half of the rows for validation.
    Step two splits the remaining half into train and test rows using the
    default ``test_size`` of ``get_split_index``, stratified on the original
    labels of that half.

    Returns
    -------
    tuple
        ``(X_train, y_train, X_test, y_test, y_original, X_valid, y_valid,
        y_valid_original)``. Note that ``y_original`` holds the original
        labels of the whole non-validation half (train and test rows
        together); it is not split further.
    """
    remainder_X, remainder_y, y_original, X_valid, y_valid, y_valid_original = split_valid(
        features, original_labels, training_labels)

    # Second split: carve the test rows out of the non-validation half
    fit_rows, test_rows = get_split_index(remainder_X, y_original)[0]

    X_train, X_test = remainder_X.iloc[fit_rows], remainder_X.iloc[test_rows]
    y_train, y_test = remainder_y.iloc[fit_rows], remainder_y.iloc[test_rows]

    return X_train, y_train, X_test, y_test, y_original, X_valid, y_valid, y_valid_original

### Train a DNN on the modified dataset

In [47]:
# Generate the blob dataset: original samples, modified samples and both label sets.
# NOTE(review): simulate_blobs is presumably provided by `helper`; class_size looks like samples per class — confirm.

original_data, modded_samples, training_labels, original_labels = simulate_blobs(class_size=6000)

# Separating a hold out set that will be used for validation later
X_train, y_train, X_test, y_test, y_original, X_valid, y_valid, y_valid_original = get_train_test_val(modded_samples, original_labels, training_labels)


# "Train Size" reports the feature matrix shape, "Test Size" the label vector shape
print("Train Size:", X_train.shape)
print("Test Size:", y_test.shape)


# Fit the one-hot encoder on the training labels; train_model reads this global encoder
hot_encoder = dfHotEncoder()
hot_encoder.fit(training_labels)
print("Categories:", hot_encoder.categories_)

FigureCanvasNbAgg()

Train Size: (5400, 2)
Test Size: (600,)
Categories: [array([0, 1])]


In [48]:
# Input width of the network: number of feature columns in the training set
NUM_FEATURES = X_train.shape[1]
# Number of categories found by the one-hot encoder (not passed to build_dnn in this notebook)
NUM_LABELS = len(hot_encoder.categories_[0])

In [49]:
def build_dnn(num_features, num_labels=3):
    """Build and compile a small fully connected softmax classifier.

    Architecture: four ReLU hidden layers of 16 units followed by a
    2-unit softmax output layer, all with He-uniform kernel initialisation.
    Compiled with categorical cross-entropy, plain SGD and an accuracy metric.

    Parameters
    ----------
    num_features : int
        Width of the input layer.
    num_labels : int, default 3
        NOTE: currently not used. The output layer is fixed at 2 units
        regardless of this value.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    """
    # Drop any graph/session state left over from a previously built model
    keras.backend.clear_session()

    Dense = keras.layers.Dense
    nn = keras.models.Sequential()

    # Using He initialization; one initializer object is shared by every layer
    he_init = tf.keras.initializers.he_uniform()

    # The first hidden layer also declares the input width
    nn.add(Dense(units=16, activation="relu", input_dim=num_features,
                 kernel_initializer=he_init))
    # Three more identical hidden layers
    for _ in range(3):
        nn.add(Dense(units=16, activation="relu", kernel_initializer=he_init))
    # Softmax output layer
    nn.add(Dense(units=2, activation="softmax", kernel_initializer=he_init))

    nn.compile(loss="categorical_crossentropy", optimizer='sgd', metrics=['accuracy'])

    return nn

def train_model(model, X, y, X_test=None, y_test=None, epochs=30, batch_size=20, verbose=1, plot=True):
    """Standardise the features, one-hot encode the labels and fit ``model``.

    The scaler is fitted on ``X`` only and then applied to ``X_test``, so no
    statistics from the evaluation rows are used. Labels are encoded with
    the notebook-level ``hot_encoder``, which must already be fitted.

    Parameters
    ----------
    model : compiled Keras model
        Trained in place via ``model.fit``.
    X, y : training features and labels.
    X_test, y_test : optional
        Evaluation features and labels passed to Keras as ``validation_data``.
        When either is missing or empty, training runs without validation
        (previously the empty defaults raised inside the scaler).
    epochs, batch_size, verbose :
        Forwarded to ``model.fit``.
    plot : bool
        Accepted for backward compatibility; currently unused.

    Returns
    -------
    tuple
        ``(history, ZScaler)``: the Keras history object and the fitted
        ``StandardScaler``, which callers need to transform new samples.
    """
    ZScaler = StandardScaler().fit(X)

    X_train = ZScaler.transform(X)
    y_train = hot_encoder.transform(y)

    # Only build validation data when both parts were actually supplied
    has_validation = (
        X_test is not None and y_test is not None
        and len(X_test) > 0 and len(y_test) > 0
    )
    validation_data = None
    if has_validation:
        validation_data = (ZScaler.transform(X_test), hot_encoder.transform(y_test))

    # No callbacks are registered at the moment
    callback_list = []

    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                        validation_data=validation_data, callbacks=callback_list,
                        verbose=verbose)

    return history, ZScaler


In [50]:
# Build the network, then train for 100 epochs; %time is an IPython magic that reports the run time.
# Zscaler (the scaler fitted on X_train) is reused by later cells to transform new samples.
nn = build_dnn(NUM_FEATURES)
%time history, Zscaler = train_model(nn, X_train, y_train, X_test, y_test, epochs=100, batch_size=10)

Train on 5400 samples, validate on 600 samples
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100


In [51]:
# Plotting results from the Keras history object returned by train_model
# NOTE(review): plot_history is presumably provided by `helper` — confirm.
plot_history(history)

FigureCanvasNbAgg()

In [52]:
# Class probabilities for the first five test samples (scaled with the training scaler)
preds = list(nn.predict(Zscaler.transform(X_test[:5])))
# np.float was only an alias of the builtin float and was removed in NumPy 1.24
_labels = [float(x) for x in y_test]
preds[:5],_labels[:5]

([array([6.1887513e-05, 9.9993813e-01], dtype=float32),
  array([1.0000000e+00, 4.1414715e-11], dtype=float32),
  array([2.8176837e-05, 9.9997187e-01], dtype=float32),
  array([9.9999976e-01, 2.5156234e-07], dtype=float32),
  array([1.0000000e+00, 1.4327063e-16], dtype=float32)],
 [1.0, 0.0, 1.0, 0.0, 0.0])

## Performing SVM on Modded Samples

In [53]:
from sklearn.svm import LinearSVC

svm_clf = Pipeline([
    ("scaler", StandardScaler()),
    ("SVM", LinearSVC(C=1, loss="hinge", max_iter=1000 ))
])

%time svm_clf.fit(X_train, y_train)
print("Linear SVM Test Accuracy: {:0.3f}".format(svm_clf.score(X_test, y_test)))

CPU times: user 7.09 ms, sys: 1.45 ms, total: 8.54 ms
Wall time: 7.2 ms
Linear SVM Test Accuracy: 0.687


## Performing LRP

In [54]:
# Score the held-out validation half with the trained network
model = nn
_labels = y_valid
scaled_samples = Zscaler.transform(X_valid)

# Predicted class = position of the largest softmax output in each row
predictions = model.predict(scaled_samples)
preds = np.argmax(predictions, axis=1)
true_labels = np.array(_labels)

# Boolean mask of correctly classified validation samples; later cells reuse it
correct = preds == true_labels

print("Validation Accuracy")
encoded_valid_labels = hot_encoder.transform(y_valid)
loss_and_metrics = model.evaluate(scaled_samples, encoded_valid_labels)
print("Scores on validation set: loss={:0.3f} accuracy={:.4f}".format(*loss_and_metrics))

Validation Accuracy
Scores on validation set: loss=0.000 accuracy=1.0000


In [55]:
# Per-class count of the validation labels that were predicted correctly
_labels[correct].value_counts()

1    3000
0    3000
Name: label, dtype: int64

In [56]:
import innvestigate
import innvestigate.utils as iutils

def perform_analysis(model, analyzer, data, labels=None):
    """Run ``analyzer`` over ``data`` and return the result as a DataFrame.

    Parameters
    ----------
    model :
        Kept for backward compatibility; no longer used. The previous
        version ran ``model.predict(data)`` and discarded the result.
    analyzer :
        Object exposing ``analyze(data)`` (here: an iNNvestigate analyzer).
    data :
        Samples to analyse, passed straight to ``analyzer.analyze``.
    labels : optional
        Unused; kept so existing calls keep working.

    Returns
    -------
    pandas.DataFrame
        The analyzer output wrapped in a DataFrame.
    """
    return pd.DataFrame(analyzer.analyze(data))


# Stripping the softmax activation from the model
# NOTE: this rebinds the notebook-level name `model` to the softmax-free copy.
model_w_softmax = nn
model = iutils.keras.graph.model_wo_softmax(model_w_softmax)

# Creating the three LRP analyzers (epsilon, Z+ and alpha2-beta1 variants) on the softmax-free model
lrp_E = innvestigate.analyzer.relevance_based.relevance_analyzer.LRPEpsilon(model=model, epsilon=1e-3)
lrp_Z = innvestigate.analyzer.relevance_based.relevance_analyzer.LRPZPlus(model=model)
lrp_AB   = innvestigate.analyzer.relevance_based.relevance_analyzer.LRPAlpha2Beta1(model=model)

# Getting all the samples that can be correctly predicted
# `correct` is the boolean mask computed on the validation set in an earlier cell
test_idx = correct
all_samples = scaled_samples[test_idx]
all_labels = y_valid_original[test_idx]


# One relevance DataFrame per analyzer, all computed on the same correctly classified samples
all_lrp_AB = perform_analysis(model,lrp_AB, all_samples)
all_lrp_E = perform_analysis(model,lrp_E, all_samples)
all_lrp_Z = perform_analysis(model,lrp_Z, all_samples)


In [57]:
# Close any earlier figure with this name so re-running the cell does not stack figures
plt.close("Comparison")
fig, axs = plt.subplots(2,2, figsize=(16,10), num="Comparison")
cmap = "Set1" #"Paired"
# Shared scatter settings: column 0 against column 1, coloured by the "label" column
plot_args = {"kind":"scatter", "x":0,  "y":1, "c":"label", "cmap": cmap, "s":10, "alpha":0.25}

original_data.plot(ax=axs[0][0],title="Original Distribution", **plot_args)

# From here on the colour comes from all_labels instead of a "label" column;
# plot_args is mutated, so the order of the plot calls matters
plot_args["c"] = all_labels
all_lrp_E.plot(ax=axs[0][1], title="LRP E", **plot_args)

all_lrp_AB.plot(ax=axs[1][0], title="LRP AB", **plot_args)
all_lrp_Z.plot(ax=axs[1][1], title="LRP Z", **plot_args)

plt.tight_layout()
plt.show()
# plt.savefig(figures_dir+"multiclass_lrp.png")

FigureCanvasNbAgg()

In [58]:
# import time
# plt.show(block=False)
# time.sleep(3)
# plt.close('all')

In [59]:
# Close any earlier figure with this name so re-running the cell does not stack figures
plt.close("Positive Only LRP")
fig, axs = plt.subplots(1,3, figsize=(18,6), num="Positive Only LRP")

# Panel 1: the original data, coloured by its own "label" column
plot_args["c"] = "label"
original_data.plot(ax=axs[0], title="Original Distribution", **plot_args)

# Panel 2: raw LRP-epsilon relevances, coloured by all_labels
plot_args["c"] = all_labels
all_lrp_E.plot(ax=axs[1], title="LRP E", **plot_args)

# Panel 3: the same relevances with negative values set to zero.
# The title now says so; it used to repeat "LRP E", making panels 2 and 3 indistinguishable.
pos_lrp = all_lrp_E.copy()
pos_lrp[pos_lrp<0] = 0
# Attach the labels as a column; a later cell passes pos_lrp to plotSeparatedLRP
pos_lrp["label"] = all_labels.values
pos_lrp.plot(ax=axs[2], title="LRP E (positive only)", **plot_args)

plt.tight_layout()
plt.show()

FigureCanvasNbAgg()

In [45]:
# plt.savefig(figures_dir+"multiclass_noisy_lrp.png")

In [46]:
# Plot the positive-only relevances (pos_lrp carries a "label" column added in an earlier cell)
# NOTE(review): plotSeparatedLRP is presumably provided by `helper` — confirm what it draws.
plotSeparatedLRP(pos_lrp)

FigureCanvasNbAgg()