### Let's implement toy pp example

In [1]:
import random

import numpy as np

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# set seed for reproducibility

np.random.seed(0)
torch.manual_seed(0)
torch.backends.cudnn.deterministic = True

In [3]:
# We will use FashionMNIST dataset

training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

train_dataloader = DataLoader(training_data, batch_size = 32)
test_dataloader = DataLoader(test_data, batch_size = 32)

train_dataloader.dataset[0][0].shape

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to data/FashionMNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 26421880/26421880 [00:03<00:00, 6779296.49it/s] 


Extracting data/FashionMNIST/raw/train-images-idx3-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 29515/29515 [00:00<00:00, 117665.79it/s]


Extracting data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 4422102/4422102 [00:09<00:00, 483862.69it/s] 


Extracting data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 5148/5148 [00:00<00:00, 9239314.07it/s]

Extracting data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to data/FashionMNIST/raw






torch.Size([1, 28, 28])

In [4]:
class SingleNeuralNetwork(nn.Module):
    def __init__(self):
        super(SingleNeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
    
my_single_net = SingleNeuralNetwork()
my_single_net

SingleNeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=10, bias=True)
  )
)

In [5]:
def single_train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        optimizer.zero_grad()

        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward(retain_graph = True)
        optimizer.step()

        if batch % 1000 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"train loss : {loss:>7f} [{current:>5d}/{size:>5d}]")

def single_test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X , y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error : \n Accuracy : {(100*correct):>0.1f}%, Avg loss : {test_loss:>8f} \n")

In [6]:
loss_fn = nn.CrossEntropyLoss()

# set hyperparams
learning_rate = 1e-3
batch_size = 64
epochs = 5

single_optimizer = torch.optim.SGD(my_single_net.parameters(), lr = learning_rate)

In [7]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-----------------")
    single_train_loop(train_dataloader, my_single_net, loss_fn, single_optimizer)
    single_test_loop(test_dataloader, my_single_net, loss_fn)
print("Done!")

Epoch 1
-----------------
train loss : 2.303715 [    0/60000]
train loss : 2.134295 [32000/60000]
Test Error : 
 Accuracy : 59.5%, Avg loss : 1.950056 

Epoch 2
-----------------
train loss : 1.944278 [    0/60000]
train loss : 1.494424 [32000/60000]
Test Error : 
 Accuracy : 62.0%, Avg loss : 1.306143 

Epoch 3
-----------------
train loss : 1.313682 [    0/60000]
train loss : 1.045949 [32000/60000]
Test Error : 
 Accuracy : 65.2%, Avg loss : 1.012107 

Epoch 4
-----------------
train loss : 0.981220 [    0/60000]
train loss : 0.852415 [32000/60000]
Test Error : 
 Accuracy : 67.1%, Avg loss : 0.880317 

Epoch 5
-----------------
train loss : 0.815726 [    0/60000]
train loss : 0.753456 [32000/60000]
Test Error : 
 Accuracy : 69.2%, Avg loss : 0.807341 

Done!


In [8]:
class NeuralNetwork1(nn.Module):
    def __init__(self):
        super(NeuralNetwork1, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

class NeuralNetwork2(nn.Module):
    def __init__(self):
        super(NeuralNetwork2, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )

    def forward(self, x):
        logits = self.linear_relu_stack(x)
        return logits

model1 = NeuralNetwork1()
model2 = NeuralNetwork2()

In [9]:
optimizer1 = torch.optim.SGD(model1.parameters(), lr = learning_rate)
optimizer2 = torch.optim.SGD(model2.parameters(), lr = learning_rate)

Let's Complete pipeline parallel train loop

In [10]:
def pp_train_loop(dataloader, model1, model2, loss_fn, optimizer1, optimizer2):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        pred1 = model1(X)

        # input of model2 forward is output of model1 forward!
        # Your codes start here
        pred2 = model2(pred1)
        # Your codes end here

        # loss can be obtained with output of model2
        # Your codes start here
        loss2 = loss_fn(pred2, y)
        # Your codes end here

        optimizer2.zero_grad()
        pred1.retain_grad()
        loss2.backward(retain_graph = True)
        inter_grad = pred1.grad
        optimizer2.step()

        optimizer1.zero_grad()

        # input of model1 backward is output of model2 backward!
        # Your codes start here
        pred1.backward(inter_grad)
        # Your codes end here

        optimizer1.step()

        if batch % 1000 == 0:
            loss2, current = loss2.item(), batch * len(X)
            print(f"loss : {loss2:>7f} [{current:>5d}/{size:>5d}]")

def pp_test_loop(dataloader, model1, model2, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X , y in dataloader:
            pred1 = model1(X)
            pred2 = model2(pred1)
            test_loss += loss_fn(pred2, y).item()
            correct += (pred2.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error : \n Accuracy : {(100*correct):>0.1f}%, Avg loss : {test_loss:>8f} \n")

for t in range(epochs):
    print(f"Epoch {t+1}\n-----------------")
    pp_train_loop(train_dataloader, model1, model2, loss_fn, optimizer1, optimizer2)
    pp_test_loop(test_dataloader, model1, model2, loss_fn)
print("Done!")

Epoch 1
-----------------
loss : 2.301928 [    0/60000]
loss : 2.130015 [32000/60000]
Test Error : 
 Accuracy : 56.9%, Avg loss : 1.944549 

Epoch 2
-----------------
loss : 1.929348 [    0/60000]
loss : 1.456204 [32000/60000]
Test Error : 
 Accuracy : 63.4%, Avg loss : 1.314913 

Epoch 3
-----------------
loss : 1.310612 [    0/60000]
loss : 1.047293 [32000/60000]
Test Error : 
 Accuracy : 65.4%, Avg loss : 1.016318 

Epoch 4
-----------------
loss : 0.985983 [    0/60000]
loss : 0.856761 [32000/60000]
Test Error : 
 Accuracy : 67.9%, Avg loss : 0.877020 

Epoch 5
-----------------
loss : 0.818541 [    0/60000]
loss : 0.758251 [32000/60000]
Test Error : 
 Accuracy : 70.1%, Avg loss : 0.800100 

Done!


In [13]:
single_net_param_sum = 0
for param in my_single_net.parameters():
    flattened_param = param.reshape(1,-1)
    single_net_param_sum += len(flattened_param[0])
single_net_param_sum

535818

In [14]:
partitioned_net_param_sum = 0
for param in model1.parameters():
    flattened_param = param.reshape(1,-1)
    partitioned_net_param_sum += len(flattened_param[0])
for param in model2.parameters():
    flattened_param = param.reshape(1,-1)
    partitioned_net_param_sum += len(flattened_param[0])
partitioned_net_param_sum

535818