<a href="https://colab.research.google.com/github/ExplorerLGD/StupidAI/blob/master/MYDQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import random
import numpy as np
import math
from collections import namedtuple
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [0]:
class Net(nn.Module):
  def __init__(self,):
    super(Net,self).__init__()
    self.fc1=nn.Linear(1,50)
    self.fc1.weight.data.normal_(0,0.1)
    self.out=nn.Linear(50,1)
    self.fc1.weight.data.normal_(0,0.1)
  def forward(self,x):
    x=self.fc1(x)
    x=F.relu(x)
    x=self.out(x)
    actions_value=torch.floor(torch.tanh(x) * 10.0)
    return actions_value
  

In [0]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [0]:
BATCH_SIZE = 128
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

policy_net = Net().to(device)
target_net = Net().to(device)

memory=ReplayMemory(10000)
steps_done=0

def select_action(state):
  global steps_done
  sample=random.random()
  eps_threshold=EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
  if sample>eps_threshold:
    with torch.no_grad():
      return policy_net(state).view(1, 1)
  else:
    return torch.tensor([[random.randrange(-10,10)]], device=device, dtype=torch.float)

In [0]:
def optimize_model():
  if len(memory)<BATCH_SIZE:
    return 
  transitions=memory.sample(BATCH_SIZE)
  batch=Transition(*zip(*transitions))
  
  print(batch.next_state[0].size())
  print(batch.next_state)

  non_final_mask=torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)),device=device,dtype=torch.uint8)
  non_final_next_states=torch.cat([s for s in batch.next_state if s is not None])
  
  state_batch=torch.cat(batch.state)
  action_batch=torch.cat(batch.action)
  reward_batch=torch.cat(batch.reward)
  
  state_action_values=policy_net(state_batch).gather(1,action_batch)
  next_state_values=torch.zeros(BATCH_SIZE,device=device)
  
  next_state_values[non_final_mask]=target_net(non_final_next_states).max(1)[0].detach()
  expected_state_action_values=(next_state_values*GAMMA)+reward_batch
  
  loss=F.smooth_l1_loss(state_action_values,expected_state_action_values.unsqueeze(1))
  optimizer.zero_grad()
  loss.backward()
  for param in policy_net.parameters():
    param.grad.data.clamp_(-1,1)
  optimizer.step()
 


In [172]:
num_episodes=50
for i_episode in range(num_episodes):
  state=torch.tensor([[random.randrange(10)]], device=device, dtype=torch.float)
  for t in count():
    action=select_action(state)
    print(action)
    #_,reward,done,_=env.step(action.item())
    #write reward system,set final state is 10
    final_state=torch.tensor([[10]], device=device, dtype=torch.float)
    reward=-abs(final_state-(state+action))
    #if reward=0,that means get final state
    
    #observe new state
    if reward!=0:
      next_state=state+action
    else:
      next_state=None
      print("Game Over")
      
    memory.push(state,action,next_state,reward)
    state=next_state
    
    optimize_model()
    if reward==0:
      break
  if i_episode % TARGET_UPDATE==0:
    target_net.load_state_dict(policy_net.state_dict())


tensor([[-4.]], device='cuda:0')
tensor([[-8.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[9.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[3.]], device='cuda:0')
tensor([[-10.]], device='cuda:0')
tensor([[-6.]], device='cuda:0')
tensor([[2.]], device='cuda:0')
tensor([[-10.]], device='cuda:0')
tensor([[0.]], device='cuda:0')
tensor([[0.]], device='cuda:0')
tensor([[0.]], device='cuda:0')
tensor([[-4.]], device='cuda:0')
tensor([[-1.]], device='cuda:0')
tensor([[2.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[4.]], device='cuda:0')
tensor([[-8.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[0.]], device='cuda:0')
tensor([[-3.]], device='cuda:0')
tensor([[-7.]], device='cuda:0')
tensor([[5.]], device='cuda:0')
tensor([[3.]], device='cuda:0')
tensor([[8.]], device='cuda:0')
tensor([[2.]], device='cuda:0')
tensor([[1.]], device='cuda:0')
tensor([[7.]], device='cuda:0')
tensor([[-6.]], device='cuda

RuntimeError: ignored

In [0]:
#以下均为测试代码，羞羞勿看 O(∩_∩)O

In [0]:
class Net(nn.Module):
  def __init__(self,):
    super(Net,self).__init__()
    self.fc1=nn.Linear(1,50)
    self.fc1.weight.data.normal_(0,0.1)
    self.out=nn.Linear(50,1)
    self.fc1.weight.data.normal_(0,0.1)
  def forward(self,x):
    x=self.fc1(x)
    x=F.relu(x)
    actions_value=self.out(x)
    return torch.floor(torch.tanh(actions_value) * 10.0)
net=Net().to(device)
a=net(torch.tensor([[10],[5]], device=device, dtype=torch.float))
print(a.max(0)[0])


In [0]:
x=Transition(1,2,3,4)
y=Transition(5,6,7,8)
test=[x,y]

print(tuple(map(lambda s:s is not None,test)))


In [0]:
a=torch.tensor([[2,3]])
b=torch.tensor([[4,5]])
if a.size() !=torch.size([1,2]):
  print("false")
print(a.size())
print(c)