## Distributed Computing using multiple processes with Torch
This example demonstrates basic multi-process computation with `torch.distributed`: point-to-point message passing, all-reduce, and all-gather.

Note: this does not work on Windows 10, but it runs fine on an Ubuntu multicore machine without GPUs.

It will fail if there is a CUDA device on the server.

In [1]:
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

In [2]:
# Number of worker processes to launch; the launch cells below pass it as the
# world size of the process group (these are processes, not threads).
P = 3

In [3]:
def run(rank, size):
    """Placeholder for a distributed worker; does nothing yet.

    Args:
        rank: Rank handed in by the caller (unused).
        size: Number of processes handed in by the caller (unused).
    """
    return None

In [4]:
def init_process(rank, size, fn, backend='gloo',
                 master_addr='127.0.0.1', master_port='29500'):
    """Join the process group, run ``fn(rank, size)``, then leave the group.

    Args:
        rank: Rank of this process, forwarded to ``init_process_group``.
        size: World size, forwarded to ``init_process_group``.
        fn: Callable invoked as ``fn(rank, size)`` once the group is set up.
        backend: Name of the torch.distributed backend (default ``'gloo'``).
        master_addr: Value exported as the ``MASTER_ADDR`` environment variable.
        master_port: Value exported as the ``MASTER_PORT`` environment variable;
            pass a different port if the default one is still in use.

    Raises:
        Whatever ``fn`` raises; the process group is destroyed first.
    """
    # init_process_group reads the rendezvous endpoint from these variables.
    os.environ['MASTER_ADDR'] = str(master_addr)
    os.environ['MASTER_PORT'] = str(master_port)
    dist.init_process_group(backend, rank=rank, world_size=size)
    try:
        fn(rank, size)
    finally:
        # Release the group's resources even when fn fails.
        dist.destroy_process_group()

In [6]:
def run_message_passing(rank, size):
    """Pass a counter along the chain of ranks 0 -> 1 -> ... -> size - 1.

    Every rank except the first receives the counter from the previous rank,
    each rank adds one, and every rank except the last forwards it to the next
    rank. The last rank prints ``done`` when the counter equals ``size``.
    Works for any ``size >= 1`` rather than only three processes.

    Args:
        rank: Rank of the calling process, in ``range(size)``.
        size: Total number of processes in the chain.
    """
    tensor = torch.zeros(1)
    if rank > 0:
        # Receive the running counter from the previous rank in the chain.
        dist.recv(tensor=tensor, src=rank - 1)
    tensor += 1
    if rank < size - 1:
        # Forward the updated counter to the next rank.
        dist.send(tensor=tensor, dst=rank + 1)
    elif tensor.item() == size:
        # Last rank: every rank contributed exactly one increment.
        print('done')

    print('Rank ', rank, ' has data ', tensor[0])

In [16]:
def run_reduce(rank, size):
    """Sum a per-rank value across all processes with an all-reduce.

    Each rank contributes ``rank + 1``; the collective overwrites the tensor
    in place with the reduced result, which is then printed.

    Args:
        rank: Rank of the calling process.
        size: Number of processes; the group spans ranks ``0 .. size - 1``.
    """
    # Build the group from the size argument so any process count works.
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1) + rank
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

In [11]:
def run_gather(rank, size):
    """Collect every rank's value on every process with an all-gather.

    Each rank contributes ``rank + 1``; the gathered list has one slot per
    rank and is printed after the collective.

    Args:
        rank: Rank of the calling process.
        size: Number of processes; the group spans ranks ``0 .. size - 1``.
    """
    # Use the size argument instead of a notebook-level global so the
    # function is self-contained.
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1) + rank
    # One placeholder per rank; all_gather overwrites them.
    lst = [torch.zeros(1) for _ in range(size)]
    dist.all_gather(lst, tensor, group=group)
    print('Rank ', rank, ' has data ', lst)

In [13]:
# Launch one worker per rank running the message-passing demo, then wait
# for all of them to finish.
size = P
processes = [
    Process(target=init_process, args=(rank, size, run_message_passing))
    for rank in range(size)
]
for p in processes:
    p.start()
for p in processes:
    p.join()


done
Rank  1  has data  tensor(2.)
Rank  2  has data  tensor(3.)
Rank  0  has data  tensor(1.)


In [17]:
# Launch one worker per rank running the all-reduce demo, then wait for
# all of them to finish.
size = P
processes = [
    Process(target=init_process, args=(rank, size, run_reduce))
    for rank in range(size)
]
for p in processes:
    p.start()
for p in processes:
    p.join()


Rank  0  has data  tensor(6.)
Rank  2  has data  tensor(6.)
Rank  1  has data  tensor(6.)


In [18]:
# Launch one worker per rank running the all-gather demo, then wait for
# all of them to finish.
size = P
processes = [
    Process(target=init_process, args=(rank, size, run_gather))
    for rank in range(size)
]
for p in processes:
    p.start()
for p in processes:
    p.join()


Rank  1  has data  [tensor([1.]), tensor([2.]), tensor([3.])]
Rank  2  has data  [tensor([1.]), tensor([2.]), tensor([3.])]
Rank  0  has data  [tensor([1.]), tensor([2.]), tensor([3.])]
