In [1]:
import os
from math import pi
import numpy as np
import pandas as pd
import skimage.io as skimg
import skimage.transform as transform
import tensorflow
from bokeh.io import output_file, show
from bokeh.layouts import gridplot
from bokeh.models import ColumnDataSource, Range1d, Plot, Title
from bokeh.models.glyphs import ImageURL
from bokeh.palettes import Spectral6
from bokeh.plotting import figure
from bokeh.transform import cumsum
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.layers import Dense, Conv2D, Flatten, MaxPool2D, Dropout
from keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn import metrics
from sklearn.model_selection import train_test_split as split

# Settings
np.random.seed(40)
tensorflow.random.set_seed(40)


In [2]:
# Output File for Bokeh Plots
output_file('honey_bee.html', title='Health of your Bees')

# Plot for loss
plot_loss = figure(title='Loss during Training & Validation')

# Plot for accuracy
plot_acc = figure(title='Accuracy during Training & Validation')

# Global variables
img_dir = './bee_imgs/'
width_img = 100
height_img = 100
channels = 3
print_flag = 1
test_size = 0.1
batch_size = 50
epoch_no = 50


In [3]:
# Load bee data
bees = pd.read_csv('./bee_data.csv',
                   index_col=False,
                   parse_dates={'datetime': [1, 2]},
                   dtype={'subspecies': 'category', 'health': 'category', 'caste': 'category'})


  bees = pd.read_csv('./bee_data.csv',
  bees = pd.read_csv('./bee_data.csv',


In [4]:
def get_image_sizes(file):
    image = skimg.imread(img_dir + file)
    return list(image.shape)

def get_images(file):
    image = skimg.imread(img_dir + file)
    image = transform.resize(image, (width_img, height_img), mode='reflect')
    return image[:, :, :channels]


In [5]:
def plot_metric(plot, metric):
    plot.line(list(range(1, epoch_no + 1)), hist_metrics[metric], line_color='green', line_width=1,
              legend_label="Training")
    plot.circle(list(range(1, epoch_no + 1)), hist_metrics[metric], fill_color="green", size=8, legend_label="Training")
    plot.line(list(range(1, epoch_no + 1)), hist_metrics['val_' + metric], line_color='red', line_width=1,
              line_dash='dotted', legend_label="Validation")
    plot.circle(list(range(1, epoch_no + 1)), hist_metrics['val_' + metric], fill_color="red", size=8,
                legend_label="Validation")
    plot.legend.location = "top_left"
    plot.legend.click_policy = "hide"
    return plot


In [6]:
def preprocess(data):
    data.dropna(inplace=True)
    image_plot = plot_imagesize(data['file'])
    bees_with_images = data['file'].apply(lambda file: os.path.exists(img_dir + file))
    return data[bees_with_images], image_plot

def balance(data, field):
    category_count = int(len(data) / data[field].cat.categories.size)
    return data.groupby(field, as_index=False).apply(
        lambda index: index.sample(category_count, replace=True)).reset_index(drop=True)


In [7]:
def test_accuracy(model, test_X, test_y):
    predicted = model.predict(test_X)
    test_predicted = np.argmax(predicted, axis=1)
    test_truth = np.argmax(test_y.values, axis=1)
    print(metrics.classification_report(test_truth, test_predicted, target_names=test_y.columns))
    test_res = model.evaluate(test_X, test_y.values, verbose=print_flag)
    print('Loss function: %s, accuracy:' % test_res[0], test_res[1])
    category_acc = np.logical_and((predicted > 0.5), test_y).sum() / test_y.sum()
    health = list(category_acc.keys())
    counts = list(category_acc)
    plot = figure(x_range=health, y_range=(0, 1), title='Accuracy for each health', toolbar_location=None, tools="")
    source = ColumnDataSource(data=dict(health=health, counts=counts, color=Spectral6))
    plot.vbar(x='health', top='counts', width=0.9, source=source, legend_label="health", color='color')
    plot.xaxis.visible = False
    plot.legend.orientation = "vertical"
    plot.legend.location = "top_center"
    return plot


In [9]:
def plot_imagesize(images):
    image_sizes = np.stack(images.apply(get_image_sizes))
    df = pd.DataFrame(image_sizes, columns=['w', 'h', 'c'])
    radii = np.random.random(size=df.shape[0]) * 2.5
    plot = figure(title="Height vs. Width of images", x_range=[0, 300], y_range=[0, 300])
    colors = [
        "#%02x%02x%02x" % (int(r), int(g), 150) for r, g in zip(50 + 2 * df['w'], 30 + 2 * df['h'])
    ]
    plot.scatter(df['w'], df['h'], radius=radii,
                 fill_color=colors, fill_alpha=0.6,
                 line_color=None)
    plot.xaxis.axis_label = 'Width'
    plot.yaxis.axis_label = 'Height'
    return plot


In [10]:
# Splitting and augmentation
bees, image_plot = preprocess(bees)
train_bees, test_bees = split(bees, test_size=test_size)
train_bees, validate_bees = split(train_bees, test_size=test_size)
train_bees = balance(train_bees, 'health')

train_X = np.stack(train_bees['file'].apply(get_images))
validate_X = np.stack(validate_bees['file'].apply(get_images))
test_X = np.stack(test_bees['file'].apply(get_images))

train_y = pd.get_dummies(train_bees['health'])
validate_y = pd.get_dummies(validate_bees['health'])
test_y = pd.get_dummies(test_bees['health'])

# Image data generator for augmentation
imgDG = ImageDataGenerator(
    rotation_range=180,
    zoom_range=0.1,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    vertical_flip=True)
imgDG.fit(train_X)


  return data.groupby(field, as_index=False).apply(
  return data.groupby(field, as_index=False).apply(


In [11]:
# Callbacks
early_stop = EarlyStopping(monitor='val_accuracy', patience=20, verbose=print_flag)
save_best = ModelCheckpoint('healthy_model'
                            , monitor='val_accuracy'
                            , verbose=print_flag
                            , save_best_only=True
                            , save_weights_only=True)

# Model definition
model = Sequential()
model.add(Conv2D(16, kernel_size=3, input_shape=(width_img, height_img, 3), activation='relu', padding='same'))
model.add(MaxPool2D(2))
model.add(Dropout(0.2))
model.add(Conv2D(16, kernel_size=3, activation='relu', padding='same'))
model.add(Dropout(0.2))
model.add(Flatten())
model.add(Dense(train_y.columns.size, activation='softmax'))
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
steps = np.round(train_X.shape[0] / batch_size, 0)
history = model.fit(imgDG.flow(train_X, train_y, batch_size=batch_size)
                              , epochs=epoch_no
                              , validation_data=[validate_X, validate_y]
                              , steps_per_epoch=steps
                              , callbacks=[early_stop, save_best])
hist_metrics = history.history


Epoch 1/50
Epoch 1: val_accuracy improved from -inf to 0.59871, saving model to healthy_model
Epoch 2/50
Epoch 2: val_accuracy improved from 0.59871 to 0.72747, saving model to healthy_model
Epoch 3/50
Epoch 3: val_accuracy improved from 0.72747 to 0.76395, saving model to healthy_model
Epoch 4/50
Epoch 4: val_accuracy improved from 0.76395 to 0.78541, saving model to healthy_model
Epoch 5/50
Epoch 5: val_accuracy improved from 0.78541 to 0.80258, saving model to healthy_model
Epoch 6/50
Epoch 6: val_accuracy did not improve from 0.80258
Epoch 7/50
Epoch 7: val_accuracy did not improve from 0.80258
Epoch 8/50
Epoch 8: val_accuracy improved from 0.80258 to 0.85408, saving model to healthy_model
Epoch 9/50
Epoch 9: val_accuracy improved from 0.85408 to 0.85837, saving model to healthy_model
Epoch 10/50
Epoch 10: val_accuracy did not improve from 0.85837
Epoch 11/50
Epoch 11: val_accuracy improved from 0.85837 to 0.86695, saving model to healthy_model
Epoch 12/50
Epoch 12: val_accuracy di

In [12]:
# Plot results
plot_loss = plot_metric(plot_loss, 'loss')
plot_acc = plot_metric(plot_acc, 'accuracy')

# Load best weights
model.load_weights('healthy_model')

# Test accuracy and save model
results = model.evaluate(test_X, test_y.values, verbose=print_flag)
print('Model was created with Loss of ' + str(results[0]) + ' with Accuracy: ' + str(results[1]))
accuracy_plot = test_accuracy(model, test_X, test_y)

# Save model
model.save("new__model.h5")




Model was created with Loss of 0.171846404671669 with Accuracy: 0.9208494424819946
                            precision    recall  f1-score   support

Varroa, Small Hive Beetles       0.72      0.87      0.79        54
              ant problems       0.96      1.00      0.98        43
  few varrao, hive beetles       0.78      0.61      0.69        59
                   healthy       0.97      0.98      0.97       331
         hive being robbed       0.93      0.89      0.91        28
             missing queen       1.00      1.00      1.00         3

                  accuracy                           0.92       518
                 macro avg       0.89      0.89      0.89       518
              weighted avg       0.92      0.92      0.92       518

Loss function: 0.171846404671669, accuracy: 0.9208494424819946


In [2]:
pip install gradio


Note: you may need to restart the kernel to use updated packages.


In [None]:
import gradio as gr
import numpy as np
from tensorflow.keras.models import load_model
import skimage.io as skimg
import skimage.transform as transform

# Load the trained model
model = load_model("new__model.h5")

# Define image preprocessing for the model input
def preprocess_image(image):
    image = transform.resize(image, (width_img, height_img), mode='reflect')
    return image[:, :, :channels].reshape(1, width_img, height_img, channels)

# Define the prediction function
def classify_bee_health(image):
    # Preprocess the image
    processed_image = preprocess_image(image)
    
    # Predict the class probabilities
    prediction = model.predict(processed_image)
    
    # Get the predicted class label
    class_index = np.argmax(prediction, axis=1)[0]
    class_labels = train_y.columns  # Assuming train_y columns contain class names
    predicted_class = class_labels[class_index]
    
    return f"The bee's health status is: {predicted_class}"

# Create Gradio Interface
interface = gr.Interface(
    fn=classify_bee_health,
    inputs=gr.Image(type="numpy"),
    outputs="text",
    title="Bee Health Classification",
    description="Upload an image of a bee to determine its health status."
)

# Launch the interface
interface.launch()


Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.




--------
Traceback (most recent call last):
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\gradio\queueing.py", line 536, in process_events
    response = await route_utils.call_process_api(
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\gradio\blocks.py", line 1935, in process_api
    result = await self.call_function(
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\gradio\blocks.py", line 1520, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\anyio\to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "c:\Users\ishan\anaconda3\envs\Tensorflow\lib\site-packages\anyio\_backends\_asyncio.py", line 2134, in run_s