In [14]:
import torch

from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from torch.utils.data import DataLoader
from dataloader import doc2dialDataset

import pandas as pd

In [37]:
# df = pd.read_csv('data/doc2dial/Train_dataset/DEFAULT/DEFAULT_withRefs.csv')

In [39]:
# df[:10].to_csv('data/doc2dial/TEST/DDP_Finetune.csv')

In [27]:
# Plain (non-distributed) loader over the full training CSV, used to inspect one batch.
# NOTE: `path` is also read as a global by `prepare` further down.
path = 'data/doc2dial/Train_dataset/DEFAULT/DEFAULT_withRefs.csv'

dataset = doc2dialDataset(path)
# shuffle=False keeps batches in file order
dataloader = DataLoader(dataset, batch_size=4, shuffle=False)

In [28]:
# First batch (4 examples) from the plain loader; fields are accessed by key in the next cell.
b = next(iter(dataloader))

In [29]:
# Per-field values of the first plain batch (one entry per example, as the output below shows).
questions = b['question']
answers = b['answer']

In [30]:
# Display the questions of the first plain batch, in file order.
questions

['Hello, I forgot o update my address, can you help me with that?',
 'Can I do my DMV transactions online?',
 'Thanks, and in case I forget to bring all of the documentation needed to the DMV office, what can I do?',
 'Ok, and can you tell me again where should I report my new address?']

In [19]:
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import os
from QAModel import format_prompt

In [20]:
from torch.utils.data.distributed import DistributedSampler

def prepare(rank, world_size, batch_size=2, pin_memory=False, num_workers=0, data_path=None, shuffle=False):
    """Build a DataLoader over this rank's shard of the doc2dial dataset.

    Args:
        rank: index of the calling process within the group; selects the shard.
        world_size: number of processes (replicas) the dataset is split across.
        batch_size: per-rank batch size.
        pin_memory: forwarded to ``DataLoader``.
        num_workers: forwarded to ``DataLoader``.
        data_path: CSV file to load. Defaults to the module-level ``path`` so existing
            calls keep working; pass it explicitly if that global may not exist
            (e.g. once this function is moved into a standalone module).
        shuffle: forwarded to ``DistributedSampler``. When True, call
            ``loader.sampler.set_epoch(epoch)`` at the start of every epoch so the
            order changes between epochs.

    Returns:
        A ``DataLoader`` whose ``sampler`` attribute is the ``DistributedSampler``.
    """
    if data_path is None:
        # Backward-compatible fallback to the notebook global used by the original code
        data_path = path
    dataset = doc2dialDataset(data_path)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=shuffle, drop_last=False)

    # Ordering is owned by the sampler: DataLoader rejects shuffle=True together with an
    # explicit sampler, so shuffle stays False here regardless of the `shuffle` argument.
    return DataLoader(dataset, batch_size=batch_size, pin_memory=pin_memory, num_workers=num_workers,
                      drop_last=False, shuffle=False, sampler=sampler)

In [21]:
# Sanity-check the sharding without any process group or GPUs: DistributedSampler only
# needs explicit num_replicas/rank, so simulate rank 0 of a 2-process split.
SIMULATED_WORLD_SIZE = 2
DDP_dataloader = prepare(rank=0, world_size=SIMULATED_WORLD_SIZE, batch_size=2, pin_memory=False, num_workers=0)

In [22]:
# First batch (batch_size=2) of the simulated rank-0 shard.
first_batch = next(iter(DDP_dataloader))

In [25]:
# With shuffle=False, rank 0 of a 2-way split gets every other example (1st, 3rd, ...);
# compare with the plain batch displayed earlier.
qs = first_batch['question']
qs

['Hello, I forgot o update my address, can you help me with that?',
 'Thanks, and in case I forget to bring all of the documentation needed to the DMV office, what can I do?']

In [33]:
def setup(rank, world_size):
    """Join the default process group for this worker.

    Points the rendezvous at localhost:12355 through environment variables, then
    initializes a "gloo" process group with the given rank and world size.
    """
    rendezvous = {'MASTER_ADDR': 'localhost', 'MASTER_PORT': '12355'}
    os.environ.update(rendezvous)

    dist.init_process_group("gloo", world_size=world_size, rank=rank)

def cleanup():
    """Tear down the default process group if one is active.

    Calling it when no group was initialized (e.g. after a failed setup) is a
    no-op instead of raising.
    """
    if dist.is_initialized():
        dist.destroy_process_group()


def demo_parallel(rank, world_size, num_epochs=1, lr=0.001):
    """Fine-tune t5-small on this rank's shard of doc2dial with DistributedDataParallel.

    Meant to be launched once per process via ``run_demo`` (``mp.spawn``), which
    supplies ``rank``. Under the spawn start method this function and the helpers it
    calls must live in an importable ``.py`` module, not in a notebook cell.

    Args:
        rank: process index in the group; also used as the CUDA device index.
        world_size: total number of processes in the group.
        num_epochs: number of passes over this rank's shard.
        lr: SGD learning rate.
    """
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)
    try:
        DDP_dataloader = prepare(rank, world_size, batch_size=2, pin_memory=False, num_workers=0)

        # The Auto* classes (imported at the top) resolve to the T5 tokenizer / seq2seq
        # model for this checkpoint; T5Tokenizer / T5ForConditionalGeneration were never imported.
        tokenizer = AutoTokenizer.from_pretrained("t5-small")
        model = AutoModelForSeq2SeqLM.from_pretrained("t5-small").to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        # from_pretrained returns the model in eval mode; enable dropout etc. for training
        ddp_model.train()

        # Create the optimizer once so it persists across steps
        optimizer = optim.SGD(ddp_model.parameters(), lr=lr)

        for epoch in range(num_epochs):
            # Must target the distributed loader's sampler (the plain loader has no set_epoch);
            # only changes ordering when the sampler shuffles.
            DDP_dataloader.sampler.set_epoch(epoch)

            for batch in DDP_dataloader:
                qs = batch['question']
                answers = batch['answer']
                refs = batch['ref']  # TODO(review): confirm doc2dialDataset yields a 'ref' field

                temps = format_prompt(qs, refs)

                # encoding inputs
                encoding = tokenizer(temps,
                                     return_tensors="pt",
                                     padding='longest',
                                     max_length=1024,
                                     truncation=True)
                # encoding targets
                target_encoding = tokenizer(answers,
                                            return_tensors="pt",
                                            padding='longest',
                                            max_length=1024,
                                            truncation=True)
                labels = target_encoding.input_ids
                # Padding positions must be ignored by the model's cross-entropy loss
                labels[labels == tokenizer.pad_token_id] = -100

                optimizer.zero_grad()
                # Passing labels makes the seq2seq model build decoder inputs and compute its
                # token-level loss itself (MSE on the raw output object was not a valid loss).
                outputs = ddp_model(input_ids=encoding.input_ids.to(rank),
                                    attention_mask=encoding.attention_mask.to(rank),
                                    labels=labels.to(rank))
                outputs.loss.backward()
                optimizer.step()
    finally:
        # Always leave the process group, even if training raised
        cleanup()

def run_demo(demo_fn, world_size):
    """Run ``demo_fn`` in ``world_size`` worker processes and block until all exit.

    ``mp.spawn`` calls ``demo_fn(rank, world_size)`` in each worker, with the rank as
    the first argument. The workers are started with the spawn method and receive
    ``demo_fn`` by reference, so it must be importable from a module. A function
    defined in a notebook cell fails in the workers with
    ``AttributeError: Can't get attribute ...``.
    """
    worker_args = (world_size,)
    mp.spawn(demo_fn, args=worker_args, nprocs=world_size, join=True)

In [36]:
# Launch one training process per visible GPU.
# NOTE(review): this cell fails as recorded below. mp.spawn starts the workers with the
# spawn method and they cannot unpickle `demo_parallel`, because it was defined in this
# notebook's __main__. Move demo_parallel (and setup/prepare/cleanup) into a .py module,
# import it here and pass the imported function to run_demo, or run training as a script.
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_parallel, world_size)

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ygong/miniconda3/envs/atlas-env/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/home/ygong/miniconda3/envs/atlas-env/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'demo_parallel' on <module '__main__' (built-in)>


ProcessExitedException: process 1 terminated with exit code 1