In [1]:
import os
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


# Define the classes
classes = ['cherry', 'strawberry', 'tomato']
data_dir = './train_data'

# Dictionary to store the loaded images
data = {}

# List of images to exclude
excluded_images = {
    'cherry_0055.jpg',
    'cherry_0105.jpg',
    'cherry_0147.jpg',
    'strawberry_0931.jpg',
    'tomato_0087.jpg'
}

for class_name in classes:
    class_dir = os.path.join(data_dir, class_name)
    images = []
    
    # Loop through all files in the class directory
    for file_name in os.listdir(class_dir):
        if file_name.endswith('.jpg'):  # Check for image files
            # Check if the file should be excluded
            if file_name in excluded_images:
                continue  # Skip this file
            file_path = os.path.join(class_dir, file_name)
            
            # Open the image and append it to the list
            img = Image.open(file_path)
            images.append(img)
    
    # Store images for this class
    data[class_name] = images


# Example: Accessing images from the 'cherry' class
print(f'Loaded {len(data["cherry"])} images from cherry class.')
print(f'Loaded {len(data["strawberry"])} images from strawberry class.')
print(f'Loaded {len(data["tomato"])} images from tomato class.')

Loaded 1492 images from cherry class.
Loaded 1494 images from strawberry class.
Loaded 1494 images from tomato class.


In [2]:
# Define the target resolution
target_size = (300, 300)

# Dictionary to hold filtered data
filtered_data = {}

count = 0
# Iterate through the classes
for class_name, images in data.items():
    filtered_images = []
    
    # Check each image for its resolution
    for img in images:
        if img.size == target_size:
            filtered_images.append(img)  # Keep images that match 300x300
        else:
            count += 1
    
    # Store only the filtered images in the new dictionary
    filtered_data[class_name] = filtered_images

# Example: Accessing filtered images
print(f'Filtered {len(filtered_data["cherry"])} images from cherry class.')
print(f'Filtered {len(filtered_data["strawberry"])} images from strawberry class.')
print(f'Filtered {len(filtered_data["tomato"])} images from tomato class.')
print(f'Removed {count} images in total.')
print(f'Filtered {len(filtered_data["cherry"])+len(filtered_data["strawberry"])+len(filtered_data["tomato"])} images in total.')


Filtered 1475 images from cherry class.
Filtered 1477 images from strawberry class.
Filtered 1476 images from tomato class.
Removed 52 images in total.
Filtered 4428 images in total.


In [3]:
import numpy as np
from collections import defaultdict

def detect_and_filter_rgb_outliers(image_data, thresholds):
    filtered_data = defaultdict(list)
    outliers = []
    grayscale_count = 0
    total_input_images = sum(len(images) for images in image_data.values())
    
    for class_name, images in image_data.items():
        for img in images:
            img_np = np.array(img)  # Convert image to NumPy array
            
            if len(img_np.shape) == 2:  # Grayscale image (only height and width)
                grayscale_count += 1
                continue
            
            # Calculate the mean pixel intensity for each RGB channel
            mean_channels = np.mean(img_np, axis=(0, 1))
            
            # Detect if any of the channels are outside their specific thresholds
            condition = (mean_channels < [t[0] for t in thresholds]) | (mean_channels > [t[1] for t in thresholds])
            if np.any(condition):
                outliers.append(img)
            else:
                filtered_data[class_name].append(img)
    
    total_processed_images = sum(len(images) for images in filtered_data.values()) + len(outliers)
    
    print(f"Input images: {total_input_images}")
    print(f"Processed images: {total_processed_images}")
    print(f"Removed Grayscale images: {grayscale_count}")
    print(f"RGB images: {total_processed_images - grayscale_count}")
    print(f"Outliers: {len(outliers)}")
    print(f"Images in filtered_data: {sum(len(images) for images in filtered_data.values())}")
    
    return dict(filtered_data), outliers

# Define channel-specific thresholds based on the distributions
thresholds = [
    (27, 238),  # Red channel (low, high)
    (14, 220),  # Green channel (low, high)
    (8, 218)    # Blue channel (low, high)
]

# Use the optimized function with new thresholds
filtered_data, rgb_outliers = detect_and_filter_rgb_outliers(filtered_data, thresholds)
print(f'\nFound {len(rgb_outliers)} potential RGB channel-based outliers out of {sum(len(images) for images in filtered_data.values()) + len(rgb_outliers)} total images.')
print(f'Filtered data contains {sum(len(images) for images in filtered_data.values())} images after RGB channel-based filtering.')

Input images: 4428
Processed images: 4427
Removed Grayscale images: 1
RGB images: 4426
Outliers: 144
Images in filtered_data: 4283

Found 144 potential RGB channel-based outliers out of 4427 total images.
Filtered data contains 4283 images after RGB channel-based filtering.


In [4]:
import numpy as np
from PIL import Image
import numpy as np

def normalize_filtered_data(filtered_data):
    normalized_data = {}
    
    for class_name, images in filtered_data.items():
        normalized_images = []
        for img in images:
            # Convert PIL Image to numpy array
            img_np = np.array(img)
            
            # Check if the image is RGB (3 channels)
            if len(img_np.shape) == 3 and img_np.shape[2] == 3:
                # Normalize to [0, 1] range
                img_normalized = img_np.astype(np.float32) / 255.0
                
                normalized_images.append(img_normalized)
            else:
                # If not RGB, keep the original image as a NumPy array
                normalized_images.append(img_np.astype(np.float32) / 255.0)
        
        normalized_data[class_name] = normalized_images
    
    return normalized_data

normalized_filtered_data = normalize_filtered_data(filtered_data)

print(f"Normalized data contains {sum(len(images) for images in normalized_filtered_data.values())} images.")
# You can now use normalized_filtered_data for further processing or machine learning tasks
# Optional: Check a sample image to confirm normalization
if normalized_filtered_data and list(normalized_filtered_data.values())[0]:
    sample_image = list(normalized_filtered_data.values())[0][0]
    sample_array = np.array(sample_image)
    print(f"Sample image shape: {sample_array.shape}")
    print(f"Sample image data type: {sample_array.dtype}")
    print(f"Sample image value range: [{np.min(sample_array)}, {np.max(sample_array)}]")

Normalized data contains 4283 images.
Sample image shape: (300, 300, 3)
Sample image data type: float32
Sample image value range: [0.0, 1.0]


In [5]:
normalized_filtered_data

{'cherry': [array([[[0.3254902 , 0.42352942, 0.3372549 ],
          [0.3137255 , 0.4117647 , 0.3254902 ],
          [0.3019608 , 0.4       , 0.3137255 ],
          ...,
          [0.67058825, 0.70980394, 0.5137255 ],
          [0.6745098 , 0.7137255 , 0.5058824 ],
          [0.65882355, 0.7137255 , 0.4745098 ]],
  
         [[0.32156864, 0.41960785, 0.33333334],
          [0.3137255 , 0.4117647 , 0.3254902 ],
          [0.29803923, 0.39607844, 0.30980393],
          ...,
          [0.6666667 , 0.7058824 , 0.50980395],
          [0.67058825, 0.70980394, 0.5019608 ],
          [0.6627451 , 0.7058824 , 0.47843137]],
  
         [[0.31764707, 0.41568628, 0.32941177],
          [0.30588236, 0.40392157, 0.31764707],
          [0.29411766, 0.39215687, 0.30588236],
          ...,
          [0.654902  , 0.6901961 , 0.5058824 ],
          [0.67058825, 0.70980394, 0.5058824 ],
          [0.6627451 , 0.7058824 , 0.4862745 ]],
  
         ...,
  
         [[0.20784314, 0.28235295, 0.20784314],
    

In [6]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

# Assuming normalized_filtered_data is already defined
X_data = []
y_labels = []

# Map labels for each category
label_mapping = {
    'cherry': 0,
    'strawberry': 1,
    'tomato': 2
}

# Step 1: Convert images to tensors and flatten them individually
for label, images in normalized_filtered_data.items():
    for img in images:
        img_tensor = torch.tensor(img).float()  # Convert to torch tensor and float32
        img_flattened = img_tensor.view(-1)  # Flatten the image from (300, 300, 3) -> (270000,)
        X_data.append(img_flattened)
        y_labels.append(label_mapping[label])

# Step 2: Stack tensors together
X = torch.stack(X_data)  # Now, X will be of shape [num_images, 270000]
y = torch.tensor(y_labels)

# Step 3: Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Step 4: Create TensorDatasets and DataLoaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Check shapes
print(f"Training X shape: {X_train.shape}, Training y shape: {y_train.shape}")
print(f"Testing X shape: {X_test.shape}, Testing y shape: {y_test.shape}")


Training X shape: torch.Size([3426, 270000]), Training y shape: torch.Size([3426])
Testing X shape: torch.Size([857, 270000]), Testing y shape: torch.Size([857])


In [7]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        
        # First convolutional layer: input channels=3, output channels=16, kernel size=3x3
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        
        # Max pooling layer to downsample
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Fully connected layers
        self.fc1 = nn.Linear(32 * 75 * 75, 128)  # Adjusting for 300x300 input size after pooling
        self.fc2 = nn.Linear(128, 3)  # Output size matches the number of classes (cherry, strawberry, tomato)
        
        # Activation function
        self.relu = nn.ReLU()

    def forward(self, x):
        # Apply first conv layer, activation, and pooling
        x = self.pool(self.relu(self.conv1(x)))
        
        # Apply second conv layer, activation, and pooling
        x = self.pool(self.relu(self.conv2(x)))
        
        # Flatten the output from convolutional layers
        x = x.view(-1, 32 * 75 * 75)
        
        # Apply fully connected layers
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x

In [10]:

# Define input, hidden, and output sizes
input_size = 3 * 300 * 300  # 270,000 for RGB images
hidden_size = 128  # You can tune this based on your needs
output_size = 3  # 3 classes: 'cherry', 'strawberry', 'tomato'

# Instantiate the model
model = MLP(input_size, hidden_size, output_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()  # Suitable for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.001)  # You can adjust the learning rate

# Set the model to training mode
model.train()

# Training loop
num_epochs = 10  # Number of training epochs (can be adjusted)
for epoch in range(num_epochs):
    running_loss = 0.0
    for batch_idx, (data, labels) in enumerate(train_loader):
        # Flatten the input images
        data = data.view(data.size(0), -1)  # [batch_size, 270000]

        # Forward pass: compute predictions
        outputs = model(data)
        
        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass: compute gradients
        optimizer.zero_grad()  # Clear the previous gradients
        loss.backward()

        # Update weights
        optimizer.step()

        # Track loss
        running_loss += loss.item()

    # Print average loss for the epoch
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}")

print("Training complete!")


Epoch [1/10], Loss: 5.3227
Epoch [2/10], Loss: 1.1408
Epoch [3/10], Loss: 1.1185
Epoch [4/10], Loss: 0.9707
Epoch [5/10], Loss: 1.0370
Epoch [6/10], Loss: 0.9426
Epoch [7/10], Loss: 0.9707
Epoch [8/10], Loss: 0.9240
Epoch [9/10], Loss: 0.9562
Epoch [10/10], Loss: 0.8540
Training complete!


In [14]:
def test_model(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    # Disable gradient calculation for inference
    with torch.no_grad():
        for data, labels in test_loader:  # Use your test DataLoader here
            data = data.view(data.size(0), -1)  # Flatten the input images

            # Forward pass: compute predictions
            outputs = model(data)
            
            # Get the predicted class by finding the index of the max log-probability
            _, predicted = torch.max(outputs.data, 1)

            # Update total number of predictions
            total += labels.size(0)
            
            # Update correct predictions
            correct += (predicted == labels).sum().item()

    # Calculate and print accuracy
    accuracy = 100 * correct / total
    print(f'Accuracy of the model on the test images: {accuracy:.2f}%')

Accuracy of the model on the test images: 49.71%


In [16]:
test_model(model, test_loader)

Accuracy of the model on the test images: 49.71%
