Can a machine identify a bee as a honey bee or a bumble bee? These bees have different behaviors and appearances, but given the variety of backgrounds, positions, and image resolutions, it can be a challenge for machines to tell them apart.

Being able to identify bee species from images is a task that ultimately would allow researchers to more quickly and effectively collect field data. Pollinating bees have critical roles in both ecology and agriculture, and diseases like colony collapse disorder threaten these species. Identifying different species of bees in the wild means that we can better understand the prevalence and growth of these important insects

In [None]:
import pickle
from pathlib import Path
from skimage import io
from PIL import Image

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# import keras library
import keras
from tensorflow import keras

# import Sequential from the keras models module
from keras.models import Sequential
from keras import optimizers
from keras.layers import BatchNormalization

# import Dense, Dropout, Flatten, Conv2D, MaxPooling2D from the keras layers module
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D

In [None]:
# load labels.csv from datasets folder using pandas

labels = pd.read_csv('\labels.csv', index_col=None)
labels = labels[labels['photo_quality'] == 1]

# to keep only columns 'path', 'is_bee', 'label'
labels = labels[['path', 'is_bee', 'label']]

labels = labels[(labels['label'] == 'bee') | (labels['label'] == 'wasp')]
labels.is_bee.value_counts() #0 - wasp, 1 - bee
y = labels.is_bee

# concate bees and wasps in 1 dataframe, include 1000 image of each genus
def concat(labels):
    labels_bee = labels[labels['label'] == 'bee'][:1000]
    labels_wasp = labels[labels['label'] == 'wasp'][:1000]
    labels_concat = pd.concat([labels_bee, labels_wasp], axis=0)
    return labels_concat

labels_concat = concat(labels)

#Now we will import all images. Once imported, we will stack the resulting arrays into a single matrix and assign it to X.
image_paths = list(labels_concat.path)
image_new_paths = []

#pipeline for resizing images
def process_image(path):
    #Load the image with Image.open and create paths to save our images to
    img = Image.open('D:\\Python\\bee_or_wasp\\kaggle_bee_vs_wasp\\{}'.format(path))
    
     # create paths to save files to
    rcz_path = "D:\\Python\\bee_or_wasp\\kaggle_bee_vs_wasp\\saved_images\\rsz\\rsz_{}.jpg".format(path.stem)
    
    #resize images and safe them to created path
    rcz = img.resize((50, 50))
    rcz.save(rcz_path)
    
    image_new_paths.append(rcz_path)
    return image_new_paths
    
# for loop over image paths
for img_path in image_paths:
    process_image(Path(img_path))
    
labels_concat['rsz_path'] = image_new_paths

# create empty list
image_list = []

for i in labels_concat.rsz_path:
    # load image
    img = io.imread(i).astype(np.float64)  
    # append to list of all images
    image_list.append(img)
    
# convert image list to single array
X = np.array(image_list)
y = labels_concat.is_bee

print(X.shape)
print(y.shape)

In [None]:
# split out evaluation sets (x_eval and y_eval)
x_interim, x_eval, y_interim, y_eval = train_test_split(X,
                                           y,
                                           test_size=0.2,
                                           random_state=52)

# split remaining data into train and test sets
x_train, x_test, y_train, y_test = train_test_split(x_interim,
                                                   y_interim, 
                                                   test_size=0.4,
                                                   random_state=52)

# initialize standard scaler
ss = StandardScaler()

def scale_features(train_features, test_features):
    for image in train_features:
        # for each channel, apply standard scaler's fit_transform method
        for channel in range(image.shape[2]):
            image[:, :, channel] = ss.fit_transform(image[:, :, channel])
    for image in test_features:
        # for each channel, apply standard scaler's transform method
        for channel in range(image.shape[2]):
            image[:, :, channel] = ss.fit_transform(image[:, :, channel])

# apply scale_features to four sets of features
scale_features(x_interim, x_eval)
scale_features(x_train, x_test)

# set model constants
num_classes = 1

# define model as Sequential
model = Sequential()

# first convolutional layer with 32 filters
model.add(Conv2D(128, kernel_size=(3, 3), activation='relu', input_shape=(50, 50, 3)))
model.add(BatchNormalization())
# add a second 2D convolutional layer with 64 filters
model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(BatchNormalization())
# reduce dimensionality through max pooling
model.add(MaxPooling2D(pool_size=(2,2)))

# third convolutional layer with 64 filters
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu'))
model.add(BatchNormalization())
# add dropout to prevent over fitting
model.add(Dropout(0.25))
# necessary flatten step preceeding dense layer
model.add(Flatten())
# fully connected layer
model.add(Dense(64, activation='relu'))

# add additional dropout to prevent overfitting
model.add(Dropout(0.50))

# prediction layers
model.add(Dense(num_classes, activation='sigmoid', name='preds'))

# show model summary
model.summary()

model.compile(
    # set the loss as binary_crossentropy
    loss=keras.losses.binary_crossentropy,
    # set the optimizer as stochastic gradient descent
    optimizer=keras.optimizers.SGD(learning_rate=0.001),
    # set the metric as accuracy
    metrics=['accuracy']
)

# mock-train the model using the first ten observations of the train and test sets
my_model = model.fit(
    x_train,
    y_train,
    epochs=100,
    verbose=1,
    validation_data=(x_test, y_test),
)


In [None]:
# Extract the history from the training object
history = my_model.history

# Plot the training loss 
plt.plot(history['loss'], label='train_loss', color = 'blue')
# Plot the validation loss
plt.plot(history['val_loss'], label='validation_loss', color = 'green')

# Show the figure
plt.show()

In [None]:
# Plot the training accuracy
plt.plot(history['accuracy'], label='accuracy', color = 'blue')
# Plot the validation accyracy
plt.plot(history['val_accuracy'], label='validation_accuracy', color = 'green')

# Show the figure
plt.show()

In [None]:
# load pre-trained model
pretrained_cnn = model

# evaluate model on test set
score = pretrained_cnn.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

print("")

# evaluate model on holdout set
eval_score = pretrained_cnn.evaluate(x_eval, y_eval, verbose=0)
# print loss score
print('Eval loss:', eval_score[0])
# print accuracy score
print('Eval accuracy:', eval_score[1])

In [None]:
# predicted probabilities for x_eval
y_proba = pretrained_cnn.predict(x_eval)

print("First five probabilities:")
print(y_proba[:5])
print("")

# predicted classes for x_eval
y_pred = np.round(y_proba).astype('int')

print("First five class predictions:")
print(y_pred[:5])
print("")

#[f(x) if condition else g(x) for x in sequence]

[print('bumble bee') if i == 1 else print('honey bee') for i in y_pred]