Skip to content

Commit

Permalink
feat: Receptive Field sizes now are also allowed to be tuples
Browse files Browse the repository at this point in the history
  • Loading branch information
MLRichter committed Jan 12, 2022
1 parent 1e00adc commit 809b9bd
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 44 deletions.
6 changes: 4 additions & 2 deletions README.md
Expand Up @@ -51,7 +51,9 @@ ever training the model.
You can do this simply by importing your architecture into the format of RFA-Toolbox and then use the in-build functions
to visualize your architecture using GraphViz.
The visualization will automatically mark layers predicted to be unproductive red and critical layers, that are potentially unproductive orange.
This is especially useful if you plan to train your model on resolutions that are substantially lower than the
In edge case scenarios, where the receptive field expands of the boundaries of the image on some but not all tensor-axis, the layer will be marked yellow,
since such a layer is probably not operating and maximum efficiency.
Being able to detect these types of inefficiencies is especially useful if you plan to train your model on resolutions that are substantially lower than the
design-resolution of most models.
As an alternative, you can also use the graph from RFA-Toolbox to hook RFA-toolbox more directly into your program.

Expand Down Expand Up @@ -142,7 +144,7 @@ out = EnrichedNetworkNode(
)
visualize_architecture(
out, f"example_model", input_res=32
).render(f"example_model")
).view()

```

Expand Down
6 changes: 2 additions & 4 deletions rfa_toolbox/encodings/pytorch/ingest_architecture.py
Expand Up @@ -292,8 +292,6 @@ def create_graph_from_model(


if __name__ == "__main__":
model = torchvision.models.alexnet()
model = torchvision.models.inception_v3()
graph = create_graph_from_model(model)
visualize_architecture(graph, "alexnet_32_pixel", input_res=32).render(
"alexnet_32_pixel"
)
visualize_architecture(graph, "inceptionv3", input_res=32).view()
42 changes: 26 additions & 16 deletions rfa_toolbox/encodings/pytorch/layer_handlers.py
@@ -1,3 +1,6 @@
from collections import Sequence

import numpy as np
import torch
from attr import attrs

Expand Down Expand Up @@ -50,17 +53,24 @@ def __call__(
conv_layer = obtain_module_with_resolvable_string(resolvable_string, model)
kernel_size = (
conv_layer.kernel_size
if isinstance(conv_layer.kernel_size, int)
else conv_layer.kernel_size[0]
# if isinstance(conv_layer.kernel_size, int)
# else conv_layer.kernel_size[0]
)
stride_size = (
conv_layer.stride
if isinstance(conv_layer.stride, int)
else conv_layer.stride[0]
# if isinstance(conv_layer.stride, int)
# else conv_layer.stride[0]
)
filters = conv_layer.out_channels
if not isinstance(kernel_size, Sequence) and not isinstance(
kernel_size, np.ndarray
):
kernel_size_name = f"{kernel_size}x{kernel_size}"
else:
kernel_size_name = "x".join([str(k) for k in kernel_size])
final_name = f"{name} {kernel_size_name} / {stride_size}"
return LayerDefinition(
name=f"{name} {kernel_size}x{kernel_size}",
name=final_name, # f"{name} {kernel_size}x{kernel_size}",
kernel_size=kernel_size,
stride_size=stride_size,
filters=filters,
Expand Down Expand Up @@ -90,18 +100,18 @@ def __call__(
self, model: torch.nn.Module, resolvable_string: str, name: str
) -> LayerDefinition:
conv_layer = obtain_module_with_resolvable_string(resolvable_string, model)
kernel_size = (
conv_layer.kernel_size
if isinstance(conv_layer.kernel_size, int)
else conv_layer.kernel_size[0]
)
stride_size = (
conv_layer.stride
if isinstance(conv_layer.stride, int)
else conv_layer.stride[0]
)
kernel_size = conv_layer.kernel_size

stride_size = conv_layer.stride
if not isinstance(kernel_size, Sequence) and not isinstance(
kernel_size, np.ndarray
):
kernel_size_name = f"{kernel_size}x{kernel_size}"
else:
kernel_size_name = "x".join([str(k) for k in kernel_size])
final_name = f"{name} {kernel_size_name} / {stride_size}"
return LayerDefinition(
name=f"{name} {kernel_size}x{kernel_size}",
name=final_name, # f"{name} {kernel_size}x{kernel_size}",
kernel_size=kernel_size,
stride_size=stride_size,
)
Expand Down
182 changes: 161 additions & 21 deletions rfa_toolbox/graphs.py
@@ -1,5 +1,5 @@
from operator import attrgetter
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union

import numpy as np
from attr import attrib, attrs
Expand Down Expand Up @@ -41,11 +41,15 @@ def receptive_field_provider(node: "EnrichedNetworkNode") -> Optional[int]:
return node.receptive_field_min


# FIXME: Make this function work for scenarios,
# where the infos contain tuples of receptive field sizes describing an area
def naive_minmax_filter(
info: Tuple["ReceptiveFieldInfo"],
) -> Tuple["ReceptiveFieldInfo", "ReceptiveFieldInfo"]:
"""Filters all receptive field infos, except for the one
with the mininum and maximum receptive field size.
Currently only works if all receptive field sizes, kernel and
stride sizes are scalar
Args:
info: Tuple of receptive field info containers to filters
Expand All @@ -63,6 +67,12 @@ def naive_minmax_filter(
return minimum_receptive_field, maximum_receptive_field


def noop_filter(
info: Tuple["ReceptiveFieldInfo"],
) -> Tuple["ReceptiveFieldInfo"]:
return info


@attrs(auto_attribs=True, frozen=True, slots=True)
class ReceptiveFieldInfo:
"""The container holding information for the successive receptive
Expand All @@ -74,8 +84,16 @@ class ReceptiveFieldInfo:
increased by stride sizes > 1
"""

receptive_field: int
multiplicator: int
receptive_field: Union[int, Sequence[int]] = attrib(
converter=lambda x: tuple(x)
if isinstance(x, Sequence) or isinstance(x, np.ndarray)
else x
)
multiplicator: Union[int, Sequence[int]] = attrib(
converter=lambda x: tuple(x)
if isinstance(x, Sequence) or isinstance(x, np.ndarray)
else x
)


@attrs(auto_attribs=True, frozen=True, slots=True)
Expand All @@ -93,31 +111,60 @@ class LayerDefinition(Layer):
"""

name: str
kernel_size: Optional[int] = attrib(
kernel_size: Optional[Union[int, Sequence[int]]] = attrib(
converter=lambda x: np.inf if x is None else x, default=None
)
stride_size: Optional[int] = attrib(
stride_size: Optional[Union[int, Sequence[int]]] = attrib(
converter=lambda x: 1 if x is None else x, default=None
)
filters: Optional[int] = None
units: Optional[int] = None

@kernel_size.validator
def validate_kernel_size(self, attribute: str, value: int) -> None:
if value is not None and value < 1:
def validate_kernel_size(
self, attribute: str, value: Union[int, Sequence[int]]
) -> None:
if isinstance(value, Sequence):
for v in value:
self.validate_kernel_size(attribute, v)
elif value is not None and value < 1:
raise ValueError(
f"{attribute} must be greater than 0 or "
f"{attribute} values must be greater than 0 or "
f"infinite (which indicates a dense layer)"
)

@stride_size.validator
def validate_stride_size(self, attribute: str, value: int) -> None:
if value is not None and value < 1:
def validate_stride_size(
self, attribute: str, value: Union[int, Sequence[int]]
) -> None:
if isinstance(value, Sequence):
for v in value:
self.validate_stride_size(attribute, v)
elif value is not None and value < 1:
raise ValueError(
f"{attribute} must be greater than 0 "
f"(choose 1, if this is a dense layer)"
f"{attribute} values must be greater than 0 or "
f"infinite (which indicates a dense layer)"
)

def _check_consistency_for_kernel_and_stride_sequences(self) -> None:
if isinstance(self.kernel_size, Sequence) and isinstance(
self.stride_size, Sequence
):
if len(self.kernel_size) != len(self.stride_size):
raise ValueError(
"kernel_size and stride_size must have the same length"
)
for i in range(len(self.kernel_size)):
if len(self.kernel_size) != len(self.stride_size):
raise ValueError(
"When kernel_size and stride_size are both sequences, "
"they must have the same length, kernel_size: "
f"{self.kernel_size}, stride_size: {self.stride_size}"
)

def __attrs_post_init__(self):
self._check_consistency_for_kernel_and_stride_sequences()

@classmethod
def from_dict(cls, config) -> "LayerDefinition":
"""Create a LayerDefinition from the dictionary.
Expand Down Expand Up @@ -163,10 +210,12 @@ def compute_receptive_field_sizes(
"""
result: List[ReceptiveFieldInfo] = list()
for rf_info in receptive_field_info:
receptive_field = rf_info.receptive_field + (
(layer_info.kernel_size - 1) * rf_info.multiplicator
receptive_field = np.asarray(rf_info.receptive_field) + (
(np.asarray(layer_info.kernel_size) - 1) * np.asarray(rf_info.multiplicator)
)
multiplicator = np.asarray(layer_info.stride_size) * np.asarray(
rf_info.multiplicator
)
multiplicator = layer_info.stride_size * rf_info.multiplicator
new_info = ReceptiveFieldInfo(
receptive_field=receptive_field, multiplicator=multiplicator
)
Expand Down Expand Up @@ -219,18 +268,104 @@ class EnrichedNetworkNode(Node):

receptive_field_info_filter: Callable[
[Tuple[ReceptiveFieldInfo]], Tuple[ReceptiveFieldInfo]
] = naive_minmax_filter
] = noop_filter
all_layers: List["EnrichedNetworkNode"] = attrib(init=False)

@property
def receptive_field_sizes(self) -> List[int]:
return [elem.receptive_field for elem in self.receptive_field_info]

def _group_by_dim(
self, rf_sizes: List[Union[Sequence[int], int]]
) -> Dict[Union[int, str], List[int]]:
"""Find the minimum receptive field size.
Args:
rf_sizes: A list of receptive field sizes.
Returns:
The minimum size.
"""
if all(
[
isinstance(elem, int) and not isinstance(elem, Sequence)
for elem in rf_sizes
]
):
return {"all": rf_sizes}
else:
result: Dict[Union[int, str], List[int]] = {"all": []}
for rf_size in rf_sizes:
if isinstance(rf_size, Sequence):
for i, size in enumerate(rf_size):
if i not in result:
result[i] = []
result[i].append(size)
else:
result["all"].append(rf_size)
return result

@staticmethod
def _apply_function_on_receptive_field_groups(
groups: Dict[Union[int, str], List[int]], func: Callable[[List[int]], int]
) -> Union[Sequence[int], int]:
"""Apply a function on a list of receptive field sizes.
Args:
groups: A dictionary of receptive field sizes.
func: The function to apply.
Returns:
The result of the function.
"""
if "all" in groups:
scalars: List[int] = groups.pop("all")
if len(groups) == 0:
return func(scalars)
else:
raise ValueError(
"'all'-key not in sequence for receptive field computation"
)

result: List[int] = []
max_dim: int = max(groups.keys())
for i in range(max_dim + 1):
if i not in groups:
raise ValueError(f"Missing dimension {i}")
dim: List[int] = groups[i] + scalars
result.append(func(dim))
return tuple(result)

def _apply_function_on_receptive_field_sizes(
self, func: Callable[[List[int]], int]
) -> Union[Sequence[int], int]:
"""Apply a function on the receptive field sizes.
Args:
func: The function to apply.
Returns:
The result of the function.
"""
return self._apply_function_on_receptive_field_groups(
self._group_by_dim(self.receptive_field_sizes), func
)

def _receptive_field_min(self):
return min(self.receptive_field_sizes, default=0)
return self._apply_function_on_receptive_field_sizes(
lambda x: min(x, default=0)
)
# return min(self.receptive_field_sizes, default=0)

def _receptive_field_max(self):
return max(self.receptive_field_sizes, default=0)
return self._apply_function_on_receptive_field_sizes(
lambda x: max(x, default=0)
)

# return max(self.receptive_field_sizes, default=0)

@property
def kernel_size(self):
Expand Down Expand Up @@ -277,7 +412,7 @@ def __attrs_post_init__(self):

def is_border(
self,
input_resolution: int,
input_resolution: Union[int, Sequence[int]],
receptive_field_provider: Callable[
["EnrichedNetworkNode"], Union[float, int]
] = receptive_field_provider,
Expand Down Expand Up @@ -307,11 +442,16 @@ def is_border(
# all inputs with a receptive field size
# SMALLER than the input resolution
direct_predecessors = [
input_resolution <= receptive_field_provider(pred)
np.all(
np.asarray(input_resolution)
<= np.asarray(receptive_field_provider(pred))
)
for pred in self.predecessors
]
# of course, this means that this layer also needs to fullfill this property
own = input_resolution <= self.receptive_field_min
own = np.all(
np.asarray(input_resolution) <= np.asarray(self.receptive_field_min)
)
# additionally (only relevant for multipath architectures)
# all following layer are border layers as well
# successors = [
Expand Down
11 changes: 10 additions & 1 deletion rfa_toolbox/vizualize.py
@@ -1,4 +1,5 @@
import graphviz
import numpy as np

from rfa_toolbox.graphs import EnrichedNetworkNode

Expand Down Expand Up @@ -42,8 +43,16 @@ def visualize_node(
color = "white"
if node.is_border(input_resolution=input_res):
color = "red"
elif node.receptive_field_min > input_res and color_critical:
elif (
np.all(np.asarray(node.receptive_field_min) > np.asarray(input_res))
and color_critical
):
color = "orange"
elif (
np.any(np.asarray(node.receptive_field_min) > np.asarray(input_res))
and color_critical
):
color = "yellow"
l_name = node.layer_info.name
rf_info = "\\n" + f"r={node.receptive_field_min}"
filters = f"\\n{node.layer_info.filters} filters"
Expand Down

0 comments on commit 809b9bd

Please sign in to comment.