In [1]:
import torch

print("CUDA Available:", torch.cuda.is_available())
print("Number of CUDA devices:", torch.cuda.device_count())


CUDA Available: True
Number of CUDA devices: 2


In [2]:
!nvidia-smi

Mon Oct  2 09:43:14 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 515.65.01    Driver Version: 515.65.01    CUDA Version: 11.7     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            On   | 00000000:4A:00.0 Off |                    0 |
| N/A   36C    P8    14W /  70W |      5MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla T4            On   | 00000000:89:00.0 Off |                    0 |
| N/A   35C    P8    14W /  70W |      5MiB / 15360MiB |      0%      Default |
|       

In [3]:
import os
os.getcwd()
os.listdir()

['parallel-cnn.pt',
 'cnn.pth',
 'parallel-cnn.ipynb',
 'parallel-cnn.py',
 'cnn.py',
 'data',
 '.ipynb_checkpoints']

In [17]:
import os
import shutil

def delete_data():
    data_dir = '/fs03/vf38/msyukron/data'
    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)
        print(f"{data_dir} has been deleted!")
    else:
        print(f"{data_dir} does not exist!")
              
delete_data()

import os
import shutil

# Remove corrupted CIFAR-10 data directory
data_dir = '/fs03/vf38/msyukron/data'
if os.path.exists(data_dir):
    shutil.rmtree(data_dir)


/fs03/vf38/msyukron/data has been deleted!


### CNN

In [18]:
%%writefile /fs03/vf38/msyukron/cnn.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import argparse


class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.fc1 = nn.Linear(128*2*2, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 128*2*2)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def load_train_objs():
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    train_set = datasets.CIFAR10(root='/fs03/vf38/msyukron/data', train=True, download=True, transform=transform)
    model = SimpleCNN()  
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer

def train(model, train_loader, optimizer, epochs, save_every, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            
        print("Epoch {}, Loss: {:.4f}".format(epoch+1, running_loss / len(train_loader)))
        
        # Save the model every 'save_every' epochs
        if (epoch + 1) % save_every == 0:
            PATH = '/fs03/vf38/msyukron/cnn.pth'
            torch.save(model.state_dict(), PATH)
            print("Model saved at {}".format(PATH))

    print("Finished Training")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train a CNN on CIFAR10')
    parser.add_argument('epochs', type=int, help='Number of epochs to train')
    parser.add_argument('save_every', type=int, help='Save the model every X epochs')
    parser.add_argument('--batch_size', default=32, type=int, help='Batch size for training')
    args = parser.parse_args()
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    train_set, model, optimizer = load_train_objs()
    train_loader = DataLoader(train_set, batch_size=args.batch_size, shuffle=True)
    train(model, train_loader, optimizer, args.epochs, args.save_every, device)

Overwriting /fs03/vf38/msyukron/cnn.py


In [19]:
!time python3 /fs03/vf38/msyukron/cnn.py 150 50 --batch_size 64

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /fs03/vf38/msyukron/data/cifar-10-python.tar.gz
100%|██████████████████████▉| 170303488/170498071 [00:13<00:00, 14051718.65it/s]Extracting /fs03/vf38/msyukron/data/cifar-10-python.tar.gz to /fs03/vf38/msyukron/data
Epoch 1, Loss: 2.3030
170500096it [00:30, 14051718.65it/s]                                            Epoch 2, Loss: 2.3028
Epoch 3, Loss: 2.3027
Epoch 4, Loss: 2.3025
Epoch 5, Loss: 2.3024
Epoch 6, Loss: 2.3023
Epoch 7, Loss: 2.3022
Epoch 8, Loss: 2.3021
Epoch 9, Loss: 2.3020
Epoch 10, Loss: 2.3019
Epoch 11, Loss: 2.3018
Epoch 12, Loss: 2.3017
Epoch 13, Loss: 2.3016
Epoch 14, Loss: 2.3015
Epoch 15, Loss: 2.3014
Epoch 16, Loss: 2.3012
Epoch 17, Loss: 2.3011
Epoch 18, Loss: 2.3010
Epoch 19, Loss: 2.3008
Epoch 20, Loss: 2.3006
Epoch 21, Loss: 2.3004
Epoch 22, Loss: 2.3001
Epoch 23, Loss: 2.2998
Epoch 24, Loss: 2.2995
Epoch 25, Loss: 2.2991
Epoch 26, Loss: 2.2987
Epoch 27, Loss: 2.2981
Epoch 28, Loss: 2.2975

In [20]:
import numpy as np
from torchvision import datasets
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import argparse

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.fc1 = nn.Linear(128*2*2, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 128*2*2)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Load the model
model = SimpleCNN()
model.load_state_dict(torch.load('/fs03/vf38/msyukron/cnn.pth'))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # Set the model to evaluation mode

# Load the CIFAR-10 test set
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
test_set = datasets.CIFAR10(root='/fs03/vf38/msyukron/data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_set, batch_size=10, shuffle=False)  # Use shuffle=False to get the first 10 samples

# Get 10 samples from test set
data_iter = iter(test_loader)
images, labels = next(data_iter)  # Using the built-in next function
images, labels = images.to(device), labels.to(device)

# Predict
outputs = model(images)
_, predicted = torch.max(outputs, 1)

# Print the predictions
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
print('Predictions:', ' '.join('%5s' % classes[predicted[j]] for j in range(10)))


Files already downloaded and verified
Predictions:   cat  ship  ship plane  frog  frog   car  frog   cat   car


In [33]:
import os
import shutil

def delete_data():
    data_dir = '/fs03/vf38/msyukron/data'
    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)
        print(f"{data_dir} has been deleted!")
    else:
        print(f"{data_dir} does not exist!")
              
delete_data()

import os
import shutil

# Remove corrupted CIFAR-10 data directory
data_dir = '/fs03/vf38/msyukron/data'
if os.path.exists(data_dir):
    shutil.rmtree(data_dir)


/fs03/vf38/msyukron/data has been deleted!


### Parallel CNN

In [34]:
%%writefile /fs03/vf38/msyukron/parallel-cnn.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
import time

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.fc1 = nn.Linear(128*2*2, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 128*2*2)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def ddp_setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def load_train_objs():
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    train_set = datasets.CIFAR10(root='/fs03/vf38/msyukron/data', train=True, download=True, transform=transform)
    model = SimpleCNN()  
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer

def prepare_dataloader(dataset: datasets, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

class Trainer:
    def __init__(self, model: torch.nn.Module, train_data: DataLoader, optimizer: torch.optim.Optimizer, gpu_id: int, save_every: int) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets.squeeze().long())  # updated loss function
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "/fs03/vf38/msyukron/parallel-cnn.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
    
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)


Overwriting /fs03/vf38/msyukron/parallel-cnn.py


In [36]:
!time python3 /fs03/vf38/msyukron/parallel-cnn.py 150 50 --batch_size 64

Files already downloaded and verified
Files already downloaded and verified
[GPU1] Epoch 0 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 0 | Batchsize: 64 | Steps: 391
Epoch 0 | Training checkpoint saved at /fs03/vf38/msyukron/parallel-cnn.pt
[GPU1] Epoch 1 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 1 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 2 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 2 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 3 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 3 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 4 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 4 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 5 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 5 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 6 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 6 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 7 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 7 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 8 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 8 | Batchsize: 64 | Steps: 391
[GPU0] Epoch 9 | Batchsize: 64 | Steps: 391
[GPU1] Epoch 

In [37]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F

# 1. Load the trained model
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, 3, padding=1)
        self.fc1 = nn.Linear(128*2*2, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 128*2*2)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def load_trained_model(checkpoint_path):
    model = SimpleCNN()
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint)
    model.eval()  # Set model to evaluation mode
    return model

model = load_trained_model("/fs03/vf38/msyukron/parallel-cnn.pt")

# 2. Get 10 new data points
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
testset = torchvision.datasets.CIFAR10(root='/fs03/vf38/msyukron/data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=10, shuffle=True)  # Only one batch of 10 images

images, labels = next(iter(testloader))

# 3. Prediction function
def predict(model, images):
    outputs = model(images)
    _, predicted = torch.max(outputs, 1)
    return predicted

predicted_labels = predict(model, images)

print("Predicted labels:", predicted_labels.numpy())
print("Actual labels   :", labels.numpy())


Files already downloaded and verified
Predicted labels: [6 6 8 0 6 8 6 1 9 6]
Actual labels   : [7 9 8 9 6 2 5 2 8 2]


<b> Summary: </b>
- The DDP version completed the training faster in real time compared to the non-DDP version. This is a good thing and is expected if you have multiple GPUs and the data loading/training can be parallelized efficiently. The real-time reduction suggests that distributing the model across multiple GPUs was effective in speeding up the training.

- The user time for the DDP version is almost double compared to the non-DDP version. This makes sense because in DDP, you are using multiple processes (one for each GPU). Each process contributes to the user time. So, if you had two GPUs, for example, you could expect the user time to be roughly double (since both GPUs are doing computational work).

- The sys time is slightly higher for the DDP version, which might be due to the overhead of inter-process and GPU-to-GPU communication.

Source: 
- https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py
- https://www.youtube.com/watch?v=-LAtx9Q6DA8&ab_channel=PyTorch

In [11]:
%%writefile /fs03/vf38/msyukron/ddp_lightning.py

import os
import torch
from torch import nn
from torchvision import datasets, transforms
import pytorch_lightning as pl

class SimpleCNN(pl.LightningModule):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.drop_out = nn.Dropout()
        self.fc1 = nn.Linear(8 * 8 * 64, 1000)
        self.fc2 = nn.Linear(1000, 10)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.drop_out(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

def cifar10_dataloader(batch_size, train):
    dataset = datasets.CIFAR10(
        os.getcwd(), train=train, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=True
    )
    return loader

def main():
    model = SimpleCNN()
    trainer = pl.Trainer(
        gpus=2,
        accelerator='dp',
        max_epochs=5
    )
    trainer.fit(model, cifar10_dataloader(batch_size=64, train=True))

if __name__ == "__main__":
    main()


Overwriting /fs03/vf38/msyukron/ddp_lightning.py


In [12]:
!time python3 ddp_lightning.py

  f"Passing `Trainer(accelerator={self.distributed_backend!r})` has been deprecated"
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /fs03/vf38/msyukron/cifar-10-python.tar.gz
100%|██████████████████████▉| 170205184/170498071 [00:15<00:00, 13941365.70it/s]Extracting /fs03/vf38/msyukron/cifar-10-python.tar.gz to /fs03/vf38/msyukron
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]
Set SLURM handle signals.

  | Name     | Type       | Params
----------------------------------------
0 | layer1   | Sequential | 2.4 K 
1 | layer2   | Sequential | 51.3 K
2 | drop_out | Dropout    | 0     
3 | fc1      | Linear     | 4.1 M 
4 | fc2      | Linear     | 10.0 K
----------------------------------------
4.2 M     Trainable params
0         Non-trainable params
4.2 M     Total params
16.643    Total estimated model params size (MB)
  f"The dataloader, {name}, does not have 

In [10]:
# !pip3 install pytorch-lightning

import pytorch_lightning as pl
print(pl.__version__)


1.5.10


In [13]:
%%writefile /fs03/vf38/msyukron/ddp_lightning.py

import os
import torch
from torch import nn
from torchvision import datasets, transforms
import pytorch_lightning as pl

class SimpleCNN(pl.LightningModule):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.drop_out = nn.Dropout()
        self.fc1 = nn.Linear(8 * 8 * 64, 1000)
        self.fc2 = nn.Linear(1000, 10)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.drop_out(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def train_dataloader(self):
        return cifar10_dataloader(batch_size=64, train=True)

    def training_epoch_end(self, outputs):
        torch.save(self.state_dict(), "model.pt")

def cifar10_dataloader(batch_size, train):
    dataset = datasets.CIFAR10(
        os.getcwd(), train=train, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=True, num_workers=4
    )
    return loader

def main():
    model = SimpleCNN()
    trainer = pl.Trainer(
        gpus=2,
        strategy='ddp_spawn',
        max_epochs=5
    )
    trainer.fit(model)

if __name__ == "__main__":
    main()


Overwriting /fs03/vf38/msyukron/ddp_lightning.py


In [14]:
!time python3 ddp_lightning.py

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]
initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------

Set SLURM handle signals.
Set SLURM handle signals.

  | Name     | Type       | Params
----------------------------------------
0 | layer1   | Sequential | 2.4 K 
1 | layer2   | Sequential | 51.3 K
2 | drop_out | Dropout    | 0     
3 | fc1      | Linear     | 4.1 M 
4 | fc2      | Linear     | 10.0 K
----------------------------------------
4.2 M     Trainable params
0         Non-trainable params
4.2 M     Total params
16.643    Total estim

In [18]:
import torch
from torchvision import datasets, transforms
import pytorch_lightning as pl

class SimpleCNN(pl.LightningModule):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.drop_out = nn.Dropout()
        self.fc1 = nn.Linear(8 * 8 * 64, 1000)
        self.fc2 = nn.Linear(1000, 10)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.drop_out(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

def cifar10_dataloader(batch_size, train):
    dataset = datasets.CIFAR10(
        os.getcwd(), train=train, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=False, num_workers=4
    )
    return loader

# Load model
model_path = "model.pt"
model = SimpleCNN()
model.load_state_dict(torch.load(model_path))
model.eval()

# Load data
test_loader = cifar10_dataloader(batch_size=64, train=False)

# Predict on test data
all_predictions = []

with torch.no_grad():
    for batch in test_loader:
        inputs, _ = batch
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        all_predictions.extend(predicted.cpu().numpy())

print(all_predictions)


RuntimeError: [enforce fail at inline_container.cc:145] . PytorchStreamReader failed reading zip archive: invalid header or archive is corrupted