In [14]:
from collections import namedtuple
import random
from typing import List, NamedTuple

import gym

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

In [11]:
Transition = namedtuple('Transition', 'state action reward next_state')

In [None]:
class VectorizeWrapper(gym.Wrapper):
    def __init__(self):
        pass
    
    def step(self):
        pass

In [12]:
class ReplayBuffer:
    def __init__(self, capacity: int = 100000):
        self.capacity = capacity
        self.buffer = []
        self.position = 0
        
    def push(self, transition):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = transition
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size: int) -> List[NamedTuple]:
        if len(self.buffer) < batch_size:
            raise ValueError(f"Can't sample {batch_size} num elements from buffer of size {self.buffer}")
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

In [15]:
def create_model(state_dim: int, action_dim: int, hidden_sizes: List[int]):
    model = tf.keras.models.Sequential()
    model.add(layers.InputLayer(input_shape=state_dim))
    for hidden_size in hidden_sizes:
        model.add(layers.Dense(hidden_size))
        model.add(layers.LeakyReLU())
    model.add(layers.Dense(action_dim))
    
    return model

def sync_models(model1, model2):
    model2.set_weights(model1.get_weights())

In [None]:
class Agent:
    def __init__(self, state_dim: int, action_dim: int):
        self.state_dim = state_dim
        self.action_dim = action_dim
        
    def act(self, state):
        raise NotImplementedError()
        
    def update(self, transitions):
        raise NotImplementedError()

In [None]:
class DQN(Agent):
    def __init__(self, state_dim: int, action_dim: int, hidden_sizes: List[int], gamma: float=0.95):
        super(DQN, self).__init__(state_dim, action_dim)
        self.q_net = create_model(state_dim, action_dim, hidden_sizes)
        self.optimizer = tf.keras.optimizers.Adam()
        
    def act(self, state):
        pass
    
    def _prepare_batches(transitions):
        pass
    
    def update(self, transitions):
        with tf.GradientTape as tape:
            Q_pred = self.q_net()

In [None]:
class DDQN:
    def __init__(self, state_dim: int, action_dim: int, hidden_sizes: List[int], gamma: float=0.95):
        super(DDQN, self).__init__(state_dim, action_dim)
        self.q_net = create_model(state_dim, action_dim, hidden_sizes)
        self.target_net = create_model(state_dim, action_dim, hidden_sizes)
        sync_models(self.q_net, self.target_net)
        self.optimizer = tf.keras.optimizers.Adam()
    
    def act(self, state):
        pass
    
    def update(self):
        with tf.GradientTape as tape:
            pass