In [2]:
import random
import numpy as np

from pettingzoo.atari import mario_bros_v3


In [25]:

class ReplayMemory:

    def __init__(self,capacity) -> None:
        self.capacity = capacity
        self.memory = []
        self.idx = 0
    
    def toElement(self,state,action,next_state,reward,terminal):
        return {'state':state, 'action':action, 'next_state':next_state,'reward':reward,'terminal': 1-int(terminal)}

    def fromElement(self,elm):
        return elm['state'],elm['action'],elm['next_state'],elm['reward'],elm['terminal']
    
    def store(self,elm):
        if(len(self.memory) < self.capacity):
            self.memory += [elm]
        else:
            self.memory[self.idx] = elm
            self.idx = (self.idx + 1) % self.capacity
    
    def sample(self,batchsize):
        indices_to_sample = random.sample(range(len(self.memory)),k = batchsize)

        return np.array(self.memory)[indices_to_sample]

    def __len__(self):
        return len(self.memory)
    
    def getField(self, memory, name = "state"):
        ans = []
        for i in range(len(memory)):    
            ans += [memory[i][name]]
        return np.array(ans)

In [4]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Convolution2D
from tensorflow.keras.optimizers import Adam

from tensorflow.keras.models import load_model

In [36]:
class DQNNet:

    def __init__(self,height,width,num_frames,num_actions):
    
        self.height = height
        self.width = width
        self.num_frames = num_frames
        self.num_actions = num_actions

    def build(self):
        model = Sequential()
        model.add(Convolution2D(16,(8,8),strides = (4,4), activation = 'relu',input_shape=(self.num_frames,self.height,self.width,1)))
        model.add(Convolution2D(32,(8,8),strides = (4,4), activation = 'relu'))
        model.add(Flatten())
        model.add(Dense(512,activation='relu'))
        model.add(Dense(256,activation='relu'))
        model.add(Dense(self.num_actions,activation='linear'))
        self.model = model
        return self.model

    def compile(self,lr):
        self.model.compile(optimizer = Adam(lr=lr),loss='mse')
        return self.model
    
    def predict(self,state):
        actions = self.model.predict(state)
        return actions

In [47]:
class Agent:
    def __init__(self,height,widht,num_frames,n_actions,epsilon,batch_size,alpha=0.0005,gamma=0.996,epsilon_step=0.00001,epsilon_min=0.01,mem_size=1000000,fname='dqn_model.h5'):
        self.action_space = [i for i in range(n_actions)]
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_step = epsilon_step
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.model_file = fname
        self.memory = ReplayMemory(mem_size)
        
        self.dqnnet = DQNNet(height,widht,num_frames,n_actions)
        self.dqnnet.build()
        self.dqnnet.compile(alpha)

    def choose_action(self,state):

        # state = state[np.newaxis,:]

        rand = np.random.random()
        if(rand < self.epsilon):
            action = np.random.choice(self.action_space)
        else:
            action = self.dqnnet.predict(state)
        
        return action
    
    def remember(self,state,action,next_state,reward,done):
        self.memory.store(self.memory.toElement(state,action,next_state,reward,done))
    
    def learn(self):
        if(len(self.memory) < self.batch_size):
            return
        # fill up the memory with random actions

        mem_sample = self.memory.sample(self.batch_size)
    
        state = self.memory.getField(mem_sample,name='state')
        action = self.memory.getField(mem_sample,name='action')
        next_state = self.memory.getField(mem_sample,name='next_state')
        reward = self.memory.getField(mem_sample,name='reward')
        terminal = self.memory.getField(mem_sample,name='terminal')

        action_indices = action
        print('inside learn')

        q_eval = self.dqnnet.predict(state)
        q_next = self.dqnnet.predict(next_state)

        q_target = q_eval.copy()

        batch_index = np.arange(self.batch_size, dtype = np.int32)

        
        q_target[batch_index, action_indices] = reward + self.gamma * np.max(q_next, axis=1)*terminal
        
        _ = self.dqnnet.model.fit(state,q_target,verbose=0)
    
        if(self.epsilon > self.epsilon_min):
            self.epsilon -= self.epsilon_step

    def save_model(self):
        self.q_eval.save(self.model_file)
    
    def load_model(self):
        self.q_eval = load_model(self.model_file)

In [8]:
env = mario_bros_v3.env(obs_type = 'grayscale_image')
env.reset()

  deprecation(
  deprecation(


In [9]:
def normal_step(agent,state,score,name='first_0'):
    action = agent.choose_action(np.array(state))
    print(action)
    reward = env.rewards[name]
    done = env.dones[name]
    observation = env.observe(name)

    old_state = np.array(state.copy())

    state.pop(0)
    state+=[observation]
    score+=reward
    agent.remember(old_state,action1,np.array(state),reward,done)
    agent.learn()

    return agent,state,action,score,done

def dont_learn_step(agent,state,score,name='first_0'):
    action = agent.choose_action(np.array(state))
    print(action)
    reward = env.rewards[name]
    done = env.dones[name]
    observation = env.observe(name)

    old_state = np.array(state.copy())

    state.pop(0)
    state+=[observation]
    score+=reward
    if(reward!=0):
        agent.remember(old_state,action1,np.array(state),reward,done)

    return agent,state,action,score,done

In [48]:
env.reset()
state, r, d, info = env.last()

agent1 = Agent(210,160,4,18,1.0,32)
agent2 = Agent(210,160,4,18,1.0,32)

print(state.shape)

score1 = 0
score2 = 0
state1 = []
state2 = []
for i in range(4):
    o,r,d,i = env.last()
    state1+=[o]
    score1 += r
    env.step(0)
    o,r,d,i = env.last()
    state2+=[o]
    score2+=r
    env.step(0)

action1 = 0
action2 = 0

for i in range(100):
    agent1,state1,action1,score1,done1 = normal_step(agent1,state1,score1)
    agent2,state2,action2,score2,done2 = normal_step(agent2,state2,score2)

    for i in range(3):
        agent1,state1,action1,score1,done1 = dont_learn_step(agent1,state1,score1)
        agent2,state2,action2,score2,done2 = dont_learn_step(agent2,state2,score2)




  super(Adam, self).__init__(name, **kwargs)


(210, 160, 1)
4
12
11
0
9
14
15
4
9
9
0
9
0
3
12
6
11
5
16
0
10
1
16
7
9
5
3
1
16
10
0
10
12
6
8
17
13
17
7
2
6
9
5
10
11
12
2
3
10
1
6
15
0
0
11
16
17
3
12
10
2
17
7
7
10
2
15
10
2
10
10
14
17
4
13
9
12
9
15
16
14
2
6
4
6
9
16
11
3
9
1
7
8
7
7
12
11
15
3
6
5
4
16
13
6
4
5
11
13
17
14
8
7
2
6
12
12
6
10
15
17
5
0
4
14
16
11
5
10
11
3
7
8
7
5
12
13
6
3
14
10
3
12
8
11
2
15
8
5
8
15
14
1
15
7
17
13
5
9
15
4
13
7
7
3
13
0
0
5
0
10
3
6
14
6
17
1
4
11
0
11
4
11
12
2
16
0
16
12
3
14
6
15
12
4
9
0
0
5
16
5
14
17
1
4
1
6
13
17
2
2
10
0
3
0
9
16
3
13
16
2
12
11
16
3
6
11
13
5
6
12
10
0
15
15
1
11
1
14
16
11
13
7
0
15
14
14
12
13
inside learn
12
inside learn
8
2
17
9
5
1
12
inside learn
2
inside learn
5
1
16
9
11
13
14
inside learn
9
inside learn
1
9
0
8
14
6
9
inside learn
12
inside learn
11
7
6
1
6
3
17
inside learn
16
inside learn
2
7
16
0
16
8
5
inside learn
4
inside learn
11
15
8
15
8
11
4
inside learn
17
inside learn
13
6
2
6
4
1
13
inside learn
15
inside learn
11
5
3
13
11
17
7
inside lea

KeyboardInterrupt: 