In [75]:
!pip install lightning





In [76]:
import torch
from torch.utils.data import Dataset
import torchvision
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
from torchvision import transforms, models


import lightning as pl

import numpy as np
import matplotlib.pyplot as plt

# Hyperparameters

In [89]:
# Number of workers for dataloader
workers = 2
mnist_root = "../data/mnist"

# Picture parameters
batch_size = 128
image_size = 32

# Generator parameters
channels_num = 3
gen_feat_num = 64
gen_input_size = 100

# Discriminator parameters
disc_feat_num = 64

# Training parameters
num_epochs = 5
lr = 0.0002
beta = 0.5

# Data loading

In [90]:
mnist_train_data = torchvision.datasets.MNIST(mnist_root,
                                        download=True,
                                        train=True,
                                        transform=transforms.Compose([
                                            transforms.Resize(32),
                                            transforms.ToTensor()
                                        ])
)

mnist_test_data = torchvision.datasets.MNIST(mnist_root,
                                        download=False,
                                        train=False,
                                        transform=transforms.Compose([
                                            transforms.Resize(32),
                                            transforms.ToTensor()
                                        ])
)

train_data = torch.utils.data.DataLoader(mnist_train_data,
                                         batch_size=batch_size,
                                         shuffle=True,
                                         num_workers=workers)

test_data = torch.utils.data.DataLoader(mnist_test_data,
                                         batch_size=batch_size,
                                         shuffle=True,
                                         num_workers=workers)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preparing classificator

In [91]:
class ConvNetGS(pl.LightningModule):
    def __init__(self):
        super().__init__()

        self.model = models.convnext_base(num_of_classes=10)
        self.model.features[0][0] = nn.Conv2d(1, 128, kernel_size=(4, 4), stride=(4, 4))
        self.model.to(device)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, x):
        x.to(device)
        return self.model(x)

    def training_step(self, batch, batch_no):
        x, y = batch
        logits = self(x)
        loss = self.loss(logits, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=lr)

In [91]:
classificator = ConvNetGS().to(device)
print(classificator)
trainer = pl.Trainer(
    num_nodes=1,
    max_epochs=5
)
trainer.fit(classificator, train_data)

trainer.save_checkpoint("mnist_convnet_v1.pt")

ConvNetGS(
  (model): ConvNeXt(
    (features): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(1, 128, kernel_size=(4, 4), stride=(4, 4))
        (1): LayerNorm2d((128,), eps=1e-06, elementwise_affine=True)
      )
      (1): Sequential(
        (0): CNBlock(
          (block): Sequential(
            (0): Conv2d(128, 128, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3), groups=128)
            (1): Permute()
            (2): LayerNorm((128,), eps=1e-06, elementwise_affine=True)
            (3): Linear(in_features=128, out_features=512, bias=True)
            (4): GELU(approximate='none')
            (5): Linear(in_features=512, out_features=128, bias=True)
            (6): Permute()
          )
          (stochastic_depth): StochasticDepth(p=0.0, mode=row)
        )
        (1): CNBlock(
          (block): Sequential(
            (0): Conv2d(128, 128, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3), groups=128)
            (1): Permute()
            (2): LayerNor

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type             | Params
-------------------------------------------
0 | model | ConvNeXt         | 88.6 M
1 | loss  | CrossEntropyLoss | 0     
-------------------------------------------
88.6 M    Trainable params
0         Non-trainable params
88.6 M    Total params
354.349   Total estimated model params size (MB)
C:\Users\User\anaconda3\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:436: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.


Training: |          | 0/? [00:00<?, ?it/s]

C:\Users\User\anaconda3\lib\site-packages\lightning\pytorch\trainer\call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...
ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "C:\Users\User\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3457, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\User\AppData\Local\Temp\ipykernel_22344\3012148268.py", line 9, in <module>
    trainer.save_checkpoint("mnist_convnet_v1.pt")
  File "C:\Users\User\anaconda3\lib\site-packages\lightning\pytorch\trainer\trainer.py", line 1384, in save_checkpoint
    self.strategy.save_checkpoint(checkpoint, filepath, storage_options=storage_options)
  File "C:\Users\User\anaconda3\lib\site-packages\lightning\pytorch\strategies\strategy.py", line 482, in save_checkpoint
    self.checkpoint_io.save_checkpoint(checkpoint, filepath, storage_options=storage_options)
  File "C:\Users\User\anaconda3\lib\site-packages\lightning\fabric\plugins\io\torch_io.py", line 55, in save_checkpoint
    _atomic_save(checkpoint, path)
  File "C:\Users\User\anaconda3\lib\site-packages\lightning\fabric\utilities\clo

TypeError: object of type 'NoneType' has no len()

In [None]:
from tqdm.autonotebook import tqdm
from sklearn.metrics import classification_report


def get_prediction(data, model: pl.LightningModule):
    model.freeze()
    probabilities = torch.softmax(model(data), dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)
    return predicted_class, probabilities


trained_model = ConvNetGS.load_from_checkpoint("mnist_convnet_v1.pt", map_location="cuda")

true_y, predictions_y = [], []
for batch in tqdm(iter(test_data), total=len(test_data)):
    x, y = batch
    true_y.extend(y)
    predictions, probs = get_prediction(x.to(device), trained_model)
    probs.to(torch.device("cpu"))
    predictions_y.extend(predictions.cpu())


print(classification_report(true_y, predictions_y, digits=3))

In [None]:
trained_classificator = ConvNetGS.load_from_checkpoint("mnist_convnet_v1.pt", map_location="cuda")

print(trained_classificator)

# Feature extractor

In [None]:
class FeatureExtractor(pl.LightningModule):

    def __init__(self, classificator):
        super(FeatureExtractor, self).__init__()

        features = []
        last_module_name = ""
        for module_name, module in classificator.model.named_modules():
            if module_name.split(".")[0] != last_module_name:
                last_module_name = module_name.split(".")[0]
                features.append(module)

        self.features = nn.Sequential(*features[:-1])
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(in_features=512, out_features=64)

    def forward(self, x):
        out = self.features(x)
        out = self.flatten(out)
        out = self.linear(out)
        return out


In [74]:
trained_classificator = ConvNetGS.load_from_checkpoint("mnist_convnet_v1.pt", map_location="cuda")

feature_extractor = FeatureExtractor(trained_classificator).to(device)

features = []

test_data = torch.utils.data.DataLoader(mnist_test_data,
                                         batch_size=batch_size,
                                         shuffle=True,
                                         num_workers=workers)

for batch in tqdm(iter(test_data), total=1):
    x, y = batch
    feature_extractor.freeze()
    result = feature_extractor(x.to(device))
    features.append(result.cpu().detach().numpy().reshape(-1))

features = np.array(features)


  0%|          | 0/1 [00:00<?, ?it/s]

[array([ 0.7106026 , -0.07017443, -0.2122746 , ..., -0.6785171 ,
        -1.9273187 ,  2.286946  ], dtype=float32)
 array([ 0.410731  , -1.2456834 , -0.9097442 , ...,  0.35476038,
         0.3998233 , -0.969803  ], dtype=float32)
 array([-0.26456523,  1.0940046 ,  0.78067064, ...,  0.11956294,
        -2.6421332 ,  1.5608226 ], dtype=float32)
 array([-0.49460477,  1.1773305 ,  1.1161937 , ..., -0.55718744,
        -0.4957794 ,  1.0166857 ], dtype=float32)
 array([-0.75410616,  0.32313445, -0.9976359 , ...,  0.4667137 ,
         1.9535078 ,  0.07961114], dtype=float32)
 array([-0.5435861,  1.1506131, -1.085178 , ..., -0.1531456, -1.6801205,
         0.7557149], dtype=float32)
 array([-1.6286862 ,  1.6955181 , -1.690027  , ...,  2.4549463 ,
        -4.16793   , -0.87827134], dtype=float32)
 array([-0.5849849,  0.8486128, -1.5900036, ..., -0.136146 , -2.54244  ,
         1.4401597], dtype=float32)
 array([ 0.4247976 ,  1.06565   ,  1.4647845 , ..., -0.13588615,
         1.8007092 ,  0.751

  features = np.array(features)


# Generator model

In [None]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            # First block
            nn.ConvTranspose2d( gen_input_size, gen_feat_num * 4, 4, 2),
            nn.BatchNorm2d(gen_feat_num * 4),
            nn.ReLU(True),
            # Second block
            nn.ConvTranspose2d(gen_feat_num * 4, gen_feat_num * 2, 4, 2),
            nn.BatchNorm2d(gen_feat_num * 2),
            nn.ReLU(True),
            # Third block
            nn.ConvTranspose2d( gen_feat_num * 2, gen_feat_num, 4, 2),
            nn.BatchNorm2d(gen_feat_num * 2),
            nn.ReLU(True),
            # Final block
            nn.ConvTranspose2d( gen_feat_num, channels_num, 4, 2),
            nn.Tanh()
        )

In [None]:
def weights_init(model):
    classname = model.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(model.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(model.weight.data, 1.0, 0.02)

In [None]:
# Create the generator
generator = Generator().to(device)

# Initialize weights
generator.apply(weights_init)

# Print the model
print(generator)