In [None]:
#general imports
import torch
from torch import multiprocessing
import logging
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
dataloader_kwargs = {'pin_memory': True} if use_cuda else {}
try:
    multiprocessing.set_start_method('spawn')
except RuntimeError:
    pass

In [None]:
#model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x=self.conv1(x)
        x=F.max_pool2d(x, 2)
        x = F.relu(x)
        x=self.conv2(x)
        x=self.conv2_drop(x)
        x=F.max_pool2d(x, 2)
        x = F.relu(x)
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        output=F.log_softmax(x, dim=1)
        return output
def train(rank, args, model, device,dataloader_kwargs):
    torch.manual_seed(args["seed"] + rank)
    train_data = datasets.FashionMNIST("./data", train=True,
                                       transform=transforms.Compose([
                                           transforms.ToTensor(),
                                           transforms.Normalize((0.1307,), (0.3081,))
                                       ]),
                                       target_transform=None,
                                       download=True)
    train_loader = torch.utils.data.DataLoader(train_data,
                                               batch_size=args["train_batch_size"],
                                               shuffle=True,
                                               num_workers=multiprocessing.cpu_count(),
                                               **dataloader_kwargs
                                               )
    optimizer = optim.SGD(model.parameters(), lr=args["lr"], momentum=args["momentum"])
    for epoch in range(1, args["epochs"] + 1):
        train_epoch(epoch, args, model, device, train_loader, optimizer)
        torch.save(model.state_dict(), "./mp_mnist_cnn.pt")
def test(args, model, device,dataloader_kwargs):
    torch.manual_seed(args["seed"])
    test_data = datasets.FashionMNIST("./data", train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]), target_transform=None, download=True)
    test_loader = torch.utils.data.DataLoader(test_data, 
                                              batch_size=args["test_batch_size"], 
                                              shuffle=False,
                                              num_workers=multiprocessing.cpu_count(),
                                               **dataloader_kwargs)
    test_epoch(model, device, test_loader)

def train_epoch(epoch, args, model, device, train_loader, optimizer):
    model.train()
    pid = os.getpid()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args["log_interval"] == 0:
            print('{}\tTrain Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                pid,epoch, batch_idx * len(data), len(train_loader.dataset),
                           100. * batch_idx / len(train_loader), loss.item()))
def test_epoch(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1)[1]  # get the index of the max log-probability
            correct += pred.eq(target).sum().item()
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    

In [None]:
if __name__ == '__main__':
    args={
        "seed":42,
        "train_batch_size":250,
        "test_batch_size":500,
        "lr":1,
        "momentum":0.7,
        "epochs":5,
        "log_interval":5,
        "num_processes":2
    }

    torch.manual_seed(args["seed"])
    model = Net().to(device)

    model.share_memory()
    processes = []
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    for rank in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=train, args=(rank, args, model, device,dataloader_kwargs))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


    test(args, model, device,dataloader_kwargs)