In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import json
from sklearn.model_selection import train_test_split
import torch.nn.utils as torch_utils
from torch.nn.functional import one_hot
import matplotlib.pyplot as plt
import random

In [2]:
class Net(nn.Module):
    """Small MLP that maps an input vector to a probability distribution.

    Architecture: Linear -> LayerNorm -> ReLU, twice, followed by a Linear
    head whose output is passed through softmax.

    Args:
        input_size: Width of the input feature vector.
        hidden_size: Width of both hidden layers.
        output_size: Number of probabilities produced per input.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept stable.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.ln1 = nn.LayerNorm(hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.ln2 = nn.LayerNorm(hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)

        # Kaiming-uniform init for the two hidden linear layers only; the
        # output layer fc3 keeps PyTorch's default initialisation.
        for hidden_linear in (self.fc1, self.fc2):
            nn.init.kaiming_uniform_(hidden_linear.weight, nonlinearity='relu')

    def forward(self, x):
        """Return softmax probabilities over the last dimension of the logits.

        The result is passed through ``squeeze()``, so every size-1 dimension
        (including a batch dimension of one) is dropped.
        """
        hidden = x
        for linear, norm in ((self.fc1, self.ln1), (self.fc2, self.ln2)):
            hidden = self.relu(norm(linear(hidden)))
        logits = self.fc3(hidden)
        return nn.functional.softmax(logits, dim=-1).squeeze()

class Memory:
    """Bounded FIFO replay buffer of ``(command, action, reward)`` tuples.

    Args:
        capacity: Maximum number of entries kept. When the buffer is full the
            oldest entry is evicted first. A capacity of zero or less means
            nothing is ever stored.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, command, action, reward):
        """Store one experience, evicting the oldest entry if the buffer is full."""
        if self.capacity <= 0:
            # Without this guard, pop(0) below would raise IndexError on the
            # empty list for a zero/negative capacity.
            return
        if len(self.memory) >= self.capacity:
            self.memory.pop(0)
        self.memory.append((command, action, reward))

    def sample(self, batch_size):
        """Return up to ``batch_size`` distinct stored entries in random order.

        Returns everything currently stored (shuffled) when fewer than
        ``batch_size`` entries exist, and an empty list when the buffer is
        empty.
        """
        # The alternative "half of the buffer" policy that used to follow this
        # return was unreachable and has been removed.
        return random.sample(self.memory, min(len(self.memory), batch_size))

In [13]:
def cmd_to_tensor(cmd, num_classes=4):
    """One-hot encode a command index as a float tensor.

    Args:
        cmd: Integer command index (or a nested sequence of indices) in
            ``[0, num_classes)``.
        num_classes: Length of the one-hot vector. Defaults to 4, the value
            that was previously hard-coded.

    Returns:
        A float tensor with one trailing dimension of size ``num_classes``
        appended to the shape of ``cmd``.
    """
    return one_hot(torch.tensor(cmd), num_classes).float()

def get_move(command):
    """Sample a move for ``command`` from the global model's output distribution.

    Also prints the model's current distribution for each of the commands
    0-3 as a debugging aid. Runs entirely without gradient tracking.

    Returns:
        The sampled move index as a Python int.
    """
    with torch.no_grad():
        probs = model(cmd_to_tensor(command))
        # Debug dump of the whole policy; the literal below is emitted verbatim.
        print(f"""
        0: {model(cmd_to_tensor(0))}
        1: {model(cmd_to_tensor(1))}
        2: {model(cmd_to_tensor(2))}
        3: {model(cmd_to_tensor(3))}
        """)
        sampled = torch.multinomial(probs, 1)
        return sampled.item()
    
def _get_reward_and_lr(move, model_output, good):
    p = model_output[move]
    
    if p > 0.9 and good:
        return None, None
    # print(f"""
    # {p}
    # {move}
    # {good}
    # """)
    min_c = 1e-6
    max_c = 0.05
    m = max_c-1*min_c
    q = min_c
    if good:
        m = -1*m
        q = max_c
    lr = m*p + q
    
    min_c = 1
    max_c = 10
    if not good:
        min_c = -10
        max_c = -1
    m = max_c-1*min_c
    q = min_c
    if good:
        m = -1*m
        q = max_c
    reward = m*p + q
    
    # print(reward.item(), lr.item())
    return reward.item(), lr.item()
    return reward.item(), 0.05
    
def _base_train(command, move, good, is_from_memory):
    """Run one policy-gradient style update on the global model.

    Args:
        command: Command index that was shown to the model.
        move: Move index that was taken in response.
        good: Whether that move was correct.
        is_from_memory: True when the sample is being replayed from the buffer.

    The update is skipped when a replayed sample's move already has
    probability >= 0.6, or when ``_get_reward_and_lr`` returns no reward.
    """
    probs = model(cmd_to_tensor(command))
    move_idx = torch.tensor(move).long()
    chosen_prob = probs[move_idx]

    if is_from_memory and chosen_prob >= 0.6:
        return

    reward, lr = _get_reward_and_lr(move, probs, good)
    if reward is None:
        return

    # Reward-weighted negative log-likelihood of the chosen move.
    reward_weight = torch.tensor(reward).float()
    loss = -torch.log(chosen_prob) * reward_weight

    # The learning rate is chosen per sample, so set it on every param group
    # before stepping.
    for group in optimizer.param_groups:
        group['lr'] = lr

    optimizer.zero_grad()
    loss.backward()
    torch_utils.clip_grad_norm_(model.parameters(), max_norm=0.8)
    optimizer.step()

def replay_memory():
    """Re-train on a random sample of at most 100 stored experiences."""
    replayed = memory.sample(100)
    for experience in replayed:
        # The third stored field is the good/bad flag that train() pushed.
        command, move, good = experience
        _base_train(command, move, good, is_from_memory=True)
        
def train(command, move, good):
    """Learn from one fresh experience, replay old ones, then remember it.

    Args:
        command: Command index that was shown to the model.
        move: Move index the model produced.
        good: Whether the move was correct.
    """
    _base_train(command, move, good, is_from_memory=False)
    replay_memory()
    # Pushed after the replay, so the fresh sample is not replayed in this call.
    memory.push(command, move, good)

# Global training state used by get_move, _base_train, replay_memory and train.
device = torch.device("cpu")
# 4 one-hot command inputs -> 4 hidden units -> 4 move probabilities.
model = Net(input_size=4, hidden_size=4, output_size=4).to(device)
memory = Memory(capacity=100)
# The lr given here is only a starting value: _base_train overwrites the lr of
# every param group before each optimizer step.
optimizer = optim.Adam(model.parameters(), lr=0.1)
# Alternative optimizer, currently disabled:
# optimizer = optim.RMSprop(model.parameters(), lr=0.001, alpha=0.9)

In [38]:
# Present command 1 twice; a move counts as good only when it equals the command.
train_data = [1] * 2

for c in train_data:
    m = get_move(c)
    print(f"{c} -> {m}")
    reward = m == c
    if reward:
        print("goood")
    train(c, m, reward)


        0: tensor([6.4740e-01, 1.0272e-06, 3.5205e-01, 5.4929e-04])
        1: tensor([0.0139, 0.9668, 0.0055, 0.0138])
        2: tensor([1.6709e-02, 5.9168e-05, 9.0881e-01, 7.4421e-02])
        3: tensor([5.6614e-06, 6.5245e-03, 1.2714e-02, 9.8076e-01])
        
1 -> 1
goood

        0: tensor([6.2902e-01, 1.0503e-06, 3.7037e-01, 6.0815e-04])
        1: tensor([0.0119, 0.9693, 0.0052, 0.0136])
        2: tensor([1.1026e-02, 7.8536e-05, 8.7985e-01, 1.0905e-01])
        3: tensor([3.9183e-06, 6.4443e-03, 1.0002e-02, 9.8355e-01])
        
1 -> 1
goood


In [515]:
# Dump the model's output distribution for each of the commands 0-3.
# This runs outside torch.no_grad(), so the printed tensors carry a grad_fn.
print(f"""
0: {model(cmd_to_tensor(0))}
1: {model(cmd_to_tensor(1))}
2: {model(cmd_to_tensor(2))}
3: {model(cmd_to_tensor(3))}
""")


0: tensor([2.9766e-04, 7.0282e-04, 9.9840e-01, 6.0219e-04],
       grad_fn=<SqueezeBackward0>)
1: tensor([0.2839, 0.2210, 0.2887, 0.2063], grad_fn=<SqueezeBackward0>)
2: tensor([0.2839, 0.2210, 0.2887, 0.2063], grad_fn=<SqueezeBackward0>)
3: tensor([0.2839, 0.2210, 0.2887, 0.2063], grad_fn=<SqueezeBackward0>)

