In [1]:
import math

from keras.applications import *
import networkx as nx
from tensorflow import keras
import scipy
import tensorflow as tf
import numpy as np
from typing import Tuple, List, Dict
import random

In [2]:
class Partitioner:
    """Analyses the layer graph of a Keras model to find candidate partition points.

    The layer graph is walked through each layer's ``inbound_nodes`` /
    ``outbound_nodes``. Besides locating partition points, the class estimates
    the memory used between consecutive points and the size of the data that
    has to be transferred at each point.
    """

    def __init__(self, model: "keras.Model"):
        self.model = model
        # Topological order produced by topological_sort_util, drained by longest_path
        self.Stack = []
        # Layer name -> True once the topological sort has reached the layer
        self.visited = {}
        # The "depth"/level that a certain layer is at
        self.layer_level = {}
        # The layers at a certain depth/level, where the index of the array is the level
        self.levels = []

    def get_previous(self, layer_name):
        """Return the names of the layers that feed into ``layer_name``.

        Only the first inbound node of the layer is inspected.
        """
        inbound = self.model.get_layer(layer_name).inbound_nodes[0].inbound_layers
        # A single inbound layer is returned bare rather than wrapped in a list
        if not isinstance(inbound, list):
            inbound = [inbound]
        return [layer.name for layer in inbound]

    def get_next(self, layer_name):
        """Return the names of the layers that consume the output of ``layer_name``."""
        outbound = self.model.get_layer(layer_name).outbound_nodes
        return [node.outbound_layer.name for node in outbound]

    # Traverses the model starting from layer_name all the way to start
    def traverse(self, layer_name, start, part_name, inpt):
        """Re-apply the layers between ``start`` and ``layer_name`` on top of ``inpt``.

        Walks backwards from ``layer_name`` through ``get_previous`` until
        ``start`` (or ``part_name``) is reached, then calls every layer on the
        way back out. Returns whatever the call of ``layer_name`` returns.

        NOTE(review): results are not cached, so a layer reachable through
        several branches is called once per branch - confirm this is intended.
        """
        # On subsequent recursive steps, the new input layer will be defined,
        # so that name needs to be checked in base case
        if (layer_name == start) or (layer_name == part_name):
            return inpt

        output = []
        for n in self.get_previous(layer_name):
            output.append(self.traverse(n, start, part_name, inpt))

        # If the DAG node only has 1 previous connection
        if len(output) == 1:
            output = output[0]

        layer = self.model.get_layer(layer_name)
        to_next = layer(output)
        return to_next

    def construct_model(self, start, end, part_name="part_begin"):
        """Build a ``keras.Model`` covering the layers from ``start`` to ``end``.

        The output tensor of ``start`` is used as the input of the new model.
        """
        inpt = keras.Input(tensor=self.model.get_layer(start).output, name=part_name)
        output = self.traverse(end, start, part_name, inpt)
        part = keras.Model(inputs=self.model.get_layer(start).output, outputs=output)
        return part

    # TODO: depends on partition_model, which is not written yet
    def create_model_partitions(self, node_capacities: List[int], communication_graph: "nx.Graph"):
        """Construct one sub-model per entry returned by ``partition_model``.

        ``partition_model`` is expected to return a mapping whose values are
        ``(start_layer, end_layer)`` pairs; the result maps the same keys to
        the constructed models.

        NOTE(review): ``partition_model`` currently returns ``None``, so this
        method cannot succeed until it is implemented.
        """
        node_partition_names = self.partition_model(node_capacities, communication_graph)
        model_partitions = {}
        for k in node_partition_names:
            start_layer, end_layer = node_partition_names[k]
            model = self.construct_model(start_layer, end_layer)
            model_partitions[k] = model
            print("Model constructed")

        return model_partitions

    # Depth-first helper used by longest_path. See below
    # link for details
    # https://www.geeksforgeeks.org/topological-sorting/
    def topological_sort_util(self, v: str):
        """Push ``v`` and every unvisited layer reachable from it onto ``self.Stack``.

        Layers are pushed in depth-first post-order, i.e. a layer is pushed only
        after all of its successors. The walk uses an explicit stack instead of
        recursion so that long chains of layers cannot exceed Python's
        recursion limit.
        """
        self.visited[v] = True
        # Each entry pairs a layer with an iterator over its not-yet-examined successors
        pending = [(v, iter(self.get_next(v)))]
        while pending:
            node, successors = pending[-1]
            for succ in successors:
                if not self.visited[succ]:
                    self.visited[succ] = True
                    pending.append((succ, iter(self.get_next(succ))))
                    break
            else:
                # All successors handled: push current vertex to the stack
                # which stores the topological sort
                pending.pop()
                self.Stack.append(node)

    # The function to find longest distances from a given vertex.
    # It uses topological_sort_util() to get a topological sorting.
    def longest_path(self, s: str) -> List[List[str]]:
        """Group layer names by their longest distance (in edges) from layer ``s``.

        Updates ``self.layer_level`` and returns a list in which index ``k``
        holds the names of the layers at level ``k``. The list has one slot per
        entry of ``self.layer_level``; slots for unused levels are empty lists.
        Layers that cannot be reached from ``s`` keep level -1 and are left out.
        """
        # Start from a clean stack in case a previous run was interrupted
        self.Stack = []
        for l in self.model.layers:
            self.visited[l.name] = False
            self.layer_level[l.name] = -1  # Equal to -infty

        # Call the helper function to store the topological
        # sort starting from all vertices one by one
        for l in self.model.layers:
            if not self.visited[l.name]:
                self.topological_sort_util(l.name)

        # Distance to the source is 0; every other layer starts at -1
        self.layer_level[s] = 0

        # Process vertices in topological order
        while self.Stack:
            # Get the next vertex from topological order
            u = self.Stack.pop()

            # Update distances of all adjacent vertices (only if u is reachable)
            if self.layer_level[u] != -1:
                for i in self.get_next(u):
                    if self.layer_level[i] < self.layer_level[u] + 1:
                        self.layer_level[i] = self.layer_level[u] + 1  # Each edge weighted 1

        # Create array of calculated longest distances to layer.
        # One independent list per slot: [[]] * n would alias a single list.
        layers_at_level = [[] for _ in range(len(self.layer_level))]
        for l in self.model.layers:
            level = self.layer_level[l.name]
            if level < 0:
                # Unreachable from s; a negative index would wrap to the last level
                continue
            layers_at_level[level].append(l.name)

        return layers_at_level

    def find_singletons(self):
        """Return the layers that are alone on their level, in level order.

        Also stores the level table in ``self.levels``.
        """
        # Model only has 1 input, which is input_names[0]
        name = self.model.input_names[0]
        # Finding the longest path from the start to every other layer
        self.levels = self.longest_path(name)
        return [level[0] for level in self.levels if len(level) == 1]

    def find_all_paths_util(self, u, d, visited, path, all_paths):
        """Depth-first search from ``u`` towards ``d``, recording every path found.

        Returns False as soon as a layer deeper than ``d`` is reached (``path``
        and ``visited`` are then left as they were at that moment), True
        otherwise. Complete paths are appended to ``all_paths``.
        """
        # If the distance of the current path is greater than the longest path (the "level") to the destination node, we know the destination node can't be a partition point
        if self.layer_level[u] > self.layer_level[d]:
            return False
        # Mark the current node as visited and store in path
        visited[u] = True
        path.append(u)

        if u == d:
            # Reached the destination: record the path unless it is already known
            if path not in all_paths:
                all_paths.append(path.copy())
        else:
            # If current vertex is not destination
            # Recur for all the vertices adjacent to this vertex
            for i in self.get_next(u):
                if not visited[i]:
                    if not self.find_all_paths_util(i, d, visited, path, all_paths):
                        return False

        # Remove current vertex from path[] and mark it as unvisited
        path.pop()
        visited[u] = False
        return True

    # Finds all paths from 's' to 'd.' Returns false if there exists a path from s that has a greater "level" than d, otherwise returns true
    def find_all_paths(self, s, d) -> bool:
        """Return True if no path starting at ``s`` reaches a layer deeper than ``d``.

        Requires ``self.layer_level`` to be populated (see ``longest_path``).
        """
        # Mark all the vertices as not visited
        visited = {l.name: False for l in self.model.layers}

        # Scratch containers for the current path and the paths found so far
        path = []
        all_paths = []

        # Call the recursive helper function to find all paths
        return self.find_all_paths_util(s, d, visited, path, all_paths)

    def partitions_util(self, prev, singleton_nodes, partitions):
        """Append the partition points found among ``singleton_nodes`` to ``partitions``.

        Starting from the previous partition point ``prev``, candidates are
        tried in order; the first one accepted by ``find_all_paths`` becomes a
        partition point and the new ``prev``. The scan stops when the
        candidates run out, even if the trailing ones were all rejected.
        Returns ``partitions``.
        """
        for candidate in singleton_nodes:
            if self.find_all_paths(prev, candidate):
                partitions.append(candidate)
                prev = candidate
        return partitions

    def find_partitions(self) -> List[str]:
        """Return the names of the partition points of the model, in level order."""
        inpt = self.model.input_names[0]
        return self.partitions_util(inpt, self.find_singletons(), [])

    @staticmethod
    def _output_bytes(layer):
        """Size of the layer's dtype multiplied by every known output dimension.

        Dimensions that are ``None`` are skipped; when the layer reports a list
        of output shapes only the first one is used.
        """
        default_dtype = tf.keras.backend.floatx()
        single_layer_mem = tf.as_dtype(layer.dtype or default_dtype).size
        out_shape = layer.output_shape
        if isinstance(out_shape, list):
            out_shape = out_shape[0]
        for s in out_shape:
            if s is None:
                continue
            single_layer_mem *= s
        return single_layer_mem

    @staticmethod
    def _weight_count(owner):
        """Number of trainable plus non-trainable parameters held by ``owner``."""
        trainable_count = sum(
            tf.keras.backend.count_params(p) for p in owner.trainable_weights
        )
        non_trainable_count = sum(
            tf.keras.backend.count_params(p) for p in owner.non_trainable_weights
        )
        return trainable_count + non_trainable_count

    def keras_model_memory_usage_in_bytes(self, model, batch_size: int):
        """
        Return the estimated memory usage of a given Keras model in bytes.
        This includes the model weights and layers, but excludes the dataset.

        The model shapes are multiplied by the batch size, but the weights are not.

        NOTE(review): the weight terms are parameter counts, not byte sizes,
        while the shape term is in bytes - confirm the mix is intended.

        Args:
            model: A Keras model.
            batch_size: The batch size you intend to run the model with. If you
                have already specified the batch size in the model itself, then
                pass `1` as the argument here.
        Returns:
            An estimate of the Keras model's memory usage in bytes.

        """
        shapes_mem_count = 0
        internal_model_mem_count = 0
        for layer in model.layers:
            # Nested models are estimated recursively and added on top
            if isinstance(layer, tf.keras.Model):
                internal_model_mem_count += self.keras_model_memory_usage_in_bytes(
                    layer, batch_size=batch_size
                )
            shapes_mem_count += self._output_bytes(layer)

        total_memory = (
                batch_size * shapes_mem_count
                + internal_model_mem_count
                + self._weight_count(model)
        )
        return total_memory

    def keras_layer_memory(self, layer_name, batch_size: int):
        """Estimate the memory usage of a single layer.

        Despite its name, ``layer_name`` must be the layer object itself: its
        ``dtype``, ``output_shape`` and weights are read directly. The estimate
        is computed the same way as in ``keras_model_memory_usage_in_bytes``.
        """
        internal_model_mem_count = 0
        if isinstance(layer_name, tf.keras.Model):
            internal_model_mem_count += self.keras_model_memory_usage_in_bytes(
                layer_name, batch_size=batch_size
            )
        shapes_mem_count = self._output_bytes(layer_name)

        total_memory = (
                batch_size * shapes_mem_count
                + internal_model_mem_count
                + self._weight_count(layer_name)
        )
        return total_memory

    def find_partition_memory(self, partition_points):
        """Return the estimated memory of the layers between consecutive partition points.

        Entry ``i`` sums ``keras_layer_memory`` (batch size 1) over every level
        after ``partition_points[i]`` up to and including
        ``partition_points[i + 1]``. The last entry is always 0. Requires
        ``self.levels`` and ``self.layer_level`` to be populated.
        """
        part_mems = []
        # Each index represents the memory between that part pt and the next one
        for i in range(1, len(partition_points)):
            # Going backwards along layers within partition to find total memory usage
            start = self.layer_level[partition_points[i]]
            end = self.layer_level[partition_points[i - 1]]
            mem = 0
            for j in range(start, end, -1):
                for l in self.levels[j]:
                    mem += self.keras_layer_memory(self.model.get_layer(l), 1)
            part_mems.append(mem)
        # Nothing used after last partition pt, which is output layer
        part_mems.append(0)
        return part_mems

    # Returns transfer size of partition in Mbits
    def find_partition_transfer_size(self, partition_points) -> Tuple[List[float], Dict[str, float]]:
        """Estimate how much data leaves each partition point, in Mbits.

        Returns the sizes both as a list (same order as ``partition_points``)
        and as a dict keyed by layer name.
        """
        # Compression ratio is ~1.44 (according to https://www.researchgate.net/publication/264417607_Fixed-Rate_Compressed_Floating-Point_Arrays)
        zfp_comp_ratio = 1.44

        transfer_sizes = []
        transfer_size_dict = {}
        for point in partition_points:
            layer = self.model.get_layer(point)
            num_outbound = len(layer.outbound_nodes)

            # Iterate through all elements of shape tuple except first one (which is batch size)
            output_size = 1
            for s in layer.get_output_at(0).get_shape()[1:]:
                output_size *= s
            # Assuming all elements are floats, each float uses 8 bytes
            output_size_bytes = (output_size * 8) / zfp_comp_ratio
            output_size_mbits = (output_size_bytes * 8) / (1024 ** 2)
            # All outputs of the layer are the same size, the total size will be (output size * num_output_nodes)
            transfer_size = num_outbound * output_size_mbits
            transfer_size_dict[point] = transfer_size
            transfer_sizes.append(transfer_size)

        return transfer_sizes, transfer_size_dict

    # For each node, finds the next partition point with the smallest transfer size
    def partition_model(self, node_capacities: List[int], communication_graph: "nx.Graph"):
        """Not implemented yet; currently returns ``None``."""
        pass

In [3]:
def distance_to_bandwidth(d):
    """Map a distance ``d`` to a bandwidth using ``log2(1 + a / d**2)``.

    Raises ``ZeroDivisionError`` when ``d`` is 0.
    """
    # Network with average bandwidth = 6.5 Mbps
    scale = 283230
    squared_distance = d ** 2
    return math.log2(1 + scale / squared_distance)

def get_bottleneck(transfer_sizes: List[int], G_c: nx.Graph, arrangement: List[int]):
    """Return the largest per-hop latency of an arrangement (0 if none is positive).

    Hop ``t`` sends ``transfer_sizes[t]`` over the edge between
    ``arrangement[t]`` and ``arrangement[t + 1]`` of ``G_c``; its latency is
    the transfer size divided by that edge's ``'weight'`` attribute.
    """
    worst = 0
    for hop, size in enumerate(transfer_sizes):
        link = G_c[arrangement[hop]][arrangement[hop + 1]]
        # max() keeps the current value on ties, like a strict ">" comparison
        worst = max(worst, size / link['weight'])
    return worst

def generate_comm_graph(num_nodes: int, seed=None):
    """Build a complete graph of randomly placed nodes with bandwidth-weighted edges.

    Every node gets a random 2-D ``'pos'`` with both coordinates in [1, 150).
    Every edge gets a ``'weight'`` computed by ``distance_to_bandwidth`` from
    the Euclidean distance between its endpoints, and a ``'name'`` of the form
    ``"u-v"``.

    Args:
        num_nodes: Number of nodes in the graph.
        seed: Optional seed passed to ``np.random.default_rng``. The default
            ``None`` keeps the original behaviour of drawing fresh positions
            on every call; pass a value to make the layout reproducible.
    Returns:
        The generated ``networkx`` graph.
    """
    # Import the subpackage explicitly: a bare `import scipy` is not guaranteed
    # to load `scipy.spatial` on every SciPy release.
    from scipy.spatial import distance

    rng = np.random.default_rng(seed)
    # One (x, y) row per node
    node_pos = (rng.random((num_nodes, 2)) * 149) + 1
    comm_graph = nx.complete_graph(num_nodes)
    for node, pos in zip(list(comm_graph.nodes()), node_pos):
        comm_graph.nodes[node]['pos'] = pos
    for u, v in comm_graph.edges():
        dist = distance.euclidean(comm_graph.nodes[u]["pos"], comm_graph.nodes[v]["pos"])
        comm_graph[u][v]["weight"] = distance_to_bandwidth(dist)
        comm_graph[u][v]['name'] = f"{u}-{v}"

    return comm_graph

In [4]:
def random_partition_place(node_capacity, num_nodes, partitions, transfer_sizes, partition_mems):
    """Pick random partition points under a capacity limit and return the bottleneck latency.

    Split indices are derived from ``partition_mems`` and ``node_capacity``,
    one random partition index is drawn between each pair of consecutive
    splits, and the matching ``transfer_sizes`` are placed on randomly chosen
    distinct nodes of a freshly generated communication graph. The result is
    whatever ``get_bottleneck`` returns for that placement.
    """
    total_points = len(partitions)

    split_points = []
    window_start = 0
    while window_start < total_points:
        window_end = window_start
        # NOTE(review): the slice ends at window_end - 1, so on the very first
        # pass (both indices 0) it is [0:-1], i.e. everything but the last
        # entry. Kept as is because the draws below rely on the resulting
        # splits - confirm the intended window.
        while sum(partition_mems[window_start:window_end - 1]) < node_capacity:
            window_end += 1
            if window_end >= total_points:
                break
        if window_end != total_points:
            split_points.append(window_end)
        window_start = window_end + 1

    # One random partition index from each (previous split, current split] range
    chosen = [
        random.randint(split_points[k - 1] + 1, split_points[k])
        for k in range(1, len(split_points))
    ]
    chosen_transfers = [transfer_sizes[idx] for idx in chosen]

    graph = generate_comm_graph(num_nodes)
    # One more node than transfers: every transfer connects two consecutive nodes
    placement = random.sample(list(range(num_nodes)), len(chosen_transfers) + 1)

    return get_bottleneck(chosen_transfers, graph, placement)

In [5]:
def test_graph_configs(model, model_name, node_counts=None, capacities=None, num_trials=50):
    """Average the random-placement bottleneck over several trials per configuration.

    Args:
        model: Keras model to partition.
        model_name: Label used as the first component of every result key.
        node_counts: Numbers of nodes to evaluate. Defaults to the notebook
            global ``node_nums``.
        capacities: Node capacities to evaluate (presumably in MB; they are
            multiplied by 1024**2 before use). Defaults to the notebook
            global ``caps``.
        num_trials: Number of trials averaged per configuration.
    Returns:
        Dict mapping ``"{model_name}-{capacity}-{num_nodes}"`` to the mean
        bottleneck over all trials.
    """
    # Fall back to the notebook-level settings so existing calls keep working
    if node_counts is None:
        node_counts = node_nums
    if capacities is None:
        capacities = caps

    partitioner = Partitioner(model)
    partitions = partitioner.find_partitions()
    transfer_sizes = partitioner.find_partition_transfer_size(partitions)[0]

    partition_mems = partitioner.find_partition_memory(partitions)

    all_data = {}
    # Average of many trials for accuracy
    for i in range(num_trials):
        print(f"Trial #{i+1}")
        for num_nodes in node_counts:
            for c in capacities:
                # Scale the capacity by 1024**2 (MB -> bytes)
                cap = c * (1024 ** 2)
                bottleneck = random_partition_place(cap, num_nodes, partitions, transfer_sizes, partition_mems)

                key = f"{model_name}-{c}-{num_nodes}"
                if i == 0:
                    old_avg = 0
                else:
                    old_avg = all_data[key]

                # Incremental mean: avoids storing every trial's value
                all_data[key] = old_avg + ((bottleneck - old_avg) / (i + 1))

    return all_data

In [6]:
# The models we're using for the test
#model_names = ['ResNet50', 'InceptionResNetV2', 'EfficientNetB1', 'MobileNetV2']

In [7]:
# Node capacities to evaluate (scaled by 1024**2 inside test_graph_configs)
caps = [64, 128, 256]
# Number of nodes
node_nums = [5, 10, 15, 20, 50]

model = InceptionResNetV2()
model_name = 'InceptionResNetV2'

data = test_graph_configs(model, model_name)
# One tab-separated row per configuration: model, capacity, node count, mean bottleneck
for config_key, mean_bottleneck in data.items():
    columns = "\t".join(config_key.split("-"))
    print(f"{columns}\t{mean_bottleneck}")

Metal device set to: Apple M1 Pro


2022-09-18 14:34:22.103829: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-09-18 14:34:22.103935: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Trial #1
Trial #2
Trial #3
Trial #4
Trial #5
Trial #6
Trial #7
Trial #8
Trial #9
Trial #10
Trial #11
Trial #12
Trial #13
Trial #14
Trial #15
Trial #16
Trial #17
Trial #18
Trial #19
Trial #20
Trial #21
Trial #22
Trial #23
Trial #24
Trial #25
Trial #26
Trial #27
Trial #28
Trial #29
Trial #30
Trial #31
Trial #32
Trial #33
Trial #34
Trial #35
Trial #36
Trial #37
Trial #38
Trial #39
Trial #40
Trial #41
Trial #42
Trial #43
Trial #44
Trial #45
Trial #46
Trial #47
Trial #48
Trial #49
Trial #50
InceptionResNetV2	64	5	10.679291662156565
InceptionResNetV2	128	5	9.3156382309007
InceptionResNetV2	256	5	5.422333755575066
InceptionResNetV2	64	10	10.123497794067541
InceptionResNetV2	128	10	8.143775696925365
InceptionResNetV2	256	10	6.499533996186187
InceptionResNetV2	64	15	9.7649540818779
InceptionResNetV2	128	15	7.957684960212683
InceptionResNetV2	256	15	5.889915445362976
InceptionResNetV2	64	20	10.386013656200051
InceptionResNetV2	128	20	8.536190297465293
InceptionResNetV2	256	20	5.727628568228336
I