# Crop Damage Assessment Using a Modified ResNet34 Model

This notebook builds an image classification model to determine
whether crops in an image are suffering from drought, weeds, nutrient deficiency, or other factors such as wind or disease. This would help smallholder farm owners across all of Africa process their insurance claims faster in case of a drought or other circumstances that might affect their crops.

The ~26k image dataset used for training was retrieved from zindi.africa.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.callbacks import EarlyStopping
from keras.layers import Dense, Conv2D,  MaxPool2D, Flatten, GlobalAveragePooling2D,  BatchNormalization, Layer, Add
from keras.models import Sequential
from keras.models import Model
from tensorflow.keras import layers, models
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam

In [None]:
!gdown 1GhKtnR2bUdXo6CTntL8ji4gtE9SUAWMo

In [None]:
!unzip '/content/CGIR.zip';

In [None]:
%rm -rf Processed_Images

# Image Filters (Optional)


In this case, sharpening is applied.

In [None]:
import cv2
import os
from pathlib import Path

#sharpen the image
def sharpen_image(image):
    """Sharpen an image with an unsharp mask.

    A Gaussian-blurred copy of the image (sigma 3; the (0, 0) kernel size lets
    OpenCV derive the kernel from the sigma) is blended with the original as
    ``1.5 * image - 0.5 * blurred``.

    Args:
        image: Image array as accepted by ``cv2.GaussianBlur``.

    Returns:
        The blended (sharpened) image produced by ``cv2.addWeighted``.
    """
    soft_copy = cv2.GaussianBlur(image, (0, 0), 3)
    return cv2.addWeighted(image, 1.5, soft_copy, -0.5, 0)


def process_images_in_folder(input_folder, output_folder):
    """Sharpen every image in ``input_folder`` and save it to ``output_folder``.

    Files are selected by extension (.png, .jpg, .jpeg, case-insensitive) and
    written under their original file name. Files that cannot be read or
    written are reported and skipped.

    Args:
        input_folder: Directory containing the source images.
        output_folder: Directory that receives the processed images; it is
            created when it does not exist yet.

    Returns:
        None. Progress and problems are reported with ``print``.
    """
    # Make sure the input folder path exists
    if not os.path.exists(input_folder):
        print(f"Input folder not found: {input_folder}")
        return

    # Create the output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        print(f"Output folder created: {output_folder}")

    # Keep only image files (extend the tuple to support more extensions)
    image_extensions = ('.png', '.jpg', '.jpeg')
    image_files = [
        name for name in os.listdir(input_folder)
        if name.lower().endswith(image_extensions)
    ]

    # Process each image in the input folder
    for image_file in image_files:
        input_image_path = os.path.join(input_folder, image_file)

        # cv2.imread returns None (instead of raising) for unreadable files
        image = cv2.imread(input_image_path)
        if image is None:
            print(f"Failed to read input image: {input_image_path}")
            continue

        # Apply the sharpening filter
        processed_image = sharpen_image(image)

        output_image_path = os.path.join(output_folder, image_file)

        # Write the pixels unchanged: the sharpened image is already in the
        # 0-255 range. Multiplying the 8-bit array by 255 wraps around modulo
        # 256 and stores a negative-like image instead of the sharpened one.
        if not cv2.imwrite(output_image_path, processed_image):
            print(f"Failed to write output image: {output_image_path}")
            continue

        print(f"Processed: {input_image_path} -> {output_image_path}")

# Source directory holding the dataset images to be filtered
input_folder_path = "/content/CGIR/images"

# Destination directory that receives one processed copy per source image
output_folder_path = "/content/Processed_Images"

# Run the batch filter over the whole source directory
process_images_in_folder(
    input_folder=input_folder_path,
    output_folder=output_folder_path,
)

In [None]:
!zip -r /content/Processedimgs.zip /content/Processed_Images

# Importing Dataset + Upsampling
Dataset was loaded from Google Drive. Upsampling is performed on underrepresented classes to improve model generalizability. In this case, the DR, ND and other classes are duplicated several times to upsample the data. This puts each class at about 9000 images.

In [None]:
# Colab-only helper: hand the zipped processed images to the browser for download.
from google.colab import files
files.download("/content/Processedimgs.zip")

In [None]:
# Step 1: Read the CSV file
train_labels = pd.read_csv('/content/CGIR/Train.csv')

# One seeded generator drives every random draw below, so "Restart & Run All"
# rebuilds exactly the same balanced table.
rng = np.random.RandomState(42)

# Step 2: Down-sample the two majority classes to (at most) 9000 rows each.
wd_rows = train_labels[train_labels['damage'] == 'WD']
random_wd_rows = wd_rows.sample(n=min(9000, len(wd_rows)), random_state=rng)
G_rows = train_labels[train_labels['damage'] == 'G']
random_G_rows = G_rows.sample(n=min(9000, len(G_rows)), random_state=rng)

# Step 3: Up-sample the minority classes by duplication.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
# DR is doubled once (2 copies of every row).
DR_rows = train_labels[train_labels['damage'] == 'DR']
DR_rows = pd.concat([DR_rows] * 2, ignore_index=True)

# ND is doubled five times (2 ** 5 = 32 copies of every row).
ND_rows = train_labels[train_labels['damage'] == 'ND']
ND_rows = pd.concat([ND_rows] * (2 ** 5), ignore_index=True)

# 'other' grows by 270 randomly chosen rows per round for 30 rounds
# (8100 extra rows); each round samples from the already-grown table.
other_rows = train_labels[train_labels['damage'] == 'other']
for _ in range(30):
    random_other_rows = other_rows.sample(n=270, random_state=rng)
    other_rows = pd.concat([other_rows, random_other_rows], ignore_index=True)

# Step 4: Combine the per-class tables and shuffle the result for randomness.
# NOTE(review): the duplicated rows are shuffled before the image generators
# apply validation_split, so copies of the same image can land in both the
# training and the validation subset; validation accuracy is likely optimistic.
data = pd.concat(
    [random_wd_rows, random_G_rows, DR_rows, ND_rows, other_rows],
    ignore_index=True,
)
data = data.sample(frac=1, random_state=rng).reset_index(drop=True)

# Show the number of rows per damage class after balancing
value_counts = data['damage'].value_counts()
value_counts

# Modified ResNet34 Model

Initially a standard ResNet34 architecture was used, but was then slightly modified by removing or adding layers. In the cell below, an improvement of 4-6% in accuracy was achieved over a regular ResNet34.



In [None]:
class ResnetBlock(Model):
    """
    A standard resnet block.

    Two 3x3 convolutions, each followed by batch normalization, with the block
    input added back (shortcut) before the final ReLU. With ``down_sample=True``
    the first convolution uses stride 2 and the shortcut goes through a 1x1
    stride-2 convolution plus batch normalization before the addition.
    """

    def __init__(self, channels: int, down_sample=False):
        """
        channels: same as number of convolution kernels
        down_sample: when True, the first convolution and the shortcut
            convolution both use stride 2
        """
        super().__init__()

        self.__channels = channels
        self.__down_sample = down_sample
        # strides of the first and second convolution respectively
        self.__strides = [2, 1] if down_sample else [1, 1]

        KERNEL_SIZE = (3, 3)
        # use He initialization, instead of Xavier (a.k.a 'glorot_uniform' in Keras), as suggested in [2]
        # (the reference list for [1]/[2] is not included in this notebook)
        INIT_SCHEME = "he_normal"

        self.conv_1 = Conv2D(self.__channels, strides=self.__strides[0],
                             kernel_size=KERNEL_SIZE, padding="same", kernel_initializer=INIT_SCHEME)
        self.bn_1 = BatchNormalization()
        self.conv_2 = Conv2D(self.__channels, strides=self.__strides[1],
                             kernel_size=KERNEL_SIZE, padding="same", kernel_initializer=INIT_SCHEME)
        self.bn_2 = BatchNormalization()
        self.merge = Add()

        if self.__down_sample:
            # perform down sampling using stride of 2, according to [1].
            # The shortcut layers only exist on down-sampling blocks.
            self.res_conv = Conv2D(
                self.__channels, strides=2, kernel_size=(1, 1), kernel_initializer=INIT_SCHEME, padding="same")
            self.res_bn = BatchNormalization()

    def call(self, inputs):
        """Run the block: conv-bn-relu-conv-bn, add the shortcut, then ReLU."""
        res = inputs

        x = self.conv_1(inputs)
        x = self.bn_1(x)
        x = tf.nn.relu(x)
        x = self.conv_2(x)
        x = self.bn_2(x)

        if self.__down_sample:
            # project the shortcut with the 1x1 stride-2 convolution
            res = self.res_conv(res)
            res = self.res_bn(res)

        # if not perform down sample, then add a shortcut directly
        # NOTE(review): in that case the raw input is added, so the input
        # presumably must already have `channels` channels - confirm.
        x = self.merge([x, res])
        out = tf.nn.relu(x)
        return out


class ResNet18(Model):
    """
    Residual image classifier built from ResnetBlock layers.

    Despite the class name, the network stacks 16 residual blocks
    (4 x 64, 4 x 128, 4 x 256, 2 x 512 and 2 x 1024 channels) after a 7x7
    stride-2 stem convolution and a max pool, followed by global average
    pooling, a 128-unit ReLU dense layer and a softmax output layer.
    """

    def __init__(self, num_classes, **kwargs):
        """
            num_classes: number of classes in specific classification task.
            **kwargs: forwarded unchanged to the Keras ``Model`` constructor.
        """
        super().__init__(**kwargs)
        # stem: 7x7 stride-2 convolution, batch norm, 2x2 stride-2 max pool
        self.conv_1 = Conv2D(64, (7, 7), strides=2,
                             padding="same", kernel_initializer="he_normal")
        self.init_bn = BatchNormalization()
        self.pool_2 = MaxPool2D(pool_size=(2, 2), strides=2, padding="same")
        # residual stages; every change in channel count uses down_sample=True
        self.res_1_1 = ResnetBlock(64)
        self.res_1_2 = ResnetBlock(64)
        self.res_2_1 = ResnetBlock(64)
        self.res_2_2 = ResnetBlock(64)
        self.res_3_1 = ResnetBlock(128, down_sample=True)
        self.res_3_2 = ResnetBlock(128)
        self.res_4_1 = ResnetBlock(128)
        self.res_4_2 = ResnetBlock(128)
        self.res_5_1 = ResnetBlock(256, down_sample=True)
        self.res_5_2 = ResnetBlock(256)
        self.res_6_1 = ResnetBlock(256)
        self.res_6_2 = ResnetBlock(256)
        self.res_7_1 = ResnetBlock(512, down_sample=True)
        self.res_7_2 = ResnetBlock(512)
        self.res_8_1 = ResnetBlock(1024, down_sample=True)
        self.res_8_2 = ResnetBlock(1024)
        # classification head
        self.avg_pool = GlobalAveragePooling2D()
        # NOTE(review): the global average pool already yields one vector per
        # image, so this Flatten presumably leaves the shape unchanged.
        self.flat = Flatten()
        self.ann1 = Dense(128, activation='relu')
        self.fc = Dense(num_classes, activation="softmax")

    def call(self, inputs):
        """Return the softmax class probabilities for a batch of images."""
        out = self.conv_1(inputs)
        out = self.init_bn(out)
        out = tf.nn.relu(out)
        out = self.pool_2(out)
        # apply the residual blocks in order, from the 64-channel to the 1024-channel stage
        for res_block in [self.res_1_1, self.res_1_2, self.res_2_1, self.res_2_2, self.res_3_1, self.res_3_2, self.res_4_1,
                          self.res_4_2, self.res_5_1, self.res_5_2, self.res_6_1, self.res_6_2, self.res_7_1, self.res_7_2,
                          self.res_8_1, self.res_8_2]:
            out = res_block(out)
        out = self.avg_pool(out)
        out = self.flat(out)
        out = self.ann1(out)
        out = self.fc(out)
        return out

# Defining Image Generators

Images were normalized, resized to 224x224 and data augmentation techniques were applied to increase variety in the duplicated images. A 70 to 30 split was used for training and validation datasets respectively, with a batch size of 64 being chosen through experimentation and many iterations.

In [None]:
# Pixel preprocessing shared by the training and validation generators.
# Keras applies `preprocessing_function` first and `rescale` afterwards, so
# pixels end up as ResNet50-style mean-centred values divided by 255
# (they are NOT confined to [0, 1]).
# `validation_split` must be identical in both generators so the 'training'
# and 'validation' subsets of the dataframe stay disjoint.
shared_preprocessing = dict(
    preprocessing_function=preprocess_input,
    validation_split=0.3,
    rescale=1./255,
)

# Training images additionally receive random augmentation.
datagen = ImageDataGenerator(
    **shared_preprocessing,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Validation images must not be augmented, otherwise the validation metrics
# are measured on randomly distorted images and change from epoch to epoch.
val_datagen = ImageDataGenerator(**shared_preprocessing)

# Options common to both flows; only the generator and the subset differ.
flow_options = dict(
    dataframe=data,
    directory='/content/Processed_Images',
    x_col='filename',
    y_col='damage',
    color_mode='rgb',
    batch_size=64,
    seed=42,
    shuffle=True,
    class_mode='sparse',  # integer class labels, as expected by sparse_categorical_crossentropy
    target_size=(224, 224),  # model input size
)

train_generator = datagen.flow_from_dataframe(subset='training', **flow_options)

val_generator = val_datagen.flow_from_dataframe(subset='validation', **flow_options)

In [None]:
# Instantiate the network for the five damage classes and create its weights
# for 224x224 RGB inputs (batch dimension left open).
model = ResNet18(num_classes=5)
model.build(input_shape=(None, 224, 224, 3))

# The generators yield integer labels, hence the sparse cross-entropy loss.
model.compile(
    optimizer=Adam(learning_rate=0.0001),
    loss='sparse_categorical_crossentropy',
    metrics=["accuracy"],
)

In [None]:
# Train for a fixed 100 epochs, evaluating on the validation generator after
# every epoch; the per-epoch metrics are kept in `history`.
history = model.fit(train_generator, epochs=100, validation_data=val_generator)

In [None]:
# Colab-only: mount Google Drive so the trained model can be stored under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Persist the trained model to the mounted Drive folder using the 'tf' save format.
model.save('/content/drive/MyDrive/CGIR', save_format='tf')

# Testing Model on a Test Dataset


In [None]:
test_data = pd.read_csv('/content/CGIR/Test.csv')

# Same pixel preprocessing as for training (preprocess_input, then division
# by 255), but without any augmentation.
datagen_test = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    rescale=1./255
)

test_generator = datagen_test.flow_from_dataframe(
    dataframe=test_data,
    # The model is trained on the filtered copies in Processed_Images, so the
    # test images must come from the same folder rather than the raw images.
    directory='/content/Processed_Images',
    x_col='filename',
    y_col=None,
    color_mode='rgb',
    batch_size=64,
    seed=42,
    class_mode=None,
    shuffle=False,  # keep row order so predictions line up with test_data
    target_size=(224,224)  # model input size
)

# flow_from_dataframe drops rows whose image file is missing (with only a
# warning); fail loudly instead, because dropped rows would misalign the
# predictions with the IDs in test_data.
assert test_generator.n == len(test_data), (
    f"Only {test_generator.n} of {len(test_data)} test images were found "
    "in /content/Processed_Images"
)

In [None]:
# Optional: reload the saved model instead of reusing the one trained above.
# model = tf.keras.models.load_model('/content/drive/MyDrive/CGIR')
# One row of class probabilities (softmax output) per test image.
predictions = model.predict(test_generator)

# Creating CSV File Containing Classifications

In [None]:
# Mapping from class label to the integer index used by the training generator.
class_indices = train_generator.class_indices
# Quick look at the raw prediction array and the label mapping.
print(predictions)
print(class_indices)

In [None]:
# Name the probability columns by class index, so column i is guaranteed to
# be the class the model encodes as i (instead of relying on dict key order).
class_names = sorted(class_indices, key=class_indices.get)

# Build the frame directly; concatenating with an empty placeholder frame is
# deprecated in recent pandas and turns the float columns into object dtype.
predictionsDF = pd.DataFrame(predictions, columns=class_names)
predictionsDF.shape

In [None]:
# Keep only the 'ID' column of the test table (as a one-column frame)
ID_df = test_data[['ID']]

# Put the IDs next to the per-class probabilities, export, and preview
result_df = pd.concat((ID_df, predictionsDF), axis='columns')
result_df.to_csv('predictions.csv', index=False)
result_df.head()
