In [38]:
from transformers import T5Tokenizer
from transformers import T5ForConditionalGeneration, T5Tokenizer
from dataset_utils import T5Dataset, collator
import torch
from torch.utils.data import DataLoader
import os

model_variants = ["google-t5/t5-base", "google-t5/t5-3b", "google/flan-t5-large", "google-t5/t5-11b"]
device = 'cuda:0'

model = T5ForConditionalGeneration.from_pretrained(
    model_variants[0],
    torch_dtype=torch.float16,
    token = os.getenv('HF_ACCESS_TOKEN')
).to(device)

tokenizer = T5Tokenizer.from_pretrained(
    model_variants[0]
)



generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

In [31]:
from torch.utils.data import Dataset
import torch.nn.utils
import numpy as np

class T5Dataset(Dataset):
    def __init__(
        self,
        texts,
        tokenizer,
        max_length=512,
        corruption_rate=0.15,
        mean_span_length=3
    ):
        # super().__init__()
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.corruption_rate = corruption_rate
        self.mean_span_length = mean_span_length

    def corrupt_text(self, tokens):
        # tokens = text.strip().split()
        mask = self.generate_span_mask(len(tokens))
        input_tokens = []
        target_tokens = []
        sentinel = 0
        in_span = False
        for i, token in enumerate(tokens):
            if mask[i]:
                if not in_span:
                    input_tokens.append(f"<extra_id_{sentinel}>")
                    target_tokens.append(f"<extra_id_{sentinel}>")
                    sentinel += 1
                    in_span = True
                target_tokens.append(token)
            else:
                input_tokens.append(token)
                in_span = False

        if in_span:
            target_tokens.append(f"<extra_id_{sentinel+1}>")
        else:
            input_tokens.append(f"<extra_id_{sentinel}>")
            target_tokens.append(f"<extra_id_{sentinel}>")
        
        return " ".join(input_tokens), " ".join(target_tokens)

    def generate_span_mask(self, seq_len):
        num_tokens_to_mask = max(1, int(self.corruption_rate * seq_len))
        mask = np.zeros(seq_len, dtype=bool)
        num_masked = 0
        while num_masked < num_tokens_to_mask:
            span_start = np.random.randint(0, seq_len)
            span_length = max(1, np.random.poisson(self.mean_span_length))
            span_end = min(seq_len, span_start + span_length)
            if np.any(mask[span_start:span_end]):
                continue
            mask[span_start:span_end] = True
            num_masked += span_end - span_start
        return mask

    def __getitem__(self, idx):
        text = self.texts[idx]
        tokens = text.strip().split()
        
        # Handle empty text case first
        if len(tokens) == 0:
            input_ids = self.tokenizer.encode("", return_tensors="pt").squeeze(0)
            target_ids = self.tokenizer.encode("", return_tensors="pt").squeeze(0)
            return {"input_ids": input_ids, "labels": target_ids}
        
        try:
            corrupted_input, target = self.corrupt_text(tokens)
            input_ids = self.tokenizer.encode(
                corrupted_input, 
                truncation=True, 
                max_length=self.max_length, 
                return_tensors="pt"
            ).squeeze(0)
            
            target_ids = self.tokenizer.encode(
                target, 
                truncation=True, 
                max_length=self.max_length, 
                return_tensors="pt"
            ).squeeze(0)
            
            return {"input_ids": input_ids, "labels": target_ids}
        except Exception as e:
            print(f"Error processing item {idx}: {e}")
            # Fallback to empty tokens
            input_ids = self.tokenizer.encode("", return_tensors="pt").squeeze(0)
            target_ids = self.tokenizer.encode("", return_tensors="pt").squeeze(0)
            return {"input_ids": input_ids, "labels": target_ids}

    def __len__(self):
        return len(self.texts)


def collator(batch, tokenizer):
    try:
        input_ids = [item['input_ids'] for item in batch]
        labels = [item['labels'] for item in batch]
        input_ids = torch.nn.utils.rnn.pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id)
        labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=-100)
    except:
        print(batch)
    return {"input_ids": input_ids, "labels": labels, "attention_mask": input_ids.ne(tokenizer.pad_token_id)}

In [32]:
def my_collator(batch):
    return collator(batch, tokenizer)

raw_input = '/home/tadesa1/ADBMO-UNLV/data/processed_output_raw.txt'
with open(raw_input, 'r') as f:
    text = f.readlines()
    text = [line for line in text if line.strip()]

dataset = T5Dataset(text, tokenizer)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=my_collator)


In [34]:
for batch in dataloader:
    print(batch["input_ids"].shape)       # (batch_size, seq_len)
    print(batch["labels"].shape)
    print(batch["input_ids"])
    print(batch["labels"])
    print("-----------------")
    # break

torch.Size([2, 38])
torch.Size([2, 18])
tensor([[   37, 32099,  7869,   480,     6,    11,   180, 25210,    18,  3881,
           553,  4949,  7952,  1457,   856,  4962,     6,  1989,    16,     8,
          2625,    13,  2421,  1055,  1058,  3266, 32098,     1,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0],
        [ 2150,    12,     8,   419,  2708,  7159, 30729,  1693,     6,     8,
          5237, 15025,    13, 13485,    41,   994,  8918,    38,   332,  6218,
          5688,    61,  1341,    12, 32099,    11,   323,    18, 12990,    47,
          2008,    16,  7996,   668,     9,     5, 32098,     1]])
tensor([[32099,  2416,   344, 25007, 15766,    32,     7,   159,     6, 32098,
            21,  2847,  7765,   308,  4481,     5, 32096,     1],
        [32099,  9392,   358,    24,   130,  4019,    95,    18, 32098,     1,
          -100,  -100,  -100,  -100,  -100,  -100,  -100,  -100]])
-----------------
torch.Size([2, 38])
torch.Size([2, 7])
tenso

In [36]:
import torch
from transformers import T5ForConditionalGeneration, AdamW
from tqdm import tqdm

def train_t5_unsupervised(
    model,
    dataloader,
    optimizer=None,
    device="cuda" if torch.cuda.is_available() else "cpu",
    num_epochs=3,
    accumulation_steps=1,
    save_path="t5_finetuned.pt"
):
    model = model.to(device)
    model.train()

    if optimizer is None:
        optimizer = AdamW(model.parameters(), lr=5e-5)

    for epoch in range(num_epochs):
        total_loss = 0.0
        loop = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)

        for step, batch in enumerate(loop):
            # Move batch to device
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            # Forward pass
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss = loss / accumulation_steps
            loss.backward()

            # Optimizer step
            if (step + 1) % accumulation_steps == 0:
                optimizer.step()
                optimizer.zero_grad()

            total_loss += loss.item() * accumulation_steps
            loop.set_postfix(loss=loss.item())

        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1} Loss: {avg_loss:.4f}")

        # Save checkpoint
        torch.save(model.state_dict(), f"{save_path}_epoch{epoch+1}.pt")


In [40]:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from transformers import T5ForConditionalGeneration, AdamW
from tqdm import tqdm
import os

def setup(rank, world_size):
    """
    Initialize the distributed environment.
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    
    # Initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    """
    Clean up the distributed environment.
    """
    dist.destroy_process_group()

def train_t5_on_gpu(
    rank,
    world_size,
    model,
    dataset,
    collate_fn,
    batch_size=8,
    optimizer=None,
    num_epochs=3,
    accumulation_steps=1,
    save_path="t5_finetuned.pt"
):
    # Setup the distributed environment
    setup(rank, world_size)
    
    # Create DistributedSampler to handle data distribution
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    
    # Create dataloader with the sampler
    dataloader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        sampler=sampler,
        collate_fn=collate_fn,
        num_workers=4,
        pin_memory=True
    )
    
    # Move model to the correct device
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    
    # Wrap the model with DDP
    model = DDP(model, device_ids=[rank], output_device=rank)
    
    # Create optimizer if not provided
    if optimizer is None:
        optimizer = AdamW(model.parameters(), lr=5e-5)
    
    for epoch in range(num_epochs):
        # Important: set the epoch for the sampler
        sampler.set_epoch(epoch)
        
        model.train()
        total_loss = 0.0
        
        if rank == 0:  # Only show progress on the first GPU
            loop = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)
        else:
            loop = dataloader
        
        for step, batch in enumerate(loop):
            # Move batch to device
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            
            # Forward pass
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss = loss / accumulation_steps
            loss.backward()
            
            # Optimizer step
            if (step + 1) % accumulation_steps == 0:
                optimizer.step()
                optimizer.zero_grad()
            
            total_loss += loss.item() * accumulation_steps
            
            if rank == 0 and isinstance(loop, tqdm):
                loop.set_postfix(loss=loss.item())
        
        # Calculate and print average loss across all GPUs
        avg_loss = total_loss / len(dataloader)
        torch.distributed.all_reduce(torch.tensor(avg_loss).to(device))
        avg_loss = avg_loss / world_size
        
        if rank == 0:
            print(f"Epoch {epoch+1} Loss: {avg_loss:.4f}")
            # Save checkpoint (only from one process to avoid corruptions)
            torch.save(model.module.state_dict(), f"{save_path}_epoch{epoch+1}.pt")
    
    # Clean up
    cleanup()

def train_t5_unsupervised(
    model,
    dataset,
    collate_fn,
    batch_size=8,
    num_gpus=2,  # Default to 2 GPUs
    optimizer=None,
    num_epochs=3,
    accumulation_steps=1,
    save_path="t5_finetuned.pt"
):
    """
    Main function to spawn multiple processes for distributed training.
    """
    # Make sure we have GPUs
    assert torch.cuda.is_available(), "No GPU available!"
    world_size = min(torch.cuda.device_count(), num_gpus)
    
    print(f"Starting distributed training on {world_size} GPUs...")
    
    # Spawn processes
    mp.spawn(
        train_t5_on_gpu,
        args=(world_size, model, dataset, collate_fn, batch_size, optimizer, num_epochs, accumulation_steps, save_path),
        nprocs=world_size,
        join=True
    )
    
    print("Training completed!")

In [41]:
train_t5_unsupervised(model, dataloader, collator)

Starting distributed training on 2 GPUs...


PicklingError: Can't pickle <class '__main__.T5Dataset'>: it's not the same object as __main__.T5Dataset

In [1]:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from transformers import T5ForConditionalGeneration, AdamW
from transformers import T5ForConditionalGeneration, T5Tokenizer
from dataset_utils import T5Dataset, collator
from tqdm import tqdm
import os

def setup(rank, world_size):
    """
    Initialize the distributed environment.
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    
    # Initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    """
    Clean up the distributed environment.
    """
    dist.destroy_process_group()

def train_t5_on_gpu(
    rank,
    world_size,
    model,
    dataset,
    collate_fn,
    batch_size=8,
    optimizer=None,
    num_epochs=3,
    accumulation_steps=1,
    save_path="t5_finetuned.pt"
):
    # Setup the distributed environment
    setup(rank, world_size)
    
    # Create DistributedSampler to handle data distribution
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    
    # Create dataloader with the sampler
    dataloader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        sampler=sampler,
        collate_fn=collate_fn,
        num_workers=4,
        pin_memory=True
    )
    
    # Move model to the correct device
    device = torch.device(f"cuda:{rank}")
    model = model.to(device)
    
    # Wrap the model with DDP
    model = DDP(model, device_ids=[rank], output_device=rank)
    
    # Create optimizer if not provided
    if optimizer is None:
        optimizer = AdamW(model.parameters(), lr=5e-5)
    
    for epoch in range(num_epochs):
        # Important: set the epoch for the sampler
        sampler.set_epoch(epoch)
        
        model.train()
        total_loss = 0.0
        
        if rank == 0:  # Only show progress on the first GPU
            loop = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)
        else:
            loop = dataloader
        
        for step, batch in enumerate(loop):
            # Move batch to device
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            
            # Forward pass
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss = loss / accumulation_steps
            loss.backward()
            
            # Optimizer step
            if (step + 1) % accumulation_steps == 0:
                optimizer.step()
                optimizer.zero_grad()
            
            total_loss += loss.item() * accumulation_steps
            
            if rank == 0 and isinstance(loop, tqdm):
                loop.set_postfix(loss=loss.item())
        
        # Calculate and print average loss across all GPUs
        avg_loss = total_loss / len(dataloader)
        torch.distributed.all_reduce(torch.tensor(avg_loss).to(device))
        avg_loss = avg_loss / world_size
        
        if rank == 0:
            print(f"Epoch {epoch+1} Loss: {avg_loss:.4f}")
            # Save checkpoint (only from one process to avoid corruptions)
            torch.save(model.module.state_dict(), f"{save_path}_epoch{epoch+1}.pt")
    
    # Clean up
    cleanup()

def train_t5_unsupervised(
    model,
    dataset,
    collate_fn,
    batch_size=8,
    num_gpus=2,  # Default to 2 GPUs
    optimizer=None,
    num_epochs=3,
    accumulation_steps=1,
    save_path="t5_finetuned.pt"
):
    """
    Main function to spawn multiple processes for distributed training.
    """
    # Make sure we have GPUs
    assert torch.cuda.is_available(), "No GPU available!"
    world_size = min(torch.cuda.device_count(), num_gpus)
    
    print(f"Starting distributed training on {world_size} GPUs...")
    
    # Spawn processes
    mp.spawn(
        train_t5_on_gpu,
        args=(world_size, model, dataset, collate_fn, batch_size, optimizer, num_epochs, accumulation_steps, save_path),
        nprocs=world_size,
        join=True
    )
    
    print("Training completed!")

def my_collator(batch):
    return collator(batch, tokenizer)

class CollatorWrapper:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
    
    def __call__(self, batch):
        from dataset_utils import collator
        return collator(batch, self.tokenizer)

if __name__ == '__main__':
    import os
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Suppress TensorFlow logging
    model_variants = ["google-t5/t5-base", "google-t5/t5-3b", "google/flan-t5-large", "google-t5/t5-11b"]
    device = 'cuda:0'

    model = T5ForConditionalGeneration.from_pretrained(
        model_variants[0],
        torch_dtype=torch.float16,
        token = os.getenv('HF_ACCESS_TOKEN')
    )

    tokenizer = T5Tokenizer.from_pretrained(
        model_variants[0]
    )

    raw_input = '/home/tadesa1/ADBMO-UNLV/data/processed_output_raw.txt'
    with open(raw_input, 'r') as f:
        text = f.readlines()
        text = [line for line in text if line.strip()]


    dataset = T5Dataset(text, tokenizer)
    dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=lambda batch: collator(batch, tokenizer))

    my_collator = CollatorWrapper(tokenizer)
    train_t5_unsupervised(model, dataset, my_collator)
    # train_t5_unsupervised(model, dataset, lambda batch: collator(batch, tokenizer))


2025-03-27 17:43:31.949152: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-27 17:43:31.961694: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1743097411.975699  975263 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1743097411.980248  975263 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-27 17:43:31.997105: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

Starting distributed training on 2 GPUs...


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/tadesa1/anaconda3/envs/nlp_ml_env/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/home/tadesa1/anaconda3/envs/nlp_ml_env/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'train_t5_on_gpu' on <module '__main__' (built-in)>


KeyboardInterrupt: 