In [1]:
%%writefile horovod.py

import numpy
import os
from threading import Lock
from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1,10, kernel_size=5)
        self.conv2 = nn.Conv2d(10,20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320,50)
        self.fc2 = nn.Linear(50,10)
        
    def forward(self,x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training = self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
    
    

batch_size = 100
num_epochs = 5

# парамеры специфичные для оптиизатора
momentum = 0.5
log_interval = 100


def train_one_epoch(model, device, data_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(data_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print("Train Epoch", epoch)
            
            
from time import time
import os

# function os saving of checkpoints
LOG_DIR = os.path.join("./logs/", str(time()), "MNISTDemo")

os.makedirs(LOG_DIR)

import horovod as hvd

# function for savinf this checkpoints in this path
def save_checkpoint(model, optimizer, epoch):
    filepath = LOG_DIR + '/checkpoint - {epoch}.pth.tar'.format(epoch=epoch)
    state = {'model': model.state_dict(), 'optimizer': optimizer.state_dict()}
    
    torch.save(state, filepath)
    
import torch.optim as optim
from torchvision import datasets, transforms

def train(learning_rate):
    device= torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    train_dataset = torchvision.datasets.MNIST('data', train=True,download = True, transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
    model = Net().to(device)
    
    data_loader = torch.utils.data.DataLoader(dataset = train_dataset, batch_size = batch_size, shuffle = False, num_workers=0, pin_memory=True)
    optimizer = optim.SGD(model.parameters(), lr= learning_rate,momentum = momentum)
    
    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, data_loader, optimizer, epoch)
        save_checkpoint(model, optimizer, epoch)

        

def train_hvd(learning_rate):
    hvd.init()
    
    device = torch.device(("cuda" if torch.cuda.is_available() else "cpu"))
    
    if device.type == "cuda":
        torch.cuda.set_device(hvd.local_rank())
        
        
    train_dataset = datasets.MNIST(
    root = 'data-%d'% hvd.rank(),
    train = True,
    download = True,
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
    
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = batch_size, sampler = train_sampler)
    model = Net().to(device)
    
    optimizer = optim.SGD(model.parameters(), lr=learning_rate * hvd.size(), momentum = momentum)
    
    # это позволяет синхронизирвоат обработсик в мормент посмле гардлиетов
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters = model.named_parameters())
    
    
    # ставим для моджел пармеитры одинаковые
    hvd.broadcast_parameters(model.state_dict(), root_rank = 0)
    
    
    for epoch in range(1, num_epoch+1):
        train_one_epoch(model, device, train_loader, optimizer, epoch)
        if hvd.rank() == 0:
            # так как все синхронизировано то сохланеямтолктона одлном процессроре
            save_checkpoint(model, optimizer, epoch)
            
            
def train_hvd(learning_rate):
  
  # Initialize Horovod
    #hvd.init()  
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  
    if device.type == 'cuda':
    # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())

    train_dataset = datasets.MNIST(
    # Use different root directory for each worker to avoid conflicts
    root='data-%d'% hvd.rank(),  
    train=True, 
    download=True,
    transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    )

    from torch.utils.data.distributed import DistributedSampler

    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    model = Net().to(device)

    # The effective batch size in synchronous distributed training is scaled by the number of workers
    # Increase learning_rate to compensate for the increased batch size
    optimizer = optim.SGD(model.parameters(), lr=learning_rate * hvd.size(), momentum=momentum)

    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Broadcast initial parameters so all workers start with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, train_loader, optimizer, epoch)
    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        save_checkpoint(hvd_log_dir, model, optimizer, epoch)
            
    
if __name__ == "__main__":
    train_hvd(0.001)

Overwriting horovod.py


In [2]:
from horovod import *

In [1]:
#train(learning_rate = 0.001)

In [3]:
!horovodrun

usage: horovodrun [-h] [-v] -np NP [-cb] [--disable-cache]
                  [--start-timeout START_TIMEOUT] [--network-interface NICS]
                  [--output-filename OUTPUT_FILENAME] [--verbose]
                  [--config-file CONFIG_FILE] [-p SSH_PORT]
                  [-i SSH_IDENTITY_FILE]
                  [--fusion-threshold-mb FUSION_THRESHOLD_MB]
                  [--cycle-time-ms CYCLE_TIME_MS]
                  [--cache-capacity CACHE_CAPACITY]
                  [--hierarchical-allreduce | --no-hierarchical-allreduce]
                  [--hierarchical-allgather | --no-hierarchical-allgather]
                  [--autotune] [--autotune-log-file AUTOTUNE_LOG_FILE]
                  [--autotune-warmup-samples AUTOTUNE_WARMUP_SAMPLES]
                  [--autotune-steps-per-sample AUTOTUNE_STEPS_PER_SAMPLE]
                  [--autotune-bayes-opt-max-samples AUTOTUNE_BAYES_OPT_MAX_SAMPLES]
                  [--autotune-gaussian-process-noise AUTOTUNE_GAUSSIAN

In [4]:
def train_hvd(learning_rate):
  
  # Initialize Horovod
    hvd.init()  
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  
    if device.type == 'cuda':
    # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())

    train_dataset = datasets.MNIST(
    # Use different root directory for each worker to avoid conflicts
    root='data-%d'% hvd.rank(),  
    train=True, 
    download=True,
    transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    )

    from torch.utils.data.distributed import DistributedSampler

    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    model = Net().to(device)

    # The effective batch size in synchronous distributed training is scaled by the number of workers
    # Increase learning_rate to compensate for the increased batch size
    optimizer = optim.SGD(model.parameters(), lr=learning_rate * hvd.size(), momentum=momentum)

    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Broadcast initial parameters so all workers start with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, device, train_loader, optimizer, epoch)
    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        save_checkpoint(hvd_log_dir, model, optimizer, epoch)

In [5]:
!horovodrun

usage: horovodrun [-h] [-v] -np NP [-cb] [--disable-cache]
                  [--start-timeout START_TIMEOUT] [--network-interface NICS]
                  [--output-filename OUTPUT_FILENAME] [--verbose]
                  [--config-file CONFIG_FILE] [-p SSH_PORT]
                  [-i SSH_IDENTITY_FILE]
                  [--fusion-threshold-mb FUSION_THRESHOLD_MB]
                  [--cycle-time-ms CYCLE_TIME_MS]
                  [--cache-capacity CACHE_CAPACITY]
                  [--hierarchical-allreduce | --no-hierarchical-allreduce]
                  [--hierarchical-allgather | --no-hierarchical-allgather]
                  [--autotune] [--autotune-log-file AUTOTUNE_LOG_FILE]
                  [--autotune-warmup-samples AUTOTUNE_WARMUP_SAMPLES]
                  [--autotune-steps-per-sample AUTOTUNE_STEPS_PER_SAMPLE]
                  [--autotune-bayes-opt-max-samples AUTOTUNE_BAYES_OPT_MAX_SAMPLES]
                  [--autotune-gaussian-process-noise AUTOTUNE_GAUSSIAN

In [4]:
!horovodrun -np 1 python3 horovod.py

2021-10-14 13:28:54.809817: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-10-14 13:28:54.809844: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-10-14 13:28:56.059723: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-10-14 13:28:56.059754: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
[0]<stderr>:Traceback (most recent call last):
[0]<stderr>:  File "horovod.py", line 176, in <module>
[0]<stderr>:    train_hvd(0.001)
[0]<stderr>:  File "horovod.py", line 143, in train_hvd
[0]<stderr>:    root='data-%d'% h