# **Imports**

In [None]:
import os, glob
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import Callback, EarlyStopping
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from sklearn.metrics import classification_report

In [None]:
"""
Load the rice image dataset and preprocess it.
"""

import pathlib
import zipfile
import requests
from concurrent.futures import ThreadPoolExecutor
from PIL import Image


def download_dataset(url, filename):
    """Download ``url`` and store the response body in ``filename``.

    Args:
        url: Address fetched with an HTTP GET request.
        filename: Path of the file the response body is written to.

    Returns:
        bool: True when the server answered with status 200 and the body
        was written to ``filename``; False for any other status code.
    """
    response = requests.get(url)
    # Compare the status code, not the Response object: a Response never
    # equals the integer 200, so the previous check could not succeed.
    if response.status_code != 200:
        return False
    with open(filename, "wb") as f:
        f.write(response.content)
    return True


def unzip_file(file_name, location):
    """Extract every member of the zip archive ``file_name`` into ``location``.

    Args:
        file_name: Path of the zip archive to read.
        location: Directory the archive members are extracted into.
    """
    # ZipFile's second positional argument is the open *mode*; passing the
    # target directory there raised ValueError. Open read-only (the default)
    # and hand the directory to extractall() only.
    with zipfile.ZipFile(file_name) as archive:
        archive.extractall(location)


def check_file_exists(directory_path) -> bool:
    """Report whether ``directory_path`` exists on disk (file or directory)."""
    exists = os.path.exists(directory_path)
    return exists


def print_number_of_files(data_dir):
    """Print the number of entries in each rice-variety folder of ``data_dir``.

    Args:
        data_dir: Object exposing ``glob(pattern)`` (e.g. ``pathlib.Path``)
            whose sub-folders are named after the five varieties.
    """
    # Count the folders in the same order they were originally listed.
    counts = {
        variety: len(list(data_dir.glob(f"{variety}/*")))
        for variety in ("Arborio", "Basmati", "Ipsala", "Jasmine", "Karacadag")
    }
    # The report order differs from the counting order (Jasmine comes second).
    for variety in ("Arborio", "Jasmine", "Basmati", "Ipsala", "Karacadag"):
        print("The length of %s: %d" % (variety, counts[variety]))


def get_image_files(dir_path):
    """Yield the path of every entry found one level below ``dir_path``.

    Only immediate sub-directories of ``dir_path`` are visited; each entry
    inside them is yielded as ``<sub-directory path>/<entry name>``.
    """
    class_dirs = [entry.path for entry in os.scandir(dir_path) if entry.is_dir()]
    for class_dir in class_dirs:
        yield from (os.path.join(class_dir, name) for name in os.listdir(class_dir))


def get_image_size(image_path) -> tuple:
    """Return the ``size`` attribute of the image stored at ``image_path``.

    Args:
        image_path: Path of the image file to inspect.

    Returns:
        tuple: The value of PIL's ``Image.size`` for that file.
    """
    # Open inside a context manager so the file handle is released right
    # away; this function is called once per image in the dataset.
    with Image.open(image_path) as image:
        return image.size


def check_image_size(image_path, actual_width, actual_height) -> bool:
    """Check whether an image has the expected dimensions.

    Args:
        image_path: Path of the image file to inspect.
        actual_width: Expected width.
        actual_height: Expected height.

    Returns:
        bool: True (after printing ``checked``) when both dimensions match,
        False otherwise.
    """
    width, height = get_image_size(image_path)
    # Compare by value: `is` tests object identity, which only happens to
    # work for small integers cached by CPython.
    if actual_width == width and actual_height == height:
        print("checked")
        return True
    return False


def resize_image(image_path, target_size=(250, 250)):
    """Resize the image at ``image_path`` in place and report it on stdout.

    Args:
        image_path: Path of the image file; it is overwritten with the
            resized version.
        target_size: Size passed to ``Image.resize``.
    """
    # Close the source handle before overwriting the same path on disk.
    with Image.open(image_path) as image:
        resized_image = image.resize(target_size)
    resized_image.save(image_path)
    print(f'Resized image "{image_path}" to {target_size}.')


def resize_if_required(data_dir, actual_width, actual_height):
    """Resize every image under ``data_dir`` whose size differs from the reference.

    Size checks run on a pool of 8 threads; the resizes themselves are done
    sequentially afterwards.

    Args:
        data_dir: Directory whose sub-folders contain the images.
        actual_width: Reference width every image should have.
        actual_height: Reference height every image should have.
    """
    image_paths = list(get_image_files(data_dir))

    def _has_expected_size(path):
        return check_image_size(path, actual_width, actual_height)

    # executor.map yields the booleans themselves. The previous code tested
    # the Future returned by submit(), which is always truthy, so no image
    # was ever resized.
    with ThreadPoolExecutor(max_workers=8) as executor:
        size_matches = list(executor.map(_has_expected_size, image_paths))

    for path, matches in zip(image_paths, size_matches):
        if not matches:
            # Resize to the reference size rather than resize_image's
            # default so all images end up with the same dimensions.
            resize_image(path, target_size=(actual_width, actual_height))


directory_path = "Rice_Image_Dataset"
if not check_file_exists(directory_path):
    # NOTE(review): `link` (the dataset URL) is not defined anywhere in the
    # visible source; it must be set before this cell runs.
    # Download to a separate archive file: writing the response straight to
    # `directory_path` would create a file where later cells expect a folder.
    archive_path = directory_path + ".zip"
    if not download_dataset(link, archive_path):
        print("Download failed")
        # exit() is an interactive helper; SystemExit works everywhere.
        raise SystemExit(1)
    # Assumes the archive contains a top-level "Rice_Image_Dataset" folder,
    # so it is extracted into the working directory. TODO confirm.
    unzip_file(archive_path, ".")

In [None]:
root_path = "./Rice_Image_Dataset/"
# Keep only sub-directories as class names. The previous `[:-1]` slice
# dropped whichever entry happened to sort last, which silently removes a
# real class whenever the folder holds no trailing non-class file.
class_names = sorted(entry.name for entry in os.scandir(root_path) if entry.is_dir())
class_names

In [None]:
# Raw listing of everything directly inside the dataset root (unfiltered,
# in whatever order the OS returns it).
name_class = os.listdir(root_path)
name_class

In [None]:
# Collect every "<root>/<folder>/<name>.<ext>" path; glob.glob already
# returns a list.
pattern = f"{root_path}/**/*.*"
filepaths = glob.glob(pattern)

In [None]:
# Peek at the first two collected paths.
print(filepaths[:2])

In [None]:
# The label of each image is the name of the folder that contains it.
labels = [os.path.basename(os.path.dirname(path)) for path in filepaths]

In [None]:
# Pair every file path with its label, then shuffle the rows and renumber
# the index from zero.
filepath = pd.Series(filepaths, name="Filepath").astype(str)
labels = pd.Series(labels, name="Label")
data = (
    pd.concat([filepath, labels], axis=1)
    .sample(frac=1)
    .reset_index(drop=True)
)
data.head(5)

In [None]:
# Preprocessing: take the first image's size as the reference and resize
# any image that deviates from it.
data_dir = pathlib.Path(root_path).absolute()
first_image = next(get_image_files(data_dir))
(actual_width, actual_height) = get_image_size(first_image)
resize_if_required(
    data_dir,
    actual_width,
    actual_height,
)

In [None]:
# Bar chart of how many images each label has.
counts = data["Label"].value_counts()
sns.barplot(y=counts, x=counts.index)
plt.xlabel("Type")
plt.xticks(rotation=90)

In [None]:
# Hold out 25% of the rows as the test frame; random_state=42 seeds the split.
train, test = train_test_split(data, test_size=0.25, random_state=42)

In [None]:
# Show the first 15 rows of `data` as a 5x3 grid of images titled by label.
fig, axes = plt.subplots(
    nrows=5,
    ncols=3,
    figsize=(10, 8),
    subplot_kw={"xticks": [], "yticks": []},
)
for i, ax in enumerate(axes.flat):
    row = data.loc[i]
    ax.imshow(plt.imread(row["Filepath"]))
    ax.set_title(row["Label"])
plt.tight_layout()
plt.show()

In [None]:
# Two independent generators that both apply ResNet50's preprocess_input.
train_datagen, test_datagen = (
    ImageDataGenerator(preprocessing_function=preprocess_input) for _ in range(2)
)

In [None]:
# Carve the validation frame out of the training frame. Validating on `test`
# (as before) lets the held-out data influence training decisions and makes
# the final test score optimistic.
train_df, valid_df = train_test_split(train, test_size=0.2, random_state=42)

# Settings shared by all three generators.
flow_kwargs = dict(
    x_col="Filepath",
    y_col="Label",
    target_size=(100, 100),
    class_mode="categorical",
    batch_size=32,
)

train_gen = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    **flow_kwargs,
)
valid_gen = train_datagen.flow_from_dataframe(
    dataframe=valid_df,
    shuffle=False,
    seed=42,
    **flow_kwargs,
)
# Kept unshuffled so predictions line up row-by-row with `test`.
test_gen = test_datagen.flow_from_dataframe(
    dataframe=test,
    shuffle=False,
    **flow_kwargs,
)

In [None]:
# ImageNet-initialised ResNet50 without its classification top, with average
# pooling, taking 100x100 RGB inputs.
pretrained_model = ResNet50(
    weights="imagenet",
    include_top=False,
    pooling="avg",
    input_shape=(100, 100, 3),
)

# Freeze the backbone so its weights are not updated during training.
pretrained_model.trainable = False

In [None]:
# Classification head: two 128-unit ReLU layers followed by a 5-way softmax,
# stacked on the backbone's output.
inputs = pretrained_model.input

x = pretrained_model.output
for _ in range(2):
    x = Dense(128, activation="relu")(x)

outputs = Dense(5, activation="softmax")(x)

model = Model(inputs=inputs, outputs=outputs)

In [None]:
# Adam optimizer with categorical cross-entropy loss, tracking accuracy.
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

In [None]:
# Early stopping that monitors validation accuracy with a patience of 2.
early_stopping = EarlyStopping(
    monitor="val_accuracy",
    mode="auto",
    patience=2,
    min_delta=0,
)
my_callbacks = [early_stopping]

In [None]:
# Pass the callbacks list: it was defined earlier but never handed to fit(),
# so early stopping had no effect.
history = model.fit(
    train_gen,
    validation_data=valid_gen,
    epochs=10,
    callbacks=my_callbacks,
)

In [None]:
# Persist the trained model to an HDF5 file in the working directory.
model.save("model_resnet50_augmented.h5")

In [None]:
# One figure per metric pair: training vs. validation curves.
history_frame = pd.DataFrame(history.history)
for columns, title in (
    (["accuracy", "val_accuracy"], "Accuracy"),
    (["loss", "val_loss"], "Loss"),
):
    history_frame[columns].plot()
    plt.title(title)
    plt.show()

In [None]:
# Evaluate on the test generator and print loss and accuracy.
results = model.evaluate(test_gen, verbose=0)
test_loss, test_accuracy = results[0], results[1]

print(f"    Test Loss: {test_loss:.5f}")
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

In [None]:
# Predict the label of the test_gen
pred = model.predict(test_gen)
pred = np.argmax(pred, axis=1)

# Map the label
labels = train_gen.class_indices
labels = dict((v, k) for k, v in labels.items())
pred = [labels[k] for k in pred]

In [None]:
# Compare the true labels of the test frame with the predicted names.
y_test = test["Label"].tolist()
report = classification_report(y_test, pred)
print(report)

In [None]:
# Show the first 10 test images in a 5x2 grid, titled with true and
# predicted labels.
fig, axes = plt.subplots(
    nrows=5,
    ncols=2,
    figsize=(12, 8),
    subplot_kw={"xticks": [], "yticks": []},
)

for i, ax in enumerate(axes.flat):
    row = test.iloc[i]
    ax.imshow(plt.imread(row["Filepath"]))
    ax.set_title(f"True: {row['Label']}\nPredicted: {pred[i]}")
plt.tight_layout()
plt.show()