In [None]:
import os
import cv2
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold, StratifiedKFold

In [None]:
def create_dataset(dir, label, train, test):
    temp = []
    count = 0
    for pics in os.listdir(dir):
        img = cv2.imread(os.path.join(dir, pics), cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (220,220))
        temp.append([img, label])           # using int: sparse categorical
        count += 1
    length = int(count * 0.1)
    train += temp[length:]
    test += temp[:length]
    print(folder, count)

In [None]:
TEST = []
TEST.clear()
TRAINVAL = []
TRAINVAL.clear()

path = "/content/GarbageClassification/img/own"
for folder in os.listdir(path):
    count = 0
    temp = []
    if "trash" in folder:
        dir = os.path.join(path, folder)
        create_dataset(dir, 4, TRAINVAL, TEST)
    else:
        dir = os.path.join(path, folder)
        for sub in os.listdir(dir):
            fold = os.path.join(dir, sub)
            if "glass" in folder:
                create_dataset(fold, 0, TRAINVAL, TEST)
            elif "metal" in folder:
                create_dataset(fold, 1, TRAINVAL, TEST)
            elif "paper" in folder:
                create_dataset(fold, 2, TRAINVAL, TEST)
            elif "plastic" in folder:
                create_dataset(fold, 3, TRAINVAL, TEST)
        print(folder, count)

print("TRAINVAL:", len(TRAINVAL))
print("TEST:", len(TEST))

In [None]:
TRAINVAL = np.asarray(TRAINVAL)
TEST = np.asarray(TEST)

# training data augmentation
augmentation = tf.keras.Sequential([
    tf.keras.layers.experimental.preprocessing.RandomFlip(mode="horizontal_and_vertical", seed=18),
    tf.keras.layers.experimental.preprocessing.RandomRotation(factor=0.2, seed=18),
    tf.keras.layers.experimental.preprocessing.RandomContrast(factor=0.5, seed=18)
])

added = []
for i in range(len(TRAINVAL)):
    image = tf.expand_dims(TRAINVAL[i][0], axis=0)
    augmented = augmentation(image)
    augmented = np.asarray(augmented[0])
    added.append([augmented, TRAINVAL[i][1]])

added = np.asarray(added)

# # add augmented data to training data and shuffle
print(TRAINVAL.shape, added.shape)
TRAINVAL = np.concatenate((TRAINVAL, added))
print(TRAINVAL.shape)
np.random.shuffle(TRAINVAL)

In [None]:
# separate train and val
length = int(len(TRAINVAL) * 0.1)
train = TRAINVAL[length:]
val = TRAINVAL[:length]

# separate feature and label
TRAINVAL = []   # clear array
xtrain = []
ytrain = []
for feature, label in train:
    xtrain.append(feature)
    ytrain.append(label)
xtrain = np.asarray(xtrain)
ytrain = np.asarray(ytrain)

xval = []
yval = []
for feature, label in val:
    xval.append(feature)
    yval.append(label)
xval = np.asarray(xval)
yval = np.asarray(yval)

xtest = []
ytest = []
for feature, label in TEST:
    xtest.append(feature)
    ytest.append(label)
xtest = np.asarray(xtest)
ytest = np.asarray(ytest)
TEST = []       # clear array

print(xtrain.shape, ytrain.shape)
print(xval.shape, yval.shape)
print(xtest.shape, ytest.shape)

In [None]:
resnet = tf.keras.applications.MobileNet(
    include_top = False, 
    weights = 'imagenet',
    input_shape = (220,220,3)
)

model = tf.keras.models.Sequential()
model.add(resnet)
model.add(tf.keras.layers.GlobalAveragePooling2D())
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(1024, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(5, activation='softmax'))

model.summary()

model.compile(
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001), 
    loss = 'sparse_categorical_crossentropy', 
    metrics = ['accuracy']
)

In [None]:
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=1, restore_best_weights=True, patience=3)

history = model.fit(xtrain, ytrain, 
                    batch_size=32, 
                    epochs=30,
                    callbacks=callback, 
                    validation_data=(xval,yval))

path = F"/content/gdrive/MyDrive/Colab Notebooks/saved_model/10 - Resnet/mobilenet4.h5"     # path to save model
model.save(path)

# accuracy graph
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# loss graph
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
prediction = model.predict(xtest)

correct = 0
wrong_index = []
predicted_index = []

for i in range(len(prediction)):
    temp = np.argmax(prediction[i])
    if (temp == ytest[i]):
        correct += 1
    else:
        wrong_index.append(i)
        predicted_index.append(temp)

print("accuracy: ", correct / len(xtest) * 100)
print(wrong_index)
print(predicted_index)

In [None]:
cols = 6
rows = int(len(wrong_index) / cols) + 1
axes = []
figs = plt.figure(figsize=(17,10))

labels = ["glass", "metal", "paper", "plastic", "trash"]

for i in range(rows * cols):
    if(i < len(wrong_index)):
        axes.append(figs.add_subplot(rows, cols, i+1))
        pic = xtest[wrong_index[i]]
        name = labels[predicted_index[i]]
        axes[-1].set_title(name)
        plt.imshow(pic)
plt.show()