# Estrutura de decisão de Markov - atingindo estabilidade do sistema

In [None]:
##############################################################################################
# ------------------------ ENTENDENDO A EQUAÇÃO DE CHAPMAN KOLMOGOROV ------------------------
##############################################################################################

# %%
# Passo 1) Importa as bibliotecas necessárias ................................................
import pandas as pd
import numpy as np

# %%
# Passo 2) Gera um data-frame dinamicamente ..................................................
df = pd.DataFrame()
for i in range(20):
    df['var' + str(i)] = np.random.poisson(size = 100000)
df

# %%
# Passo 3) Cria uma matriz para servir como input da função .................................
matriz_teste = np.array([[0.34,0.26,0.4],
                         [0.03,0.37,0.6],
                         [0.8,0.1,0.1]])
type(matriz_teste)

# %%
# Passo 4) Implementa a Equação de Chapman-Kolmogorov .......................................
def calcula_n_passos(matriz, passos):

    matriz = np.array(matriz, dtype = float)
    matriz_resultados = np.zeros((matriz.shape[0],matriz.shape[1]), dtype = float)
    df_resultados = pd.DataFrame(columns = ['Passo',
                                            'matriz_atual'])

    for i in range(passos):
        if i == 0:
            matriz_resultados = matriz
        else:
            matriz_resultados = np.dot(matriz,matriz_resultados)

        df_resultados.loc[df_resultados.shape[0]] = [i,
                                                     matriz_resultados]

    return df_resultados

# %%
# Passo 5) Invoca a Equação de Chapman-Kolmogorov com uma matriz qualquer ...................
df1 = calcula_n_passos(matriz = matriz_teste,
                       passos = 20)
df1

# %%
# Passo 6) Implementa função que calcula a quantidade de passos .............................
def calcula_passos_estabilidade(matriz, casas_decimais):

    matriz = np.array(matriz, dtype = float)
    matriz_resultados = np.zeros((matriz.shape[0],matriz.shape[1]), dtype = float)
    df_resultados = pd.DataFrame(columns = ['Passo',
                                            'matriz_atual'])
    i = 0
    while True:
        if i == 0:
            matriz_resultados = matriz
        else:
            matriz_resultados = np.dot(matriz,matriz_resultados)

        df_resultados.loc[df_resultados.shape[0]] = [i,
                                                     matriz_resultados]
        if i >= 1:
            if np.all(np.round(df_resultados.iloc[i-1,1]/df_resultados.iloc[i,1], casas_decimais) == 1):
                break
        i += 1

    return df_resultados

# %%
# Passo 7) Invoca a função que calcula a quantidade de passos necessários ..................
df2 = calcula_passos_estabilidade(matriz = matriz_teste,
                                  casas_decimais = 8)
df2


# Problema do Labirinto
- Esse problema tem como base um labirinto 4x4, o aprendizado utiliza do sistema de recompensa com Q-Network para limitar o agente a não sair da dimenção 4x4 e também não colidir nos obstaculos (O), O = (1,1), (2,2), (3,3).
- Sendo que o objetivo é chegar no (4,4)

**OBS**
- Alguns pontos a serem ressaltados, para rodar em gpu no colab tem que mudar ali na parte onde conecta a maquina na hora de rodar.
- Se for rodar na GPU (placa de vídeo) estou utilizando CUDA, funciona apenas em placas de vídeo da NVIDIA.
- Aqui está conectando no drive pois queria criar uma pasta no meu drive com o arquivo já treinado. (Aqui no colab demorou 9 min para rodar o treinamento do agente em GPU)

**João Vinicius - Qualquer dúvida me mandem email. jvfcolombini@unifesp.br**

## GPU

### Importando bibliotecas

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

### Definindo o Ambiente

In [None]:
class MazeEnvironment:
    def __init__(self):
        self.size = 5
        self.start = (0, 0)
        self.goal = (4, 4)
        self.obstacles = [(1, 1), (2, 2), (3, 3)]
        self.agent_pos = self.start

    def reset(self):
        self.agent_pos = self.start
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos

        if action == 0:  # Up
            x_new = x - 1
            y_new = y
        elif action == 1:  # Down
            x_new = x + 1
            y_new = y
        elif action == 2:  # Left
            x_new = x
            y_new = y - 1
        elif action == 3:  # Right
            x_new = x
            y_new = y + 1

        # Verificar se a nova posição é válida
        if (x_new, y_new) in self.obstacles:
            reward = -1  # Penalizar colisão com obstáculo
            done = False
            new_state = self.agent_pos
        elif x_new < 0 or x_new >= self.size or y_new < 0 or y_new >= self.size:
            reward = -1  # Penalizar sair dos limites do labirinto
            done = False
            new_state = self.agent_pos
        elif (x_new, y_new) == self.goal:
            reward = 10  # Recompensar alcançar o objetivo
            done = True
            new_state = self.goal
        else:
            reward = 0  # Nenhuma recompensa por mover-se para uma posição válida
            done = False
            new_state = (x_new, y_new)

        self.agent_pos = new_state
        return new_state, reward, done

### Definição da Rede Neural


In [None]:
class QNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(QNetwork, self).__init__()
        self.fc = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, output_size)

    def forward(self, x):
        x = torch.relu(self.fc(x))
        x = self.fc2(x)
        return x

### Função para o treino

In [None]:
# Função de treinamento
def train(env, q_net, optimizer, num_episodes, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
    q_net.cuda()  # Movendo a rede neural para a GPU
    for episode in range(num_episodes):
        state = env.reset()
        state = torch.tensor(state, dtype=torch.float32).cuda()  # Movendo o estado inicial para a GPU
        epsilon = max(epsilon_start * (epsilon_decay ** episode), epsilon_end)
        episode_path = [state.cpu().numpy()]  # Lista para armazenar o caminho seguido pelo agente

        while True:
            if np.random.rand() < epsilon:
                action = np.random.randint(0, 4)
            else:
                q_values = q_net(state.unsqueeze(0))
                action = torch.argmax(q_values).item()

            next_state, reward, done = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32).cuda()

            # Imprimir informações
            print("Epoch:", episode, "Next state:", next_state, "Reward:", reward)

            episode_path.append(next_state.cpu().numpy())  # Adicionando o próximo estado ao caminho

            target = reward + gamma * torch.max(q_net(next_state.unsqueeze(0))).item()
            q_value = q_net(state.unsqueeze(0))[0, action]

            loss = nn.MSELoss()(q_value, torch.tensor(target).cuda())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state

            if done:
                print("Episode:", episode)
                print("Path:", episode_path)
                break

    # Salvando o modelo treinado
    torch.save(q_net.state_dict(), "q_network_model.pth")

### Treino do agente

In [None]:
# Treinamento do agente
env = MazeEnvironment()
q_net = QNetwork(2, 4)
optimizer = optim.Adam(q_net.parameters(), lr=0.001)
train(env, q_net, optimizer, num_episodes=1000)

[1;30;43mA saída de streaming foi truncada nas últimas 5000 linhas.[0m
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], device='cuda:0') Reward: -1
Epoch: 956 Next state: tensor([0., 4.], dev

### Aplicação do modelo no teste

In [None]:
env = MazeEnvironment()
q_net = QNetwork(2, 4)
q_net.load_state_dict(torch.load("/content/drive/MyDrive/IC- RL/q_network_model.pth"))
q_net.cuda()  # Movendo o modelo para a GPU
state = env.reset()
state = torch.tensor(state, dtype=torch.float32).cuda()  # Movendo o estado inicial para a GPU
epsilon = 0.1  # Definindo o valor de epsilon
while True:
    if np.random.rand() < epsilon:
        action = np.random.randint(0, 4)  # Ação aleatória com probabilidade epsilon
    else:
        q_values = q_net(state.unsqueeze(0))
        action = torch.argmax(q_values).item()

    next_state, reward, done = env.step(action)
    next_state = torch.tensor(next_state, dtype=torch.float32).cuda()  # Movendo o próximo estado para a GPU

    print(next_state)

    state = next_state
    if done:
        print("Episode finished.")
        break

tensor([0., 1.], device='cuda:0')
tensor([0., 1.], device='cuda:0')
tensor([0., 2.], device='cuda:0')
tensor([1., 2.], device='cuda:0')
tensor([1., 3.], device='cuda:0')
tensor([2., 3.], device='cuda:0')
tensor([2., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.], device='cuda:0')
tensor([3., 4.

## CPU

### Importando Bibliotecas

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

### Definindo o Ambiente

In [None]:
class MazeEnvironment:
    def __init__(self):
        self.size = 5
        self.start = (0, 0)
        self.goal = (4, 4)
        self.obstacles = [(1, 1), (2, 2), (3, 3)]
        self.agent_pos = self.start

    def reset(self):
        self.agent_pos = self.start
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos

        if action == 0:  # Up
            x_new = x - 1
            y_new = y
        elif action == 1:  # Down
            x_new = x + 1
            y_new = y
        elif action == 2:  # Left
            x_new = x
            y_new = y - 1
        elif action == 3:  # Right
            x_new = x
            y_new = y + 1

        # Verificar se a nova posição é válida
        if (x_new, y_new) in self.obstacles:
            reward = -1  # Penalizar colisão com obstáculo
            done = False
            new_state = self.agent_pos
        elif x_new < 0 or x_new >= self.size or y_new < 0 or y_new >= self.size:
            reward = -1  # Penalizar sair dos limites do labirinto
            done = False
            new_state = self.agent_pos
        elif (x_new, y_new) == self.goal:
            reward = 10  # Recompensar alcançar o objetivo
            done = True
            new_state = self.goal
        else:
            reward = 0  # Nenhuma recompensa por mover-se para uma posição válida
            done = False
            new_state = (x_new, y_new)

        self.agent_pos = new_state
        return new_state, reward, done

### Definindo a Rede Neural

In [None]:
class QNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(QNetwork, self).__init__()
        self.fc = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, output_size)

    def forward(self, x):
        x = torch.relu(self.fc(x))
        x = self.fc2(x)
        return x

### Função para o treino

In [None]:
def train(env, q_net, optimizer, num_episodes, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
    for episode in range(num_episodes):
        state = env.reset()
        epsilon = max(epsilon_start * (epsilon_decay ** episode), epsilon_end)
        episode_path = [state]  # Lista para armazenar o caminho seguido pelo agente

        while True:
            if np.random.rand() < epsilon:
                action = np.random.randint(0, 4)
            else:
                q_values = q_net(torch.tensor(state, dtype=torch.float32).unsqueeze(0))
                action = torch.argmax(q_values).item()

            next_state, reward, done = env.step(action)

            # Imprimir informações
            print("Epoch:", episode, "Next state:", next_state, "Reward:", reward)

            episode_path.append(next_state)  # Adicionando o próximo estado ao caminho

            target = reward + gamma * torch.max(q_net(torch.tensor(next_state, dtype=torch.float32).unsqueeze(0))).item()
            q_value = q_net(torch.tensor(state, dtype=torch.float32).unsqueeze(0))[0, action]

            loss = nn.MSELoss()(q_value, torch.tensor(target))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state

            if done:
                print("Episode:", episode)
                print("Path:", episode_path)
                break

    # Salvando o modelo treinado
    torch.save(q_net.state_dict(), "q_network_model.pth")

### Treinando o Agente e Salvando o modelo

In [None]:
env = MazeEnvironment()
q_net = QNetwork(2, 4)
optimizer = optim.Adam(q_net.parameters(), lr=0.001)
train(env, q_net, optimizer, num_episodes=1000)

### Teste do agente

In [None]:
# Teste do agente treinado
env = MazeEnvironment()
q_net = QNetwork(2, 4)
q_net.load_state_dict(torch.load("/content/drive/MyDrive/IC- RL/q_network_model.pth"), map_location=torch.device('cpu'))

state = env.reset()
epsilon = 0.1  # Definindo o valor de epsilon
while True:
    if np.random.rand() < epsilon:
        action = np.random.randint(0, 4)  # Ação aleatória com probabilidade epsilon
    else:
        q_values = q_net(torch.tensor(state, dtype=torch.float32).unsqueeze(0))
        action = torch.argmax(q_values).item()

    next_state, reward, done = env.step(action)
    print(next_state)

    state = next_state
    if done:
        print("Episode finished.")
        break
