In [None]:
# Training Notebook

# 1. Sampling Images
import os
import random
import shutil

def sample_images(source_folder, destination_folder, sample_size=20, seed=None):
    """Copy a random sample of image files from one folder into another.

    Parameters
    ----------
    source_folder : str
        Folder scanned (non-recursively) for image files.
    destination_folder : str
        Folder that receives the copies; created if it does not exist.
    sample_size : int, default 20
        Number of images to copy.
    seed : int or None, default None
        When given, the selection is drawn from a private ``random.Random(seed)``
        so it can be reproduced; when None the module-level ``random`` state is used.

    Raises
    ------
    ValueError
        If ``sample_size`` exceeds the number of image files (or is negative).
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test
    os.makedirs(destination_folder, exist_ok=True)

    # Keep regular files with a known image extension. Sorting makes the
    # candidate list independent of the arbitrary order os.listdir returns,
    # which is required for a seeded sample to be reproducible.
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'}
    image_files = sorted(
        name for name in os.listdir(source_folder)
        if os.path.splitext(name)[1].lower() in image_extensions
        and os.path.isfile(os.path.join(source_folder, name))
    )

    # Check if the sample size is greater than the available images
    if sample_size > len(image_files):
        raise ValueError(f"Sample size {sample_size} is greater than the number of available images {len(image_files)}.")

    # Randomly select the sample size images
    rng = random if seed is None else random.Random(seed)
    sampled_files = rng.sample(image_files, sample_size)

    # Copy sampled files to the destination folder
    for name in sampled_files:
        shutil.copy(os.path.join(source_folder, name), os.path.join(destination_folder, name))

    print(f"Sampled {sample_size} images to {destination_folder}")

# Specify the source and destination folders
source_folder = 'filtered_images'
destination_folder = 'sampled_images'

# Sample images using the function's default sample_size (20)
sample_images(source_folder, destination_folder)

In [None]:
# 2. Plotting Sampled Images
import os
import matplotlib.pyplot as plt
from PIL import Image

def plot_images(folder, images_per_plot=5):
    """Show every image in ``folder`` as rows of ``images_per_plot`` thumbnails.

    Parameters
    ----------
    folder : str
        Directory scanned (non-recursively) for files with a common image extension.
    images_per_plot : int, default 5
        Number of images drawn side by side in each figure.

    Raises
    ------
    ValueError
        If ``images_per_plot`` is smaller than 1.
    """
    if images_per_plot < 1:
        raise ValueError("images_per_plot must be at least 1")

    # Filter out non-image files (based on common image extensions); sorted so
    # the display order does not depend on the order os.listdir returns.
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'}
    image_files = sorted(
        name for name in os.listdir(folder)
        if os.path.splitext(name)[1].lower() in image_extensions
    )

    # One figure per chunk of images_per_plot files
    for start in range(0, len(image_files), images_per_plot):
        row_files = image_files[start:start + images_per_plot]
        # squeeze=False keeps axs two-dimensional even when images_per_plot == 1,
        # where plt.subplots would otherwise return a single, non-indexable Axes.
        fig, axs = plt.subplots(1, images_per_plot, figsize=(20, 5), squeeze=False)
        for j, ax in enumerate(axs[0]):
            if j < len(row_files):
                file_name = row_files[j]
                # Context manager releases the file handle (multi-frame formats keep it open otherwise)
                with Image.open(os.path.join(folder, file_name)) as img:
                    ax.imshow(img)
                ax.set_title(file_name)
            # Unused trailing slots in the last row are simply blanked
            ax.axis('off')
        plt.show()
        # Release the figure so looping over many rows does not accumulate memory
        plt.close(fig)

# Specify the folder containing the sampled images
# (same path as the destination folder used by the sampling step)
folder = 'sampled_images'

# Plot the images 5 at a time
plot_images(folder, images_per_plot=5)

# 3. Training the Filtering Model
import os
import numpy as np
from PIL import Image
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset, random_split
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

# Directories: "mixed" folders hold good and bad images together,
# "good" folders hold only the images that were kept as good.
mixed_img_dirs = ['yield_prediction_fennel/cnn_input_data_filtered', 'yield_prediction_fennel/sampled_images']
good_img_dirs = ['yield_prediction_fennel/cnn_input_data_filtered2', 'yield_prediction_fennel/sampled_images2']

# Build the filename -> label mapping (1 = good, 0 = bad)
label_dict = {}
# Every .jpg found in a good directory is labelled 1
for directory in good_img_dirs:
    label_dict.update({name: 1 for name in os.listdir(directory) if name.lower().endswith('.jpg')})
# Every .jpg in a mixed directory that is not already labelled becomes 0
for directory in mixed_img_dirs:
    for name in os.listdir(directory):
        if name.lower().endswith('.jpg'):
            label_dict.setdefault(name, 0)

# Report the resulting class counts
n_labeled = len(label_dict)
n_good = sum(label_dict.values())
print(f"Total labeled images: {n_labeled}")
print(f"Number of good images: {n_good}")
print(f"Number of bad images: {n_labeled - n_good}")

# Step 1: Load and preprocess the data
class CustomImageDataset(Dataset):
    """Image dataset built from several directories and a filename -> label map.

    Every file in ``img_dirs`` whose name is a key of ``label_dict`` becomes one
    sample. The full path of each sample is resolved once at construction time,
    so a file name that occurs in more than one directory yields one sample per
    directory, each loaded from its own location.

    Attributes
    ----------
    img_labels : list of (file_name, label) tuples, one per sample.
    img_paths : list of full paths, parallel to ``img_labels``.
    """

    def __init__(self, img_dirs, label_dict, transform=None):
        self.img_dirs = img_dirs
        self.label_dict = label_dict
        self.transform = transform
        self.img_labels = []
        self.img_paths = []
        for img_dir in img_dirs:
            # sorted: sample order must not depend on the arbitrary os.listdir order
            for img in sorted(os.listdir(img_dir)):
                if img in label_dict:
                    self.img_labels.append((img, label_dict[img]))
                    self.img_paths.append(os.path.join(img_dir, img))

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the image is RGB and passed through ``transform`` if set."""
        label = self.img_labels[idx][1]
        with Image.open(self.img_paths[idx]) as img_file:
            image = img_file.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Training-time transform: resize plus light augmentation. The augmentation is
# applied to every image drawn through this transform (both classes), not only
# to the minority class.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
])

# Evaluation transform: deterministic (no augmentation) so validation and test
# metrics are measured on unmodified images and are comparable between epochs.
eval_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Create datasets: same files and labels, differing only in the transform
full_dataset = CustomImageDataset(mixed_img_dirs, label_dict, transform=transform)
eval_dataset = CustomImageDataset(mixed_img_dirs, label_dict, transform=eval_transform)
# The split indices below are shared between both datasets, so their sample order must match.
assert eval_dataset.img_labels == full_dataset.img_labels, (
    "train/eval datasets list files in a different order; split indices would not line up"
)

# Create dataloaders (70 % train / 10 % validation / remainder test)
SPLIT_SEED = 42  # fixed seed so the same split can be reproduced on a later run
train_size = int(0.7 * len(full_dataset))
val_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size
train_dataset, val_split, test_split = random_split(
    full_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
# Validation/test samples are read through the non-augmenting dataset
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Step 2: Define the CNN model
class SimpleCNN(nn.Module):
    """Small CNN producing 2-class logits: three conv + max-pool stages, then two linear layers.

    Expects input of shape (N, 3, 128, 128). Each 2x2 max-pool halves the
    spatial size (128 -> 64 -> 32 -> 16), which yields the 128 * 16 * 16
    features that ``fc1`` is sized for. Other input sizes raise a shape error
    at ``fc1``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(128 * 16 * 16, 512)
        self.fc2 = nn.Linear(512, 2)
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Return raw logits of shape (N, 2)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything except the batch dimension, so a wrongly sized
        # input fails at fc1 instead of being silently reshaped into a
        # different batch size.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Initialize the model, loss function and optimizer
model = SimpleCNN()
# Per-class loss weights, indexed by class id: class 0 gets 700/250 = 2.8, class 1 gets 1.0.
# NOTE(review): 700 and 250 look like hard-coded class counts - confirm them against the
# counts printed when the label dictionary is built, and update if the data has changed.
class_weights = torch.tensor([700/250, 1.0], dtype=torch.float)  # Adjust the weights based on the class imbalance
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Step 3: Train the model
num_epochs = 20
MODEL_PATH = 'filtering_model.pth'  # checkpoint name loaded by the later evaluation/annotation cells
train_loss_list = []
val_loss_list = []
test_loss_list = []  # never populated; kept so existing references to the name still resolve

for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    running_loss = 0.0
    for i, (images, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if (i + 1) % 10 == 0:  # Print every 10 batches
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}")
    train_loss_list.append(running_loss / len(train_loader))

    # --- validation pass (no gradient tracking, dropout disabled) ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    val_loss_list.append(val_loss / len(val_loader))

    print(f"Epoch {epoch+1}, Train Loss: {running_loss / len(train_loader):.4f}, Val Loss: {val_loss / len(val_loader):.4f}")

# Persist the trained weights: later cells load them from MODEL_PATH
torch.save(model.state_dict(), MODEL_PATH)

# Step 4: Evaluate the model on the test set
model.eval()  # explicit, so evaluation does not rely on the state left by the last epoch
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

if total:
    print(f"Accuracy of the model on the test images: {100 * correct / total:.2f}%")
else:
    print("Test set is empty; accuracy was not computed.")

# Plotting the loss
plt.figure()
plt.plot(train_loss_list, label='Train Loss')
plt.plot(val_loss_list, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Mean batch loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()

In [None]:
# 4. Testing the Filtering Model and Storing Metrics
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import Image
import os
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns

# The dataset class is redefined below; SimpleCNN must already be defined by the training cell

# Load and preprocess the data
class CustomImageDataset(Dataset):
    """Image dataset built from several directories and a filename -> label map.

    Every file in ``img_dirs`` whose name is a key of ``label_dict`` becomes one
    sample. The full path of each sample is resolved once at construction time,
    so a file name that occurs in more than one directory yields one sample per
    directory, each loaded from its own location.

    Attributes
    ----------
    img_labels : list of (file_name, label) tuples, one per sample.
    img_paths : list of full paths, parallel to ``img_labels``.
    """

    def __init__(self, img_dirs, label_dict, transform=None):
        self.img_dirs = img_dirs
        self.label_dict = label_dict
        self.transform = transform
        self.img_labels = []
        self.img_paths = []
        for img_dir in img_dirs:
            # sorted: sample order must not depend on the arbitrary os.listdir order
            for img in sorted(os.listdir(img_dir)):
                if img in label_dict:
                    self.img_labels.append((img, label_dict[img]))
                    self.img_paths.append(os.path.join(img_dir, img))

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the image is RGB and passed through ``transform`` if set."""
        label = self.img_labels[idx][1]
        with Image.open(self.img_paths[idx]) as img_file:
            image = img_file.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Define transformations (deterministic: resize + tensor conversion only)
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Create dataset
# NOTE(review): these directories lack the 'yield_prediction_fennel/' prefix used by the
# training cell - confirm which working directory this cell is meant to run from.
mixed_img_dirs = ['cnn_input_data_filtered', 'sampled_images']
good_img_dirs = ['cnn_input_data_filtered2', 'sampled_images2']
# Label map: 1 for every .jpg in a "good" directory, 0 for the remaining .jpg files
label_dict = {}
for good_img_dir in good_img_dirs:
    for img in os.listdir(good_img_dir):
        if img.lower().endswith('.jpg'):
            label_dict[img] = 1
for mixed_img_dir in mixed_img_dirs:
    for img in os.listdir(mixed_img_dir):
        if img.lower().endswith('.jpg') and img not in label_dict:
            label_dict[img] = 0

full_dataset = CustomImageDataset(mixed_img_dirs, label_dict, transform=transform)

# Create dataloaders
# A fixed seed makes the test subset identical on every run; an unseeded split
# would draw a fresh random 20 % each time.
# NOTE(review): this is only a true held-out set if the training split was made with
# the same seed, sizes and file ordering - confirm against the training cell.
SPLIT_SEED = 42
train_size = int(0.7 * len(full_dataset))
val_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(
    full_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the model (SimpleCNN must already be defined by the training cell)
model = SimpleCNN()
# map_location='cpu': inference below runs on CPU, so a checkpoint saved from a
# GPU session must be remapped or loading fails on a CPU-only machine.
model.load_state_dict(torch.load('filtering_model.pth', map_location='cpu'))  # Load the trained model
model.eval()

# Evaluate the model and store results
all_labels = []
all_preds = []
all_images = []

with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        all_labels.extend(labels.numpy())
        all_preds.extend(predicted.numpy())
        all_images.extend(images)

# Compute metrics
# Passing the class ids explicitly keeps the 2x2 matrix and the two report rows
# even when the test subset happens to contain only one class.
CLASS_IDS = [0, 1]
CLASS_NAMES = ['Bad', 'Good']
accuracy = accuracy_score(all_labels, all_preds)
conf_matrix = confusion_matrix(all_labels, all_preds, labels=CLASS_IDS)
class_report = classification_report(all_labels, all_preds, labels=CLASS_IDS, target_names=CLASS_NAMES)

# Print metrics
print(f'Accuracy: {accuracy:.2f}')
print('Classification Report:')
print(class_report)

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Plot a few test images with their predicted and true labels
# (capped at the number of available test images; the grid holds at most 4 x 5)
num_images_to_plot = min(20, len(all_images))
plt.figure(figsize=(15, 10))
for i in range(num_images_to_plot):
    plt.subplot(4, 5, i+1)
    image = all_images[i].permute(1, 2, 0).numpy()  # (C, H, W) -> (H, W, C) for imshow
    true_label = 'Good' if all_labels[i] == 1 else 'Bad'
    predicted_label = 'Good' if all_preds[i] == 1 else 'Bad'
    plt.imshow(image)
    plt.title(f'True: {true_label}\nPred: {predicted_label}')
    plt.axis('off')
plt.tight_layout()
plt.show()

In [None]:
# 5. Manual Annotation and Additional Training
import os
import random
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import matplotlib.pyplot as plt
from ipywidgets import interact, IntSlider, Dropdown, Button, VBox, HBox, Label, Output
from IPython.display import display, clear_output

# Define a custom dataset for the filtered images
class FilteredImagesDataset(Dataset):
    """Dataset over a fixed list of image file names that live in one directory.

    Each item is ``(image, file_name, label)``. The label is derived from the
    file name alone: 1 (Good) when the lower-cased name contains ``'good'``,
    otherwise 0 (Bad).
    """

    def __init__(self, img_names, img_dir, transform=None):
        self.transform = transform
        self.img_dir = img_dir
        self.img_names = img_names

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        file_name = self.img_names[idx]
        image = Image.open(os.path.join(self.img_dir, file_name)).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Label comes from the filename (assuming this is how labels are determined)
        label = 1 if 'good' in file_name.lower() else 0

        return image, file_name, label

# Define the transform for the test dataset
test_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Load image names from the directory, excluding those in the training directories
test_img_dir = 'bright_spicy_2024/top_images' # now is split into good_top_images and bad_top_images
training_dirs = ['yield_prediction_fennel/cnn_input_data_filtered', 'yield_prediction_fennel/sampled_images']
training_images = set()
for training_dir in training_dirs:
    training_images.update(os.listdir(training_dir))

# Candidates: .jpg files whose name contains '_top_' and that were not used for training
all_img_names = [img for img in os.listdir(test_img_dir) if img.lower().endswith('.jpg') and '_top_' in img and img not in training_images]

# Randomly select up to 300 images. random.sample raises ValueError when asked
# for more items than exist, so the request is capped at the number of candidates.
NUM_IMAGES_TO_ANNOTATE = 300
selected_img_names = random.sample(all_img_names, min(NUM_IMAGES_TO_ANNOTATE, len(all_img_names)))

# Create the test dataset and dataloader
test_dataset = FilteredImagesDataset(selected_img_names, test_img_dir, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the CNN model
class SimpleCNN(nn.Module):
    """Small CNN producing 2-class logits: three conv + max-pool stages, then two linear layers.

    Expects input of shape (N, 3, 128, 128). Each 2x2 max-pool halves the
    spatial size (128 -> 64 -> 32 -> 16), which yields the 128 * 16 * 16
    features that ``fc1`` is sized for. Other input sizes raise a shape error
    at ``fc1``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(128 * 16 * 16, 512)
        self.fc2 = nn.Linear(512, 2)  # Output 2 classes: Good (1) and Bad (0)
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Return raw logits of shape (N, 2)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything except the batch dimension, so a wrongly sized
        # input fails at fc1 instead of being silently reshaped into a
        # different batch size.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Function to test the model on the test dataset
def test_model(model, test_loader):
    model.eval()
    all_preds = []
    all_img_names = []
    all_images = []
    with torch.no_grad():
        for images, img_names, labels in test_loader:
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_img_names.extend(img_names)
            all_images.extend(images.cpu().numpy())
    return all_img_names, all_preds, all_images

# Load the model and move it to the appropriate device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleCNN()
model_path = 'filtering_model.pth'
# map_location lets a checkpoint saved on a GPU machine load on a CPU-only one
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)

# Test the model and get the predictions
img_names, preds, images = test_model(model, test_loader)

# Collect corrected labels (image name -> 0/1), filled in by the annotation widgets
corrected_labels = {}

# Index of the image currently shown, and the Output widget the image is rendered into
current_index = 0
out = Output()

def annotate_images(index):
    """Show image ``index`` with a label dropdown and a "Next" button for manual correction.

    Reads the module-level ``images``, ``img_names`` and ``preds`` lists, and
    writes to ``corrected_labels`` (image name -> 1 for "Good", 0 for "Bad"),
    ``current_index`` and the ``out`` Output widget. The dropdown starts on the
    model's predicted label. A label is recorded only when the dropdown value
    changes or "Next" is clicked; merely viewing an image records nothing.
    """
    global current_index
    # Remember which image is on screen; the dropdown handler looks it up again later
    current_index = index
    img = images[index].transpose((1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
    img_name = img_names[index]
    pred = preds[index]
    label = "Good" if pred == 1 else "Bad"
    
    def on_dropdown_change(change):
        # Stores the new dropdown value for the image at the *global* current_index,
        # which is not necessarily the image this closure was created for.
        global current_index
        img_name = img_names[current_index]
        corrected_label = 1 if change['new'] == "Good" else 0
        corrected_labels[img_name] = corrected_label
    
    def on_button_click(_):
        # Confirms the dropdown's current value for this call's image, then moves the
        # slider forward by one (clamped to the last index).
        corrected_label = 1 if dropdown.value == "Good" else 0
        corrected_labels[img_name] = corrected_label
        next_index = min(index + 1, len(images) - 1)
        slider.value = next_index
    
    # Dropdown preselected with the model's prediction
    dropdown = Dropdown(options=["Good", "Bad"], value=label, description="Label:")
    dropdown.observe(on_dropdown_change, names='value')
    
    button = Button(description="Next")
    button.on_click(on_button_click)
    
    # Replace the previous contents of the shared Output widget with the new controls and image
    with out:
        clear_output(wait=True)
        display(HBox([Label(f"Image: {img_name}"), dropdown, button]))
        plt.imshow(img)
        plt.axis('off')
        plt.show()

# Slider that selects the image index; interact() calls annotate_images on every change
slider = IntSlider(min=0, max=len(images) - 1, step=1, description='Index:')
interact(annotate_images, index=slider)

display(out)

# After annotation, corrected_labels will have the manually corrected labels.
# Widget callbacks cannot fire while this code is still executing, so the
# dictionary is still empty when these lines run straight after the widgets are shown.
print("Corrected Labels:", corrected_labels)

import csv

# Save corrected labels to a CSV file - but only when there is something to
# save, so an existing annotation file is not overwritten with an empty one.
corrected_labels_path = 'corrected_labels_top.csv'
if corrected_labels:
    with open(corrected_labels_path, 'w', newline='') as csvfile:
        fieldnames = ['Image', 'Corrected Label']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for img_name, label in corrected_labels.items():
            writer.writerow({'Image': img_name, 'Corrected Label': label})

    print(f"Corrected labels saved to {corrected_labels_path}")
else:
    print(f"No corrected labels collected yet; {corrected_labels_path} was left untouched. "
          "Run the save step again after annotating.")

import pandas as pd

# Load the corrected labels from CSV files
# corrected_labels_top.csv is the file name written by the annotation save step.
# NOTE(review): corrected_labels_side.csv is not written anywhere in the visible part of
# this notebook - presumably it comes from a separate annotation pass over the side
# images; confirm it exists before running.
corrected_labels_top = pd.read_csv('corrected_labels_top.csv')
corrected_labels_side = pd.read_csv('corrected_labels_side.csv')

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

class CorrectedImagesDataset(Dataset):
    """Dataset pairing image file names in one directory with manually corrected labels.

    ``img_names`` and ``labels`` are parallel sequences; item ``i`` is the RGB
    image ``img_dir/img_names[i]`` (after ``transform``, if any) together with
    ``labels[i]``.
    """

    def __init__(self, img_names, labels, img_dir, transform=None):
        self.transform = transform
        self.img_dir = img_dir
        self.labels = labels
        self.img_names = img_names

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        file_path = os.path.join(self.img_dir, self.img_names[idx])
        picture = Image.open(file_path).convert("RGB")
        if self.transform:
            picture = self.transform(picture)
        return picture, self.labels[idx]

# Define the transform (deterministic: resize + tensor conversion)
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Create datasets
# NOTE(review): these are absolute, machine-specific paths, while the annotation step
# reads the top images from the relative path 'bright_spicy_2024/top_images' - confirm
# both refer to the same directory and consider deriving them from one base path.
top_img_dir = '/home/ec2-user/SageMaker/notebooks/height_bright_spicy/bright_spicy_2024/top_images'
side_img_dir = '/home/ec2-user/SageMaker/notebooks/height_bright_spicy/bright_spicy_2024/side_images'

top_dataset = CorrectedImagesDataset(
    corrected_labels_top['Image'].tolist(),
    corrected_labels_top['Corrected Label'].tolist(),
    top_img_dir,
    transform=transform
)

side_dataset = CorrectedImagesDataset(
    corrected_labels_side['Image'].tolist(),
    corrected_labels_side['Corrected Label'].tolist(),
    side_img_dir,
    transform=transform
)

from torch.utils.data import random_split

# Define the split ratio (80 % train, remainder validation, per view)
train_ratio = 0.8
top_train_size = int(train_ratio * len(top_dataset))
top_val_size = len(top_dataset) - top_train_size

side_train_size = int(train_ratio * len(side_dataset))
side_val_size = len(side_dataset) - side_train_size

# Split the datasets. One seeded generator drives both splits so the
# train/validation membership is the same on every run.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
top_train_dataset, top_val_dataset = random_split(
    top_dataset, [top_train_size, top_val_size], generator=split_generator
)
side_train_dataset, side_val_dataset = random_split(
    side_dataset, [side_train_size, side_val_size], generator=split_generator
)

# Create data loaders; adding two datasets concatenates them (top samples first, then side)
train_loader = DataLoader(top_train_dataset + side_train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(top_val_dataset + side_val_dataset, batch_size=32, shuffle=False)

# Define the CNN model (same architecture as the earlier SimpleCNN; redefined here)
class SimpleCNN(nn.Module):
    """Small CNN producing 2-class logits: three conv + max-pool stages, then two linear layers.

    Expects input of shape (N, 3, 128, 128). Each 2x2 max-pool halves the
    spatial size (128 -> 64 -> 32 -> 16), which yields the 128 * 16 * 16
    features that ``fc1`` is sized for. Other input sizes raise a shape error
    at ``fc1``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(128 * 16 * 16, 512)
        self.fc2 = nn.Linear(512, 2)
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Return raw logits of shape (N, 2)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything except the batch dimension, so a wrongly sized
        # input fails at fc1 instead of being silently reshaped into a
        # different batch size.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Initialize the model, loss function, and optimizer
# A new SimpleCNN is constructed here and no checkpoint is loaded, so training
# starts from freshly initialised weights and the loss is unweighted.
# NOTE(review): the section heading says "Additional Training" - if the intent was to
# fine-tune the weights stored in filtering_model.pth, they must be loaded before the
# loop below; confirm which behaviour is wanted.
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Move the model to the appropriate device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    # --- training pass over the combined top + side training samples ---
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        # Batches are moved to the model's device before the forward pass
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    # Reported value is the mean of the per-batch losses for this epoch
    print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}')

    # --- validation pass: eval mode, no gradient tracking ---
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            # Predicted class = index of the largest logit
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'Validation Loss: {val_loss/len(val_loader)}, Accuracy: {100 * correct / total}%')

# Save the model after training (weights only, under a different name than the first model)
torch.save(model.state_dict(), 'filtering_model2.pth')