In [None]:
# get data from this link: https://www.kaggle.com/jerzydziewierz/bee-vs-wasp

In [None]:
#### import libraries ####
import os, shutil
import random

from keras import layers
from keras import models
from keras import optimizers
from keras.applications.mobilenet_v2 import MobileNetV2

from tensorflow.keras import Model
from tensorflow.keras.applications import Xception
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2

In [None]:
# notebook-wide settings
EPOCHS = 20        # number of epochs requested from the training call
BATCH_SIZE = 64    # images per batch yielded by the directory generators
SHAPE = 224        # images are resized to SHAPE x SHAPE pixels

# set to True to build the train/validation/test folders from the raw class folders
flag_create_dataset = False

In [None]:
#### dataset creation (compatible with tensorflow) ####

# this is the path where I have put folders from Kaggle project
cwd_dir = "./drive/MyDrive/Python Code/kaggle_bee_vs_wasp"
# os.chdir(cwd_dir)

# one sub-folder per split; each will contain one folder per class
train_dir = os.path.join(cwd_dir, 'data', 'train')
val_dir = os.path.join(cwd_dir, 'data', 'validation')
test_dir = os.path.join(cwd_dir, 'data', 'test')

# build the split folders only when explicitly requested
if flag_create_dataset:
  # target class folder -> source folder inside cwd_dir
  dict_folders = {'bee' : 'bee1', 'wasp' : 'wasp1', 'insect' : 'other_insect', 'other' : 'other_noinsect'}

  # create <split>/<class> for every split and class (bee-wasp-insect-other).
  # os.makedirs also creates missing parents ('data', 'data/train', ...), which
  # plain os.mkdir does not. exist_ok is deliberately left at False: if the class
  # folders already exist this raises FileExistsError instead of mixing a new
  # random split into an old one.
  for split_dir in (train_dir, val_dir, test_dir):
    for class_name in dict_folders:
      os.makedirs(os.path.join(split_dir, class_name))

  random.seed(1234)
  for class_name, src_folder in dict_folders.items():
    src_path = os.path.join(cwd_dir, src_folder)

    # os.listdir returns entries in arbitrary order; sorting makes the seeded
    # split below reproducible from one machine / run to the next
    file_names = sorted(f for f in os.listdir(src_path) if os.path.isfile(os.path.join(src_path, f)))

    for file_name in file_names:
      # one draw per file: ~65% train, ~20% validation, remaining ~15% test
      n_random = random.uniform(0, 1)
      if n_random < 0.65:
        split_dir = train_dir
      elif n_random < 0.85:
        split_dir = val_dir
      else:
        split_dir = test_dir
      shutil.copyfile(
        os.path.join(src_path, file_name),
        os.path.join(split_dir, class_name, file_name)
      )


In [None]:
#### create generators for model training ####

# training images: pixel values multiplied by 1/255, plus random
# rotation / shift / horizontal-flip augmentation
train_datagen = ImageDataGenerator(
    horizontal_flip = True,
    height_shift_range = 0.2,
    width_shift_range = 0.2,
    rotation_range = 30,
    rescale = 1./255
)
train_generator = train_datagen.flow_from_directory(
    os.path.join(cwd_dir, 'data', 'train/'),
    class_mode = 'categorical',
    batch_size = BATCH_SIZE,
    shuffle = True,
    target_size = (SHAPE, SHAPE),
)

# validation images: same rescaling, every augmentation explicitly switched off
valid_datagen = ImageDataGenerator(
    horizontal_flip = False,
    height_shift_range = 0.0,
    width_shift_range = 0.0,
    rotation_range = 0,
    rescale = 1./255
)
valid_generator = valid_datagen.flow_from_directory(
    os.path.join(cwd_dir, 'data', 'validation/'),
    class_mode = 'categorical',
    batch_size = BATCH_SIZE,
    shuffle = True,
    target_size = (SHAPE, SHAPE),
)


In [None]:
#### a simple model from scratch ####
# model = models.Sequential()
# model.add(layers.Conv2D(32, (3, 3), activation = 'relu', input_shape = (SHAPE, SHAPE, 3)))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(64, (3, 3), activation = 'relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(128, (3, 3), activation = 'relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Conv2D(128, (3, 3), activation = 'relu'))
# model.add(layers.MaxPooling2D((2, 2)))
# model.add(layers.Flatten())
# model.add(layers.Dense(512, activation = 'relu'))
# model.add(layers.Dense(4, activation = 'softmax'))

# model.compile(loss = 'categorical_crossentropy', optimizer = optimizers.RMSprop(lr = 0.00001), metrics = ['acc'])

# earlystop = EarlyStopping(monitor = 'val_loss', patience = 4, verbose = 1)

# history = model.fit_generator(
#     train_generator,
#     epochs = 30,
#     callbacks = earlystop,
#     validation_data = valid_generator
# )
# # Save our model for inference
# model.save("primo_modello.h5")

In [None]:
#### transfer learning ####
# MobileNetV2 with ImageNet weights and without its classification top
# (MobileNetV2 is imported in the imports cell at the top of the notebook)
base_model = MobileNetV2(weights = 'imagenet', include_top = False, input_shape = (SHAPE, SHAPE, 3))

# do not train the pretrained layers: only the head added below is trainable
for layer in base_model.layers:
  layer.trainable = False

# classification head on top of the backbone output
x = base_model.output
x = layers.AveragePooling2D(pool_size = (2, 2))(x)
# NOTE(review): this Dense sits before Flatten, so it acts on the last axis of
# the pooled feature map rather than on a flat vector - confirm this is intended
x = layers.Dense(32, activation = 'relu')(x)
x = layers.Flatten()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(128)(x)  # no activation given, i.e. a linear layer
x = layers.Dense(4, activation = 'softmax', kernel_regularizer = l2(.0005))(x)  # 4 outputs, one per class folder
model = Model(inputs = base_model.inputs, outputs = x)

# `learning_rate` is the supported argument name; `lr` is a deprecated alias
opt = optimizers.SGD(learning_rate = 0.0001, momentum = .9)
model.compile(loss = 'categorical_crossentropy', optimizer = opt, metrics = ['accuracy'])

# make sure the output folder exists before anything is written into it
weights_dir = os.path.join(cwd_dir, "model-weights")
os.makedirs(weights_dir, exist_ok = True)

# callbacks
# stop once val_loss has not improved for 4 consecutive epochs
earlystop = EarlyStopping(monitor = 'val_loss', patience = 4, verbose = 1)
# keep only the weights of the epoch with the lowest val_loss
# NOTE(review): the file is called "xception_checkpoint.h5" although the backbone
# is MobileNetV2; the name is kept because the commented-out loading code at the
# end of the notebook refers to it
checkpoint = ModelCheckpoint(
    os.path.join(weights_dir, "xception_checkpoint.h5"),
    monitor = "val_loss",
    mode = "min",
    save_best_only = True,
    save_weights_only = True,
    verbose = 1
)

# training
# Model.fit accepts generators directly; fit_generator is deprecated
history = model.fit(
    train_generator,
    epochs = EPOCHS,
    callbacks = [earlystop, checkpoint],
    validation_data = valid_generator
)
# Save our model for inference (state at the end of training; the best-val_loss
# weights are the ones written by the checkpoint callback above)
model.save(os.path.join(weights_dir, f"mobilenetv2_epochs_{EPOCHS}_shape_{SHAPE}.h5"))



In [None]:
# def create_model(shape):   
    
#     from keras import layers
#     # from keras import models
#     from keras import optimizers
#     # from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
#     from tensorflow.keras import Model
    
#     # from tensorflow.keras.initializers import glorot_uniform
#     from tensorflow.keras.regularizers import l2
#     # from tensorflow.keras.preprocessing.image import ImageDataGenerator
#     from tensorflow.keras.applications import Xception
#     model = Xception(input_shape = (shape, shape, 3), include_top = False, weights = 'imagenet')

#     x = model.output
#     x = layers.AveragePooling2D(pool_size = (2, 2))(x)
#     x = layers.Dense(32, activation = 'relu')(x)
#     x = layers.Flatten()(x)
#     # moved after Flatten
#     x = layers.Dropout(0.1)(x)
#     # arbitrary choice
#     x = layers.Dense(128)(x)
#     x = layers.Dense(4, activation = 'softmax', kernel_regularizer = l2(.0005))(x)
#     model = Model(inputs = model.inputs, outputs = x)
#     opt = optimizers.SGD(lr = 0.0001, momentum = .9)
#     model.compile(loss = 'categorical_crossentropy', optimizer = opt, metrics = ['accuracy'])
#     return model

In [None]:
# model2 = create_model(shape = 300)

In [None]:
# weights_path = "model-weights/xception_checkpoint.h5"
# model2.load_weights(weights_path)
