In [1]:
import numpy as np
from mediapipe import solutions

from custom_pose_landmarks import CustomPoseLandmark

In [89]:
class Graph:
    """
    Graph built from a landmark layout, with partitioned adjacency labels.

    The ``layout`` object must provide three methods, which are the only
    ones this class calls:

    * ``num_elements()``    - number of nodes in the graph,
    * ``get_connections()`` - iterable of ``(i, j)`` node index pairs,
    * ``get_value(name)``   - node index for a landmark name (only used
      when ``center`` is given as a string).

    Parameters
    ----------
    layout : object
        Layout describing the nodes and their connections (see above).
    center : int or str, default 0
        Center node, given either as a node index or as a landmark name
        resolved through ``layout.get_value``.
    strategy : {'uniform', 'distance', 'spatial'}, default 'uniform'
        Partitioning strategy used by ``get_labels``. It is only validated
        when ``get_labels`` is called.
    max_distance : int, default 1
        Largest hop distance that is still treated as a neighborhood.
    dilation : int, default 1
        Step between consecutive hop distances that are considered valid.

    Attributes
    ----------
    num_node : int
        Number of nodes (set by ``get_edges``).
    edges : list of tuple
        Self-loops followed by the layout connections.
    center : int
        Resolved center node.
    adjacency : numpy.ndarray
        Symmetric 0/1 matrix of shape ``(num_node, num_node)``.
    """
    def __init__(self, layout, center=0, strategy='uniform', max_distance=1, dilation=1):
        # Initialize parameters
        self.strategy = strategy
        self.max_distance = max_distance
        self.dilation = dilation

        # Initialize graph edges, graph center and adjacency matrix.
        # get_edges must run first: it also sets self.num_node, which
        # get_adjacency relies on.
        self.edges = self.get_edges(layout)
        self.center = self.get_center(layout, center)

        self.adjacency = self.get_adjacency()


    def get_edges(self, layout):
        """
        Build the edge list of the graph.

        Side effect: stores the number of nodes in ``self.num_node``.

        Returns
        -------
        list of tuple
            One ``(i, i)`` self-loop per node, followed by the connections
            reported by ``layout.get_connections()``.
        """
        # Extract the number of nodes in the layout
        self.num_node = layout.num_elements()
        # Create a self-loop for each node
        self_link = [(i, i) for i in range(self.num_node)]
        # Extract connections from the custom layout
        neighbor_link = list(layout.get_connections())

        return self_link + neighbor_link


    def get_center(self, layout, center):
        """
        Resolve the center node of the graph.

        Parameters
        ----------
        layout : object
            Layout used to look up ``center`` when it is a name.
        center : int or str
            Node index (Python or numpy integer), or a landmark name.

        Returns
        -------
        The node index as a plain ``int`` for integer input, otherwise
        whatever ``layout.get_value(center)`` returns.

        Raises
        ------
        TypeError
            If ``center`` is neither an integer nor a string. Returning
            ``None`` here would only surface later as an obscure indexing
            problem in ``get_labels``.
        """
        if isinstance(center, (int, np.integer)):
            return int(center)
        if isinstance(center, str):
            return layout.get_value(center)

        raise TypeError(
            'center must be an int or a str, got {}'.format(type(center).__name__)
        )


    def get_adjacency(self):
        """
        Build the adjacency matrix from ``self.edges``.

        Returns
        -------
        numpy.ndarray
            ``(num_node, num_node)`` matrix with 1 for every edge in both
            directions (the graph is treated as undirected) and 0 elsewhere.
        """
        # Prepare adjacency matrix storage based on the number of nodes in the graph
        adjacency_matrix = np.zeros((self.num_node, self.num_node))

        # Fill the matrix symmetrically based on the list of edges
        for i, j in self.edges:
            adjacency_matrix[i, j] = 1
            adjacency_matrix[j, i] = 1

        return adjacency_matrix


    def get_distance(self):
        """
        Compute the hop distance between every pair of nodes.

        Returns
        -------
        numpy.ndarray
            ``(num_node, num_node)`` matrix holding the smallest number of
            hops ``d <= max_distance`` connecting two nodes. Pairs that are
            not connected within ``max_distance`` hops stay at ``inf``.
        """
        # Start with "unreachable" everywhere
        distance_matrix = np.zeros_like(self.adjacency) + np.inf

        # The d-th power of the adjacency matrix is positive exactly where a
        # walk of length d exists (self-loops make that "within d hops")
        transfer_matrix = [
            np.linalg.matrix_power(self.adjacency, d) for d in range(self.max_distance + 1)
        ]

        arrive_matrix = (np.stack(transfer_matrix) > 0)

        # Go from the largest distance down so smaller distances overwrite
        # larger ones and each entry ends up with the minimum
        for d in range(self.max_distance, -1, -1):
            distance_matrix[arrive_matrix[d]] = d

        return distance_matrix


    def normalize(self, matrix):
        """
        Symmetrically normalize a matrix: ``D^-1/2 @ matrix @ D^-1/2``.

        ``D`` is the diagonal matrix of column sums of ``matrix``. Nodes
        with a non-positive sum get a zero scaling factor instead of a
        division by zero.

        Parameters
        ----------
        matrix : numpy.ndarray
            ``(num_node, num_node)`` matrix to normalize.

        Returns
        -------
        numpy.ndarray
            The normalized matrix, same shape as ``matrix``.
        """
        # Degree (sum of links) for each node
        degree_vector = np.sum(matrix, 0)

        # degree ** -0.5 where the degree is positive, 0 elsewhere
        inverse_sqrt = np.zeros(self.num_node)
        connected = degree_vector > 0
        inverse_sqrt[connected] = degree_vector[connected] ** (-0.5)

        # Scaling rows and columns is the same as multiplying by the
        # diagonal matrix on both sides
        return inverse_sqrt[:, np.newaxis] * matrix * inverse_sqrt[np.newaxis, :]


    def get_labels(self):
        """
        Partition the normalized neighborhood matrix according to the strategy.

        Only hop distances in ``range(0, max_distance + 1, dilation)`` are
        treated as links. The matrix that gets normalized is built from
        those hops alone, so with ``dilation > 1`` the skipped hop distances
        influence neither the node degrees nor the labels.

        Returns
        -------
        numpy.ndarray
            Array of shape ``(K, num_node, num_node)`` where ``K`` is

            * ``'uniform'``  - 1: the whole normalized matrix,
            * ``'distance'`` - one matrix per valid hop distance,
            * ``'spatial'``  - one matrix for hop 0 and two matrices for
              every other valid hop, split by comparing the distances of
              both nodes to the center node.

        Raises
        ------
        ValueError
            If ``self.strategy`` is not one of the supported strategies.
        """
        # Create sequence of valid hop distances
        valid_hop = range(0, self.max_distance + 1, self.dilation)

        # Get distance matrix for node labeling
        distance = self.get_distance()

        # Link every pair of nodes whose hop distance is valid. Starting from
        # zeros (not from self.adjacency) keeps hop distances skipped by the
        # dilation out of the matrix.
        reachable = np.zeros((self.num_node, self.num_node))
        for hop in valid_hop:
            reachable[distance == hop] = 1

        # Normalize before implementing the strategy
        normalized = self.normalize(reachable)

        # Labeling process
        if self.strategy == 'uniform':
            labels = np.expand_dims(normalized, axis=0)

        elif self.strategy == 'distance':
            # One label matrix for every valid hop distance
            labels = np.zeros((len(valid_hop), self.num_node, self.num_node))

            for i, hop in enumerate(valid_hop):
                labels[i][distance == hop] = normalized[distance == hop]

        elif self.strategy == 'spatial':
            # Distance of every node to the center, broadcast so that entry
            # [j, i] compares node j (row) with node i (column)
            center_distance = distance[:, self.center]
            row_distance = center_distance[:, np.newaxis]
            col_distance = center_distance[np.newaxis, :]

            same_distance = row_distance == col_distance
            row_is_farther = row_distance > col_distance
            row_is_nearer = ~(same_distance | row_is_farther)

            labels = []

            for hop in valid_hop:
                at_hop = distance == hop

                # Both nodes equally far from the center
                root = np.where(at_hop & same_distance, normalized, 0.0)
                # Row node farther from the center than the column node
                close = np.where(at_hop & row_is_farther, normalized, 0.0)
                # Row node nearer to the center than the column node
                further = np.where(at_hop & row_is_nearer, normalized, 0.0)

                if hop == 0:
                    labels.append(root)
                else:
                    labels.append(root + close)
                    labels.append(further)

            labels = np.stack(labels)

        else:
            raise ValueError('strategy error')

        return labels

In [90]:
# MediaPipe solutions: the pose module and the drawing helpers
mp_pose = solutions.pose
mp_drawing = solutions.drawing_utils

# Values picked from MediaPipe's PoseLandmark class for the custom layout
values = [0, 11, 12, 13]

# Names of the custom pose landmarks together with their connections
landmarks = {'THORAX': ['NOSE']}

# Layout object that is later handed to Graph as `layout`
custom_pose = CustomPoseLandmark(mp_pose, values, landmarks)
# conv = Video2DataFrame(mp_pose, mp_drawing, custom_pose)

In [91]:
# Connection pairs of the custom layout; Graph.get_edges appends them to the self-loops
custom_pose.get_connections()

{(1, 2), (1, 3), (4, 0)}

In [92]:
# Build the graph on the custom layout: the center is looked up by its
# landmark name and neighbors are partitioned with the 'spatial' strategy
graph = Graph(
    layout=custom_pose, center='THORAX',
    strategy='spatial', max_distance=1, dilation=1,
)

In [98]:
# Edge list: one self-loop per node, followed by the layout connections
graph.edges

[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (4, 0), (1, 2), (1, 3)]

In [99]:
# Center node, resolved from the 'THORAX' name through layout.get_value
graph.center

4

In [93]:
# Symmetric 0/1 adjacency matrix (self-loops included) stored by __init__
graph.adjacency

array([[1., 0., 0., 0., 1.],
       [0., 1., 1., 1., 0.],
       [0., 1., 1., 0., 0.],
       [0., 1., 0., 1., 0.],
       [1., 0., 0., 0., 1.]])

In [94]:
# Rebuilding the matrix from graph.edges should reproduce graph.adjacency
graph.get_adjacency()

array([[1., 0., 0., 0., 1.],
       [0., 1., 1., 1., 0.],
       [0., 1., 1., 0., 0.],
       [0., 1., 0., 1., 0.],
       [1., 0., 0., 0., 1.]])

In [95]:
# Hop distances up to max_distance; node pairs farther apart stay at inf
graph.get_distance()

array([[ 0., inf, inf, inf,  1.],
       [inf,  0.,  1.,  1., inf],
       [inf,  1.,  0., inf, inf],
       [inf,  1., inf,  0., inf],
       [ 1., inf, inf, inf,  0.]])

In [97]:
# Normalized adjacency split into label matrices by the 'spatial' strategy (stacked on axis 0)
graph.get_labels()

array([[[0.5       , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.33333333, 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.5       , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.5       , 0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.5       ]],

       [[0.        , 0.        , 0.        , 0.        , 0.5       ],
        [0.        , 0.        , 0.40824829, 0.40824829, 0.        ],
        [0.        , 0.40824829, 0.        , 0.        , 0.        ],
        [0.        , 0.40824829, 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.        ]],

       [[0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.5    