<a href="https://colab.research.google.com/github/dhaev/Machine-Learning/blob/main/CNN_Autoencoders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical

import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt

import zipfile
from PIL import Image, UnidentifiedImageError
import os

import concurrent.futures
import multiprocessing

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# @title Image loading helpers (CustomDataset)

class CustomDataset():
    """Helpers that load image files into tensors for the autoencoder.

    The loading logic lives in a static method and a class method, so it can
    be used without creating an instance.
    """

    def __init__(self, x=None, y=None, img_size=(224, 224)):
        """Store the given values on the instance.

        Args:
            x: Stored as ``self.x``; not read by the loader methods.
            y: Stored as ``self.y``; not read by the loader methods.
            img_size: Stored as ``self.img_size``. The loader methods take
                their own ``img_size`` argument rather than reading this.
        """
        self.x = x
        self.y = y
        self.img_size = img_size

    @staticmethod
    def process_image(filename, img_size=(224, 224)):
        """Read one image file, resize it and scale pixel values by 1/255.

        Args:
            filename: Path of the image file. The decoder is chosen from the
                file extension (case-insensitive).
            img_size: ``(height, width)`` passed to ``tf.image.resize``.
                Defaults to the previously hard-coded 224x224.

        Returns:
            The resized, rescaled image tensor, or ``None`` when the file
            cannot be read or decoded or the result is not rank 3.
        """
        try:
            raw = tf.io.read_file(filename)
            lowered = filename.lower()
            if lowered.endswith(('.jpeg', '.jpg')):
                image = tf.image.decode_jpeg(raw, channels=3)
            elif lowered.endswith('.png'):
                image = tf.image.decode_png(raw, channels=3)
            else:
                image = tf.image.decode_image(raw, channels=3)
            image = tf.image.resize(image, list(img_size))
            image /= 255.0  # normalize to [0,1] range

            # Anything that is not a single (height, width, channels) image
            # is skipped rather than stacked into the batch.
            if len(image.shape) != 3:
                return None
            return image
        except Exception:
            # Best effort: unreadable or corrupt files are skipped so one bad
            # file does not abort the whole batch.
            return None

    @classmethod
    def get_batch_images(cls, batch_x, batch_y, img_size=(224, 224)):
        """Load a batch of images concurrently and stack them into one tensor.

        Args:
            batch_x: Iterable of image file paths.
            batch_y: Iterable of labels paired with ``batch_x``. Labels are
                only used to pair entries; they are not returned.
            img_size: ``(height, width)`` forwarded to ``process_image``.

        Returns:
            ``(batch_images, batch_images)``: the stacked images twice, so the
            result can be fed directly as (input, target) to an autoencoder.

        Raises:
            ValueError: If no image in the batch could be loaded.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            images = list(
                executor.map(lambda path: cls.process_image(path, img_size), batch_x)
            )
        # zip keeps the original pairing semantics (stops at the shorter input).
        loaded = [image for image, _ in zip(images, batch_y) if image is not None]
        if not loaded:
            raise ValueError("No readable images in batch; cannot build an empty batch.")
        batch_images = tf.stack(loaded)
        return batch_images, batch_images

In [None]:
'''

In this modified version, I’ve moved the sampling of indices to a separate method, sample_indices, for better readability.
The half_batch_size is now computed only once, and a list comprehension is used to create sampled_indices_list.
These changes should make the code faster and more efficient.
'''
class BatchData():
    """Pre-plan class-balanced batches over a DataFrame and stream them.

    ``batch()`` builds a list of per-label row selections; ``generator()``
    replays that list endlessly, loading the images for each planned batch.
    """

    def __init__(self, data=None, batch_size=64):
        """Split ``data`` by label and prepare the per-label index pools.

        Args:
            data: DataFrame with a ``'labels'`` column.
            batch_size: Positive integer; each label contributes
                ``batch_size // 2`` rows to every planned batch.

        Raises:
            TypeError: If ``data`` is not a pandas DataFrame.
            ValueError: If ``batch_size`` is not a positive integer.
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError("data must be a pandas DataFrame")
        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        self.data = data
        self.labels = self.data['labels'].unique()
        # One sub-frame per label, plus the positions in it not yet used.
        self.dfs = {label: self.data[self.data['labels'] == label] for label in self.labels}
        self.indices = {label: np.arange(len(df)) for label, df in self.dfs.items()}
        self.batch_size = batch_size
        self.batches = []

    def batch(self, max_iterations=2000):
        """Plan batches until any label runs out of unused rows.

        Each planned batch is a list of ``(label, positions)`` tuples, one per
        label, appended to ``self.batches``.

        Args:
            max_iterations: Upper bound on the number of batches planned.

        Raises:
            ValueError: If ``max_iterations`` is not a positive integer, or if
                ``batch_size`` is too small to take at least one row per label.
        """
        if not isinstance(max_iterations, int) or max_iterations <= 0:
            raise ValueError("max_iterations must be a positive integer")
        half_batch_size = self.batch_size // 2
        if half_batch_size == 0:
            # A zero-sized sample never consumes indices, so the loop below
            # would only emit empty batches until max_iterations.
            raise ValueError("batch_size must be at least 2 to sample from each label")
        iterations = 0
        # `self.indices` must be non-empty: all() over no labels is True and
        # would otherwise plan max_iterations empty batches.
        while (
            iterations < max_iterations
            and self.indices
            and all(len(indices) > 0 for indices in self.indices.values())
        ):
            sampled_indices_list = [
                self.sample_indices(label, half_batch_size)
                for label in self.labels
            ]
            self.batches.append(sampled_indices_list)
            iterations += 1

    def sample_indices(self, label, half_batch_size):
        """Draw positions for one label and remove them from its pool.

        When fewer than ``half_batch_size`` positions remain, the draw is made
        with replacement so the sample still has the requested size.

        Args:
            label: Label whose pool is sampled.
            half_batch_size: Number of positions to draw.

        Returns:
            ``(label, sampled_indices)`` where ``sampled_indices`` is a NumPy
            array of positions into ``self.dfs[label]``.
        """
        pool = self.indices[label]
        with_replacement = len(pool) < half_batch_size
        sampled_indices = np.random.choice(pool, size=half_batch_size, replace=with_replacement)
        self.indices[label] = np.setdiff1d(pool, sampled_indices)
        return (label, sampled_indices)

    def generator(self):
        """Yield the planned batches forever, loading their images on demand.

        The first column of the frame is passed as the image paths and the
        last column as the labels (assumes that column layout — confirm
        against the frame built by the caller).

        Yields:
            The tuple returned by ``CustomDataset.get_batch_images``.

        Raises:
            RuntimeError: If no batches were planned. Without this check the
                endless loop would spin without ever yielding.
        """
        if not self.batches:
            raise RuntimeError("No batches planned; call batch() before generator().")
        while True:
            for batch in self.batches:
                sampled_dfs = [
                    self.dfs[label].iloc[indices]
                    for label, indices in batch
                ]
                # Shuffle so the labels are interleaved within the batch.
                sampled_df = pd.concat(sampled_dfs).sample(frac=1).reset_index(drop=True)
                batch_x = sampled_df.iloc[:, 0]  # All rows and the first column
                batch_y = sampled_df.iloc[:, -1]  # All rows and the last column
                batch_images, batch_labels = CustomDataset.get_batch_images(batch_x, batch_y)
                yield batch_images, batch_labels


In [None]:

def zip_extract(file_name, dest_dir=None):
    """Extract every member of a zip archive.

    Args:
        file_name: Path of the zip archive to open.
        dest_dir: Directory to extract into. ``None`` (the default) extracts
            into the current working directory.
    """
    # Open the zip file in read mode; the context manager closes it on exit
    with zipfile.ZipFile(file_name, 'r') as zip_ref:
        # Extract all files in the zip file
        zip_ref.extractall(path=dest_dir)

# List of zip files to extract
zip_files = [
    # "/content/drive/MyDrive/Machine Learning/datasets/cats_and_dogs.zip",
    "/content/drive/MyDrive/Machine Learning/datasets/Cats-vs-Dogs.zip",
]

# # Create a pool of workers
# with multiprocessing.Pool() as pool:
#     # Use the pool to run zip_extract concurrently on all zip files
#     pool.map(zip_extract, zip_files)

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Use the executor to run zip_extract concurrently on all zip files.
    # executor.map only re-raises a worker's exception when its result is
    # retrieved, so the results are consumed here; otherwise a missing or
    # corrupt archive would fail silently.
    list(executor.map(zip_extract, zip_files))

In [None]:

def balance_dataframe(df, label_column='labels', n_samples=1600, random_state=None):
    """Return a copy of ``df`` with exactly ``n_samples`` rows per label.

    Labels with more than ``n_samples`` rows are down-sampled without
    replacement; labels with fewer rows are over-sampled with replacement.
    Original index values are kept.

    Args:
        df: Input DataFrame.
        label_column: Name of the column holding the class labels.
        n_samples: Number of rows to keep for each label.
        random_state: Optional seed forwarded to ``DataFrame.sample`` to make
            the selection reproducible. ``None`` keeps it random.

    Returns:
        A DataFrame with the balanced rows, grouped by label in order of first
        appearance. For an input without rows, an empty frame with the same
        columns is returned.
    """
    subsets = []
    for label in df[label_column].unique():
        # Get a subset of the DataFrame with the current label
        subset = df[df[label_column] == label]

        # If the subset is larger than n_samples, randomly select n_samples rows
        if len(subset) > n_samples:
            subset = subset.sample(n_samples, random_state=random_state)
        # If the subset is smaller than n_samples, oversample it to reach n_samples
        elif len(subset) < n_samples:
            subset = subset.sample(n_samples, replace=True, random_state=random_state)

        subsets.append(subset)

    if not subsets:
        # pd.concat raises on an empty list; keep the schema instead.
        return df.iloc[0:0].copy()

    # Concatenate once instead of growing a frame inside the loop.
    return pd.concat(subsets)

In [None]:
def load_images(folder_name):
    """List the files of one class folder together with their numeric label.

    The folder is looked up under the module-level ``base_folder_path``.
    Any failure (missing folder, unknown class name, ...) is reported on
    stdout and results in two empty lists.

    Args:
        folder_name: Name of the class folder; must be 'Cat' or 'Dog' to map
            to a label.

    Returns:
        ``(paths, labels)``: the file paths in the folder and a list of the
        folder's label repeated once per path.
    """
    try:
        directory = os.path.join(base_folder_path, folder_name)
        paths = []
        for entry in os.listdir(directory):
            paths.append(f"{directory}/{entry}")
        class_ids = {'Cat': 0, 'Dog': 1}
        # class_ids = {'cat': 0, 'dog': 1}
        class_id = class_ids[folder_name]
        labels = [class_id] * len(paths)
    except Exception as e:
        print(f"Error: {e}, with folder: {folder_name}. Skipping.")
        return [], []
    return paths, labels

base_folder_path = '/content/Cats-vs-Dogs/PetImages'
# base_folder_path = '/content/drive/MyDrive/Machine Learning/datasets/Cats-vs-Dogs/PetImages'
# base_folder_path = '/content/drive/MyDrive/Machine Learning/datasets/cats_and_dogs/train'
pet_dict = {'images': [], 'labels': []}

# Scan every class folder concurrently; each result is (paths, labels).
try:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(load_images, os.listdir(base_folder_path)))
except Exception as e:
    print(f"Error: {e}. Failed to load images.")
    # Re-raise: continuing would either hit a NameError on `results` or,
    # worse, silently reuse a stale `results` left over from an earlier run.
    raise

# Flatten the per-folder results into one table of paths and labels.
for result in results:
    pet_dict['images'].extend(result[0])
    pet_dict['labels'].extend(result[1])
pet_df = pd.DataFrame(pet_dict)
# pet_df.to_csv("/content/drive/MyDrive/Machine Learning/datasets/cats_and_dogs/train/cats_and_dogs_train.csv",index=False)

In [None]:
# !cp -r "/content/Cats-vs-Dogs/PetImages/Cat" "/content/drive/MyDrive/Machine Learning/Cats-vs-Dogs/PetImages/"

In [None]:
# Shuffle the data (fixed seed so the split is repeatable for a given row order)
pet_df = pet_df.sample(frac=1, random_state=42)

# Define sizes: 70% train, 15% validation, the remainder is the test split.
train_size = int(0.7 * len(pet_df))
val_size = int(0.15 * len(pet_df))
# Derive test_size from the other two so it always matches the slice below;
# int(0.15 * len) can differ from the real remainder because of truncation.
test_size = len(pet_df) - train_size - val_size

# Split the data
train_dataset = pet_df[:train_size]
val_dataset = pet_df[train_size:train_size+val_size]
test_data = pet_df[train_size+val_size:]

In [None]:

# Plan the batches over the training split.
batch_data_instance = BatchData(batch_size=32, data=train_dataset)
batch_data_instance.batch()

# Display the distinct label values present in the full table.
set(pet_df.loc[:, 'labels'].to_numpy())

{0, 1}

In [None]:
# Load the whole validation split into memory in one call.
val_features, val_labels = CustomDataset.get_batch_images(
    val_dataset.loc[:, 'images'],
    val_dataset.loc[:, 'labels'],
)

In [None]:
# Display the distinct label values present in the validation split.
set(val_dataset.loc[:, 'labels'].to_numpy())

{0, 1}

In [None]:
# Take the layers from the `keras` module imported from TensorFlow at the top
# of the notebook, so the model does not mix it with the standalone `keras`
# package.
layers = keras.layers

input_img = keras.Input(shape=(224, 224, 3))

# Encoder: three conv + 2x2 max-pool stages, halving the spatial size each time
# (224 -> 112 -> 56 -> 28).
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

# at this point the representation is (28, 28, 8) i.e. 6272-dimensional

# Decoder: mirror of the encoder, doubling the spatial size each stage
# (28 -> 56 -> 112 -> 224).
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
# Sigmoid keeps the reconstruction in [0, 1]; three filters give an output
# with the same (224, 224, 3) shape as the input.
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

autoencoder = keras.Model(input_img, decoded)
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')


In [None]:
# One epoch is one pass over every planned batch.
steps_per_epoch = len(batch_data_instance.batches)
print(f"steps_per_epoch = {steps_per_epoch}")

# Epoch count; read by plot_history to build its x-axis.
epoch = 15

steps_per_epoch = 546


In [None]:
# Train on the endless batch generator. The epoch count comes from `epoch`
# so it stays in step with the x-axis that plot_history builds from the same
# variable (it was hard-coded to 20 here while `epoch` was 15).
# The validation images serve as both input and target.
aut = autoencoder.fit(
    batch_data_instance.generator(),
    steps_per_epoch=steps_per_epoch,
    epochs=epoch,
    validation_data=(val_features, val_features),
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20

In [None]:
def plot_history(hist):
  """Plot training and validation loss per epoch.

  The x-axis is derived from the number of recorded values, so the plot also
  works when training ran for a different number of epochs than planned or
  was interrupted.

  Args:
    hist: Object with a ``history`` dict holding 'loss' and 'val_loss' lists,
      as returned by ``fit``.
  """
  loss = hist.history['loss']
  val_loss = hist.history['val_loss']

  plt.figure(figsize=(8, 8))
  # Each curve gets its own range so mismatched lengths cannot raise.
  plt.plot(range(len(loss)), loss, label='Training Loss')
  plt.plot(range(len(val_loss)), val_loss, label='Validation Loss')
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend(loc='upper right')
  plt.title('Training and Validation Loss')
  plt.show()

In [None]:
# Plot the loss curves recorded in `aut`, the object returned by autoencoder.fit.
plot_history(aut)

In [None]:
# hist_df = pd.DataFrame(history.history)

In [None]:
# with open('/content/drive/MyDrive/Machine Learning/cnn/History/model1', mode='w') as f:
#     hist_df.to_csv(f, index=False)

In [None]:
# rd=pd.read_csv('/content/drive/MyDrive/Machine Learning/cnn/History/model1')
# rd

In [None]:
# # Assuming 'model' is your Keras model
# with open('/content/drive/MyDrive/Machine Learning/cnn/History/model1_summary.txt', 'w') as f:
#     # Pass the file handle in as a lambda function to 'print_fn' for the summary method
#     model.summary(print_fn=lambda x: f.write(x + '\n'))


In [None]:
decoded_imgs = autoencoder.predict(val_features)

# Show up to 10 validation images (top row) above their reconstructions
# (bottom row). The images are displayed as they are; the old code reshaped
# them to 28x28 and read from an undefined `x_test`.
n = min(10, len(decoded_imgs))
plt.figure(figsize=(20, 4))
for i in range(n):
    # Display original
    ax = plt.subplot(2, n, i + 1)
    ax.imshow(np.asarray(val_features[i]))
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # Display reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    ax.imshow(decoded_imgs[i])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()