<a href="https://colab.research.google.com/github/Erkiperkki/IL2232/blob/main/graph_data_gen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')
PATH="/content/drive/MyDrive/Embedded Systems Design Project"
#/content/drive/MyDrive/Embedded Systems Design Project/output

Mounted at /content/drive


In [68]:
import os
import random
import matplotlib.pyplot as plt
import networkx as nx
import pygraphviz
from networkx.drawing.nx_agraph import to_agraph
import torch

In [None]:
dir_path=PATH + "/output"
SLIDING_WINDOW_SIZE = 4
FRAME_INTERVAL = 1 #FRAME_INTERVAL=1 every frame is used, FRAME_INTERVAL=10 every tenth frame is used
OVERLAPSE = 0
SLIDING_WINDOW_OVERLAPSE = (SLIDING_WINDOW_SIZE - OVERLAPSE + 1) * FRAME_INTERVAL
assert SLIDING_WINDOW_SIZE-OVERLAPSE>0

In [24]:
def center_point(x_l, x_r, y_t, y_b):
  x = (x_r - x_l) * 0.5 + x_l
  y = (y_t - y_b) * 0.5 + y_b
  return (x, y)

In [63]:
def print_graph(G):
  # Print the graph
  print("Graph nodes:", G.nodes())
  print("Graph edges:", G.edges(data=True))
  pos = nx.spring_layout(G)
  nx.draw(G, pos, with_labels=True, node_size=500, node_color='lightblue', font_size=12, font_color='black')

  # Draw edge labels
  #edge_labels = nx.get_edge_attributes(G, 'label')
  edge_labels = {(u, v): G[u][v]['label'] for u, v in G.edges() if 'label' in G[u][v]}

  nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)

  plt.show()

In [26]:
def save_graph(G,name):
  G = to_agraph(G)
  G.layout('dot')
  G.edge_attr["len"] = "2"
  G.node_attr["fontsize"] = "22"
  G.edge_attr["fontsize"] = "18"

  G.draw(name+".png", prog="neato")


In [None]:
def create_interaction_graph(frames):
  edge_list=[]
  #IG = nx.MultiDiGraph()
  IG = nx.DiGraph()
  for G in frames:
    for node in G.nodes():
      if not node in IG.nodes():IG.add_node(node)
    for edge in G.edges():
      if not edge in edge_list:
        edge_list.append(edge)

  for edge in edge_list:
    label=None
    for n, G in enumerate(frames):
      if n+1==len(frames):break
      if not edge in G.edges() or not edge in frames[n+1].edges(): continue

      obj1, obj2 = G.edges()[(edge)], frames[n+1].edges()[(edge)]
      #label=comp(obj1["label"], obj2["label"])
      a, temp, _=rel_attr[obj1["label"], obj2["label"]]
      if temp == "No change" and label!=None: continue

      label, label_str, edge_color=rel_attr[obj1["label"], obj2["label"]]
      #IG.add_edge(*edge, label=label) #priorites the last interaction #could put bool counter here to only add edge once if it found any but it looks uglier
    if (label!=None):
      IG.add_edge(*edge, label=label, label_str=label_str, color=edge_color)

  return IG

In [None]:
def format_input(input):
    temp=input.strip("\n").split(",")
    return ((float(temp[5]), float(temp[6])), int(float(temp[7])) ) #[(float_touple_xy), int_label]



In [8]:
# 0 Moving left to right
# 1 Moving right to left
# 2 Moving forward
# 3 Moving backward
# 4 No change
# 5 Self edge
# 6 Moving foward * (not included)
# 7 Moving backward * (not included)

rel_attr={
    (0, 0): [5, "Self edge", "black"],
    (1, 1): [4, "No change", "darkgray"],
    (1, 2): [0, "Moving left to right", "green"],
    (1, 3): [2, "Moving forward", "blue"],
    (1, 4): [6, "Moving forward * (not included)", "black"],
    (2, 1): [1, "Moving right to left", "orange"],
    (2, 2): [4, "No change", "darkgray"],
    (2, 3): [6, "Moving foward * (not included)", "black"],
    (2, 4): [2, "Moving forward", "blue"],
    (3, 1): [3, "Moving backward", "red"],
    (3, 2): [7, "Moving backward * (not included)", "black"],
    (3, 3): [4, "No change", "darkgray"],
    (3, 4): [0, "Moving left to right", "green"],
    (4, 1): [7, "Moving backward * (not included)", "black"],
    (4, 2): [3, "Moving backward", "red"],
    (4, 3): [1, "Moving right to left", "orange"],
    (4, 4): [4, "No change", "darkgray"]
}
def comp(prev, curr):
  if(prev == 0 and curr == 0):
    return 5  # 5 self edge
  elif(prev == 1 and curr == 2):
    return 0
  elif(prev == 1 and curr == 3):
    return 2
  elif(prev == 1 and curr == 4):
    return 6 # 6 moving foward * (not included)
  elif(prev == 2 and curr == 1):
    return 1
  elif(prev == 2 and curr == 3):
    return 6 # 6 moving foward * (not included)
  elif(prev == 2 and curr == 4):
    return 2
  elif(prev == 3 and curr == 1):
    return 3
  elif(prev == 3 and curr == 2):
    return 7 # 7 moving backward * (not included)
  elif(prev == 3 and curr == 4):
    return 0
  elif(prev == 4 and curr == 1):
    return 7 # 7 moving backward * (not included)
  elif(prev == 4 and curr == 2):
    return 3
  elif(prev == 4 and curr == 3):
    return 1
  elif(prev == curr):
    return 4
  else:
    raise TypeError("Fucntion Comp Failed!!")

In [None]:
def add_edges(G) -> nx.DiGraph:
    for obj1 in G.nodes():
      for obj2 in G.nodes():
        if obj1 != obj2:

          obj1_x , obj1_y = G.nodes[obj1]["xy"][0] , G.nodes[obj1]["xy"][1]
          obj2_x , obj2_y = G.nodes[obj2]["xy"][0] , G.nodes[obj2]["xy"][1]

          if obj1_x >= obj2_x and obj1_y >= obj2_y:
            label =2 # bottom-right
            label_str = "Bottom-Right" # bottom-right
            edge_color = "lightgreen"
          elif obj1_x < obj2_x and obj1_y >= obj2_y:
            label =1
            label_str = "Bottom-Left" # bottom-right
            edge_color = "lightblue"
          elif obj1_x < obj2_x and obj1_y < obj2_y:
            label =3
            label_str = "Top-Left" # bottom-right
            edge_color = "orange"
          elif obj1_x >= obj2_x and obj1_y < obj2_y:
            label =4
            label_str = "Top-Right" # bottom-right
            edge_color= "pink"

          G.add_edge(obj1, obj2, label=label, label_str=label_str, color=edge_color)

    return G


In [43]:
input_scenes=[]
input_windows=[]
input_frames=[]

sort_alg = lambda key: int(key.strip(".txt").split("_")[1]) # sort using the frame_nr in format: "{scene_nr} _ {frame_nr} .txt"

for dir_name in os.listdir(dir_path):
  scene_path=os.path.join(dir_path, dir_name)
  frame_paths = sorted(os.listdir(scene_path), key=sort_alg)
  for i in range(0, len(frame_paths) - SLIDING_WINDOW_SIZE + 1, SLIDING_WINDOW_OVERLAPSE): #grab right values
    input_frames=[]
    for filename in frame_paths[i: i + (SLIDING_WINDOW_SIZE*FRAME_INTERVAL): FRAME_INTERVAL]: #grab right values
      if(i + (SLIDING_WINDOW_SIZE*FRAME_INTERVAL) > len(frame_paths)): continue
      with open(os.path.join(scene_path, filename), 'r') as f:
        frame=f.readlines()
        input_frames.append(frame)

        if input_frames==[]:
          print(i)
          raise ValueError

    #   print(filename)
    # print("--")
    # print(len(frames))
    if input_frames: input_windows.append(input_frames)
  if input_windows: input_scenes.append(input_windows)
#input_frames=scenes

In [None]:
#main
graphs = []
interaction_graph_list = []
for windows in input_scenes:
  for frames in windows:
    for frame in frames:
      G = nx.DiGraph()
      for ln in frame:
        xy, label = format_input(ln)
        G.add_node(label, xy=xy)

      G=add_edges(G)
      graphs.append(G)

    IG = create_interaction_graph(graphs)
    if IG.number_of_nodes() <= 1 or IG.number_of_edges() <= 1: continue #remove empty frames
    interaction_graph_list.append(IG)

    graphs=[]

for i in interaction_graph_list:
  print_graph(i)




In [71]:
torch.save(interaction_graph_list, PATH+"/IA_graph.csv")
# for n,g in enumerate(interaction_graph_list):
#   nx.write_edgelist(g, PATH+"/IA_graph"+str(n)+".csv", data=True, delimiter =",", )


In [None]:
RELATION_COLORS = ["red", "yellow", "blue", "green", "purple", "black"]

G=IG

print("Graph nodes:", G.nodes())
print("Graph edges:", G.edges())

A = to_agraph(G)

A.layout('dot')
#A.node_attr["style"] = "filled"
A.edge_attr["len"] = "2"
A.node_attr["fontsize"] = "22"
A.edge_attr["fontsize"] = "18"

#A.write("graph.dot") # write to simple.dot
#A.read("graph.dot") # write to simple.dot
A.graph_attr["label"] = "Interaction Graph"
A.draw("Interaction_Graph.png", prog="neato")


Graph nodes: [1, 2, 3, 5, 9, 10, 11, 12, 13, 14, 15, 16, 18, 19, 23, 31, 33, 34, 35, 36, 38, 39, 41, 42, 43]
Graph edges: [(1, 2), (1, 3), (1, 5), (2, 1), (2, 3), (2, 5), (2, 9), (2, 10), (2, 11), (2, 12), (2, 13), (2, 14), (2, 15), (2, 16), (3, 1), (3, 2), (3, 5), (3, 9), (5, 1), (5, 2), (5, 3), (9, 2), (9, 3), (9, 12), (9, 13), (9, 14), (10, 2), (10, 11), (11, 2), (11, 10), (11, 12), (11, 13), (12, 2), (12, 11), (12, 13), (12, 14), (12, 9), (12, 15), (13, 2), (13, 11), (13, 12), (13, 14), (13, 9), (13, 15), (13, 16), (14, 2), (14, 12), (14, 13), (14, 9), (14, 15), (15, 2), (15, 12), (15, 13), (15, 14), (15, 16), (16, 2), (16, 15), (16, 13), (18, 19), (18, 23), (19, 18), (23, 18), (31, 34), (31, 35), (34, 31), (34, 35), (35, 31), (35, 34), (38, 39), (38, 41), (38, 42), (38, 43), (39, 38), (39, 41), (39, 42), (39, 43), (41, 38), (41, 39), (41, 42), (41, 43), (42, 38), (42, 39), (42, 41), (42, 43), (43, 38), (43, 39), (43, 41), (43, 42)]


In [None]:
'''
#save graphs with pickle
import pickle

# save graph object to file
a=[IG,IG,IG]
pickle.dump(a, open('filename.pickle', 'wb'))
print_graph(a[1])

# load graph object from file
b = pickle.load(open('filename.pickle', 'rb'))
print(len(b))
print_graph(b[1])
'''

"\n#save graphs with pickle\nimport pickle\n\n# save graph object to file\na=[IG,IG,IG]\npickle.dump(a, open('filename.pickle', 'wb'))\nprint_graph(a[1])\n\n# load graph object from file\nb = pickle.load(open('filename.pickle', 'rb'))\nprint(len(b))\nprint_graph(b[1])\n"