In [35]:
import pandas as pd
import numpy as np
import ast

import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torch import nn, optim
from torchvision import datasets, transforms, utils, models
from torchsummary import summary

import matplotlib.pyplot as plt
from PIL import Image
import os
import altair as alt
alt.data_transformers.enable("vegafusion")


DataTransformerRegistry.enable('vegafusion')

## Data Cleaning

In [2]:
labels = pd.read_csv('data/small_data/labels.csv', index_col=0)
print(labels.shape)
labels.head()

(2218, 7)


Unnamed: 0,index,genes,sex,origin,price,birth,url
0,0-0,"['Yellow Belly', 'Pastel', 'Het Puzzle', '50% ...",female,Self Produced,650.0,7th April 2023,https://www.morphmarket.com/us/c/reptiles/pyth...
1,1-0,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...
2,1-1,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...
3,2-0,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...
4,2-1,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...


In [3]:
# 'Normal' if there is no genes
labels.loc[labels["genes"] == "[]", "genes"] = '["Normal"]'

In [4]:
clean_genes = []
list_genes = [ast.literal_eval(gene) for gene in labels['genes']]
for lst in list_genes:
    for element in lst:
        if "Het" in element:
            clean_genes.append('Het' + ' '.join(element.split('Het')[1:]))
        else:
            clean_genes.append(element)

clean_possible_genes = list(set(clean_genes))
print(f'Number of possible genes: {len(clean_possible_genes)}')
clean_possible_genes[:5]

Number of possible genes: 147


['KRG', 'Sulfur', 'Red Axanthic', 'Ajax', 'Het Axanthic (TSK)']

In [5]:
gene_extension_df = pd.DataFrame(np.zeros([labels.shape[0], len(clean_possible_genes)]), dtype=int, columns=clean_possible_genes)
labels_extended = pd.concat([labels, gene_extension_df], axis=1)
labels_extended.head()

Unnamed: 0,index,genes,sex,origin,price,birth,url,KRG,Sulfur,Red Axanthic,...,Super Enchi,Het Metal Flake,Sunset,Spotnose,Het Puzzle,Confusion,Wookie,Phantom,Lucifer,Het Lavender Albino
0,0-0,"['Yellow Belly', 'Pastel', 'Het Puzzle', '50% ...",female,Self Produced,650.0,7th April 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,1-0,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,1-1,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2-0,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,2-1,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [6]:
len([len(lst) for lst in list_genes])

2218

In [41]:
start_row = 0
count = 0
for gene_col in clean_genes:
    labels_extended.loc[start_row, gene_col] = 1
    count += 1
    if count == [len(lst) for lst in list_genes][start_row]:
        start_row += 1
        count = 0
labels_extended.head()

Unnamed: 0,index,genes,sex,origin,price,birth,url,KRG,Sulfur,Red Axanthic,...,Super Enchi,Het Metal Flake,Sunset,Spotnose,Het Puzzle,Confusion,Wookie,Phantom,Lucifer,Het Lavender Albino
0,0-0,"['Yellow Belly', 'Pastel', 'Het Puzzle', '50% ...",female,Self Produced,650.0,7th April 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,1,0,0,0,0,0
1,1-0,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,1-1,"['Piebald', 'Albino']",female,Self Produced,450.0,15th October 2023,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2-0,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,2-1,"['Butter', 'Yellow Belly', 'Hurricane', 'Leopa...",male,Self Produced,750.0,1st May 2022,https://www.morphmarket.com/us/c/reptiles/pyth...,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [8]:
assert list(labels_extended.select_dtypes('int').sum(axis=1)) == [len(lst) for lst in list_genes]

In [57]:
alt.Chart(labels_extended.iloc[:, 7:].melt(var_name='column')).mark_bar().encode(
    y=alt.Y('column').sort('-x'),
    x=alt.X('sum(value)')
)

## Load Data

In [9]:
image = Image.open("data/small_data/img/0-0.png")

# Convert the image to a tensor
transform = transforms.ToTensor()
tensor_image = transform(image)
tensor_image.shape

torch.Size([3, 1420, 1420])

In [10]:
class PythonGeneDataset(Dataset):
    def __init__(self, labels_df, img_dir, indices=None, transform=None):
        self.labels_df = labels_df
        if indices is not None:
            self.labels_df = self.labels_df.iloc[indices]
        self.img_dir = img_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, f"{self.labels_df.iloc[idx, 0]}.png")
        image = Image.open(img_name)
        # Parse labels here based on your CSV structure and required format
        labels = torch.tensor(self.labels_df.iloc[idx, 7:].astype('float32').values)
        
        if self.transform:
            image = self.transform(image)

        return image, labels


In [11]:
IMAGE_SIZE = 224

transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor()])

full_dataset = PythonGeneDataset(labels_df=labels_extended, img_dir='data/small_data/img/', transform=transform)

# Split dataset
total_size = len(full_dataset)
train_size = int(0.8 * total_size)
valid_size = total_size - train_size
train_indices, valid_indices = torch.utils.data.random_split(np.arange(total_size), [train_size, valid_size])

# Create train and validation datasets
train_dataset = Subset(full_dataset, train_indices)
valid_dataset = Subset(full_dataset, valid_indices)

# Initialize DataLoaders
BATCH_SIZE = 256
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [12]:
# class PythonGeneClassifier(nn.Module):
#     def __init__(self, num_classes):
#         super(PythonGeneClassifier, self).__init__()
#         # Increasing the complexity of the network
#         self.feature_extractor = nn.Sequential(
#             nn.Conv2d(3, 16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(2, 2),
#             nn.Conv2d(16, 32, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(2, 2),
#             nn.Conv2d(32, 64, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(2, 2),
#             nn.Dropout(0.25)  # Adding dropout for regularization
#         )

#         # Flatten layer is moved to the forward function
#         self.classifier = nn.Sequential(
#             nn.Linear(64 * 4 * 4, 512),  # Adjusted for 32x32 input images; calculate accordingly
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(512, num_classes)
#         )

#     def forward(self, x):
#         x = self.feature_extractor(x)
#         x = x.view(x.size(0), -1)  # Flatten the tensor
#         x = self.classifier(x)
#         return x

### Transfer Learning (DenseNet)

In [13]:
densenet = models.densenet121(weights='DenseNet121_Weights.DEFAULT')

for param in densenet.parameters():  # Freeze parameters so we don't update them
    param.requires_grad = False

In [14]:
densenet.classifier

Linear(in_features=1024, out_features=1000, bias=True)

In [15]:
num_labels = len(clean_possible_genes)
new_layers = nn.Sequential(
    nn.Linear(1024, 500),  # Reduce dimension from 1024 to 500
    nn.BatchNorm1d(500),   # Normalize the activations from the previous layer
    nn.ReLU(),             # Non-linear activation function
    nn.Dropout(0.5),       # Dropout for regularization (50% probability)
    nn.Linear(500, num_labels)  # Final layer for class predictions
)
densenet.classifier = new_layers
densenet.classifier

Sequential(
  (0): Linear(in_features=1024, out_features=500, bias=True)
  (1): BatchNorm1d(500, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU()
  (3): Dropout(p=0.5, inplace=False)
  (4): Linear(in_features=500, out_features=147, bias=True)
)

In [16]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using: {device}')
densenet.to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(densenet.parameters(), lr=0.001)

Using: cuda


In [17]:
def train_model(model, criterion, optimizer, num_epochs):
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * inputs.size(0)

        # Calculate average loss for the epoch
        train_loss = train_loss / len(train_loader.dataset)

        # Validation of the model
        model.eval()  # Set model to evaluate mode
        valid_loss = 0.0
        with torch.no_grad():
            for inputs, labels in valid_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                valid_loss += loss.item() * inputs.size(0)

        # Calculate average loss over validation data
        valid_loss = valid_loss / len(valid_loader.dataset)

        # Print training/validation statistics
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}')

# Call to train the model
num_epochs = 30  # Set the number of epochs
train_model(densenet, criterion, optimizer, num_epochs)


Epoch 1/30, Train Loss: 0.5065, Valid Loss: 0.4615
Epoch 2/30, Train Loss: 0.2011, Valid Loss: 0.2542
Epoch 3/30, Train Loss: 0.1118, Valid Loss: 0.1227
Epoch 4/30, Train Loss: 0.0867, Valid Loss: 0.0889
Epoch 5/30, Train Loss: 0.0771, Valid Loss: 0.0797
Epoch 6/30, Train Loss: 0.0713, Valid Loss: 0.0753
Epoch 7/30, Train Loss: 0.0679, Valid Loss: 0.0719
Epoch 8/30, Train Loss: 0.0647, Valid Loss: 0.0698
Epoch 9/30, Train Loss: 0.0617, Valid Loss: 0.0684
Epoch 10/30, Train Loss: 0.0597, Valid Loss: 0.0669
Epoch 11/30, Train Loss: 0.0574, Valid Loss: 0.0660
Epoch 12/30, Train Loss: 0.0554, Valid Loss: 0.0648
Epoch 13/30, Train Loss: 0.0533, Valid Loss: 0.0640
Epoch 14/30, Train Loss: 0.0512, Valid Loss: 0.0635
Epoch 15/30, Train Loss: 0.0492, Valid Loss: 0.0626
Epoch 16/30, Train Loss: 0.0477, Valid Loss: 0.0617
Epoch 17/30, Train Loss: 0.0459, Valid Loss: 0.0612
Epoch 18/30, Train Loss: 0.0444, Valid Loss: 0.0607
Epoch 19/30, Train Loss: 0.0430, Valid Loss: 0.0602
Epoch 20/30, Train Lo

### CNN from scratch

In [18]:
# model = PythonGeneClassifier(num_classes=len(clean_possible_genes))
# criterion = nn.BCEWithLogitsLoss()
# optimizer = optim.Adam(model.parameters())

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# print(f'Training on {device}')
# model.to(device)

# summary(model, input_size=(3, IMAGE_SIZE, IMAGE_SIZE))

In [19]:
# def trainer(model, criterion, optimizer, trainloader, validloader, epochs=20, patience=5, verbose=True):
#     """Simple training wrapper for PyTorch network."""
#     train_loss, valid_loss, valid_accuracy = [], [], []
#     consec_increases = 0
#     for epoch in range(epochs):  # for each epoch
#         train_batch_loss = 0
#         valid_batch_loss = 0
#         valid_batch_acc = 0
#         # Training
#         for X, y in trainloader:
#             X, y = X.to(device), y.to(device)
#             optimizer.zero_grad()
#             y_hat = model(X)
#             loss = criterion(y_hat, y)
#             loss.backward()
#             optimizer.step()
#             train_batch_loss += loss.item()
#         train_loss.append(train_batch_loss / len(trainloader))
#         # Validation
#         with torch.no_grad():
#             for X, y in validloader:
#                 X, y = X.to(device), y.to(device)
#                 y_hat = model(X)
#                 loss = criterion(y_hat, y)
#                 valid_batch_loss += loss.item()
#                 _, predicted = torch.max(y_hat.data, 1)
#                 valid_batch_acc += (predicted == y).sum().item() / y.size(0)
#         valid_loss.append(valid_batch_loss / len(validloader))
#         valid_accuracy.append(valid_batch_acc / len(validloader))  # accuracy
#         model.train()
#         # Print progress
#         if verbose:
#             print(f"Epoch {epoch + 1}:",
#                   f"Train Loss: {train_loss[-1]:.3f}.",
#                   f"Valid Loss: {valid_loss[-1]:.3f}.",
#                   f"Valid Accuracy: {valid_accuracy[-1]:.2f}.")
#         # Early stopping
#         if epoch > 0 and valid_loss[-1] > valid_loss[-2]:
#             consec_increases += 1
#         else:
#             consec_increases = 0
#         if consec_increases == patience:
#             print(f"Stopped early at epoch {epoch + 1:3}: val loss increased for {consec_increases} consecutive epochs!")
#             break
#     results = {"train_loss": train_loss, "valid_loss": valid_loss, "valid_accuracy": valid_accuracy}
#     return results

## Prediction

In [32]:
img_code = 'test-1'
img = Image.open(f'data/small_data/img/{img_code}.png')
input_img = transform(img)
input_img = input_img.unsqueeze(0)
input_img.shape

densenet.eval()

# If using GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
densenet.to(device)
input_img = input_img.to(device)

with torch.no_grad():
    output = densenet(input_img)

predicted_probs = torch.sigmoid(output).to('cpu')

prediction = pd.DataFrame(predicted_probs, index=['predictions'],
                          columns=clean_possible_genes).T.sort_values(by=['predictions'], ascending=False)

# True labels
print(labels_extended.query(f'index == "{img_code}"').genes.to_list())

prediction.head(10)

[]


Unnamed: 0,predictions
Mojave,0.508364
Het Piebald,0.211932
Cinnamon,0.155002
Black Head,0.084467
Het Hypo,0.06713
Lesser,0.063807
Yellow Belly,0.051517
Super Pastel,0.046892
Banana,0.046526
Het Albino,0.03648
