# Chapter 7 Code

Covers code for Chapter 7, "Reframing Difficult Deep Learning Problems", of *Modern Deep Learning Design and Application*.

---

## Importing Libraries

In [None]:
# array processing + math
import numpy as np
import pandas as pd
import math

# plotting
import matplotlib.pyplot as plt
import seaborn as sns

# looping
from tqdm.notebook import tqdm

# deep learning staple libraries
import sklearn
import tensorflow as tf
from tensorflow import keras

# keras specifics
import keras.layers as L
import keras.backend as K
from keras.utils import plot_model

---

## DeepInsight

### Install + Import pyDeepInsight

In [None]:
!pip install git+git://github.com/alok-ai-lab/DeepInsight.git#egg=DeepInsight

In [None]:
from sklearn.model_selection import train_test_split

# NOTE(review): `data` is not created anywhere in the visible notebook; it must be
# loaded before this cell runs (presumably a DataFrame with a 'class' column - confirm).
feature_frame = data.drop('class', axis=1)
class_column = data['class']

# hold out 20% of the rows for evaluation
X_train, X_test, y_train, y_test = train_test_split(feature_frame,
                                                    class_column,
                                                    train_size=0.8)

# one-hot encode both label splits
y_train, y_test = [keras.utils.to_categorical(split) for split in (y_train, y_test)]

In [None]:
from pyDeepInsight import ImageTransformer, LogScaler

# scale the features; the scaler is fitted on the training split only
ln = LogScaler()
X_train_norm = ln.fit_transform(X_train)
X_test_norm = ln.transform(X_test)

# convert the scaled feature vectors to images; the transformer is also fitted
# on the training split only and then reused for the test split
it = ImageTransformer(feature_extractor='kpca', pixels=32)
tf_train_x = it.fit_transform(X_train_norm)
tf_test_x = it.transform(X_test_norm)

# show and save the first five generated training images
for i in range(5):
    fig = plt.figure(figsize=(10,10))
    plt.imshow(tf_train_x[i])
    plt.axis('off')
    fig.savefig(f'{i}.png', dpi=400)
    plt.show()

In [None]:
def _conv_branch(tensor, kernel_size):
    """Build one convolutional branch on top of `tensor`.

    The branch is three stages of (k x 1) -> (1 x k) -> (k x k) convolutions with
    8, 16 and 32 filters, each stage followed by batch normalization, ReLU,
    2x2 max pooling and dropout (0.3), and then a final 64-filter (k x k)
    convolution with batch normalization and ReLU.

    Args:
        tensor: Keras tensor the branch is attached to.
        kernel_size: the integer k used for every kernel in the branch.

    Returns:
        The output tensor of the branch.
    """
    x = tensor
    for stage in range(3):
        filters = 2**(stage+3)
        x = L.Conv2D(filters, (kernel_size,1), padding='same')(x)
        x = L.Conv2D(filters, (1,kernel_size), padding='same')(x)
        x = L.Conv2D(filters, (kernel_size,kernel_size), padding='same')(x)
        x = L.BatchNormalization()(x)
        x = L.Activation('relu')(x)
        x = L.MaxPooling2D((2,2))(x)
        x = L.Dropout(0.3)(x)
    x = L.Conv2D(64, (kernel_size,kernel_size), padding='same')(x)
    x = L.BatchNormalization()(x)
    return L.Activation('relu')(x)

# input
inp = L.Input((32,32,3))

# the two branches share the input and differ only in kernel size (2 vs. 5)
branch_1 = _conv_branch(inp, kernel_size=2)
branch_2 = _conv_branch(inp, kernel_size=5)

# concatenate + output
concat = L.Concatenate()([branch_1, branch_2])
global_pool = L.GlobalAveragePooling2D()(concat)
fc1 = L.Dense(32, activation='relu')(global_pool)
fc2 = L.Dense(32, activation='relu')(fc1)
fc3 = L.Dense(32, activation='relu')(fc2)
# 9 output units: NOTE(review) assumes the dataset has 9 classes - confirm against the labels
out = L.Dense(9, activation='softmax')(fc3)

# aggregate into model
model = keras.models.Model(inputs=inp, outputs=out)

In [None]:
# one-hot targets -> categorical cross-entropy
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

# the held-out test split doubles as the validation set during training
model.fit(x=tf_train_x,
          y=y_train,
          validation_data=(tf_test_x, y_test),
          epochs=100)

In [None]:
# save a diagram of the architecture, annotated with tensor shapes, to arch.png
plot_model(model, show_shapes=True, to_file='arch.png', dpi=400)

---

## Negative Learning for Noisy Labels

Note that for good performance, each stage of training requires training for at least 100 epochs, if not more. If one stage is not sufficiently trained/developed, it creates a performance bottleneck and stunts the performance of following stages and the pipeline as a whole.

### Process Data

Loading CIFAR-10 dataset from Keras datasets.

In [None]:
# load data
train_split, test_split = keras.datasets.cifar10.load_data()
x_train, y_train = train_split
x_test, y_test = test_split

# independent copy of the training labels that will receive the corrupted
# values; y_train itself keeps the true labels
cy_train = y_train.copy()

Adding noise by randomly corrupting 20% of the labels (set by `perc` below).

In [None]:
# fraction of training labels to corrupt
perc = 0.2

# draw that fraction of the training indices without replacement, then sort them
n_corrupt = int(round(perc*len(x_train)))
selected_indices = np.random.choice(np.arange(len(x_train)),
                                    n_corrupt,
                                    replace=False)
selected_indices.sort()

# for every selected sample, draw a label uniformly from the 9 classes
# that differ from its true label
new_values = [
    np.random.choice([label for label in range(10) if label != y_train[ind][0]])
    for ind in tqdm(selected_indices)
]

# write the corrupted labels into the corrupted-label array as a column vector
cy_train[selected_indices] = np.array(new_values).reshape((len(new_values),1))

Generate negative labels.

In [None]:
# negative (complementary) labels: same shape and dtype as the listed labels
ny_train = cy_train.copy()

# for each sample, draw any class other than the label currently listed for it
for row, listed in enumerate(tqdm(cy_train[:, 0])):
    candidates = [label for label in range(10) if label != listed]
    ny_train[row, 0] = np.random.choice(candidates)

Initially train standard model via negative learning.

In [None]:
# special loss function
import keras.backend as K
def special_loss(y_true, y_pred):
    """Loss that rewards low predicted probability where `y_true` is non-zero.

    Computes -mean(y_true * log(1 - y_pred)) over the last axis. Both inputs are
    cast to float32 and (1 - y_pred) is clipped to [1e-5, 1 - 1e-5] before the
    log so the logarithm stays finite.

    Args:
        y_true: target tensor; multiplied element-wise with the per-class terms.
        y_pred: predicted tensor.

    Returns:
        Tensor with the last axis reduced by the mean.
    """
    targets = tf.cast(y_true, tf.float32)
    probs = tf.cast(y_pred, tf.float32)
    complement = tf.clip_by_value(1-probs, 1e-5, 1.-1e-5)
    weighted_log = targets * K.log(complement)
    return -tf.reduce_mean(weighted_log, axis=-1)

# create and train model
# EfficientNetB3 comes from the public Keras applications API; the private
# `tensorflow.python.keras` module path is not a supported import location.
from tensorflow.keras.applications import EfficientNetB3

# randomly initialized EfficientNetB3 with a 10-class top, fed 32x32 RGB images
inp = L.Input((32,32,3))
base_model = EfficientNetB3(
    include_top=True, weights=None, input_tensor=inp, classes=10
)
nl_model = keras.models.Model(inputs=inp,
                              outputs=base_model.output)

sgd = keras.optimizers.SGD(learning_rate=0.025, momentum=0.1, nesterov=True)
nl_model.compile(optimizer=sgd,
                 loss=special_loss)

# the negative labels are one-hot encoded because special_loss multiplies
# y_true element-wise with the per-class terms
nl_model.fit(x_train, keras.utils.to_categorical(ny_train),
             epochs=100)

Selective negative learning - filter out problematic data.

In [None]:
# get predictions
predictions = nl_model.predict(x_train)

# confidence the model assigns to each sample's listed (possibly corrupted) label
listed_conf = predictions[np.arange(len(x_train)), cy_train[:, 0]]

# keep only samples whose listed-label confidence exceeds chance level (1/10)
mask = listed_conf > 1/10

# select data for mask
selnl_x_train = x_train[mask]
selnl_y_train = ny_train[mask]

# continue to fit model; nl_model is still compiled with special_loss, which
# needs one-hot targets (as in the initial fit), so the sparse negative labels
# are encoded here with a fixed width of 10 classes
nl_model.fit(selnl_x_train,
             keras.utils.to_categorical(selnl_y_train, num_classes=10),
             epochs=100)

Selective positive learning - filter out problematic data and train SelPL model on filtered data

In [None]:
# get predictions
predictions = nl_model.predict(x_train)

# confidence the model assigns to each sample's listed (possibly corrupted) label.
# The listed labels (cy_train) are used here, not the negative labels (ny_train):
# the filter must agree with the labels the model is trained on below.
listed_conf = predictions[np.arange(len(x_train)), cy_train[:, 0]]

# keep only samples whose listed-label confidence exceeds 0.4
mask = listed_conf > 0.4

# select data for mask
selpl_x_train = x_train[mask]
selpl_y_train = cy_train[mask]

# recompile with new loss (integer labels -> sparse categorical cross-entropy)
nl_model.compile(optimizer='adam',
                 loss='sparse_categorical_crossentropy')

# continue to fit model
nl_model.fit(selpl_x_train, selpl_y_train,
             epochs=100)

Separate into corrupted and noncorrupted datasets.

In [None]:
# get predictions
predictions = nl_model.predict(x_train)

# confidence the model assigns to each sample's listed (possibly corrupted) label
listed_conf = predictions[np.arange(len(x_train)), cy_train[:, 0]]

# samples whose listed label is predicted with confidence > 0.5 are treated
# as clean; everything else is treated as corrupted
mask = listed_conf > 0.5

# select data for mask
clean_x_train = x_train[mask]
clean_y_train = cy_train[mask]
unclean_x_train = x_train[~mask]

# fit a fresh model on the clean dataset
inp = L.Input((32,32,3))
base_model = EfficientNetB3(
    include_top=True, weights=None, input_tensor=inp, classes=10
)
model = keras.models.Model(inputs=inp,
                           outputs=base_model.output)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(clean_x_train, clean_y_train,
          epochs=10)

Get predictions of cleaned model on unclean dataset + correct corrupted labels.

In [None]:
# class probabilities from the model trained on the clean subset
pred_labels = model.predict(unclean_x_train)

# take the most probable class per sample as its corrected label. argmax always
# returns exactly one index per row (the first one on ties), so the result
# reshapes to a column vector even when two classes share the maximum.
unclean_y_train = np.argmax(pred_labels, axis=1).reshape((-1, 1))

Create final cleaned dataset and train new model on clean dataset.

In [None]:
# stack the clean subset with the relabelled ("unclean") subset, features and
# labels in the same order so rows stay aligned
final_clean_x_train = np.concatenate((clean_x_train, unclean_x_train))
final_clean_y_train = np.concatenate((clean_y_train, unclean_y_train))

# fit a fresh model on the complete final dataset
# NOTE: despite its name, `xception` holds an EfficientNetB3 network
inp = L.Input((32,32,3))
xception = EfficientNetB3(include_top=True,
                          weights=None,
                          input_tensor=inp,
                          classes=10)
model = keras.models.Model(inputs=inp, outputs=xception.output)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.fit(final_clean_x_train,
          final_clean_y_train,
          epochs=10)

Compare this performance to a model trained on the original corrupted dataset.

---

## Siamese Networks

### Setting Up Data Pairs

In [None]:
# configurations
class_size = 10  # number of images sampled from each digit class

# load MNIST data
(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()

# select `class_size` distinct instances from each class
samp_x_train, samp_y_train = [], []
for digit in range(10):
    indices = (y_train == digit).nonzero()[0]
    # replace=False: otherwise the same image can be drawn more than once
    selected = np.random.choice(indices, size=class_size, replace=False)
    samp_x_train.append(X_train[selected])
    samp_y_train.append(y_train[selected])

# convert data: pixel values are divided by 255; labels are class ids and
# are only reshaped, never rescaled
samp_x_train = np.array(samp_x_train)/255
samp_x_train = samp_x_train.reshape((10*class_size, 28, 28, 1))
samp_y_train = np.array(samp_y_train).reshape((10*class_size, 1))

# generate pairs: every (i, j) with j >= i, in row-major order. This includes
# each sample paired with itself (i == j).
first_idx, second_idx = np.triu_indices(10*class_size)
fs_x_train_1 = samp_x_train[first_idx]
fs_x_train_2 = samp_x_train[second_idx]

# similarity label: 1 when both images show the same digit, 0 otherwise
fs_y_train = (samp_y_train[first_idx, 0] == samp_y_train[second_idx, 0]).astype(int)

### Build Siamese Network Architecture

In [None]:
from keras import layers

def euclidean_distance(representations):
    """Row-wise Euclidean distance between two batches of embeddings.

    Defined in this cell so the cell runs on a fresh kernel without relying on
    a function from a later cell.

    Args:
        representations: pair (reps_a, reps_b) of tensors with matching shapes.

    Returns:
        Tensor of distances with axis 1 kept as size 1. The squared distance is
        floored at K.epsilon() before the square root.
    """
    reps_a, reps_b = representations
    squared = K.sum(K.square(reps_a - reps_b), axis=1, keepdims=True)
    return K.sqrt(K.maximum(squared, K.epsilon()))

# embedding network applied to both inputs
inp = layers.Input((28, 28, 1))
x = tf.keras.layers.BatchNormalization()(inp)
x = layers.Conv2D(4, (5, 5), activation="tanh")(x)
x = layers.AveragePooling2D(pool_size=(2, 2))(x)
x = layers.Conv2D(16, (5, 5), activation="tanh")(x)
x = layers.AveragePooling2D(pool_size=(2, 2))(x)
x = layers.Flatten()(x)

x = tf.keras.layers.BatchNormalization()(x)
x = layers.Dense(10, activation="tanh")(x)
embedding_network = keras.Model(inp, x)


input_1 = layers.Input((28, 28, 1), name='inp1')
input_2 = layers.Input((28, 28, 1), name='inp2')

# the same embedding_network instance is called on both inputs, so both
# towers use the same weights
tower_1 = embedding_network(input_1)
tower_2 = embedding_network(input_2)

# distance between the two embeddings -> batch norm -> sigmoid similarity score
merge_layer = layers.Lambda(euclidean_distance)([tower_1, tower_2])
normal_layer = tf.keras.layers.BatchNormalization()(merge_layer)
output_layer = layers.Dense(1, activation="sigmoid")(normal_layer)
model = keras.Model(inputs=[input_1, input_2], outputs=output_layer)
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])
model.fit({'inp1':fs_x_train_1, 'inp2':fs_x_train_2},
          fs_y_train,
          epochs=10)

In [None]:
def parallel_branch():
    """Build the convolutional embedding model used for each Siamese input.

    Takes a 28x28x1 input, applies batch normalization, two 32-filter and two
    64-filter 3x3 ReLU convolutions (each pair followed by 2x2 max pooling),
    flattens the result and projects it to 16 ReLU units.

    Returns:
        A Keras Model mapping the image input to the 16-unit representation.
    """
    inp_layer = L.Input((28,28,1))

    # layers are created in the order they are applied
    stack = [
        L.BatchNormalization(),
        L.Conv2D(32, (3,3), activation='relu'),
        L.Conv2D(32, (3,3), activation='relu'),
        L.MaxPooling2D((2,2)),
        L.Conv2D(64, (3,3), activation='relu'),
        L.Conv2D(64, (3,3), activation='relu'),
        L.MaxPooling2D((2,2)),
        L.Flatten(),
        L.Dense(16, activation='relu'),
    ]

    features = inp_layer
    for layer in stack:
        features = layer(features)

    return keras.models.Model(inputs=inp_layer, outputs=features)

def distance(representations):
    """Row-wise Euclidean distance between two batches of representations.

    Args:
        representations: pair (left, right) of tensors with matching shapes.

    Returns:
        Tensor of distances with axis 1 kept as size 1. The squared distance is
        floored at K.epsilon() before the square root is taken.
    """
    left, right = representations
    sq_dist = K.sum(K.square(left-right), axis=1, keepdims=True)
    floored = K.maximum(sq_dist, K.epsilon())
    return K.sqrt(floored)

# two named image inputs; the names match the keys of the dict passed to fit()
inp1 = L.Input((28,28,1), name='inp1')
inp2 = L.Input((28,28,1), name='inp2')

# a single branch instance is called on both inputs, so the weights are shared
branch = parallel_branch()
reps1, reps2 = branch(inp1), branch(inp2)

# distance between the two representations -> sigmoid similarity score
dist = L.Lambda(distance)([reps1, reps2])
out = L.Dense(1, activation='sigmoid')(dist)

siamese_inputs = {'inp1':inp1, 'inp2':inp2}
model = keras.models.Model(inputs=siamese_inputs, outputs=out)
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

training_pairs = {'inp1':fs_x_train_1, 'inp2':fs_x_train_2}
model.fit(training_pairs, fs_y_train, epochs=100)

---