In [None]:
# Import Data Science Libraries
import numpy as np
import random
import pandas as pd
import tensorflow as tf
import os
import cv2

import PIL
from pathlib import Path
from PIL import UnidentifiedImageError

from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras import Model
from tensorflow.keras.layers.experimental import preprocessing
from pathlib import Path
import os.path
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

!pip install pillow
from PIL import Image
!pip install matplotlib Pillow

import matplotlib.image as mpimg

# Set the seed for reproducibility
def set_seed(seed=42):
    """Seed the Python, NumPy and TensorFlow global random number generators.

    Args:
        seed: Integer seed handed to all three generators (default 42).
    """
    # The three generators are independent, so the order of these calls
    # does not matter.
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)


set_seed()

# Constants
# Number of images per batch produced by the Keras iterators.
BATCH_SIZE = 32
# Spatial size passed as `target_size` when images are loaded from disk.
TARGET_SIZE = (224, 224)
# Root folder of the image dataset (scanned recursively; each image is
# labelled with the name of the folder that contains it). The original
# machine-specific path stays the default, but it can be overridden without
# editing the file by setting the ANIMALS_DATASET_DIR environment variable.
# NOTE: despite its name, `df` is a path string, not a DataFrame.
df = os.environ.get(
    "ANIMALS_DATASET_DIR",
    "/Users/ganeshkota/Desktop/Animals/raw-img/",
)

# Function to load and preprocess data
def load_and_preprocess_data(dataset_path, extensions=(".jpg", ".jpeg", ".png")):
    """Index every image under ``dataset_path`` and label it by its folder.

    The directory tree is walked once and a file is kept when its suffix,
    compared case-insensitively, is one of ``extensions``. The previous
    implementation globbed four exact spellings (``.JPG``, ``.jpg``,
    ``.jpeg``, ``.PNG``) and therefore skipped e.g. ``.png`` and ``.JPEG``.

    Args:
        dataset_path: Root directory; each image's immediate parent folder
            name becomes its label.
        extensions: Accepted file suffixes, including the leading dot.

    Returns:
        A DataFrame with string columns ``Filepath`` and ``Label``, one row
        per image, sorted by file path. It is empty (but has both columns)
        when nothing matches.
    """
    wanted = {ext.lower() for ext in extensions}
    image_dir = Path(dataset_path)
    # Directory traversal order depends on the filesystem. Sorting pins the
    # row order so that a seeded train/test split selects the same files on
    # every machine.
    found = sorted(
        str(path)
        for path in image_dir.rglob('*')
        if path.suffix.lower() in wanted and path.is_file()
    )
    filepaths = pd.Series(found, name='Filepath', dtype=object).astype(str)
    labels = pd.Series(
        [Path(path).parent.name for path in found], name='Label', dtype=object
    )
    image_df = pd.concat([filepaths, labels], axis=1)
    return image_df

# Build the file index once at module level: a DataFrame with one row per
# image and the columns 'Filepath' and 'Label'. `df` is the dataset root path.
df_preprocessed = load_and_preprocess_data(df)

# Check for corrupted images within the dataset
def check_corrupted_images(dataset_path, extensions=(".jpg", ".jpeg", ".png")):
    """Print and return the image files under ``dataset_path`` that fail to load.

    Each candidate is opened with Pillow inside a ``with`` block, so the file
    handle is always released, and ``verify()`` is called so that damage past
    the header is detected too (``Image.open`` alone is lazy).

    Args:
        dataset_path: Root directory to scan recursively.
        extensions: File suffixes to check, compared case-insensitively.
            The previous version only looked at ``*.jpg``.

    Returns:
        A list of ``Path`` objects for the files Pillow could not read, in
        sorted order. Earlier callers that ignored the (previously ``None``)
        return value are unaffected.
    """
    wanted = {ext.lower() for ext in extensions}
    corrupted = []
    for img_p in sorted(Path(dataset_path).rglob("*"), key=str):
        if img_p.suffix.lower() not in wanted or not img_p.is_file():
            continue
        try:
            with PIL.Image.open(img_p) as img:
                img.verify()
        # UnidentifiedImageError is an OSError subclass; it is listed for
        # clarity. Some decoders signal broken data with SyntaxError.
        except (PIL.UnidentifiedImageError, OSError, SyntaxError):
            print(img_p)
            corrupted.append(img_p)
    return corrupted

# Scan the dataset root and print the path of every image Pillow cannot open.
check_corrupted_images(df)

# Display label distribution
def display_label_distribution(image_df):
    """Show a bar chart with the number of rows per value of ``image_df['Label']``.

    Args:
        image_df: DataFrame that has a ``Label`` column.
    """
    counts_per_label = image_df['Label'].value_counts()
    figure, axis = plt.subplots(nrows=1, ncols=1, figsize=(20, 6))
    sns.barplot(
        x=counts_per_label.index,
        y=counts_per_label.values,
        alpha=0.8,
        palette='pastel',
        ax=axis,
    )

    # Axis-level title and labels.
    axis.set_title('Distribution of Labels in Image Dataset', fontsize=16)
    axis.set_xlabel('Label', fontsize=14)
    axis.set_ylabel('Count', fontsize=14)
    axis.set_xticklabels(counts_per_label.index, rotation=45)

    # Figure-level title; shrink the axes area so the two titles do not collide.
    figure.suptitle('Image Dataset Label Distribution', fontsize=20)
    figure.subplots_adjust(top=0.85)
    plt.show()


display_label_distribution(df_preprocessed)

# Display 10 random pictures of the dataset with their labels
def display_images(image_df):
    """Show a 2 x 5 grid of randomly chosen images titled with their labels.

    Rows are picked by position (``.iloc``), so the function also works on
    frames whose index is not 0..n-1, such as the shuffled train/test splits.
    Sampling is with replacement, so the same image can appear twice.

    Args:
        image_df: DataFrame with ``Filepath`` and ``Label`` columns.

    Raises:
        ValueError: If ``image_df`` has no rows.
    """
    if len(image_df) == 0:
        raise ValueError("display_images needs at least one image row")

    random_index = np.random.randint(0, len(image_df), 10)
    fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(10, 10), subplot_kw={'xticks': [], 'yticks': []})
    for ax, position in zip(axes.flat, random_index):
        ax.imshow(plt.imread(image_df['Filepath'].iloc[position]))
        ax.set_title(image_df['Label'].iloc[position])
    plt.tight_layout()
    plt.show()

# Preview a random sample of the indexed images together with their labels.
display_images(df_preprocessed)

# Separate data into train and test sets
def split_data(image_df, test_size=0.2):
    """Shuffle ``image_df`` and split it into a train part and a test part.

    Args:
        image_df: DataFrame to split.
        test_size: Share of the rows that goes to the test part (default 0.2).

    Returns:
        A ``(train_df, test_df)`` tuple. The split uses a fixed
        ``random_state`` of 42.
    """
    split_options = {"test_size": test_size, "shuffle": True, "random_state": 42}
    train_part, test_part = train_test_split(image_df, **split_options)
    return train_part, test_part

train_split, test_split = split_data(df_preprocessed)
# A bare `.shape` expression shows nothing unless it is the last line of a
# notebook cell, so print both sizes explicitly.
print(f"Train split shape: {train_split.shape}")
print(f"Test split shape: {test_split.shape}")

def preprocess_images(filepaths, labels):
    """Load, normalise and resize images, pairing each with its label.

    Files that cannot be read (missing, unreadable or corrupt) are reported
    and skipped, so the result can be shorter than the input. Every returned
    image is converted to 3-channel ``uint8`` before resizing, so the results
    share one layout.

    Args:
        filepaths: Iterable of image file paths.
        labels: Iterable of labels, aligned with ``filepaths``.

    Returns:
        A list of ``(resized_image, label)`` tuples, in input order.
    """
    processed_images = []
    for filepath, label in zip(filepaths, labels):
        try:
            image = plt.imread(filepath)
        # OSError covers missing files and Pillow's unidentified-image error;
        # some decoders report broken data as SyntaxError.
        except (OSError, SyntaxError) as e:
            print(f"Error reading image {filepath}: {e}")
            continue

        image = np.asarray(image)
        # matplotlib documents that PNG files come back as floats in 0-1
        # while other formats come back as integers; bring floats to 0-255.
        if np.issubdtype(image.dtype, np.floating):
            image = (image * 255).round().astype(np.uint8)
        if image.ndim == 2:
            # Grayscale: repeat the single channel three times.
            image = np.stack([image] * 3, axis=-1)
        elif image.shape[-1] == 4:
            # RGBA: drop the alpha channel.
            image = np.ascontiguousarray(image[..., :3])

        processed_image = resize_image(image)
        processed_images.append((processed_image, label))
    return processed_images

def resize_image(image, target_size=(224, 224)):
    """Resize ``image`` with OpenCV.

    Args:
        image: Image array accepted by ``cv2.resize``.
        target_size: Output size passed as ``dsize``. OpenCV reads it as
            ``(width, height)``. Defaults to the previously hard-coded
            ``(224, 224)``.

    Returns:
        The resized image array.
    """
    resized_image = cv2.resize(image, tuple(target_size))
    return resized_image

def preprocess_data(train_df, test_df):
    """Run ``preprocess_images`` on the train frame and then on the test frame.

    Args:
        train_df: DataFrame with ``Filepath`` and ``Label`` columns.
        test_df: DataFrame with ``Filepath`` and ``Label`` columns.

    Returns:
        ``(train_split_images, test_split_images)``, each being whatever
        ``preprocess_images`` returns for the corresponding frame.
    """
    # The generator is consumed left to right, so the train frame is
    # processed before the test frame.
    train_split_images, test_split_images = (
        preprocess_images(frame['Filepath'].values, frame['Label'].values)
        for frame in (train_df, test_df)
    )
    return train_split_images, test_split_images

# Bare expressions are only rendered as the last line of a notebook cell;
# print them so the information shows up in any context.
print(train_split.columns)
print(train_split.head())
# Load and resize every image exactly once. The earlier version called
# preprocess_data twice and threw the first result away.
train_split_images, test_split_images = preprocess_data(train_split, test_split)
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def augment_data(train_images, rescale=None):
    """Build a shuffled, augmenting Keras batch iterator from (sample, label) pairs.

    Two kinds of sample are supported:

    * File paths (``str`` / ``os.PathLike``): images are read from disk with
      ``flow_from_dataframe``, exactly as before.
    * In-memory pixel arrays: images are batched with ``flow``. Previously
      the arrays were converted to strings and treated as file names, so the
      iterator contained no valid image at all.

    Args:
        train_images: Sequence of ``(sample, label)`` tuples.
        rescale: Optional factor multiplied into every pixel. The default
            ``None`` leaves pixels in the 0-255 range, which is what the
            Keras EfficientNet models document as their expected input;
            pass ``1./255`` for a backbone that needs 0-1 inputs.

    Returns:
        A Keras iterator that yields ``(images, one_hot_labels)`` batches of
        ``BATCH_SIZE`` and exposes a ``class_indices`` mapping from label
        name to class index (labels in sorted order).

    Raises:
        ValueError: If in-memory images do not all share the same shape.
    """
    augment = ImageDataGenerator(
        rescale=rescale,
        horizontal_flip=True,
        # NOTE(review): rotation_range is measured in degrees, so 0.1 is
        # practically no rotation - confirm the intended value.
        rotation_range=0.1,
        zoom_range=0.1
    )

    samples = [item[0] for item in train_images]
    labels = [item[1] for item in train_images]

    # all() is True for an empty list, so empty input keeps the old behaviour.
    if all(isinstance(sample, (str, os.PathLike)) for sample in samples):
        train_df = pd.DataFrame({'Image': [str(sample) for sample in samples], 'Label': labels})
        return augment.flow_from_dataframe(
            dataframe=train_df,
            x_col='Image',
            y_col='Label',
            target_size=TARGET_SIZE,
            color_mode='rgb',
            class_mode='categorical',
            batch_size=BATCH_SIZE,
            shuffle=True,
            seed=42,
            subset='training'
        )

    # In-memory images: encode the labels ourselves, using sorted label
    # names for the class order.
    class_names = sorted(set(labels))
    class_indices = {name: index for index, name in enumerate(class_names)}
    try:
        images = np.stack([np.asarray(sample) for sample in samples])
    except ValueError as err:
        raise ValueError(
            "augment_data needs every in-memory image to have the same "
            "(height, width, channels) shape"
        ) from err
    one_hot = np.eye(len(class_names), dtype='float32')[
        [class_indices[label] for label in labels]
    ]

    augmented_images = augment.flow(
        images,
        one_hot,
        batch_size=BATCH_SIZE,
        shuffle=True,
        seed=42
    )
    # The array iterator has no class mapping of its own; attach one so that
    # callers can translate predicted indices back to label names.
    augmented_images.class_indices = class_indices
    return augmented_images

# Unpack the (image, label) tuples into parallel lists for each split
train_split_processed_images = [pixels for pixels, _ in train_split_images]
train_split_labels = [label for _, label in train_split_images]

val_split_processed_images = [pixels for pixels, _ in test_split_images]
val_split_labels = [label for _, label in test_split_images]

# Re-pair the training images with their labels and build a batch iterator
train_pairs = list(zip(train_split_processed_images, train_split_labels))
augment_images_result = augment_data(train_pairs)

# Load pretrained model
def load_pretrained_model():
    """Create an EfficientNetB7 feature extractor with ImageNet weights.

    The network is built without its classification head, for 224 x 224 RGB
    input and with global max pooling, and is marked as not trainable.

    Returns:
        The configured Keras model.
    """
    backbone_options = {
        "input_shape": (224, 224, 3),
        "include_top": False,
        "weights": 'imagenet',
        "pooling": 'max',
    }
    backbone = tf.keras.applications.efficientnet.EfficientNetB7(**backbone_options)
    backbone.trainable = False
    return backbone


pretrained_model = load_pretrained_model()

# Build and compile the model
def build_and_compile_model(pretrained_model, num_classes=10):
    """Stack augmentation and a dense classification head on a backbone.

    The random augmentation layers are now actually part of the graph.
    Before, their output was assigned to ``x`` and immediately overwritten,
    so they never ran. The ``Rescaling(1./255)`` layer from the original list
    is left out on purpose: it was never applied either, and adding it now
    would rescale inputs a second time whenever the data iterator already
    rescales them.

    Args:
        pretrained_model: Keras model used as feature extractor; its
            ``input_shape`` determines the input size of the new model.
        num_classes: Number of softmax outputs (default 10, the previously
            hard-coded value).

    Returns:
        A compiled Keras model (Adam with learning rate 0.01, categorical
        cross-entropy loss, accuracy metric).
    """
    input_shape = pretrained_model.input_shape[1:]

    # Data Augmentation Step (the random layers are only active in training)
    augment = tf.keras.Sequential([
        layers.experimental.preprocessing.Resizing(input_shape[0], input_shape[1]),
        layers.experimental.preprocessing.RandomFlip("horizontal"),
        layers.experimental.preprocessing.RandomRotation(0.1),
        layers.experimental.preprocessing.RandomZoom(0.1),
        layers.experimental.preprocessing.RandomContrast(0.1),
    ])

    # A fresh Input is required: the backbone's own input tensor cannot have
    # layers inserted in front of it.
    inputs = tf.keras.Input(shape=input_shape)
    x = augment(inputs)
    # training=False keeps the backbone's normalisation layers in inference
    # mode while only the head is being trained.
    x = pretrained_model(x, training=False)
    x = Dense(128, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.45)(x)
    x = Dense(256, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.45)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(0.01),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# Build the classifier on top of the backbone loaded above.
build_compile = build_and_compile_model(pretrained_model)
# Both iterators come from augment_data, so the validation images go through
# the same random augmentation and shuffling as the training images.
# NOTE(review): validation data is normally left un-augmented - confirm that
# this is intended.
train_data_generator = augment_data(list(zip(train_split_processed_images, train_split_labels)))
val_data_generator = augment_data(list(zip(val_split_processed_images, val_split_labels)))

# # Train the model
# def train_model(model, train_images, val_images, epochs=1):
#     history = model.fit(
#         train_images,
#         steps_per_epoch=len(train_images),
#         validation_data=val_images,
#         validation_steps=len(val_images),
#         epochs=epochs,
#     )
#     return history


# Train the model
def train_model(model, train_generator, val_generator, epochs=1):
    """Fit ``model`` on ``train_generator`` and validate on ``val_generator``.

    Args:
        model: Object with a Keras-style ``fit`` method.
        train_generator: Training data, passed to ``fit`` positionally.
        val_generator: Validation data, passed as ``validation_data``.
        epochs: Number of epochs (default 1).

    Returns:
        Whatever ``model.fit`` returns.
    """
    fit_options = {"validation_data": val_generator, "epochs": epochs}
    return model.fit(train_generator, **fit_options)

# Train the model and keep the returned history for later inspection
history = train_model(build_compile, train_data_generator, val_data_generator, epochs=5)

# Predict the labels of the validation images batch by batch.
# The validation iterator shuffles, so the true label of each prediction is
# read from the very batch that produced it. Comparing against test_split
# (as before) paired predictions with unrelated rows, and the lengths could
# differ as well when unreadable files had been skipped.
NUM_PREVIEW = 5

# Map the label: class index -> label name, for predictions and for targets
labels = (train_data_generator.class_indices)
labels = dict((v, k) for k, v in labels.items())
true_labels = dict((v, k) for k, v in val_data_generator.class_indices.items())

pred = []
y_test = []
preview_images = []
for batch_number in range(len(val_data_generator)):
    batch_images, batch_targets = val_data_generator[batch_number]
    batch_scores = build_compile.predict_on_batch(batch_images)
    pred.extend(labels[k] for k in np.argmax(batch_scores, axis=1))
    y_test.extend(true_labels[k] for k in np.argmax(batch_targets, axis=1))
    # Keep only the first few images for the preview; the slice is empty
    # once enough have been collected.
    preview_images.extend(batch_images[:NUM_PREVIEW - len(preview_images)])

# Display the result
print(f'The first 5 predictions: {pred[:5]}')

# Display the first 5 validation pictures with their true and predicted labels
fig, axes = plt.subplots(nrows=1, ncols=NUM_PREVIEW, figsize=(25, 15), subplot_kw={'xticks': [], 'yticks': []})

for ax, image, true_label, predicted_label in zip(axes.flat, preview_images, y_test, pred):
    # imshow expects floats in 0-1; the iterator may yield 0-1 or 0-255.
    shown = image / 255.0 if image.max() > 1.0 else image
    ax.imshow(np.clip(shown, 0.0, 1.0))
    if true_label == predicted_label:
        color = "Orange"
    else:
        color = "Black"
    ax.set_title(f"True: {true_label}\nPredicted: {predicted_label}", color=color)
plt.show()

# Display confusion matrix and classification report
from sklearn.metrics import confusion_matrix

print(classification_report(y_test, pred))
# Printed explicitly: a bare expression is not shown outside a notebook cell.
print(confusion_matrix(y_test, pred))
