# TD,SARSA,Q-learning

In [1]:
import numpy as np
import time

In [2]:
class Environment():
    def __init__(self,grid_size):
        self.grid_size = np.array(grid_size)
        self.S = [np.array([i,j]) for i in range(grid_size[0]) for j in range(grid_size[1])]
        self.Terminal_states = [(0,0),(self.grid_size[0]-1,self.grid_size[1]-1)]
        self.A = ["W","E","N","S"]
        self.A_to_coord = {"W" : [0,-1],"E" : [0,1], "N" : [-1,0], "S" : [1,0]}
        for k,v in self.A_to_coord.items():
            self.A_to_coord[k] = np.array(v)
        self.gamma = 1
        self.R = -1
    
    def move(self,s,a):
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        output
        s_next : state after one_step transition
        ex)
        Input : s = [0,1],a = "W"
        Output : s_next = [0,0]
        """
        s_next = s + self.A_to_coord[a]
        if s_next[0] < 0 or s_next[1] <0 or s_next[0] >= self.grid_size[0] or s_next[1] >= self.grid_size[1]:
            return s # |S|를 넘어갈 경우, 원래 상태를 반환
        else:
            return s_next
        
    def move_test(self):
        S = [np.array([i,j]) for i in [0,1,2,3] for j in [0,1,2,3]]
        for s in S:
            for a in self.A:
                print(s,a,self.move(s,a))
    def dynamics(self,s_prime,r,s,a):
        r=-1
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        s_prime : all of the possible states after one-step transition
        r : immediate reward(-1)
        output
        0 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 다를때
        1 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 같을때
        즉, a방향으로 움직이면 반드시 a방향으로 감(deterministic)
        """
        s_next = self.move(s,a)
        if np.sum(s_next != s_prime)>=1:
            return 0
        else:
            return 1
    def dynamics_test(self,check_p = 1):
        r=-1
        for s in self.S:
            for a in self.A:
                for s_prime in self.S: #가능한 모든 next_state
                    if self.dynamics(s_prime,r,s,a) == check_p:
                        print(f"state : {s} action : {a} s_prime : {s_prime} dynamics : {self.dynamics(s_prime,r,s,a)}")
    def sampling_action(self,s_t,π):
        """
        Input current state s_t : tuple
        ouput action : str (a ~ π(a|s))
        """
        actions = []
        prob = []
        for a,p in π[s_t].items():
            actions.append(a)
            prob.append(p)
        #print(prob)
        return np.random.choice(a=actions,p=prob)
    def generate_π(self,uniform=False):
        """
        Input : NA
        Output : {(x,y) : {"W" : pr1,"E" : pr2 ,...}}
        ex)
        {(0,0):{"W" : 0.1, "E" : 0.2, ...},(0,1):{"W":0.4,"E":0.5...}}
        """
        π = {(i,j): {} for i in range(self.grid_size[0]) for j in range(self.grid_size[1])}
        for t in π.values():
            unnormalized_prob = np.random.rand(4)
            if uniform == False:
                prob = unnormalized_prob/np.sum(unnormalized_prob)
            else:
                prob = [0.25] * 4
            for i in range(len(self.A)):
                t[self.A[i]] = prob[i]
        return π
    def argmax_a_Q(self,Q,s):
        max_action = "W"
        max_value = -5000
        for visit,Q_val in Q.items():
            if visit[0] == s:
                if Q_val > max_value:
                    max_action = visit[1]
                    max_value = Q_val                    
        return max_action

# TD

## strategy1

\begin{aligned}
\alpha(s) = \frac{1}{N(s)}
\end{aligned}

In [134]:
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    def prediction(self,s_0,π,iter_nums = 50):
        t = time.time()
        V = {tuple(s) : 0 for s in self.S}
        N = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            N[s_0] += 1
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                N[s_prime] += 1
                V[s_t] = V[s_t] + (1/N[s_t]) * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        print(f"lead time : {time.time()-t}")
        return N,V

td = TD()
π = td.generate_π(True)
N,V = td.prediction(s_0 = (2,1),π = π,iter_nums = 10000)
N,V

lead time : 3.7524216175079346


({(0, 0): 5023,
  (0, 1): 8538,
  (0, 2): 10765,
  (0, 3): 10788,
  (1, 0): 11414,
  (1, 1): 14717,
  (1, 2): 12761,
  (1, 3): 10782,
  (2, 0): 19311,
  (2, 1): 26842,
  (2, 2): 14842,
  (2, 3): 8566,
  (3, 0): 19240,
  (3, 1): 19401,
  (3, 2): 11260,
  (3, 3): 4977},
 {(0, 0): 0,
  (0, 1): -6.057028059816423,
  (0, 2): -8.23025593693218,
  (0, 3): -8.853066340068406,
  (1, 0): -6.0761203632257015,
  (1, 1): -7.667381252080251,
  (1, 2): -8.359148841208096,
  (1, 3): -8.301703652750412,
  (2, 0): -8.256318312924831,
  (2, 1): -8.343092685408902,
  (2, 2): -7.68047765577927,
  (2, 3): -6.103321633914351,
  (3, 0): -8.838984661536804,
  (3, 1): -8.232128867802299,
  (3, 2): -6.042348547612324,
  (3, 3): 0})

- True value에 수렴하지 않음
- 혹시 iteration을 덜해서 수렴하지 않은걸까?
    - 훨씬 많은 숫자를 부여함.

In [135]:
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    def prediction(self,s_0,π,iter_nums = 50):
        t = time.time()
        V = {tuple(s) : 0 for s in self.S}
        N = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            N[s_0] += 1
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                N[s_prime] += 1
                V[s_t] = V[s_t] + (1/N[s_t]) * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        print(f"lead time : {time.time()-t}")
        return N,V

td = TD()
π = td.generate_π(True)
N,V = td.prediction(s_0 = (2,1),π = π,iter_nums = 1000000)
N,V

lead time : 376.06808495521545


({(0, 0): 500675,
  (0, 1): 857102,
  (0, 2): 1071635,
  (0, 3): 1068941,
  (1, 0): 1141288,
  (1, 1): 1497208,
  (1, 2): 1284379,
  (1, 3): 1069050,
  (2, 0): 1930091,
  (2, 1): 2714779,
  (2, 2): 1498691,
  (2, 3): 854888,
  (3, 0): 1928620,
  (3, 1): 1929364,
  (3, 2): 1141312,
  (3, 3): 499325},
 {(0, 0): 0,
  (0, 1): -7.710187158147642,
  (0, 2): -10.649747597920722,
  (0, 3): -11.512033496895874,
  (1, 0): -7.772392586046007,
  (1, 1): -9.839184282402476,
  (1, 2): -10.751129025128156,
  (1, 3): -10.654188503693032,
  (2, 0): -10.819929137588238,
  (2, 1): -10.85383247342236,
  (2, 2): -9.84822261146621,
  (2, 3): -7.712727075121741,
  (3, 0): -11.760013045162655,
  (3, 1): -10.832300462530744,
  (3, 2): -7.79454500536325,
  (3, 3): 0})

- 늘렸음에도 불구하고 True value function에 한참 벗어남을 알 수 있음
- MC처럼 return의 갯수로 $\alpha$를 정하는 것은 잘못된 것 같음.
- return의 갯수가 점점 늘어나서 분모가 커짐에도 불구하고 true value function에 수렴하지 않는 것을 보아하니 작은 값으로 $\alpha$를 잡아야 할듯함.

## strategy2

\begin{aligned}
\alpha = k
\end{aligned}

In [9]:
class TD(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    def prediction(self,s_0,π,iter_nums,α=0.1):
        V = {tuple(s) : 0 for s in self.S}
        for ep_num in range(iter_nums):
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                V[s_t] = V[s_t] + α * (r + 1*V[s_prime] - V[s_t])
                s_t = s_prime
        return V

In [10]:
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 10000,α = 0.25)
V

{(0, 0): 0,
 (0, 1): -8.39849373243713,
 (0, 2): -21.498903370973046,
 (0, 3): -23.920539288117975,
 (1, 0): -18.9460598907016,
 (1, 1): -17.750880979198747,
 (1, 2): -20.324083266925584,
 (1, 3): -18.930470481543285,
 (2, 0): -21.57693034519246,
 (2, 1): -22.12603238307893,
 (2, 2): -20.463756775083247,
 (2, 3): -11.004329018730525,
 (3, 0): -23.981268316159696,
 (3, 1): -21.34414457966366,
 (3, 2): -10.81270597397484,
 (3, 3): 0}

- $\alpha = 0.25$로 두면 어느정도 True Value에 좀 가까워진듯 한 느낌이 듦
- 더 반복수를 키워보면 수렴하지 않을까 싶음.

In [11]:
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000,α = 0.25)
V

{(0, 0): 0,
 (0, 1): -10.892909639267481,
 (0, 2): -16.389984823596123,
 (0, 3): -20.578998512300483,
 (1, 0): -10.115580086229546,
 (1, 1): -16.7374227182455,
 (1, 2): -19.376736534483427,
 (1, 3): -17.403295865603702,
 (2, 0): -17.706264918772543,
 (2, 1): -18.118900013790565,
 (2, 2): -19.26210200598588,
 (2, 3): -6.951457310762545,
 (3, 0): -20.307685831910902,
 (3, 1): -21.122118838034453,
 (3, 2): -15.332188651767787,
 (3, 3): 0}

- 조금 더 True값과 가까워진 느낌은 있지만 그래도 좀 거리가 음.
- $\alpha$를 더 줄여봐야 할 것 같음.

In [12]:
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000,α = 0.025)
V

{(0, 0): 0,
 (0, 1): -13.468481120602783,
 (0, 2): -18.04521926868933,
 (0, 3): -20.830446568464893,
 (1, 0): -14.162405826062944,
 (1, 1): -17.51880803992086,
 (1, 2): -18.99287999167345,
 (1, 3): -19.31588268251589,
 (2, 0): -19.68613223150354,
 (2, 1): -19.616839196046627,
 (2, 2): -17.409760812877337,
 (2, 3): -12.103952443141019,
 (3, 0): -21.66401720239512,
 (3, 1): -19.851963302190605,
 (3, 2): -14.369576703234577,
 (3, 3): 0}

- 더 가까워진 느낌이 드는데?
- 더 줄여보자.

In [13]:
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 100000,α = 0.001)
V

{(0, 0): 0,
 (0, 1): -14.03275753892002,
 (0, 2): -20.120670066469458,
 (0, 3): -22.087632181402803,
 (1, 0): -14.356100397943504,
 (1, 1): -18.06656086981896,
 (1, 2): -20.057873409291272,
 (1, 3): -19.933245308012694,
 (2, 0): -20.11923861351503,
 (2, 1): -20.10721474631147,
 (2, 2): -18.017930371514616,
 (2, 3): -13.836185455422145,
 (3, 0): -22.0836078738287,
 (3, 1): -20.00180431090046,
 (3, 2): -14.000978315795132,
 (3, 3): 0}

- True값을 어느정도 잘 근사한다.
- 혹시 반복수를 더 늘리면 값이 증가하는 거 아닐까? 이게 정말 수렴한걸까?

In [15]:
td = TD()
π = td.generate_π(True)
V = td.prediction(s_0 = (2,1),π = π,iter_nums = 500000,α = 0.001)
V

{(0, 0): 0,
 (0, 1): -14.023947065291312,
 (0, 2): -19.88466015535378,
 (0, 3): -22.019204023526456,
 (1, 0): -13.997593366671204,
 (1, 1): -17.867270764828138,
 (1, 2): -19.89726566201253,
 (1, 3): -19.982618186721407,
 (2, 0): -19.907105325573927,
 (2, 1): -19.89049628015864,
 (2, 2): -17.926426396792216,
 (2, 3): -13.883623908772902,
 (3, 0): -21.86589991111874,
 (3, 1): -19.918957752488122,
 (3, 2): -14.041869061880227,
 (3, 3): 0}

- 수렴한 것이 맞는 것 같다.
- Insight
    1. $\alpha$가 감소하면 반드시 True에 수렴한다고 나와 있었다. 하지만 $\frac{1}{N}$으로 감소시켜도 True Value수렴하진 않았다. 아마 감소폭을 더 늘려야 할 듯 싶다.
    2. $\alpha$가 상수이면 평균적으로 True에 수렴한다고 나와있었다. 실험결과 $\alpha$만 적절하면 100% 모두 True Value에 수렴했다.
    3. TD에서 step size alpha는 기본적으로 작게 설정해줘야 한다. 최대 0.01이고 반드시 그 이하여야 할 것 같다.

# SARSA(On-Policy TD Control)

\begin{aligned}
\pi(a|s) = \begin{cases}
1-\epsilon + \frac{\epsilon}{|A(s)|}\quad(A = \text{greedy action})\\
\epsilon /|A(s)| \quad\quad\quad\quad (\text{otherwise}) 
\end{cases}
\end{aligned}

In [1]:
import numpy as np
import time
class Environment():
    def __init__(self,grid_size):
        self.grid_size = np.array(grid_size)
        self.S = [np.array([i,j]) for i in range(grid_size[0]) for j in range(grid_size[1])]
        self.Terminal_states = [(0,0),(self.grid_size[0]-1,self.grid_size[1]-1)]
        self.A = ["W","E","N","S"]
        self.A_to_coord = {"W" : [0,-1],"E" : [0,1], "N" : [-1,0], "S" : [1,0]}
        for k,v in self.A_to_coord.items():
            self.A_to_coord[k] = np.array(v)
        self.gamma = 1
        self.R = -1
    
    def move(self,s,a):
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        output
        s_next : state after one_step transition
        ex)
        Input : s = [0,1],a = "W"
        Output : s_next = [0,0]
        """
        s_next = s + self.A_to_coord[a]
        if s_next[0] < 0 or s_next[1] <0 or s_next[0] >= self.grid_size[0] or s_next[1] >= self.grid_size[1]:
            return s # |S|를 넘어갈 경우, 원래 상태를 반환
        else:
            return s_next
        
    def move_test(self):
        S = [np.array([i,j]) for i in [0,1,2,3] for j in [0,1,2,3]]
        for s in S:
            for a in self.A:
                print(s,a,self.move(s,a))
    def dynamics(self,s_prime,r,s,a):
        r=-1
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        s_prime : all of the possible states after one-step transition
        r : immediate reward(-1)
        output
        0 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 다를때
        1 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 같을때
        즉, a방향으로 움직이면 반드시 a방향으로 감(deterministic)
        """
        s_next = self.move(s,a)
        if np.sum(s_next != s_prime)>=1:
            return 0
        else:
            return 1
    def dynamics_test(self,check_p = 1):
        r=-1
        for s in self.S:
            for a in self.A:
                for s_prime in self.S: #가능한 모든 next_state
                    if self.dynamics(s_prime,r,s,a) == check_p:
                        print(f"state : {s} action : {a} s_prime : {s_prime} dynamics : {self.dynamics(s_prime,r,s,a)}")
    def sampling_action(self,s_t,π):
        """
        Input current state s_t : tuple
        ouput action : str (a ~ π(a|s))
        """
        actions = []
        prob = []
        for a,p in π[s_t].items():
            actions.append(a)
            prob.append(p)
        #print(prob)
        return np.random.choice(a=actions,p=prob)
    def generate_π(self,uniform=False):
        """
        Input : NA
        Output : {(x,y) : {"W" : pr1,"E" : pr2 ,...}}
        ex)
        {(0,0):{"W" : 0.1, "E" : 0.2, ...},(0,1):{"W":0.4,"E":0.5...}}
        """
        π = {(i,j): {} for i in range(self.grid_size[0]) for j in range(self.grid_size[1])}
        for t in π.values():
            unnormalized_prob = np.random.rand(4)
            if uniform == False:
                prob = unnormalized_prob/np.sum(unnormalized_prob)
            else:
                prob = [0.25] * 4
            for i in range(len(self.A)):
                t[self.A[i]] = prob[i]
        return π
    def argmax_a_Q(self,Q,s):
        max_action = "W"
        max_value = -5000
        for visit,Q_val in Q.items():
            if visit[0] == s:
                if Q_val > max_value:
                    max_action = visit[1]
                    max_value = Q_val                    
        return max_action

In [58]:
class SARSA(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    def control(self,s_0,iter_nums,α=0.9,ϵ = 0.9995):
        Q = {(tuple(s),a):0 for s in self.S for a in self.A}
        π = self.generate_π()
        action_space_size = len(self.A)
        
        count=0
        for ep_num in range(1,iter_nums):
            s_t = s_0;a = self.sampling_action(s_0,π)
            a_t = self.sampling_action(s_t,π)
            epsilon = ϵ ** (ep_num + 1) #실수한 부분
            alpha = α**(ep_num+1) #GLIE?
            
            if ep_num % 1000 == 0:
                print(f"episode : {ep_num}, epsilon : {epsilon}, alpha : {alpha}")
            
            while s_t not in self.Terminal_states:
                count+=1
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                #sampling action ~ ϵ-greedy action
                #1. make ϵ-greedy policy
                a_star = self.argmax_a_Q(Q,s_prime)
                for action,prob in π[s_prime].items():
                    if action == a_star:
                        π[s_prime][action] = 1 - epsilon + epsilon/action_space_size
                    else:
                        π[s_prime][action] = epsilon/action_space_size
                #2. sampling action
                a_prime = self.sampling_action(s_prime,π)
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)])
                #print(alpha*(r + 1*Q[(s_prime,a_prime)] - Q[(s_t,a_t)]))
                s_t = s_prime;a_t = a_prime
                count+=1
        return Q,π

In [59]:
sarsa = SARSA()
Q,π = sarsa.control(s_0 = (2,1),iter_nums = 100000,α = np.exp(np.log(0.0001)/100000),ϵ = 0.9999)

episode : 1000, epsilon : 0.9047424102692003, alpha : 0.9119268439216149
episode : 2000, epsilon : 0.818640693009023, alpha : 0.8316871663561808
episode : 3000, epsilon : 0.740733027040136, alpha : 0.7585077106700777
episode : 4000, epsilon : 0.6702396082111145, alpha : 0.6917672538661829
episode : 5000, epsilon : 0.6064548440752158, alpha : 0.6308992338374627
episode : 6000, epsilon : 0.5487402913771806, alpha : 0.5753869398011344
episode : 7000, epsilon : 0.49651826565897955, alpha : 0.5247591259224881
episode : 8000, epsilon : 0.4492660590208904, alpha : 0.4785860108922666
episode : 9000, epsilon : 0.40651070816152135, alpha : 0.43647562949787516
episode : 10000, epsilon : 0.36782426032832716, alpha : 0.3980705052167768
episode : 11000, epsilon : 0.3328194897939159, alpha : 0.36304461558560236
episode : 12000, epsilon : 0.3011460219829113, alpha : 0.33110062458388595
episode : 13000, epsilon : 0.2724868264544462, alpha : 0.30196735853803136
episode : 14000, epsilon : 0.2465550436373

- 초기에 학습되지 않은 상태의 $\pi$에 의해 Q값이 영향을 많이 받을 수 있다.
- 적절히 탐험함과 동시에 $\epsilon$이 빨리 떨어져서 학습된 정보를 쓸 수 있게 해야 정확한 $Q$값을 구할 수 있다.

In [61]:
Q

{((0, 0), 'W'): 0,
 ((0, 0), 'E'): 0,
 ((0, 0), 'N'): 0,
 ((0, 0), 'S'): 0,
 ((0, 1), 'W'): -1.0,
 ((0, 1), 'E'): -3.3323793522994833,
 ((0, 1), 'N'): -2.0045782177129015,
 ((0, 1), 'S'): -3.3983693586776482,
 ((0, 2), 'W'): -2.123245282636658,
 ((0, 2), 'E'): -5.530109740776886,
 ((0, 2), 'N'): -3.9378481042906377,
 ((0, 2), 'S'): -4.573563369404286,
 ((0, 3), 'W'): -4.03339097693781,
 ((0, 3), 'E'): -5.42422622877365,
 ((0, 3), 'N'): -5.9243412887695985,
 ((0, 3), 'S'): -5.0411702164485215,
 ((1, 0), 'W'): -2.1433973390616656,
 ((1, 0), 'E'): -3.1222171379873207,
 ((1, 0), 'N'): -1.0,
 ((1, 0), 'S'): -3.206357191920265,
 ((1, 1), 'W'): -2.0643897617234646,
 ((1, 1), 'E'): -4.190076134990604,
 ((1, 1), 'N'): -2.264986949722798,
 ((1, 1), 'S'): -4.202230629642308,
 ((1, 2), 'W'): -3.9117031424234776,
 ((1, 2), 'E'): -3.657568832911959,
 ((1, 2), 'N'): -3.77091533257776,
 ((1, 2), 'S'): -3.050559637632963,
 ((1, 3), 'W'): -4.623726264709104,
 ((1, 3), 'E'): -3.7594694049532826,
 ((1, 3)

In [104]:
class DP_prediction(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    def policy_eval(self,π,iter_nums = 10):
        V = {tuple(s): 0 for s in self.S}
        r = -1;gamma=1
        for iter_num in range(iter_nums):
            for s in self.S:
                s_t = tuple(s) #for each s \in S
                if s_t in self.Terminal_states:
                    continue
                v = V[s_t]
                updt_value = 0
                for a in self.A:
                    total = 0
                    for s_prime in self.S:
                        #print(s_prime,r,s_t,a)
                        #print(self.dynamics(s_prime,r,s_t,a))
                        total += self.dynamics(s_prime,r,s_t,a) * (r + gamma * V[tuple(s_prime)])
                    updt_value += π[s_t][a] * total
                V[s_t] = updt_value
                
        return V
dp_pred= DP_prediction()
V = dp_pred.policy_eval(π,10)
V

{(0, 0): 0,
 (0, 1): -1.0242829182033806,
 (0, 2): -2.141202785727903,
 (0, 3): -3.248416420140333,
 (1, 0): -1.024492195882349,
 (1, 1): -2.0431017158579077,
 (1, 2): -3.0429709841943056,
 (1, 3): -2.1677109327871933,
 (2, 0): -2.0489786066301985,
 (2, 1): -3.041941279994179,
 (2, 2): -2.041870178294855,
 (2, 3): -1.0243719054487954,
 (3, 0): -3.0979206693126837,
 (3, 1): -2.0470834198523264,
 (3, 2): -1.0233902362150626,
 (3, 3): 0}

- DP로 구한 optimal value function과 비교
- DP의 optimal value function의 각각의 value는 optimal action value function의 각각의 $s_t$에서  max action값이 되어야 함.

# Q-learning

In [1]:
import numpy as np
class Environment():
    def __init__(self,grid_size):
        self.grid_size = np.array(grid_size)
        self.S = [np.array([i,j]) for i in range(grid_size[0]) for j in range(grid_size[1])]
        self.Terminal_states = [(0,0),(self.grid_size[0]-1,self.grid_size[1]-1)]
        self.A = ["W","E","N","S"]
        self.A_to_coord = {"W" : [0,-1],"E" : [0,1], "N" : [-1,0], "S" : [1,0]}
        for k,v in self.A_to_coord.items():
            self.A_to_coord[k] = np.array(v)
        self.gamma = 1
        self.R = -1
    
    def move(self,s,a):
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        output
        s_next : state after one_step transition
        ex)
        Input : s = [0,1],a = "W"
        Output : s_next = [0,0]
        """
        s_next = s + self.A_to_coord[a]
        if s_next[0] < 0 or s_next[1] <0 or s_next[0] >= self.grid_size[0] or s_next[1] >= self.grid_size[1]:
            return s # |S|를 넘어갈 경우, 원래 상태를 반환
        else:
            return s_next
        
    def move_test(self):
        S = [np.array([i,j]) for i in [0,1,2,3] for j in [0,1,2,3]]
        for s in S:
            for a in self.A:
                print(s,a,self.move(s,a))
    def dynamics(self,s_prime,r,s,a):
        r=-1
        """
        Input
        s : current_state(position)
        a : action(from deterministic policy π)
        s_prime : all of the possible states after one-step transition
        r : immediate reward(-1)
        output
        0 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 다를때
        1 if s에서 a로 움직였을때의 변화된 상태 next_s와 s_prime의 input이 같을때
        즉, a방향으로 움직이면 반드시 a방향으로 감(deterministic)
        """
        s_next = self.move(s,a)
        if np.sum(s_next != s_prime)>=1:
            return 0
        else:
            return 1
    def dynamics_test(self,check_p = 1):
        r=-1
        for s in self.S:
            for a in self.A:
                for s_prime in self.S: #가능한 모든 next_state
                    if self.dynamics(s_prime,r,s,a) == check_p:
                        print(f"state : {s} action : {a} s_prime : {s_prime} dynamics : {self.dynamics(s_prime,r,s,a)}")
    def sampling_action(self,s_t,π):
        """
        Input current state s_t : tuple
        ouput action : str (a ~ π(a|s))
        """
        actions = []
        prob = []
        for a,p in π[s_t].items():
            actions.append(a)
            prob.append(p)
        #print(prob)
        return np.random.choice(a=actions,p=prob)
    def generate_π(self,uniform=False):
        """
        Input : NA
        Output : {(x,y) : {"W" : pr1,"E" : pr2 ,...}}
        ex)
        {(0,0):{"W" : 0.1, "E" : 0.2, ...},(0,1):{"W":0.4,"E":0.5...}}
        """
        π = {(i,j): {} for i in range(self.grid_size[0]) for j in range(self.grid_size[1])}
        for t in π.values():
            unnormalized_prob = np.random.rand(4)
            if uniform == False:
                prob = unnormalized_prob/np.sum(unnormalized_prob)
            else:
                prob = [0.25] * 4
            for i in range(len(self.A)):
                t[self.A[i]] = prob[i]
        return π
    def argmax_a_Q(self,Q,s):
        max_action = "W"
        max_value = -5000
        for visit,Q_val in Q.items():
            if visit[0] == s:
                if Q_val > max_value:
                    max_action = visit[1]
                    max_value = Q_val                    
        return max_action
class Q_learning(Environment):
    def __init__(self,grid_size=(4,4)):
        super().__init__(grid_size)
    
    def control(self,s_0,iter_num,alpha,epsilon):
        Q = {(tuple(s),a) : 0 for s in self.S for a in self.A}
        π = self.generate_π()
        γ = 1
        for ep_num in range(iter_num):
            ϵ = epsilon ** (ep_num + 1)
            α = alpha ** (ep_num + 1)
            if ep_num % 1000 == True:
                print(f"epsilon : {ϵ} alpha : {α}")
            s_t = s_0
            while s_t not in self.Terminal_states:
                a_t = self.sampling_action(s_t,π)
                r,s_prime = -1,tuple(self.move(s_t,a_t))
                Q[(s_t,a_t)] = Q[(s_t,a_t)] + α * (r + γ * Q[(s_prime,self.argmax_a_Q(Q,s_prime))] - Q[(s_t,a_t)])
                
                a_star = self.argmax_a_Q(Q,tuple(s_t))
                for (state,action),value in Q.items():
                    if action == a_star:
                        π[state][action] = 1 - ϵ + ϵ /len(self.A)
                    else:
                        π[state][action] = ϵ/len(self.A)
                s_t = s_prime
        return π,Q
                
q_l = Q_learning()
policy,Q= q_l.control(s_0 = (2,1),iter_num = 10000,alpha = np.exp(np.log(0.001)/10000),epsilon = 0.9995)

epsilon : 0.9990002500000001 alpha : 0.9986194028465246
epsilon : 0.6058485196309606 alpha : 0.5004952959591761
epsilon : 0.36741975664072807 alpha : 0.25084185282524624
epsilon : 0.22282348342150354 alpha : 0.1257187342954265
epsilon : 0.13513237616300078 alpha : 0.06300862465664786
epsilon : 0.0819516812458937 alpha : 0.031579118286324974
epsilon : 0.0496999923314264 alpha : 0.015827050934311866
epsilon : 0.030140800044509277 alpha : 0.007932315874245822
epsilon : 0.0182790335512516 alpha : 0.0039755754492710235
epsilon : 0.011085408054012442 alpha : 0.0019925076614966753


In [2]:
Q

{((0, 0), 'W'): 0,
 ((0, 0), 'E'): 0,
 ((0, 0), 'N'): 0,
 ((0, 0), 'S'): 0,
 ((0, 1), 'W'): -1.0,
 ((0, 1), 'E'): -3.0,
 ((0, 1), 'N'): -2.0,
 ((0, 1), 'S'): -3.0,
 ((0, 2), 'W'): -2.0,
 ((0, 2), 'E'): -4.0,
 ((0, 2), 'N'): -3.0,
 ((0, 2), 'S'): -4.0,
 ((0, 3), 'W'): -3.0,
 ((0, 3), 'E'): -4.0,
 ((0, 3), 'N'): -4.0,
 ((0, 3), 'S'): -3.0,
 ((1, 0), 'W'): -2.0,
 ((1, 0), 'E'): -3.0,
 ((1, 0), 'N'): -1.0,
 ((1, 0), 'S'): -3.0,
 ((1, 1), 'W'): -2.0,
 ((1, 1), 'E'): -4.0,
 ((1, 1), 'N'): -2.0,
 ((1, 1), 'S'): -4.0,
 ((1, 2), 'W'): -3.0,
 ((1, 2), 'E'): -3.0,
 ((1, 2), 'N'): -3.0,
 ((1, 2), 'S'): -3.0,
 ((1, 3), 'W'): -4.0,
 ((1, 3), 'E'): -3.0,
 ((1, 3), 'N'): -4.0,
 ((1, 3), 'S'): -2.0,
 ((2, 0), 'W'): -3.0,
 ((2, 0), 'E'): -4.0,
 ((2, 0), 'N'): -2.0,
 ((2, 0), 'S'): -4.0,
 ((2, 1), 'W'): -3.0,
 ((2, 1), 'E'): -3.0,
 ((2, 1), 'N'): -3.0,
 ((2, 1), 'S'): -3.0,
 ((2, 2), 'W'): -4.0,
 ((2, 2), 'E'): -2.0,
 ((2, 2), 'N'): -4.0,
 ((2, 2), 'S'): -2.0,
 ((2, 3), 'W'): -3.0,
 ((2, 3), 'E'): -2.0,


In [5]:
policy

{(0, 0): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (0, 1): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (0, 2): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (0, 3): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (1, 0): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (1, 1): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (1, 2): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (1, 3): {'W': 0.0016823817555366668,
  'E': 0.99495285473339,
  'N': 0.0016823817555366668,
  'S': 0.0016823817555366668},
 (2, 0):