In [1]:
from pypetri.elements import *
from pypetri.petri_net import *
from pypetri.example_nets import *
import math

import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.distributions import Categorical
import matplotlib.pyplot as plt
import collections
import random

from tqdm import tqdm


In [2]:
efm_net = EmptyNet('efm_net')
initial_file_path = 'initial_file/neural_petri_net.csv'
efm_net.init_by_csv(initial_file_path)

In [3]:
class ReplayBuffer():
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
        
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done
    
    def size(self):
        return len(self.buffer)
        

In [4]:
class N2N(nn.Module):
    def __init__(self, dim_in, dim_out):
        super(N2N, self).__init__()
        self.weight = nn.Parameter(torch.Tensor(dim_out, dim_in).float())
        self.bias = nn.Parameter(torch.Tensor(dim_out).float())
        self.reset_parameters()
    
    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        if self.bias is not None:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weight)
            bound = 1 / math.sqrt(fan_in)
            nn.init.uniform_(self.bias, -bound, bound)
    
    def forward(self, x):
        x = torch.matmul(x, self.weight.t()) + self.bias.unsqueeze(0)
        x = F.leaky_relu(x)
        return x
    
class T2P(nn.Module):
    def __init__(self, dim_in, dim_out, adj_pt):
        super(T2P, self).__init__()
        self.adj_pt = torch.from_numpy(adj_pt).float()
        self.weight = nn.Parameter(torch.Tensor(dim_out, dim_in).float())
        self.bias = nn.Parameter(torch.Tensor(dim_out).float())
        self.reset_parameters()
    
    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        if self.bias is not None:
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weight)
            bound = 1 / math.sqrt(fan_in)
            nn.init.uniform_(self.bias, -bound, bound)
            
    def forward(self, x):
        x = torch.matmul(self.adj_pt, x)
        x = torch.matmul(x, self.weight.t()) + self.bias.unsqueeze(0)
        x = F.leaky_relu(x)
        x = torch.matmul(x, self.adj_pt)
        return x
        

In [5]:
P2P_layer = N2N(3, 5)
T2T_layer = N2N(3, 4)

input = efm_net.get_state()[0]
input = torch.from_numpy(input).float()
output = P2P_layer(input)
print(output)

input = efm_net.get_state()[1]
input = torch.from_numpy(input).float()
output = T2T_layer(input)
print(output)


tensor([[ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 0.0396,  0.6403,  0.4018,  0.0186,  0.9943],
        [ 

In [6]:
class GPNQNet(torch.nn.Module):
    def __init__(self):
        super(GPNQNet, self).__init__()
        