In [None]:
!nvidia-smi

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

In [None]:
import pathlib

data_dir = pathlib.Path("image")

image_count = len(list(data_dir.glob('*/*.jpg')))
print(image_count)

In [None]:
fabrics = list(data_dir.glob('fabric/*.jpg'))
print(len(fabrics))
PIL.Image.open(str(fabrics[0]))

In [None]:
batch_size = 16
img_height = 299
img_width = 299

In [None]:
class_names = np.array(sorted([item.name for item in data_dir.glob('*') if item.name != "LICENSE.txt"]))
print(class_names)

In [None]:
ds_each_class = [tf.data.Dataset.list_files(str(data_dir/f'{class_name}/*.jpg'), shuffle=False) for class_name in class_names]

for index, ds in enumerate(ds_each_class):
  ds_each_class[index] = ds.shuffle(image_count//10, seed=123, reshuffle_each_iteration=False)

In [None]:
for f in ds_each_class[0].take(10):
  print(f.numpy())

for f in ds_each_class[1].take(10):
  print(f.numpy())

In [None]:
# split first class dataset into 5 equal sized parts for 5-fold cross validation
A = ds_each_class[0].shard(num_shards=5, index=0)
B = ds_each_class[0].shard(num_shards=5, index=1)
C = ds_each_class[0].shard(num_shards=5, index=2)
D = ds_each_class[0].shard(num_shards=5, index=3)
E = ds_each_class[0].shard(num_shards=5, index=4)
for i in range(1, 10):
  A = A.concatenate(ds_each_class[i].shard(num_shards=5, index=0))
  B = B.concatenate(ds_each_class[i].shard(num_shards=5, index=1))
  C = C.concatenate(ds_each_class[i].shard(num_shards=5, index=2))
  D = D.concatenate(ds_each_class[i].shard(num_shards=5, index=3))
  E = E.concatenate(ds_each_class[i].shard(num_shards=5, index=4))

In [None]:
print(A.cardinality().numpy())
print(B.cardinality().numpy())
print(C.cardinality().numpy())
print(D.cardinality().numpy())
print(E.cardinality().numpy())

In [None]:
def get_label(file_path):
  # convert the path to a list of path components
  parts = tf.strings.split(file_path, os.path.sep)
  # The second to last is the class-directory
  one_hot = parts[-2] == class_names
  # Integer encode the label
  return tf.argmax(one_hot)

In [None]:
def decode_img(img):
  # convert the compressed string to a 3D uint8 tensor
  img = tf.image.decode_jpeg(img, channels=3)
  # resize the image to the desired size
  return tf.image.resize(img, [img_height, img_width])

In [None]:
def process_path(file_path):
  label = get_label(file_path)
  # load the raw data from the file as a string
  img = tf.io.read_file(file_path)
  img = decode_img(img)
  return img, label

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
# Set `num_parallel_calls` so multiple images are loaded/processed in parallel.
A = A.map(process_path, num_parallel_calls=AUTOTUNE)
B = B.map(process_path, num_parallel_calls=AUTOTUNE)
C = C.map(process_path, num_parallel_calls=AUTOTUNE)
D = D.map(process_path, num_parallel_calls=AUTOTUNE)
E = E.map(process_path, num_parallel_calls=AUTOTUNE)

In [None]:
for image, label in A.take(1):
  print("Image shape: ", image.numpy().shape)
  print("Label: ", label.numpy())

In [None]:
def configure_for_performance(ds):
  ds = ds.cache()
  ds = ds.shuffle(buffer_size=1000)
  ds = ds.batch(batch_size)
  ds = ds.prefetch(buffer_size=AUTOTUNE)
  return ds

In [None]:
ds_fold_dict = {0:A, 1:B, 2:C, 3:D, 4:E}

In [None]:
preprocess_input = keras.applications.resnet_v2.preprocess_input

In [None]:
base_model = keras.applications.ResNet152V2(include_top=False, input_shape=(img_height, img_width, 3))

In [None]:
# don't train base model weights
base_model.trainable = False

In [None]:
base_model.summary()

In [None]:
base_learning_rate = 0.0001
def create_model():
  data_augmentation = keras.Sequential(
    [
      layers.experimental.preprocessing.RandomFlip("horizontal", 
                                                  input_shape=(img_height, 
                                                                img_width,
                                                                3)),
      layers.experimental.preprocessing.RandomRotation(0.1),
      layers.experimental.preprocessing.RandomZoom(0.1),
    ]
  )

  global_average_layer = keras.layers.GlobalAveragePooling2D()
  prediction_layer = keras.layers.Dense(10)

  inputs = keras.Input(shape=(img_height, img_width, 3))
  x = data_augmentation(inputs)
  x = preprocess_input(x)
  x = base_model(x, training=False)
  x = global_average_layer(x)
  x = keras.layers.Dropout(0.2)(x)
  outputs = prediction_layer(x)
  model = keras.Model(inputs, outputs)
  optimizer = keras.optimizers.Adam(lr=base_learning_rate)
  loss= keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  model.compile(optimizer=optimizer,
                loss=loss,
                metrics=['accuracy'])
  model.summary()
  return model

In [None]:
no_epochs = 100

In [None]:
history_map = {}
base_model_acc_list = []
final_acc_list = []

for i in range(5):
  print('fold', i + 1)
  temp_dict = ds_fold_dict.copy()
  # get test set for this iteration
  current_val_ds = temp_dict[i]

  # get training set for this iteration from remaining data samples
  del temp_dict[i]
  current_train_ds = None
  for ds_shard in temp_dict.values():
    if current_train_ds is None:
      current_train_ds = ds_shard
    else:
      current_train_ds = current_train_ds.concatenate(ds_shard)
  
  current_train_ds = configure_for_performance(current_train_ds)
  current_val_ds = configure_for_performance(current_val_ds)

  # create a new model
  model = create_model()
  # get initial test accuracy
  base_model_acc_list.append(model.evaluate(current_val_ds)[1])
  # train for specified epochs
  history = model.fit(current_train_ds,
                    epochs=no_epochs,
                    validation_data=current_val_ds)
  
  # save results
  if i == 0:
    history_map['accuracy'] = [history.history['accuracy']]
    history_map['val_accuracy'] = [history.history['val_accuracy']]
    history_map['loss'] = [history.history['loss']]
    history_map['val_loss'] = [history.history['val_loss']]
  else:
    history_map['accuracy'].append(history.history['accuracy'])
    history_map['val_accuracy'].append(history.history['val_accuracy'])
    history_map['loss'].append(history.history['loss'])
    history_map['val_loss'].append(history.history['val_loss'])
  
  # get final test accuracy
  final_acc_list.append(np.amax(history.history['val_accuracy']))

In [None]:
print("Base model accuracy:", np.mean(base_model_acc_list))
print("Final accuracy:", np.mean(final_acc_list))

In [None]:
acc = np.mean(history_map['accuracy'], axis=0)
val_acc = np.mean(history_map['val_accuracy'], axis=0)

plt.figure(1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()),1])
plt.title('Training and Validation Accuracy')
plt.savefig('cv_acc_resnet152_balanced.jpg')
plt.show()

In [None]:
loss = np.mean(history_map['loss'], axis=0)
val_loss = np.mean(history_map['val_loss'], axis=0)

plt.figure(2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
#plt.ylim([0,1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.savefig('cv_loss_resnet152_balanced.jpg')
plt.show()