In [0]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense

# Commonly used modules
import numpy as np
import os
import sys

# Images, plots, display, and visualization
import matplotlib.pyplot as plt
import pandas as pd
import cv2
import IPython
from six.moves import urllib

In [0]:
# Mount Google Drive at /gdrive (requires the Colab runtime); the dataset
# directory used below lives under '/gdrive/My Drive'.
from google.colab import drive
drive.mount('/gdrive')

In [0]:
np.random.seed(43) # seed NumPy's global RNG to make the results reproducible
tf.set_random_seed(42) # seed TensorFlow's graph-level RNG to make the results reproducible

# 1 Importing images and setting up training, validation and test data

## 1.1 Importing images and creating training, validation and test datasets

In [0]:
#Hyperparameter definition

class Flags:
  """Hyperparameters and dataset settings shared by the notebook cells."""

  def __init__(self):
    # Number of images per batch for the training and validation iterators
    self.batch_size = 10
    # Number of training epochs passed to model.fit
    self.epochs = 20
    # Images are resized to image_size x image_size pixels
    self.image_size = 28
    # Probabilities of assigning an image to the train / valid / test split
    # (0.5625, 0.1875, 0.25 -- they sum to 1)
    self.train_split = [0.75*0.75,0.25*0.75,0.25]
    # Maximum number of images kept per species
    self.fixed_img_number = 700
    # 'Adam' is an optimizer, not a loss: it is stored under its own name, and
    # loss_function now holds the loss that model.compile actually uses.
    self.optimizer = 'Adam'
    self.loss_function = 'sparse_categorical_crossentropy'

flags = Flags()

In [0]:
data_dir = '/gdrive/My Drive/DL project_2019/Raw_Dataset'

# Gather the path and integer class label of every usable image
image_filenames = []
image_labels = []
species = ['Bees','Mosquitoes', 'Flies', 'Butterflies']
for label, category in enumerate(species):
    category_dir = os.path.join(data_dir, category)

    # A sorted listing keeps the selection reproducible; zero-byte files are skipped
    usable_names = []
    for file_name in sorted(os.listdir(category_dir)):
        if os.stat(os.path.join(category_dir, file_name)).st_size != 0:
            usable_names.append(file_name)

    # Keep at most `fixed_img_number` images per class
    usable_names = usable_names[:flags.fixed_img_number]
    for file_name in usable_names:
        image_filenames.append(os.path.join(category_dir, file_name))
        image_labels.append(label)

# Randomly assign every image to the training, validation or test split
train_image_filenames, train_image_labels = [], []
valid_image_filenames, valid_image_labels = [], []
test_image_filenames, test_image_labels  = [], []

split_targets = {
    'train': (train_image_filenames, train_image_labels),
    'valid': (valid_image_filenames, valid_image_labels),
    'test': (test_image_filenames, test_image_labels),
}

for image_filename, image_label in zip(image_filenames, image_labels):
    # One weighted draw per image, using the probabilities in flags.train_split
    split_name = str(np.random.choice(['train', 'valid', 'test'], p=flags.train_split))
    split_filenames, split_labels = split_targets[split_name]
    split_filenames.append(image_filename)
    split_labels.append(image_label)

## 1.2 Defining Iterators for batch training

In [0]:
#Iterator builder 
def make_iterator(filenames, labels, batch_size, shuffle_and_repeat=False):
    """Create a `tf.data.Iterator` yielding batches of decoded images and labels.

    Args:
        filenames: sequence of image file paths.
        labels: sequence of labels aligned with `filenames`.
        batch_size: number of examples per batch.
        shuffle_and_repeat: if True, the dataset is shuffled (buffer of 1000)
            and repeated, and a one-shot iterator is returned; otherwise an
            initializable iterator is returned.

    Returns:
        An iterator whose elements are dicts with keys 'image' (float32 images
        resized to flags.image_size x flags.image_size with 3 channels) and
        'label'.
    """
    dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
    if shuffle_and_repeat:
        dataset = dataset.apply(
            tf.data.experimental.shuffle_and_repeat(buffer_size=1000))

    def parse(filename, label):
        """Read one image file, decode it as 3-channel JPEG, resize and rescale it."""
        # The original wrapped these calls in a `try:` with no `except`, which is
        # a SyntaxError. It is removed rather than completed: this function only
        # builds graph ops, so a Python try/except here could not catch
        # corrupt-file errors raised later when the pipeline runs.
        image = tf.read_file(filename)
        image = tf.image.decode_jpeg(image, channels = 3)
        image = tf.cast(image, tf.float32)
        image = tf.image.resize(image, (flags.image_size,flags.image_size))
        # Divide by 256 to rescale 8-bit pixel values into [0, 1)
        image = image / 256
        return {'image': image, 'label': label}

    dataset = dataset.apply(tf.data.experimental.map_and_batch(
        map_func=parse, batch_size=batch_size, num_parallel_batches=8))

    if shuffle_and_repeat:
        return dataset.make_one_shot_iterator()
    else:
        return dataset.make_initializable_iterator()

# Build one shuffled, repeating iterator per data split
train_iterator = make_iterator(
    filenames=train_image_filenames,
    labels=train_image_labels,
    batch_size=flags.batch_size,
    shuffle_and_repeat=True,
)
val_iterator = make_iterator(
    filenames=valid_image_filenames,
    labels=valid_image_labels,
    batch_size=flags.batch_size,
    shuffle_and_repeat=True,
)
# The test iterator uses a batch as large as the whole test split
test_iterator = make_iterator(
    filenames=test_image_filenames,
    labels=test_image_labels,
    batch_size=len(test_image_filenames),
    shuffle_and_repeat=True,
)

# 2 Constructing CNN 

## 2.1 Model Architecture

In [0]:
def model_builder():
  """Assemble the CNN classifier.

  Two Conv2D + MaxPooling2D stages, then dropout, a 128-unit dense layer,
  dropout again, and a softmax output with one unit per entry in `species`.

  Returns:
    The uncompiled `keras.Sequential` model.
  """
  network = keras.Sequential()
  input_shape = (flags.image_size, flags.image_size, 3)
  stack = [
      Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
      MaxPooling2D(pool_size=(2, 2), strides = 2, padding = 'valid'),
      Conv2D(64, (3, 3), activation='relu'),
      MaxPooling2D(pool_size=(2, 2), strides = 2, padding = 'valid'),
      Dropout(0.25),
      Flatten(),
      Dense(128, activation='relu'),
      Dropout(0.25),
      Dense(len(species), activation='softmax'),
  ]
  # Layers are added in list order, first to last
  for layer in stack:
    network.add(layer)
  return network

model = model_builder()

In [0]:
# Adam optimizer with sparse categorical cross-entropy (labels are integer
# class indices); accuracy is tracked as the metric.
model.compile(
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.train.AdamOptimizer(),
)

# 3 Training model

In [0]:
from operator import itemgetter

# Tensors for the next training batch
features = train_iterator.get_next()
images = features['image']
labels = features['label']

# Tensors for the next validation batch
val_features = val_iterator.get_next()
val_images = val_features['image']
val_labels = val_features['label']


In [0]:
# Number of whole batches that fit in the training and validation splits
train_steps = len(train_image_labels) // flags.batch_size
valid_steps = len(valid_image_labels) // flags.batch_size

history = model.fit(
    images,
    labels,
    epochs=flags.epochs,
    steps_per_epoch=train_steps,
    validation_data=(val_images, val_labels),
    validation_steps=valid_steps,
)


In [0]:

# Tensors for the next test batch
features = test_iterator.get_next()
test_images = features['image']
test_labels = features['label']

print(test_images.shape)

# A single evaluation step over that batch
test_loss, test_acc = model.evaluate(x=test_images, y=test_labels, steps=1)

print('Test accuracy:', test_acc)