### Imports and HTML-content

In [1]:
%run helper.py

In [2]:
from IPython.display import display, HTML
point_to_dist_angle = """

"""

In [3]:
from scipy.stats import multivariate_normal

def calc_likelihoods_for_distributions_and_points(matrix_dist, matrix_points):
    means = matrix_dist[:, 0]
    covariances = matrix_dist[:, 1:]
    likelihoods = np.array([multivariate_normal(mean=means[i], cov=covariances[i]).pdf(matrix_points) for i in range(len(means))])
    
    return likelihoods.T 

### Code

### Build Graph

In [4]:
@save_params
def create_graph_data(csv_file_name, iteration):
    classes_dict = {
        'goomba': 0,
        'mario': 1,
        'cloud': 2,
        'ground': 3,
        'bush': 4,
        'box': 5,
        'pipe': 6
    }
    
    class_names, boxes = get_classnames_boxes_from_csv(csv_file_name, iteration)
    num_nodes = len(boxes)
    edge_connections = cartesian_product_for_nodes(range(num_nodes))
    node_features = []
    matrix = np.empty((0, 2))
    normal_dist = [0, 0, 1, 0, 0, 1] # mu1, mu2, sig00, sig01, sig10, sig11
    dataset_number = int(csv_file_name.split('/')[-1].split('.')[0])

    if num_nodes == 1:
        box = boxes[0]
        width, height = abs(box['left'] - box['right']), abs(box['top'] - box['bottom'])
        # node_feature: (normal-distribution, class-label, x-val, y-val, width, height, dataset_number, iteration, id)
        node_features.append((*normal_dist, classes_dict[class_names[0]], box['center_x'], \
                              box['center_y'], width, height, dataset_number, iteration, 0))
        
        node_features = torch.tensor(node_features, dtype=torch.float)
        
        return Data(
            x=node_features,
            edge_index=torch.tensor([0]),
            edge_attr=torch.tensor([0])
        )
        
    for i, box in enumerate(boxes):
        new_row = np.array([[box['center_x'], box['center_y']]])
        matrix = np.vstack((matrix, new_row))

        width, height = abs(box['left'] - box['right']), abs(box['top'] - box['bottom'])
        # node_feature: (normal-distribution, class-label, x-val, y-val, width, height, dataset_number, iteration, id)
        node_features.append((*normal_dist, classes_dict[class_names[i]], box['center_x'], \
                              box['center_y'], width, height, dataset_number, iteration, i))
    
    dists, angles = dist_angle_from_matrix(matrix, edge_connections)
    
    node_features = torch.tensor(node_features, dtype=torch.float)
    edge_connections = torch.tensor(edge_connections)
    edges_features =  torch.tensor(np.stack((dists, angles), axis=-1))
    
    data = Data(
        x=node_features,
        edge_index=edge_connections.t().contiguous(),
        edge_attr=edges_features
    )

    return data

In [5]:
dataset = []
for i in range(4):
    csv_file_name = f"/workspaces/jupyterlite/content/pytroch-geometric/mario-tracking-data/{i:04d}.csv"
    df = pd.read_csv(csv_file_name)
    num_iterations = int(df.iloc[-1]['iteration']) # we start at index 1, so no need for `+ 1`
    
    for iteration in range(1, num_iterations):
        graph_data = create_graph_data(csv_file_name, iteration)
        dataset.append(graph_data)

train_dataset = dataset[:int(0.8 * len(dataset))]
test_dataset = dataset[int(0.8 * len(dataset)):]

train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1)

  return np.linalg.norm(val_edge_pairs, axis=1), np.rad2deg(np.arctan(val_edge_pairs[:, 1] / val_edge_pairs[:, 0]))
  return np.linalg.norm(val_edge_pairs, axis=1), np.rad2deg(np.arctan(val_edge_pairs[:, 1] / val_edge_pairs[:, 0]))


In [6]:
int(csv_file_name.split('/')[-1].split('.')[0])

3

#### Message Passing

In [7]:
class SimpleGAT(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, heads=1):
        super().__init__()
        self.conv1 = GATConv(in_channels, out_channels, heads=1, add_self_loops=False)

    def forward(self, x, edge_index, target_node):
        out = self.conv1(x, edge_index)

#### Loss

The Loss will be the SSE of the likelihoods that the points of frame i+1 belong to the distributions that were created from the graph on frame i.


In [8]:
def calc_point_indices_to_distributions(distributions, points, treshhold_likelihood = 0):
    if points.shape[0] == 0:
        return np.zeros(distributions.shape[0]) - 1
    
    if points.shape[0] == 1 and distributions.shape[0] == 1:
        likelihood_entries = calc_likelihoods_for_distributions_and_points(distributions, points)
    
        if likelihood_entries < treshhold_likelihood: return np.array([-1])
        return np.array([0])

    likelihoods = calc_likelihoods_for_distributions_and_points(distributions, points)
    sorted_indices = np.argsort(likelihoods, axis=0)
    ranks = np.zeros_like(likelihoods, dtype=int)
    n_rows, n_cols = likelihoods.shape
    ranks[sorted_indices, np.arange(n_cols)] = np.tile(np.arange(n_rows), (n_cols, 1)).T
    
    mask_binary = np.array((n_rows - ranks) <= n_cols, dtype=int)
    cumsum_array = np.cumsum(mask_binary, axis=0)
    
    s = min(n_rows, n_cols)
    extended_likelihood_entries = np.zeros((s+1, n_cols + 1))
    
    mask  = np.array((n_rows - ranks) <= n_cols)
    
    points_to_consider = np.where(np.any(mask, axis=1))[0]
    filtered_points = points[points_to_consider]
    
    likelihood_entries = calc_likelihoods_for_distributions_and_points(distributions, filtered_points)
    likelihood_indicies_to_filter = likelihood_entries < treshhold_likelihood
    # punish points that should be filtered out
    likelihood_entries[likelihood_indicies_to_filter] = 1e-50
    
    inv_percentage_of_likelihood = np.log(likelihood_entries)/ np.sum(np.log(likelihood_entries), axis=0)
    # percentage_of_likelihood = (1 / inv_percentage_of_likelihood) / np.sum( (1 / inv_percentage_of_likelihood), axis=0)
    # percentage_of_likelihood = inv_percentage_of_likelihood
    
    cost_matrix = np.log(inv_percentage_of_likelihood)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    
    max_index = max(col_ind) + 1
    points_to_distributions = np.zeros(max_index, dtype=row_ind.dtype) - 1
    
    points_to_distributions[col_ind] = row_ind

    # if punished points still come up ahead, set the association to -1
    for point_index, distribution_index in enumerate(col_ind):
        if likelihood_indicies_to_filter[point_index, distribution_index]:
            points_to_distributions[point_index] = -1
    
    return points_to_distributions

In [9]:
def classes_points_distributions_ids_from_graph(graph):
    graph = graph.x
    return graph[:, 6], graph[:, :6], graph[:, 7:9], graph[:, -1]

In [10]:
def indicies_of_filterted_array_entries(filtered_indicies, relative_indicies):
    return np.where(filtered_indicies)[0][relative_indicies]

In [11]:
def calculate_sse_for_distributions_and_points(graph_a, graph_b, mappings):
    node_attr_a = classes_points_distributions_ids_from_graph(graph_a)
    node_attr_b = classes_points_distributions_ids_from_graph(graph_b)
    
    dist_indicies = mappings[:, 0]
    distributions = node_attr_a[1][dist_indicies]
    distributions[:, :2] +=  np.array(node_attr_a[2][dist_indicies])[:]
    distributions = np.array(distributions.reshape(distributions.shape[0], -1, 2))
    
    points = node_attr_b[2][mappings[:, 1]]
    
    likelihoods = torch.tensor(calc_likelihoods_for_distributions_and_points(distributions, points))
    if likelihoods.shape[0] == 1: return likelihoods
    
    return likelihoods[mappings[:, 0], mappings[:, 1]]

In [12]:
@save_params
def find_mapping_of_two_graphs(graph_a, graph_b):
    node_attributes_a = classes_points_distributions_ids_from_graph(graph_a)
    node_attributes_b = classes_points_distributions_ids_from_graph(graph_b)

    total_mappings = torch.empty((0, 2), dtype=torch.int)

    for label in classes_points_distributions_ids_from_graph(graph_b)[0].unique():
        dist_filter = (node_attributes_a[0] == label)
        dist_indicies = np.copy(dist_filter)
        points_filter = (node_attributes_b[0] == label)
        point_indicies = np.copy(points_filter)
        distributions = node_attributes_a[1][dist_indicies]
        distributions = np.array(distributions.reshape(distributions.shape[0], -1, 2))  # to have the correct format
        distributions[:, 0, :] += np.array(node_attributes_a[2][dist_indicies])
        points = node_attributes_b[2][point_indicies]
        num_dist_entries = torch.sum(dist_filter)
        
        mappings = torch.full((num_dist_entries, 2), -1, dtype=torch.int)
        mappings[:, 0] = torch.tensor((node_attributes_a[-1])[dist_filter], dtype=torch.int)
        
        dist_to_point_mapping = calc_point_indices_to_distributions(distributions, points)
        
        for i, dest_index in enumerate(dist_to_point_mapping):
            if dest_index == -1:
                continue
            relative_point_index = indicies_of_filterted_array_entries(points_filter, i)
            mappings[dest_index, 1] = torch.tensor((node_attributes_b[-1])[relative_point_index], dtype=torch.int)
        
        total_mappings = torch.cat((total_mappings, mappings), dim=0)

    return total_mappings

In [13]:
graph_a = train_dataset[700]
graph_b = train_dataset[701]
mappings = find_mapping_of_two_graphs(graph_a, graph_b)
calculate_sse_for_distributions_and_points(graph_a, graph_b, mappings)

  mappings[:, 0] = torch.tensor((node_attributes_a[-1])[dist_filter], dtype=torch.int)
  mappings[dest_index, 1] = torch.tensor((node_attributes_b[-1])[relative_point_index], dtype=torch.int)


tensor([0.0213, 0.0016], dtype=torch.float64)

#### Graph to CSV

In [14]:
graph = graph_a.x[0][6:]
graph

tensor([  5.0000, 151.5850, 201.2750,  15.0900,  15.4900,   2.0000, 103.0000,
          0.0000])

In [15]:
# for each csv file, name the iteration, class, top, left, bottom, right, center_x, center_y

In [27]:
import csv
import os

# csv-header: iteration, class, top, left, bottom, right, center_x, center_y, object_id
def csv_entry_for_graph(graph, relative_path='./csv_files'):
    classes_dict = {
        'goomba': 0,
        'mario': 1,
        'cloud': 2,
        'ground': 3,
        'bush': 4,
        'box': 5,
        'pipe': 6
    }
    
    graph = graph.x[0][6:]
    iteration = int(graph[6]) + 1 # we want to start the iterations with 1
    class_num = int(graph[0])
    width = round(float(graph[3]), 2)
    height = round(float(graph[4]), 2)
    center_x = round(float(graph[1]), 2)
    center_y = round(float(graph[2]), 2)
    top = round(center_y + (height / 2), 2)
    left = round(center_x - (width / 2), 2)
    bottom = round(center_y - (height / 2), 2)
    right = round(center_x + (width / 2), 2)
    object_id = int(graph[7])

    file_number = int(graph[5]) # write to csv-file with name {file_number:04d}.csv"
    
    csv_entry = [iteration, class_num, top, left, bottom, right, center_x, center_y, object_id]
    file_name = os.path.join(relative_path, f"{file_number:04d}.csv")
    # file_exists = os.path.isfile(file_name)
    
    if iteration == 1 and os.path.isfile(file_name):
        os.remove(file_name)
    
    with open(file_name, mode='a', newline='') as file:
        writer = csv.writer(file)
        
        if iteration == 1:
            writer.writerow(['iteration', 'class', 'top', 'left', 'bottom', 'right', 'center_x', 'center_y', 'object_id'])
        
        writer.writerow(csv_entry)

In [57]:
import csv
import os

# csv-header: iteration, class, top, left, bottom, right, center_x, center_y, object_id
def csv_entry_for_graph(graph, relative_path='./csv_files'):
    classes_dict = {
        0: 'goomba',
        1: 'mario',
        2: 'cloud',
        3: 'ground',
        4: 'bush',
        5: 'box',
        6: 'pipe'
    }

    for i in range(graph.x[0].shape[0]):
        graph_node = graph.x[i][6:]
        iteration = int(graph_node[6]) # iteration will start at 1
        class_num = int(graph_node[0])
        class_name = classes_dict.get(class_num, 'unknown')
        width = round(float(graph_node[3]), 2)
        height = round(float(graph_node[4]), 2)
        center_x = round(float(graph_node[1]), 2)
        center_y = round(float(graph_node[2]), 2)
        top = round(center_y - (height / 2), 2)
        left = round(center_x - (width / 2), 2)
        bottom = round(center_y + (height / 2), 2)
        right = round(center_x + (width / 2), 2)
        object_id = int(graph_node[7])
    
        file_number = int(graph_node[5])  # write to csv-file with name {file_number:04d}.csv"
        
        csv_entry = [iteration, class_name, top, left, bottom, right, center_x, center_y, object_id]
        file_name = os.path.join(relative_path, f"{file_number:04d}.csv")
        
        if iteration == 1 and os.path.isfile(file_name):
            os.remove(file_name)
        
        with open(file_name, mode='a', newline='') as file:
            writer = csv.writer(file)
            
            if iteration == 1:
                writer.writerow(['iteration', 'class', 'top', 'left', 'bottom', 'right', 'center_x', 'center_y', 'object_id'])
            
            writer.writerow(csv_entry)

In [58]:
for graph in train_dataset:
    csv_entry_for_graph(graph, 'graph_to_csv')

AttributeError: 'Tensor' object has no attribute 'x'

#### Visualize the CSV-Data

In [29]:
import IPython.display as display
from IPython.display import clear_output
import ipywidgets as widgets
import os
import time

image_widget = widgets.Image(format='jpeg')
display.display(image_widget)

Image(value=b'', format='jpeg')

In [30]:
def get_name_and_boxes_for_iteration(csv_file_name, iteration):
    names = []
    boxes = []
    
    with open(csv_file_name, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if int(row['iteration']) == iteration:
                names.append(row['class'])
                boxes.append({
                    'top': float(row['top']),
                    'left': float(row['left']),
                    'bottom': float(row['bottom']),
                    'right': float(row['right']),
                    'center_x': float(row['center_x']),
                    'center_y': float(row['center_y'])
                })
    
    return names, boxes

def draw_boxes_to_canvas(csv_file_name, image_widget, x_limit=240, y_limit=240, frame_delay=0.01):
    df = pd.read_csv(csv_file_name)
    num_iterations = int(df.iloc[-1]['iteration']) + 1
    for iteration in range(num_iterations):
        names, boxes = get_name_and_boxes_for_iteration(csv_file_name, iteration)
        
        height, width = y_limit, x_limit
        canvas = np.zeros((height, width, 3), dtype=np.uint8)
        
        # Draw the file name in the top-left corner
        file_name_text = f"File: {os.path.basename(csv_file_name)}"
        cv2.putText(canvas, file_name_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        
        for name, box in zip(names, boxes):
            top = int(box['top'])
            left = int(box['left'])
            bottom = int(box['bottom'])
            right = int(box['right'])
            
            cv2.rectangle(canvas, (top, left), (bottom, right), (0, 255, 0), 2)
            cv2.putText(canvas, name, (top, left - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
        
        _, buffer = cv2.imencode('.jpg', canvas)
        image_widget.value = buffer.tobytes()

        time.sleep(frame_delay)

def replay_csv_files_from_directory(directory_name, image_widget):
    for file_name in sorted(os.listdir(directory_name)):
        if file_name.endswith('.csv'):
            csv_file_path = os.path.join(directory_name, file_name)
            draw_boxes_to_canvas(csv_file_path, image_widget)


In [31]:
# directory_name = "/workspaces/jupyterlite/workspace/supermario_graph-nn/03_supermario_graph-nn/yolo_results/clean_multiple_episodes"
# replay_csv_files_from_directory(directory_name, image_widget)

In [55]:
directory_name = "/workspaces/jupyterlite/content/pytroch-geometric/graph_to_csv"
replay_csv_files_from_directory(directory_name, image_widget)