In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm
from Experts import BasicExpert
from Routers import HardRouter

In [None]:
class SparseMOE(nn.Module):
    def __init__(self, in_features, out_features, num_experts, k, balance_weight):
        super(SparseMOE, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.num_experts = num_experts
        self.k = k
        self.balance_weight = balance_weight # load balancing for experts
        self.router = HardRouter(in_features, k, num_experts, self.gate)
        self.experts = nn.ModuleList([BasicExpert(in_features, out_features) for _ in range(num_experts)])


    def forward(self, x):
        device = x.device
        batch_size = x.size(0)
        output = torch.zeros(batch_size, self.out_features, device=device)
        expert_usage = torch.zeros(self.num_experts, device=device)
        # batch-wise calculation
        for i in range(batch_size):
            expert_indices, expert_weights = self.router(x[i])
            for j in range(len(expert_indices)):
                expert_idx = expert_indices[j]
                output = output + expert_weights[j] * self.experts[expert_indices[j]](x[i].to(device))
                expert_usage[expert_idx] += 1 # calculate number of usage times

        balance_loss = self.compute_balance_loss(expert_usage, batch_size)
        return output, balance_loss
    
    def compute_balance_loss(self, expert_usage, batch_size):
        expert_freq = expert_usage / batch_size
        # load balancing loss using variance
        balance_loss = torch.var(expert_freq) * self.balance_weight
        return balance_loss

In [None]:
class testModel(nn.Module):
    def __init__(self):
        super(testModel, self).__init__()
        self.fc1 = nn.Linear(32*32*3, 128)
        self.moe = SparseMOE(128, 32, 8, 2, 0.01)
        self.fc2 = nn.Linear(32, 10)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x, balance_loss = self.moe(x)
        x = self.fc2(x)
        return x, balance_loss

In [None]:
# 数据预处理
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 归一化
])

# 加载训练集和测试集
train_dataset = datasets.CIFAR10(root='/work/datasets/CIFAR10', train=True, transform=transform)
test_dataset = datasets.CIFAR10(root='/work/datasets/CIFAR10', train=False, transform=transform)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=3)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=3)

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = testModel()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    running_balance_loss = 0.0
    
    train_loader_tqdm = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)
    
    for inputs, labels in train_loader_tqdm:
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.view(inputs.size(0), -1)
        
        
        optimizer.zero_grad()
        outputs, balance_loss = model(inputs)
        main_loss = criterion(outputs, labels)
        total_loss = main_loss + balance_loss

        total_loss.backward()
        optimizer.step()
        
        running_loss += main_loss.item()
        running_balance_loss += balance_loss.item()
        
        train_loader_tqdm.set_postfix({
            "Loss": running_loss / (train_loader_tqdm.n + 1),
            "Balance Loss": running_balance_loss / (train_loader_tqdm.n + 1)
        })
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Balance Loss: {running_balance_loss/len(train_loader):.4f}")

In [None]:
model.eval()
correct = 0
total = 0
test_loss = 0
test_balance_loss = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.view(inputs.size(0), -1)
        
        outputs, balance_loss = model(inputs)
        main_loss = criterion(outputs, labels)
        
        test_loss += main_loss.item()
        test_balance_loss += balance_loss.item()
        
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

test_loss = test_loss / len(test_loader)
test_balance_loss = test_balance_loss / len(test_loader)
accuracy = 100. * correct / total

print(f'Test Loss: {test_loss:.4f} | Test Balance Loss: {test_balance_loss:.4f} | Accuracy: {accuracy:.2f}%')