## Unit tests to sanity check the network's capabilities

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
import json

In [None]:
from GraphTranslatorModule import GraphTranslatorModule
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import WandbLogger
import torch

In [None]:
# Catalogue of every node the test cases can draw from, keyed by class name.
# Each spec is (class_name, category, states); ids are assigned in listing
# order starting at 1, so the order of this table is significant.
_NODE_SPECS = (
    ('kitchen', 'Rooms', ()),
    ('cabinet', 'Furniture', ('CLOSED',)),
    ('table', 'Furniture', ('CLOSED',)),
    ('sink', 'Furniture', ('CLOSED',)),
    ('cup', 'placable_objects', ('CLEAN',)),
    ('cereal', 'placable_objects', ('CLEAN',)),
    ('toast', 'placable_objects', ('CLEAN',)),
)

# Every node gets its own fresh `properties` / `states` lists so that no two
# entries share mutable state.
node_dictionary = {
    class_name: {
        "id": node_id,
        "class_name": class_name,
        "category": category,
        "properties": [],
        "states": list(states),
        "prefab_name": None,
        "bounding_box": None,
    }
    for node_id, (class_name, category, states) in enumerate(_NODE_SPECS, start=1)
}

In [None]:
def edge(from_node, relation, to_node):
    """Build an edge record between two nodes named by their class names.

    Args:
        from_node: key into ``node_dictionary`` for the edge's source node.
        relation: relation label stored under ``'relation_type'``.
        to_node: key into ``node_dictionary`` for the edge's target node.

    Returns:
        A dict with the keys ``'from_id'``, ``'relation_type'`` and
        ``'to_id'``, where the ids are looked up in ``node_dictionary``.

    Raises:
        KeyError: if either node name is missing from ``node_dictionary``.
    """
    source_id = node_dictionary[from_node]['id']
    target_id = node_dictionary[to_node]['id']
    return {'from_id': source_id, 'relation_type': relation, 'to_id': target_id}

In [None]:
# Length of one internal time step, in minutes.
dt = 10


def time_internal(mins, hrs, days=0, weeks=0):
    """Convert a (weeks, days, hrs, mins) timestamp into internal time steps.

    The timestamp is flattened to a total number of minutes and divided by
    ``dt``; the quotient is rounded with the built-in ``round`` and returned
    as an ``int``.
    """
    total_days = weeks * 7 + days
    total_hours = total_days * 24 + hrs
    total_minutes = total_hours * 60 + mins
    return int(round(total_minutes / dt))


def time_external(in_t):
    """Convert internal time steps back to a ``(weeks, days, hrs, mins)`` tuple.

    This is the inverse of ``time_internal`` for timestamps whose minutes are
    a multiple of ``dt``.
    """
    total_minutes = in_t * dt
    total_hours = total_minutes // 60
    total_days = total_hours // 24
    return (total_days // 7, total_days % 7, total_hours % 24, total_minutes % 60)

In [None]:
def print_a_result(idx = -1):
    """Print one dataset sample's edges, the model's output for it, and its target.

    Reads the notebook-level globals ``data`` and ``model``. The sample at
    ``data._alldata[idx]`` is unpacked into four model inputs plus the target
    ``y``; each input gets a leading dimension of size 1 before being passed
    to ``model.forward``, and the output is squeezed again before printing.

    Args:
        idx: index into ``data._alldata``; defaults to the last sample.
    """
    edges, nodes, context_curr, context_query, y = data._alldata[idx]
    # Show the raw (un-batched) edge tensor first.
    print(edges)
    batched_inputs = [
        tensor.unsqueeze(0)
        for tensor in (edges, nodes, context_curr, context_query)
    ]
    x_hat = model.forward(*batched_inputs).squeeze()
    print(x_hat)
    print(y)

## Case 1. No noise repeating case

### Generate data

In [None]:
# Case 1 fixture: the cup moves cabinet -> table -> sink -> cabinet at the
# same four times in each of five identical routines. Writes classes.json and
# sample.json into data_dir.
data_dir = 'data/unittests/case1'
os.makedirs(data_dir, exist_ok=True)

nodes = [node_dictionary[name] for name in ('kitchen', 'cabinet', 'table', 'sink', 'cup')]
print('Nodes : ',nodes)

with open(os.path.join(data_dir, 'classes.json'), 'w') as f:
    json.dump({"nodes": nodes, "edges": ["INSIDE"]}, f)

# Edges present in every graph: all furniture sits inside the kitchen.
fixed_edges = [edge(furniture, 'INSIDE', 'kitchen') for furniture in ('cabinet', 'table', 'sink')]

# One edge list per possible location of the cup.
edges_0 = [edge('cup', 'INSIDE', 'cabinet')]
edges_1 = [edge('cup', 'INSIDE', 'table')]
edges_2 = [edge('cup', 'INSIDE', 'sink')]

changing_edges = [edges_0, edges_1, edges_2, edges_0]

# (mins, hrs) of each of the four graphs in a routine.
graph_times = [time_internal(mins, hrs, 0, 0) for mins, hrs in ((0, 8), (20, 8), (40, 8), (0, 9))]

# Five identical routines, each with its own copy of the time list.
data = [
    {
        "times": list(graph_times),
        "graphs": [{"nodes": nodes, "edges": edges + fixed_edges} for edges in changing_edges],
    }
    for _ in range(5)
]

with open(os.path.join(data_dir, 'sample.json'), 'w') as f:
    json.dump(data, f)


### Run model on the data

In [None]:
from reader import RoutinesDataset
from encoders import time_external, time_external_normalized, time_sine_cosine
from analyzers import *

# Load the fixture written to data_dir, using the sine/cosine time encoder.
data = RoutinesDataset(
    data_path=os.path.join(data_dir, 'sample.json'),
    classes_path=os.path.join(data_dir, 'classes.json'),
    time_encoder=time_sine_cosine,
)

# Analyzers passed to the model for logging.
logging_analyzers = [MeanLoss(), EdgeTypeLoss(data.get_edge_classes())]

# Model dimensions are taken from the dataset.
model_kwargs = dict(
    num_nodes=data.n_nodes,
    node_feature_len=data.n_len,
    edge_feature_len=data.e_len,
    context_len=data.c_len,
    train_analyzer=MeanLoss(),
    logging_analyzers=logging_analyzers,
)
model = GraphTranslatorModule(**model_kwargs)

# Fit on the train loader, then evaluate on the test loader, logging to W&B.
wandb_logger = WandbLogger()
trainer = Trainer(max_epochs=250, logger=wandb_logger, log_every_n_steps=1)
trainer.fit(model, data.get_train_loader())
trainer.test(model, data.get_test_loader())

## Case 2. No noise repeating case with multiple edge types

In [None]:
# Case 2 fixture: same repeating cup routine as case 1, but described with
# three edge types (ON, INSIDE, CLOSE). Writes classes.json and sample.json
# into data_dir.
data_dir = 'data/unittests/case2'
os.makedirs(data_dir, exist_ok=True)

nodes = [node_dictionary[name] for name in ('kitchen', 'cabinet', 'table', 'sink', 'cup')]

with open(os.path.join(data_dir, 'classes.json'), 'w') as f:
    json.dump({"nodes": nodes, "edges": ["ON", "INSIDE", "CLOSE"]}, f)

# Edges present in every graph: all furniture sits inside the kitchen.
fixed_edges = [edge(furniture, 'INSIDE', 'kitchen') for furniture in ('cabinet', 'table', 'sink')]

# Each cup location contributes a placement edge plus a CLOSE edge in both
# directions.
edges_0 = [
    edge('cup', 'INSIDE', 'cabinet'),
    edge('cup', 'CLOSE', 'cabinet'),
    edge('cabinet', 'CLOSE', 'cup'),
]
edges_1 = [
    edge('cup', 'ON', 'table'),
    edge('cup', 'CLOSE', 'table'),
    edge('table', 'CLOSE', 'cup'),
]
edges_2 = [
    edge('cup', 'INSIDE', 'sink'),
    edge('cup', 'CLOSE', 'sink'),
    edge('sink', 'CLOSE', 'cup'),
]

changing_edges = [edges_0, edges_1, edges_2, edges_0]

# (mins, hrs) of each of the four graphs in a routine.
graph_times = [time_internal(mins, hrs, 0, 0) for mins, hrs in ((0, 8), (20, 8), (40, 8), (0, 9))]

# Five identical routines, each with its own copy of the time list.
data = [
    {
        "times": list(graph_times),
        "graphs": [{"nodes": nodes, "edges": edges + fixed_edges} for edges in changing_edges],
    }
    for _ in range(5)
]

with open(os.path.join(data_dir, 'sample.json'), 'w') as f:
    json.dump(data, f)


In [None]:
from reader import RoutinesDataset
from analyzers import *

# Load the fixture written to data_dir (dataset's default time encoder).
data = RoutinesDataset(
    data_path=os.path.join(data_dir, 'sample.json'),
    classes_path=os.path.join(data_dir, 'classes.json'),
)

# Analyzers passed to the model for logging.
logging_analyzers = [MeanLoss(), EdgeTypeLoss(data.get_edge_classes())]

# Model dimensions are taken from the dataset.
model_kwargs = dict(
    num_nodes=data.n_nodes,
    node_feature_len=data.n_len,
    edge_feature_len=data.e_len,
    context_len=data.c_len,
    train_analyzer=MeanLoss(),
    logging_analyzers=logging_analyzers,
)
model = GraphTranslatorModule(**model_kwargs)

# Fit on the train loader, then evaluate on the test loader, logging to W&B.
wandb_logger = WandbLogger()
trainer = Trainer(max_epochs=250, logger=wandb_logger, log_every_n_steps=1)
trainer.fit(model, data.get_train_loader())
trainer.test(model, data.get_test_loader())

## Case 3. No noise repeating case with different times

In [None]:
# Case 3 fixture: the cup follows the same cabinet -> table -> sink -> cabinet
# sequence in every routine, but the timestamps differ from routine to
# routine. Writes classes.json and sample.json into data_dir.
data_dir = 'data/unittests/case3'
os.makedirs(data_dir, exist_ok=True)

nodes = [node_dictionary[name] for name in ('kitchen', 'cabinet', 'table', 'sink', 'cup')]
print('Nodes : ',nodes)

with open(os.path.join(data_dir, 'classes.json'), 'w') as f:
    json.dump({"nodes": nodes, "edges": ["INSIDE"]}, f)

# Edges present in every graph: all furniture sits inside the kitchen.
fixed_edges = [edge(furniture, 'INSIDE', 'kitchen') for furniture in ('cabinet', 'table', 'sink')]

# One edge list per possible location of the cup.
edges_0 = [edge('cup', 'INSIDE', 'cabinet')]
edges_1 = [edge('cup', 'INSIDE', 'table')]
edges_2 = [edge('cup', 'INSIDE', 'sink')]

changing_edges = [edges_0, edges_1, edges_2, edges_0]

# One row per routine; each entry is the (mins, hrs) of one graph.
routine_schedules = (
    ((0, 8), (20, 8), (40, 8), (0, 9)),
    ((0, 7), (20, 7), (40, 7), (0, 8)),
    ((10, 7), (20, 7), (40, 7), (0, 8)),
    ((30, 8), (0, 9), (10, 9), (30, 9)),
    ((10, 8), (50, 8), (10, 9), (30, 9)),
)

data = [
    {
        "times": [time_internal(mins, hrs, 0, 0) for mins, hrs in schedule],
        "graphs": [{"nodes": nodes, "edges": edges + fixed_edges} for edges in changing_edges],
    }
    for schedule in routine_schedules
]
# Keep the notebook-level name bound to the last routine's time list.
graph_times = data[-1]["times"]

with open(os.path.join(data_dir, 'sample.json'), 'w') as f:
    json.dump(data, f)


In [None]:
from reader import RoutinesDataset
from analyzers import *

# Load the fixture written to data_dir (dataset's default time encoder).
data = RoutinesDataset(
    data_path=os.path.join(data_dir, 'sample.json'),
    classes_path=os.path.join(data_dir, 'classes.json'),
)

# Analyzers passed to the model for logging.
logging_analyzers = [MeanLoss(), EdgeTypeLoss(data.get_edge_classes())]

# Model dimensions are taken from the dataset.
model_kwargs = dict(
    num_nodes=data.n_nodes,
    node_feature_len=data.n_len,
    edge_feature_len=data.e_len,
    context_len=data.c_len,
    train_analyzer=MeanLoss(),
    logging_analyzers=logging_analyzers,
)
model = GraphTranslatorModule(**model_kwargs)

# Fit on the train loader, then evaluate on the test loader, logging to W&B.
wandb_logger = WandbLogger()
trainer = Trainer(max_epochs=250, logger=wandb_logger, log_every_n_steps=1)
trainer.fit(model, data.get_train_loader())
trainer.test(model, data.get_test_loader())

## Case 4. 50-50 repeating case

In [None]:
# Case 4 fixture: both cereal and toast start in the cabinet; in the second
# graph exactly one of them has moved onto the table. Variant "a" (cereal
# moves) and variant "b" (toast moves) alternate, three routines each.
# Writes classes.json and sample.json into data_dir.
data_dir = 'data/unittests/case4'
os.makedirs(data_dir, exist_ok=True)

nodes = [node_dictionary[key] for key in ['kitchen','cabinet','table','cereal','toast']]
print('Nodes : ',nodes)

with open(os.path.join(data_dir,'classes.json'),'w') as f:
    json.dump({"nodes":nodes, "edges": ["ON","INSIDE","CLOSE"]}, f)

# Edges present in every graph. Only furniture listed in `nodes` may appear
# here: this case has no sink node, so a sink -> kitchen edge would reference
# an id (4) that is absent from both the graphs and classes.json.
fixed_edges = [edge(furniture, 'INSIDE', 'kitchen') for furniture in ('cabinet', 'table')]

# Start state: cereal and toast are both in the cabinet.
edges_0 = [
    edge('cereal','INSIDE','cabinet'),
    edge('cereal','CLOSE','cabinet'),
    edge('toast','INSIDE','cabinet'),
    edge('toast','CLOSE','cabinet'),
]

# Variant a: the cereal has moved onto the table.
edges_1a = [
    edge('toast','INSIDE','cabinet'),
    edge('toast','CLOSE','cabinet'),
    edge('cereal','ON','table'),
    edge('cereal','CLOSE','table'),
]

# Variant b: the toast has moved onto the table.
edges_1b = [
    edge('cereal','INSIDE','cabinet'),
    edge('cereal','CLOSE','cabinet'),
    edge('toast','ON','table'),
    edge('toast','CLOSE','table'),
]

changing_edges_a = [edges_0, edges_1a]
changing_edges_b = [edges_0, edges_1b]

# (mins, hrs) of the two graphs in every routine.
graph_times = [time_internal(mins, hrs, 0, 0) for mins, hrs in ((0, 8), (20, 8))]

# a, b, a, b, a, b: an even split between the two outcomes.
data = [
    {
        "times": list(graph_times),
        "graphs": [{"nodes":nodes, "edges":edges+fixed_edges} for edges in variant],
    }
    for variant in [changing_edges_a, changing_edges_b] * 3
]

with open(os.path.join(data_dir,'sample.json'),'w') as f:
    json.dump(data, f)

In [None]:
from reader import RoutinesDataset
from analyzers import *

# Load the fixture written to data_dir (dataset's default time encoder).
data = RoutinesDataset(
    data_path=os.path.join(data_dir, 'sample.json'),
    classes_path=os.path.join(data_dir, 'classes.json'),
)

# (first arg, second arg, label) for each edge tracked individually.
# NOTE(review): the numbers look like positions in this case's node list
# (1=cabinet, 2=table, 3=cereal, 4=toast) — confirm against SpecificEdgeLoss.
tracked_edges = (
    (3, 1, 'cereal to cabinet'),
    (3, 2, 'cereal to table'),
    (4, 1, 'toast to cabinet'),
    (4, 2, 'toast to table'),
)

logging_analyzers = [MeanLoss(), EdgeTypeLoss(data.get_edge_classes())] + [
    SpecificEdgeLoss(first, second, data.get_edge_classes(), label)
    for first, second, label in tracked_edges
]

# Model dimensions are taken from the dataset.
model_kwargs = dict(
    num_nodes=data.n_nodes,
    node_feature_len=data.n_len,
    edge_feature_len=data.e_len,
    context_len=data.c_len,
    train_analyzer=MeanLoss(),
    logging_analyzers=logging_analyzers,
)
model = GraphTranslatorModule(**model_kwargs)

# Fit on the train loader, then evaluate on the test loader, logging to W&B.
# This case trains for 500 epochs.
wandb_logger = WandbLogger()
trainer = Trainer(max_epochs=500, logger=wandb_logger, log_every_n_steps=1)
trainer.fit(model, data.get_train_loader())
trainer.test(model, data.get_test_loader())

In [None]:
# Print the edges, model output and target for sample index 2 of whichever
# `data` / `model` are currently bound (the case 4 ones if the cells were run
# in order).
print_a_result(2)