In [None]:
from keras.models import Model, load_model
from keras.layers import Dense, Dropout, Flatten, Input, BatchNormalization, Conv2D, MaxPool2D,Activation, MaxPooling2D, Add, ZeroPadding2D, AveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import Callback, ModelCheckpoint
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [None]:
MODEL_NAME = 'model.h5'
BATCH_SIZE = 20
IMAGE_SIZE = (256,256)
IMAGE_NUM = 1856
TRAINING_SET_NUM = int(0.7 * IMAGE_NUM)
VALIDATION_SET_NUM = int(0.2 * IMAGE_NUM)
TEST_SET_NUM = IMAGE_NUM - TRAINING_SET_NUM - VALIDATION_SET_NUM
MAX_MEAT = 47
MAX_VEGGIE = 101
MAX_NOODLE = 268

In [None]:
def shuffle_data() :
	dataframe = pd.read_csv('./dataset/data_aug.csv', delimiter = ',', header = 0)
	dataframe = dataframe.reindex(np.random.permutation(dataframe.index))
	dataframe.to_csv('./dataset/data_shuf.csv', sep = ',', encoding = 'utf-8', index = False)

dataframe = pd.read_csv('./dataset/data_shuf.csv', delimiter = ',', header = 0)
dataframe.head()

In [None]:
datagen = ImageDataGenerator(
            rescale=1./255,
            brightness_range=[0.9,1.1],
            shear_range=1,
            zoom_range=0.05,
            rotation_range=10,
            width_shift_range=0.03,
            height_shift_range=0.03,
            vertical_flip=True,
            horizontal_flip=True)

datagen_no_aug = ImageDataGenerator(rescale=1./255)

train_generator = datagen.flow_from_dataframe(
    dataframe = dataframe.loc[0 : TRAINING_SET_NUM - 1],
    directory = './dataset/images',
    x_col = 'filename',
    y_col = ['norm_meat', 'norm_veggie', 'norm_noodle'],
	shuffle = False,
    target_size = IMAGE_SIZE,
    batch_size = BATCH_SIZE,
    class_mode = 'raw')

validation_generator = datagen_no_aug.flow_from_dataframe(
    dataframe = dataframe.loc[TRAINING_SET_NUM : TRAINING_SET_NUM + VALIDATION_SET_NUM - 1],
    directory = './dataset/images',
    x_col = 'filename',
    y_col = ['norm_meat', 'norm_veggie', 'norm_noodle'],
    shuffle = False,
    target_size = IMAGE_SIZE,
    batch_size = BATCH_SIZE,
    class_mode = 'raw')

test_generator = datagen_no_aug.flow_from_dataframe(
    dataframe = dataframe.loc[TRAINING_SET_NUM + VALIDATION_SET_NUM : IMAGE_NUM - 1],
    directory = './dataset/images',
    x_col = 'filename',
    y_col = ['norm_meat', 'norm_veggie', 'norm_noodle'],
    shuffle = False,
    target_size = IMAGE_SIZE,
    batch_size = BATCH_SIZE,
    class_mode = 'raw')

In [None]:
def simple_cnn() :
	inputIm = Input(shape = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3, ))
	conv1 = Conv2D(64, 3, activation = 'relu')(inputIm)
	conv1 = Conv2D(64, 3, activation = 'relu')(conv1)
	conv1 = BatchNormalization()(conv1)
	pool1 = MaxPool2D()(conv1)
	conv2 = Conv2D(128, 3, activation = 'relu')(pool1)
	conv2 = Conv2D(128, 3, activation = 'relu')(conv2)
	conv2 = BatchNormalization()(conv2)
	pool2 = MaxPool2D()(conv2)
	conv3 = Conv2D(256, 3, activation = 'relu')(pool2)
	conv3 = Conv2D(256, 3, activation = 'relu')(conv3)
	conv3 = BatchNormalization()(conv3)
	pool3 = MaxPool2D()(conv3)
	conv4 = Conv2D(512, 3, activation = 'relu')(pool3)
	conv4 = Conv2D(512, 3, activation = 'relu')(conv4)
	conv4 = BatchNormalization()(conv4)
	pool4 = MaxPool2D()(conv4)
	conv5 = Conv2D(1024, 3, activation = 'relu')(pool4)
	conv5 = Conv2D(1024, 3, activation = 'relu')(conv5)
	conv5 = BatchNormalization()(conv5)
	pool5 = MaxPool2D()(conv5)
	flat = Flatten()(pool5)
	dense1 = Dense(512, activation = 'sigmoid')(flat)
	dense1 = Dropout(0.5)(dense1)
	dense1 = Dense(512, activation = 'sigmoid')(dense1)
	dense1 = Dropout(0.5)(dense1)
	dense1 = Dense(512, activation = 'sigmoid')(dense1)
	dense1 = Dropout(0.5)(dense1)
	predictedW = Dense(3, activation = 'sigmoid')(dense1)

	return Model(inputs = inputIm, outputs = predictedW)

In [None]:
def res_identity(x, filter): 
  x_skip = x 

  x = Conv2D(filter, 3, padding = 'same')(x)
  x = BatchNormalization(axis = 3)(x)
  x = Activation('relu')(x)

  x = Conv2D(filter, 3, padding = 'same')(x)
  x = BatchNormalization(axis = 3)(x)

  x = Add()([x, x_skip])
  x = Activation('relu')(x)
  return x

def res_conv(x, filter):
  x_skip = x

  x = Conv2D(filter, 3, 2, padding = 'same')(x)
  x = BatchNormalization(axis = 3)(x)
  x = Activation('relu')(x)

  x = Conv2D(filter, 3, padding = 'same')(x)
  x = BatchNormalization(axis = 3)(x)

  x_skip = Conv2D(filter, 1, 2)(x_skip)

  x = Add()([x, x_skip])
  x = Activation('relu')(x)
  return x

def resnet34():
  input_im = Input(shape = (IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
  x = ZeroPadding2D(3)(input_im)

  x = Conv2D(64, 7, 2, padding = 'same')(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)
  x = MaxPool2D(3, 2, padding = 'same')(x)

  block_layers = [3, 4, 6, 3]
  filter_size = 64

  for i in range(4) :
    if i == 0 :
      for j in range(block_layers[i]) :
        x = res_identity(x, filter_size)
    else :
      filter_size *= 2
      x = res_conv(x, filter_size)
      for j in range(block_layers[i] - 1) :
        x = res_identity(x, filter_size)

  x = AveragePooling2D(2, padding = 'same')(x)
  x = Flatten()(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(3, activation = 'sigmoid')(x)

  return Model(inputs = input_im, outputs = x)

In [None]:
def res50_identity(x, filters): 
  x_skip = x
  f1, f2 = filters

  x = Conv2D(f1, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)

  x = Conv2D(f1, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)

  x = Conv2D(f2, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)

  x = Add()([x, x_skip])
  x = Activation('relu')(x)
  return x

def res50_conv(x, s, filters):
  x_skip = x
  f1, f2 = filters

  x = Conv2D(f1, kernel_size=(1, 1), strides=(s, s), padding='valid', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)

  x = Conv2D(f1, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)

  x = Conv2D(f2, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_regularizer=l2(0.001))(x)
  x = BatchNormalization()(x)

  x_skip = Conv2D(f2, kernel_size=(1, 1), strides=(s, s), padding='valid', kernel_regularizer=l2(0.001))(x_skip)
  x_skip = BatchNormalization()(x_skip)

  x = Add()([x, x_skip])
  x = Activation('relu')(x)
  return x

def resnet50():
  input_im = Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
  x = ZeroPadding2D(padding=(3, 3))(input_im)

  x = Conv2D(64, kernel_size=(7, 7), strides=(2, 2))(x)
  x = BatchNormalization()(x)
  x = Activation('relu')(x)
  x = MaxPooling2D((3, 3), strides=(2, 2))(x)

  x = res50_conv(x, s=1, filters=(64, 256))
  x = res50_identity(x, filters=(64, 256))
  x = res50_identity(x, filters=(64, 256))

  x = res50_conv(x, s=2, filters=(128, 512))
  x = res50_identity(x, filters=(128, 512))
  x = res50_identity(x, filters=(128, 512))
  x = res50_identity(x, filters=(128, 512))

  x = res50_conv(x, s=2, filters=(256, 1024))
  x = res50_identity(x, filters=(256, 1024))
  x = res50_identity(x, filters=(256, 1024))
  x = res50_identity(x, filters=(256, 1024))
  x = res50_identity(x, filters=(256, 1024))
  x = res50_identity(x, filters=(256, 1024))

  x = res50_conv(x, s=2, filters=(512, 2048))
  x = res50_identity(x, filters=(512, 2048))
  x = res50_identity(x, filters=(512, 2048))

  x = AveragePooling2D((2, 2), padding='same')(x)

  x = Flatten()(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(512, activation = 'sigmoid')(x)
  x = Dropout(0.5)(x)
  x = Dense(3, activation='sigmoid', kernel_initializer='he_normal')(x)

  model = Model(inputs=input_im, outputs=x, name='Resnet50')
  return model

In [None]:

model = simple_cnn()
model.summary()
model.compile(optimizer = Adam(learning_rate = 1e-4), loss = 'mse', metrics = ['mean_absolute_error'])

In [None]:
class PlotLosses(Callback):
    def on_train_begin(self, logs = {}):
        self.x = []
        self.losses = []
        self.val_losses = []
        self.fig = plt.figure()
        self.logs = []

    def on_epoch_end(self, epoch, logs = {}):
        self.logs.append(logs)
        self.x.append(epoch)
        self.losses.append(logs.get('mean_absolute_error'))
        self.val_losses.append(logs.get('val_mean_absolute_error'))

        plt.clf()
        plt.plot(self.x, self.losses, label = 'mean_absolute_error')
        plt.plot(self.x, self.val_losses, label = 'val_mean_absolute_error')
        plt.legend()
        plt.pause(0.01)


checkpoint = ModelCheckpoint(MODEL_NAME, verbose = 1, monitor = 'val_mean_absolute_error', save_best_only = True, mode = 'min')
plot_losses = PlotLosses()

model.fit(
    train_generator,
    steps_per_epoch = len(train_generator),
    epochs = 80,
    validation_data = validation_generator,
    validation_steps = len(validation_generator),
    callbacks = [checkpoint, plot_losses])

In [None]:
test_generator.reset()
model = load_model('model_best_simple_cnn.h5')
score = model.evaluate(
    test_generator,
    steps = len(test_generator))
print('score (mse, mae):\n',score)

test_generator.reset()
model = load_model('model_best_simple_cnn.h5')
predict = model.predict(
    test_generator,
    steps = len(test_generator),
    workers = 1,
    use_multiprocessing = False)

sum_e_meat = 0
sum_e_veggie = 0
sum_e_noodle = 0
mx_ae_meat = 0
mx_ae_veggie = 0
mx_ae_noodle = 0
test_generator.reset()
for (i, t) in enumerate(test_generator.labels) :
	t_norm_meat, t_norm_veggie, t_norm_noodle = t
	t_meat, t_veggie, t_noodle = (t_norm_meat * MAX_MEAT, t_norm_veggie * MAX_VEGGIE, t_norm_noodle * MAX_NOODLE)
	p_meat, p_veggie, p_noodle = (predict[i][0] * MAX_MEAT, predict[i][1] * MAX_VEGGIE, predict[i][2] * MAX_NOODLE)
	ae_meat, ae_veggie, ae_noodle = (abs(t_meat - p_meat), abs(t_veggie - p_veggie), abs(t_noodle - p_noodle))
	sum_e_meat += ae_meat
	sum_e_veggie += ae_veggie
	sum_e_noodle += ae_noodle
	mx_ae_meat, mx_ae_veggie, mx_ae_noodle = (max(mx_ae_meat, ae_meat), max(mx_ae_veggie, ae_veggie), max(mx_ae_noodle, ae_noodle))
mae_meat = sum_e_meat / test_generator.samples
mae_veggie = sum_e_veggie / test_generator.samples
mae_noodle = sum_e_noodle / test_generator.samples
print("mae_meat : ", mae_meat)
print("mae_veggie : ", mae_veggie)
print("mae_noodle : ", mae_noodle)
print("mx_ae_meat : ", mx_ae_meat)
print("mx_ae_veggie : ", mx_ae_veggie)
print("mx_ae_noodle : ", mx_ae_noodle)