In [41]:
import random
from dataclasses import dataclass, field
from operator import itemgetter

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class DeepQNetwork(nn.Module):
    """Fully connected Q-value network with two ReLU hidden layers.

    Besides the three linear layers, the module carries its own Adam
    optimizer (over all of its parameters) and an MSE loss object, exposed
    as ``optimizer`` and ``cost_func``.

    Args:
        learning_rate: Learning rate handed to the Adam optimizer.
        state_shape: Number of input features of the first layer.
        action_shape: Number of outputs of the last layer.
        n_dense_1: Width of the first hidden layer.
        n_dense_2: Width of the second hidden layer.
    """

    def __init__(self, learning_rate, state_shape, action_shape, n_dense_1, n_dense_2):
        super().__init__()
        hidden_1, hidden_2 = n_dense_1, n_dense_2

        self.layer_1 = nn.Linear(in_features=state_shape, out_features=hidden_1)
        self.layer_2 = nn.Linear(in_features=hidden_1, out_features=hidden_2)
        self.layer_3 = nn.Linear(
            in_features=hidden_2,
            out_features=action_shape,
            dtype=torch.float32,
        )

        # The optimizer must be created after the layers so that
        # self.parameters() already contains their weights.
        self.optimizer = torch.optim.Adam(params=self.parameters(), lr=learning_rate)
        self.cost_func = nn.MSELoss()

    def forward(self, state):
        """Return the output of the last linear layer for ``state``.

        ``state`` goes through ``layer_1`` and ``layer_2`` with a ReLU after
        each, then through ``layer_3`` without any activation.
        """
        hidden = state
        for dense in (self.layer_1, self.layer_2):
            hidden = F.relu(dense(hidden))
        return self.layer_3(hidden)

@dataclass
class RLModelWrapper:
    state_shape:int
    is_training:bool

    current_state_memory:list = []
    nxt_state_memory:list = []
    actions_taken_memory: list = []
    action_indexes_memory : list = []
    reward_memory:list = []
    iteration_done_bool_memory: list = []

    EPSILON:float = 0.5 if is_training else 0
    epsilon_min:float = 0.01
    epsilon_decay:float = 0.01
    ALPHA:float= 0.05
    learning_rate:float = 0.001
    DISCOUNT:float = 0.95
    cost_tracker:float = 0

    start_action: float = 0.05
    end_action: float = 2.0
    action_gap: float = 0.05

    agent_actions: list = np.arange(start=start_action,stop=end_action,step=action_gap)
    n_actions: int = len(agent_actions)


    batch_size:int = 80
    batch_start:int = 0
    batch_end:int = batch_size

    main_model: DeepQNetwork = DeepQNetwork(learning_rate,state_shape, n_actions, n_dense_1=100, n_dense_2=100)
    target_model: DeepQNetwork = DeepQNetwork(learning_rate, state_shape, n_actions, n_dense_1=100, n_dense_2=100)
    target_model.load_state_dict(main_model.state_dict())

    def store_data(self, curr_obs, nxt_obs, action, reward, done) -> None:
        self.current_state_memory.append(curr_obs)
        self.nxt_state_memory.append(nxt_obs)
        self.actions_taken_memory.append(action)
        self.action_indexes_memory.append(self.action_mapping(action, reversed= True))
        self.reward_memory.append(reward)

        done_val = 1 if done else 0
        self.iteration_done_bool_memory.append(done_val)

    def action_mapping(self,action_or_index , reversed = False) -> int | float: 
        if reversed:
            # if want to get action index by the specific action
            return self.agent_actions.index(action_or_index)
        else:
            #if want to get action by specific action index
            return self.agent_actions[action_or_index]

    def take_action(self,observation):
        if np.random.uniform() > self.EPSILON:
            encoded = observation
            predicted = self.main_model.forward(torch.tensor(encoded))
            action_index = np.argmax(predicted.tolist())
            return self.action_mapping(action_index)
        else:
            action_index = random.choice(range(self.n_actions))
            return self.action_mapping(action_index)
        

    def train(self):
        current_cost_tracker = 0
        MIN_REPLAY_SIZE = self.batch_end + self.batch_size
        if len(self.iteration_done_bool_memory) < MIN_REPLAY_SIZE:
            return
        else:
            batch_sample = random.sample(list(range(len(self.dones_mem))), self.batch_size)
            batch_indexes = list(range(self.batch_size))

            self.main_model.optimizer.zero_grad()
            curr_obs_batch = torch.tensor(itemgetter(*batch_sample)(self.current_state_memory), dtype= torch.float32)
            nxt_obs_batch  = torch.tensor(itemgetter(*batch_sample)(self.nxt_state_memory), dtype= torch.float32)
            actions_indexes_batch = itemgetter(*batch_sample)(self.action_indexes_memory)
            rewards_batch = torch.tensor(itemgetter(*batch_sample)(self.reward_memory), dtype= torch.float32)
            dones_batch = itemgetter(*batch_sample)(self.iteration_done_bool_memory)  # [0,0,0,1]
            not_dones_batch = torch.tensor(np.ones(len(dones_batch))-dones_batch, dtype= torch.float32) # [1,1,1,0]
            
            curr_qs_batch = self.main_model.forward(curr_obs_batch)[batch_indexes, actions_indexes_batch]
            nxt_qs_batch = self.target_model.forward(nxt_obs_batch)

            q_target = torch.add(rewards_batch,not_dones_batch*self.DISCOUNT*torch.max(nxt_qs_batch, dim=1)[0])
            # changed_q = curr_qs_batch + ALPHA * (q_target - curr_qs_batch)
            cost = self.main_model.cost_func(curr_qs_batch, q_target)
            current_cost_tracker+=cost
            cost.backward()
            self.main_model.optimizer.step()

            self.batch_start = self.batch_end
            self.batch_end += self.batch_size
            self.cost_tracker = current_cost_tracker


In [None]:
# NOTE(review): unexecuted scratch cell holding a lone, unbound list literal;
# its purpose is not evident from the notebook — presumably safe to delete, confirm.
[0.25 , ]

In [39]:
# Scratch cell: evaluates np.arange with a float step (start=1, stop=2, step=0.2).
# NOTE(review): the recorded output for this cell is `5` rather than an array,
# so the saved output looks stale relative to this source — re-run to confirm.
np.arange(1,2,0.2)

5

In [45]:
# Scratch check: itemgetter called with several indexes returns the picked
# elements together as a tuple.
check_lst = [*range(10, 21)]

itemgetter(0, 2, 5)(check_lst)

(10, 12, 15)