In [61]:
%%writefile ddp.py

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import pandas as pd
from torch.nn.parallel import DistributedDataParallel as DDP
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.preprocessing import LabelEncoder
# On the Windows platform, the torch.distributed package only
# supports the Gloo backend, FileStore and TcpStore.
# For FileStore, set the init_method parameter in init_process_group
# to a local file. Example as follows:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '5554'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    """Tear down the default process group if one is currently active.

    Calling this when no process group has been initialised is a no-op, so
    it is safe to use on error paths where initialisation may not have
    completed.
    """
    # destroy_process_group() raises when there is no active group; guard it
    # so a failure elsewhere is not masked by a second exception here.
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()
    
class ToyModel(nn.Module):
    """Small two-layer network: 5 inputs -> 10 hidden units -> 3 outputs."""

    def __init__(self):
        super().__init__()
        # The attribute names are also the state_dict keys of the model.
        self.net1 = nn.Linear(in_features=5, out_features=10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(in_features=10, out_features=3)

    def forward(self, x):
        """Apply net1, ReLU and net2 in sequence and return the result."""
        hidden = self.net1(x)
        activated = self.relu(hidden)
        return self.net2(activated)


def demo_basic(rank, world_size):
    """Run a single DDP training step of ToyModel on random data.

    Args:
        rank: Index of this process; also used as the device the model and
            labels are moved to.
        world_size: Total number of participating processes.
    """
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    try:
        # create model and move it to GPU with id rank
        model = ToyModel().to(rank)
        ddp_model = DDP(model, device_ids=[rank])

        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        # Size the random batch from the model's own layers; hard-coded
        # widths previously disagreed with ToyModel (10 vs 5 inputs,
        # 5 vs 3 outputs) and made the forward pass fail.
        in_features = model.net1.in_features
        out_features = model.net2.out_features

        optimizer.zero_grad()
        outputs = ddp_model(torch.randn(20, in_features))
        labels = torch.randn(20, out_features).to(rank)
        loss_fn(outputs, labels).backward()
        optimizer.step()
    finally:
        # Release the process group even if the step above raises.
        cleanup()


def run_demo(demo_fn, world_size):
    """Spawn ``world_size`` processes running ``demo_fn`` and wait for them.

    Each worker receives ``world_size`` as its extra argument.

    NOTE: ``run_demo`` is defined a second time further down in this file;
    that later definition replaces this one when the module is loaded.
    """
    spawn_options = {
        "args": (world_size,),
        "nprocs": world_size,
        "join": True,
    }
    mp.spawn(demo_fn, **spawn_options)



class ToyMpModel(nn.Module):
    """Two-layer network whose linear layers are placed on two devices.

    ``net1`` is moved to ``dev0`` and ``net2`` to ``dev1``; ``forward`` moves
    the activations between the two as needed.
    """

    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = dev0, dev1
        self.net1 = nn.Linear(5, 10).to(dev0)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 3).to(dev1)

    def forward(self, x):
        """Run the first layer on ``dev0`` and the second on ``dev1``."""
        hidden = self.relu(self.net1(x.to(self.dev0)))
        return self.net2(hidden.to(self.dev1))

def demo_model_parallel(rank, world_size):
    """Run a single DDP training step of ToyMpModel on random data.

    Args:
        rank: Index of this process; used to derive the two devices the
            model is split across.
        world_size: Total number of participating processes.
    """
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    try:
        # setup mp_model and devices for this process
        dev0 = (rank * 2) % world_size
        dev1 = (rank * 2 + 1) % world_size
        mp_model = ToyMpModel(dev0, dev1)
        ddp_mp_model = DDP(mp_model)

        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

        # Size the random batch from the model's own layers; hard-coded
        # widths previously disagreed with ToyMpModel (10 vs 5 inputs,
        # 5 vs 3 outputs) and made the forward pass fail.
        in_features = mp_model.net1.in_features
        out_features = mp_model.net2.out_features

        optimizer.zero_grad()
        # outputs will be on dev1
        outputs = ddp_mp_model(torch.randn(20, in_features))
        labels = torch.randn(20, out_features).to(dev1)
        loss_fn(outputs, labels).backward()
        optimizer.step()
    finally:
        # Release the process group even if the step above raises.
        cleanup()

def train_on_csv(rank, world_size, csv_file):
    """Train ToyModel with DDP on a CSV dataset; rank 0 saves the weights.

    The last CSV column is used as the class label (label-encoded to
    integers) and all other columns as float32 features. 80% of the rows
    (fixed ``random_state=42``) are used for training, standardised with a
    ``StandardScaler`` fitted on that subset. After 20 epochs, rank 0 writes
    ``ddp_model.state_dict()`` to ``ddp_model.pth``.

    Args:
        rank: Index of this process; also used as the device the model and
            batches are moved to.
        world_size: Total number of participating processes.
        csv_file: Path of the CSV file to train on.

    Raises:
        ValueError: If the number of feature columns or of distinct labels
            does not fit ToyModel's layer sizes.
    """
    print(f"Running DDP training on rank {rank}.")
    setup(rank, world_size)

    try:
        # Load dataset from CSV file
        df = pd.read_csv(csv_file)

        # Assuming the last column is the target and all others are features
        X = df.iloc[:, :-1].values.astype(np.float32)  # Convert features to float32
        y = LabelEncoder().fit_transform(df.iloc[:, -1].values)
        y = y.astype(np.int64)  # Convert labels to int64

        num_classes = len(np.unique(y))
        print("Number of classes:", num_classes)

        # Split data into train/test. Only the training part is used below;
        # the split is kept so the training subset stays the same 80% of rows.
        X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42)

        # Scale features
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)

        # Create a dataset and DataLoader; the sampler gives each rank its own shard.
        train_dataset = torch.utils.data.TensorDataset(
            torch.tensor(X_train, dtype=torch.float32),
            torch.tensor(y_train, dtype=torch.long),
        )
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset, num_replicas=world_size, rank=rank, shuffle=True
        )
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=8, sampler=train_sampler)

        # Initialize model, loss, and optimizer
        model = ToyModel()

        # ToyModel's layer sizes are fixed, so fail with a readable message
        # instead of a shape error / device-side assert deep inside training.
        expected_features = model.net1.in_features
        max_classes = model.net2.out_features
        if X.shape[1] != expected_features:
            raise ValueError(
                f"{csv_file} has {X.shape[1]} feature columns, "
                f"but ToyModel expects {expected_features}"
            )
        if num_classes > max_classes:
            raise ValueError(
                f"{csv_file} has {num_classes} classes, "
                f"but ToyModel only has {max_classes} outputs"
            )

        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        loss_fn = nn.CrossEntropyLoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        # Training loop
        for epoch in range(20):
            # Re-seed the sampler so every epoch uses a different shuffle.
            train_sampler.set_epoch(epoch)
            for X_batch, y_batch in train_loader:
                X_batch, y_batch = X_batch.to(rank), y_batch.to(rank)

                optimizer.zero_grad()
                outputs = ddp_model(X_batch)
                loss = loss_fn(outputs, y_batch)
                loss.backward()
                optimizer.step()

            # Reports the loss of the last mini-batch of the epoch on this rank.
            print(f"Rank {rank}, Epoch {epoch+1}, Loss: {loss.item()}")

        if rank == 0:
            # Saved from the DDP wrapper, so keys carry a 'module.' prefix.
            model_save_path = "ddp_model.pth"
            torch.save(ddp_model.state_dict(), model_save_path)
            print(f"Model saved at {model_save_path}")
    finally:
        # Release the process group even if loading or training fails.
        cleanup()
    
def run_demo(demo_fn, world_size, csv_file=None):
    """Spawn ``world_size`` processes running ``demo_fn`` and wait for them.

    Args:
        demo_fn: Worker function to run in each spawned process.
        world_size: Number of processes to spawn; also passed to each worker.
        csv_file: Optional dataset path. When given, it is passed to each
            worker after ``world_size``; when omitted, workers receive only
            ``world_size`` (as needed by ``demo_basic`` and
            ``demo_model_parallel``).
    """
    # Keep both call shapes working: demos that take only world_size and
    # training functions that additionally take a CSV path.
    worker_args = (world_size,) if csv_file is None else (world_size, csv_file)
    mp.spawn(demo_fn,
             args=worker_args,
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    # Script entry point: launch one training process per visible CUDA device.
    csv_file = "Iris.csv"  # Update this path to your CSV file

    n_gpus = torch.cuda.device_count()
    print(f"total GPUs: {n_gpus}")
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"

    world_size = n_gpus
    run_demo(train_on_csv, world_size, csv_file)

Overwriting ddp.py


In [62]:
!python ddp.py

total GPUs: 2
Running DDP training on rank 1.
Running DDP training on rank 0.
Number of classes: 3
Number of classes: 3
Rank 1, Epoch 1, Loss: 0.9878944754600525
Rank 0, Epoch 1, Loss: 0.9715988039970398
Rank 0, Epoch 2, Loss: 1.0844911336898804
Rank 1, Epoch 2, Loss: 1.1329790353775024
Rank 1, Epoch 3, Loss: 1.2353006601333618
Rank 0, Epoch 3, Loss: 1.0441750288009644
Rank 0, Epoch 4, Loss: 1.185050368309021
Rank 1, Epoch 4, Loss: 1.100276231765747
Rank 1, Epoch 5, Loss: 1.0763195753097534
Rank 0, Epoch 5, Loss: 1.0393908023834229
Rank 0, Epoch 6, Loss: 1.0644296407699585
Rank 1, Epoch 6, Loss: 0.9357036352157593
Rank 0, Epoch 7, Loss: 1.2093515396118164
Rank 1, Epoch 7, Loss: 0.9351874589920044
Rank 1, Epoch 8, Loss: 1.035475254058838
Rank 0, Epoch 8, Loss: 0.9917013049125671
Rank 0, Epoch 9, Loss: 0.8907645344734192
Rank 1, Epoch 9, Loss: 1.1606793403625488
Rank 1, Epoch 10, Loss: 1.1588400602340698
Rank 0, Epoch 10, Loss: 1.0529487133026123
Rank 1, Epoch 11, Loss: 1.097440719604492

In [63]:
class ToyModel(nn.Module):
    """Network with layers net1 (5 -> 10), ReLU and net2 (10 -> 3)."""

    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(5, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 3)

    def forward(self, x):
        """Pass ``x`` through net1, relu and net2 in that order."""
        for layer in (self.net1, self.relu, self.net2):
            x = layer(x)
        return x

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model and load the saved state dict
model = ToyModel().to(device)

# Load the saved state_dict.
# NOTE: torch.load unpickles the file, so only load checkpoints you trust.
state_dict = torch.load("ddp_model.pth", map_location=device)

# Remove the 'module.' prefix from the keys in the state_dict if present.
# Only a *leading* prefix is stripped: str.replace would also delete the
# substring inside longer names (e.g. 'submodule.weight' -> 'subweight').
ddp_prefix = 'module.'
state_dict = {
    (key[len(ddp_prefix):] if key.startswith(ddp_prefix) else key): value
    for key, value in state_dict.items()
}

# Load the corrected state_dict into the model
model.load_state_dict(state_dict)

# Set the model to evaluation mode
model.eval()

# Example usage (e.g., for inference)
input_data = torch.randn(1, 5).to(device)  # Example input for the model
# Inference does not need gradients; no_grad avoids building the autograd graph.
with torch.no_grad():
    output = model(input_data)
print(output)

tensor([[ 0.1471, -0.2529,  0.5504]], device='cuda:0',
       grad_fn=<AddmmBackward0>)
