In [None]:
# Download the PlantVillage image repository into data/PlantVillage, the folder
# that `directory_root` points at in the configuration cell.
!git clone https://github.com/gabrieldgf4/PlantVillage-Dataset.git data/PlantVillage

In [None]:
import numpy as np
import pandas as pd
from tensorflow.keras.models import Model
from tensorflow.keras import Input
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.python.framework.ops import Tensor
from tensorflow.keras.layers.experimental.preprocessing import RandomFlip, RandomRotation, RandomZoom
from tqdm import tqdm
import tensorflow as tf
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

In [None]:
# --- Training hyper-parameters ---------------------------------------------
EPOCHS = 25      # number of epochs passed to Model.fit
BS = 32          # batch size of the tf.data datasets
INIT_LR = 1e-3   # NOTE(review): not referenced by the cells shown in this notebook
SEED = 42        # seed for the training/validation split of the datasets

# --- Image geometry --------------------------------------------------------
width = 256
height = 256
depth = 3
default_image_size = (height, width)  # size images are resized to on load
image_size = 0   # NOTE(review): not referenced by the cells shown in this notebook

# --- Dataset locations -----------------------------------------------------
directory_root = 'data/PlantVillage/'
train_path = 'train/'
valid_path = 'val/'

In [None]:

def _load_split(subset):
    """Load one side ('training' or 'validation') of the 80/20 directory split.

    Both sides share the same seed and validation_split so that they are
    drawn from one consistent partition of `directory_root`.
    """
    return tf.keras.utils.image_dataset_from_directory(
        directory_root,
        subset=subset,
        validation_split=0.2,
        seed=SEED,
        shuffle=True,
        batch_size=BS,
        image_size=default_image_size,
    )


train_dataset = _load_split('training')
validation_dataset = _load_split('validation')

In [None]:
 
IMG_SIZE = (256, 256)
BATCH_SIZE = 64
VALIDATION_SPLIT = 0.3

# Random zoom / flips / rotation are applied to the training subset only.
data_generator = tf.keras.preprocessing.image.ImageDataGenerator(
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
    rotation_range=180,
    validation_split=VALIDATION_SPLIT)

# Held-out images are used to score the models (x_test / y_test), so they must
# not be randomly transformed. This generator reserves the same fraction of the
# data but applies no augmentation.
# NOTE(review): assumes an identical `validation_split` yields the same file
# partition as `data_generator` - confirm against the Keras documentation.
validation_data_generator = tf.keras.preprocessing.image.ImageDataGenerator(
    validation_split=VALIDATION_SPLIT)

train_generator = data_generator.flow_from_directory(
    directory_root,
    class_mode='categorical',   # one-hot labels
    shuffle=True,
    batch_size=BATCH_SIZE,
    target_size=IMG_SIZE,
    subset='training',
    seed=SEED)

validation_generator = validation_data_generator.flow_from_directory(
    directory_root,
    class_mode='categorical',   # one-hot labels
    shuffle=True,
    batch_size=BATCH_SIZE,
    target_size=IMG_SIZE,
    subset='validation',
    seed=SEED)

In [None]:

# Materialise a sample of the training generator as in-memory arrays.
# Batches are collected in lists and concatenated once at the end; calling
# np.append inside the loop would re-copy every previously fetched image on
# each iteration.
train_generator.reset()
first_images, first_labels = next(train_generator)
train_image_batches = [first_images]
train_label_batches = [first_labels]
for _ in tqdm(range(int(len(train_generator)/16-1))): #1st batch is already fetched before the for loop.
  img, label = next(train_generator)
  train_image_batches.append(img)
  train_label_batches.append(label)
x_train = np.concatenate(train_image_batches, axis=0)
y_train = np.concatenate(train_label_batches, axis=0)
del train_image_batches, train_label_batches  # drop the per-batch references
print(x_train.shape, y_train.shape)

In [None]:

# Materialise a sample of the validation generator as in-memory arrays.
# Batches are collected in lists and concatenated once at the end; calling
# np.append inside the loop would re-copy every previously fetched image on
# each iteration.
validation_generator.reset()
first_images, first_labels = next(validation_generator)
test_image_batches = [first_images]
test_label_batches = [first_labels]
for _ in tqdm(range(int(len(validation_generator)/8)-1)): #1st batch is already fetched before the for loop.
  img, label = next(validation_generator)
  test_image_batches.append(img)
  test_label_batches.append(label)
x_test = np.concatenate(test_image_batches, axis=0)
y_test = np.concatenate(test_label_batches, axis=0)
del test_image_batches, test_label_batches  # drop the per-batch references
print(x_test.shape, y_test.shape)

In [None]:

# The generators are no longer needed once their batches have been copied
# into x_train / y_train and x_test / y_test.
del train_generator, validation_generator

In [None]:

# Convert the one-hot rows of y_train into integer class ids (index of the
# largest entry in each row) in a single vectorised call.
Y_train = np.argmax(y_train, axis=1)
Y_train.shape

In [None]:

# Convert the one-hot rows of y_test into integer class ids (index of the
# largest entry in each row) in a single vectorised call.
Y_test = np.argmax(y_test, axis=1)
Y_test.shape

In [None]:
def data_augmenter():
    """Return a new Sequential model chaining the augmentation layers.

    The layers are applied in this order: horizontal RandomFlip,
    RandomRotation(0.2), RandomZoom(0.2). Every call builds fresh layers.
    """
    augmentation_layers = [
        RandomFlip('horizontal'),
        RandomRotation(0.2),
        RandomZoom(0.2),
    ]
    return tf.keras.Sequential(augmentation_layers)

In [None]:
# Symbolic input tensor handed to both model builders; its shape is assembled
# from the image dimensions declared in the configuration cell (256, 256, 3).
input_shape = (height, width, depth)
model_input = Input(shape=input_shape)

In [None]:
def plotter(history, model):
  """Plot the training curves of `history` and print the test accuracy of `model`.

  Args:
    history: object returned by `Model.fit`; its `history` dict must contain
      the keys 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    model: compiled model to evaluate on the global `x_test` / `Y_test` arrays.
  """
  acc = history.history['accuracy']
  val_acc = history.history['val_accuracy']
  loss = history.history['loss']
  val_loss = history.history['val_loss']
  epochs = range(1, len(acc) + 1)
  #Train and validation accuracy
  plt.plot(epochs, acc, 'b', label='Training accuracy')
  plt.plot(epochs, val_acc, 'r', label='Validation accuracy')
  plt.title('Training and Validation accuracy')
  plt.xlabel('Epoch')
  plt.ylabel('Accuracy')
  plt.legend()

  plt.figure()
  #Train and validation loss
  plt.plot(epochs, loss, 'b', label='Training loss')
  plt.plot(epochs, val_loss, 'r', label='Validation loss')
  plt.title('Training and Validation loss')
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend()
  plt.show()
  print("[INFO] Calculating model accuracy")
  # The models in this notebook are compiled with
  # 'sparse_categorical_crossentropy', which takes integer class ids, so the
  # argmax-ed labels (Y_test) are used instead of the one-hot y_test.
  scores = model.evaluate(x_test, Y_test)
  print(f"Test Accuracy: {scores[1]*100}")

In [None]:
def best_weights(preds1, labels=None):
  """Grid-search two ensemble weights and print the best-scoring pair.

  Every pair (w1, w2) with w1, w2 in {0.0, 0.1, 0.2, 0.3, 0.4} is used to
  blend the two models' class scores; the blended argmax is compared with the
  true labels and the first pair reaching the highest accuracy is printed.

  Args:
    preds1: array whose first axis indexes the two models and whose last axis
      indexes classes, e.g. `np.array([m.predict(x) for m in models])`.
    labels: true labels, either integer class ids (1-D) or one-hot rows (2-D).
      Defaults to the global `y_test`.

  Returns:
    None. The result is printed.
  """
  if labels is None:
    labels = y_test
  labels = np.asarray(labels)
  if labels.ndim > 1:
    # One-hot rows -> class ids, so they are comparable with argmax predictions.
    labels = np.argmax(labels, axis=1)

  # Collect plain records and build the DataFrame once; DataFrame.append was
  # removed in pandas 2.0 and copying the frame per row is quadratic.
  records = []
  for w1 in range(0, 5):
      for w2 in range(0, 5):
              wts = [w1/10., w2/10.]
              # Contract the model axis of preds1 with the weight vector.
              wted_preds1 = np.tensordot(preds1, wts, axes=((0),(0)))
              wted_ensemble_pred = np.argmax(wted_preds1, axis=1)
              weighted_accuracy = float(np.mean(wted_ensemble_pred == labels))
              records.append({'wt1': wts[0], 'wt2': wts[1], 'acc': weighted_accuracy*100})

  df = pd.DataFrame(records)
  # idxmax returns the first row holding the maximum accuracy.
  max_acc_row = df.loc[df['acc'].idxmax()]
  print("Max accuracy of ", max_acc_row['acc'], " obained with w1=", max_acc_row['wt1'],
        " w2=", max_acc_row['wt2'])

In [None]:
def resnet(model_input: Tensor, data_augmentation=None, num_classes=40):
  """Build a classifier with a ResNet50 backbone and a dense head.

  Args:
    model_input: Keras input tensor of shape (256, 256, 3).
    data_augmentation: callable (e.g. the result of `data_augmenter()`) applied
      to the input before preprocessing. When None, a fresh pipeline is built
      here, so no layers are created when the function is merely defined.
    num_classes: width of the softmax output layer.

  Returns:
    An uncompiled `tf.keras.Model` mapping `model_input` to class probabilities.
  """
  if data_augmentation is None:
    data_augmentation = data_augmenter()

  base_model = tf.keras.applications.resnet50.ResNet50(input_shape=(256,256,3),
                                                   include_top=False,
                                                   weights='imagenet',input_tensor=model_input)

  x = data_augmentation(model_input)
  x = tf.keras.applications.resnet50.preprocess_input(x)
  # NOTE(review): training=False is passed to the backbone call, but
  # `base_model.trainable` is never set to False, so its weights stay trainable.
  x = base_model(x, training=False)
  x = tf.keras.layers.GlobalAveragePooling2D()(x)

  x = tf.keras.layers.Dense(1024, activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)
  x = tf.keras.layers.Dense(512, activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)

  # Output as per the number of classes
  predictions = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=model_input, outputs=predictions)

  return model

In [None]:
def vgg(num_classes=40):
  """Build a classifier with a VGG16 backbone and a dense head.

  The model is built on the global `model_input` tensor and uses a fresh
  augmentation pipeline from `data_augmenter()`.

  Args:
    num_classes: width of the softmax output layer.

  Returns:
    An uncompiled Keras `Model` mapping `model_input` to class probabilities.
  """
  base_model = tf.keras.applications.VGG16(input_shape=(256,256,3),
                                                   include_top=False,
                                                   weights='imagenet',input_tensor=model_input)
  data_augmentation = data_augmenter()
  x = data_augmentation(model_input)
  # Use the preprocessing helper that belongs to the VGG16 backbone rather
  # than the ResNet50 one.
  x = tf.keras.applications.vgg16.preprocess_input(x)
  # NOTE(review): training=False is passed to the backbone call, but
  # `base_model.trainable` is never set to False, so its weights stay trainable.
  x = base_model(x, training=False)

  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(256, activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)
  x = tf.keras.layers.Dense(128, activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)
  predictions = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
  model = Model(inputs=model_input, outputs=predictions)

  return model

In [None]:
# Build, compile and train the ResNet50-based model.
model1 = resnet(model_input, data_augmentation=data_augmenter())
model1.summary()
# Integer class ids are expected by this loss.
model1.compile(optimizer='adam', metrics=['accuracy'], loss='sparse_categorical_crossentropy')
# With save_best_only=True the file is only overwritten when the monitored
# quantity improves.
checkpoint = ModelCheckpoint('modified_resnet_50.h5', verbose=1, save_best_only=True)

# steps_per_epoch is intentionally not passed: train_dataset is a finite,
# non-repeating tf.data dataset, so Keras runs one full pass per epoch. A step
# count derived from the unrelated x_train sample would make the dataset run
# out of batches before all EPOCHS complete.
history1 = model1.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS,
    callbacks=[checkpoint],
    verbose=True
    )

In [None]:
# Build, compile and train the VGG16-based model.
model2 = vgg()
model2.summary()
# Integer class ids are expected by this loss.
model2.compile(optimizer='adam', metrics=['accuracy'], loss='sparse_categorical_crossentropy')
# NOTE(review): the file name says "vgg_19" although vgg() builds a VGG16
# backbone; the name is kept so existing checkpoint files still match.
checkpoint = ModelCheckpoint('modified_vgg_19.h5', verbose=1, save_best_only=True)

# steps_per_epoch is intentionally not passed: train_dataset is a finite,
# non-repeating tf.data dataset, so Keras runs one full pass per epoch. A step
# count derived from the unrelated x_train sample would make the dataset run
# out of batches before all EPOCHS complete.
history2 = model2.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS,
    callbacks=[checkpoint],
    verbose=True
    )

In [None]:
# Learning curves and test accuracy of the ResNet50-based model.
plotter(history1, model1)

In [None]:
# Learning curves and test accuracy of the VGG16-based model.
plotter(history2, model2)

In [None]:
# Run every ensemble member on the test images and stack the outputs so that
# axis 0 indexes the model.
models = [model1, model2]
preds = np.array([member.predict(x_test) for member in models])

# Element-wise sum of the members' outputs (unweighted ensemble scores).
summed = preds.sum(axis=0)

In [None]:
# Search the weight grid for the best two-model blend and print the result.
best_weights(preds)

In [None]:
# Weights applied to (model1, model2) when blending their class scores.
ideal_weights = [0.7, 0.2]

# `Model.predict_classes` no longer exists in current tf.keras; the class id
# is the argmax of the predicted probabilities. `preds` already holds
# model.predict(x_test) for model1 (index 0) and model2 (index 1), so the
# predictions are not recomputed.
prediction1 = np.argmax(preds[0], axis=1)
prediction2 = np.argmax(preds[1], axis=1)

# accuracy_score needs labels in the same format as the predictions, so the
# integer class ids (Y_test) are used rather than the one-hot y_test.
accuracy1 = accuracy_score(Y_test, prediction1)
accuracy2 = accuracy_score(Y_test, prediction2)

# Contract the model axis of `preds` with the weight vector, then pick the
# highest-scoring class per sample.
ideal_weighted_preds = np.tensordot(preds, ideal_weights, axes=((0),(0)))
ideal_weighted_ensemble_prediction = np.argmax(ideal_weighted_preds, axis=1)

ideal_weighted_accuracy = accuracy_score(Y_test, ideal_weighted_ensemble_prediction)

print('Accuracy Score for model1 = ', accuracy1)
print('Accuracy Score for model2 = ', accuracy2)
print('Accuracy Score for average ensemble = ', ideal_weighted_accuracy)