####  Dataset Loading

In [1]:
requirements_content = """
torch==2.1.2
torchvision==0.16.2
torchaudio==2.1.2
pytorch-lightning==2.1.3
ray[train]==2.9.0
deepspeed==0.12.3
tiktoken==0.5.2
pandas==2.1.4
numpy==1.24.3
pyyaml==6.0.1
omegaconf==2.3.0
dataclasses==0.6
typing-extensions==4.9.0
tensorboard==2.15.1
pytest==7.4.4
black==23.12.1
isort==5.13.2
"""

with open('/kaggle/working/requirements.txt', 'w') as f:
    f.write(requirements_content.strip())

In [2]:
# !apt install python3.10-venv
# !python -m venv venv
# !source venv/bin/activate
# !pip install -r /kaggle/working/requirements.txt

In [3]:
# !source venv/bin/activate

In [4]:
!pip install "ray[train]"  deepspeed

Collecting deepspeed
  Downloading deepspeed-0.16.2.tar.gz (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m18.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting tensorboardX>=1.9 (from ray[train])
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting hjson (from deepspeed)
  Downloading hjson-3.1.0-py3-none-any.whl.metadata (2.6 kB)
Collecting nvidia-ml-py (from deepspeed)
  Downloading nvidia_ml_py-12.560.30-py3-none-any.whl.metadata (8.6 kB)
Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading hjson-3.1.0-py3-none-any.whl (54 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.0/54.0 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading nvidia_ml_py-12.560.30-py3-none-any.w

In [5]:
import os
from typing import Dict, Any, Optional
import ray
from ray.air import RunConfig, ScalingConfig  # Updated import
from ray.train.lightning import RayLightningEnvironment, prepare_trainer
import torch
import logging


In [6]:
# !pip install "ray[train,tune]==2.9.0" torch==2.1.2 "ray[default,train]"

In [7]:
!pip install deepspeed



In [8]:
import torch
import pytorch_lightning as pl
import ray
import numpy as np
import pandas as pd
from typing_extensions import Protocol
import deepspeed
print(f"PyTorch version: {torch.__version__}")
print(f"PyTorch Lightning version: {pl.__version__}")
print(f"Ray version: {ray.__version__}")
print(f"DeepSpeed version: {deepspeed.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")

[2025-01-20 03:02:36,696] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
PyTorch version: 2.5.1+cu121
PyTorch Lightning version: 2.5.0.post0
Ray version: 2.40.0
DeepSpeed version: 0.16.2
NumPy version: 1.26.4
Pandas version: 2.2.2
CUDA available: True
CUDA version: 12.1
Number of GPUs: 2


In [9]:
# data/dataset.py
from dataclasses import dataclass
from typing import List, Tuple, Optional, Any
import torch
from torch.utils.data import Dataset, DataLoader
import tiktoken
import pandas as pd
import numpy as np
import logging
import os

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DatasetConfig:
    """Configuration class for dataset parameters"""
    csv_path: str
    batch_size: int = 4
    max_length: int = 256
    stride: int = 128
    tokenizer_name: str = "gpt2"
    num_workers: int = 4
    shuffle: bool = True
    drop_last: bool = True
    val_split: float = 0.1  # Validation split ratio
    seed: int = 42  # Random seed for reproducibility
    
    def __post_init__(self):
        """Validate configuration parameters"""
        if not os.path.exists(self.csv_path):
            raise FileNotFoundError(f"CSV file not found at {self.csv_path}")
        if self.batch_size < 1:
            raise ValueError("Batch size must be positive")
        if self.max_length < 1:
            raise ValueError("Max length must be positive")
        if self.stride < 1:
            raise ValueError("Stride must be positive")
        if self.num_workers < 0:
            raise ValueError("Number of workers must be non-negative")
        if not 0 <= self.val_split < 1:
            raise ValueError("Validation split must be between 0 and 1")

In [10]:
class GPTDataset(Dataset):
    """PyTorch Dataset for processing text data for GPT models"""
    
    def __init__(
        self, 
        texts: List[str],
        tokenizer_name: str = "gpt2",
        max_length: int = 256,
        stride: int = 128
    ):
        """Initialize the dataset with texts and processing parameters"""
        super().__init__()
        self.tokenizer = self._initialize_tokenizer(tokenizer_name)
        self.max_length = max_length
        self.stride = stride
        
        # Initialize storage for processed data
        self.input_ids: List[torch.Tensor] = []
        self.target_ids: List[torch.Tensor] = []
        
        # Process texts
        self._process_texts(texts)
        
        logger.info(f"Initialized dataset with {len(self)} samples")
    
    def _initialize_tokenizer(self, tokenizer_name: str) -> Any:
        """Initialize the tokenizer with error handling"""
        try:
            return tiktoken.get_encoding(tokenizer_name)
        except Exception as e:
            logger.error(f"Failed to initialize tokenizer: {str(e)}")
            raise RuntimeError(f"Tokenizer initialization failed: {str(e)}")
    
    def _is_valid_text(self, text: Any) -> bool:
        """Validate text input"""
        if text is None or pd.isna(text):
            return False
        if isinstance(text, (int, float)):
            return not np.isnan(float(text))
        return bool(str(text).strip())
    
    def _process_texts(self, texts: List[str]) -> None:
        """Process all texts in the dataset"""
        valid_count = 0
        for text in texts:
            if self._is_valid_text(text):
                self._process_single_text(str(text).strip())
                valid_count += 1
        
        if valid_count == 0:
            raise ValueError("No valid texts found in the dataset")
        
        logger.info(f"Processed {valid_count} valid texts out of {len(texts)} total texts")
    
    def _process_single_text(self, text: str) -> None:
        """Process a single text into chunks"""
        try:
            token_ids = self.tokenizer.encode(text, allowed_special={"<|endoftext|>"})
            
            for i in range(0, len(token_ids) - self.max_length, self.stride):
                input_chunk = token_ids[i:i + self.max_length]
                target_chunk = token_ids[i + 1:i + self.max_length + 1]
                
                if len(input_chunk) == self.max_length and len(target_chunk) == self.max_length:
                    self.input_ids.append(torch.tensor(input_chunk, dtype=torch.long))
                    self.target_ids.append(torch.tensor(target_chunk, dtype=torch.long))
                    
        except Exception as e:
            logger.warning(f"Error processing text chunk: {str(e)}")
    
    def __len__(self) -> int:
        """Return the number of samples in the dataset"""
        return len(self.input_ids)
    
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Get a sample from the dataset"""
        return self.input_ids[idx], self.target_ids[idx]

In [11]:
class DatasetManager:
    """Manager class for handling dataset creation and dataloader configuration"""
    
    def __init__(self, config: DatasetConfig):
        """Initialize the dataset manager with configuration"""
        self.config = config
        self.train_dataset: Optional[GPTDataset] = None
        self.val_dataset: Optional[GPTDataset] = None
        
        # Set random seed for reproducibility
        torch.manual_seed(self.config.seed)
        np.random.seed(self.config.seed)
    
    def _prepare_texts(self, content_series: pd.Series) -> List[str]:
        """Clean and prepare texts from DataFrame"""
        texts = []
        for text in content_series:
            if pd.isna(text):
                continue
            
            if isinstance(text, (int, float)):
                if not np.isnan(float(text)):
                    texts.append(str(text).strip())
            elif isinstance(text, str) and text.strip():
                texts.append(text.strip())
        
        return texts
    
    def _split_texts(self, texts: List[str]) -> Tuple[List[str], List[str]]:
        """Split texts into training and validation sets"""
        # Shuffle texts deterministically
        np.random.shuffle(texts)
        
        # Calculate split index
        val_size = int(len(texts) * self.config.val_split)
        
        # Split texts
        train_texts = texts[val_size:]
        val_texts = texts[:val_size]
        
        logger.info(f"Split dataset: {len(train_texts)} training samples, {len(val_texts)} validation samples")
        return train_texts, val_texts
    
    def initialize_datasets(self) -> None:
        """Initialize training and validation datasets from the CSV file"""
        try:
            df = pd.read_csv(self.config.csv_path)
            texts = self._prepare_texts(df["Content"][:10])
            train_texts, val_texts = self._split_texts(texts)
            
            # Initialize training dataset
            self.train_dataset = GPTDataset(
                texts=train_texts,
                tokenizer_name=self.config.tokenizer_name,
                max_length=self.config.max_length,
                stride=self.config.stride
            )
            
            # Initialize validation dataset
            self.val_dataset = GPTDataset(
                texts=val_texts,
                tokenizer_name=self.config.tokenizer_name,
                max_length=self.config.max_length,
                stride=self.config.stride
            )
            
            logger.info("Successfully initialized training and validation datasets")
            
        except Exception as e:
            logger.error(f"Failed to initialize datasets: {str(e)}")
            raise
    
    def _create_dataloader(self, dataset: Dataset, is_training: bool = True) -> DataLoader:
        """Create a DataLoader with the specified configuration"""
        return DataLoader(
            dataset,
            batch_size=self.config.batch_size,
            num_workers=self.config.num_workers,
            shuffle=self.config.shuffle if is_training else False,
            drop_last=self.config.drop_last if is_training else False,
            pin_memory=True
        )
    
    def get_train_dataloader(self) -> DataLoader:
        """Get the training DataLoader"""
        if self.train_dataset is None:
            raise RuntimeError("Datasets not initialized. Call initialize_datasets() first")
        return self._create_dataloader(self.train_dataset, is_training=True)
    
    def get_val_dataloader(self) -> DataLoader:
        """Get the validation DataLoader"""
        if self.val_dataset is None:
            raise RuntimeError("Datasets not initialized. Call initialize_datasets() first")
        return self._create_dataloader(self.val_dataset, is_training=False)

In [12]:
# models/model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class GPTConfig:
    """Configuration for GPT model"""
    vocab_size: int
    block_size: int = 1024    # Maximum sequence length
    n_layer: int = 12         # Number of transformer layers
    n_head: int = 12         # Number of attention heads
    n_embd: int = 768        # Embedding dimension
    dropout: float = 0.1     # Dropout rate

class SelfAttention(nn.Module):
    """Multi-head self attention"""
    def __init__(self, config: GPTConfig):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        
        # Key, query, value projections for all heads
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
        # Output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd)
        # Regularization
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        # Causal mask to ensure that attention is only applied to the left in the input sequence
        self.register_buffer(
            "mask",
            torch.tril(torch.ones(config.block_size, config.block_size))
            .view(1, 1, config.block_size, config.block_size)
        )
        
        self.n_head = config.n_head
        self.n_embd = config.n_embd

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, C = x.size() # batch size, sequence length, embedding dimension
        
        # Calculate query, key, values for all heads in batch
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)

        # Causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        att = att.masked_fill(self.mask[:,:,:T,:T] == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side

        # Output projection
        y = self.resid_dropout(self.c_proj(y))
        return y

class Block(nn.Module):
    """Transformer block: communication followed by computation"""
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.ln_1 = nn.LayerNorm(config.n_embd)
        self.attn = SelfAttention(config)
        self.ln_2 = nn.LayerNorm(config.n_embd)
        self.mlp = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.dropout),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

class GPT(nn.Module):
    """GPT Language Model"""
    def __init__(self, config: GPTConfig):
        super().__init__()
        self.config = config
        
        # Input embedding stem
        self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
        self.dropout = nn.Dropout(config.dropout)
        
        # Transformer blocks
        self.blocks = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        
        # Decoder head
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        
        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module: nn.Module) -> None:
        if isinstance(module, (nn.Linear, nn.Embedding)):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.LayerNorm):
            torch.nn.init.zeros_(module.bias)
            torch.nn.init.ones_(module.weight)

    def forward(self, idx: torch.Tensor, targets: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Forward pass of GPT model"""
        b, t = idx.size()
        assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"

        # Forward the GPT model
        # token embeddings of shape (b, t, n_embd)
        token_embeddings = self.tok_emb(idx)
        position_embeddings = self.pos_emb[:, :t, :] # each position maps to a (learnable) vector
        x = self.dropout(token_embeddings + position_embeddings)
        
        # Forward through transformer blocks
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        
        # Final linear layer
        logits = self.head(x)

        # If we are given some desired targets also calculate the loss
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))

        return logits, loss

    @torch.no_grad()
    def generate(
        self,
        idx: torch.Tensor,
        max_new_tokens: int,
        temperature: float = 1.0,
        do_sample: bool = True,
        top_k: Optional[int] = None
    ) -> torch.Tensor:
        """
        Generate new tokens from the model.
        Args:
            idx: (b, t) array of indices in the current sequence
            max_new_tokens: number of tokens to generate
            temperature: temperature for sampling
            do_sample: if True, sample from distribution, otherwise take argmax
            top_k: if set, only sample from the top k most probable tokens
        Returns:
            (b, t+n) array of indices in the sequence
        """
        for _ in range(max_new_tokens):
            # Crop sequence if needed
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            # Get predictions
            logits, _ = self(idx_cond)
            # Focus on last time step
            logits = logits[:, -1, :] / temperature
            
            # Optionally crop probabilities to only the top k options
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = float('-inf')
                
            # Apply softmax to convert logits to probabilities
            probs = F.softmax(logits, dim=-1)
            
            # Sample from the distribution or take the most likely element
            if do_sample:
                idx_next = torch.multinomial(probs, num_samples=1)
            else:
                _, idx_next = torch.topk(logits, k=1, dim=-1)
            
            # Append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1)

        return idx

In [13]:
import torch
import multiprocessing
import os

# Check GPU count
gpu_count = torch.cuda.device_count()
print(f"Number of GPUs available: {gpu_count}")

# Check CPU count
cpu_count = multiprocessing.cpu_count()
print(f"Number of CPU cores: {cpu_count}")

# Check GPU information
if torch.cuda.is_available():
    for i in range(gpu_count):
        gpu_properties = torch.cuda.get_device_properties(i)
        print(f"\nGPU {i}: {gpu_properties.name}")
        print(f"Memory: {gpu_properties.total_memory / 1024**3:.2f} GB")
        print(f"Compute Capability: {gpu_properties.major}.{gpu_properties.minor}")

print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"Current GPU name: {torch.cuda.get_device_name(0)}")

Number of GPUs available: 2
Number of CPU cores: 4

GPU 0: Tesla T4
Memory: 14.74 GB
Compute Capability: 7.5

GPU 1: Tesla T4
Memory: 14.74 GB
Compute Capability: 7.5
CUDA available: True
Current GPU name: Tesla T4


In [14]:
# train.py
import ray
import ray.train
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer
import deepspeed
from deepspeed.accelerator import get_accelerator
import torch
from tempfile import TemporaryDirectory
from typing import Dict, Any, Optional
from dataclasses import dataclass
import torch.nn.functional as F
import os

@dataclass
class RayConfig:
    """Ray-specific configuration for distributed training"""
    num_workers: int = 2         # Use both GPUs
    cpus_per_worker: int = 2     # 4 CPUs total, so 2 per worker
    gpus_per_worker: float = 1.0 # Each worker gets 1 full GPU
    use_gpu: bool = True
    address: Optional[str] = None

    
    def __post_init__(self):
        """Auto-detect GPU configuration"""
        if self.num_workers is None:
            if torch.cuda.is_available():
                # Use all available GPUs
                self.num_workers = torch.cuda.device_count()
            else:
                self.num_workers = 1
                self.use_gpu = False
                self.gpus_per_worker = 0
    
    def get_scaling_config(self) -> ScalingConfig:
        """Get Ray scaling configuration for multi-GPU"""
        return ScalingConfig(
            num_workers=self.num_workers,
            use_gpu=self.use_gpu,
            resources_per_worker={
                "CPU": self.cpus_per_worker,
                "GPU": self.gpus_per_worker
            },
            placement_strategy="PACK" if self.use_gpu else "SPREAD"
        )

@dataclass
class TrainingConfig:
    """Configuration for training parameters"""
    # Model configuration
    vocab_size: int = 50257  # GPT-2 vocabulary size
    block_size: int = 1024
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.1
    
    # Training configuration
    num_epochs: int = 3
    train_batch_size: int = 16
    eval_batch_size: int = 32
    log_interval: int = 10
    data_path: str = "/kaggle/input/llms-txt/files.csv"
    tokenizer_name: str = "gpt2"
    
    # Learning rate configuration
    learning_rate: float = 1e-4
    warmup_steps: int = 2000
    
    # DeepSpeed configuration
    fp16: bool = True
    bf16: bool = False
    zero_stage: int = 3

def create_deepspeed_config(config: TrainingConfig) -> Dict[str, Any]:
    """Create DeepSpeed configuration optimized for 2x T4 GPUs"""
    world_size = torch.cuda.device_count()  # Get number of available GPUs
    micro_batch = config.train_batch_size // world_size  # Ensure divisible by world_size
    gradient_acc_steps = 1
    train_batch = micro_batch * gradient_acc_steps * world_size
    
    print(f"DeepSpeed Config - world_size: {world_size}, micro_batch: {micro_batch}, train_batch: {train_batch}")
    
    return {
        "train_micro_batch_size_per_gpu": micro_batch,
        "gradient_accumulation_steps": gradient_acc_steps,
        "train_batch_size": train_batch,
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": config.learning_rate,
                "betas": (0.9, 0.95),
                "weight_decay": 0.1,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {"warmup_num_steps": config.warmup_steps}
        },
        "fp16": {"enabled": config.fp16},
        "bf16": {"enabled": config.bf16},
        "zero_optimization": {
            "stage": config.zero_stage,
            "offload_optimizer": {
                "device": "none",
            },
            "offload_param": {
                "device": "none",
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": 5e7,
            "stage3_prefetch_bucket_size": 5e7,
            "stage3_param_persistence_threshold": 1e5,
        },
        "gradient_clipping": 1.0,
        "steps_per_print": 10,
        "wall_clock_breakdown": False
    }

def train_func(config: Dict[str, Any]) -> None:
    """Training function that will be launched on each worker"""
    
    # Initialize distributed backend
    deepspeed.init_distributed()
    
    # Convert dictionary config to TrainingConfig
    train_config = TrainingConfig(
        vocab_size=config["vocab_size"],
        block_size=config["block_size"],
        n_layer=config["n_layer"],
        n_head=config["n_head"],
        n_embd=config["n_embd"],
        dropout=config["dropout"],
        num_epochs=config["num_epochs"],
        train_batch_size=config["train_batch_size"],
        eval_batch_size=config["eval_batch_size"],
        log_interval=config["log_interval"],
        data_path=config["data_path"],
        tokenizer_name=config["tokenizer_name"],
        learning_rate=config["learning_rate"],
        warmup_steps=config["warmup_steps"],
        fp16=config["fp16"],
        bf16=config["bf16"],
        zero_stage=config["zero_stage"]
    )
    
    # Initialize model configuration
    model_config = GPTConfig(
        vocab_size=train_config.vocab_size,
        block_size=train_config.block_size,
        n_layer=train_config.n_layer,
        n_head=train_config.n_head,
        n_embd=train_config.n_embd,
        dropout=train_config.dropout
    )
    
    # Initialize model
    model = GPT(model_config)
    
  
    
    # Initialize DeepSpeed with proper config
    ds_config = create_deepspeed_config(train_config)
    model, optimizer, _, scheduler = deepspeed.initialize(
        model=model,
        model_parameters=model.parameters(),
        config=ds_config
    )
    
    # Get data shard for this worker and create iterators
    train_dataset = ray.train.get_dataset_shard("train")
    val_dataset = ray.train.get_dataset_shard("validation")
    
    train_dataloader = train_dataset.iter_torch_batches(
        batch_size=train_config.train_batch_size,
        dtypes={"input_ids": torch.long, "labels": torch.long}
    )
    val_dataloader = val_dataset.iter_torch_batches(
        batch_size=train_config.eval_batch_size,
        dtypes={"input_ids": torch.long, "labels": torch.long}
    )
    
    device = get_accelerator().device_name(model.local_rank)
    
    # Training loop
    for epoch in range(train_config.num_epochs):  # Changed from config to train_config
        model.train()
        train_loss = 0.0
        num_train_batches = 0
        
        # Use the iterator directly
        for batch in train_dataloader:
            # Move batch to device
            input_ids = batch["input_ids"].to(device)
            targets = batch["labels"].to(device)
            
            # Forward pass
            logits, _ = model(input_ids)
            
            # Calculate loss
            loss = F.cross_entropy(
                logits.view(-1, logits.size(-1)),
                targets.view(-1),
                ignore_index=-100
            )
            
            # Backward pass using DeepSpeed engine
            model.backward(loss)
            model.step()
            
            train_loss += loss.item()
            num_train_batches += 1
            
            # Log progress
            if num_train_batches % train_config.log_interval == 0 and model.global_rank == 0:
                print(f"Epoch: {epoch}, Batch: {num_train_batches}, Loss: {loss.item():.4f}")
        
        # Validation
        model.eval()
        val_loss = 0.0
        num_val_batches = 0
        
        with torch.no_grad():
            for batch in val_dataloader:
                input_ids = batch["input_ids"].to(device)
                targets = batch["labels"].to(device)
                
                logits, _ = model(input_ids)
                loss = F.cross_entropy(
                    logits.view(-1, logits.size(-1)),
                    targets.view(-1),
                    ignore_index=-100
                )
                val_loss += loss.item()
                num_val_batches += 1
        
        # Average losses
        train_loss = train_loss / num_train_batches if num_train_batches > 0 else float('inf')
        val_loss = val_loss / num_val_batches if num_val_batches > 0 else float('inf')
        
        metrics = {
            "train_loss": train_loss,
            "val_loss": val_loss,
            "epoch": epoch
        }
        
        # Save checkpoint and report metrics
        if model.global_rank == 0:
            print(f"Epoch {epoch}: {metrics}")
            
        with TemporaryDirectory() as tmpdir:
            # Save model checkpoint
            model.save_checkpoint(tmpdir)
            
            # Ensure all workers finished saving
            torch.distributed.barrier()
            
            # Report metrics and checkpoint
            ray.train.report(
                metrics=metrics,
                checkpoint=Checkpoint.from_directory(tmpdir)
            )

def main():
    # Shutdown Ray if it's already running
    if ray.is_initialized():
        ray.shutdown()
    
    # Initialize Ray configurations
    ray_config = RayConfig()
    training_config = TrainingConfig()
    
    # Initialize Ray cluster
    if ray_config.address:
        ray.init(address=ray_config.address, ignore_reinit_error=True)
    else:
        ray.init(ignore_reinit_error=True)
    
    try:
        # Convert training config to dictionary
        train_config_dict = {
            key: getattr(training_config, key)
            for key in training_config.__dataclass_fields__
        }
        
        # Initialize datasets
        data_manager = DatasetManager(DatasetConfig(csv_path=training_config.data_path))
        data_manager.initialize_datasets()
        
        # Convert tensors to numpy arrays for Ray Dataset
        train_data = [(x.numpy(), y.numpy()) for x, y in data_manager.train_dataset]
        val_data = [(x.numpy(), y.numpy()) for x, y in data_manager.val_dataset]
        
        # Convert to Ray datasets with proper schema
        ray_datasets = {
            "train": ray.data.from_items([
                {"input_ids": x, "labels": y} for x, y in train_data
            ]),
            "validation": ray.data.from_items([
                {"input_ids": x, "labels": y} for x, y in val_data
            ])
        }
        
        # Initialize trainer
        trainer = TorchTrainer(
            train_func,
            train_loop_config=train_config_dict,
            scaling_config=ray_config.get_scaling_config(),
            datasets=ray_datasets,
            dataset_config=DataConfig(
                datasets_to_split=["train", "validation"]
            )
        )
        
        # Start training
        result = trainer.fit()
        
        # Get best checkpoints
        best_checkpoints = result.best_checkpoints
        return best_checkpoints
        
    finally:
        # Always shutdown Ray when we're done
        ray.shutdown()

if __name__ == "__main__":
    main()

2025-01-20 03:02:53,281	INFO worker.py:1821 -- Started a local Ray instance.
2025-01-20 03:03:01,487	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


== Status ==
Current time: 2025-01-20 03:03:16 (running for 00:00:00.12)
Using FIFO scheduling algorithm.
Logical resource usage: 0/4 CPUs, 0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 PENDING)


[36m(TrainTrainable pid=499)[0m [2025-01-20 03:03:20,815] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cpu (auto detect)
== Status ==
Current time: 2025-01-20 03:03:21 (running for 00:00:05.14)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 PENDING)




[36m(TrainTrainable pid=499)[0m 2025-01-20 03:03:22.746446: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(TrainTrainable pid=499)[0m 2025-01-20 03:03:22.767802: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(TrainTrainable pid=499)[0m 2025-01-20 03:03:22.774211: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


== Status ==
Current time: 2025-01-20 03:03:26 (running for 00:00:10.15)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=548)[0m Setting up process group for: env:// [rank=0, world_size=2]


== Status ==
Current time: 2025-01-20 03:03:31 (running for 00:00:15.17)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:03:36 (running for 00:00:20.20)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(TorchTrainer pid=499)[0m Started distributed worker processes: 
[36m(TorchTrainer pid=499)[0m - (node_id=7097748658e24cb109d10619b0730414881dc5dc239aa64622ceb8d7, ip=172.19.2.2, pid=548) world_rank=0, local_rank=0, node_rank=0
[36m(TorchTrainer pid=499)[0m - (node_id=7097748658e24cb109d10619b0730414881dc5dc239aa64622ceb8d7, ip=172.19.2.2, pid=547) world_rank=1, local_rank=1, node_rank=0


[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:03:39,718] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
== Status ==
Current time: 2025-01-20 03:03:41 (running for 00:00:25.23)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=548)[0m 2025-01-20 03:03:43.224879: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(RayTrainWorker pid=548)[0m 2025-01-20 03:03:43.247534: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(RayTrainWorker pid=548)[0m 2025-01-20 03:03:43.254343: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:03:45,670] [INFO] [comm.py:652:init_distributed] cdb=None
[36m(RayTrainWorker pid=547)[0m [2025-01-20 03:03:39,884] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
== Status ==
Current time: 2025-01-20 03:03:46 (running for 00:00:30.26)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


[36m(RayTrainWorker pid=548)[0m DeepSpeed Config - world_size: 2, micro_batch: 8, train_batch: 16
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:03:48,305] [INFO] [logging.py:128:log_dist] [Rank 0] DeepSpeed info: version=0.16.2, git-hash=unknown, git-branch=unknown
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:03:48,305] [INFO] [config.py:733:__init__] Config mesh_dev

[36m(RayTrainWorker pid=547)[0m Using /root/.cache/torch_extensions/py310_cu121 as PyTorch extensions root...
[36m(RayTrainWorker pid=547)[0m 2025-01-20 03:03:43.267345: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(RayTrainWorker pid=547)[0m 2025-01-20 03:03:43.289253: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(RayTrainWorker pid=547)[0m 2025-01-20 03:03:43.295792: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(RayTrainWorker pid=548)[0m Creating extension directory /root/.cache/torch_extensions/py310_cu121/fused_adam...
[36m(RayTrainWorker pid=548)[0

== Status ==
Current time: 2025-01-20 03:03:51 (running for 00:00:35.30)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:03:56 (running for 00:00:40.33)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:04:01 (running for 00:00:45.37)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/Tor

[36m(RayTrainWorker pid=547)[0m Loading extension module fused_adam...
[36m(RayTrainWorker pid=548)[0m Using /root/.cache/torch_extensions/py310_cu121 as PyTorch extensions root...


[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,427] [INFO] [utils.py:781:see_memory_usage] Stage 3 initialize beginning
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,428] [INFO] [utils.py:782:see_memory_usage] MA 0.33 GB         Max_MA 0.33 GB         CA 0.34 GB         Max_CA 0 GB 
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,429] [INFO] [utils.py:789:see_memory_usage] CPU Virtual Memory:  used = 5.26 GB, percent = 16.8%
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,430] [INFO] [stage3.py:168:__init__] Reduce bucket size 50000000
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,430] [INFO] [stage3.py:169:__init__] Prefetch bucket size 50000000
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,708] [INFO] [utils.py:781:see_memory_usage] DeepSpeedZeRoOffload initialize [begin]
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,709] [INFO] [utils.py:782:see_memory_usage] MA 0.33 GB         Max_MA 0.33 GB         CA 0.34 GB         Max_CA

[36m(SplitCoordinator pid=635)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=635)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


(pid=635) Running 0: 0.00 row [00:00, ? row/s]

(pid=635) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:29,126] [INFO] [loss_scaler.py:183:update_scale] [deepspeed] OVERFLOW! Rank 0 Skipping step. Attempted loss scale: 4294967296, reducing to 2147483648
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:29,126] [INFO] [stage3.py:2024:_loco_err_buf_update] update loco-zero++ error buffer with overflow: True
[36m(RayTrainWorker pid=548)[0m Time to load fused_adam op: 34.7537956237793 seconds
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:24,712] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 2
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:29,951] [INFO] [loss_scaler.py:183:update_scale] [deepspeed] OVERFLOW! Rank 0 Skipping step. Attempted loss scale: 2147483648, reducing to 1073741824
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:29,952] [INFO] [stage3.py:2024:_loco_err_buf_update] update loco-zero++ error buffer with overflow: True
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:04:30,267] [INF

(pid=636) Running 0: 0.00 row [00:00, ? row/s]

(pid=636) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m Loading extension module fused_adam...
[36m(SplitCoordinator pid=636)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=636)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


== Status ==
Current time: 2025-01-20 03:06:37 (running for 00:03:21.32)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:06:42 (running for 00:03:26.35)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


[36m(RayTrainWorker pid=548)[0m Epoch 0: {'train_loss': 4.706414690884677, 'val_loss': 6.997663225446429, 'epoch': 0}
[36m(RayTrainWorker pid=547)[0m [2025-01-20 03:06:45,264] [INFO] [logging.py:128:log_dist] [Rank 1] Saving model checkpoint: /tmp/tmp8ygancw3/global_step330

[36m(RayTrainWorker pid=547)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000000)
[36m(SplitCoordinator pid=635)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=635)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


(pid=635) Running 0: 0.00 row [00:00, ? row/s]

(pid=635) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:06:45,264] [INFO] [logging.py:128:log_dist] [Rank 0] Saving model checkpoint: /tmp/tmpe9502apj/global_step330/zero_pp_rank_0_mp_rank_00_model_states.pt
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:06:45,321] [INFO] [torch_checkpoint_engine.py:21:save] [Torch] Saving /tmp/tmpe9502apj/global_step330/zero_pp_rank_0_mp_rank_00_optim_states.pt...[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:06:47,386] [INFO] [torch_checkpoint_engine.py:23:save] [Torch] Saved /tmp/tmpe9502apj/global_step330/zero_pp_rank_0_mp_rank_00_optim_states.pt.[32m [repeated 3x across cluster][0m
== Status ==
Current time: 2025-01-20 03:06:52 (running for 00:03:36.39)
Using FIFO scheduling algorithm.
Logical resourc

(pid=636) Running 0: 0.00 row [00:00, ? row/s]

(pid=636) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000000)
[36m(SplitCoordinator pid=636)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=636)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


== Status ==
Current time: 2025-01-20 03:08:58 (running for 00:05:42.15)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:09:03 (running for 00:05:47.18)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:09:08 (running for 00:05:52.21)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/Tor

[36m(RayTrainWorker pid=547)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000001)
[36m(SplitCoordinator pid=635)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=635)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


(pid=635) Running 0: 0.00 row [00:00, ? row/s]

(pid=635) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:09:10,624] [INFO] [logging.py:128:log_dist] [Rank 0] Saving model checkpoint: /tmp/tmpseo7h9d6/global_step660/zero_pp_rank_0_mp_rank_00_model_states.pt
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:09:10,667] [INFO] [torch_checkpoint_engine.py:21:save] [Torch] Saving /tmp/tmpseo7h9d6/global_step660/zero_pp_rank_0_mp_rank_00_optim_states.pt...[32m [repeated 2x across cluster][0m
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:09:12,785] [INFO] [torch_checkpoint_engine.py:23:save] [Torch] Saved /tmp/tmpseo7h9d6/global_step660/zero_pp_rank_0_mp_rank_00_optim_states.pt.[32m [repeated 3x across cluster][0m
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:09:12,786] [INFO] [engine.py:3572:_save_zero_checkpoint] zero checkpoint saved /tmp/tmpseo7h9d6/global_step660/zero_pp_rank_0_mp_rank_00_optim_states.pt
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:09:12,798] [INFO] [torch_checkpoint_engine.py:33:commit] [Torch] Checkpoint global_

(pid=636) Running 0: 0.00 row [00:00, ? row/s]

(pid=636) - split(2, equal=True) 1: 0.00 row [00:00, ? row/s]

[36m(RayTrainWorker pid=548)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000001)
[36m(SplitCoordinator pid=636)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-01-20_03-02-49_860434_18/logs/ray-data
[36m(SplitCoordinator pid=636)[0m Execution plan of Dataset: InputDataBuffer[Input] -> OutputSplitter[split(2, equal=True)]


== Status ==
Current time: 2025-01-20 03:11:24 (running for 00:08:08.01)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:11:29 (running for 00:08:13.04)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-01-20 03:11:34 (running for 00:08:18.07)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/Tor

[36m(RayTrainWorker pid=548)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000002)
2025-01-20 03:11:41,990	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/root/ray_results/TorchTrainer_2025-01-20_03-03-01' in 0.0086s.
2025-01-20 03:11:41,994	INFO tune.py:1041 -- Total run time: 520.51 seconds (505.82 seconds for the tuning loop).
[36m(RayTrainWorker pid=547)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/root/ray_results/TorchTrainer_2025-01-20_03-03-01/TorchTrainer_0f514_00000_0_2025-01-20_03-03-16/checkpoint_000002)


== Status ==
Current time: 2025-01-20 03:11:41 (running for 00:08:25.83)
Using FIFO scheduling algorithm.
Logical resource usage: 4.0/4 CPUs, 2.0/2 GPUs (0.0/1.0 accelerator_type:T4)
Result logdir: /tmp/ray/session_2025-01-20_03-02-49_860434_18/artifacts/2025-01-20_03-03-01/TorchTrainer_2025-01-20_03-03-01/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)


[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:11:36,281] [INFO] [logging.py:128:log_dist] [Rank 0] Saving model checkpoint: /tmp/tmplea7_9op/global_step990/zero_pp_rank_0_mp_rank_00_model_states.pt
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:11:36,325] [INFO] [torch_checkpoint_engine.py:21:save] [Torch] Saving /tmp/tmplea7_9op/global_step990/zero_pp_rank_0_mp_rank_00_optim_states.pt...[32m [repeated 2x across cluster][0m
[36m(RayTrainWorker pid=548)[0m [2025-01-20 03:11:38,383] [INFO] [torch_checkpoint_engine.py:23:save] [Torch] Saved /tmp/tmplea7_9op/global_step990/zero_pp_rank_0_mp_rank_00_optim_states.pt.[32m [re