In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb


import torchvision

In [2]:
class MyData(torch.utils.data.Dataset):
    def __init__(self, data, label, transform):
        self.data = torch.tensor(data).unsqueeze(-1).repeat(1, 1, 1, 3).numpy()
        print(self.data.shape)
        self.label = label
        self.transform = transform

    def __getitem__(self, idx):
        return self.transform(self.data[idx]), self.label[idx]

    def __len__(self):
        return len(self.label)

import torchvision.transforms as transforms
from torch.utils.data import DataLoader

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.13015008378839818,), (0.30690425774764835,)),
    # transforms.Pad(padding=(0, 8,0,8)),
    #  transforms.RandomPerspective(distortion_scale=0.6, p=1.0),
    transforms.RandomResizedCrop((40, 20), scale=(0.7, 0.9), ratio=(0.9, 1.1)),   
])


transform_synth = transforms.Compose([
    transforms.ToTensor(),
    # transforms.Normalize((0.13015008378839818,), (0.30690425774764835,)),
    # transforms.Pad(padding=(0, 8,0,8)),
    #  transforms.RandomPerspective(distortion_scale=0.6, p=1.0),
    transforms.RandomResizedCrop((40, 20), scale=(0.7, 0.9), ratio=(0.9, 1.1)),   
])
trainset = torchvision.datasets.MNIST(
    root='./data',  # Directory where data will be stored
    train=True,     # Get the training set
    download=True,  # Download the dataset if it's not already present
    transform=transform  # Apply the defined transformations
)

testset = torchvision.datasets.MNIST(
    root='./data',
    train=False,  # Get the test set
    download=True,
    transform=transform)
    
class SyntheticData(torch.utils.data.Dataset):
    def __init__(self, data, label, transform = transform):
        self.data = data
        self.label = label
        self.transform = transform

    def __getitem__(self, idx):
        length = self.__len__()
        index = torch.randint(0, length, (4,),)
        # print(index, length)
        final_img = []
        final_label = []

        for i in range(4):
            # print(self.data[index[i]].shape,)
            final_img.append(self.transform(self.data[index[i]]))
            final_label.append(self.label[index[i]])
            # print(final_img[-1].shape)
        

        return torch.cat(final_img, -1), torch.stack(final_label, -1)

    def __len__(self):
        return len(self.label)

In [3]:
class ConvNet(nn.Module):
    def __init__(self, input_channels, num_classes):
        super().__init__()
        # Assuming input images are 40 x 168

        self.model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

        self.num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Linear(self.num_ftrs, 37)
        self.model.fc.requires_grad_(True)
        # self.fc = nn.Linear(self.flattened_size, num_classes)

    def forward(self, x):
        x= self.model(x)
        return x

In [4]:
def train_model(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    running_accuracy = 0.0
    for idx, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)

        _, predicted = torch.max(outputs, 1)
        correct_predictions = (predicted == labels).sum().item()
        total_samples = labels.size(0)
        accuracy = (correct_predictions / total_samples) * 100
        running_accuracy += accuracy * images.shape[0]

        # if idx % 10 == 1 :
        #     print(accuracy, loss)
        

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_accuracy = running_accuracy / len(train_loader.dataset)
    return epoch_loss, epoch_accuracy
    

In [5]:
input_channels = 1  
num_classes = 37
learning_rate = 0.001
batch_size = 640
num_epochs = 100


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# model = ConvNet(input_channels=input_channels, num_classes=num_classes).to(device)
# criterion = nn.CrossEntropyLoss() 
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

test_data1 = np.load('data2.npy')
test_lab1 = np.load('lab2.npy')
transform = transforms.Compose([
    transforms.ToTensor(),
    # transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # Repeat the single channel 3 times
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
])
val_loader = torch.utils.data.DataLoader(MyData(test_data1, test_lab1, transform), batch_size=batch_size, num_workers=7)
# transform = torchvision.transforms.Compose([
#     torchvision.transforms.Resize((28,115)),
#     torchvision.transforms.ToTensor(),
# ])



(10000, 40, 168, 3)


In [6]:
model = torch.load('./resnet50-ema-model-epoch.pth').to(device)

In [7]:
criterion = nn.CrossEntropyLoss()

In [8]:

model.eval()
val_loss = 0.0
val_accuracy = 0.0
with torch.no_grad():
    for idx, (images, labels) in enumerate(val_loader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        val_loss += loss.item() * images.size(0)

        _, predicted = torch.max(outputs, 1)
        correct_predictions = (predicted == labels).sum().item()
        total_samples = labels.size(0)
        val_accuracy += (correct_predictions / total_samples) * 100 * images.size(0)

        # break
val_loss /= len(val_loader.dataset)
val_accuracy /= len(val_loader.dataset)

print(f'Validation Loss: {val_loss}')
print(f'Validation accuracy: {val_accuracy}' )



Validation Loss: 0.35054705786705015
Validation accuracy: 90.59


###  Model2:

In [9]:
import tqdm
import itertools
class ConvNet(nn.Module):
    def __init__(self, input_channels, num_classes):
        super().__init__()
        # Assuming input images are 40 x 168

        self.model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

        self.num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Identity()
        self.fc= nn.ModuleList([ nn.Linear(self.num_ftrs, 10) for _ in range(4)])
        self.model.fc.requires_grad_(True)
        # self.fc = nn.Linear(self.flattened_size, num_classes)

    def forward(self, x):

        x= self.model(x)
        y = [ ]
        for _ in range(4):
            y.append(self.fc[_](x))
    
        return torch.stack(y, dim=-1)

def special_validation(val_loader, model, criterion, device):
    val_loss = 0.0
    val_accuracy = 0.0
    with torch.no_grad():
        for idx, (images, labels) in enumerate(tqdm.tqdm(val_loader)):
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            probab = outputs.softmax(-2).max(-2)[0].prod(-1)
            # print(probab[:2], outputs.softmax(-2)[:2], outputs.softmax(-2).max(-2)[0][:2])
            # print(-torch.log(probab).mean(), probab[:10])
            # exit()
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)

            predicted = outputs.argmax(-2).sum(-1)
            correct_predictions = (predicted == labels).sum().item()
            total_samples = labels.size(0)
            val_accuracy += (correct_predictions / total_samples) * 100 * images.size(0)
    val_loss /= len(val_loader.dataset)
    val_accuracy /= len(val_loader.dataset)

    return val_loss, val_accuracy
class CustomLoss():
    def __init__(self):
        numbers = list(range(10))  # Represents numbers 0 to 9
        all_sets = torch.tensor(list(itertools.product(numbers, repeat=4))).detach()
        self.all_sums = [[] for _ in range(37)]
        for vector in all_sets:
            sm = vector.sum()
            self.all_sums[sm].append(vector)

        for i in range(37):
            # print(i, self.all_sums[0].shape)
            self.all_sums[i] = torch.stack(self.all_sums[i], dim=0)

    def calculate(self, logits, all_sum):
        # logits are b x 10 x 4
        probability = logits.softmax(dim=1)
        loss = 0.0
        # print(self.all_sums)
        # construct b x number
        for logit,one_sum  in zip(probability, all_sum):
            # print(logit,one_sum)
            all_prob = logit[self.all_sums[one_sum], torch.arange(4)]
            loss += -torch.log(all_prob.prod(-1).sum())
            # for vector4 in self.all_sums[one_sum]:

            #     x = logit[vector4, torch.arange(4)].prod(-1)
            #     print(logit[vector4, torch.arange(4)], vector4, logit[vector4, torch.arange(4)].prod(-1))
            #     loss += x

            # print(loss)
        loss = loss/all_sum.shape[0]
        return loss

In [13]:
train_data1 = np.load('data2.npy')
train_lab1 = np.load('lab2.npy')

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.060661207217261905,),(0.2193603594906726,)),
])

# train_loader = torch.utils.data.DataLoader(MyData(train_data0, train_lab0, transform=transform_test), batch_size=batch_size, num_workers=5)
test_loader = torch.utils.data.DataLoader(MyData(train_data1, train_lab1, transform=transform_test), batch_size=batch_size, num_workers=5)

(10000, 40, 168, 3)


In [11]:
model = torch.load('./synthetic-ema-model-epoch.pth').to(device)
ls  = CustomLoss()
criterion2 = ls.calculate

In [14]:
test_loss, test_accuracy =  special_validation(test_loader, model, criterion2, device)
print(f'Test Loss: {test_loss}')
print(f'Test accuracy: {test_accuracy}' )

100%|██████████| 16/16 [00:04<00:00,  3.40it/s]

Test Loss: 3.172992643356323
Test accuracy: 53.33



