<a href="https://colab.research.google.com/github/HemantharajM/AI-Material/blob/master/Multi_agent_planning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import sys
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import autograd
from torch.utils.data import DataLoader
from torch.nn.parameter import Parameter
import torch.nn.functional as F

In [3]:
from google.colab import files
uploaded = files.upload()

Saving test_dataset.npz to test_dataset.npz


In [0]:
train = []
with np.load('train_dataset.npz',mmap_mode='r',allow_pickle=True) as f:
    train = f['arr_0']

In [0]:
test = []
with np.load('test_dataset.npz',mmap_mode='r',allow_pickle=True) as f:
    test = f['arr_0']

In [0]:
train_data = []
for i in train:
    env = (i[0] == 1.0) + 0.0
    goal = (i[0] == 0.5) + 0.0
    agent = i[1]
    domain = np.array([env,goal,agent])
    train_data.append([torch.tensor(domain),torch.tensor(i[2]),torch.tensor(i[3])])

In [0]:
test_data = []
for i in test:
    env = (i[0] == 1.0) + 0.0
    goal = (i[0] == 0.5) + 0.0
    agent = i[1]
    domain = np.array([env,goal,agent])
    test_data.append([torch.tensor(domain),torch.tensor(i[2]),torch.tensor(i[3])])

In [0]:
train_dl = DataLoader(train_data,batch_size=128,shuffle=True)
test_dl = DataLoader(test_data,batch_size=128,shuffle=False)

MODEL

In [0]:
class MA_VIN(torch.nn.Module):
    def __init__(self, k):
        super(MA_VIN, self).__init__()
        self.k = k
        self.h = torch.nn.Conv2d(
            in_channels=3,
            out_channels=150,
            kernel_size=(3, 3),
            stride=1,
            padding=1,
            bias=True)
        self.r = torch.nn.Conv2d(
            in_channels=150,
            out_channels=1,
            kernel_size=(1, 1),
            stride=1,
            padding=0,
            bias=False)
        self.q = torch.nn.Conv2d(
            in_channels=1,
            out_channels=20,
            kernel_size=(3, 3),
            stride=1,
            padding=1,
            bias=False)
        self.fc = torch.nn.Linear(in_features=20, out_features=5, bias=False)
        self.w = Parameter(torch.zeros(20, 1, 3, 3), requires_grad=True)
        self.sm = torch.nn.Softmax(dim=1)

    def forward(self, X, S1, S2):
        h = self.h(X)
        r = self.r(h)
        q = self.q(r)
        v, _ = torch.max(q, dim=1, keepdim=True)
        for i in range(0, self.k - 1):
            q = F.conv2d(
                torch.cat([r, v], 1),
                torch.cat([self.q.weight, self.w], 1),
                stride=1,
                padding=1)
            v, _ = torch.max(q, dim=1, keepdim=True)

        q = F.conv2d(
            torch.cat([r, v], 1),
            torch.cat([self.q.weight, self.w], 1),
            stride=1,
            padding=1)

        slice_s1 = S1.long().expand(8, 1, 20, q.size(0))
        slice_s1 = slice_s1.permute(3, 2, 1, 0)
        q_out = q.gather(2, slice_s1).squeeze(2)

        slice_s2 = S2.long().expand(1, 20, q.size(0))
        slice_s2 = slice_s2.permute(2, 1, 0)
        q_out = q_out.gather(2, slice_s2).squeeze(2)

        logits = self.fc(q_out)
        return logits, self.sm(logits)

In [0]:
model = MA_VIN(15)

In [55]:
model.cuda()

MA_VIN(
  (h): Conv2d(3, 150, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (r): Conv2d(150, 1, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (q): Conv2d(1, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (fc): Linear(in_features=20, out_features=5, bias=False)
  (sm): Softmax()
)

Loss

In [0]:
Criterion = torch.nn.CrossEntropyLoss()

Optimizer

In [0]:
optimizer = torch.optim.RMSprop(model.parameters(),lr=0.0001)

Training

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "CPU")

In [86]:
N_Epoch = 10

for i in range(N_Epoch):
    running_loss = 0.0
    correct = 0
    
    model.train()
    
    train_iter = iter(train_dl)
    
    for j in range(294):
    
        X,S,label = next(train_iter)
        
        X = X.float()
        X = X.to(device)
        
        S = S.to(device)
        
        label = label.to(device)
        
        optimizer.zero_grad()
        
        output, prediction = model(X.float(),S[:,0],S[:,1])
        
        loss = Criterion(output,label)
        
        loss.backward()
        
        optimizer.step()
        
        running_loss += loss.item()
        
        _,index = torch.max(output,dim=1)
        
        correct += torch.eq(index, label).sum().item()
    
    print(f"Epoch:{i}")
    print(f'Average_loss :{running_loss / 294}')
    print(f"Accuracy : {correct / len(train_data)}")
    

Epoch:0
Average_loss :0.5514496544591424
Accuracy : 0.7568638927317229
Epoch:1
Average_loss :0.55153838317005
Accuracy : 0.7568904969671172
Epoch:2
Average_loss :0.5513623802840304
Accuracy : 0.7561721826114718
Epoch:3
Average_loss :0.5513094618004195
Accuracy : 0.7568106842609343
Epoch:4
Average_loss :0.5514152014539355
Accuracy : 0.7568904969671172
Epoch:5
Average_loss :0.551671024487943
Accuracy : 0.756198786846866
Epoch:6
Average_loss :0.5515391626933805
Accuracy : 0.7576088113227626
Epoch:7
Average_loss :0.5516318368668459
Accuracy : 0.7568904969671172
Epoch:8
Average_loss :0.5516545055877595
Accuracy : 0.7565978503777802
Epoch:9
Average_loss :0.5514865734544742
Accuracy : 0.7562519953176545


In [92]:

running_loss = 0.0
correct = 0
    
model.eval()
    
for j in test_dl:
    
  X,S,label = j[0],j[1],j[2]
        
  X = X.float()
  X = X.to(device)
        
  S = S.to(device)
        
  label = label.to(device)
        
  output, prediction = model(X.float(),S[:,0],S[:,1])
        
  _,index = torch.max(output,dim=1)
        
  correct += torch.eq(index, label).sum().item()
    
print(f"Test_data")
print(f"Accuracy : {correct / len(test_data)}")
print(correct,len(test_data))
    

Test_data
Accuracy : 0.7287309325365624
13902 19077


In [0]:
mod = MA_VIN(15)

In [97]:
mod.cuda()

MA_VIN(
  (h): Conv2d(3, 150, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (r): Conv2d(150, 1, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (q): Conv2d(1, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (fc): Linear(in_features=20, out_features=5, bias=False)
  (sm): Softmax()
)

In [98]:

running_loss = 0.0
correct = 0
    
model.eval()
    
for j in test_dl:
    
  X,S,label = j[0],j[1],j[2]
        
  X = X.float()
  X = X.to(device)
        
  S = S.to(device)
        
  label = label.to(device)
        
  output, prediction = mod(X.float(),S[:,0],S[:,1])
        
  _,index = torch.max(output,dim=1)
        
  correct += torch.eq(index, label).sum().item()
    
print(f"Test_data")
print(f"Accuracy : {correct / len(test_data)}")
print(correct,len(test_data))
    

Test_data
Accuracy : 0.261990879069036
4998 19077


In [0]:
from  google.colab import files
files.download('example.txt')