### 1. Loading Dependencies and Dataset 

In [82]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf, tensorflow.keras.backend as K
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras import optimizers
from tensorflow.keras.optimizers import Adam
import efficientnet.tfkeras as efn
from tensorflow.keras.layers import Dense,Dropout,Input,Reshape,Lambda, GlobalAveragePooling2D, Concatenate, Multiply

In [2]:
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

In [3]:
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [4]:
#TPU Configurations
AUTO = tf.data.experimental.AUTOTUNE
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Running on TPU ', tpu.master())
except ValueError:
    tpu = None

if tpu:
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
else:
    strategy = tf.distribute.get_strategy()

print("REPLICAS: ", strategy.num_replicas_in_sync)


REPLICAS:  1


In [5]:
IMG_SIZE_h = 224 
IMG_SIZE_w = 224
channel = 3
BATCH_SIZE = 32*strategy.num_replicas_in_sync
num_classes = 2

In [16]:
#Train Test Split
from sklearn.model_selection import train_test_split
import os
from pathlib import Path
import cv2
test_path="./data/test"
train_path="./data/train"

test_set = [str(file) for file in Path(test_path).rglob(f'*.png')]
train_set = [str(file) for file in Path(train_path).rglob(f'*.png')]

images =[]
labels = []

image_path = []

for file in test_set:
    image = cv2.imread(file)
    image = cv2.resize(image, (IMG_SIZE_h,IMG_SIZE_w))
    images.append(image)

    image_path.append(file)
    labels.append(1)
    
for file in train_set:
    image = cv2.imread(file)
    image = cv2.resize(image, (IMG_SIZE_h,IMG_SIZE_w))

    images.append(image)
    image_path.append(file)
    labels.append(0)

imgaes = np.array(images, dtype=np.float32) / 255.0
labels = np.array(labels, dtype=np.float32)


In [17]:
x_train,x_val,y_train,y_val = train_test_split(imgaes,np.array(labels),test_size=0.198,shuffle=True) 

### 2. Functions

#### i)Focal Loss + Label Smoothing

In [18]:
import keras.backend as K
import tensorflow as tf

def binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.25, ls=0.1):
    """
    Implementation of Focal Loss for binary classification with label smoothing.
    Formula:
        loss = -alpha*((1-p)^gamma)*log(p)
        y_ls = (1 - α) * y_true + α
    Parameters:
        alpha -- weighting factor for the positive class
        gamma -- focusing parameter for modulating factor (1-p)
        ls    -- label smoothing parameter
    Default value:
        gamma -- 2.0
        alpha -- 0.25
        ls    -- 0.1
    """
    def focal_loss(y_true, y_pred):
        # Define epsilon to prevent division by zero or log(0)
        epsilon = K.epsilon()
        
        # Apply label smoothing
        y_true_ls = (1 - ls) * y_true + ls  # Smoothing label for binary case

        # Clip predictions to prevent log(0) and other issues
        y_pred = tf.clip_by_value(y_pred, epsilon, 1.0 - epsilon)
        
        # Calculate cross entropy
        cross_entropy = -y_true_ls * tf.math.log(y_pred) - (1 - y_true_ls) * tf.math.log(1 - y_pred)
        
        # Calculate weight factor based on the focal loss formula
        weight = alpha * y_true_ls * tf.math.pow((1 - y_pred), gamma) + (1 - alpha) * (1 - y_true_ls) * tf.math.pow(y_pred, gamma)

        # Final loss computation
        loss = weight * cross_entropy
        
        # Sum losses for each example in the batch
        return tf.reduce_sum(loss, axis=1)

    return focal_loss



def categorical_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.25,ls=0.1,classes=2):
    """
    Implementation of Focal Loss from the paper in multiclass classification
    Formula:
        loss = -alpha*((1-p)^gamma)*log(p)
        y_ls = (1 - α) * y_hot    + α / classes
    Parameters:
        alpha -- the same as wighting factor in balanced cross entropy
        gamma -- focusing parameter for modulating factor (1-p)
        ls    -- label smoothing parameter(alpha)
        classes     -- No. of classes
    Default value:
        gamma -- 2.0 as mentioned in the paper
        alpha -- 0.25 as mentioned in the paper
        ls    -- 0.1
        classes     -- 4
    """
    def focal_loss(y_true, y_pred):
        # Define epsilon so that the backpropagation will not result in NaN
        # for 0 divisor case
        epsilon = K.epsilon()
        # Add the epsilon to prediction value
        #y_pred = y_pred + epsilon
        #label smoothing
        y_pred_ls = (1 - ls) * y_pred + ls / classes
        # Clip the prediction value
        y_pred_ls = tf.clip_by_value(y_pred_ls, epsilon, 1.0-epsilon)
        # Calculate cross entropy
        cross_entropy = -y_true*tf.math.log(y_pred_ls)
        # Calculate weight that consists of  modulating factor and weighting factor
        weight = alpha * y_true * tf.math.pow((1-y_pred_ls), gamma)
        # Calculate focal loss
        loss = weight * cross_entropy
        # Sum the losses in mini_batch
        loss = tf.math.reduce_sum(loss, axis=1)
        return loss
    
    return focal_loss   


#### ii) BiLinear Layer (outer_product())

In [19]:
import tensorflow as tf


def outer_product(x):
    #Einstein Notation  [batch,1,1,depth] x [batch,1,1,depth] -> [batch,depth,depth]
    phi_I = tf.einsum('ijkm,ijkn->imn',x[0],x[1])
    
    # Reshape from [batch_size,depth,depth] to [batch_size, depth*depth]
    phi_I = tf.reshape(phi_I,[-1,x[0].shape[3]*x[1].shape[3]])
    
    # Divide by feature map size [sizexsize]
    size1 = int(x[1].shape[1])
    size2 = int(x[1].shape[2])
    phi_I = tf.divide(phi_I, size1*size2)
    
    # Take signed square root of phi_I
    y_ssqrt = tf.multiply(tf.sign(phi_I),tf.sqrt(tf.abs(phi_I)+1e-12))
    
    # Apply l2 normalization
    z_l2 = tf.nn.l2_normalize(y_ssqrt, axis=1)
    return z_l2

def get_output_shape(input_shape):
    # Calculate the output shape based on the outer product operation
    # Assuming input_shape is (batch_size, 3584)
    return (input_shape[0], input_shape[0][1] * input_shape[0][2])

#### iii)F1 Score

In [20]:
from keras import backend as K
import tensorflow as tf
import keras.backend as K

import sys



from keras import backend as K
from keras.metrics import Precision, Recall


class F1Score(tf.keras.metrics.Metric):
    def __init__(self, name="f1_score", **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        precision = self.precision.result()
        recall = self.recall.result()
        return 2 * (precision * recall) / (precision + recall + tf.keras.backend.epsilon())

    def reset_states(self):
        self.precision.reset_states()
        self.recall.reset_states()


### 3. Model

In [21]:
import tensorflow as tf

def get_model():
    # Define input
    input_shape = (IMG_SIZE_h, IMG_SIZE_w, channel)
    input_tensor = Input(shape=(IMG_SIZE_h, IMG_SIZE_w, channel))

    # Create EfficientNet backbones
    base_model1 = efn.EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape, )
    base_model2 = efn.EfficientNetB0(weights='noisy-student', include_top=False, input_shape=input_shape, )

    # base_model1 = tf.keras.applications.EfficientNetV2B0 (weights='imagenet', include_top=False, input_shape=input_shape, )
    # base_model2 = tf.keras.applications.EfficientNetV2B0 (weights='noisy-student', include_top=False, input_shape=input_shape, )
    # base_model1 = tf.keras.applications.EfficientNetV2B0(weights='imagenet', include_top=False, input_shape=input_shape, )

    base_model1.name = "EfficientNetB0_imagenetWeight"
    base_model2.name = "EfficientNetB0_noisy-studentWeight"
    for layer in base_model1.layers:
        layer.name = 'model1_' + layer.name

    for layer in base_model2.layers:
        layer.name = 'model2_' + layer.name

    x1 = base_model1(input_tensor)
    x2 = base_model2(input_tensor)

    # Get the output features from both models and apply global pooling
    d1 = GlobalAveragePooling2D()(x1)  # This will be shape (batch_size, 1792) for EfficientNetB4
    d2 = GlobalAveragePooling2D()(x2)  # This will be shape (batch_size, 1792) for EfficientNetB4

    # Instead of outer product, use a simpler approach to combine features
    # Option 1: Concatenate features
    combined_features = Concatenate()([d1, d2])  # Shape will be (batch_size, 3584)

    # Option 2: Element-wise multiplication (if you want interaction between features)
    # combined_features = Multiply()([d1, d2])  # Shape will be (batch_size, 1792)

    # Add an intermediate dense layer to reduce dimensionality if needed
    intermediate = Dense(512, activation='relu')(combined_features)

    # Final prediction layer
    predictions = Dense(1, activation='sigmoid', name='predictions')(intermediate)

    # Create the full model
    model = Model(inputs=input_tensor, outputs=predictions)

    # Now compile the combined model
    model.compile(
        optimizer=Adam(learning_rate=0.0003, decay=1e-3),
        loss=binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.75, ls=0.125)
    )

    return model

In [48]:
import tensorflow as tf
def outer_product(x):
    #Einstein Notation  [batch,1,1,depth] x [batch,1,1,depth] -> [batch,depth,depth]
    phi_I = tf.einsum('ijkm,ijkn->imn',x[0],x[1])
    
    # Reshape from [batch_size,depth,depth] to [batch_size, depth*depth]
    phi_I = tf.reshape(phi_I,[-1,x[0].shape[3]*x[1].shape[3]])
    
    # Divide by feature map size [sizexsize]
    size1 = int(x[1].shape[1])
    size2 = int(x[1].shape[2])
    phi_I = tf.divide(phi_I, size1*size2)
    
    # Take signed square root of phi_I
    y_ssqrt = tf.multiply(tf.sign(phi_I),tf.sqrt(tf.abs(phi_I)+1e-12))
    
    # Apply l2 normalization
    z_l2 = tf.nn.l2_normalize(y_ssqrt, axis=1)
    return z_l2

def get_model():
    # Define input
    input_shape = (IMG_SIZE_h, IMG_SIZE_w, channel)
    input_tensor = Input(shape=(IMG_SIZE_h, IMG_SIZE_w, channel))

    # Create EfficientNet backbones
    base_model1 = efn.EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape, )
    base_model2 = efn.EfficientNetB0(weights='noisy-student', include_top=False, input_shape=input_shape, )

    # base_model1 = tf.keras.applications.EfficientNetV2B0 (weights='imagenet', include_top=False, input_shape=input_shape, )
    # base_model2 = tf.keras.applications.EfficientNetV2B0 (weights='noisy-student', include_top=False, input_shape=input_shape, )
    # base_model1 = tf.keras.applications.EfficientNetV2B0(weights='imagenet', include_top=False, input_shape=input_shape, )

    base_model1.name = "EfficientNetB0_imagenetWeight"
    base_model2.name = "EfficientNetB0_noisy-studentWeight"
    for layer in base_model1.layers:
        layer.name = 'model1_' + layer.name

    for layer in base_model2.layers:
        layer.name = 'model2_' + layer.name

    # base_model1 = Model(inputs = input_tensor, outputs= base_model1.output)
    # base_model2 = Model(inputs = input_tensor, outputs= base_model2.output )

    base_model1(input_tensor)
    base_model2(input_tensor)


    base_model1.compile(
        optimizer=Adam(learning_rate=0.0003, decay=1e-3),
        loss=binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.75, ls=0.125)
    ) 
    base_model2.compile(
        optimizer=Adam(learning_rate=0.0003, decay=1e-3),
        loss=binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.75, ls=0.125)
    ) 

    d1 = base_model1.output  # This will be shape (batch_size, 1792) for EfficientNetB4
    d2 = base_model2.output
    
    combined_features = Concatenate()([d1, d2])

    bilinear = Lambda(outer_product, name='outer_product1', output_shape=(None, 128*128))(combined_features)
    
    predictions = Dense(1, activation='sigmoid', name='predictions')(bilinear)
    model = Model(inputs=input_tensor, outputs=predictions)
    
    return model

In [83]:


def outer_product(inputs):
    # Unpack the inputs
    x1, x2 = inputs
    
    # Get shapes
    batch_size = tf.shape(x1)[0]
    height = tf.shape(x1)[1]
    width = tf.shape(x1)[2]
    depth1 = x1.shape[3]
    depth2 = x2.shape[3]
    
    # Reshape tensors to 2D
    x1_flat = tf.reshape(x1, [batch_size * height * width, depth1])
    x2_flat = tf.reshape(x2, [batch_size * height * width, depth2])
    
    # Reshape to 3D tensors
    x1_3d = tf.reshape(x1_flat, [batch_size, height * width, depth1])
    x2_3d = tf.reshape(x2_flat, [batch_size, height * width, depth2])
    
    # Compute outer product using batch matrix multiplication
    phi_I = tf.matmul(tf.transpose(x1_3d, [0, 2, 1]), x2_3d)  # [batch, depth1, depth2]
    
    # Reshape to 2D
    phi_I = tf.reshape(phi_I, [batch_size, depth1 * depth2])
    
    # Normalize by feature map size
    phi_I = phi_I / tf.cast(height * width, tf.float32)
    
    # Signed square root
    y_ssqrt = tf.sign(phi_I) * tf.sqrt(tf.abs(phi_I) + 1e-12)
    
    # L2 normalization
    z_l2 = tf.nn.l2_normalize(y_ssqrt, axis=1)
    
    return z_l2

def get_model():
    # Define input
    IMG_SIZE_h = 224
    IMG_SIZE_w = 224
    channel = 3
    
    input_tensor = Input(shape=(IMG_SIZE_h, IMG_SIZE_w, channel))
    
    # Create EfficientNet backbones
    base_model1 = efn.EfficientNetB0(weights='imagenet', include_top=False)
    base_model2 = efn.EfficientNetB0(weights='noisy-student', include_top=False)
    
    base_model1.name = "EfficientNetB0_imagenetWeight"
    base_model2.name = "EfficientNetB0_noisy-studentWeight"
    
    # Rename layers to avoid name conflicts
    for layer in base_model1.layers:
        layer.name = 'model1_' + layer.name
    for layer in base_model2.layers:
        layer.name = 'model2_' + layer.name
    
    # Get outputs from each model
    d1 = base_model1(input_tensor)
    d2 = base_model2(input_tensor)
    
    # Apply bilinear pooling
    bilinear = Lambda(outer_product)([d1, d2])
    
    # Final prediction layer
    predictions = Dense(1, activation='sigmoid', name='predictions')(bilinear)
    
    # Create the model
    model = Model(inputs=input_tensor, outputs=predictions)
    
    # Compile the model
    model.compile(
        optimizer=Adam(learning_rate=0.0003, decay=1e-3),
        loss=binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.75, ls=0.125),
        metrics=['accuracy']
    )
    
    return model

In [84]:
import keras


opt = Adam(learning_rate=0.0003, decay=1e-3)

model = get_model()

model.compile(
    optimizer=Adam(learning_rate=0.0003, decay=1e-3), 
    loss=binary_focal_loss_with_label_smoothing(gamma=2.0, alpha=0.75, ls=0.125),
    metrics=[
        keras.metrics.BinaryAccuracy(name="accuracy"),
        keras.metrics.Precision(name="precision"),
        keras.metrics.Recall(name="recall"),
        # keras.metrics.F1Score(name="f1score"),
        F1Score
    ]
)

In [52]:
model.summary()

KeyboardInterrupt: 

### 4. Training

In [85]:
history = model.fit(x = x_train,
                    y = y_train,
                    epochs=5,
                    verbose=1,
                    validation_data=(x_val, y_val)
                    )
#it will take some time to start training

Epoch 1/5
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m108s[0m 6s/step - accuracy: 0.4369 - f1_score: 0.4355 - loss: 0.0739 - precision: 0.3120 - recall: 0.7480 - val_accuracy: 0.2586 - val_f1_score: 0.4110 - val_loss: 0.0720 - val_precision: 0.2586 - val_recall: 1.0000
Epoch 2/5
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 5s/step - accuracy: 0.8520 - f1_score: 0.7791 - loss: 0.0492 - precision: 0.6936 - recall: 0.8957 - val_accuracy: 0.3103 - val_f1_score: 0.4286 - val_loss: 0.0691 - val_precision: 0.2727 - val_recall: 1.0000
Epoch 3/5
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 5s/step - accuracy: 0.9487 - f1_score: 0.9033 - loss: 0.0326 - precision: 0.8674 - recall: 0.9482 - val_accuracy: 0.8103 - val_f1_score: 0.4762 - val_loss: 0.0689 - val_precision: 0.8333 - val_recall: 0.3333
Epoch 4/5
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 5s/step - accuracy: 0.9405 - f1_score: 0.8946 - loss: 0.0253 - precision: 0.8183 - 

In [None]:
%matplotlib inline 

import matplotlib as mpl
import matplotlib.pyplot as plt


print ('Matplotlib version: ', mpl.__version__) # >= 2.0.0

val_f1 = history.history['val_f1']
f1 = history.history['f1']
epochs = range(len(f1))

df_categorical_accuracy = pd.DataFrame(val_f1, columns = ['val_f1']) 
df_f1 = pd.DataFrame(f1, columns = ['f1'])

df_categorical_accuracy.to_csv('val_f1.csv')
df_f1.to_csv('f1.csv')

In [92]:
model.summary()

In [None]:
f, ax = plt.subplots(figsize=(12,4)) # set the size that you'd like (width, height)
plt.title('F1 Score')
plt.ylabel('f1 score')
plt.xlabel('Epochs')
plt.plot(epochs,val_f1,label='Validation F1 Score')
plt.plot(epochs, f1,label='Training F1 Score')
plt.legend()
plt.figure()
plt.savefig('F1.png')
plt.show()

### 5. Testing and Saving Model 

In [None]:
path='../input/plant-pathology-2020-fgvc7/'

test = pd.read_csv(path+'test.csv')
test_id = test['image_id']

root = 'images'
x_test = [(os.path.join(GCS_DS_PATH,root,idee+'.jpg')) for idee in test_id]

In [None]:
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices(x_test)
    .map(decode_image, num_parallel_calls=AUTO)
    .batch(BATCH_SIZE)
)

In [None]:
y_pred = model.predict(test_dataset,verbose=1)

In [None]:
def save_results(y_pred):
    
    path='../input/plant-pathology-2020-fgvc7/'
    test = pd.read_csv(path + 'test.csv')
    test_id = test['image_id']

    res = pd.read_csv(path+'train.csv')
    res['image_id'] = test_id
  
    labels = res.keys()

    for i in range(1,5):
        res[labels[i]] = y_pred[:,i-1]

    res.to_csv('submission.csv',index=False)
  
    print(res.head)

In [None]:
save_results(y_pred)

In [None]:
model_json = model.to_json()
with open("Model.json", "w") as json_file:
    json_file.write(model_json)
# serialize weights to HDF5
model.save_weights("Model.h5")

In [24]:
from tensorflow.python.keras.models import model_from_json

# load json and create model
json_file = open('Model.json', 'r')
loaded_model_json = json_file.read()
json_file.close()
loaded_model = model_from_json(loaded_model_json)
# load weights into new model
loaded_model.load_weights("Model.h5")
# loaded_model.summary()

FileNotFoundError: [Errno 2] No such file or directory: 'Model.json'