## PDDD-PreTrain

#### Based i the pre-trained model revised in https://doi.org/10.34133/plantphenomics.0054
Xinyu Dong, Qi Wang, Qianding Huang, Qinglong Ge, Kejun Zhao ,Xingcai Wu, Xue Wu, Liang Lei, Gefei Hao

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, Activation, MaxPooling2D
from tensorflow.keras.optimizers import SGD, Adam, RMSprop
from tensorflow.keras import regularizers, models, layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import scipy
import os

import optuna
import wandb
import gc

In [None]:
gpus = tf.config.list_physical_devices('GPU')

if gpus:
    print("TensorFlow is using the GPU \n", gpus)
else:
    print("No GPU detected.")
    
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
os.makedirs("/workspace/Optuna-Trials/Plant-Pathology-Classificator-Conv2D-Trials", exist_ok = True)

In [None]:
gc.collect()
tf.keras.backend.clear_session()

In [None]:
from wandb.integration.keras import WandbMetricsLogger

wandb.require("core")
wandb.login()

In [None]:
# Carga de los datos 

df = pd.read_csv("/tf/Plant-Pathology-Classificator/plant-pathology-2020-/train.csv")
df.head()

In [None]:
import os

df["label"] = df[["healthy", "multiple_diseases", "rust", "scab"]].idxmax(axis=1)

df["filepath"] = df['image_id'].apply(lambda x: os.path.join("/tf/Plant-Pathology-Classificator/plant-pathology-2020-/images", f'{x}.jpg'))

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_temp = train_test_split(df, test_size = 0.5, stratify = df["label"], random_state = 4)

X_test, X_val = train_test_split(X_temp, test_size = 0.4, stratify = X_temp["label"], random_state = 4)

print("Train size:", len(X_train))
print("Validation size:", len(X_val))
print("Test size:", len(X_test))

In [None]:
def color_saturation_filter(img):
    
    # Convertir a uint8
    if img.dtype == np.float32 and img.max() <= 1.0:
        img = (img * 255).astype(np.uint8)
    else:
        img = img.astype(np.uint8)

    # Convertir RGB a HSV
    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV).astype(np.float32)
    h, s, v = cv2.split(hsv)

    # Rangos de matiz
    red_mask1 = (h < 10)
    red_mask2 = (h > 170)
    orange_yellow_mask = (h >= 10) & (h <= 30)
    green_mask = (h >= 35) & (h <= 85)
    cyan_blue_mask = (h >= 90) & (h <= 130)

    # Saturar rojo, naranja y amarillo
    s[red_mask1 | red_mask2 | orange_yellow_mask] *= 3.0

    # Disminuir tonos azules
    s[cyan_blue_mask] *= 0.7
    
    # Disminuir saturación del verde
    s[green_mask] *= 0.7   

    # Disminuir luminancia del verde
    v[green_mask] *= 0.85 

    # Recortar valores a [0,255]
    s = np.clip(s, 0, 255)
    v = np.clip(v, 0, 255)

    # Juntar y convertir en RGB
    hsv_mod = cv2.merge([h, s, v]).astype(np.uint8)
    rgb_mod = cv2.cvtColor(hsv_mod, cv2.COLOR_HSV2RGB)

    return rgb_mod.astype(np.float32)


In [None]:
def custom_preprocessing(img):

    img = color_saturation_filter(img)
    
    return img

In [None]:
datagen_train = ImageDataGenerator(
    preprocessing_function = custom_preprocessing,
    rescale = 1./255,
    rotation_range = 0.01 ,          
    width_shift_range = 0.02,      
    height_shift_range = 0.02,     
    shear_range = 0.2,            
    horizontal_flip = True,       
    vertical_flip = True,         
    brightness_range = (1.0, 1.12),
    channel_shift_range = 30.0,
    fill_mode = "nearest")

datagen_test_and_val = ImageDataGenerator(rescale = 1./255,
                                          preprocessing_function = custom_preprocessing,
                                          dtype = "float32")

In [None]:
train = datagen_train.flow_from_dataframe(
    dataframe = X_train,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = True,
    seed = 4,
)

test = datagen_test_and_val.flow_from_dataframe(
    dataframe = X_test,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = False,
    seed = 4
)

val = datagen_test_and_val.flow_from_dataframe(
    dataframe = X_val,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    shuffle = False,  
    seed = 4
)

mini_train = datagen_train.flow_from_dataframe(
    dataframe = mini_train,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = True,
    seed = 4
)

mini_val = datagen_test_and_val.flow_from_dataframe(
    dataframe = mini_val,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = False,
    seed = 4,
)

In [None]:
img, labels = next(train)

label_indices = np.argmax(labels, axis=1)
class_names = list(train.class_indices.keys())

plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    plt.imshow(img[i])
    plt.title(class_names[label_indices[i]])
    plt.axis("off")

plt.tight_layout()
plt.show()

img, labels = next(mini_train)

label_indices = np.argmax(labels, axis=1)
class_names = list(mini_train.class_indices.keys())

plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    plt.imshow(img[i])
    plt.title(class_names[label_indices[i]])
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
img, labels = next(val)

label_indices = np.argmax(labels, axis=1)
class_names = list(val.class_indices.keys())

plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    plt.imshow(img[i])
    plt.title(class_names[label_indices[i]])
    plt.axis("off")

plt.tight_layout()
plt.show()

img, labels = next(mini_val)

label_indices = np.argmax(labels, axis=1)
class_names = list(mini_val.class_indices.keys())

plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    plt.imshow(img[i])
    plt.title(class_names[label_indices[i]])
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
from tensorflow.keras import mixed_precision

mixed_precision.set_global_policy('mixed_float16')

In [None]:
import torch
import torchvision.models as models

model = models.resnet101(pretrained=False)

num_classes = 20
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)


state_dict = torch.load("/tf/Plant-Pathology-Classificator/ResNet_101_ImageNet_plant-model-84.pth", map_location="cpu")
model.load_state_dict(state_dict)

model.fc = torch.nn.Identity() # Borrar última capa para exportar únicamente el extractor de características

model.eval()

# Conversión a .onnx

dummy_input = torch.randn(1, 3, 224, 224)

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["features"],
    dynamic_axes={"input": {0: "batch"}, "features": {0: "batch"}},
)

import onnx
from onnx import numpy_helper

onnx_model = onnx.load("model.onnx")

def convert_model_to_fp32(model_path, output_path):
    model = onnx.load(model_path)

    for tensor in list(model.graph.initializer):
        arr = numpy_helper.to_array(tensor)
        if arr.dtype == "float16":
            print(f"Converting initializer {tensor.name} from float16 → float32")
            new_tensor = numpy_helper.from_array(arr.astype("float32"), name=tensor.name)
            model.graph.initializer.remove(tensor)
            model.graph.initializer.append(new_tensor)

    for value_info in list(model.graph.input) + list(model.graph.output):
        ttype = value_info.type.tensor_type
        if ttype.elem_type == 10:  # float16
            ttype.elem_type = 1     # float32

    for node in model.graph.node:
        for attr in node.attribute:

            if hasattr(attr, "t") and attr.t.data_type == 10:
                arr = numpy_helper.to_array(attr.t).astype("float32")
                attr.t.CopyFrom(numpy_helper.from_array(arr))

            if hasattr(attr, "tensors"):
                for t in attr.tensors:
                    if t.data_type == 10:
                        arr = numpy_helper.to_array(t).astype("float32")
                        t.CopyFrom(numpy_helper.from_array(arr))

    onnx.save(model, output_path)

convert_model_to_fp32("model.onnx", "model_all_fp32.onnx")

model_check = onnx.load("model_all_fp32.onnx")
float16_tensors = []
for t in model_check.graph.initializer:
    if numpy_helper.to_array(t).dtype == "float16":
        float16_tensors.append(t.name)

from onnx_tf.backend import prepare

onnx_model = onnx.load("model_all_fp32.onnx")
tf_rep = prepare(onnx_model)
tf_rep.export_graph("ResNet101_ImageNet_TF")
print("Finalizó conversión")

In [None]:
base = tf.saved_model.load("ResNet101_ImageNet_TF")

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Model
base = tf.saved_model.load("ResNet101_ImageNet_TF")  # or your path
resnet_fn = base.signatures["serving_default"]

# Wrapper layer
    class ResNet101_Pretrained(tf.keras.layers.Layer):
        def __init__(self, resnet_fn):
            super().__init__()
            self.resnet_fn = resnet_fn

    def call(self, inputs):
        # NHWC → NCHW
        x = tf.transpose(inputs, [0, 3, 1, 2])
        x = tf.cast(x, tf.float32)
        
        if not self._trainable:
            with tf.GradientTape(stop_recording = True):
                outputs = self.resnet_fn(input = x)
        else:
            outputs = self.resnet_fn(input = x)
            
        features = list(outputs.values())[0]
        features = tf.ensure_shape(features, [None, 2048])
        return features

In [1]:
### Firs, the model will be checked using a color saturation filter.

In [None]:
num_classes = 4 

inputs = tf.keras.Input(shape = (224, 224, 3))
x = ResNet101_Pretrained(resnet_fn)(inputs)
x = layers.Dense(512, activation = "relu")(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(512, activation = "relu")(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(num_classes, activation = "softmax")(x)

model = models.Model(inputs, outputs)

model.summary()

In [None]:
early_stopping = EarlyStopping(monitor = 'val_accuracy', patience = 10, restore_best_weights = True)
lr_reduction = ReduceLROnPlateau(monitor = 'val_loss', factor = 0.2, patience = 5)

In [None]:
wandb.init(
    project = "Plant-Pathology-Classificator-Conv2D-ResNet101-ImageNet-Based-Model.ipynb-Series-1",
    name = "Trial_1_with_filter",
    reinit = True,
    config = {
            "Input_Layer": (224,224,3),
            "learning_rate": 1e-4,
            "Pretrained model": "ResNet101 ",
            "optimizer": "RMSprop",
            "loss" : "Categorical CrossEntropy"
        }
    ) 

In [None]:
model.compile(
    optimizer = tf.keras.optimizers.RMSprop(learning_rate = 1e-4),
    loss = "categorical_crossentropy",
    metrics = ["accuracy"]
)

history = model.fit(train,
                    validation_data = val,
                    epochs = 200,
                    callbaacks = [WandbMetricsLogger(log_freq = 5), early_stopping, lr_reduction])

In [None]:
model.save_weights("Plant-Pathology-Classificator-Conv2D-ResNet101-ImageNet-Based-Model-Filter.h5")

In [None]:
wandb.finish()

### Now, the model will be checked without the color saturation filter.

In [None]:
datagen_train2 = ImageDataGenerator(
    rescale = 1./255,
    rotation_range = 0.01 ,          
    width_shift_range = 0.02,      
    height_shift_range = 0.02,     
    shear_range = 0.2,            
    horizontal_flip = True,       
    vertical_flip = True,         
    brightness_range = (1.0, 1.12),
    channel_shift_range = 30.0,
    fill_mode = "nearest")

datagen_test_and_val2 = ImageDataGenerator(rescale = 1./255,
                                          dtype = "float32")

In [None]:
train = datagen_train2.flow_from_dataframe(
    dataframe = X_train,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = True,
    seed = 4,
)

test = datagen_test_and_val2.flow_from_dataframe(
    dataframe = X_test,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    class_mode = "categorical",
    shuffle = False,
    seed = 4
)

val = datagen_test_and_val2.flow_from_dataframe(
    dataframe = X_val,
    x_col = "filepath",
    y_col = "label",
    target_size = (256,256),
    batch_size = 32,
    shuffle = False,  
    seed = 4
)

In [None]:
num_classes = 4  

inputs = tf.keras.Input(shape = (224, 224, 3))
x = TorchResNetWrapper(resnet_fn)(inputs)
x = layers.Dense(512, activation = "relu")(x)
x = layers.Dropout(0.4)(x)
x = layers.Dense(512, activation = "relu")(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(num_classes, activation = "softmax")(x)

model = models.Model(inputs, outputs)

model.summary()

In [None]:
wandb.init(
    project = "Plant-Pathology-Classificator-Conv2D-ResNet101-ImageNet-Based-Model.ipynb-Series-1",
    name = "Trial_1_without_filter",
    reinit = True,
    config = {
            "Input_Layer": (224,224,3),
            "learning_rate": 1e-4,
            "Pretrained model": "ResNet101 ",
            "optimizer": "RMSprop",
            "loss" : "Categorical CrossEntropy"
        }
    ) 

In [None]:
model.compile(
    optimizer = tf.keras.optimizers.RMSprop(learning_rate = 1e-4),
    loss = "categorical_crossentropy",
    metrics = ["accuracy"]
)

history = model.fit(train,
                    validation_data = val,
                    epochs = 200,
                    callbaacks = [WandbMetricsLogger(log_freq = 5), early_stopping, lr_reduction])

In [None]:
model.save_weights("Plant-Pathology-Classificator-Conv2D-ResNet101-ImageNet-Based-Model-without-Filter.h5")

In [None]:
wandb.finish()