In [None]:
import pandas as pd
import numpy as np
from sklearn import metrics
import tensorflow as tf
from keras import optimizers

# 1. MNIST_784

In [None]:
from keras.datasets.mnist import load_data

# Load the MNIST train/test split through Keras.
(X_train, y_train), (X_test, y_test) = load_data()
# The cells below build their working subset from the test split; show its shapes.
print(X_test.shape, y_test.shape)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(0)  # fixed seed so the same preview images are drawn on every run

# Preview a 3x3 grid of randomly chosen test images, captioned with their class name.
class_names = ['0', '1', '2', '3', '4', '5', '6', '7','8','9']
sample_size = 9
# Sample from the actual number of test images rather than a hard-coded 10000,
# so the preview can never index past the end of X_test.
random_idx = np.random.randint(len(X_test), size=sample_size)

plt.figure(figsize=(5,5))
for i, idx in enumerate(random_idx):
    plt.subplot(3,3,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(X_test[idx], cmap='gray')
    plt.xlabel(class_names[y_test[idx]])
plt.show()

In [None]:
# Replicate the single grayscale channel three times so images match the
# 3-channel input_shape used by the model.
# Guarded on ndim so that re-running this cell does not stack yet another axis.
if X_test.ndim == 3:
    X_test = np.repeat(X_test[..., np.newaxis], 3, -1)
X_test.shape

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np

# Display the first occurrence of every label found among the first 100 test images.
class_names = [str(digit) for digit in range(10)]
y_list = []  # labels that have already been displayed

for i in range(100):
    label = y_test[i]
    if label not in y_list:
        y_list.append(label)
        some_digit = X_test[i]

        plt.imshow(some_digit, cmap="binary")
        plt.axis("off")
        plt.show()
        print(label, class_names[label])

In [None]:
# Number of test images per label.
pd.Series(y_test).value_counts()

In [None]:
# Positions of every test image labelled 7 and labelled 9 (the two classes of the
# binary task). Vectorized lookup: avoids converting y_test to a Python list on
# every iteration. .tolist() keeps both results plain lists so they can be
# sliced and concatenated with + in the next cell.
idx_7 = np.flatnonzero(y_test == 7).tolist()
idx_9 = np.flatnonzero(y_test == 9).tolist()
print(len(idx_7), len(idx_9))

In [None]:
# First 50 sevens and first 50 nines, merged and put back in ascending index order.
idx = sorted(idx_7[:50] + idx_9[:50])

In [None]:
# Gather the selected images and their labels into arrays, following the order of idx.
X = np.array([X_test[i] for i in idx])
y = np.array([y_test[i] for i in idx])
print(X.shape, y.shape)

In [None]:
# Class balance of the selected subset.
pd.Series(y).value_counts()

In [None]:
# Binary hard labels: digit 7 -> 0.0, digit 9 -> 1.0 (any other value is left untouched).
y_hard = [0.00 if x == 7 else (1.00 if x == 9 else x) for x in y]
pd.Series(y_hard).value_counts()

In [None]:
from sklearn.model_selection import StratifiedKFold
# 5-fold stratified splitter with shuffling; random_state is fixed for repeatable folds.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state = 2)

epochs = 10  # training epochs for every model.fit call below
batch = 32  # mini-batch size passed to model.fit
decay=0.0001  # NOTE(review): not referenced by any cell visible in this section

from keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Without Dropout

# transformer
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

num_classes = 1  # a single sigmoid output unit (binary task)
input_shape = (28, 28, 3)  # shape of one input image handed to layers.Input

image_size = 54  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extracted from the input images
num_patches = (image_size // patch_size) ** 2  # (54 // 6) ** 2 = 81 patches per image
projection_dim = 64  # width of each encoded patch
num_heads = 4  # attention heads per transformer block
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8  # number of stacked transformer blocks
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

# Preprocessing/augmentation pipeline placed at the front of every model built by
# create_vit_classifier(); the same Sequential instance is shared by all of them.
# NOTE(review): Normalization() is never adapt()-ed in the cells visible here, so it
# presumably keeps its initial statistics - confirm whether adapt() on the training
# images was intended.
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)

def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of GELU Dense layers, each followed by Dropout.

    Args:
        x: Input tensor.
        hidden_units: Iterable with the number of units of each Dense layer.
        dropout_rate: Rate given to the Dropout layer after every Dense layer.

    Returns:
        The tensor produced by the last Dropout layer (``x`` itself when
        ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        out = dense(out)
        drop = layers.Dropout(dropout_rate)
        out = drop(out)
    return out

class Patches(layers.Layer):
    """Cut a batch of images into square patches and flatten each patch.

    Args:
        patch_size: Side length, in pixels, of each square patch; also used as
            the stride, so consecutive patches do not overlap.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, patch_size, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return the patches reshaped to ``[batch, -1, patch_dims]``."""
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # Last axis of extract_patches holds one flattened patch.
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config
    
class PatchEncoder(layers.Layer):
    """Linearly project each patch and add a learned position embedding.

    Args:
        num_patches: Number of patches per image (size of the position table).
        projection_dim: Output width of the projection and of the embedding.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim  # kept only for get_config()
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return ``projection(patch) + position_embedding(0..num_patches-1)``."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update(
            {"num_patches": self.num_patches, "projection_dim": self.projection_dim}
        )
        return config
    
def create_vit_classifier():
    """Build an uncompiled ViT classifier from the module-level settings.

    Reads ``input_shape``, ``data_augmentation``, ``patch_size``, ``num_patches``,
    ``projection_dim``, ``transformer_layers``, ``num_heads``,
    ``transformer_units``, ``mlp_head_units`` and ``num_classes``.

    Returns:
        A ``keras.Model`` mapping an image batch to ``num_classes`` sigmoid outputs.
    """
    inputs = layers.Input(shape=input_shape)
    # Preprocess/augment, cut into patches, then embed the patches.
    augmented = data_augmentation(inputs)
    patches = Patches(patch_size)(augmented)
    tokens = PatchEncoder(num_patches, projection_dim)(patches)

    # Pre-norm transformer blocks: attention and MLP, each wrapped in a skip connection.
    for _ in range(transformer_layers):
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0
        )(normed, normed)
        after_attention = layers.Add()([attended, tokens])
        hidden = layers.LayerNormalization(epsilon=1e-6)(after_attention)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0)
        tokens = layers.Add()([hidden, after_attention])

    # Normalize, then flatten all patch tokens into one vector per image.
    representation = layers.LayerNormalization(epsilon=1e-6)(tokens)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0)(representation)
    # Classification head.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0)
    # Sigmoid is applied here, so the model emits probabilities rather than raw logits.
    probabilities = layers.Dense(num_classes, activation='sigmoid')(features)
    return keras.Model(inputs=inputs, outputs=probabilities)

In [None]:
# Inspect the selected image subset.
X

In [None]:
# Wrap the hard labels in a single-column DataFrame (default column name 0);
# later cells slice folds with .iloc and read the labels through y_hard[0].
y_hard = pd.DataFrame(y_hard)
y_hard

# 1-0. Generating Prob_labels

In [None]:
# Train a reference ViT on the whole selected subset with the hard labels; its
# predicted probabilities are stored in prob_label for the cells that follow.
gen_model = create_vit_classifier()
gen_model.compile(
    loss='BinaryCrossentropy',
    optimizer=optimizers.Adam(learning_rate = 0.0002),
    metrics=['accuracy'],
)
history = gen_model.fit(
    X, y_hard,
    validation_split=0.2,
    epochs=epochs,
    batch_size=batch,
    verbose=0,
)

# Training-loss curve of the reference model.
plt.plot(history.history['loss'], label='loss')
plt.ylim([0, 1])
plt.xlabel('Iteration', fontsize = 15, fontweight="bold")
plt.ylabel('Loss', fontsize = 15, fontweight="bold")
plt.title("Cost Function", fontsize = 20, fontweight="bold")
plt.legend()
plt.show()

# One predicted probability per sample, flattened from (n, 1) into a plain list.
result = gen_model.predict(X, verbose=0)
prob_label = [p for p in result.reshape(len(X))]
prob_label

# 1-1. Focal(Hard) and SLS(Hard/diverse alphas)_option#1

In [None]:
def _cross_validate(loss_name, train_labels, learning_rate=0.0002):
    """Cross-validate one training setup over the folds of ``skf``.

    Args:
        loss_name: Keras loss identifier handed to ``model.compile``.
        train_labels: Single-column DataFrame of training targets aligned with
            ``X`` (hard labels, or softened labels for SLS).
        learning_rate: Adam learning rate.

    Returns:
        List ``[Acc, Pre, Rec, F1, R-AUC]`` with each metric averaged over the
        folds. Metrics are always computed against the real labels ``y_hard``.
    """
    fold_metrics = []
    # Stratification always uses y_hard (the binary label).
    for fold_no, (train_index, test_index) in enumerate(skf.split(X, y_hard), start=1):
        fold_labels = train_labels.iloc[train_index]
        if fold_no == 1:
            print(fold_labels.value_counts())
        fold_X_train = np.array(X[train_index])
        fold_y_train = np.array(fold_labels).astype(float)
        fold_X_test = np.array(X[test_index])
        fold_y_test = np.array(y_hard.iloc[test_index]).astype(float)  # real (actual) labels

        # Fresh model per fold: reusing one model across folds would evaluate it on
        # samples it was already trained on in an earlier fold (leakage).
        model = create_vit_classifier()
        model.compile(
            loss=loss_name,
            optimizer=optimizers.Adam(learning_rate=learning_rate),
            metrics=['accuracy'],
        )
        model.fit(
            fold_X_train,
            fold_y_train,
            validation_data=(fold_X_test, fold_y_test),
            epochs=epochs,
            verbose=0,
            batch_size=batch,
        )

        scores = model.predict(fold_X_test, verbose=0).ravel()  # predicted probabilities
        predicted = np.round(scores)  # hard 0/1 decisions
        y_true = fold_y_test.ravel()
        fold_metrics.append([
            metrics.accuracy_score(y_true, predicted),
            metrics.precision_score(y_true, predicted),
            metrics.recall_score(y_true, predicted),
            metrics.f1_score(y_true, predicted),
            # ROC-AUC is a ranking metric: feed it the scores, not rounded labels.
            metrics.roc_auc_score(y_true, scores),
        ])
    return [np.mean(values) for values in zip(*fold_metrics)]


B = [0.00, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]  # SLS with LS
c = 0.1  # criterion decides easy/hard

# Loop-invariant inputs of the soft-label construction.
hard_labels = y_hard[0].to_numpy()
probs = np.asarray(prob_label, dtype=float)
# "Easy" samples: the reference model's probability is within c of the true label.
easy_negative = (hard_labels == 0) & (probs <= c)
easy_positive = (hard_labels == 1) & (probs >= 1 - c)

for t in range(10):    # 10 times repeat
    # Focal loss baseline trained on the hard labels.
    print('#'*50,'Focal','#'*50)
    focal_scores = _cross_validate('BinaryFocalCrossentropy', y_hard)
    print(focal_scores)
    columns = {'Focal': focal_scores}

    for b in B:
        print('#'*50,'SLS',b,'#'*50)
        # Easy samples get their label moved by b towards the other class;
        # hard samples keep the hard 0/1 label.
        soft = hard_labels.astype(float)
        soft[easy_negative] = b
        soft[easy_positive] = 1 - b
        y_005 = pd.DataFrame(soft)

        # BCE trained on the softened labels, evaluated on the real labels.
        sls_scores = _cross_validate('BinaryCrossentropy', y_005)
        columns['SLS({})'.format(b)] = sls_scores
        print(sls_scores)

    # One table per repetition, appended (with its own header line) to the CSV.
    res = pd.DataFrame(columns, index = ['Acc','Pre','Rec','F1','R-AUC'])
    res.to_csv("ViT_MNIST_5CV(SLS_opt#1_c0.1).csv", mode = 'a', float_format='%.4g')

In [None]:
# Summarize accuracy over every repetition stored in the results CSV.
res = pd.read_csv("ViT_MNIST_5CV(SLS_opt#1_c0.1).csv")
# Each repetition was appended with its own header line; those lines have an empty
# first field and are dropped here, together with any row holding a missing value.
res = res.dropna(axis=0)
res = res.rename(columns={'SLS(0.0)':'Hard'})  # b = 0 means plain hard labels
df_acc = res[res.iloc[:,0] == 'Acc']
df_acc = df_acc.reset_index(drop=True)
df_acc = df_acc.iloc[:,1:].astype(float)
col_name = df_acc.columns
# Mean and population std (np.std default, ddof=0) for every method column that is
# present, instead of assuming exactly 12 columns.
ave = [np.mean(df_acc.iloc[:, k].to_numpy()) for k in range(df_acc.shape[1])]
std = [np.std(df_acc.iloc[:, k].to_numpy()) for k in range(df_acc.shape[1])]
final = pd.DataFrame(ave, index=col_name, columns=["mean"])
final['std'] = std
final.sort_values("mean", ascending=False)

In [None]:
# Number of 'Acc' rows read from the CSV (one is appended per repetition).
len(df_acc)

# 2. Fashion_MNIST

In [None]:
from keras.datasets.fashion_mnist import load_data

# Load the Fashion-MNIST train/test split through Keras.
(X_train, y_train), (X_test, y_test) = load_data()
# The cells below build their working subset from the test split; show its shapes.
print(X_test.shape, y_test.shape)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(0)  # fixed seed so the same preview images are drawn on every run

# Preview a 3x3 grid of randomly chosen test images, captioned with their class name.
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker','Bag','Ankle boot']
sample_size = 9
# Sample from the actual number of test images rather than a hard-coded 10000,
# so the preview can never index past the end of X_test.
random_idx = np.random.randint(len(X_test), size=sample_size)

plt.figure(figsize=(5,5))
for i, idx in enumerate(random_idx):
    plt.subplot(3,3,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(X_test[idx], cmap='gray')
    plt.xlabel(class_names[y_test[idx]])
plt.show()

In [None]:
# Replicate the single grayscale channel three times so images match the
# 3-channel input_shape used by the model.
# Guarded on ndim so that re-running this cell does not stack yet another axis.
if X_test.ndim == 3:
    X_test = np.repeat(X_test[..., np.newaxis], 3, -1)
X_test.shape

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np

# Display the first occurrence of every label found among the first 100 test images.
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker','Bag','Ankle boot']
y_list = []  # labels that have already been displayed

for i in range(100):
    label = y_test[i]
    if label not in y_list:
        y_list.append(label)
        some_digit = X_test[i]

        plt.imshow(some_digit, cmap="binary")
        plt.axis("off")
        plt.show()
        print(label, class_names[label])

In [None]:
# Number of test images per label.
pd.Series(y_test).value_counts()

In [None]:
# Pick labels 2 ('Pullover') and 4 ('Coat'); a later cell maps 2 -> 0 and 4 -> 1.
# Vectorized lookup: avoids converting y_test to a Python list on every iteration.
# .tolist() keeps both results plain lists so they can be sliced and concatenated
# with + in the next cell.
idx_2 = np.flatnonzero(y_test == 2).tolist()
idx_4 = np.flatnonzero(y_test == 4).tolist()
print(len(idx_2), len(idx_4))

In [None]:
# First 50 samples of each of the two classes, merged in ascending index order.
idx = sorted(idx_2[:50] + idx_4[:50])

In [None]:
# Gather the selected images and their labels into arrays, following the order of idx.
X = np.array([X_test[i] for i in idx])
y = np.array([y_test[i] for i in idx])
print(X.shape, y.shape)

In [None]:
# Class balance of the selected subset.
pd.Series(y).value_counts()

In [None]:
# Binary hard labels: class 2 -> 0.0, class 4 -> 1.0 (any other value is left untouched).
y_hard = [0.00 if x == 2 else (1.00 if x == 4 else x) for x in y]
pd.Series(y_hard).value_counts()

In [None]:
from sklearn.model_selection import StratifiedKFold
# 5-fold stratified splitter with shuffling; random_state is fixed for repeatable folds.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state = 2)

epochs = 10  # training epochs for every model.fit call below
batch = 32  # mini-batch size passed to model.fit
decay=0.0001  # NOTE(review): not referenced by any cell visible in this section

from keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Without Dropout

# transformer
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

num_classes = 1  # a single sigmoid output unit (binary task)
input_shape = (28, 28, 3)  # shape of one input image handed to layers.Input

image_size = 54  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extracted from the input images
num_patches = (image_size // patch_size) ** 2  # (54 // 6) ** 2 = 81 patches per image
projection_dim = 64  # width of each encoded patch
num_heads = 4  # attention heads per transformer block
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8  # number of stacked transformer blocks
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

# Preprocessing/augmentation pipeline placed at the front of every model built by
# create_vit_classifier(); the same Sequential instance is shared by all of them.
# NOTE(review): Normalization() is never adapt()-ed in the cells visible here, so it
# presumably keeps its initial statistics - confirm whether adapt() on the training
# images was intended.
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)

def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of GELU Dense layers, each followed by Dropout.

    Args:
        x: Input tensor.
        hidden_units: Iterable with the number of units of each Dense layer.
        dropout_rate: Rate given to the Dropout layer after every Dense layer.

    Returns:
        The tensor produced by the last Dropout layer (``x`` itself when
        ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        out = dense(out)
        drop = layers.Dropout(dropout_rate)
        out = drop(out)
    return out

class Patches(layers.Layer):
    """Cut a batch of images into square patches and flatten each patch.

    Args:
        patch_size: Side length, in pixels, of each square patch; also used as
            the stride, so consecutive patches do not overlap.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, patch_size, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return the patches reshaped to ``[batch, -1, patch_dims]``."""
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # Last axis of extract_patches holds one flattened patch.
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config
    
class PatchEncoder(layers.Layer):
    """Linearly project each patch and add a learned position embedding.

    Args:
        num_patches: Number of patches per image (size of the position table).
        projection_dim: Output width of the projection and of the embedding.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim  # kept only for get_config()
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return ``projection(patch) + position_embedding(0..num_patches-1)``."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update(
            {"num_patches": self.num_patches, "projection_dim": self.projection_dim}
        )
        return config
    
def create_vit_classifier():
    """Build an uncompiled ViT classifier from the module-level settings.

    Reads ``input_shape``, ``data_augmentation``, ``patch_size``, ``num_patches``,
    ``projection_dim``, ``transformer_layers``, ``num_heads``,
    ``transformer_units``, ``mlp_head_units`` and ``num_classes``.

    Returns:
        A ``keras.Model`` mapping an image batch to ``num_classes`` sigmoid outputs.
    """
    inputs = layers.Input(shape=input_shape)
    # Preprocess/augment, cut into patches, then embed the patches.
    augmented = data_augmentation(inputs)
    patches = Patches(patch_size)(augmented)
    tokens = PatchEncoder(num_patches, projection_dim)(patches)

    # Pre-norm transformer blocks: attention and MLP, each wrapped in a skip connection.
    for _ in range(transformer_layers):
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0
        )(normed, normed)
        after_attention = layers.Add()([attended, tokens])
        hidden = layers.LayerNormalization(epsilon=1e-6)(after_attention)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0)
        tokens = layers.Add()([hidden, after_attention])

    # Normalize, then flatten all patch tokens into one vector per image.
    representation = layers.LayerNormalization(epsilon=1e-6)(tokens)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0)(representation)
    # Classification head.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0)
    # Sigmoid is applied here, so the model emits probabilities rather than raw logits.
    probabilities = layers.Dense(num_classes, activation='sigmoid')(features)
    return keras.Model(inputs=inputs, outputs=probabilities)

In [None]:
# Inspect the selected image subset.
X

In [None]:
# Wrap the hard labels in a single-column DataFrame (default column name 0);
# later cells slice folds with .iloc and read the labels through y_hard[0].
y_hard = pd.DataFrame(y_hard)
y_hard

# 2-0. Generating Prob_labels

In [None]:
# Train a reference ViT on the whole selected subset with the hard labels; its
# predicted probabilities are stored in prob_label for the cells that follow.
gen_model = create_vit_classifier()
gen_model.compile(
    loss='BinaryCrossentropy',
    optimizer=optimizers.Adam(learning_rate = 0.0001),
    metrics=['accuracy'],
)
history = gen_model.fit(
    X, y_hard,
    validation_split=0.2,
    epochs=epochs,
    batch_size=batch,
    verbose=0,
)

# Training-loss curve of the reference model.
plt.plot(history.history['loss'], label='loss')
plt.ylim([0, 1])
plt.xlabel('Iteration', fontsize = 15, fontweight="bold")
plt.ylabel('Loss', fontsize = 15, fontweight="bold")
plt.title("Cost Function", fontsize = 20, fontweight="bold")
plt.legend()
plt.show()

# One predicted probability per sample, flattened from (n, 1) into a plain list.
result = gen_model.predict(X, verbose=0)
prob_label = [p for p in result.reshape(len(X))]
prob_label

# 2-1. Focal(Hard) and SLS(Hard/diverse alphas)_option#1

In [None]:
def _cross_validate(loss_name, train_labels, learning_rate=0.0001):
    """Cross-validate one training setup over the folds of ``skf``.

    Args:
        loss_name: Keras loss identifier handed to ``model.compile``.
        train_labels: Single-column DataFrame of training targets aligned with
            ``X`` (hard labels, or softened labels for SLS).
        learning_rate: Adam learning rate.

    Returns:
        List ``[Acc, Pre, Rec, F1, R-AUC]`` with each metric averaged over the
        folds. Metrics are always computed against the real labels ``y_hard``.
    """
    fold_metrics = []
    # Stratification always uses y_hard (the binary label).
    for fold_no, (train_index, test_index) in enumerate(skf.split(X, y_hard), start=1):
        fold_labels = train_labels.iloc[train_index]
        if fold_no == 1:
            print(fold_labels.value_counts())
        fold_X_train = np.array(X[train_index])
        fold_y_train = np.array(fold_labels).astype(float)
        fold_X_test = np.array(X[test_index])
        fold_y_test = np.array(y_hard.iloc[test_index]).astype(float)  # real (actual) labels

        # Fresh model per fold: reusing one model across folds would evaluate it on
        # samples it was already trained on in an earlier fold (leakage).
        model = create_vit_classifier()
        model.compile(
            loss=loss_name,
            optimizer=optimizers.Adam(learning_rate=learning_rate),
            metrics=['accuracy'],
        )
        model.fit(
            fold_X_train,
            fold_y_train,
            validation_data=(fold_X_test, fold_y_test),
            epochs=epochs,
            verbose=0,
            batch_size=batch,
        )

        scores = model.predict(fold_X_test, verbose=0).ravel()  # predicted probabilities
        predicted = np.round(scores)  # hard 0/1 decisions
        y_true = fold_y_test.ravel()
        fold_metrics.append([
            metrics.accuracy_score(y_true, predicted),
            metrics.precision_score(y_true, predicted),
            metrics.recall_score(y_true, predicted),
            metrics.f1_score(y_true, predicted),
            # ROC-AUC is a ranking metric: feed it the scores, not rounded labels.
            metrics.roc_auc_score(y_true, scores),
        ])
    return [np.mean(values) for values in zip(*fold_metrics)]


B = [0.00, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]  # SLS with LS
c = 0.1  # criterion decides easy/hard

# Loop-invariant inputs of the soft-label construction.
hard_labels = y_hard[0].to_numpy()
probs = np.asarray(prob_label, dtype=float)
# "Easy" samples: the reference model's probability is within c of the true label.
easy_negative = (hard_labels == 0) & (probs <= c)
easy_positive = (hard_labels == 1) & (probs >= 1 - c)

for t in range(10):    # 10 times repeat
    # Focal loss baseline trained on the hard labels.
    print('#'*50,'Focal','#'*50)
    focal_scores = _cross_validate('BinaryFocalCrossentropy', y_hard)
    print(focal_scores)
    columns = {'Focal': focal_scores}

    for b in B:
        print('#'*50,'SLS',b,'#'*50)
        # Easy samples get their label moved by b towards the other class;
        # hard samples keep the hard 0/1 label.
        soft = hard_labels.astype(float)
        soft[easy_negative] = b
        soft[easy_positive] = 1 - b
        y_005 = pd.DataFrame(soft)

        # BCE trained on the softened labels, evaluated on the real labels.
        sls_scores = _cross_validate('BinaryCrossentropy', y_005)
        columns['SLS({})'.format(b)] = sls_scores
        print(sls_scores)

    # One table per repetition, appended (with its own header line) to the CSV.
    res = pd.DataFrame(columns, index = ['Acc','Pre','Rec','F1','R-AUC'])
    res.to_csv("ViT_F_MNIST_5CV(SLS_opt#1_c0.1).csv", mode = 'a', float_format='%.4g')

In [None]:
# Summarize accuracy over every repetition stored in the results CSV.
res = pd.read_csv("ViT_F_MNIST_5CV(SLS_opt#1_c0.1).csv")
# Each repetition was appended with its own header line; those lines have an empty
# first field and are dropped here, together with any row holding a missing value.
res = res.dropna(axis=0)
res = res.rename(columns={'SLS(0.0)':'Hard'})  # b = 0 means plain hard labels
df_acc = res[res.iloc[:,0] == 'Acc']
df_acc = df_acc.reset_index(drop=True)
df_acc = df_acc.iloc[:,1:].astype(float)
col_name = df_acc.columns
# Mean and population std (np.std default, ddof=0) for every method column that is
# present, instead of assuming exactly 12 columns.
ave = [np.mean(df_acc.iloc[:, k].to_numpy()) for k in range(df_acc.shape[1])]
std = [np.std(df_acc.iloc[:, k].to_numpy()) for k in range(df_acc.shape[1])]
final = pd.DataFrame(ave, index=col_name, columns=["mean"])
final['std'] = std
final.sort_values("mean", ascending=False)

In [None]:
# Number of 'Acc' rows read from the CSV (one is appended per repetition).
len(df_acc)

# 3. CIFAR-10

In [None]:
from tensorflow.keras.datasets import cifar10

# Load the CIFAR-10 train/test split through Keras.
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
# The cells below build their working subset from the test split; show its shapes.
print(X_test.shape, y_test.shape)

In [None]:
# Flatten the label array to 1-D so each label can be used directly as an index;
# -1 lets NumPy infer the length instead of hard-coding 10000.
y_test = y_test.reshape(-1)

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np

# Display the first occurrence of every label found among the first 100 test images.
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse','ship','truck']
y_list = []  # labels that have already been displayed

for i in range(100):
    label = y_test[i]
    if label not in y_list:
        y_list.append(label)
        some_digit = X_test[i]

        plt.imshow(some_digit, cmap="binary")
        plt.axis("off")
        plt.show()
        print(label, class_names[label])

In [None]:
# Number of test images per label.
pd.Series(y_test).value_counts()

In [None]:
# Pick labels 1 ('automobile') and 9 ('truck'); a later cell maps 1 -> 0 and 9 -> 1.
# Vectorized lookup: avoids converting y_test to a Python list on every iteration.
# .tolist() keeps both results plain lists so they can be sliced and concatenated
# with + in the next cell.
idx_1 = np.flatnonzero(y_test == 1).tolist()
idx_9 = np.flatnonzero(y_test == 9).tolist()
print(len(idx_1), len(idx_9))

In [None]:
# First 50 samples of each of the two classes, merged in ascending index order.
idx = sorted(idx_1[:50] + idx_9[:50])

In [None]:
# Gather the selected images and their labels into arrays, following the order of idx.
X = np.array([X_test[i] for i in idx])
y = np.array([y_test[i] for i in idx])
print(X.shape, y.shape)

In [None]:
# Class balance of the selected subset.
pd.Series(y).value_counts()

In [None]:
# Binary hard labels: class 1 -> 0.0, class 9 -> 1.0 (any other value is left untouched).
y_hard = [0.00 if x == 1 else (1.00 if x == 9 else x) for x in y]
pd.Series(y_hard).value_counts()

In [None]:
from sklearn.model_selection import StratifiedKFold
# 5-fold stratified splitter with shuffling; random_state is fixed for repeatable folds.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state = 2)

epochs = 10  # training epochs for every model.fit call below
batch = 32  # mini-batch size passed to model.fit
decay=0.0001  # NOTE(review): not referenced by any cell visible in this section

from keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Without Dropout

# transformer
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

num_classes = 1  # a single sigmoid output unit (binary task)
input_shape = (32, 32, 3)  # shape of one input image handed to layers.Input

image_size = 54  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extracted from the input images
num_patches = (image_size // patch_size) ** 2  # (54 // 6) ** 2 = 81 patches per image
projection_dim = 64  # width of each encoded patch
num_heads = 4  # attention heads per transformer block
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8  # number of stacked transformer blocks
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

# Preprocessing/augmentation pipeline placed at the front of every model built by
# create_vit_classifier(); the same Sequential instance is shared by all of them.
# NOTE(review): Normalization() is never adapt()-ed in the cells visible here, so it
# presumably keeps its initial statistics - confirm whether adapt() on the training
# images was intended.
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)

def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of GELU Dense layers, each followed by Dropout.

    Args:
        x: Input tensor.
        hidden_units: Iterable with the number of units of each Dense layer.
        dropout_rate: Rate given to the Dropout layer after every Dense layer.

    Returns:
        The tensor produced by the last Dropout layer (``x`` itself when
        ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        out = dense(out)
        drop = layers.Dropout(dropout_rate)
        out = drop(out)
    return out

class Patches(layers.Layer):
    """Cut a batch of images into square patches and flatten each patch.

    Args:
        patch_size: Side length, in pixels, of each square patch; also used as
            the stride, so consecutive patches do not overlap.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, patch_size, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return the patches reshaped to ``[batch, -1, patch_dims]``."""
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # Last axis of extract_patches holds one flattened patch.
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config
    
class PatchEncoder(layers.Layer):
    """Linearly project each patch and add a learned position embedding.

    Args:
        num_patches: Number of patches per image (size of the position table).
        projection_dim: Output width of the projection and of the embedding.
        **kwargs: Standard ``layers.Layer`` arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        # Forward Layer kwargs so the layer can be rebuilt from get_config().
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim  # kept only for get_config()
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return ``projection(patch) + position_embedding(0..num_patches-1)``."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so Keras can serialize/clone the layer."""
        config = super().get_config()
        config.update(
            {"num_patches": self.num_patches, "projection_dim": self.projection_dim}
        )
        return config
    
def create_vit_classifier():
    """Build the (uncompiled) ViT-style Keras model used by the experiments.

    Reads the notebook-level settings ``input_shape``, ``patch_size``,
    ``num_patches``, ``projection_dim``, ``num_heads``, ``transformer_units``,
    ``transformer_layers``, ``mlp_head_units`` and ``num_classes``, and reuses
    the shared ``data_augmentation`` pipeline as the first stage.

    Returns:
        A ``keras.Model`` mapping the inputs to a sigmoid Dense layer with
        ``num_classes`` units (i.e. probabilities, not raw logits).
    """

    def encoder_block(tokens):
        # Pre-norm self-attention with a residual connection.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0
        )(normed, normed)
        after_attention = layers.Add()([attended, tokens])
        # Pre-norm feed-forward MLP with a second residual connection.
        hidden = layers.LayerNormalization(epsilon=1e-6)(after_attention)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0)
        return layers.Add()([hidden, after_attention])

    inputs = layers.Input(shape=input_shape)
    # Augment, split into patches, then embed the patches.
    augmented = data_augmentation(inputs)
    patch_sequence = Patches(patch_size)(augmented)
    tokens = PatchEncoder(num_patches, projection_dim)(patch_sequence)

    # Stack the Transformer blocks.
    for _ in range(transformer_layers):
        tokens = encoder_block(tokens)

    # Normalise and flatten all patch tokens into one feature vector per sample.
    representation = layers.LayerNormalization(epsilon=1e-6)(tokens)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0)(representation)
    # Classification head.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0)
    probabilities = layers.Dense(num_classes, activation='sigmoid')(features)
    return keras.Model(inputs=inputs, outputs=probabilities)

In [None]:
# Display the feature array X that the training cells below fit on.
X

In [None]:
# Wrap the hard labels in a single-column DataFrame (column 0): the cells below
# select rows with `y_hard.iloc[...]` and read the labels through `y_hard[0]`.
y_hard = pd.DataFrame(y_hard)
y_hard

# 3-0. Generating Prob_labels

In [None]:
# Fit one ViT with plain binary cross-entropy on the hard labels; its predictions
# on X are kept as `prob_label` and used further down to decide which samples
# count as "easy".
gen_model = create_vit_classifier()
gen_model.compile(
    loss='BinaryCrossentropy',
    optimizer=optimizers.Adam(learning_rate=0.0002),
    metrics=['accuracy'],
)
history = gen_model.fit(
    X,
    y_hard,
    validation_split=0.2,
    epochs=epochs,
    batch_size=batch,
    verbose=0,
)

# Training-loss curve.
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='loss')
ax.set_ylim([0, 1])
ax.set_xlabel('Iteration', fontweight="bold", fontsize=15)
ax.set_ylabel('Loss', fontweight="bold", fontsize=15)
ax.set_title("Cost Function", fontweight="bold", fontsize=20)
ax.legend()
plt.show()

# One predicted probability per sample of X, flattened to a plain list.
result = gen_model.predict(X, verbose=0)
prob_label = list(result.reshape(len(X),))
prob_label

# 3-1. Focal(Hard) and SLS(Hard/diverse alphas)_option#1

In [None]:
# Repeated cross-validation comparing
#   * focal loss trained on the hard (0/1) labels, and
#   * binary cross-entropy trained on selectively smoothed labels (SLS) for
#     several smoothing amounts b,
# always evaluated against the hard labels of the held-out fold.
# Each repeat appends one table (metrics x methods) to the results CSV.

# Loop-invariant inputs for building the smoothed labels.
hard_labels = y_hard[0].to_numpy()
gen_probs = np.asarray(prob_label, dtype=float)
assert np.isin(hard_labels, [0, 1]).all(), "y_hard must contain only 0/1 labels"
assert len(gen_probs) == len(hard_labels), "prob_label needs one entry per sample"

for t in range(10):    # 10 times repeat
    res = pd.DataFrame({'Focal':[0, 0, 0, 0, 0]}, index = ['Acc','Pre','Rec','F1','R-AUC'])
    # Focal
    print('#'*50,'Focal','#'*50)
    list_acc = []
    list_pre = []
    list_rec = []
    list_f1 = []
    list_rauc = []

    n_iter = 0
    for train_index, test_index in skf.split(X, y_hard):  # stratification by y_hard (binary label)
        n_iter += 1
        y_train = y_hard.iloc[train_index]
        if n_iter == 1:
            print(y_train.value_counts())
        X_train = np.array(X[train_index])
        y_train = np.array(y_train).astype(float)
        X_test = np.array(X[test_index])
        y_test = np.array(y_hard.iloc[test_index]).astype(float)

        # Build a fresh, untrained model for every fold. Re-compiling does not
        # reset weights, so reusing one model across folds would evaluate on
        # samples it had already been trained on in earlier folds.
        focal_model = create_vit_classifier()
        focal_model.compile(loss='BinaryFocalCrossentropy', optimizer=optimizers.Adam(learning_rate = 0.0002), metrics=['accuracy'])
        history = focal_model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=epochs, verbose=0, batch_size=batch)

        predicted_prob = focal_model.predict(X_test, verbose=0)
        predicted = np.round(predicted_prob)
        list_acc.append(metrics.accuracy_score(y_test, predicted))
        list_pre.append(metrics.precision_score(y_test, predicted))
        list_rec.append(metrics.recall_score(y_test, predicted))
        list_f1.append(metrics.f1_score(y_test, predicted))
        # ROC-AUC ranks samples by score, so it needs the probabilities,
        # not the thresholded 0/1 predictions.
        list_rauc.append(metrics.roc_auc_score(y_test.ravel(), predicted_prob.ravel()))
    res['Focal'] = [np.mean(list_acc), np.mean(list_pre), np.mean(list_rec), np.mean(list_f1), np.mean(list_rauc)]
    print([np.mean(list_acc), np.mean(list_pre), np.mean(list_rec), np.mean(list_f1), np.mean(list_rauc)])

    B = [0.00, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]  # SLS with LS
    c = 0.1  # criterion decides easy/hard
    for b in B:
        print('#'*50,'SLS',b,'#'*50)
        # Smooth only the "easy" samples, i.e. those the generator model already
        # places within c of their label; hard samples keep the exact 0/1 label.
        y_005 = pd.DataFrame(
            np.where(
                hard_labels == 0,
                np.where(gen_probs <= c, b, 0.0),
                np.where(gen_probs >= 1 - c, 1 - b, 1.0),
            )
        )

        bce005_acc = []
        bce005_pre = []
        bce005_rec = []
        bce005_f1 = []
        bce005_rocauc = []
        n_iter = 0
        for train_index, test_index in skf.split(X, y_hard):  # stratification by y_hard (binary label)
            n_iter += 1
            y_005_train = y_005.iloc[train_index]
            if n_iter == 1:
                print(y_005_train.value_counts())
            X_train = np.array(X[train_index])
            y_005_train = np.array(y_005_train).astype(float)
            X_test = np.array(X[test_index])
            y_test = np.array(y_hard.iloc[test_index]).astype(float)  # test with real (actual) label y

            # Fresh model per fold (see the focal loop above).
            model_005 = create_vit_classifier()
            model_005.compile(loss='BinaryCrossentropy', optimizer=optimizers.Adam(learning_rate = 0.0002), metrics=['accuracy'])
            history = model_005.fit(X_train, y_005_train, validation_data=(X_test, y_test), epochs=epochs, verbose=0, batch_size=batch)

            predicted_prob = model_005.predict(X_test, verbose=0)
            predicted = np.round(predicted_prob)
            bce005_acc.append(metrics.accuracy_score(y_test, predicted))
            bce005_pre.append(metrics.precision_score(y_test, predicted))
            bce005_rec.append(metrics.recall_score(y_test, predicted))
            bce005_f1.append(metrics.f1_score(y_test, predicted))
            bce005_rocauc.append(metrics.roc_auc_score(y_test.ravel(), predicted_prob.ravel()))
        res['SLS({})'.format(b)] = [np.mean(bce005_acc), np.mean(bce005_pre), np.mean(bce005_rec), np.mean(bce005_f1), np.mean(bce005_rocauc)]
        print([np.mean(bce005_acc), np.mean(bce005_pre), np.mean(bce005_rec), np.mean(bce005_f1), np.mean(bce005_rocauc)])
    # Append this repeat's table; the summary cell reads the accumulated file.
    res.to_csv("ViT_CIFAR_10_5CV(SLS_opt#1_c0.1).csv", mode = 'a', float_format='%.4g')

In [None]:
# Summarise the accuracy of every method over all repeats stored in the results CSV.
res = pd.read_csv("ViT_CIFAR_10_5CV(SLS_opt#1_c0.1).csv")
# Drop rows with missing fields (e.g. the header lines that the append-mode
# to_csv repeats for every run, whose index field is empty).
res = res.dropna(axis=0)
res = res.rename(columns={'SLS(0.0)':'Hard'})  # b = 0 means no smoothing, i.e. hard labels
# Keep only the accuracy rows; the first column holds the metric name.
df_acc = res[res.iloc[:,0] == 'Acc']
df_acc = df_acc.reset_index(drop=True)
df_acc = df_acc.iloc[:,1:].astype(float)
col_name = df_acc.columns
# Iterate over however many method columns the file has instead of assuming 12.
n_methods = df_acc.shape[1]
ave = [np.mean(list(df_acc.iloc[:, i])) for i in range(n_methods)]
std = [np.std(list(df_acc.iloc[:, i])) for i in range(n_methods)]
final = pd.DataFrame(ave, index=col_name, columns=["mean"])
final['std'] = std
final.sort_values("mean", ascending=False)

In [None]:
# Number of 'Acc' rows kept in df_acc, i.e. how many repeats the CSV contributed.
len(df_acc)