In [None]:
"""
    정리
    1. agent를 통해서 policy(랜덤값이 앱실론보다 작으면 랜덤 인덱스와 랜덤 action을, 
                            그렇지 않으면 내부 신경망(LSTM)에 state값을 넣어 
                            가능한 모든 action에 대한 action_values(Q_values) 중 가장 큰 값에 해당하는 인덱스와 그 값을 반환한다.
                            이때 사용되는 내부 신경망은 main_network이다.)
        에 맞는 action_index와 action_value를 찾아낸다.
    2. 찾아낸 action_index와 action_value를 env에 넣어서 RF 모델을 돌린 뒤 next_state(다음 action 집합)와 reward(정확도)를 찾아낸다.
    3. memory(replay buffer)에 현재 state와 action_index, 그리고 위에서 도출된 next_state, reward를 넣는다.
    4. replay buffer에서 원하는 batch size만큼 sample(state, action_index, reward, next_state)을 뽑아낸다.
    5. sample의 state를 main_network에 넣어서 도출된 값들 중 action index에 맞는 값을 뽑고 이를 main Q 값으로 한다.
    6. sample의 next_state를 target_network에 넣어서 값을 도출한 뒤, 그 중 가장 큰 값을 찾아낸다.
    7. 위에서 찾아낸 값을 Q라고 한다면, reward + gamma(discount factor)*Q = target Q로 정한다.
    8. mainQ와 targetQ를 loss function에 넣어서 내부 신경망 파라미터를 역전파로 업데이트 한다. 이때 옵티마이저도 사용된다.
    9. 이 과정을 반복하여 main Q가 target Q에 가까워질 수 있게 한다.
    
"""

In [1]:
import gym

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

class h2y2_RF_Model():
    def __init__(self, action):
        self.parameter = action
        self.model = RandomForestClassifier(self.parameter)
        self.data = 
        self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(x_data, y_data, test_size=0.3, random_state=42)
        
    def evaluate(self):
        self.model.fit(self.train_x, self.train_y)
        predict = self.model.predict(self.test_x)
        return accuracy_score(self.test_y, predict)
    

In [None]:
class H2Y2_env():
    def __init__():
        "initial"
        
    def reset():
        "env의 초기 state 설정"
        
    def step(self, action_index, action_value):
        "action을 넣어서 next_state와 reward를 반환하는 함수"
        done = 0
        rf_model = h2y2_RF_Model(action_value)
        reward = rf_model.evaluate()
        if reward == 1:
            done = 1
        return next_state, reward, done
        

In [None]:
class H2Y2_Agent():
    def __init__():
        self.main_network = Network(state_size,action_size).float().to(device)
        self.target_network = Network(state_size,action_size).float().to(device)
        self.memory = ReplayBuffer()
    
    def select_action(self, state, eps=0.):
        "내부의 신경망에 state를 넣어 모든 action_value에 대한 값을 뽑고, argmax로 선택"

        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.main_network.eval()
        with torch.no_grad(): # 연산속도 증가
            action_values = self.main_network(state)
        self.main_network.train()
    
        # action selection
        if random.random() > eps:
            max_index = torch.argmax(action_values.cpu().data.numpy())
            return max_index, action_values.gather(1, max_index)
        else:
            random_index = random.choice(np.arange(self.action_size))
            return random_index, action_values.gather(1, random_index)
            
    def step():
        self.memory.add(state, action, reward, next_state, done)
        
        # Learn every UPDATE_EVERY time steps.
        self.t_step = (self.t_step + 1) % self.hyperparameter['target_update']
        if self.t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > self.hyperparameter['batch_size']:
                experiences = self.memory.sample()
                self.learn(experiences, GAMMA)
                
    def learn(self, experiences, gamma):
        """Update value parameters using given batch of experience tuples.

        Params
        ======
            experiences (Tuple[torch.Tensor]): tuple of (s, a, r, s', done) tuples 
            gamma (float): discount factor
        """
        states, actions, rewards, next_states, dones = experiences

        # Get max predicted Q values (for next states) from target model
        Q_targets_next = self.target_network(next_states).detach().max(1)[0].unsqueeze(1)
        # Compute Q targets for current states 
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))

        # Get expected Q values from local model
        Q_expected = self.main_network(states).gather(1, actions)

        # Compute loss
        loss = F.mse_loss(Q_expected, Q_targets)
        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ------------------- update target network ------------------- #
        self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)  

In [None]:
class ReplayBuffer:

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Initialize a ReplayBuffer object.

        Params
        ======
            action_size (int): dimension of each action
            buffer_size (int): maximum size of buffer
            batch_size (int): size of each training batch
            seed (int): random seed
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)  
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        self.seed = random.seed(seed)
    
    def add(self, state, action, reward, next_state, done):
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)
    
    def sample(self):
        experiences = random.sample(self.memory, k=self.batch_size)

        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
  
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        return len(self.memory)

In [None]:
class Network(nn.Module):
    
    def __init__(self,input_size,out_size):
        super(Network,self).__init__()
        self.input_size = input_size
        self.out_size = out_size
        
        self.conv_layer1 = nn.Conv2d(in_channels=1,out_channels=32,kernel_size=8,stride=4) # potential check - in_channels
        self.conv_layer2 = nn.Conv2d(in_channels=32,out_channels=64,kernel_size=4,stride=2)
        self.conv_layer3 = nn.Conv2d(in_channels=64,out_channels=64,kernel_size=3,stride=1)
        self.conv_layer4 = nn.Conv2d(in_channels=64,out_channels=512,kernel_size=7,stride=1)
        self.lstm_layer = nn.LSTM(input_size=512,hidden_size=512,num_layers=1,batch_first=True)
        self.adv = nn.Linear(in_features=512,out_features=self.out_size)
        self.val = nn.Linear(in_features=512,out_features=1)
        self.relu = nn.ReLU()
        
    def forward(self,x,bsize,time_step,hidden_state,cell_state):
        x = x.view(bsize*time_step,1,self.input_size,self.input_size)
        
        conv_out = self.conv_layer1(x)
        conv_out = self.relu(conv_out)
        conv_out = self.conv_layer2(conv_out)
        conv_out = self.relu(conv_out)
        conv_out = self.conv_layer3(conv_out)
        conv_out = self.relu(conv_out)
        conv_out = self.conv_layer4(conv_out)
        conv_out = self.relu(conv_out)
        
        conv_out = conv_out.view(bsize,time_step,512)
        
        lstm_out = self.lstm_layer(conv_out,(hidden_state,cell_state))
        out = lstm_out[0][:,time_step-1,:]
        h_n = lstm_out[1][0]
        c_n = lstm_out[1][1]
        
        adv_out = self.adv(out)
        val_out = self.val(out)
        
        qout = val_out.expand(bsize,self.out_size) + (adv_out - adv_out.mean(dim=1).unsqueeze(dim=1).expand(bsize,self.out_size))
        
        return qout, (h_n,c_n)
    
    def init_hidden_states(self,bsize):
        h = torch.zeros(1,bsize,512).float().to(device)
        c = torch.zeros(1,bsize,512).float().to(device)
        
        return h,c

In [1]:
class H2Y2():
    def __init__():
        self.agent = H2Y2_Agent()
        self.env = H2Y2_env()
        
    def dqn(self, n_episodes=2000, max_t=1000, eps_end=0.01, eps_decay=0.995):
            
        for i_episode in range(1, n_episodes+1):
            state = self.env.reset()
            score = 0
            for t in range(max_t):
                action_index, action_value = self.agent.select_action(state, self.eps)
                next_state, reward, done = self.env.step(action_index, action_value)
                self.agent.step(state, action_index, reward, next_state, done)
                state = next_state
                score += reward
                if done:
                    break 
            self.scores_window.append(score)       # save most recent score
            self.scores.append(score)              # save most recent score
            self.eps = max(eps_end, eps_decay*self.eps) # decrease epsilon
            if np.mean(self.scores_window)>=200.0:
                break
        return np.mean(self.scores_window)

SyntaxError: unexpected EOF while parsing (4141694303.py, line 5)