<최초 MC 예측 수도코드 - 출처 : 단단한 강화학습 챕터 5> 

입력 : 평가 대상인 정책 $\pi$ 

초기화 
- 모든 $s \in S$ 에 대해 임의의 값으로 $V(s) \in R$를 초기화 
- 모든 $s \in S$ 에 대해 값이 채워지지 않는 리스트를 Returns(s) 변수에 대입

(각 에피소드에 대해) 무한 루프 :
- 정책 $\pi$를 따르는 하나의 에피소드를 생성 : $S_0, A_0, R_1, S_1, A_1, R_2, ... , S_{T-1}, A_{T-1}, R_T$ 
- 변수 G에 0을 대입 
- 에피소드의 각 단계에 대해 반복 수행, t= T-1, T-2, ..., 0 : 

 - 변수 G에 $\gamma G + R_{t+1}$을 대입 
 
 - $S_t$가 $S_o, S_1, ... S_{t-1}$ 안에 나타나지 않는다면 : 
 
  > 리스트 Returns($S_T$)에 변수 G를 새 항목으로 추가 
  
  > 리스트 Returns($S_T$)에 대한 평균을 $V(S_t)$에 대입 

**구현해야 하는 것** 
- 에피소드 생성 
- Returns(s) : list 
- V(s) : 각 s를 넣었을 때 가치 값이 나오도록 구현(dic) 
- R_T : 가치 함수 
- G : (갱신됨) $\gamma G + R_{t+1}$ 

- 확률값에 따라 random하게 샘플을 뽑아내는 함수(정책 $\pi$에 따라 s' 선택)  

- 정책 $\pi$에 따르는 에피소드 생성 함수

- 각 에피소드별 루프를 돌며 Returns(S_t)를 갱신할 함수 

**필요한 것** 
- $\gamma$ : 입력 값으로 받기 
- S : 모든 상태값의 집합 
- A : 모든 행동값의 집합 


**함수/데이터의 형태** 
- class First_Visit_MC : 

> def __init__(self, S, A, reward_func, pi, gamma = 0.9, num_episode = 10, len_episode = 20) 

> def choice_sample(self, list, prob) : => 단일 s' 값 반환 

> def make_episode(self, start_s, T, pi) : 
=> 최초 s값과 에피소드 길이 T, 정책 $\pi$를 넣으면 각 시간 단계에 따른 S,A,R을 계산하여 dic 형태로 반환. 

> def update_returns(self) : 

**전제**
- 상태 s에서 행동 a를 했을 때 결정론적으로 s' 상태로 변화한다. 

<외부 함수> 
- def reward_func(s',a,s) : 

- def pi(a,s) :


<고민점>
- 정책 $\pi$의 값이 확률론적으로 나왔을 때 A_t를 어떻게 정의할 것인가? 확률에 따른 random한 값으로? 이게 맞는 것 같은데. 

> 확률값에 따라 랜덤하게 s'를 생성한 다음, return_func(s',a,s)에 대입하면 수치상으로 값을 얻을 수 있겠다. 

- 에피소드 생성 간 데이터 형식은 어떻게? S,A,R을 한 번에 넣을 것인가, 아니면 따로 따로 구현할 것인가? 

> dic{"S": []. "A" : [], "R" : []} 형식이면 보기도 깔끔할 것 같다. 

- 추후에 정책 $\pi$를 변경하기 위해선, $\pi$가 class 내부 함수여야 할 것 같다. 

> 아! 아예 처음부터 pi를 외부에서 받으라고 문제에서 정의했기 때문에 고민할 필요 x

- '(각 에피소드에 대해) 무한 루프' 라면, 진짜 무한히 돌리라는 건가? 에피소드의 개수는 어떻게 하고? 

> 일단 임의로 총 에피소드의 개수는 정해야 겠다. 
> 하지만 루프를 멈출 조건이 필요하겠는걸. 모든 에피소드에 대한 갱신이 끝난 다음 마무리 하는 것으로? 

- Returns(S_t) 의 데이터 형식은 어떻게 해야하나? 

> s를 인덱스로 받아, 변수 G를 반환할 수 있는 list여야 한다.

In [15]:
from collections import defaultdict
import random

In [57]:
# 테스트 용 임시 데이터 
S = list(range(100)) 
A = list(range(-5,5))

In [6]:
# 외부용 함수 reward_func, pi 간략 구현 
def reward_func(next_s, a, s) : 
    # next_s 와 s의 차이가 짝수이면 +1, 홀수면 -1 
    # 단, a의 크기에 반비례함. 
    if abs(next_s - s) %2 == 0 : reward = 1 
    else : reward = -1
    
    if a == 0 : 
        return 0 
    else : 
        return reward / a # 즉, a가 양수이며 짝수이며, 가능한 작을 때 (=2) 일 때 최대의 보상이 주어지도록 설정 
    
def pi(a,s) : #s 상황에서 a를 선택할 확률. 전체 합은 1이여야 한다. 
    # 확률은 모두 동일하게 설정 
    
    return 1/10

In [2]:
# 함수 설정 및 초기화 테스트 

class First_Visit_MC : 
    def __init__(self, S, A, reward_func, pi, gamma = 0.9, num_episode = 10, len_episode = 20) : 
        self.S = S 
        self.A = A 
        self.reward_func = reward_func
        self.pi = pi 
        self.gamma = 0.9 
        self.num_episode = num_episode
        self.T = len_episode
        
        self.set_episode = [] # 추후에 어떤 episode들이 있었나 확인용 
        
        self.V, self.return_lst = self.initiate() 
    
    def initiate(self) : # V(s)와 returns(s) 초기화 함수 
        V = defaultdict(float)
        for s in self.S : 
            V[s] = 0 
        return_lst = [0]*len(self.S)
        return V, return_lst
    

In [7]:
test = First_Visit_MC(S,A,reward_func, pi) 
print(test.V, test.return_lst)

defaultdict(<class 'float'>, {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 0, 19: 0, 20: 0, 21: 0, 22: 0, 23: 0, 24: 0, 25: 0, 26: 0, 27: 0, 28: 0, 29: 0, 30: 0, 31: 0, 32: 0, 33: 0, 34: 0, 35: 0, 36: 0, 37: 0, 38: 0, 39: 0, 40: 0, 41: 0, 42: 0, 43: 0, 44: 0, 45: 0, 46: 0, 47: 0, 48: 0, 49: 0, 50: 0, 51: 0, 52: 0, 53: 0, 54: 0, 55: 0, 56: 0, 57: 0, 58: 0, 59: 0, 60: 0, 61: 0, 62: 0, 63: 0, 64: 0, 65: 0, 66: 0, 67: 0, 68: 0, 69: 0, 70: 0, 71: 0, 72: 0, 73: 0, 74: 0, 75: 0, 76: 0, 77: 0, 78: 0, 79: 0, 80: 0, 81: 0, 82: 0, 83: 0, 84: 0, 85: 0, 86: 0, 87: 0, 88: 0, 89: 0, 90: 0, 91: 0, 92: 0, 93: 0, 94: 0, 95: 0, 96: 0, 97: 0, 98: 0, 99: 0}) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [86]:
# choice_sample, make_episode 함수 구현 

class First_Visit_MC : 
    def __init__(self, S, A, reward_func, pi, gamma = 0.9, num_episode = 10, len_episode = 20) : 
        self.S = S 
        self.A = A 
        self.reward_func = reward_func
        self.pi = pi 
        self.gamma = 0.9 
        self.num_episode = num_episode
        self.T = len_episode
        
        self.set_episode = [] # 추후에 어떤 episode들이 있었나 확인용 
        
        self.V, self.return_lst, self.s_prob = self.initiate() 
    
    def initiate(self) : # 수정. 각 상태 s별 pi(a,s)의 값을 가지고 있는 dic 추가 생성 
        V = defaultdict(float)
        s_prob = defaultdict(list)
        for s in self.S : 
            V[s] = 0
            s_prob[s] = [0]*len(self.A)
            for index, a in enumerate(self.A) : 
                s_prob[s][index] = self.pi(a,s)

        return_lst = [0]*len(self.S)
        return V, return_lst, s_prob
    
    def choice_sample(self, s) : #수정. s_prob 에 대해 정의한 후 random.choice 함수 사용
        a= random.choices(self.A, weights = self.s_prob[s])
        return a[0], max(s+a[0],0) # a[0] +s 가 음수인 경우는 0으로 조정 
    
    def make_episode(self, start_s, T) :
        s = start_s
        episode = {"S" : [], "A" : [], "R" : []}
        episode["R"].append(0) # R_0 값 부여 
        for _ in range(T) : 
            episode["S"].append(s)
            a, next_s = self.choice_sample(s)
            r = self.reward_func(next_s, a, s)
            episode["A"].append(a)
            episode["R"].append(r)
            s = next_s
        return episode
            
        

In [87]:
# choice_sample, make_episode 함수 테스트
test = First_Visit_MC(S,A,reward_func, pi) 
print(test.make_episode(10,20))

{'S': [10, 10, 11, 8, 6, 8, 6, 2, 0, 0, 1, 0, 0, 0, 2, 2, 3, 0, 0, 4], 'A': [0, 1, -3, -2, 2, -2, -4, -5, -3, 1, -4, -3, -4, 2, 0, 1, -4, 0, 4, -4], 'R': [0, 0, -1.0, 0.3333333333333333, -0.5, 0.5, -0.5, -0.25, -0.2, -0.3333333333333333, -1.0, 0.25, -0.3333333333333333, -0.25, 0.5, 0, -1.0, 0.25, 0, 0.25, -0.25]}


In [120]:
# update_returns 함수 구현. 

class First_Visit_MC : 
    def __init__(self, S, A, reward_func, pi, gamma = 0.9, num_episode = 10, len_episode = 20) : 
        self.S = S 
        self.A = A 
        self.reward_func = reward_func
        self.pi = pi 
        self.gamma = 0.9 
        self.num_episode = num_episode
        self.T = len_episode
        
        self.set_episode = [] # 추후에 어떤 episode들이 있었나 확인용 
        
        self.V, self.return_lst, self.s_prob = self.initiate() 
    
    def initiate(self) : # 수정. 각 상태 s별 pi(a,s)의 값을 가지고 있는 dic 추가 생성 
        V = defaultdict(float)
        s_prob = defaultdict(list)
        for s in self.S : 
            V[s] = 0
            s_prob[s] = [0]*len(self.A)
            for index, a in enumerate(self.A) : 
                s_prob[s][index] = self.pi(a,s)

        return_lst = [0]*len(self.S)
        return V, return_lst, s_prob
    
    def choice_sample(self, s) : 
        a= random.choices(self.A, weights = self.s_prob[s])
        
        # 수정. a[0] +s 가 s의 범위 안에 들어오도록 수정 
        return a[0], min(max(s+a[0],0), 99)  
    
    def make_episode(self, start_s, T) :
        s = start_s
        episode = {"S" : [], "A" : [], "R" : []}
        episode["R"].append(0) # R_0 값 부여 
        for _ in range(T) : 
            episode["S"].append(s)
            a, next_s = self.choice_sample(s)
            r = self.reward_func(next_s, a, s)
            episode["A"].append(a)
            episode["R"].append(r)
            s = next_s
        return episode
            
    def update_returns(self) :
        for _ in range(self.num_episode) :
            # 시작 s는 시작 가정에 따라 랜덤하게 산출하겠음. 
            start_s = random.choice(self.S)
            set_s = set() # t 단계까지 나왔던 s 확인 
            set_s.add(start_s)
            episode = self.make_episode(start_s, self.T)
            G = 0 
            G_lst = [0]*self.T
            
            # G와 returns(S_t)를 업데이트 하는 순서가 각각 역순이라 따로 구현
            for t in reversed(range(self.T)) : 
                G = self.gamma *G + episode["R"][t]
                G_lst[t] = G
                
            for t in range(self.T): 
                s = episode["S"][t]
                if s not in set_s : 
                    self.return_lst[s] = G_lst[self.T-t-1]
                    self.V[s] = sum(self.return_lst) / len([i for i in self.return_lst if i != 0])
                set_s.add(s) 
        
        
        
                
        

In [121]:
# update_returns 구현 테스트 
test = First_Visit_MC(S,A,reward_func, pi) 
test.update_returns()
value = list(test.V.values())
print(value.index(max(value)))


14


위의 상태 가치로만은 계속 값이 변하는 모습을 보인다. 
행동 가치 함수에 대해서 알 때 정확한 정책을 산출할 수 있을 것으로 보인다.