In [69]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
from torchvision import datasets, transforms
import random
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import deque
import torch.amp as amp
from sklearn.random_projection import GaussianRandomProjection
from sklearn.decomposition import PCA

# ✅ GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def get_dataloader(batch_size=32, train = True):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    
    if train:
        # MNIST 데이터셋 로드
        mnist_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
        data_loader = DataLoader(mnist_dataset, batch_size=len(mnist_dataset), shuffle=False)
    else:
        # MNIST 데이터셋 로드
        test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
        data_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
    
    # 전체 데이터셋 가져오기
    data_iter = iter(data_loader)
    images, labels = next(data_iter)
    
    # 2D로 변환 (batch_size, 28*28)
    images = images.view(images.size(0), -1).numpy()
    
    # Random Projection 적용 (784 -> 48)
    rp = GaussianRandomProjection(n_components=48)
    images_reduced = rp.fit_transform(images)
    
    # Tensor 변환
    images_reduced = torch.tensor(images_reduced, dtype=torch.float32)
    labels = labels.clone().detach().long()
    
    # 새로운 DataLoader 생성
    dataset = TensorDataset(images_reduced, labels)
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    return data_loader

class MLP(nn.Module):
    def __init__(self, input_dim= 48, hidden_dim= 48, output_dim=10):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        with amp.autocast(device_type=str(device)):    
            x = self.fc1(x)
            x = self.relu(x)
            x = self.fc2(x)
        return x

testloader = get_dataloader(batch_size=128, train=False)

# 모델 평가 함수
def evaluate_model(model, testloader):
    model.eval()  # 평가 모드 활성화
    correct = 0
    total = 0

    with torch.no_grad():  # 그래디언트 계산 비활성화
        for images, labels in testloader:
            images, labels = images.view(images.size(0), -1).to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)  # 가장 높은 확률의 클래스 선택
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

In [70]:
class OptimizerEnv(gym.Env):
    def __init__(self, device):
        super(OptimizerEnv, self).__init__()
        self.device = device
        self.train_loader = get_dataloader(batch_size=128, train=True)
        
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(202,), dtype=np.float32)

        # 2층 MLP 모델
        self.model = MLP().to(self.device)

        self.criterion = nn.CrossEntropyLoss()
        self.episode_step = 0
        self.init_loss = None   
        self.optimizer = None  
        self.smooth_loss = None
        self.loss_history = []
        self.rp_w = GaussianRandomProjection(n_components=100)
        self.rp_g = GaussianRandomProjection(n_components=100)
        # 🔥 AMP GradScaler 추가
        self.scaler = amp.GradScaler()
        # 🔥 데이터 로더 이터레이터 설정 (배치 단위 처리를 위해)
        self.train_loader_iter = iter(self.train_loader)
    
    # def step(self, action):
        # self._set_optimizer(action)
        """🔥 배치 단위로 옵티마이저 선택"""
    def step(self):     
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        # self.optimizer = optim.RMSprop(self.model.parameters(), lr=0.001)
        # self.optimizer = optim.Adagrad(self.model.parameters(), lr=0.001)
        # self.optimizer = optim.SGD(self.model.parameters(), lr=0.001)
        # self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        try:
            images, labels = next(self.train_loader_iter)  # 🔥 다음 배치 가져오기
        except StopIteration:
            # 🔥 배치 다 돌면 새로운 epoch 시작
            self.train_loader_iter = iter(self.train_loader)
            images, labels = next(self.train_loader_iter)

        images, labels = images.view(images.size(0), -1).to(self.device), labels.to(self.device)

        # ✅ 자동 혼합 정밀도 적용
        with amp.autocast(device_type=str(self.device)):
            outputs = self.model(images)
            loss = self.criterion(outputs, labels)

        loss_value = loss.item()
        self.loss_history.append(loss_value)
        moving_avg_loss = np.mean(self.loss_history) if self.loss_history else 0

        self.optimizer.zero_grad()

        # ✅ Scaler를 사용하여 역전파
        self.scaler.scale(loss).backward()
        self.scaler.step(self.optimizer)
        self.scaler.update()
        
        # 🔥 MLP 모델의 가중치를 벡터로 변환
        model_params = torch.cat([p.flatten() for p in self.model.parameters()]).detach().cpu().numpy()

        # 🔥 MLP 모델의 그라디언트를 벡터로 변환 (없으면 0으로 채움)
        model_grads = []
        for p in self.model.parameters():
            if p.grad is not None:
                model_grads.append(p.grad.flatten())
            else:
                model_grads.append(torch.zeros_like(p.flatten()))  # 🔥 None이면 0으로 채움
        model_grads = torch.cat(model_grads).detach().cpu().numpy()
        model_grads = np.nan_to_num(model_grads, nan=0.0, posinf=0.0, neginf=0.0)

        # 🔥 첫 step에서 Random Projection 학습
        if self.episode_step == 0:
            self.rp_w.fit(model_params.reshape(1, -1))
            self.rp_g.fit(model_grads.reshape(1, -1))

        # 🔥 가중치 및 그라디언트를 Random Projection을 통해 100차원으로 축소
        reduced_params = self.rp_w.transform(model_params.reshape(1, -1)).flatten()
        reduced_grads = self.rp_g.transform(model_grads.reshape(1, -1)).flatten()

        

        # 🔥 state에 축소된 모델 가중치 및 그라디언트 추가
        state = np.concatenate(([self.episode_step, moving_avg_loss], reduced_params, reduced_grads)).astype(np.float32)
        self.episode_step += 1

        if self.init_loss is None:
            self.init_loss = loss_value # 🔥 초기 손실값 저장

        reward = -np.log(loss_value / self.init_loss) / self.episode_step - 1 if self.episode_step >= 2 else 0 # 🔥 보상 계산
  
        done = self.episode_step >= 10 * len(self.train_loader)  # 🔥 전체 배치 수 고려한 종료 조건
        
        return state, reward, done, {}

    def reset(self):
        """환경 초기화"""
        self.episode_step = 0
        self.model.apply(self._reset_weights)
        self.train_loader_iter = iter(self.train_loader)

        # 🔥 초기 가중치 및 그라디언트를 0으로 설정 (완전 초기화)
        reduced_params = np.zeros(100, dtype=np.float32)  # 가중치 초기값을 0으로 설정
        reduced_grads = np.zeros(100, dtype=np.float32)  # 초기에는 그라디언트 없음

        # 🔥 state에 축소된 가중치와 초기 그라디언트 포함
        state = np.concatenate(([self.episode_step, 0.0], reduced_params, reduced_grads)).astype(np.float32)
        return state


    def _set_optimizer(self, action):
        """매 배치마다 옵티마이저 변경"""
        optimizers = [
            optim.Adam(self.model.parameters(), lr=0.001),
            optim.RMSprop(self.model.parameters(), lr=0.001),
            optim.Adagrad(self.model.parameters(), lr=0.001),
            optim.SGD(self.model.parameters(), lr=0.001),
            optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        ]
        self.optimizer = optimizers[action]
    
    def _reset_weights(self, m):
        """모델 가중치 초기화"""
        if isinstance(m, nn.Linear):
            m.reset_parameters()

In [71]:
class DQNAgent:
    def __init__(self, state_size, action_size, device):
        self.state_size = state_size
        self.action_size = action_size
        self.device = device

        self.epsilon = 1.0  
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.99
        self.gamma = 0.9
        self.lr = 0.001
        
        self.model = nn.Sequential(
            nn.Linear(state_size, 24),
            nn.ReLU(),
            nn.Linear(24, 24),
            nn.ReLU(),
            nn.Linear(24, action_size)
        ).to(self.device)

        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.memory = []
        self.memory_size = 5000  # 🔥 경험 저장 크기 설정
        self.memory = deque(maxlen=self.memory_size)  # 🔥 가장 오래된 데이터를 자동 삭제

        self.scaler = amp.GradScaler()

    def act(self, state):
        """ε-greedy 정책으로 행동 선택"""
        if random.random() < self.epsilon:
            return random.randint(0, self.action_size - 1)
        else:
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            with torch.no_grad():
                return torch.argmax(self.model(state)).item()

    def remember(self, state, action, reward, next_state, done):
        """경험 저장"""
        self.memory.append((state, action, reward, next_state, done))

    def train(self, batch_size=128):
        """DQN 학습"""
        if len(self.memory) < batch_size:
            return
        
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states)).to(self.device)
        actions = torch.LongTensor(actions).unsqueeze(1).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)

        with amp.autocast(device_type=str(self.device)):
            q_values = self.model(states).gather(1, actions)
            next_q_values = self.model(next_states).max(1, keepdim=True)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

            loss = F.mse_loss(q_values, target_q_values.detach())

        # ✅ Scaler 적용
        self.scaler.scale(loss).backward()
        self.scaler.step(self.optimizer)
        self.scaler.update()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [72]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = OptimizerEnv(device)
agent = DQNAgent(state_size=202, action_size=5, device=device)

loss_history = []
num_episodes = 10

# 🔥 옵티마이저 선택 횟수 기록
optimizer_names = ["Adam", "RMSprop", "Adagrad", "SGD", "Momentum"]

reward_history = []  # 🔥 각 episode의 총 reward 저장 리스트

for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_losses = []
    episode_trajectory = []
    total_reward = 0  # 🔥 각 episode의 총 reward 초기화
    
    optimizer_count = {i: 0 for i in range(5)}  # 🔥 옵티마이저 선택 횟수 초기화

    while not done:
        action = agent.act(state)  # 🔥 매 batch마다 옵티마이저 선택
        optimizer_count[action] += 1  # 🔥 선택된 옵티마이저 카운트 증가
        # next_state, reward, done, _ = env.step(action)
        next_state, reward, done, _ = env.step()

        total_reward += reward  # 🔥 reward를 episode 단위로 누적
        episode_losses.append(state[1])  
        episode_trajectory.append((state, action, reward, next_state, done))

        state = next_state
    
    for transition in episode_trajectory:
        agent.remember(*transition)

    avg_loss = sum(episode_losses) / len(episode_losses)
    loss_history.append(avg_loss)
    reward_history.append(total_reward)  # 🔥 episode 종료 후 총 reward 저장

    print(f"Episode {episode+1}, Avg Loss: {avg_loss}, Total Reward: {total_reward}")
    print(f"Optimizer Counts: {', '.join([f'{optimizer_names[i]}: {optimizer_count[i]}' for i in range(5)])}")

    model = env.model
    # 모델 정확성 평가
    evaluate_model(model, testloader)

# plt.plot(loss_history, label="L2O Optimizer Loss")
# plt.xlabel("Episode")
# plt.ylabel("Loss")
# plt.title("Loss Reduction Over Training Episodes")
# plt.legend()
# plt.grid()
# plt.show()

Episode 1, Avg Loss: 0.5890555665182915, Total Reward: -4681.4114604612305
Optimizer Counts: Adam: 890, RMSprop: 994, Adagrad: 896, SGD: 962, Momentum: 948
Test Accuracy: 10.66%
Episode 2, Avg Loss: 0.4316786947344412, Total Reward: -4681.622147729182
Optimizer Counts: Adam: 969, RMSprop: 947, Adagrad: 926, SGD: 935, Momentum: 913
Test Accuracy: 12.14%
Episode 3, Avg Loss: 0.4194138993141748, Total Reward: -4681.3503441829835
Optimizer Counts: Adam: 921, RMSprop: 904, Adagrad: 965, SGD: 948, Momentum: 952
Test Accuracy: 10.40%
Episode 4, Avg Loss: 0.4157713455559094, Total Reward: -4681.542719725404
Optimizer Counts: Adam: 910, RMSprop: 947, Adagrad: 944, SGD: 949, Momentum: 940
Test Accuracy: 10.33%


KeyboardInterrupt: 

reward 수정
- moving average 방식
- 배치단위 리워드 -> 에폭 단위 리워드로 변경?
state 수정
- 파라미터 값 추가
loss 설정의 문제?

학습이 잘 되고있음을 확인할 수 있는 Loss, Reward 지표 수정

1개 선택 -- 각각 episode 1개 10번씩 정확도 확인
- adam,     94.66%, 95.39%, 96.23%, 95.77%, 96.36%, 95.60%, 96.50%, 95.00%, 96.09%, 95.57%  >>  95.717%
- RMSprop,  93.47%, 92.14%, 95.06%, 92.66%, 95.20%, 94.15%, 94.18%, 91.92%, 91.90%, 93.80%  >>  93.448%
- Adagrad,  95.51%, 96.14%, 95.67%, 95.64%, 95.97%, 96.06%, 96.28%, 96.00%, 95.54%, 95.04%  >>  95.785%
- SGD,      87.30%, 87.15%, 87.48%, 87.30%, 87.41%, 87.51%, 87.64%, 87.37%, 87.35%, 87.04%  >>  87.355%
- Momentum, 87.47%, 87.37%, 87.40%, 87.15%, 87.13%, 87.16%, 87.50%, 87.23%, 87.29%, 87.26%  >>  87.296%

5개 선택
- 1:40, 92.82%
- Episode 1, Avg Loss: 1.3594644563784921, Total Reward: -3.892620003062391
- Optimizer Counts: Adam: 928, RMSprop: 908, Adagrad: 929, SGD: 989, Momentum: 936

In [None]:
torch.save(model.state_dict(), "models/L20_Learned_MLP.pth")

RuntimeError: Parent directory models does not exist.

In [None]:
# 🔥 학습이 끝난 후 모델과 옵티마이저 저장
torch.save({
    'model_state_dict': agent.model.state_dict(),
    'optimizer_state_dict': agent.optimizer.state_dict(),
    'epsilon': agent.epsilon  # Epsilon 값도 저장 (탐색 비율 유지)
}, "dqn_agent.pth")

print("✅ 학습된 DQN 에이전트 저장 완료!")

def load_agent(agent, checkpoint_path="dqn_agent.pth"):
    checkpoint = torch.load(checkpoint_path, map_location=agent.device)
    agent.model.load_state_dict(checkpoint['model_state_dict'])
    agent.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    agent.epsilon = checkpoint['epsilon']  # 저장된 탐색 비율 로드
    print("✅ 저장된 DQN 에이전트 로드 완료!")

    return agent

✅ 학습된 DQN 에이전트 저장 완료!


In [None]:
# 🔥 새로운 환경 생성
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = OptimizerEnv(device)

# 🔥 새로운 DQNAgent 생성 (같은 구조로 초기화)
agent = DQNAgent(state_size=2, action_size=5, device=device)

# 🔥 저장된 모델 불러오기
agent = load_agent(agent)

# 🔥 모델 평가 (탐색 없이 greedy 행동)
state = env.reset()
done = False
total_reward = 0

while not done:
    action = torch.argmax(agent.model(torch.FloatTensor(state).unsqueeze(0).to(agent.device))).item()
    next_state, reward, done, _ = env.step(action)
    total_reward += reward
    state = next_state

print(f"🎯 평가 완료! 총 보상(Total Reward): {total_reward}")

✅ 저장된 DQN 에이전트 로드 완료!


  checkpoint = torch.load(checkpoint_path, map_location=agent.device)


TypeError: OptimizerEnv.step() takes 1 positional argument but 2 were given