In [1]:
import zipfile
import os
import random
import shutil
import pandas as pd
import numpy as np
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications.resnet50 import ResNet50
from sklearn.model_selection import train_test_split
from PIL import Image as im
from glob import glob
import cv2
import tkinter as tk
from tkinter import filedialog
import re
from tensorflow import keras
from tensorflow.keras import layers




In [2]:
# Root folder of the image dataset. The trailing slash matters: later cells
# build file paths by plain string concatenation (data_path + name).
data_path = 'dataset/'

In [3]:
def add_class_name_prefix(df, col_name):
    """Prefix every filename in ``df[col_name]`` with its class folder.

    The class name is everything that precedes the first digit of the
    filename, so ``"paper104.jpg"`` becomes ``"paper/paper104.jpg"``.

    Args:
        df: DataFrame holding the filenames; the column is overwritten in place.
        col_name: Name of the column that contains the bare filenames.

    Returns:
        The same ``df`` object that was passed in.

    Raises:
        ValueError: If a filename contains no digit, so no class name can be
            derived from it.
    """
    def _with_prefix(filename):
        # Raw string: "\d" in a normal string literal is an invalid escape sequence.
        first_digit = re.search(r"\d", filename)
        if first_digit is None:
            raise ValueError(
                "cannot derive a class name from {!r}: no digit in filename".format(filename)
            )
        return filename[:first_digit.start()] + '/' + filename

    df[col_name] = df[col_name].apply(_with_prefix)
    return df

In [4]:
# Map a consecutive integer class id to the name of every folder under the
# dataset root that contains files.
categories = {}
for dirname, subdirs, filenames in os.walk('dataset'):
    # Sorting in place makes os.walk descend into sub-folders alphabetically,
    # so the id -> name mapping does not depend on the file system's listing order.
    subdirs.sort()
    if filenames:  # Ensure only directories that contain files are processed
        # The next free id is simply the number of classes registered so far.
        categories[len(categories)] = os.path.basename(dirname)

print(categories)
print('defining constants successful!')

{0: 'botol-plastik', 1: 'daun-basah', 2: 'daun-kering', 3: 'kertas-bungkus', 4: 'kertas-koran', 5: 'logam-ferro', 6: 'logam-non-ferro', 7: 'plastik-sampul'}
defining constants successful!


In [5]:
# NOTE(review): this re-defines the function already defined in an earlier cell
# and silently shadows it; keep a single definition.
def add_class_name_prefix(df, col_name):
    """Prefix every filename in ``df[col_name]`` with its class folder.

    The class name is everything that precedes the first digit of the
    filename, so ``"paper104.jpg"`` becomes ``"paper/paper104.jpg"``.

    Args:
        df: DataFrame holding the filenames; the column is overwritten in place.
        col_name: Name of the column that contains the bare filenames.

    Returns:
        The same ``df`` object that was passed in.

    Raises:
        ValueError: If a filename contains no digit, so no class name can be
            derived from it.
    """
    def _with_prefix(filename):
        # Raw string: "\d" in a normal string literal is an invalid escape sequence.
        first_digit = re.search(r"\d", filename)
        if first_digit is None:
            raise ValueError(
                "cannot derive a class name from {!r}: no digit in filename".format(filename)
            )
        return filename[:first_digit.start()] + '/' + filename

    df[col_name] = df[col_name].apply(_with_prefix)
    return df

# list containing all the filenames in the dataset
filenames_list = []
# list to store the corresponding category, note that each folder of the dataset has one class of data
categories_list = []

for category, folder in categories.items():
    # sorted(): os.listdir returns directory entries in arbitrary order
    filenames = sorted(os.listdir(data_path + folder))
    # extend() appends in place instead of rebuilding both lists on every pass
    filenames_list.extend(filenames)
    categories_list.extend([category] * len(filenames))

df = pd.DataFrame({
    'filename': filenames_list,
    'category': categories_list
})

# Turn bare filenames into paths relative to data_path ("<class>/<file>").
# The class is derived from the text before the first digit of each filename.
df = add_class_name_prefix(df, 'filename')

# Shuffle the dataframe. The fixed seed keeps the row order -- and therefore the
# seeded train/validation/test split made from it later -- identical on every run.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

print('number of elements = ' , len(df))

number of elements =  1600


In [None]:
# Preview the first rows: relative file path and numeric class id per image.
df.head()

In [None]:
# Summary of the file index (columns, dtypes, non-null counts).
df.info()

In [8]:
# see sample image, you can run the same cell again to get a different image
random_row = random.randint(0, len(df)-1)
sample = df.iloc[random_row]
randomimage = tf.keras.utils.load_img(data_path +sample['filename'])
# A bare last expression makes the notebook render the loaded image;
# previously the image was loaded but never shown.
randomimage

In [None]:
# Work on a copy so that df itself is not modified by this plot.
df_visualization = df.copy()
# Change the categories from numbers to names. replace() leaves values that are
# not keys of `categories` untouched, so the cell also works when df['category']
# already holds names (a lambda lookup would raise KeyError in that case).
df_visualization['category'] = df_visualization['category'].replace(categories)

# value_counts() returns a Series; the x=/y= arguments of plot.bar only apply
# to DataFrames, so they are omitted here.
df_visualization['category'].value_counts().plot.bar()

plt.xlabel("Garbage Classes", labelpad=14)
plt.ylabel("Images Count", labelpad=14)
plt.title("Count of images per class", y=1.02);

In [None]:
# Change the categories from numbers to names
df["category"] = df["category"].replace(categories)

# Hold out 20% of the rows, then divide that hold-out 70/30 into validation
# and test sets (roughly 80% / 14% / 6% of the data overall).
train_df, validate_df = train_test_split(df, test_size=0.2, random_state=42)
validate_df, test_df = train_test_split(validate_df, test_size=0.3, random_state=42)

# Give each subset a clean 0..n-1 index.
train_df = train_df.reset_index(drop=True)
validate_df = validate_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

total_train = train_df.shape[0]
total_validate = validate_df.shape[0]

# Report the real training-set size (total_validate used to be printed twice).
print('train size = ', total_train , 'validate size = ', total_validate, 'test size = ', test_df.shape[0])

In [11]:
# Square input resolution shared by every data generator and the network input.
IMAGE_WIDTH = IMAGE_HEIGHT = 224
# Colour channels per image.
IMAGE_CHANNELS = 3
# (width, height) pair handed to the generators as target_size.
IMAGE_SIZE = (IMAGE_WIDTH, IMAGE_HEIGHT)

In [None]:

batch_size = 64

# On-the-fly augmentation for the training images: random rotations, shears,
# zooms, flips and shifts. No pixel rescaling is configured here.
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    vertical_flip=True,
    rotation_range=30,
    shear_range=0.1,
    zoom_range=0.3,
    width_shift_range=0.2,
    height_shift_range=0.2,
)

# Stream training batches from disk using the relative paths and labels in train_df.
train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    directory=data_path,
    x_col='filename',
    y_col='category',
    class_mode='categorical',
    target_size=IMAGE_SIZE,
    batch_size=batch_size,
)

In [None]:
# Test images are fed as raw 0-255 pixels, exactly like the training and
# validation batches: the model itself starts with a Rescaling(1./255) layer,
# so rescaling here as well would scale the test inputs twice.
test_datagen = ImageDataGenerator()
test_generator = test_datagen.flow_from_dataframe(
    test_df, 
    data_path, 
    x_col='filename',
    y_col='category',
    target_size=IMAGE_SIZE,
    class_mode='categorical',
    batch_size=batch_size,
    shuffle=False  # keep file order so predictions can be matched to labels
)

In [None]:
# Validation images get no augmentation and no rescaling.
validation_datagen = ImageDataGenerator()

# Stream validation batches from disk using the paths and labels in validate_df.
validation_generator = validation_datagen.flow_from_dataframe(
    dataframe=validate_df,
    directory=data_path,
    x_col='filename',
    y_col='category',
    class_mode='categorical',
    target_size=IMAGE_SIZE,
    batch_size=batch_size,
)

In [None]:
# Class names in sorted order. unique() alone returns them in order of first
# appearance in the shuffled frame, whereas the Keras dataframe generators number
# the classes in sorted order; sorting keeps class_names[i] aligned with the
# label index i when the names are later used as plot / report labels.
class_names = sorted(train_df['category'].unique())
print(class_names)

In [None]:
# 3x3 gallery of randomly picked images, each titled with its category.
plt.figure(figsize=(15,15))
for position in range(1, 10):
    random_row = random.randint(0, len(df)-1)
    sample = df.iloc[random_row]
    random_image = tf.keras.utils.load_img(data_path + sample['filename'])
    # subplot positions are 1-based within the 3-row, 3-column grid
    plt.subplot(3, 3, position)
    plt.title(sample['category'])
    plt.imshow(random_image)
plt.show()

In [None]:
# Bar chart with the number of images in each category.
fig, ax = plt.subplots(figsize=(10, 5))
sns.countplot(x="category", data=df, palette='Blues', ax=ax)
# Vertical tick labels so the category names do not overlap.
ax.tick_params(axis='x', labelrotation=90)
ax.set_title('Categories')
plt.show()

In [None]:
# Create the base model from the pre-trained VGG16 network: ImageNet weights,
# without the fully connected classification head (include_top=False).
# NOTE(review): base_model is not referenced by the `model` built in the later
# cells of this notebook, only by the commented-out fine-tuning cells.
IMG_SHAPE = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)
base_model = tf.keras.applications.VGG16(input_shape = IMG_SHAPE,
                                         include_top = False,
                                         weights = 'imagenet')
#base_model.trainable = False
base_model.summary()

In [19]:
def print_layer_trainable(model=None):
    """Print the trainable flag and the name of every layer, one layer per line.

    Args:
        model: Object exposing a ``layers`` iterable whose items have
            ``trainable`` and ``name`` attributes. When omitted, the notebook's
            global ``base_model`` is inspected, as before.
    """
    if model is None:
        model = base_model
    for layer in model.layers:
        print("{0}:\t{1}".format(layer.trainable, layer.name))

In [None]:
# List each base_model layer's trainable flag before the freezing cell below runs.
print_layer_trainable()

In [21]:
# Freeze the whole base model. Setting `trainable` on a Keras model propagates
# to all of its layers, so no per-layer loop is needed.
base_model.trainable = False

In [None]:
# Print the flags again to check the effect of the freezing cell above.
print_layer_trainable()

In [23]:
# Data augmentation layers: horizontal flip, rotation and zoom. The first layer
# carries the input shape.
augmentation_steps = [
    layers.RandomFlip(mode='horizontal', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3)),
    layers.RandomRotation(factor=0.2, fill_mode='nearest'),
    layers.RandomZoom(height_factor=0.1),
]
data_augmentation = keras.Sequential(augmentation_steps)

In [24]:
# One softmax output unit per class.
n_classes = len(class_names)

# Three convolution + max-pooling stages with 32, 64 and 128 filters.
conv_stages = []
for n_filters in (32, 64, 128):
    conv_stages.append(layers.Conv2D(n_filters, 3, activation='relu'))
    conv_stages.append(layers.MaxPooling2D())

# Augmentation -> pixel rescaling to [0, 1] -> conv stages -> dense classifier head.
model = tf.keras.Sequential([
    data_augmentation,
    layers.Rescaling(1./255),
    *conv_stages,
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(n_classes, activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the CNN defined above.
model.summary()

In [26]:
# Adam optimizer at a 0.001 learning rate; categorical cross-entropy matches the
# one-hot labels produced by the generators (class_mode='categorical').
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy'],
)

In [27]:
# Model checkpoint: write the model to disk only when the monitored validation
# loss improves.
# NOTE(review): the file name mentions vgg16, but the model being trained is the
# small CNN defined above -- confirm the intended name.
tl_checkpoint_1 = ModelCheckpoint(
    filepath='vgg16_best_weights.keras',
    monitor='val_loss',
    save_best_only=True,
    verbose=0,
)

# Early stopping: give up after 5 epochs without a better validation loss and
# roll back to the best weights seen.
early_stop = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=5,
    restore_best_weights=True,
)

# ReduceLROnPlateau: halve the learning rate after 3 stagnant epochs, never
# going below 1e-6.
rop_callback = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1,
)

In [None]:
%%time
history = model.fit(train_generator,
                    epochs = 50,
                    validation_data = validation_generator,
                    callbacks = [tl_checkpoint_1, early_stop, rop_callback])

In [None]:
# Take the number of epochs actually run from the training history.
metrics = history.history
epochs_range = range(len(metrics['accuracy']))

# Two panels side by side: accuracy on the left, loss on the right.
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(20, 8))

# Training and validation accuracy
acc_ax.plot(epochs_range, metrics['accuracy'], label='Training Accuracy')
acc_ax.plot(epochs_range, metrics['val_accuracy'], label='Validation Accuracy')
acc_ax.legend(loc='lower right')
acc_ax.set_title('Training and Validation Accuracy')

# Training and validation loss
loss_ax.plot(epochs_range, metrics['loss'], label='Training Loss')
loss_ax.plot(epochs_range, metrics['val_loss'], label='Validation Loss')
loss_ax.legend(loc='upper right')
loss_ax.set_title('Training and Validation Loss')

plt.show()

In [None]:
# Plain generator for evaluation: no augmentation and no rescaling.
test_datagen = ImageDataGenerator()

# One image per batch, in the fixed order of test_df (shuffle disabled).
test_generator = test_datagen.flow_from_dataframe(
    test_df,
    data_path,
    x_col='filename',
    y_col='category',
    class_mode='categorical',
    color_mode='rgb',
    target_size=IMAGE_SIZE,
    shuffle=False,
    batch_size=1,
)

In [None]:
# Evaluate on every test image. No `steps` argument is passed: floor-dividing
# the sample count by the batch size would silently skip a final partial batch
# whenever the batch size does not divide the number of samples.
test_loss, accuracy = model.evaluate(test_generator)

print('Accuracy on test set = ', round((accuracy * 100), 2), '%')

In [None]:
# Invert the generator's {class name: index} mapping into {index: class name}.
gen_label_map = {index: label for label, index in test_generator.class_indices.items()}
print(gen_label_map)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

# Class names ordered by the index the test generator assigned to them, so tick
# labels and report rows line up with the integer labels in y_true and
# y_pred_classes. The order of `class_names` is not used for this.
# NOTE(review): assumes the test generator's indices match the ones used in
# training, i.e. every class occurs in test_df -- confirm.
label_names = sorted(test_generator.class_indices, key=test_generator.class_indices.get)
label_ids = list(range(len(label_names)))

y_pred = model.predict(test_generator)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = test_generator.classes

# Passing labels= keeps the matrix and report shapes fixed even if some class
# never shows up in y_true or in the predictions.
cm = confusion_matrix(y_true, y_pred_classes, labels=label_ids)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_names, yticklabels=label_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

print(classification_report(y_true, y_pred_classes, labels=label_ids, target_names=label_names))


In [None]:
# Save the trained model to disk.
# NOTE(review): the checkpoint callback writes a '.keras' file while this uses
# the '.h5' extension -- confirm which file downstream code expects.
model.save('garbage_model.h5')

In [35]:
# fine_tune  = base_model
# fine_tune.trainable = True

In [36]:
# for layer in fine_tune.layers:
#     # Boolean whether this layer is trainable.
#     trainable = ('block5' in layer.name or 'block4' in layer.name)
    
#     # Set the layer's bool.
#     layer.trainable = trainable

In [37]:
# fine_tune.summary()


In [38]:
# n_classes = len(class_names)

# model2 = Sequential([
#     data_augmentation,
#     keras.layers.Rescaling(1./255),
#     fine_tune,
#     keras.layers.GlobalAveragePooling2D(),
#     keras.layers.Dense(128, activation = 'relu'),
#     keras.layers.Dropout(0.5),
#     keras.layers.Dense(n_classes, activation = 'softmax')
# ])

In [39]:
# model2.summary()

In [40]:
# model2.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 0.001), 
#               loss = 'categorical_crossentropy', 
#               metrics = ['accuracy'])

In [41]:
# # Model Checkpoint
# tl_checkpoint_1 = ModelCheckpoint(filepath = 'vgg16_best_weights_fine_tuning.hdf5', save_best_only = True, verbose = 0)

# # EarlyStopping
# early_stop = EarlyStopping(monitor = 'val_loss', patience = 10, restore_best_weights = True, mode = 'min')

# #ReduceLROnPlateau to stabilize the training process of the model
# rop_callback = ReduceLROnPlateau(monitor = 'val_loss', patience = 3, verbose = 1, factor = 0.5, min_lr = 0.000001)

In [42]:
# %%time
# history = model2.fit(train_generator,
#                     epochs = 20,
#                     validation_data = validation_generator,
#                     callbacks = [tl_checkpoint_1, early_stop, rop_callback])

In [43]:
# import tensorflow as tf
# new_model = tf.keras.models.load_model('model_garbage.keras')

# # Show the model architecture
# new_model.summary()