In [None]:
!pip install pytorch_lightning
!pip install torchsummary

In [1]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.utils as vutils
from pytorch_lightning import Trainer, loggers
from torch.optim import Adam
from torch.utils.data import DataLoader, Subset
#from torchsummary import summary
from torchvision.datasets import MNIST

# normalization constants
MEAN = torch.tensor([0.5], dtype=torch.float32)
STD = torch.tensor([0.5], dtype=torch.float32)

In [33]:
class Autoencoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        #vhparams = vars(hparams)
        #for key in vhparams.keys():
        #    self.hparams[key] = vhparams[key]    
        self.hparams['data_root'] = "data"
        self.hparams['log_dir'] = "logs"
        self.hparams['num_workers'] = 4
        self.hparams['image_size'] = 16
        self.hparams['max_epochs'] = 10
        self.hparams['batch_size'] = 64
        self.hparams['nc'] = 1
        self.hparams['nz'] = 32
        self.hparams['nfe'] = 64
        self.hparams['nfd'] = 64
        self.hparams['lr'] = 0.0002
        self.hparams['beta1'] = 0.9
        self.hparams['beta2'] = 0.999
        self.hparams['gpus'] = 1

        self.encoder = nn.Sequential(
            # input (nc) x 16 x 16
            nn.Conv2d(self.hparams.nc, self.hparams.nfe, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.hparams.nfe),
            nn.LeakyReLU(True),
            # input (nfe) x 8 x 8
            nn.Conv2d(self.hparams.nfe, self.hparams.nfe * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.hparams.nfe * 2),
            nn.LeakyReLU(True),
            # input (nfe*2) x 4 x 4
            nn.Conv2d(self.hparams.nfe * 2, self.hparams.nfe * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.hparams.nfe * 4),
            nn.LeakyReLU(True),
            # input (nfe*4) x 2 x 2
            nn.Conv2d(self.hparams.nfe * 4, self.hparams.nz, 2, 1, 0, bias=False),
            nn.BatchNorm2d(self.hparams.nz),
            nn.LeakyReLU(True),
            # output (nz) x 1 x 1
        )

        self.decoder = nn.Sequential(
            # input (nz) x 1 x 1
            nn.ConvTranspose2d(self.hparams.nz, self.hparams.nfd * 4, 2, 1, 0, bias=False),
            nn.BatchNorm2d(self.hparams.nfd * 4),
            nn.ReLU(True),
            # input (nfd*4) x 2 x 2
            nn.ConvTranspose2d(self.hparams.nfd * 4, self.hparams.nfd * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.hparams.nfd * 2),
            nn.ReLU(True),
            # input (nfd*2) x 4 x 4
            nn.ConvTranspose2d(self.hparams.nfd * 2, self.hparams.nfd, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.hparams.nfd),
            nn.ReLU(True),
            # input (nfd) x 8 x 8
            nn.ConvTranspose2d(self.hparams.nfd, self.hparams.nc, 4, 2, 1, bias=False),
            nn.Tanh()
            # output (nc) x 16 x 16
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

    def prepare_data(self):

        transform = transforms.Compose(
            [
                transforms.Resize(self.hparams.image_size),
                transforms.CenterCrop(self.hparams.image_size),
                transforms.ToTensor(),
                transforms.Normalize(MEAN.tolist(), STD.tolist()),
            ]
        )

        #dataset = ImageFolder(root=self.hparams.data_root, transform=transform)
        dataset = MNIST(root='../input/data', download=True, transform=transform)


        # train, val and test split
        end_train_idx = 45000
        end_val_idx = 52000
        end_test_idx = len(dataset)

        self.train_dataset = Subset(dataset, range(0, end_train_idx))
        self.val_dataset = Subset(dataset, range(end_train_idx + 1, end_val_idx))
        self.test_dataset = Subset(dataset, range(end_val_idx + 1, end_test_idx))

    def train_dataloader(self):
        return DataLoader(
            self.train_dataset, batch_size=self.hparams.batch_size, shuffle=True, num_workers=self.hparams.num_workers
        )

    def val_dataloader(self):
        return DataLoader(
            self.val_dataset, batch_size=self.hparams.batch_size, num_workers=self.hparams.num_workers
        )

    def test_dataloader(self):
        return DataLoader(
            self.test_dataset, batch_size=self.hparams.batch_size, num_workers=self.hparams.num_workers
        )

    def configure_optimizers(self):
        return Adam(self.parameters(), lr=self.hparams.lr, betas=(self.hparams.beta1, self.hparams.beta2))

    def save_images(self, x, output, name, n=16):
        """
        Saves a plot of n images from input and output batch
        """

        if self.hparams.batch_size < n:
            raise IndexError("You are trying to plot more images than your batch contains!")

        # denormalize images
        denormalization = transforms.Normalize((-MEAN / STD).tolist(), (1.0 / STD).tolist())
        x = [denormalization(i) for i in x[:n]]
        output = [denormalization(i) for i in output[:n]]

        # make grids and save to logger
        grid_top = vutils.make_grid(x, nrow=n)
        grid_bottom = vutils.make_grid(output, nrow=n)
        grid = torch.cat((grid_top, grid_bottom), 1)
        self.logger.experiment.add_image(name, grid)

    def training_step(self, batch, batch_idx):
        x, _ = batch
        output = self(x)
        loss = F.mse_loss(output, x)

        # save input and output images at beginning of epoch
        if batch_idx == 0:
            self.save_images(x, output, "train_input_output")

        logs = {"loss": loss}
        return {"loss": loss, "log": logs}

    def validation_step(self, batch, batch_idx):
        x, _ = batch
        output = self(x)
        loss = F.mse_loss(output, x)
        logs = {"val_loss": loss}
        return {"val_loss": loss, "log": logs}

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        logs = {"avg_val_loss": avg_loss}
        return {"avg_val_loss": avg_loss, "log": logs}

    def test_step(self, batch, batch_idx):
        x, _ = batch
        output = self(x)
        loss = F.mse_loss(output, x)

        # save input and output images at beginning of epoch
        if batch_idx == 0:
            self.save_images(x, output, "test_input_output")

        logs = {"test_loss": loss}
        return {"test_loss": loss, "log": logs}

    def test_epoch_end(self, outputs):
        avg_loss = torch.stack([x["test_loss"] for x in outputs]).mean()
        logs = {"avg_test_loss": avg_loss}
        return {"avg_test_loss": avg_loss, "log": logs}

In [4]:
model = Autoencoder()

In [5]:
logger = loggers.TensorBoardLogger(model.hparams.log_dir, name=f"bs{model.hparams.batch_size}_nf{model.hparams.nfe}")

In [9]:
# print detailed summary with estimated network size
summary(model, (model.hparams.nc, model.hparams.image_size, model.hparams.image_size), device="cpu")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1             [-1, 64, 8, 8]           1,024
       BatchNorm2d-2             [-1, 64, 8, 8]             128
         LeakyReLU-3             [-1, 64, 8, 8]               0
            Conv2d-4            [-1, 128, 4, 4]         131,072
       BatchNorm2d-5            [-1, 128, 4, 4]             256
         LeakyReLU-6            [-1, 128, 4, 4]               0
            Conv2d-7            [-1, 256, 2, 2]         524,288
       BatchNorm2d-8            [-1, 256, 2, 2]             512
         LeakyReLU-9            [-1, 256, 2, 2]               0
           Conv2d-10             [-1, 32, 1, 1]          32,768
      BatchNorm2d-11             [-1, 32, 1, 1]              64
        LeakyReLU-12             [-1, 32, 1, 1]               0
  ConvTranspose2d-13            [-1, 256, 2, 2]          32,768
      BatchNorm2d-14            [-1, 25

In [18]:
trainer = Trainer(logger=logger, accelerator='mps', devices=model.hparams.gpus, max_epochs=model.hparams.max_epochs)
trainer.fit(model)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Missing logger folder: logs/bs64_nf64

  | Name    | Type       | Params
---------------------------------------
0 | encoder | Sequential | 690 K 
1 | decoder | Sequential | 690 K 
---------------------------------------
1.4 M     Trainable params
0         Non-trainable params
1.4 M     Total params
5.521     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [11]:
torch.save(model.state_dict(), 'outputs/autoencoder.pth')

In [12]:
trainer.test(model)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

[{}]

In [6]:
model = Autoencoder()
model.load_state_dict(torch.load('outputs/autoencoder.pth', map_location=torch.device('mps')))
model.eval()

Autoencoder(
  (encoder): Sequential(
    (0): Conv2d(1, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=True)
    (3): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): LeakyReLU(negative_slope=True)
    (6): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=True)
    (9): Conv2d(256, 32, kernel_size=(2, 2), stride=(1, 1), bias=False)
    (10): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=True)
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(32, 256, kernel_size=(2, 2), stride=(1, 1), bias=Fals

Classifier

In [25]:
class LinearModel(pl.LightningModule):
    def __init__(self,encoderr):
        super().__init__()
        self.automatic_optimization = False
        self.encoder = encoderr.encoder
        self.MLP = nn.Sequential(
            nn.Linear(32, 10),
            nn.ReLU(),
        )
        

    def forward(self, x):
      return self.MLP(x.view(-1,32))
      

    def training_step(self, batch, batch_nb):
        optii = self.optimizers()
        x, y = batch
        self.encoded_labels = torch.zeros(len(y),10)
        for i in range(len(y)):
          self.encoded_labels[i][(y[i])]=1
        self.encoded_labels = self.encoded_labels.to('cuda')
        self.encoded_features = self.encoder(x).detach()
        loss = F.cross_entropy(self(self.encoded_features), self.encoded_labels)
        optii.zero_grad()
        self.manual_backward(loss)
        optii.step()
        ##############
        return loss

    def configure_optimizers(self):
        optii = torch.optim.Adam(self.MLP.parameters(), lr=0.005)
        return optii

In [30]:
#train_ds = MNIST(PATH_DATASETS, train=True, download=True, transform=model.transforms.ToTensor())
model.prepare_data()

In [8]:
train_dataloader = DataLoader(model.train_dataset, batch_size=64, num_workers=2)

In [28]:
classifier = LinearModel(model)

In [16]:
# Initialize a trainer
trainer = Trainer(
    accelerator="cuda",
    devices=1,
    max_epochs=30,
)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [17]:
# Train the model ⚡
trainer.fit(classifier, train_dataloader)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name    | Type       | Params
---------------------------------------
0 | encoder | Sequential | 690 K 
1 | MLP     | Sequential | 330   
---------------------------------------
690 K     Trainable params
0         Non-trainable params
690 K     Total params
2.762     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [18]:
torch.save(classifier.state_dict(), 'outputs/classifier.pth')

In [29]:
classifier = LinearModel(model)
classifier.load_state_dict(torch.load('outputs/classifier.pth', map_location=torch.device('mps')))
classifier.eval()

LinearModel(
  (encoder): Sequential(
    (0): Conv2d(1, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=True)
    (3): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): LeakyReLU(negative_slope=True)
    (6): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=True)
    (9): Conv2d(256, 32, kernel_size=(2, 2), stride=(1, 1), bias=False)
    (10): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=True)
  )
  (MLP): Sequential(
    (0): Linear(in_features=32, out_features=10, bias=True)
    (1): ReLU()
  )
)

In [34]:
test_dataloader = DataLoader(model.test_dataset,batch_size = 1000)
test_features, test_labels = next(iter(test_dataloader))
#print(test_labels)

In [35]:
test_features.shape

torch.Size([1000, 1, 16, 16])

In [None]:
test_features = test_features.to('mps')
encoded_feature = model.encoder(test_features)
encoded_feature = encoded_feature.to('cpu')
a=classifier(encoded_feature)
y_pred = torch.max(a, 1)[1]

In [37]:
#y_pred = y_pred.to('cpu')
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, classification_report, roc_curve
print(confusion_matrix(test_labels,y_pred))
print(classification_report(test_labels,y_pred))

[[ 93   0   1   0   0   1   2   0   2   0]
 [  0 117   1   0   0   1   0   0   0   1]
 [  0   4  80   2   3   0   1   3   6   5]
 [  1   1   3  79   0   8   3   0   5   1]
 [  0   1   0   0  87   0   1   1   0   4]
 [  0   2   1   3   2  69   2   1   0   0]
 [  1   0   4   0   2   0 101   0   0   0]
 [  1   3   1   0   1   0   0  98   0   4]
 [  0   6   2   4   1   1   1   0  74   1]
 [  0   0   0   1   4   1   0   7   2  81]]
              precision    recall  f1-score   support

           0       0.97      0.94      0.95        99
           1       0.87      0.97      0.92       120
           2       0.86      0.77      0.81       104
           3       0.89      0.78      0.83       101
           4       0.87      0.93      0.90        94
           5       0.85      0.86      0.86        80
           6       0.91      0.94      0.92       108
           7       0.89      0.91      0.90       108
           8       0.83      0.82      0.83        90
           9       0.84     