In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import zipfile
from PIL import Image
from six import BytesIO

2023-12-06 00:18:22.836366: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# uncomment if zip file has not been extracted
# zip_filename = 'train.zip'
# with zipfile.ZipFile(zip_filename, 'r') as z:
#   z.extractall('train_images')

In [3]:
train_file_path = os.path.join('train_images', 'train')
labels_list = sorted(os.listdir(train_file_path))
if '.DS_Store' in labels_list:
    labels_list.remove('.DS_Store')
label_encoding = {label: idx for idx, label in enumerate(labels_list)}

file_paths = []
labels_all = []
for label in labels_list:
  label_path = os.path.join(train_file_path, label)
  for f in os.listdir(label_path):
    file_paths.append(os.path.join(label_path, f))
    labels_all.append(label_encoding[label])

In [4]:
def load_image_to_numpy(path):
  img_data = tf.io.gfile.GFile(path, 'rb').read()
  return np.array(tf.image.decode_jpeg(img_data, channels=3)).reshape((256,256,3)) / 255
  #image = Image.open(BytesIO(img_data))
  #return np.array(image.get_data().reshape((256, 256, 3)).astype(tf.float32))

In [None]:
# get images into numpy and shuffle/split data
mapped = np.array(list(map(load_image_to_numpy, file_paths)))
labels_all = np.array(labels_all)
indices = np.arange(len(mapped))
np.random.shuffle(indices)
mapped = mapped[indices]
labels_all = labels_all[indices]

train_size = int(len(labels_all) * 0.9)

mapped_train = mapped[:train_size]
test_dataset_images = mapped[train_size:]
test_dataset_labels = labels_all[train_size:]

In [None]:
print(len(mapped_train), len(test_dataset_images))

In [None]:
def label_generator():
  for x in range(len(mapped_train)):
    yield mapped[x], labels_all[x]

In [None]:
train_dataset = tf.data.Dataset.from_generator(
  generator=label_generator,
  output_signature=(
    tf.TensorSpec(shape=(256,256,3), dtype=tf.int32),
    tf.TensorSpec(shape=(), dtype=tf.int32))
  )

train_dataset = train_dataset.shuffle(buffer_size=len(file_paths)).batch(32)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
# normalize the dataset before?
model = tf.keras.Sequential()
model.add(tf.keras.layers.InputLayer(input_shape=(256,256,3)))
model.add(tf.keras.layers.Conv2D(16, kernel_size=(3,3), activation='relu'))
model.add(tf.keras.layers.Conv2D(16, kernel_size=(3,3), activation='relu'))
# model.add(tf.keras.layers.BatchNormalization)
model.add(tf.keras.layers.MaxPooling2D((2,2), strides=(2,2)))
model.add(tf.keras.layers.Conv2D(32, kernel_size=(3,3), activation='relu'))
model.add(tf.keras.layers.Conv2D(32, kernel_size=(3,3), activation='relu'))
model.add(tf.keras.layers.MaxPooling2D((2,2), strides=(2,2)))
model.add(tf.keras.layers.Conv2D(64, kernel_size=(3,3), activation='relu'))
model.add(tf.keras.layers.MaxPooling2D((2,2), strides=(2,2)))
model.add(tf.keras.layers.Conv2D(128, kernel_size=(3,3), activation='relu'))
model.add(tf.keras.layers.MaxPooling2D((2,2), strides=(2,2)))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(32, activation='relu'))
# model.add(tf.keras.layers.Dense(16, activation='relu'))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))
model.compile(optimizer='adam', loss=tf.keras.losses.BinaryCrossentropy(from_logits=True), metrics=['accuracy'])
print(model.output_shape)
print(model.summary())

In [None]:
model.fit(train_dataset, epochs=10) # , validation_split=0.1

In [None]:
# iterator = iter(dataset)
# item = next(iterator)
# print(item)
# list(dataset.as_numpy_iterator())[:5]

In [None]:
# model.predict(mapped[0:10])
# model.save('overfitted_model')

In [None]:
def accuracy():
  correct_total = 0
  incorrect = []
  for x in range(len(test_dataset_labels)):
    t = np.array(test_dataset_images[x])
    t = np.expand_dims(t, 0)
    model_out = model.predict(t, verbose=0)
    if model_out > 0.5:
      pred = 1
    else:
      pred = 0
    if test_dataset_labels[x] == pred:
      correct_total += 1
    else:
      incorrect.append(model_out)
  return correct_total / len(test_dataset_labels), incorrect

accuracy()

In [None]:
tp = np.array(test_dataset_images[0])
tp = np.expand_dims(tp, 0)
print(tp.shape)
print(model.predict(tp))

In [None]:
test_loss, test_accuracy = model.evaluate(test_dataset_labels, test_dataset_labels, verbose=2)