In [1]:
import sys
sys.path.append('../fairseq-l-system-captioning/')
import torch
from torch import nn
from hhi_pl_utils import Experiment
from typing import Union, List

from pycore.blocks import *

In [2]:
e = Experiment()
e.setup('../fairseq-l-system-captioning/configs/simplest/slim_lstm_normal.yml')
e.init_classes()
e.data.setup()

e.model.vocab = e.data.vocab

model = e.model

Global seed set to 1234
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [3]:
class Module:

    def __init__(self, module: nn.Module, parent=None) -> None:
        self.parent = parent
        self.children: List[Module] = []
        self.module = module
    
    def bfs(self):
        queue = [self]
        while len(queue) > 0:
            queue.extend(queue[0].children)
            yield queue.pop(0)
    
    def __repr__(self) -> str:
        return f'Module: {str(type(self.module))}'

def create_graph(model: nn.Module,
                 ignore: List[nn.Module]=[]
    ) -> Union[Module, None]:

    t = str(type(model))
    if ('loss' in t)\
        or ('vocab' in t)\
        or (type(model) in ignore):
        return None

    children = []

    root = Module(model)
    for child in model.children():
        if 'torch.nn.modules.container.Sequential' in str(type(child)):
            for child_seq in child.children():
                child_mod = create_graph(child_seq)
                if child_mod is not None:
                    root.children.append(child_mod)
        else:
            child_mod = create_graph(child)
            if child_mod is not None:
                root.children.append(child_mod)

    return root

graph = create_graph(model)

for c in graph.bfs():
    print(type(c.module))

<class 'lsystem_modules.slim_lstm.SlimLstm'>
<class 'seq2seq.encoder.slim.SlimCNN'>
<class 'seq2seq.decoder.lstm.LSTM'>
<class 'torch.nn.modules.conv.LazyConv2d'>
<class 'torch.nn.modules.batchnorm.LazyBatchNorm2d'>
<class 'torch.nn.modules.activation.ReLU'>
<class 'torch.nn.modules.pooling.MaxPool2d'>
<class 'torch.nn.modules.conv.LazyConv2d'>
<class 'torch.nn.modules.batchnorm.LazyBatchNorm2d'>
<class 'torch.nn.modules.activation.ReLU'>
<class 'torch.nn.modules.pooling.MaxPool2d'>
<class 'torch.nn.modules.conv.LazyConv2d'>
<class 'torch.nn.modules.batchnorm.LazyBatchNorm2d'>
<class 'torch.nn.modules.activation.ReLU'>
<class 'torch.nn.modules.pooling.MaxPool2d'>
<class 'torch.nn.modules.linear.LazyLinear'>
<class 'torch.nn.modules.sparse.Embedding'>
<class 'torch.nn.modules.rnn.LSTM'>
<class 'torch.nn.modules.dropout.Dropout'>
<class 'torch.nn.modules.linear.Linear'>


In [9]:
mapping = {
    'torch.nn.modules.conv': ConvBlock,
    'torch.nn.modules.batchnorm': NormBlock,
    'torch.nn.modules.pooling': PoolBlock,
    'torch.nn.modules.linear': LinearBlock,
    'torch.nn.modules.rnn': LSTMBlock,
    'torch.nn.modules.sparse.Embedding': EmbeddingBlock
}

it = iter(e.data.train_dataloader())
batch = next(it)

In [5]:
from typing import List, Tuple, Optional, Union
import os.path as osp
import numpy as np
from torch import Tensor, nn, Size

from pycore.blocks import Block, Begin, ConvBlock, PoolBlock, Connection, End

from torchvision.utils import save_image

class Architecure:

    @property
    def inputs(self) -> List[str]:
        return self._inputs
    
    @inputs.setter
    def inputs(self, tensors: Union[Tensor, Tuple[Tensor]]) -> None:
        if isinstance(tensors, Tensor):
            tensors = [tensors]
        
        for i, t in enumerate(tensors):
            if t.ndim > 3:
                im_path = self.image_path.replace('{i}', str(i))
                save_image(t[0], im_path)
                self._blocks.append(InputBlock(i,
                                               im_path,
                                               to=(-3, 0, 10*i),
                                               size=(self._size[1], self._size[2], 0)
                                               ))
                self._inputs.append(f'Input_{i}')
            else:
                self._blocks.append(LinearBlock(i,
                                                to=(-3, 0, 10*i),
                                                size=(self._size[0], 10, self._size[1])))
                self._inputs.append(f'Linear_{i}')

    def __init__(self,
                 start_size=(2, 64, 64),
                 pool_factor=0.8,
                 conv_factor=0.8,
                 image_path='./input_{i}.png') -> None:
        self._blocks: List[Block] = [
            Begin(),
        ]
        self._blocks_buffer: List[Tuple[nn.Module, Size]] = []
        self._size = np.array(start_size)
        self._tensor_size = None

        self.image_path = image_path
        self.pool_reduction = np.array([1, pool_factor, pool_factor])
        self.conv_expansion = np.array([conv_factor, 1, 1])

        self._inputs = []
    
    def __call__(self, module: nn.Module, x: Union[Tensor, Tuple[Tensor]]) -> None:
        print(type(module), x[0].shape)
        if len(self.inputs) == 0:
            self.inputs = x

        if self._tensor_size is None and isinstance(x, Tensor):
            self._tensor_size = x.shape
        
        same_depth = torch.allclose()
        
        if 'pooling' not in str(type(module)) and\
           'Linear' not in str(type(module)) and\
            #TODO: test if tensor size changed
            self._blocks_buffer.append((module, self._tensor_size))
        else:
            #TODO: add fused blocks correctly, calculate new sizes correctly (change in channels; and width, height by pooling)
            for mod, tensor_size in self._blocks_buffer:
                if tensor_size

    def connect(self, block1, block2) -> None:
        self._blocks.append(Connection(block1, block2))
    
    def finalize(self) -> str:
        if not isinstance(self._blocks[-1], End):
            self._blocks.append(End())
        out = ''
        for b in self._blocks:
            out += f'\n{b}'
        return out

In [6]:
arch = Architecure()

for c in graph.bfs():
    if 'torch' in str(type(c.module)):
        c.module.register_forward_pre_hook(arch)

In [7]:
model.eval()
model.validation_step(batch, 0)

{'loss': tensor(1.9590, grad_fn=<NllLossBackward0>)}

In [8]:
print(arch.finalize())



\documentclass[border=8pt, multi, tikz]{standalone}
\usepackage{import}
\subimport{c:\Users\magnusson\Documents\projects\pytorch2tikz\pycore\layers}{init}
\usetikzlibrary{positioning}
\usetikzlibrary{3d} %for including external image

\def\ConvColor{rgb:yellow,5;red,2.5;white,5}
\def\LinearColor{rgb:blue,5;red,2.5;white,5}
\def\LstmColor{rgb:yellow,5;red,2.5;white,5}
\def\ActivationColor{rgb:yellow,5;red,5;white,5}
\def\PoolColor{rgb:red,1;black,0.3}
\def\NormColor{rgb:red,1;black,0.3}
\def\UnpoolColor{rgb:blue,2;green,1;black,0.3}
\def\FcReluColor{rgb:blue,5;red,5;white,4}
\def\SoftmaxColor{rgb:magenta,5;black,7}
\def\SumColor{rgb:blue,5;green,15}

\newcommand{\copymidarrow}{\tikz \draw[-Stealth,line width=0.8mm,draw={rgb:blue,4;red,1;green,1;black,3}] (-0.3,0) -- ++(0.3,0);}

\begin{document}
\begin{tikzpicture}
\tikzstyle{connection}=[ultra thick,every node/.style={sloped,allow upside down},draw=\edgecolor,opacity=0.7]
\tikzstyle{copyconnection}=[ultra thick,every node/.style={slo