In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import cv2
from sklearn.model_selection import train_test_split

from zipfile import ZipFile
import os
import sys

# Load modules from the lib directory
sys.path.insert(0, "../lib")
from resnet import *

In [2]:
cnn_1 = load_resnet(classes=10, shape=(32, 32, 3))
cnn_1.trainable = False

In [257]:
def build_image_classifier(cnn: tf.keras.Model) -> tf.keras.Model:
    """
    Build a new model that takes images as input and outputs class probabilities.

    Parameters:
    -----------

    `cnn: tf.keras.Model` 
        The base CNN model to use.

    `return`
        The new model.
    """
    return tf.keras.Sequential([
        cnn,
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(units = 512),
        tf.keras.layers.Dense(units = 10, activation = "sigmoid")
    ])

In [258]:
class LabelCleaner(tf.keras.Model):

    def __init__(self, CNN: tf.keras.Model):
        super(LabelCleaner, self).__init__()

        # Base CNN model
        self.CNN = CNN
        
        # Fully connected dense layers
        self.fc_1 = tf.keras.layers.Dense(units = 20, use_bias=False)
        self.fc_2 = tf.keras.layers.Dense(units = 512)
        self.fc_3 = tf.keras.layers.Dense(units = 512, use_bias=False, activation = "relu")
        self.fc_4 = tf.keras.layers.Dense(units = 10, use_bias=False,)
        
        # Batch Normalization layers
        self.bn_1 = tf.keras.layers.BatchNormalization()
        self.bn_2 = tf.keras.layers.BatchNormalization()
        self.bn_3 = tf.keras.layers.BatchNormalization()

    def call(self, inputs):
        img, y = inputs

        # Get the CNN output
        x = self.CNN(img)

        # Embed the output of the CNN to the noisy labels
        x = tf.concat([x, y], axis = 1)
        
        x = self.fc_1(x)    # Linear followed by batch normalization
        x = self.bn_1(x)

        x = self.fc_2(x)    # Linear followed by batch normalization
        x = self.bn_2(x)

        x = self.fc_3(x)    # ReLU

        x = self.fc_4(x)    # Linear followed by batch normalization
        x = self.bn_3(x)

        x = x + y           # Residual connection

        return x

In [162]:
resnet50 = tf.keras.applications.ResNet50(weights="imagenet", include_top=False, input_shape=(32, 32, 3))
resnet50.trainable = False
cnn_2 = tf.keras.models.Sequential([
    resnet50,
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units = 10, activation = "softmax")
])

In [147]:
resnet18 = ResNet18(10)
resnet18.build(input_shape=(None, 32, 32, 3))

cnn_3 = tf.keras.models.Sequential([
    resnet18,
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units = 10, activation = "softmax")
])

In [5]:
# [DO NOT MODIFY THIS CELL]

n_images: int = 50_000
n_noisy: int = 40_000
n_clean: int = n_images - n_noisy

images : np.ndarray = np.empty((n_images, 32, 32, 3), dtype=np.float32)

# Load the data
for i in range(n_images):
    image_path = f"../data/images/{i+1:05d}.png"
    images[i,:,:,:] = cv2.cvtColor(cv2.imread(image_path),cv2.COLOR_BGR2RGB)

# load the labels
clean_labels = np.genfromtxt('../data/clean_labels.csv', delimiter=',', dtype="int8")
noisy_labels = np.genfromtxt('../data/noisy_labels.csv', delimiter=',', dtype="int8")

In [265]:
test_ratio: float = 0.2
train_size: float = n_images - (n_clean * test_ratio)
clean_noisy_ratio: float = 1 / 9
train_clean_size: int = int(np.floor(train_size * clean_noisy_ratio))
val_clean_size: int = int(np.floor((n_clean * (1 - test_ratio)) - train_clean_size))
test_clean_size: int = n_clean - train_clean_size - val_clean_size

IMG_SIZE: int = 32
IMG_SHAPE: tuple = (IMG_SIZE, IMG_SIZE, 3)

BATCH_SIZE: int = 32

In [9]:
images_normalized = tf.cast(images, dtype = tf.float32) / 255.0
clean_labels_one_hot = tf.one_hot(clean_labels, depth = 10)
noisy_labels_one_hot = tf.one_hot(noisy_labels, depth = 10)

In [109]:
T = tf.data.Dataset.from_tensor_slices((images_normalized, noisy_labels_one_hot))
V = tf.data.Dataset.from_tensor_slices((
    (images_normalized[:n_clean],
    noisy_labels_one_hot[:n_clean]),
    clean_labels_one_hot[:n_clean]
))

In [266]:
T = T.shuffle(buffer_size = 1000)
T = T.batch(BATCH_SIZE)

In [168]:

V = V.shuffle(buffer_size = 1000, seed = 42)
V_train = V.take(train_clean_size).batch(batch_size = BATCH_SIZE)
V_val = V.skip(train_clean_size).take(val_clean_size).batch(batch_size = BATCH_SIZE)
V_test = V.skip(train_clean_size + val_clean_size).take(test_clean_size).batch(batch_size = BATCH_SIZE)

In [160]:
@tf.function
def l1_loss(y_true, y_pred):
    return tf.reduce_sum(tf.abs(y_true - y_pred))

In [259]:
cleaner = LabelCleaner(cnn_2)
cleaner.compile(
    optimizer = tf.keras.optimizers.Adam(0.001),
    loss = l1_loss,
    metrics = ['accuracy']
)

In [260]:
cleaner.fit(
    V_train,
    epochs = 10,
    validation_data = V_val
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1f824652760>

In [261]:
cleaner.evaluate(V_test)



[36.179046630859375, 0.38350000977516174]

In [173]:
cnn_2.trainable = False
image_classifier = build_image_classifier(cnn_2)

In [262]:
cleaned_labels = cleaner.predict([images, noisy_labels_one_hot], batch_size=128)



In [263]:
def get_custom_cross_entropy(n_verified, n_cleaned):
    pi = tf.Variable(tf.ones((n_verified, 10)), dtype = tf.float32)
    pj = tf.Variable(tf.ones((n_cleaned, 10)), dtype = tf.float32)

    def cross_entropy(y_true, y_pred):
        y_true = tf.cast(y_true, dtype = tf.float32)
        y_pred = tf.cast(y_pred, dtype = tf.float32)
        a = tf.reduce_sum(pi * tf.math.log(y_true), axis = 1)
        b = tf.reduce_sum(pj * tf.math.log(y_pred), axis = 1)

        loss = -a-b
        return loss
    return tf.function(cross_entropy)

custom_cross_entropy = get_custom_cross_entropy(len(V), len(cleaned_labels))


In [270]:
get_custom_cross_entropy(5, 5)([1,0,0,1,0,0,1,0,0,1], [0,1,0,0,1,1,0,0,1,0])

<tf.Tensor: shape=(5,), dtype=float32, numpy=array([inf, inf, inf, inf, inf], dtype=float32)>

In [271]:
tf.cast([1,0,0,1,0,0,1,0,0,1], dtype = tf.float32)

<tf.Tensor: shape=(10,), dtype=float32, numpy=array([1., 0., 0., 1., 0., 0., 1., 0., 0., 1.], dtype=float32)>

In [254]:
image_classifier.compile(
    loss = tf.keras.losses.CategoricalCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(0.001),
    metrics=['accuracy']
)

In [268]:
image_classifier.fit(
    T,
    epochs = 10,
)

Epoch 1/10
Epoch 2/10

KeyboardInterrupt: 