In [1]:
from datetime import datetime
from pathlib import Path
from typing import Optional

import cv2
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

In [None]:
# Pick the compute device once: the GPU when CUDA is usable, otherwise the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {DEVICE}")

In [2]:
def image_preprocess(
    id: int,
    image_path: Path = Path("data/unzipped/images_training_rev1"),
    crop_size: int = 192,
    output_size: int = 64,
):
    """Load one image and turn it into a small, normalized RGB array.

    The file ``<image_path>/<id>.jpg`` is read with OpenCV, converted from
    BGR to RGB, cropped to a ``crop_size`` x ``crop_size`` window around the
    image center, resized to ``output_size`` x ``output_size`` and scaled by
    1/255.

    Args:
        id: Identifier used as the file stem (``str(id) + ".jpg"``).
        image_path: Directory containing the images. A ``str`` is accepted too.
        crop_size: Side length, in pixels, of the centered square crop.
        output_size: Side length, in pixels, of the returned image.

    Returns:
        A ``float32`` array of shape ``(output_size, output_size, channels)``.
        Values lie in [0, 1] when the file decodes to 8-bit data.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image file.
        ValueError: If the image is smaller than ``crop_size`` in either
            dimension, in which case a centered crop is not possible.
    """
    file_path = Path(image_path) / f"{id}.jpg"

    # Pass a plain string: not every OpenCV build accepts pathlib.Path objects.
    img = cv2.imread(str(file_path))

    # cv2.imread reports failure by returning None rather than raising, which
    # would otherwise surface as an unrelated error inside cvtColor.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {file_path}")

    # Convert from OpenCV's BGR channel order to RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # A negative slice start would wrap around and silently crop the wrong
    # region, so reject images that are too small for the requested window.
    height, width = img.shape[:2]
    if height < crop_size or width < crop_size:
        raise ValueError(
            f"Image {file_path} is {width}x{height}, smaller than the {crop_size}x{crop_size} crop"
        )

    # Crop a square window centered on the center pixel
    half = crop_size // 2
    start_y = height // 2 - half
    start_x = width // 2 - half
    img = img[start_y : start_y + crop_size, start_x : start_x + crop_size]

    # Downsample to the output resolution
    img = cv2.resize(img, (output_size, output_size))

    # Scale pixel values by 1/255
    return img.astype("float32") / 255


In [3]:
def label_preprocess(data: pd.Series):
    """Turn one row into a ``(1, n)`` float32 label array.

    The first entry of ``data`` is dropped and the remaining ``n`` entries
    are returned as a single-row two-dimensional array.
    """
    # Everything after the first position is treated as label values.
    values = data.iloc[1:].to_numpy()
    # Add a leading axis of length 1, then cast to float32.
    return np.expand_dims(values, axis=0).astype(np.float32)

In [4]:
def prep_data(
    data: pd.DataFrame,
    test_size: float = 0.2,
    val_size: float = 0.5,
    number_of_galaxies: Optional[int] = None,
    random_state: Optional[int] = None,
):
    """Sample rows from ``data`` and build train / test / validation datasets.

    The rows are first sampled without replacement, then split in two stages:
    ``test_size`` of the sample is held out, and ``val_size`` of that held-out
    part becomes the validation set while the remainder becomes the test set.
    With the defaults this gives 80% train, 10% test and 10% validation.

    Args:
        data: Frame with a ``"GalaxyID"`` column used to locate each image.
            Each row is passed to ``label_preprocess`` to build its label.
        test_size: Fraction of the sampled rows held out from training.
        val_size: Fraction of the held-out rows used for validation.
        number_of_galaxies: Number of rows to sample; ``None`` uses every row.
        random_state: Seed forwarded to the sampling and to both splits so
            that the partition is reproducible. ``None`` keeps the previous
            unseeded behavior.

    Returns:
        ``(train_dataset, test_dataset, val_dataset)``, each a
        ``torch.utils.data.TensorDataset`` of ``(images, labels)`` whose
        tensors have been moved to ``DEVICE``.
    """
    # Handle number of galaxies
    number_of_galaxies = len(data) if number_of_galaxies is None else number_of_galaxies

    def make_dataset(subset: pd.DataFrame):
        # One label array per row and one image array per GalaxyID
        labels = subset.apply(label_preprocess, axis=1)
        images = subset["GalaxyID"].apply(image_preprocess)

        # Images are stacked channel-last; permute to (batch, channel, height, width)
        input_data = torch.Tensor(np.stack(images.to_numpy())).permute(0, 3, 1, 2).to(DEVICE)
        # label_preprocess returns (1, n) arrays, so drop that singleton axis after stacking
        output_data = torch.Tensor(np.stack(labels.to_numpy())).squeeze(1).to(DEVICE)

        return torch.utils.data.TensorDataset(input_data, output_data)

    # Sample the requested number of rows, then split off the held-out part
    sampled = data.sample(number_of_galaxies, random_state=random_state)
    train_data, holdout_data = train_test_split(sampled, test_size=test_size, random_state=random_state)

    # Split the held-out part into test and validation
    test_data, val_data = train_test_split(holdout_data, test_size=val_size, random_state=random_state)

    train_dataset = make_dataset(train_data)
    test_dataset = make_dataset(test_data)
    val_dataset = make_dataset(val_data)

    return train_dataset, test_dataset, val_dataset

In [5]:
# Define the training and testing functions
def test(model, test_loader, criterion):
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            test_loss += criterion(output, target).item()  # sum up batch loss
    test_loss /= len(test_loader)
    return test_loss


def train(
    model: torch.nn.Module,
    epochs: int,
    train_loader: torch.utils.data.DataLoader,
    test_loader: torch.utils.data.DataLoader,
    optimizer: torch.optim.Optimizer,
    scheduler: torch.optim.lr_scheduler.LRScheduler,
    criterion: torch.nn.Module,
):
    """Run the optimization loop and record per-epoch losses.

    For every epoch the model is put in training mode, each batch of
    ``train_loader`` is used for one optimizer step, the scheduler is stepped
    once, and the model is then scored on ``test_loader`` with ``test``.
    Progress is printed every 100 batches on a line that is overwritten in
    place, followed by one summary line per epoch.

    Args:
        model: Network being optimized.
        epochs: Number of passes over ``train_loader``.
        train_loader: Loader yielding ``(data, target)`` training batches.
        test_loader: Loader passed to ``test`` after each epoch.
        optimizer: Optimizer updating the model parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        criterion: Loss function called as ``criterion(output, target)``.

    Returns:
        ``(train_losses, test_losses)``: two lists with one entry per epoch,
        holding the mean training batch loss and the loss returned by ``test``.
    """
    progress_template = "{} Train Epoch: {:>4} [{}/{} ({:.0f}%)]\tLoss: {:.6f}"
    summary_template = "{} Train Epoch: {:>4} [{}/{} ({:.0f}%)]\tTrain Loss: {:.6f}\tTest Loss: {:.6f}"

    def timestamp():
        # Wall-clock prefix shared by both kinds of log line
        return datetime.now().strftime("%m/%d %H:%M:%S")

    train_history, test_history = [], []

    for epoch in range(1, epochs + 1):
        model.train()
        running_loss = 0.0

        for step, (inputs, targets) in enumerate(train_loader):
            # Forward pass on a clean set of gradients
            optimizer.zero_grad()
            predictions = model(inputs)
            batch_loss = criterion(predictions, targets)

            # Backward pass and parameter update
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

            # Only every 100th batch refreshes the in-place progress line
            if step % 100 != 0:
                continue
            print(
                progress_template.format(
                    timestamp(),
                    epoch,
                    step * len(inputs),
                    len(train_loader.dataset),
                    100.0 * step / len(train_loader),
                    batch_loss.item(),
                ),
                end="\r",
            )

        scheduler.step()

        # Score the model on the held-out loader, then log the epoch summary
        held_out_loss = test(model, test_loader, criterion)
        samples_seen = len(train_loader.dataset)
        mean_train_loss = running_loss / len(train_loader)
        print(
            summary_template.format(
                timestamp(),
                epoch,
                samples_seen,
                samples_seen,
                100.0,
                mean_train_loss,
                held_out_loss,
            )
        )

        # Keep the loss curves for both sets
        train_history.append(mean_train_loss)
        test_history.append(held_out_loss)

    return train_history, test_history

In [None]:
def break_out_solution(solutions):
    """Pick the highest-scoring answer for each of the 11 classes.

    ``solutions`` holds one row per sample and one column per answer. The
    columns are grouped into 11 consecutive classes; for each class the index
    of the largest value within its group is returned.

    The groups below are zero-based, half-open ``(start, stop)`` ranges. In
    one-based inclusive terms they are columns 1-3, 4-5, 6-7, 8-9, 10-13,
    14-15, 16-18, 19-25, 26-28, 29-31 and 32-37. Using those one-based starts
    directly as zero-based indices would skip a column and shift every class
    after the first, so the conversion is done explicitly here.

    Args:
        solutions: Two-dimensional ``torch.Tensor`` with at least 37 columns.
            NOTE(review): presumably the label columns with the ID column
            already removed — confirm against the caller.

    Returns:
        A list of 11 integer numpy arrays, one per class in order, each of
        length ``solutions.shape[0]`` and holding the winning answer index
        relative to the start of that class's column group.

    Raises:
        ValueError: If ``solutions`` is not two-dimensional or has fewer
            columns than the last class requires.
    """
    # Zero-based [start, stop) column ranges, one per class
    class_columns = (
        (0, 3),  # class 1
        (3, 5),  # class 2
        (5, 7),  # class 3
        (7, 9),  # class 4
        (9, 13),  # class 5
        (13, 15),  # class 6
        (15, 18),  # class 7
        (18, 25),  # class 8
        (25, 28),  # class 9
        (28, 31),  # class 10
        (31, 37),  # class 11
    )

    # Convert to numpy
    solutions = solutions.detach().cpu().numpy()

    # A short slice would still produce an argmax, just over the wrong
    # columns, so check the width up front.
    required_columns = class_columns[-1][1]
    if solutions.ndim != 2 or solutions.shape[1] < required_columns:
        raise ValueError(
            f"Expected a 2-D array with at least {required_columns} columns, got shape {solutions.shape}"
        )

    return [np.argmax(solutions[:, start:stop], axis=1) for start, stop in class_columns]