In [2]:
import os
from packaging import version
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from torchvision import datasets, transforms
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import kagglehub
import horovod
import horovod.torch as hvd

In [3]:
# Run configuration shared by the cells below.
params = dict(
    # Batch sizes passed to the training and test DataLoaders.
    batch_size=64,
    test_batch_size=1000,
    # Number of epochs, and the SGD learning rate / momentum.
    epochs=10,
    lr=0.01,
    momentum=0.5,
    # Value handed to torch.manual_seed.
    seed=42,
    # The training loss is printed every `log_interval` batches.
    log_interval=10,
    # Horovod options: fp16 gradient compression, Adasum reduction and the
    # gradient pre-divide factor given to the distributed optimizer.
    fp16_allreduce=False,
    use_mixed_precision=False,  # not read by any of the visible cells
    use_adasum=False,
    gradient_predivide_factor=1.0,
    # The remaining keys are not read by any of the visible cells.
    data_dir='./data',
    num_proc=None,
    hosts=None,
    communication=None,
)

In [4]:
class Net(nn.Module):
    """Small convolutional classifier producing 10 log-probabilities per sample.

    Two 5x5 convolutions (1 -> 10 -> 20 channels), each followed by 2x2
    max-pooling and ReLU, feed two fully connected layers (320 -> 50 -> 10).
    The flatten step hard-codes 320 features per sample, i.e. 20 channels of
    4x4, which is what a single-channel 28x28 input yields after the two
    conv/pool stages.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return log-probabilities of shape (N, 10) for the input batch `x`."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        # Flatten to (N, 320) for the fully connected layers.
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        # Functional dropout must be told explicitly whether we are training.
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # The logits are (N, 10): normalise over the class axis. Passing `dim`
        # explicitly avoids the deprecated implicit-dimension code path (and
        # the warning it emits) without changing the result.
        return F.log_softmax(x, dim=1)


In [5]:
# Preprocessing shared by the training and test datasets: convert each image
# to a tensor, then normalise it with mean 0.1307 and std 0.3081
# (presumably the MNIST pixel statistics -- confirm).
_to_tensor = transforms.ToTensor()
_normalize = transforms.Normalize(mean=(0.1307,), std=(0.3081,))
common_transforms = transforms.Compose([_to_tensor, _normalize])

In [6]:
import ssl

# NOTE(review): params['data_dir'] ('./data') is not used here -- confirm
# which location is intended.
data_dir = 'Desktop/MNIST/'

# SECURITY: certificate verification is switched off so the MNIST download
# works in environments without a usable CA bundle. This relies on a private
# `ssl` hook and exposes the download to tampering, so the override is limited
# to the download call and the previous context factory is restored afterwards
# instead of leaving verification disabled for the rest of the process.
# Prefer fixing the CA certificates and removing the override entirely.
_default_https_context = ssl._create_default_https_context
ssl._create_default_https_context = ssl._create_unverified_context
try:
    train_dataset = datasets.MNIST(data_dir, train=True, download=True,
                                   transform=common_transforms)
finally:
    ssl._create_default_https_context = _default_https_context

# Opened without download=True, so this relies on the files already being
# present under data_dir (presumably fetched by the call above -- confirm).
test_dataset = datasets.MNIST(data_dir, train=False, transform=common_transforms)

model = Net()



In [7]:

def train_epoch(epoch):
    """Run one training pass over `train_loader`.

    Uses the notebook-level `model`, `optimizer`, `train_sampler`,
    `train_loader` and `params`. The epoch number is forwarded to
    `train_sampler.set_epoch`, and the loss of the current batch is printed
    every `params['log_interval']` batches.

    Args:
        epoch: Epoch number, used for the sampler and for the log line.
    """
    model.train()
    train_sampler.set_epoch(epoch)
    for step, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        batch_loss = F.nll_loss(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        # Only report on every `log_interval`-th batch.
        if step % params['log_interval'] != 0:
            continue
        progress_line = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch,
            step * len(inputs),
            len(train_sampler),
            100. * step / len(train_loader),
            batch_loss.item(),
        )
        print(progress_line)


def metric_average(val, name):
    """Reduce `val` across Horovod workers and return it as a Python scalar.

    The value is wrapped in a tensor and passed to `hvd.allreduce` with its
    default reduction (an average, going by this function's name -- confirm
    against the installed Horovod version).

    Args:
        val: Numeric value to reduce.
        name: Name given to the allreduce operation.
    """
    return hvd.allreduce(torch.tensor(val), name=name).item()

def test():
    """Evaluate `model` on `test_loader` and print the results over all workers.

    Each worker only iterates the batches its own loader yields, so the local
    loss sum, correct count and sample count are sum-reduced across workers
    before the average loss and accuracy are computed. Dividing the local sums
    by the size of the full dataset would under-report both metrics as soon as
    more than one worker is used. With a single worker the printed line is
    unchanged.

    Uses the notebook-level `model`, `test_loader` and `hvd`. The summary is
    printed on rank 0 only, since every worker holds the same reduced values.
    """
    model.eval()
    local_loss_sum = 0.0
    local_correct = 0
    local_count = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            # Sum (not mean) of the batch loss, so batches of different size
            # are weighted correctly.
            local_loss_sum += F.nll_loss(output, target, reduction='sum').item()
            # Index of the largest log-probability is the predicted class.
            pred = output.argmax(dim=1, keepdim=True)
            local_correct += pred.eq(target.view_as(pred)).sum().item()
            local_count += len(data)

    # One sum-allreduce for all three quantities; float64 keeps the counts exact.
    # NOTE(review): if the sampler pads shards with repeated samples, the
    # global count can slightly exceed the dataset size -- confirm.
    local_totals = torch.tensor(
        [local_loss_sum, local_correct, local_count], dtype=torch.float64)
    loss_sum, correct, count = hvd.allreduce(
        local_totals, name='test_totals', op=hvd.Sum).tolist()

    test_loss = loss_sum / count

    if hvd.rank() == 0:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, int(correct), int(count),
            100. * correct / count))



In [8]:
# Initialize Horovod
hvd.init()
# Seed PyTorch's RNG.
# NOTE(review): `model = Net()` is created in an earlier cell, before this
# seed is set, so the initial weights are not governed by params['seed'].
torch.manual_seed(params['seed'])

# Set the number of intra-op threads and the number of data-loader workers
torch.set_num_threads(4)

kwargs = {'num_workers': 4, 'pin_memory': False}  # No need for pin_memory on CPU

   
# Training data: the sampler is configured with this process's rank out of
# hvd.size() replicas, and the loader draws its batches through it.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=params['batch_size'], sampler=train_sampler, **kwargs)



# Test data: sharded across workers in the same way as the training data.
test_sampler = torch.utils.data.distributed.DistributedSampler(
    test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=params['test_batch_size'],
                                          sampler=test_sampler, **kwargs)



# Scale the learning rate by the number of workers, unless Adasum is used.
lr_scaler = hvd.size() if not params['use_adasum'] else 1

optimizer = optim.SGD(model.parameters(), lr=params['lr'] * lr_scaler,
                      momentum=params['momentum'])

# Broadcast the model parameters and the optimizer state from rank 0
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Optional fp16 compression for the allreduce, controlled by params['fp16_allreduce'].
compression = hvd.Compression.fp16 if params['fp16_allreduce'] else hvd.Compression.none

# Wrap the SGD optimizer in Horovod's DistributedOptimizer; the reduction op
# is Adasum when enabled, otherwise Average. This rebinds `optimizer`, which
# train_epoch() reads as a global.
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     compression=compression,
                                     op=hvd.Adasum if params['use_adasum'] else hvd.Average,
                                     gradient_predivide_factor=params['gradient_predivide_factor'])

# Training and test loop: one training pass followed by one evaluation per epoch
for epoch in range(1, params['epochs'] + 1):
    train_epoch(epoch)
    test()


  return F.log_softmax(x)



Test set: Average loss: 0.1832, Accuracy: 9462/10000 (95%)


Test set: Average loss: 0.1113, Accuracy: 9647/10000 (96%)


Test set: Average loss: 0.0876, Accuracy: 9718/10000 (97%)


Test set: Average loss: 0.0723, Accuracy: 9753/10000 (98%)


Test set: Average loss: 0.0639, Accuracy: 9791/10000 (98%)


Test set: Average loss: 0.0574, Accuracy: 9820/10000 (98%)


Test set: Average loss: 0.0552, Accuracy: 9816/10000 (98%)


Test set: Average loss: 0.0493, Accuracy: 9837/10000 (98%)


Test set: Average loss: 0.0469, Accuracy: 9837/10000 (98%)


Test set: Average loss: 0.0475, Accuracy: 9829/10000 (98%)

