In [2]:
# Import required packages
import numpy as np
import cv2
import matplotlib.pyplot as plt
import tensorflow
import tensorflow.keras as keras
from keras import layers
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten, MaxPooling2D
from keras.constraints import maxnorm
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
import timeit

In [3]:
# [DO NOT MODIFY THIS CELL]

# load the images
n_img = 50000
n_noisy = 40000
n_clean_noisy = n_img - n_noisy
imgs = np.empty((n_img,32,32,3))
for i in range(n_img):
    img_fn = f'../data/images/{i+1:05d}.png'
    imgs[i,:,:,:]=cv2.cvtColor(cv2.imread(img_fn),cv2.COLOR_BGR2RGB)

# load the labels
clean_labels = np.genfromtxt('../data/clean_labels.csv', delimiter=',', dtype="int8")
noisy_labels = np.genfromtxt('../data/noisy_labels.csv', delimiter=',', dtype="int8")

### 2.2. Model I

In [3]:
# train_valid_test split
imgs_train, imgs_test, labels_train, labels_test  = train_test_split(imgs, noisy_labels, test_size=0.2, random_state=50)
imgs_train, imgs_valid, labels_train, labels_valid  = train_test_split(imgs_train, labels_train, test_size=0.25, random_state=20)

In [4]:
# Normalize x
X_train = np.array(imgs_train) / 255
X_valid = np.array(imgs_valid) / 255
X_test = np.array(imgs_test) / 255

In [5]:
# CNN
def model_I(image):
    '''
    This function should takes in the image of dimension 32*32*3 as input and returns a label prediction
    '''
    #create model
    model = Sequential()
    #add model layers
    model.add(Conv2D(32, (3,3), padding="same", activation="relu", input_shape=(32, 32, 3)))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(Flatten())
    model.add(Dense(64, activation="relu"))
    model.add(Dense(10))
    #compile model using accuracy to measure model performance
    model.compile(optimizer='adam', loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
    #train the model
    history = model.fit(X_train, labels_train, epochs=10, validation_data=(X_valid, labels_valid))
    #predict
    X_test = np.array(image)/255
    return np.round(model.predict(X_test))

In [None]:
    #compile model using accuracy to measure model performance
    model.compile(optimizer='adam', loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
    #train the model
    history = model.fit(X_train_2, labels_train, epochs=10)
    #predict
    X_test = np.array(image)/255    
    label = model.predict(X_test)
    model.save('model2.h5')
    label = np.argmax(np.round(label), axis=1)
    return label

In [6]:
# test for CNN (less than 10 min)
start = timeit.default_timer()
history = model_I(imgs_test)
stop = timeit.default_timer()
print('Time: ', stop - start, 'seconds')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Time:  511.7168006999999 seconds


### 2.3. Model II

In [4]:
# train CNN using clean labels
imgs_train_2, imgs_test_2, labels_train, labels_test  = train_test_split(imgs[2000:10000], clean_labels[2000:], test_size=0.2, random_state=1)
imgs_train_2, imgs_valid_2, labels_train, labels_valid  = train_test_split(imgs_train_2, labels_train, test_size=0.25, random_state=1)

In [5]:
# Normalize x
X_train_2 = np.array(imgs_train_2) / 255
X_valid_2 = np.array(imgs_valid_2) / 255
X_test_2 = np.array(imgs_test_2) / 255

In [6]:
def model_II(image):
    '''
    This function should takes in the image of dimension 32*32*3 as input and returns a label prediction
    '''
    #create model
    model = Sequential()
    #add model layers
    model.add(Conv2D(32, (3,3), padding="same", activation="relu", input_shape=(32, 32, 3)))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(Flatten())
    model.add(Dense(64, activation="relu"))
    model.add(Dense(10))
    #compile model using accuracy to measure model performance
    model.compile(optimizer='adam', loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
    #train the model
    history = model.fit(X_train_2, labels_train, epochs=10, validation_data=(X_valid_2, labels_valid))
    #predict
    X_test = np.array(image)/255
    return np.round(model.predict(X_test))

In [7]:
# test (less than 1 min)
start = timeit.default_timer()
history = model_II(imgs_test_2)
stop = timeit.default_timer()
print('Time: ', stop - start, 'seconds')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Time:  64.32693 seconds


In [8]:
noisy_x = imgs[10000:].reshape(-1, 32,32,3)
noisy_x = np.array(noisy_x) / 255

x_label = np.argmax(model_II(noisy_x), axis=1)
new_labels = np.append(clean_labels[2000:],x_label)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [18]:
# train & test datasets splits
x_train, x_test, y_train, y_test  = train_test_split(imgs[2000:], new_labels, test_size = 0.2, shuffle = False)
x_train, x_valid, y_train, y_valid  = train_test_split(x_train, y_train, test_size=0.25, random_state=1)


In [19]:
# Normalize x
x_train = np.array(x_train) / 255
x_valid = np.array(x_valid) / 255
x_test = np.array(x_test) / 255

In [20]:
def model_III(image):
    '''
    This function should takes in the image of dimension 32*32*3 as input and returns a label prediction
    '''
    #create model
    model = Sequential()
    #add model layers
    model.add(Conv2D(32, (3,3), padding="same", activation="relu", input_shape=(32, 32, 3)))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(MaxPooling2D(2, 2))
    model.add(Conv2D(64, (3,3), padding="same", activation="relu"))
    model.add(Flatten())
    model.add(Dense(64, activation="relu"))
    model.add(Dense(10))
    #compile model using accuracy to measure model performance
    model.compile(optimizer='adam', loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
    #train the model
    history = model.fit(x_train, y_train, epochs=10, validation_data=(x_valid, y_valid))
    #predict
    X_test = np.array(image)/255
    return np.round(model.predict(X_test))

In [21]:
# test (less than 1 min)
start = timeit.default_timer()
history = model_III(x_test)
stop = timeit.default_timer()
print('Time: ', stop - start, 'seconds')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Time:  386.06937640000024 seconds


## 3. Evaluation

For assessment, we will evaluate your final model on a hidden test dataset with clean labels by the `evaluation` function defined as follows. Although you will not have the access to the test set, the function would be useful for the model developments. For example, you can split the small training set, using one portion for weakly supervised learning and the other for validation purpose. 

In [22]:
# [DO NOT MODIFY THIS CELL]
def evaluation(model, test_labels, test_imgs):
    y_true = test_labels
    y_pred = []
    for image in test_imgs:
        y_pred.append(model(image))
    print(classification_report(y_true, y_pred))

In [31]:
# [DO NOT MODIFY THIS CELL]
# This is the code for evaluating the prediction performance on a testset
# You will get an error if running this cell, as you do not have the testset
# Nonetheless, you can create your own validation set to run the evlauation
n_test = 10000
test_labels = np.genfromtxt('../data/test_labels.csv', delimiter=',', dtype="int8")
test_imgs = np.empty((n_test,32,32,3))
for i in range(n_test):
    img_fn = f'../data/test_images/test{i+1:05d}.png'
    test_imgs[i,:,:,:]=cv2.cvtColor(cv2.imread(img_fn),cv2.COLOR_BGR2RGB)
evaluation(baseline_model, test_labels, test_imgs)

OSError: ../data/test_labels.csv not found.

The overall accuracy is $0.24$, which is better than random guess (which should have a accuracy around $0.10$). For the project, you should try to improve the performance by the following strategies:

- Consider a better choice of model architectures, hyperparameters, or training scheme for the predictive model;
- Use both `clean_noisy_trainset` and `noisy_trainset` for model training via **weakly supervised learning** methods. One possible solution is to train a "label-correction" model using the former, correct the labels in the latter, and train the final predictive model using the corrected dataset.
- Apply techniques such as $k$-fold cross validation to avoid overfitting;
- Any other reasonable strategies.

In [25]:
evaluation(model_III, clean_labels[0:2000], imgs[0:2000])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10

KeyboardInterrupt: 