# Hotel CNN


In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from keras import optimizers
from keras.layers import Dense, Flatten
from keras.models import Sequential
from keras.preprocessing.image import ImageDataGenerator
from random import sample
from keras.applications.resnet_v2 import ResNet50V2
from tensorflow.keras.models import load_model

In [None]:
# Side length, in pixels, of the square images: used when resizing saved
# images, as the generators' target size and as the network input size.
IMG_SIZE = 256

In [None]:
# Root folder of the project data.
base_dir = 'D:/CNN Hotel Classification'
# Raw training images, resampled training images and held-out test images,
# each a direct sub-folder of base_dir.
train_dir, save_directory, test_directory = (
    os.path.join(base_dir, folder)
    for folder in ('train_images', 'processed_images15', 'test_images15')
)

In [None]:
# Count the images in every class folder of the raw training set, list the
# classes holding at least 100 images and show the spread as a box plot.
directory_list = os.listdir(train_dir)
# One row per class; filled in below with that class's image count.
classes = np.empty((len(directory_list), 1))
total = 0  # number of classes with at least 100 images

# `class_name` rather than `dir`, so the builtin dir() is not shadowed.
for i, class_name in enumerate(directory_list):
    temp_dir = os.path.join(train_dir, class_name)
    image_count = len(os.listdir(temp_dir))
    classes[i] = image_count
    # Compare the plain int rather than the 1-element array row.
    if image_count >= 100:
        print(class_name, classes[i])
        total += 1

plt.boxplot(classes)
print(total)

# Oversampling / Undersampling and Test dataset separation

In [None]:
# Random-augmentation settings for the generator that the oversampling step
# draws extra images from.
augmentation_options = dict(
    rescale=1/255,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)
train_datagen = ImageDataGenerator(**augmentation_options)


In [None]:
def save_image(img_path, i, save_dir):
    """Save a resized copy of an image as ``<i>.jpg`` inside ``save_dir``.

    The image at ``img_path`` is loaded, resized to IMG_SIZE x IMG_SIZE and
    written under a file name built from ``i``.
    """
    target_path = os.path.join(save_dir, f'{i}.jpg')
    loaded = tf.keras.preprocessing.image.load_img(img_path)
    resized = loaded.resize((IMG_SIZE, IMG_SIZE))
    resized.save(target_path)

In [None]:
# Build the resampled training set and the held-out test set from the raw
# class folders.  Per class:
#   * fewer than `min_img` images           -> the class is skipped;
#   * fewer than `target_number + test_img` -> `test_img` originals go to the
#     test folder and the rest are augmented up to exactly `target_number`
#     training images (oversampling);
#   * otherwise                             -> a random subset is copied,
#     capped at `max_target` training images plus `test_img` test images
#     (undersampling).
directory_list = os.listdir(train_dir)

min_img = 15
target_number = min_img * 8
max_target = 400
test_img = 2

# Create save and test directories
os.makedirs(save_directory, exist_ok=True)
os.makedirs(test_directory, exist_ok=True)

# Iterate Kaggle Dataset (`class_name` rather than `dir`, which would shadow
# the builtin).
for class_name in directory_list:
    temp_train_dir = os.path.join(train_dir, class_name)
    # Stray files sitting next to the class folders cannot be listed; skip them.
    if not os.path.isdir(temp_train_dir):
        continue

    # List all image paths from class (the folder is read only once)
    img_names = [os.path.join(temp_train_dir, name) for name in os.listdir(temp_train_dir)]

    # Skip classes with less images than our minimum number required
    if len(img_names) < min_img:
        continue

    # Create save and test class folders
    temp_save_dir = os.path.join(save_directory, class_name)
    os.makedirs(temp_save_dir, exist_ok=True)
    temp_test_dir = os.path.join(test_directory, class_name)
    os.makedirs(temp_test_dir, exist_ok=True)

    if len(img_names) < target_number + test_img:
        # Oversampling
        train_count = len(img_names) - test_img
        # Every training image yields `target_per_img` augmented copies; the
        # images drawn into `remainder` yield one more, so that originals plus
        # copies add up to exactly `target_number`.
        remainder = set(sample(img_names, k=target_number % train_count))
        target_per_img = target_number // train_count - 1
        i, test_i = 0, 0

        for img_path in img_names:
            in_remainder = img_path in remainder

            # The first `test_img` images outside `remainder` are held out.
            if test_i < test_img and not in_remainder:
                save_image(img_path, test_i, temp_test_dir)
                test_i += 1
                continue

            # Training files are numbered from `test_img` upwards.
            save_image(img_path, i + test_img, temp_save_dir)
            i += 1

            copies = target_per_img + 1 if in_remainder else target_per_img
            if copies <= 0:
                continue

            img = tf.keras.preprocessing.image.load_img(img_path).resize((IMG_SIZE, IMG_SIZE))
            img_array = tf.keras.preprocessing.image.img_to_array(img)
            img_array = img_array.reshape((1,) + img_array.shape)

            # flow() keeps yielding batches, so pair it with a bounded range
            # to draw exactly `copies` augmented images.
            augmented = train_datagen.flow(img_array, batch_size=1)
            for _, batch in zip(range(copies), augmented):
                temp = tf.keras.preprocessing.image.array_to_img(batch[0])
                temp.save(os.path.join(temp_save_dir, str(i + test_img) + '.jpg'))
                i += 1

    else:
        # Undersampling: keep at most `max_target` training images plus the
        # `test_img` test images, chosen at random.
        target = min(max_target + test_img, len(img_names))
        random_images = sample(img_names, k=target)
        for i, img_path in enumerate(random_images):
            # The first `test_img` sampled images form the test set.
            destination = temp_test_dir if i < test_img else temp_save_dir
            save_image(img_path, i, destination)


In [None]:
# Count the images per class in the processed training folder and plot the
# distribution as a box plot.
directory_list = os.listdir(save_directory)
# One row per class; filled in below with that class's image count.
classes = np.empty((len(directory_list), 1))

# `class_name` rather than `dir`, so the builtin dir() is not shadowed.
for i, class_name in enumerate(directory_list):
    temp_dir = os.path.join(save_directory, class_name)
    classes[i] = len(os.listdir(temp_dir))

plt.boxplot(classes)

# Batch Generators for Train and Validation Split / Test

In [None]:
# One generator configuration shared by the training, validation and test
# iterators; validation_split lets save_directory be divided into the
# 'training' and 'validation' subsets below.
train_data = ImageDataGenerator(rescale=1/255,
                                samplewise_center=True,
                                validation_split=0.05)

# Settings common to all three directory iterators.
flow_options = dict(target_size=(IMG_SIZE, IMG_SIZE),
                    batch_size=128,
                    class_mode="categorical")

train_generator = train_data.flow_from_directory(save_directory,
                                                 subset='training',
                                                 **flow_options)

valid_generator = train_data.flow_from_directory(save_directory,
                                                 subset='validation',
                                                 **flow_options)

# The test iterator reads every image under test_directory (no subset).
test_generator = train_data.flow_from_directory(test_directory,
                                                **flow_options)


# Transfer Learning

In [None]:
# Convolutional base: ResNet50V2 with ImageNet weights and without its
# classification top, taking IMG_SIZE x IMG_SIZE inputs with 3 channels.
conv_base = ResNet50V2(
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
    include_top=False,
    weights='imagenet',
)
conv_base.summary()

# Build and Train Model

In [None]:
# Freeze the pre-trained base so that only the new classification head is
# trained in the first run.
conv_base.trainable = False
# Size the output layer from the classes found in the training folder rather
# than hard-coding the count, so it always matches the one-hot labels the
# generators produce.
num_classes = len(train_generator.class_indices)
model = Sequential([
    conv_base,
    Flatten(),
    # Optional hidden layer, currently disabled:
    # Dense(256, activation='relu'), # , kernel_initializer=tf.keras.initializers.HeUniform()
    Dense(num_classes, activation='softmax')
])
model.summary()

In [None]:
# Configure the first training run: categorical cross-entropy optimised with
# Adam, reporting accuracy ('acc') and top-5 categorical accuracy.
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizers.Adam(learning_rate=0.001),
    metrics=['acc', tf.keras.metrics.TopKCategoricalAccuracy(k=5)],
)

In [None]:
# Train for 2 epochs; each epoch runs through every batch of the training
# and validation generators.
history = model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=2,
    validation_data=valid_generator,
    validation_steps=len(valid_generator),
)

In [None]:
# Save the model after the first training run; this file is loaded again
# below as the starting point for fine-tuning.
model.save('model11epchSimple.h5')

In [None]:
# Curves of the first training run: loss, accuracy and top-5 accuracy side
# by side, each with its train and validation series.
panels = (
    ("Loss", "Loss", "loss"),
    ("Accuracy", "Categorical Accuracy", "acc"),
    ("Accuracy Top 5", "Categorical Accuracy", "top_k_categorical_accuracy"),
)
for position, (title, y_label, metric) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.title(title)
    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.plot(history.history[metric], label="Train")
    # Validation series are stored under the same key prefixed with "val_".
    plt.plot(history.history["val_" + metric], label="Validation")
    plt.legend()

In [None]:
# Reload the model saved after the first training run; the fine-tuning cells
# below operate on this loaded copy.
trained_model = load_model('model11epchSimple.h5')

# Fine Tuning

In [None]:
# Fine-tune only the top of the ResNet50V2 base: layers from index
# `fine_tune_at` upwards are trained, everything below stays frozen.
fine_tune_at = 154

base_ResNet50V2 = trained_model.layers[0]
# The base itself must be trainable: while a model's own `trainable` flag is
# False, Keras reports no trainable weights for it, whatever the flags of its
# inner layers say.  So enable the whole base first, then freeze the lower
# layers again.
base_ResNet50V2.trainable = True
for layer in base_ResNet50V2.layers[:fine_tune_at]:
    layer.trainable = False

# Make sure you have unfrozen the correct layers
for i, layer in enumerate(base_ResNet50V2.layers):
    print(i, layer.name, layer.trainable)
# The per-layer flags alone can mislead; this count confirms that weights of
# the base are actually exposed to the optimizer.
print('trainable weight tensors:', len(trained_model.trainable_weights))

In [None]:
# Recompile for fine-tuning with a much smaller Adam learning rate than the
# first run; loss and metrics are the same as before.
trained_model.compile(
    loss='categorical_crossentropy',
    optimizer=tf.optimizers.Adam(learning_rate=0.00001),
    metrics=['acc', tf.keras.metrics.TopKCategoricalAccuracy(k=5)],
)

In [None]:
# Fine-tune for 3 epochs; each epoch runs through every batch of the
# training and validation generators.
history2 = trained_model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=3,
    validation_data=valid_generator,
    validation_steps=len(valid_generator),
)

In [None]:
# Curves of the fine-tuning run: loss, accuracy and top-5 accuracy side by
# side, each with its train and validation series.
panels = (
    ("Loss", "Loss", "loss"),
    ("Accuracy", "Categorical Accuracy", "acc"),
    ("Accuracy Top 5", "Categorical Accuracy", "top_k_categorical_accuracy"),
)
for position, (title, y_label, metric) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.title(title)
    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.plot(history2.history[metric], label="Train")
    # Validation series are stored under the same key prefixed with "val_".
    plt.plot(history2.history["val_" + metric], label="Validation")
    plt.legend()

In [None]:
# Save the fine-tuned network.  `trained_model` is the object that was
# fine-tuned; `model` is a separate object that fine-tuning never touched.
trained_model.save('modelFT3Simple.h5')

# Test Model

In [None]:
# Evaluate the fine-tuned network (`trained_model`, not the pre-fine-tuning
# `model`) on the held-out test images.
trained_model.evaluate(test_generator)