# CHECK GPU

In [None]:
import tensorflow as tf

# Avoid OOM errors by letting TensorFlow grow GPU memory on demand instead of
# reserving all of it up front.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Memory growth can only be set before the GPU has been initialised;
        # re-running this cell in a live kernel would otherwise raise here.
        print(f"Could not enable memory growth for {gpu.name}: {err}")

print("Num GPUs Available: ", len(gpus))

from tensorflow.python.client import device_lib

def get_available_devices():
    """Return the `name` of each entry reported by `device_lib.list_local_devices()`."""
    device_names = []
    for device in device_lib.list_local_devices():
        device_names.append(device.name)
    return device_names


print(get_available_devices())


# CLEANING IMAGES

<div class="alert alert-block alert-info">
<b>Tip:</b> You can skip this section unless you have added new data.
</div>

Remove dodgy images and convert images that have a transparent background.

**Note:** Unwanted files are moved to the `data/unwanted` directory in case you need them later.

In [None]:
import os
import cv2
import imghdr
import numpy as np
from PIL import Image
import hashlib

data_dir = os.path.join('data', 'raw_data')
unwanted_dir = os.path.join('data', 'unwanted')

# File types (as reported by imghdr) that are allowed to stay in the dataset
image_exts = ['jpeg', 'jpg', 'bmp', 'png']

# Maps the content hash of every converted image to the path it was saved at,
# so that later converted images with identical pixels can be dropped.
processed_images = {}

def calculate_image_hash(img):
    """Return the MD5 hex digest of the image's raw bytes (`img.tobytes()`)."""
    return hashlib.md5(img.tobytes()).hexdigest()

# Create the 'unwanted' directory if it doesn't exist
os.makedirs(unwanted_dir, exist_ok=True)

for image_class in os.listdir(data_dir):
    class_dir = os.path.join(data_dir, image_class)
    if not os.path.isdir(class_dir):
        # Stray files next to the class folders are not classes; skip them
        continue

    for image in os.listdir(class_dir):
        image_path = os.path.join(class_dir, image)
        try:
            tip = imghdr.what(image_path)
            if tip not in image_exts:
                print('Image not in ext list {}'.format(image_path))
                # Move the unwanted file to the 'unwanted' directory. The class
                # name is prefixed so that same-named files from different
                # classes do not overwrite each other.
                unwanted_name = '{}_{}'.format(image_class, image)
                os.rename(image_path, os.path.join(unwanted_dir, unwanted_name))
                continue

            # Only palette images carrying transparency need converting. The
            # Pillow handle is closed before the file is rewritten or removed.
            with Image.open(image_path) as pil_img:
                needs_conversion = 'P' in pil_img.mode and 'transparency' in pil_img.info
            if not needs_conversion:
                continue

            img = cv2.imread(image_path)
            if img is None:
                # cv2.imread signals failure by returning None, not by raising
                print('Issue with image {}: OpenCV could not decode it'.format(image_path))
                continue

            # cv2.imwrite expects BGR(A) channel order, so stay in BGR and only
            # append the alpha channel; going through RGB would swap red/blue.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)

            # Calculate the hash of the modified image
            img_hash = calculate_image_hash(img)

            if img_hash not in processed_images:
                # Save the modified image in the same directory
                if not cv2.imwrite(image_path, img):
                    print('Issue with image {}: could not write converted file'.format(image_path))
                    continue

                # Add the image hash to the dictionary of processed images
                processed_images[img_hash] = image_path
            else:
                # If the same image (based on content) exists, remove it
                os.remove(image_path)
                print('Duplicate image removed:', image_path)

        except Exception as e:
            # Best-effort cleaning: report the failure (with its cause) and move on
            print('Issue with image {}: {}'.format(image_path, e))


# CHECK DATA

In [None]:
import os
import shutil
import random

data_dir = "data/raw_data/"

# Every sub-directory of data_dir is one class. Stray files are ignored and the
# list is sorted so the class order does not depend on the file system.
source_dirs = sorted(
    entry for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)
print('classes:', source_dirs)

# Destination directories for the training / validation split
train_dest = os.path.join('data', 'train')
valid_dest = os.path.join('data', 'valid')

<div class="alert alert-block alert-info">
<b>Tip:</b> You can also skip the part below if the training and validation directories already exist, unless you want to delete them and create new ones.
</div>

Split the data in `data/raw_data` into training and validation sets.

In [None]:
# Create training and validation data
train_dir_exists = os.path.exists(train_dest)
valid_dir_exists = os.path.exists(valid_dest)

# Whether the split below should run. A flag is used instead of exit():
# inside a notebook exit() asks the kernel to shut down rather than stopping
# the cell, so the copy loop would still run after "Skip".
run_split = True

if train_dir_exists or valid_dir_exists:
    # Ask the user for their choice
    choice = input("Destination directories already exist. Do you want to:\n"
                   "1. Skip the process (S)\n"
                   "2. Delete existing directories and create new ones (D)\n"
                   "Enter 'S' or 'D': ").strip().lower()

    if choice == 'd':
        print("Deleting existing directories and creating new ones.")

        if train_dir_exists:
            shutil.rmtree(train_dest)
        if valid_dir_exists:
            shutil.rmtree(valid_dest)
    elif choice == 's':
        print("Skipping the process.")
        run_split = False
    else:
        print("Invalid choice. Exiting.")
        run_split = False

# Set the training-validation split ratio
split_ratio = 0.8
# Fixed seed so that re-running the notebook reproduces the same split
split_seed = 42

# Initialize lists to track images in training and validation sets
training_images = []
validation_images = []

if run_split:
    # Create destination directories
    os.makedirs(train_dest, exist_ok=True)
    os.makedirs(valid_dest, exist_ok=True)

    rng = random.Random(split_seed)

    # Loop through source directories
    for source_dir in source_dirs:
        source_dir_path = os.path.join(data_dir, source_dir)
        if not os.path.isdir(source_dir_path):
            # Stray files in the raw-data root are not classes
            continue

        train_class_dest = os.path.join(train_dest, source_dir)
        valid_class_dest = os.path.join(valid_dest, source_dir)

        os.makedirs(train_class_dest, exist_ok=True)
        os.makedirs(valid_class_dest, exist_ok=True)

        # Sort first: os.listdir order is arbitrary, and the seeded shuffle is
        # only reproducible when it starts from a fixed order.
        files = sorted(
            name for name in os.listdir(source_dir_path)
            if os.path.isfile(os.path.join(source_dir_path, name))
        )
        rng.shuffle(files)

        split_point = int(len(files) * split_ratio)

        # Copy files to training and validation destinations and track them
        for file in files[:split_point]:
            image_path = os.path.join(train_class_dest, file)
            training_images.append(image_path)
            shutil.copy(os.path.join(source_dir_path, file), image_path)
        for file in files[split_point:]:
            image_path = os.path.join(valid_class_dest, file)
            validation_images.append(image_path)
            shutil.copy(os.path.join(source_dir_path, file), image_path)

    # Check for duplicates between training and validation sets. Paths are
    # compared relative to their split root (class/filename); the absolute
    # paths always differ and could never overlap.
    train_relative = {os.path.relpath(path, train_dest) for path in training_images}
    valid_relative = {os.path.relpath(path, valid_dest) for path in validation_images}
    duplicates = train_relative & valid_relative

    if duplicates:
        print(f"Warning: Duplicates found between training and validation sets: {duplicates}")
    else:
        print("No duplicates found between training and validation sets.")


# INSPECT THE IMAGE

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Directory to inspect: train_dest by default, switch to valid_dest if needed
inspection_dir = train_dest

# Each entry of the inspection directory is treated as one class
class_dirs = os.listdir(inspection_dir)

# Grid layout: one row per class, up to num_cols images per row
num_rows = len(class_dirs)
num_cols = 5

fig = plt.figure(figsize=(12, 12))

for class_index, class_dir in enumerate(class_dirs):
    class_path = os.path.join(inspection_dir, class_dir)
    class_files = os.listdir(class_path)
    num_images_to_inspect = min(num_cols, len(class_files))

    # Subplot indices are 1-based; this is the first cell of the class's row
    row_start = class_index * num_cols + 1

    for column, file_name in enumerate(class_files[:num_images_to_inspect]):
        img_path = os.path.join(class_path, file_name)
        img = mpimg.imread(img_path)

        ax = fig.add_subplot(num_rows, num_cols, row_start + column)
        ax.imshow(img)
        ax.set_title(class_dir)
        ax.axis('off')

# Adjust layout and display the collage
plt.tight_layout()
plt.show()


# PREPROCESS


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os

# Options shared by the training and validation directory iterators
flow_options = dict(
    target_size=(224, 224),
    class_mode='categorical',
    batch_size=32,
)

# Training images are rescaled to [0, 1] and randomly augmented
augmentation_options = dict(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)
training_datagen = ImageDataGenerator(rescale=1./255., **augmentation_options)

# Validation images are only rescaled, never augmented
validation_datagen = ImageDataGenerator(rescale=1./255)

train_generator = training_datagen.flow_from_directory(train_dest, **flow_options)

validation_generator = validation_datagen.flow_from_directory(valid_dest, **flow_options)


# MODELING

In [None]:
import tensorflow_hub as hub

# Headless MobileNetV3 from TF Hub. The "feature_vector" variant outputs image
# features; the "classification" variant would output ImageNet class logits,
# which is not what a feature extractor for transfer learning should provide.
mobilenet_v3 = "https://tfhub.dev/google/imagenet/mobilenet_v3_large_100_224/feature_vector/5"

feature_extractor_model = mobilenet_v3

# Frozen backbone: only the dense head added below is trained
feature_extractor_layer = hub.KerasLayer(
    feature_extractor_model,
    input_shape=(224, 224, 3),
    trainable=False)

# Size the output layer from the data instead of hard-coding the class count
num_classes = train_generator.num_classes

model = tf.keras.Sequential([
  feature_extractor_layer,
  tf.keras.layers.Dense(512, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(num_classes, activation='softmax')
])

model.summary()

In [None]:
# Compile settings; the 'accuracy' metric name is what later shows up as the
# 'accuracy' / 'val_accuracy' keys of the training history.
compile_options = {
    'optimizer': 'adam',
    'loss': 'categorical_crossentropy',
    'metrics': ['accuracy'],
}
model.compile(**compile_options)

# CALLBACK

In [None]:
%load_ext tensorboard
import datetime

# One timestamped sub-directory per run so TensorBoard can compare runs
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Stop on the validation metric: training accuracy keeps rising while a model
# overfits, so monitoring it would not stop training at the right time.
# restore_best_weights rolls the model back to its best validation epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=3,
    restore_best_weights=True,
)


<div class="alert alert-block alert-warning">
<b>Warning:</b> Don't run the cell below if you want to keep the logs from previous runs.
</div>

In [None]:
import shutil

# Clear any logs from previous runs. On a fresh checkout there is nothing to
# delete yet, which must not stop a Restart & Run All.
try:
    shutil.rmtree('./logs/')
except FileNotFoundError:
    print("No previous logs to clear.")

# TRAINING


In [None]:
NUM_EPOCHS = 10

# Callbacks applied during training: TensorBoard logging and early stopping
training_callbacks = [tensorboard_callback, early_stopping]

history = model.fit(
    train_generator,
    epochs=NUM_EPOCHS,
    validation_data=validation_generator,
    callbacks=training_callbacks,
    verbose=1,
)

# ACCURACY

<div class="alert alert-block alert-warning">
<b>Warning:</b> Running the cell below will interrupt the model training process.
</div>

In [None]:
# Launch TensorBoard inline, pointed at logs/fit (the parent of the timestamped per-run log directories)
%tensorboard --logdir logs/fit

In [None]:
import matplotlib.pyplot as plt

# Extract training and validation accuracy values from the 'history' object
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

print('acc :',acc)
print('val_acc :',val_acc)

# Extract training and validation loss values from the 'history' object
loss = history.history['loss']
val_loss = history.history['val_loss']

print('loss :',loss)
print('val_loss :',val_loss)

# One figure, two stacked panels: accuracy on top, loss below
fig, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

acc_ax.plot(acc, label='Training Accuracy')
acc_ax.plot(val_acc, label='Validation Accuracy')
acc_ax.legend(loc='lower right')
acc_ax.set_ylabel('Accuracy')
# Keep the automatic lower bound but pin the top of the axis to 1
acc_ax.set_ylim([min(acc_ax.get_ylim()), 1])
acc_ax.set_title('Training and Validation Accuracy')

loss_ax.plot(loss, label='Training Loss')
loss_ax.plot(val_loss, label='Validation Loss')
loss_ax.legend(loc='upper right')
loss_ax.set_ylabel('Cross Entropy')
# Cross-entropy is not bounded by 1 (early epochs are often above it), so the
# axis grows to fit the largest recorded loss instead of clipping the curves.
loss_ax.set_ylim([0, max([1.0, *loss, *val_loss])])
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('epoch')

plt.show()  # Display the plot


# TEST

In [None]:
import os
from tensorflow.keras.preprocessing.image import img_to_array, load_img
import numpy as np
import matplotlib.pyplot as plt

# Display names for the model outputs, by output index.
# NOTE(review): this order must match the class order used for training
# (train_generator.class_indices) - confirm whenever classes change.
class_labels = ["Cardboard", "Glass", "Metal", "Organic", "Paper","Plastic"]

# Path to the directory containing your test images
test_dir = os.path.join('data','test')

# Extensions are compared case-insensitively so files such as IMG_01.JPG are
# not silently skipped; sorting keeps the output order stable between runs.
test_image_suffixes = ('.jpg', '.png', '.jpeg')
image_paths = [
    os.path.join(test_dir, file)
    for file in sorted(os.listdir(test_dir))
    if file.lower().endswith(test_image_suffixes)
]

if not image_paths:
    print(f'No test images found in {test_dir}')

for image_path in image_paths:
    # Load and preprocess the image the same way as the validation data
    img = load_img(image_path, target_size=(224, 224))  # Load and resize the image
    x = img_to_array(img)  # Convert the image to a NumPy array
    x = x / 255.0  # Normalize the pixel values to the range [0, 1]

    # Predict using the model (the model expects a batch, hence expand_dims)
    yhat = model.predict(np.expand_dims(x, axis=0))
    predicted_class = np.argmax(yhat, axis=1)[0]
    predicted_label = class_labels[predicted_class]

    print(f'Predicted class for {image_path} is {predicted_label}')
    print(f'Class index: {predicted_class}')

    plt.imshow(x)  # Display the resized image
    plt.title(f'Predicted class: {predicted_label}')
    plt.show()


# GENERATE SAVED MODEL

In [None]:
# Export the trained model with tf.saved_model.save
# (the trailing "1" in the path is presumably a version number - confirm with the consumer of the export)
export_dir = 'saved_model/1'
tf.saved_model.save(obj=model, export_dir=export_dir)