간단한 상황
- Weapon : 50
- Target : 100 (weight : 1~100)
- 적재량 : 3

- 상황 구현
- 에이전트 구현

kernel은 base

목적함수 대신, 새로운 결과치를 내야 한다.
1개 요격 시 보상인데, 오래 걸렸을 수록 감쇠된 보상을 얻어야 한다.
그런데 위험도에 따라 보상이 달라져야겠지.
그럼 weight 만큼의 보상을 주면 되지 않을까

np.random.seed(42)는 요격확률과 초기 위험도를 설정하게 해 줄것이다.


random.seed(42)는 요격시 실제로 요격되는지에 설정할 것이다.

In [1]:
# 필요 라이브러리 import
import collections
import copy
import random

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
batch_size = 8
buffer_limit = 160
gamma = 0.5
learning_rate = 0.0005


In [4]:
# Check if CUDA is available
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

class WTAWorld():
    def __init__(self, W, T, M):
        self.W = W
        self.T = T
        self.M = M

        # Target t에 weapon w를 할당할 때의 요격확률 생성
        self.PK_wt = torch.rand(W, T, device=device)
        
        # Target index 할당
        self.targets = torch.arange(T, device=device)
        
        # Target 별 초기 위험도 할당
        self.init_weights = torch.randint(1, 100, (T,), device=device)
        
        # Target의 현재 위험도 설정 (0 = 파괴)
        self.weights = self.init_weights.clone()
        self.M_W = [self.M] * self.W
        self.left_M_W = self.M_W

    def step(self, theta_wt):
        indices = torch.nonzero(theta_wt == 1).tolist()

        for coord in indices:
            w, t = coord
            if self.PK_wt[w][t] > 0 and self.left_M_W[w] > 0:
                pk_probability = self.PK_wt[w][t].item()
                random_value = random.random()
                if random_value < pk_probability:
                    self.weights[t] = 0
                self.left_M_W[w] -= 1

        done = self.is_done()
        r = torch.sum(self.init_weights - self.weights).item()
        self.init_weights = self.weights.clone()
        
        return self.PK_wt * self.weights, r, done

    def is_done(self):
        return torch.sum(torch.tensor(self.left_M_W)) == 0 or torch.sum(self.weights) == 0

    def get_state(self):
        return (self.PK_wt, self.weights, self.left_M_W)

    def reset(self):
        self.PK_wt = torch.rand(self.W, self.T, device=device)
        self.targets = torch.arange(self.T, device=device)
        self.init_weights = torch.randint(1, 100, (self.T,), device=device)
        self.weights = self.init_weights.clone()
        self.M_W = [self.M] * self.W
        self.left_M_W = self.M_W
        
        return self.PK_wt * self.weights


In [5]:
# Greedy Algorithm

def greedy(state):
    PK_wt = state[0]
    targets_weights = state[1]
    left_M_W = state[2]
    
    w = PK_wt.shape[0]
    t = PK_wt.shape[1]
    
    # w의 발사 여부
    m_w = [1 if m > 0 else m for m in left_M_W]
    # 할당 여부를 나타내는 theta_wt
    theta_wt = np.zeros((w, t), dtype=int)

    flattened = (PK_wt*targets_weights).flatten()
    sorted_indices = np.argsort(flattened)[::-1]
    
    i = 0
    indices_list = []
    while True :
        if i == w * t :
            break
        v = sorted_indices[i]
        row = v // t # Weapon
        col = v % t # Target

        if left_M_W[row] > 0 and m_w[row] == 1:
            
            m_w[row] -= 1
            theta_wt[row][col] = 1
            indices_list.append(v)
            print(f"weapon {row}를 target {col}에 할당, 요격확률*가중치:{flattened[v]:.2f} ")

        # 할당할 수 있는 유도탄의 개수를 모두 소모했다면 종료.
        if sum(left_M_W) == 0:
            break

        i += 1


    return theta_wt

In [7]:
# GPU라 안 됨
# env=WTAWorld(50,100,2)
# step_i = 1
# while True : 
#     print(f"{step_i}번째 스텝")
#     theta_wt =greedy(env.get_state())
#     print("------발사------")
#     s_prime, r, done = env.step(theta_wt)
#     print("------요격결과------")
#     print(env.get_state()[1])
#     print(f"보상: {r}")
#     if env.is_done():
#         break
#     step_i+=1

In [8]:
def main():
    env = WTAWorld(50,100,2)
    step_i = 1
    while True : 
        print(f"{step_i}번째 스텝")
        theta_wt = greedy(env.get_state())
        print("------발사------")
        s_prime, r, done = env.step(theta_wt)
        print("------요격결과------")
        print(wta.get_state()[0][1])
        if env.is_done():
            break
        step_i+=1
        print(f"보상: {r}")

In [9]:
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)
    
    def put(self, transition):
        self.buffer.append(transition)
        
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        
        # print(torch.tensor(s_lst, dtype=torch.float))
        # print(torch.tensor(a_lst))
        # print(torch.tensor(r_lst))
        # print(torch.tensor(s_prime_lst, dtype=torch.float))
        # print(torch.tensor(done_mask_lst))
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), torch.tensor(done_mask_lst)
    
    def size(self):
        return len(self.buffer)

In [10]:
class Qnet(nn.Module):
    def __init__(self, W, T):
        super(Qnet, self).__init__()
        self.W = W
        self.T = T
        self.fc1 = nn.Linear(self.W*self.T, self.W*self.T*2)
        self.fc2 = nn.Linear(self.W*self.T*2, self.W*self.T*2)
        self.fc3 = nn.Linear(self.W*self.T*2, self.W*self.T)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.sigmoid(self.fc3(x))
        return x
    
    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        # print(out)
        # print(out.shape)
        reshaped_out = out.reshape(self.W,self.T)
        
        one_hot = torch.zeros_like(reshaped_out)
        
        coin = random.random()
        if coin < epsilon:
            random_indices = torch.randint(0, reshaped_out.shape[1], (reshaped_out.shape[0],))
            one_hot[torch.arange(reshaped_out.shape[0]), random_indices] = 1
            return one_hot
        else :
            max_indices = torch.argmax(reshaped_out, dim=1)
            one_hot[torch.arange(reshaped_out.shape[0]), max_indices] = 1
            return one_hot

In [11]:
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train(q, q_target, memory, optimizer):
    # Transfer models to GPU
    q = q.to(device)
    q_target = q_target.to(device)
    
    for i in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        
        # Transfer data to GPU
        s = s.to(device)
        a = a.to(device)
        r = r.to(device)
        s_prime = s_prime.to(device)
        done_mask = done_mask.to(device)

        q_out = q(s.flatten(start_dim=1))
        q_a = a.flatten(start_dim=1) * q_out
        max_q_prime = q_target(s_prime.flatten(start_dim=1)).max(1)[0].unsqueeze(1)
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


In [14]:
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def main():
    env = WTAWorld(10,10,3)

    # Transfer the Q networks to GPU
    q = Qnet(env.W, env.T).to(device)
    q_target = Qnet(env.W, env.T).to(device)
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer()
    
    print_interval = 20
    score = 0.0
    optimizer = optim.Adam(q.parameters(), lr = learning_rate)
    
    for n_epi in range(1000):
        epsilon = max(0.01, 0.08, - 0.01*(n_epi/200))
        s = env.reset()

        # Convert s to a tensor and transfer to GPU
        s = s.to(device)
        done = False
        
        while not done:
            a = q.sample_action(s.flatten(), epsilon)
            s_prime, r, done = env.step(a)  # Assuming a is a tensor with one item

            # Convert s_prime to a tensor and transfer to GPU
            s_prime = s_prime.to(device)
            done_mask = 0.0 if done else 1.0
            memory.put((s.cpu().numpy(), a.cpu().numpy(), r/100.0, s_prime.cpu().numpy(), done_mask))  # Assuming you want to store numpy arrays in memory
            s = s_prime
            score += r
            if done:
                break
        
        if memory.size()>2000:
            train(q, q_target, memory, optimizer)
        
        if n_epi%print_interval == 0 and n_epi !=0:
            q_target.load_state_dict(q.state_dict())
            print(f"n_episode: {n_epi}, score: {(score/print_interval):.1f}, n_buffer: {memory.size()}, eps: {(epsilon*100):.1f}%")
            score = 0.0


In [15]:
main()

n_episode: 20, score: 341.6, n_buffer: 63, eps: 8.0%
n_episode: 40, score: 324.2, n_buffer: 123, eps: 8.0%
n_episode: 60, score: 272.1, n_buffer: 160, eps: 8.0%
n_episode: 80, score: 321.4, n_buffer: 160, eps: 8.0%
n_episode: 100, score: 313.4, n_buffer: 160, eps: 8.0%
n_episode: 120, score: 338.2, n_buffer: 160, eps: 8.0%
n_episode: 140, score: 323.9, n_buffer: 160, eps: 8.0%
n_episode: 160, score: 335.1, n_buffer: 160, eps: 8.0%
n_episode: 180, score: 342.3, n_buffer: 160, eps: 8.0%
n_episode: 200, score: 342.6, n_buffer: 160, eps: 8.0%
n_episode: 220, score: 315.2, n_buffer: 160, eps: 8.0%
n_episode: 240, score: 288.9, n_buffer: 160, eps: 8.0%
n_episode: 260, score: 355.1, n_buffer: 160, eps: 8.0%
n_episode: 280, score: 367.4, n_buffer: 160, eps: 8.0%
n_episode: 300, score: 328.4, n_buffer: 160, eps: 8.0%
n_episode: 320, score: 347.0, n_buffer: 160, eps: 8.0%
n_episode: 340, score: 346.2, n_buffer: 160, eps: 8.0%
n_episode: 360, score: 367.6, n_buffer: 160, eps: 8.0%
n_episode: 380,

In [None]:
env.get_state()

(tensor([[0.4584, 0.0016, 0.5562,  ..., 0.3055, 0.6100, 0.7293],
         [0.5924, 0.3393, 0.3580,  ..., 0.5522, 0.4694, 0.5062],
         [0.2949, 0.4367, 0.6616,  ..., 0.5104, 0.2648, 0.3293],
         ...,
         [0.9325, 0.2087, 0.9674,  ..., 0.2131, 0.5445, 0.2883],
         [0.3481, 0.7200, 0.4424,  ..., 0.9947, 0.3472, 0.7071],
         [0.6363, 0.9461, 0.4184,  ..., 0.4096, 0.7083, 0.3928]],
        device='cuda:0'),
 tensor([ 5, 83, 71, 33, 56, 32, 98, 95,  3, 60, 13, 69, 87,  9, 33, 27, 60, 42,
         60, 74, 46, 84,  8, 20, 48, 63, 83, 13, 16, 74, 69,  5, 15, 47, 70, 39,
         15,  3, 67,  4, 76, 57, 57, 64, 54,  6, 75, 17, 10, 17, 55, 27, 18, 67,
         23, 24,  2, 92, 96, 37, 39, 42, 23, 20, 67, 34, 88, 43, 33, 62,  5, 11,
         66, 52, 73, 61, 10, 53, 31, 51, 76, 55, 48, 45,  2,  4, 94, 15, 25, 53,
         83, 38, 51, 62, 40, 22, 36, 19, 52, 23], device='cuda:0'),
 [2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
  2,
 

In [None]:
q