In [None]:
import os
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, TensorBoard
from datetime import datetime
from google.colab import drive
import random

In [None]:
drive.mount('/content/drive')

In [None]:
rar_path = '/content/drive/My Drive/TrashNet/TrashNet.rar'
extract_path = '/content/dataset'

os.makedirs(extract_path, exist_ok=True)

!unrar x -o+ "{rar_path}" "{extract_path}"

In [None]:
base_dir = 'dataset/TrashNet'
classes = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']

train_dir = os.path.join(base_dir, 'train')
validation_dir = os.path.join(base_dir, 'validation')

os.makedirs(train_dir, exist_ok=True)
os.makedirs(validation_dir, exist_ok=True)

In [None]:
def is_valid_image(file_path):
    try:
        with Image.open(file_path) as img:
            img.verify()
        return True
    except (IOError, SyntaxError) as e:
        return False

In [None]:
def convert_image(file_path):
    try:
        with Image.open(file_path) as img:
            if img.mode in ("P", "RGBA"):
                img = img.convert("RGBA")
            else:
                img = img.convert("RGB")
            img.save(file_path)
    except (IOError, SyntaxError) as e:
        os.remove(file_path)

In [None]:
for cls in classes:
    class_dir = os.path.join(base_dir, cls)
    for filename in os.listdir(class_dir):
        file_path = os.path.join(class_dir, filename)
        if filename.lower().endswith(('.jpg', '.JPG')):
            if is_valid_image(file_path):
                convert_image(file_path)
            else:
                os.remove(file_path)
        else:
            os.remove(file_path)

In [None]:
split_ratio = 0.8

for cls in classes:
    class_dir = os.path.join(base_dir, cls)
    images = os.listdir(class_dir)
    random.shuffle(images)

    split_point = int(split_ratio * len(images))
    train_images = images[:split_point]
    validation_images = images[split_point:]

    for img in train_images:
        source = os.path.join(class_dir, img)
        destination = os.path.join(train_dir, cls, img)
        os.makedirs(os.path.join(train_dir, cls), exist_ok=True)
        shutil.move(source, destination)

    for img in validation_images:
        source = os.path.join(class_dir, img)
        destination = os.path.join(validation_dir, cls, img)
        os.makedirs(os.path.join(validation_dir, cls), exist_ok=True)
        if img not in train_images:  # Sprawdź, czy obraz nie został już użyty w zbiorze treningowym
            shutil.move(source, destination)

In [None]:
base_path = '/content/dataset/TrashNet'
for cls in classes:
    folder_path = os.path.join(base_path, cls)
    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)

In [None]:
# Data generators
train_datagen = ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)

test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical'
)

validation_generator = test_datagen.flow_from_directory(
    validation_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical'
)

num_classes = len(train_generator.class_indices)

In [None]:
# Define TensorBoard callback
def get_tensorboard_callback(model_name):
    log_dir = f"logs/{model_name}_" + datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
    return tensorboard_callback

# CNN MODEL


In [None]:
# Simple CNN Model
def simple_cnn_model(input_shape=(224, 224, 3), classes=num_classes):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(classes, activation='softmax')
    ])
    return model

In [None]:
model_cnn = simple_cnn_model()
model_cnn.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
model_cnn.summary()

In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ModelCheckpoint('simple_cnn_best_model.h5', monitor='val_loss', save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    get_tensorboard_callback('simple_cnn')
]

In [None]:
history_cnn = model_cnn.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=25,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=callbacks,
    shuffle=True
)

model_cnn.save('simple_cnn_final_model.h5')  # Save final model

# VGG MODEL

In [None]:
# VGG Model
def VGG19(input_shape=(224, 224, 3), classes=num_classes):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=input_shape))
    model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.BatchNormalization())

    # Block 2
    model.add(tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.BatchNormalization())

    # Block 3
    model.add(tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(256, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.BatchNormalization())

    # Block 4
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.BatchNormalization())

    # Block 5
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Conv2D(512, (3, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(tf.keras.layers.BatchNormalization())

    # Flattening
    model.add(tf.keras.layers.Flatten())

    # Fully connected layers
    model.add(tf.keras.layers.Dense(4096, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(4096, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(classes, activation='softmax'))

    return model

In [None]:
model_vgg = VGG19()
model_vgg.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
model_vgg.summary()

In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ModelCheckpoint('vgg19_best_model.h5', monitor='val_loss', save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    get_tensorboard_callback('vgg19')
]

In [None]:
history_vgg = model_vgg.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=25,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=callbacks,
    shuffle=True
)

model_vgg.save('vgg19_final_model.h5')  # Save final model

# CUSTOM RESNET50 MODEL

In [None]:
# Custom ResNet Model
def identity_block(X, f, filters):
    F1, F2, F3 = filters
    X_shortcut = X

    X = Conv2D(F1, (1, 1), padding='valid')(X)
    X = BatchNormalization(axis=3)(X)
    X = Activation('relu')(X)

    X = Conv2D(F2, (f, f), padding='same')(X)
    X = BatchNormalization(axis=3)(X)
    X = Activation('relu')(X)

    X = Conv2D(F3, (1, 1), padding='valid')(X)
    X = BatchNormalization(axis=3)(X)

    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)

    return X

In [None]:
def convolutional_block(X, f, filters, s=2):
    F1, F2, F3 = filters
    X_shortcut = X

    X = Conv2D(F1, (1, 1), strides=(s, s), padding='valid')(X)
    X = BatchNormalization(axis=3)(X)
    X = Activation('relu')(X)

    X = Conv2D(F2, (f, f), padding='same')(X)
    X = BatchNormalization(axis=3)(X)
    X = Activation('relu')(X)

    X = Conv2D(F3, (1, 1), padding='valid')(X)
    X = BatchNormalization(axis=3)(X)

    X_shortcut = Conv2D(F3, (1, 1), strides=(s, s), padding='valid')(X_shortcut)
    X_shortcut = BatchNormalization(axis=3)(X_shortcut)

    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)

    return X

In [None]:
def ResNet50(input_shape=(224, 224, 3), classes=num_classes):
    X_input = Input(input_shape)

    X = tf.keras.layers.ZeroPadding2D((3, 3))(X_input)

    X = Conv2D(64, (7, 7), strides=(2, 2), padding='valid')(X)
    X = BatchNormalization(axis=3)(X)
    X = Activation('relu')(X)
    X = tf.keras.layers.MaxPooling2D((3, 3), strides=(2, 2))(X)

    X = convolutional_block(X, f=3, filters=[64, 64, 256], s=1)
    X = identity_block(X, 3, [64, 64, 256])
    X = identity_block(X, 3, [64, 64, 256])

    X = convolutional_block(X, f=3, filters=[128, 128, 512], s=2)
    X = identity_block(X, 3, [128, 128, 512])
    X = identity_block(X, 3, [128, 128, 512])
    X = identity_block(X, 3, [128, 128, 512])

    X = convolutional_block(X, f=3, filters=[256, 256, 1024], s=2)
    X = identity_block(X, 3, [256, 256, 1024])
    X = identity_block(X, 3, [256, 256, 1024])
    X = identity_block(X, 3, [256, 256, 1024])
    X = identity_block(X, 3, [256, 256, 1024])
    X = identity_block(X, 3, [256, 256, 1024])

    X = convolutional_block(X, f=3, filters=[512, 512, 2048], s=2)
    X = identity_block(X, 3, [512, 512, 2048])
    X = identity_block(X, 3, [512, 512, 2048])

    X = GlobalAveragePooling2D()(X)

    X = Flatten()(X)
    X = Dense(1024, activation='relu')(X)
    X = Dropout(0.5)(X)
    X = Dense(classes, activation='softmax')(X)

    model = Model(inputs=X_input, outputs=X, name='ResNet50')

    return model

In [None]:
model_resnet_custom = ResNet50()
model_resnet_custom.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
model_resnet_custom.summary()

In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ModelCheckpoint('resnet50_custom_best_model.h5', monitor='val_loss', save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    get_tensorboard_callback('resnet50_custom')
]

In [None]:
history_resnet_custom = model_resnet_custom.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=25,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=callbacks,
    shuffle=True
)

model_resnet_custom.save('resnet50_custom_final_model.h5')  # Save final model

# BUILT-IN RESNET 50 MODEL

In [None]:
# Built-in ResNet50 Model
model_resnet50 = tf.keras.applications.ResNet50(
    include_top=True,
    weights=None,
    input_tensor=None,
    input_shape=(224, 224, 3),
    pooling=None,
    classes=num_classes,
    classifier_activation='softmax'
)

In [None]:
model_resnet50.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
model_resnet50.summary()


In [None]:
callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ModelCheckpoint('resnet50_builtin_best_model.h5', monitor='val_loss', save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    get_tensorboard_callback('resnet50_builtin')
]

In [None]:
history_resnet50 = model_resnet50.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=25,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=callbacks,
    shuffle=True
)

model_resnet50.save('resnet50_builtin_final_model.h5')  # Save final model