In [2]:
import pandas as pd
import cv2
from matplotlib import pyplot as plt
import numpy as np
from typing import Callable
from skimage.feature import peak_local_max
from skimage.segmentation import watershed
from scipy import ndimage
import os
import json
from tqdm import tqdm


In [3]:
# This utility function will be used to display images in the notebook

def display_image(mat, axes=None, cmap=None, hide_axis=True):
    """
    Display a given matrix into Jupyter's notebook
    
    :param mat: Matrix to display
    :param axes: Subplot on which to display the image
    :param cmap: Color scheme to use
    :param hide_axis: If `True` axis ticks will be hidden
    :return: Matplotlib handle
    """
    img = cv2.cvtColor(mat, cv2.COLOR_BGR2RGB) if mat.ndim == 3 else mat
    cmap= cmap if mat.ndim != 2 or cmap is not None else 'gray'
    if axes is None:
        if hide_axis:
            plt.xticks([])
            plt.yticks([])
        return plt.imshow(img, cmap=cmap)
    else:
        if hide_axis:
            axes.set_xticks([])
            axes.set_yticks([])
        return axes.imshow(img, cmap=cmap)

In [5]:
# Preprocessing function that will be used to prepare the image for coin detection
# Preprocessing steps:
    # 1. Resize the image to a smaller size
    # 2. Convert the image to grayscale
    # 3. Adjust the brightness of the image
    # 4. Apply inRange to keep only the pixels within the bounds
    # 5. Apply closing and opening morphological operations to remove noise
    # 6. Return the preprocessed image
def preprocess(image,offset=180, size_ratio=0.1,lower_bound=np.array([0, 85, 150]),upper_bound=np.array([155,205,255])):
    image = cv2.resize(image, (int(image.shape[1]*size_ratio), int(image.shape[0]*size_ratio)), interpolation=cv2.INTER_AREA)
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    brightness = cv2.mean(gray_image)[0]
    brightness_offset=offset-brightness
    image_float = image.astype(np.float32)
    image = np.clip(image_float + brightness_offset, 0, 255).astype(np.uint8)
    # Apply inRange to keep only the pixels within the bounds
    mask = cv2.inRange(image, lower_bound, upper_bound)

    # Invert the mask to get a mask for pixels outside the bounds
    mask_inv = cv2.bitwise_not(mask)

    # Create a white image of the same size as the original image
    white_image = np.ones_like(image) * 255

    # Use the inverted mask to turn the pixels outside the bounds in the white image to white
    white_parts = cv2.bitwise_and(white_image, white_image, mask=mask_inv)

    # Use the original mask to keep the pixels within the bounds in the original image
    colored_parts = cv2.bitwise_and(image, image, mask=mask)

    # Combine the two parts
    result = cv2.add(white_parts, colored_parts)
    # Define the structuring element
    kernel1 = np.ones((1,1),np.uint8)

    # Apply the opening operation
    opened = cv2.morphologyEx(result, cv2.MORPH_OPEN, kernel1)

    # Apply the closing operation
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, kernel1)

    
    kernel2 = np.ones((1,1),np.uint8)
    final = cv2.morphologyEx(closed, cv2.MORPH_CLOSE, kernel2)
    final = cv2.morphologyEx(final, cv2.MORPH_OPEN, kernel2)
    return final

# Function detect coins accepts an image preprocessed and detects the coins in it
# The function uses the Hough Circle Transform to detect the coins in the image
# The function returns the original image with the detected coins drawn on it, the number of coins detected, the centers of the detected coins, and the segmented coin images
def detect_coins(image, init_image, param1 = 150, param2 = 15, size_ratio=0.1):
    min_radius=int(150*size_ratio)
    max_radius=int(400*size_ratio)

    # Convert image to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Apply Gaussian blur to reduce noise
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blurred, 150, 25)
    kernel = np.ones((5,5),np.uint8)
    thresh = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel)
    circles = cv2.HoughCircles(thresh, cv2.HOUGH_GRADIENT, dp=1, minDist=3*min_radius,
                               param1=param1, param2=param2, minRadius=min_radius, maxRadius=max_radius)
    circles_rescaled = circles/size_ratio
    centers = []  # List to store detected centers
    coin_images = []  # List to store segmented coin images
    if (circles_rescaled is not None):
        if  (len(circles_rescaled[0]) < 25):
            number_circles = len(circles_rescaled[0])
            circles_rescaled = np.round(circles_rescaled[0, :]).astype("int")
            for (x, y, r) in circles_rescaled:
                # Draw circle boundary (commented out so that final segmentations of coins don't contain drawings)
                # cv2.circle(init_image, (x, y), r, (0, 255, 0), 4)
                # Draw square bounding box centered on coin center (commented out so that final segmentations of coins don't contain drawings)
                x1, y1 = x - int(r), y - int(r)
                x2, y2 = x + int(r), y + int(r)
                #cv2.rectangle(init_image, (x1, y1), (x2, y2), (255, 0, 0), 2)
                centers.append((x*10, y*10))

                coin_image = init_image[y1:y2, x1:x2]
                coin_images.append(coin_image)
        else:
            number_circles = 0
    else:
        number_circles = 0

    return init_image, number_circles, centers, coin_images

### Segmentation of Train Images

In [None]:
# Directory containing images
directory = './train/'
save_directory = './image_segmentation/'
image_subfolders =['1. neutral_bg','2. noisy_bg', '3. hand', '4. neutral_bg_outliers','5. noisy_bg_outliers', '6. hand_outliers']
# List to store images
images = []
data = {}
# Iterate over files in the directory
i = 0
for context in image_subfolders:
    image_directory = os.path.join(directory, context)
    for filename in os.listdir(image_directory):
        if filename.endswith('.JPG'):
            # Load image
            image = cv2.imread(os.path.join(image_directory, filename))
            if image is not None:
                # Append image to list
                images.append(image)
                image = detect_coins(preprocess(image), init_image=image)
                data[f"{context}/{filename}"] = [[int(i[0]),int(i[1])] for i in image[2]]
                #save image
                save_path = os.path.join(save_directory, context, filename)
                cv2.imwrite(save_path, image[0])
                
                for coin in image[3]:
                    if not coin.size == 0:  # Check if coin image is not empty
                        coin_name = f'{filename}_coin_{i}.JPG'
                        cv2.imwrite(os.path.join(save_directory, 'detected_coins', coin_name), coin)
                        i += 1
                    else:
                        print(f"Error: coin image for {filename} is empty")
            else:
                print(f"Error loading image: {filename}")
            
# Save the marked centers to a JSON file
json_path = "detected_centers.json"
with open(json_path, "w") as json_file:
    json.dump(data, json_file, indent=4)
print(f"Marked centers saved to {json_path}")

### Segmentation of Ref Images

In [6]:
# Directory containing images
directory ='./ref'
save_directory = directory+'/segmentation'
# List to store images
images = []
# Iterate over files in the directory
i = 0
for filename in os.listdir(directory):
    if filename.endswith('.JPG'):
        # Load image
        image = cv2.imread(os.path.join(directory, filename))
        if image is not None:
            # Append image to list
            images.append(image)
            image = detect_coins(preprocess(image), init_image=image)
            #save image
            save_path = os.path.join(save_directory, filename)
            cv2.imwrite(save_path, image[0])
                
            for coin in image[3]:
                if not coin.size == 0:  # Check if coin image is not empty
                    coin_name = f'{filename}_coin_{i}.JPG'                   
                    cv2.imwrite(os.path.join(save_directory, 'detected_coins', coin_name), coin)
                    i += 1
                else:
                    print(f"Error: coin image for {filename} is empty")
        else:
            print(f"Error loading image: {filename}")

### Segmentation of Test Images

In [5]:
# Directory containing images
directory = './test'
save_directory = directory+'/segmentation'
# List to store images
images = []
# Iterate over files in the directory
i = 0
for filename in os.listdir(directory):
    if filename.endswith('.JPG'):
        # Load image
        image = cv2.imread(os.path.join(directory, filename))
        if image is not None:
            # Append image to list
            images.append(image)
            image = detect_coins(preprocess(image), init_image=image)
            #save image
            save_path = os.path.join(save_directory, filename)
            cv2.imwrite(save_path, image[0])
                
            for coin in image[3]:
                if not coin.size == 0:  # Check if coin image is not empty
                    coin_name = f'{filename}_coin_{i}.JPG'                   
                    cv2.imwrite(os.path.join(save_directory, 'detected_coins', coin_name), coin)
                    i += 1
                else:
                    print(f"Error: coin image for {filename} is empty")
        else:
            print(f"Error loading image: {filename}")
            

Error: coin image for L0000103.JPG is empty
Error: coin image for L0000140.JPG is empty
Error: coin image for L0000045.JPG is empty
Error: coin image for L0000095.JPG is empty


## Classification

In [None]:
import os
import json
from PIL import Image
from PIL.Image import Resampling
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from torchvision.models import  ResNet50_Weights
from torch.optim import lr_scheduler



### Data preparation and splitting

We create a custom dataset class to load the data and split it into train and validation sets. We also create a custom collate function to pad the sequences to the same length so we don't loose information when resizing when using the resnet50 model because it uses images of size 224x224.
Since our segmentation may produce images without coins sometimes, we added a label for the background class.

In [None]:
# Define the class labels
classes = ["5CHF", "2CHF", "1CHF", "0.5CHF", "0.2CHF", "0.1CHF", "0.05CHF", "2EUR", "1EUR", "0.5EUR",
           "0.2EUR", "0.1EUR", "0.05EUR", "0.02EUR", "0.01EUR", "OOD", "BG"]

# Custom Dataset Class
class CoinDataset(Dataset):
    def __init__(self, labels, img_dir, transform=None):
        self.labels = labels
        self.img_dir = img_dir
        self.transform = transform

        # Convert labels from dictionary to list of tuples
        self.data = list(self.labels.items())

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name, label = self.data[idx]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path)
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(classes.index(label))  # Convert label to tensor index


class ResizeWithPad:
    def __init__(self, target_size):
        self.target_size = target_size

    def __call__(self, image):
        # Resize the image maintaining the aspect ratio
        image.thumbnail(self.target_size, Resampling.LANCZOS)
        # Create a new image with a white background
        new_image = Image.new("RGB", self.target_size, (255, 255, 255))
        # Paste the resized image onto the new image, centered
        new_image.paste(image, ((self.target_size[0] - image.size[0]) // 2, (self.target_size[1] - image.size[1]) // 2))
        return new_image

# Data Transformations
transform_train = transforms.Compose([
    ResizeWithPad((224, 224)),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.05),
    transforms.RandomAdjustSharpness(sharpness_factor=2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
transform_val = transforms.Compose([
    ResizeWithPad((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])



# Load the JSON file
with open('./labels.json') as f:
    labels = json.load(f)

# Split the data into training and validation sets
train_labels, val_labels = train_test_split(list(labels.items()), test_size=0.1, random_state=42)

# Convert back to dictionaries for the Dataset class
train_labels = dict(train_labels)
val_labels = dict(val_labels)

# Set the image directory
img_dir = '/detected_coins/'

# Create datasets
train_dataset = CoinDataset(labels=train_labels, img_dir=img_dir, transform=transform_train)
val_dataset = CoinDataset(labels=val_labels, img_dir=img_dir, transform=transform_val)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

### Model preparation


In [None]:

num_classes = len(classes)  # 15 classes + 1 for OOD + 1 for BG

model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)

# Replace the last fully connected layer
model.fc = nn.Linear(model.fc.in_features, num_classes)

for param in model.parameters():
    param.requires_grad = True


# Move the model to the appropriate device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)


### Training

In [None]:
best_model_path = './v2.pth'

def train(model, train_loader, val_loader, criterion, optimizer,scheduler, num_epochs):

    model.train()
    for epoch in range(num_epochs):

        running_loss = 0.0
        running_corrects = 0
        best_val_accuracy = 0.0

        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_loader)
        train_accuracy = running_corrects.double() / len(train_loader.dataset)


        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0


        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)

                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss /= len(val_loader)
        val_accuracy = 100 * correct / total
        # Save the best model
        if val_accuracy > best_val_accuracy:
          best_val_accuracy = val_accuracy
          torch.save(model.state_dict(), best_model_path)

        scheduler.step(val_loss)

        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss*100}, Training Accuracy: {train_accuracy}, Validation Loss: {val_loss}, Validation Accuracy: {val_accuracy}%')


In [None]:
optimizer = optim.Adam(model.fc.parameters(), lr=0.01,weight_decay=1e-3)
exp_lr_scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=4)
criterion = nn.CrossEntropyLoss()
train(model, train_loader, val_loader, criterion, optimizer,exp_lr_scheduler, num_epochs=10)

## Prediction

In [None]:
df_out = pd.read_csv('./sample_submission.csv')
df_out

In [None]:
# Load the model
model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
num_classes = len(classes)
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
model.load_state_dict(torch.load(best_model_path))
model.eval()
model = model.to('cuda' if torch.cuda.is_available() else 'cpu')

# Define the transform
transform = transforms.Compose([
    ResizeWithPad((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
def predict_image(image_path, model, transform, device):
    image = Image.open(image_path)
    image = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs, 1)
    return predicted.item()

In [None]:
test_dir = './segments_wo_bb/'
for image_name in os.listdir(test_dir):
  image_path = os.path.join(test_dir, image_name)
  image_id = image_name.split('_')[0]
  image_id = image_id[:8]
  predicted_label = predict_image(image_path, model, transform, device)
  #dismiss images classfied as background
  if classes[predicted_label] != "BG":
    row_idx = df_out[df_out['id'] == image_id].index[0]
    df_out.loc[row_idx, classes[predicted_label]] += 1




In [None]:
#to csv for submission
df_out.to_csv('sub.csv')