<a href="https://colab.research.google.com/github/GMobinit/cat_dog_recognition_model/blob/main/cat_dog_recognition_vgg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! wget https://download.microsoft.com/download/3/e/1/3e1c3f21-ecdb-4869-8368-6deba77b919f/kagglecatsanddogs_5340.zip

In [None]:
import tensorflow as tf
import zipfile
tf.__version__

In [None]:
zipfile.ZipFile('kagglecatsanddogs_5340.zip', 'r').extractall('dataset')

In [None]:
import pathlib
data_dir = pathlib.Path('dataset/PetImages').with_suffix('')

In [None]:
cat_image_count = len(list(data_dir.glob('Cat/*.jpg')))
dog_image_count = len(list(data_dir.glob('Dog/*.jpg')))
print(f"cat image count: {cat_image_count}, dog image count: {dog_image_count}, all images: {cat_image_count+dog_image_count}")

In [None]:
# import PIL

In [None]:
# PIL.Image.open(str(list(data_dir.glob('Cat/910.jpg'))[0]))

In [None]:
list_ds  = tf.data.Dataset.list_files(str(data_dir/'*/*.jpg'), shuffle=False)
list_ds = list_ds.shuffle(cat_image_count+dog_image_count, reshuffle_each_iteration=False)
print(len(list_ds))

In [None]:
for file_add in list_ds.take(5):
  print(file_add)

In [None]:
labels = []
for item in data_dir.glob('*'):
  labels.append(item.name)

print(labels)

In [None]:
## train validation split
validation_percentage = 0.2
validation_size = int((cat_image_count+dog_image_count)*validation_percentage)
print(validation_size)
print(len(list_ds))
validation_ds = list_ds.take(validation_size)
train_ds = list_ds.skip(validation_size)

print(f'validation size {len(validation_ds)}, train size {len(train_ds)}')

In [None]:
import os
def get_label(file_path):
  separated_path = tf.strings.split(file_path, os.path.sep)
  label = separated_path[-2]
  one_hotted_label = (label == labels)
  return tf.argmax(one_hotted_label)

In [None]:
get_label('dataset/kagglecatsanddogs_3367a/PetImages/Cat/7411.jpg')

In [None]:
batch_size = 32
img_height = 180
img_width = 180

In [None]:
def decode_img(encoded_img):
  img = tf.io.decode_jpeg(encoded_img, channels=3)
  img = tf.image.resize(img, [img_height, img_width], method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
  return img

In [None]:
def process_path(img_path):
  label = get_label(img_path)
  img = tf.io.read_file(img_path)
  if tf.equal(tf.strings.length(img), 0):
    print("warning: empty file at ", img)
  try:
    img = decode_img(img)
  except Exception as e:
    print(f"wrong image format {img_path}")
  print(img)
  img = tf.cast(img, tf.float32)
  # img = ((img/255))
  return img, label

In [None]:
import os

def is_valid_image(img_path):
    img = tf.io.read_file(img_path)
    # Check if the file can be decoded as a JPEG image
    is_jpeg = tf.image.is_jpeg(img)
    return is_jpeg

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

filtered_train_ds = train_ds.filter(is_valid_image)
filtered_validation_ds = validation_ds.filter(is_valid_image)

# Count the elements in the filtered datasets
train_count = 0
for _ in filtered_train_ds:
  train_count += 1

validation_count = 0
for _ in filtered_validation_ds:
  validation_count += 1


mapped_train_ds = filtered_train_ds.map(map_func= process_path,num_parallel_calls=AUTOTUNE)
mapped_validation_ds = filtered_validation_ds.map(map_func=process_path, num_parallel_calls=AUTOTUNE)

# Add checks to see the size of the datasets after filtering
print(f"Size of filtered_train_ds after filtering: {train_count}")
print(f"Size of filtered_validation_ds after filtering: {validation_count}")
print(f"Size of all filtered data: {validation_count+train_count}")
print(f"Size of mapped_train_ds after filtering: {tf.data.experimental.cardinality(mapped_train_ds).numpy()}")
print(f"Size of mapped_validation_ds after filtering: {tf.data.experimental.cardinality(mapped_validation_ds).numpy()}")

In [None]:
# for item in mapped_train_ds.take(5):
#   print("Image shape: ", item)

In [None]:
def configure_for_performance(ds: tf.data.Dataset):
  # ds = ds.cache()
  ds = ds.shuffle(buffer_size=500)
  ds = ds.batch(batch_size, drop_remainder=True)
  ds = ds.prefetch(buffer_size=AUTOTUNE)
  return ds

In [None]:
high_performance_train_ds = configure_for_performance(mapped_train_ds)
high_performance_validation_ds = configure_for_performance(mapped_validation_ds)

validation_count = 0
for _ in high_performance_validation_ds:
  validation_count += 1
print(f"Size of high_performance_validation_ds: {validation_count}")

In [None]:
for a,b in high_performance_train_ds.take(1):
  print(a.shape)
  print(b.numpy())

In [None]:
import matplotlib.pyplot as plt

for image_batch, label_batch in high_performance_train_ds.take(1):

  plt.figure(figsize=(10,10))
  for i in range(9):
    ax = plt.subplot(3,3,i+1)
    plt.imshow(image_batch[i].numpy().astype("uint32"))
    label = label_batch[i]
    plt.title(labels[label])
    plt.axis("off")

In [None]:
base_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet', input_shape=(180,180,3))

In [None]:
base_model.trainable = False

In [None]:
flatten_layer = tf.keras.layers.GlobalAveragePooling2D()
dense_layer = tf.keras.layers.Dense(100, activation='relu')
dropout_layer = tf.keras.layers.Dropout(0.5)
prediction_layer = tf.keras.layers.Dense(1, activation='sigmoid')

In [None]:
model = tf.keras.models.Sequential([
  base_model,
  flatten_layer,
  dense_layer,
  dropout_layer,
  prediction_layer
])

In [None]:
model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=0.001), loss = tf.keras.losses.BinaryCrossentropy, metrics=['acc'])

In [None]:
model.summary()

In [None]:
model.fit(high_performance_train_ds, epochs=30, validation_data=high_performance_validation_ds)