# Installing Dependencies and accessing Google Drive

In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset archive
# and the model checkpoint from paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install the extra packages used by this notebook (pyctm is pinned to 0.0.13).
!pip install pyctm==0.0.13
!pip install pympler
!pip install -q memory_profiler

In [None]:
# Enable the memory_profiler IPython extension installed above.
%load_ext memory_profiler

# Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as ff
import torch.optim as optim
import torch.utils.data as data_utils
from torchvision.utils import make_grid
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Dataset
from torch.optim import lr_scheduler
import matplotlib.pyplot as plt
from collections import OrderedDict
from torch.autograd import Variable
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
from torch.nn import Parameter
import torchvision
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
from torch.optim.lr_scheduler import StepLR

from torchvision import models
import pandas as pd
import json as json
import numpy as np
from IPython.display import clear_output
from tqdm import tqdm

import seaborn as sns
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, confusion_matrix, roc_auc_score, roc_curve, auc

from pyctm.representation.sdr_idea_array import SDRIdeaArray
from pyctm.representation.sdr_idea_deserializer import SDRIdeaDeserializer
from pyctm.representation.sdr_idea_serializer import SDRIdeaSerializer
from pyctm.representation.dictionary import Dictionary
from pyctm.representation.idea import Idea
from pyctm.representation.array_dictionary import ArrayDictionary
from pyctm.representation.sdr_idea_array_serializer import SDRIdeaArraySerializer

from prettytable import PrettyTable

import math
import time
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"





In [None]:
# Select the torch device for the notebook: the first CUDA device when one is
# available, otherwise the CPU. Defines the globals `use_gpu` and `device`.
print("\nChecando GPU...")
print("Dispositivo cuda disponível? ", end='')

use_gpu = torch.cuda.is_available() is True
device = torch.device("cuda:0") if use_gpu else torch.device("cpu")

if not use_gpu:
    print("não. Usando CPU.")
else:
    from torch.cuda import get_device_name

    print("sim: " + str(device))
    print("GPU:" + str(get_device_name(0)))

## Utility Functions

In [None]:

import matplotlib.pyplot as plt
import time
from torchvision.utils import make_grid
import torch

def show_tensor_images(image_tensor, num_images=25, size=(1, 28, 28), nrow=4, show=True):
    '''
    Plot the first images of a batch as heat maps arranged in a grid.

    Channel 0 of every image is drawn with the 'viridis' colormap, and all
    images share one colour scale spanning the minimum and maximum of the
    whole tensor. A colorbar for the last drawn image is added.

    Args:
        image_tensor: Tensor indexed as [image][channel]; channel 0 of each
            image must be something ``plt.imshow`` can draw.
        num_images: Maximum number of images to draw. It is capped at the
            number of images actually present in ``image_tensor``.
        size: Unused; kept so existing callers keep working.
        nrow: Number of grid columns. The grid has at least ``nrow`` rows and
            grows as needed, so the defaults (25 images, 4 columns) no longer
            overflow a 4x4 grid.
        show: When true, call ``plt.show()`` after drawing.
    '''
    plt.figure(figsize=(15, 15))

    # Convert tensor to numpy array
    image_array = image_tensor.detach().cpu().numpy()

    # Never index past the end of the batch.
    n_shown = min(num_images, len(image_array))

    if n_shown > 0:
        cols = nrow
        # Ceiling division; keeping at least `nrow` rows preserves the layout
        # of calls that already fitted in the original nrow x nrow grid.
        rows = max(nrow, -(-n_shown // cols))

        # Shared colour limits, computed once on the host copy.
        vmin, vmax = image_array.min(), image_array.max()

        for i in range(n_shown):
            plt.subplot(rows, cols, i + 1)
            plt.imshow(image_array[i][0], cmap='viridis', vmin=vmin, vmax=vmax)
            plt.axis('off')

        # The colorbar needs a drawn image, so it is skipped for empty input.
        plt.colorbar()

    if show:
        plt.show()

# Usage example:
# Assuming `image_batch` is your tensor of images:
# show_tensor_images(image_batch)


In [None]:
def weights_init(model):
    """Re-initialise the convolutional and linear layers of ``model`` in place.

    * ``nn.Conv2d`` / ``nn.ConvTranspose2d``: weights drawn from a normal
      distribution with mean 0.0 and std 0.02.
    * ``nn.Linear``: Xavier-uniform weights and zeroed bias.

    Linear layers built with ``bias=False`` are supported; their missing bias
    is simply skipped.

    Args:
        model: Module whose sub-modules (``model.modules()``) are visited.
    """
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
            nn.init.normal_(m.weight, 0.0, 0.02)
        elif isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            # `bias` is None when the layer was created with bias=False.
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

In [None]:
def get_graph_connection():
    """Return the adjacency map of the 16-node graph used to validate plans.

    Keys and neighbour entries are node labels written as float strings
    ("1.0" ... "16.0"). A new dict with new lists is built on every call, so
    callers may mutate the result freely.
    """
    neighbours_by_node = (
        (1, (2, 16)),
        (2, (1, 3, 15)),
        (3, (2, 4, 14)),
        (4, (3, 5)),
        (5, (4, 6, 14)),
        (6, (5, 7, 13)),
        (7, (6, 8)),
        (8, (7, 9, 12)),
        (9, (8, 10)),
        (10, (9, 11, 16)),
        (11, (10, 12, 15)),
        (12, (11, 13, 8)),
        (13, (6, 12, 14)),
        (14, (3, 5, 13, 15)),
        (15, (2, 11, 14, 16)),
        (16, (1, 10, 15)),
    )

    return {
        f"{node}.0": [f"{neighbour}.0" for neighbour in neighbours]
        for node, neighbours in neighbours_by_node
    }

In [None]:
def is_valid_plan(plan_steps):
    """Check that consecutive ``moveToNode`` steps follow edges of the graph.

    Only pairs of adjacent steps that are both named "moveToNode" are
    checked; the step values are compared as strings against the adjacency
    map returned by ``get_graph_connection()``.

    Args:
        plan_steps: Sliceable sequence of steps exposing ``name`` and ``value``.

    Returns:
        False as soon as a move starts from a node that is not in the graph
        or targets a node that is not one of its neighbours; True otherwise
        (including for plans with fewer than two steps).
    """
    # Build the adjacency map once instead of once per pair of steps.
    graph = get_graph_connection()

    for current_step, next_step in zip(plan_steps, plan_steps[1:]):
        if current_step.name != "moveToNode" or next_step.name != "moveToNode":
            continue

        # A node missing from the graph has no neighbours, so the move is
        # rejected explicitly rather than through a swallowed exception.
        neighbours = graph.get(str(current_step.value), ())
        if str(next_step.value) not in neighbours:
            return False

    return True


def is_valid_plan_v2(action, initial_node, plan_steps, occupiedNodes=None):
    """Validate a plan against the requested action and the node graph.

    Checks performed, in order:

    1. For action 'PICK' the plan must contain a step named 'pick'; for
       'PLACE' it must contain a step named 'place'. Other actions skip
       this check.
    2. For 'PICK', when the first step is a "moveToNode" to a node other
       than ``initial_node``, that node must be a neighbour of
       ``initial_node``.
    3. Every pair of adjacent "moveToNode" steps must follow an edge of the
       graph, and the node being left must not be in ``occupiedNodes``.

    Args:
        action: Requested action; 'PICK' and 'PLACE' trigger check 1.
        initial_node: Starting node; compared numerically via ``float`` and
            looked up in the graph via ``str``.
        plan_steps: Sequence of steps exposing ``name`` and ``value``.
        occupiedNodes: Optional collection of node labels that may not be
            traversed. Defaults to no occupied nodes.

    Returns:
        True when all checks pass, False otherwise.
    """
    # A None default avoids sharing one mutable list between calls.
    if occupiedNodes is None:
        occupiedNodes = ()

    graph_connections = get_graph_connection()  # Build the graph once

    # Set of step names, used to look for the 'pick' / 'place' steps
    step_names = {step.name for step in plan_steps}

    if action == 'PICK':
        if 'pick' not in step_names:
            return False

        first_step = plan_steps[0]
        if first_step.name == "moveToNode" and float(first_step.value) != float(initial_node):
            # The first move must leave the initial node along an edge.
            if str(first_step.value) not in graph_connections.get(str(initial_node), []):
                return False

    elif action == 'PLACE':
        if 'place' not in step_names:
            return False

    for i in range(len(plan_steps) - 1):
        current_step = plan_steps[i]
        next_step = plan_steps[i + 1]

        if current_step.name == "moveToNode" and next_step.name == "moveToNode":
            current_node = str(current_step.value)
            next_node = str(next_step.value)

            # Check connectivity and node occupancy in a single condition.
            # NOTE(review): only the node being left is tested against
            # occupiedNodes, so the final destination is never tested —
            # confirm this is intended.
            if next_node not in graph_connections.get(current_node, []) or current_node in occupiedNodes:
                return False

    return True


In [None]:
def convert_and_print_idea(new_pattern):
    """Deserialize a pattern tensor and print the resulting idea on one line.

    The tensor is cloned, squeezed on dimension 0, detached, moved to the CPU
    and converted to a NumPy array before being handed to the module-level
    ``sdr_idea_deserializer``. Fields of the resulting idea that are ``None``
    are replaced by 'Undefined' before printing. Nothing is printed when the
    deserializer returns ``None``.
    """
    sdr_array = new_pattern.clone().squeeze(0).detach().cpu().numpy()
    idea = sdr_idea_deserializer.deserialize(sdr_array)

    if idea is None:
        return

    # Fill in placeholders for every missing field.
    for field in ("id", "name", "type", "value"):
        if getattr(idea, field) is None:
            setattr(idea, field, 'Undefined')

    print(f'Id:{idea.id}, Name: {idea.name}, Type: {idea.type}, Value: {idea.value}')


In [None]:
def is_valid_idea(stepIdea):
    """Tell whether a step idea carries a value in the accepted range.

    Rules by step name:

    * "pick" / "place": value must be a two-element list whose first entry
      lies in [0, 187] and whose second entry lies in [1, 4].
    * "moveTo": value must be a float in [0, 187].
    * "moveToNode": value must be a float in [1, 16].
    * any other name: always valid.
    """
    step_name = stepIdea.name

    if step_name in ("pick", "place"):
        payload = stepIdea.value
        if not isinstance(payload, list) or len(payload) != 2:
            return False
        return bool(0 <= payload[0] <= 187 and 1 <= payload[1] <= 4)

    if step_name == "moveTo":
        return bool(isinstance(stepIdea.value, float) and 0 <= stepIdea.value <= 187)

    if step_name == "moveToNode":
        return bool(isinstance(stepIdea.value, float) and 1 <= stepIdea.value <= 16)

    return True

def treat_idea(action_step_idea):
    """Replace ``None`` fields of an idea with the placeholder 'Undefined'.

    The ``id``, ``name``, ``type`` and ``value`` attributes are updated in
    place. The same object is returned; ``None`` input is returned unchanged.
    """
    if action_step_idea is None:
        return action_step_idea

    for field in ("id", "name", "type", "value"):
        if getattr(action_step_idea, field) is None:
            setattr(action_step_idea, field, 'Undefined')

    return action_step_idea

In [None]:
def json_to_idea(data):
    """Recursively build an ``Idea`` tree from a decoded JSON mapping.

    Args:
        data: Mapping with the keys "id", "name", "value" and "type", plus an
            optional "l" entry holding a list of child mappings of the same
            shape.

    Returns:
        The ``Idea`` for ``data`` with one child added per entry of "l".
    """
    node = Idea(data["id"], data["name"], data["value"], data["type"])

    if "l" not in data:
        return node

    for child_entry in data["l"]:
        node.add(json_to_idea(child_entry))

    return node

# Model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from torch.nn import TransformerEncoderLayer, TransformerDecoderLayer
import copy

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a batch-first input.

    The table is stored in the non-trainable buffer ``pe`` with shape
    ``(1, max_seq_length, d_model)``: even feature columns hold sines and odd
    columns hold cosines of ``position * 10000^(-2i / d_model)``.

    Args:
        d_model: Feature width of the input. Both even and odd widths are
            supported.
        max_seq_length: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_seq_length):
        super().__init__()

        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_seq_length, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so the angle table is trimmed to fit.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encoding of the first ``x.size(1)`` positions to ``x``."""
        return x + self.pe[:, :x.size(1)]

class TransformerXLAttention(nn.Module):
    """Multi-head dot-product attention with learned Q/K/V/output projections.

    Inputs are batch-first ``(batch, seq, d_model)`` tensors. Scores are
    divided by ``sqrt(d_model)`` (the full model width), positions where
    ``mask == 0`` are filled with -1e9 before the softmax, and dropout is
    applied to the attention weights. No recurrence memory or relative
    position term is implemented in this class.

    Args:
        d_model: Model width; the heads split it evenly.
        n_head: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, n_head, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.n_head = n_head

        # Input projections followed by the output projection.
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        self.W_O = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K``/``V`` and return ``(batch, seq_q, d_model)``."""
        heads = self.n_head
        queries = self.split_heads(self.W_Q(Q), heads)
        keys = self.split_heads(self.W_K(K), heads)
        values = self.split_heads(self.W_V(V), heads)

        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(self.d_model)
        if mask is not None:
            # Masked-out positions get a large negative score.
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = self.dropout(F.softmax(scores, dim=-1))
        context = self.combine_heads(torch.matmul(weights, values))

        return self.W_O(context)

    def split_heads(self, x, n_head):
        """Reshape ``(batch, seq, d_model)`` into ``(batch, n_head, seq, head_dim)``."""
        batch_size, seq_len, d_model = x.size()
        return x.view(batch_size, seq_len, n_head, d_model // n_head).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of :meth:`split_heads`: merge the heads back into one axis."""
        batch_size, n_head, seq_len, head_dim = x.size()
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_len, n_head * head_dim)


class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward block: Linear(d_model -> d_ff), ReLU, Linear(d_ff -> d_model)."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the block to the last dimension of ``x``."""
        hidden = self.fc1(x)
        hidden = self.relu(hidden)
        return self.fc2(hidden)

class ResidualLayer(nn.Module):
    """Residual wrapper that normalises first: ``x + sublayer(LayerNorm(x))``.

    Args:
        sublayer: Callable applied to the normalised input.
        input_dim: Size of the last dimension, passed to ``nn.LayerNorm``.
    """

    def __init__(self, sublayer, input_dim):
        super().__init__()
        self.sublayer = sublayer
        self.norm = nn.LayerNorm(input_dim)

    def forward(self, x):
        normalized = self.norm(x)
        return x + self.sublayer(normalized)

class ConvolutionalEmbeddingLayer1D(nn.Module):
    """Embed a batch-first sequence with three ReLU-activated 1-D convolutions.

    Input ``(batch, seq, input_dim)`` is permuted to channels-first for the
    convolutions (kernel size 3, padding 1, so the sequence length is kept)
    and permuted back, giving ``(batch, seq, d_model)``.
    """

    def __init__(self, input_dim, d_model):
        super().__init__()
        self.conv1 = nn.Conv1d(input_dim, d_model, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(d_model, d_model, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(d_model, d_model, kernel_size=3, padding=1)

    def forward(self, x):
        features = x.permute(0, 2, 1)
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.relu(conv(features))
        return features.permute(0, 2, 1)

class EncoderLayer(nn.Module):
    """Encoder block: self-attention then feed-forward, each followed by
    dropout, a residual connection and layer normalisation (post-norm)."""

    def __init__(self, d_model, n_head, d_ff, dropout):
        super().__init__()
        self.self_attn = TransformerXLAttention(d_model, n_head, dropout)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, residual, sublayer_out, norm):
        # Dropout on the sub-layer output, residual sum, then normalisation.
        return norm(residual + self.dropout(sublayer_out))

    def forward(self, x, mask):
        """Run the block on ``x``; ``mask`` is forwarded to the self-attention."""
        attended = self.self_attn(x, x, x, mask)
        x = self._add_and_norm(x, attended, self.norm1)
        transformed = self.feed_forward(x)
        return self._add_and_norm(x, transformed, self.norm2)

class DecoderLayer(nn.Module):
    """Decoder block: self-attention, cross-attention over the encoder output
    and feed-forward, each followed by dropout, a residual connection and
    layer normalisation (post-norm)."""

    def __init__(self, d_model, n_head, d_ff, dropout):
        super().__init__()
        self.self_attn = TransformerXLAttention(d_model, n_head, dropout)
        self.cross_attn = TransformerXLAttention(d_model, n_head, dropout)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, residual, sublayer_out, norm):
        # Dropout on the sub-layer output, residual sum, then normalisation.
        return norm(residual + self.dropout(sublayer_out))

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run the block.

        ``tgt_mask`` is used by the self-attention over ``x``; ``src_mask``
        is used by the cross-attention whose keys and values are
        ``enc_output``.
        """
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self._add_and_norm(x, self_attended, self.norm1)

        cross_attended = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self._add_and_norm(x, cross_attended, self.norm2)

        transformed = self.feed_forward(x)
        return self._add_and_norm(x, transformed, self.norm3)


class PlanningTransformer(nn.Module):
    """Encoder-decoder transformer over token ids that outputs vocabulary logits.

    Source and target ids go through separate embeddings, a shared sinusoidal
    positional encoding and dropout, then through the encoder and decoder
    stacks. Token id 0 is treated as padding when the masks are built.

    Args:
        vocabulary_size: Size of both embeddings and of the output layer.
        d_model: Model width.
        nhead: Attention heads per layer.
        num_encoder_layers: Number of ``EncoderLayer`` blocks.
        num_decoder_layers: Number of ``DecoderLayer`` blocks.
        dim_feedforward: Hidden width of the feed-forward blocks.
        dropout: Dropout probability used throughout.
        max_seq_len: Number of positions in the positional-encoding table.
        device: Device that inputs and masks are moved to in ``forward``.
    """

    def __init__(self, vocabulary_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, max_seq_len=20, device='cpu'):
        super().__init__()

        self.vocabulary_size = vocabulary_size
        self.d_model = d_model
        self.max_seq_len = max_seq_len
        self.device = device

        self.encoder_embedding = nn.Embedding(vocabulary_size, d_model)
        self.decoder_embedding = nn.Embedding(vocabulary_size, d_model)

        # Positional encoding shared by the source and target sides
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)

        encoder_stack = [EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_encoder_layers)]
        decoder_stack = [DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_decoder_layers)]
        self.encoder_layers = nn.ModuleList(encoder_stack)
        self.decoder_layers = nn.ModuleList(decoder_stack)

        self.dropout = nn.Dropout(dropout)

        self.output_layer = nn.Linear(d_model, self.vocabulary_size)

    def generate_mask(self, src, tgt):
        """Build the boolean attention masks for a ``(src, tgt)`` pair.

        Returns:
            ``src_mask`` of shape ``(batch, 1, 1, src_len)``, true on
            non-padding source tokens, and ``tgt_mask`` of shape
            ``(batch, 1, tgt_len, tgt_len)``, true where the query token is
            not padding and the key position is not in the future.
        """
        target_device = self.device
        src_mask = (src != 0).to(target_device).unsqueeze(1).unsqueeze(2)
        tgt_padding = (tgt != 0).to(target_device).unsqueeze(1).unsqueeze(3)

        steps = tgt.size(1)
        future = torch.triu(torch.ones(1, steps, steps), diagonal=1)
        causal = (1 - future).bool().to(target_device)

        return src_mask, tgt_padding & causal

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask: 0.0 on and below the diagonal, -inf above."""
        allowed = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        additive = allowed.float()
        additive = additive.masked_fill(allowed == 0, float('-inf'))
        return additive.masked_fill(allowed == 1, float(0.0))


    def create_positional_encoding(self, max_len, d_model):
        """Return a frozen ``(1, max_len, d_model)`` sinusoidal table as a Parameter."""
        table = torch.zeros(max_len, d_model).to(self.device)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        table[:, 0::2] = torch.sin(position * div_term)
        table[:, 1::2] = torch.cos(position * div_term)
        table = table.unsqueeze(0)
        return nn.Parameter(table, requires_grad=False)

    def forward(self, src, tgt):
        """Compute logits of shape ``(batch, tgt_len, vocabulary_size)``.

        Args:
            src: 2-D tensor of source token ids, ``(batch, src_len)``.
            tgt: 2-D tensor of target token ids, ``(batch, tgt_len)``.
        """
        tgt_batch, tgt_len = tgt.size()
        # Unpacking also rejects a source tensor that is not 2-D.
        _src_batch, _src_len = src.size()

        src = src.to(self.device)
        tgt = tgt.to(self.device)

        src_mask, tgt_mask = self.generate_mask(src, tgt)

        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        memory = src_embedded
        for encoder_block in self.encoder_layers:
            memory = encoder_block(memory, src_mask)

        decoded = tgt_embedded
        for decoder_block in self.decoder_layers:
            decoded = decoder_block(decoded, memory, src_mask, tgt_mask)

        logits = self.output_layer(decoded)

        return logits.view(tgt_batch, tgt_len, self.vocabulary_size)

# Data Treatments

## Dataset Class

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

import torch
import numpy as np

class PlanDataset(torch.utils.data.Dataset):
    """Dataset over a table with "input" and "output" columns.

    Each item is a pair of int64 tensors built from the row's "input" and
    "output" entries.

    Args:
        dataset: Object supporting ``len()`` and ``dataset[column].values[index]``
            (the notebook passes pandas DataFrames).
        transform: Stored on the instance; not applied by ``__getitem__``.
    """

    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        source_ids = self._entry_as_long("input", index)
        target_ids = self._entry_as_long("output", index)
        return source_ids, target_ids

    def _entry_as_long(self, column, index):
        # Convert one cell of the given column into an int64 tensor.
        raw = self.dataset[column].values[index]
        return torch.from_numpy(np.asarray(raw)).long()



## Loading Data

In [None]:
# Download the Google Drive file with this id into the current directory.
!gdown --id 13kkDkMDlTOfFU4kNQrEenvMPcDYC64hr

In [None]:
# Drop any previous extraction, then unpack the dataset archive stored on
# Google Drive into the current directory.
!rm -rf dataPlanSDR/
!unzip "/content/drive/MyDrive/data/SDR/dataPlanSDR35.zip" -d .

In [None]:
# Load the shards dataPlanSDR_0.json .. dataPlanSDR_2498.json into one DataFrame.
# The shards are collected in a list and concatenated once: calling pd.concat
# inside the loop copies every previously loaded row again on each iteration.
frames = [pd.read_json("/content/dataPlanSDR/dataPlanSDR_0.json")]

for i in range(1, 2499):
    frames.append(pd.read_json("/content/dataPlanSDR/dataPlanSDR_%s.json" % i))
    print("Loaded File - dataPlanSDR_%s.json" % i)

df = pd.concat(frames)

# The per-shard frames are no longer needed once they are combined.
del frames

### Splitting Data - Train, Validation and Test

In [None]:
# Shuffle the rows reproducibly, then cut them into 90% train, 5% validation
# and 5% test. Positional slicing is used instead of np.split, which on a
# DataFrame depends on the deprecated DataFrame.swapaxes.
shuffled_df = df.sample(frac=1, random_state=42)
train_end = int(.9 * len(df))
validation_end = int(.95 * len(df))

train_df = shuffled_df.iloc[:train_end]
validation_df = shuffled_df.iloc[train_end:validation_end]
test_df = shuffled_df.iloc[validation_end:]

train_size = len(train_df)

print("Train Size:" + str(len(train_df)))
print("Validation Size:" + str(len(validation_df)))
print("Test Size:" + str(len(test_df)))

# Testing

## Instantiate data loaders

In [None]:
# Wrap each split in a PlanDataset.
train_plan_dataset, validation_plan_dataset, test_plan_dataset = (
    PlanDataset(split) for split in (train_df, validation_df, test_df)
)

# Train and validation batches are shuffled; the test loader yields one
# sample at a time in dataset order.
train_data_loader = DataLoader(train_plan_dataset, batch_size=batch_size, shuffle=True)
validation_data_loader = DataLoader(validation_plan_dataset, batch_size=batch_size, shuffle=True)
test_data_loader = DataLoader(test_plan_dataset, batch_size=1, shuffle=False)

## Instantiate model

In [None]:
# Build the transformer and move it to `device`.
# NOTE(review): the hyper-parameters (vocabulary_size, d_model, nhead, ...) are
# not defined in the visible part of the notebook — presumably set in another
# cell; confirm they match the checkpoint loaded below.
gen = PlanningTransformer(vocabulary_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, max_seq_len, device).to(device)

In [None]:
# Restore the saved generator weights: the checkpoint is mapped to the CPU on
# load and the model is then moved to `device`.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a
# trusted source.
gen.load_state_dict(torch.load("/content/drive/MyDrive/data/models/generators/generator_P_SM_12_768_TRANS_E100_231024.pth", map_location=torch.device('cpu')))
gen = gen.to(device)

# Switch the model to evaluation mode for inference.
gen.eval()

## Deserializer

In [None]:
import torch
from pyctm.representation.array_value_validation import ArrayValueValidation
from pyctm.representation.idea import Idea
from pyctm.representation.idea_metadata_values import IdeaMetadataValues
from pyctm.representation.array_value_converter import ArrayValueConverter
from pyctm.representation.value_validation import ValueValidation

class SDRIdeaArrayDeserializer:
    """Rebuild an ``Idea`` tree from the token sequence of an SDR idea array.

    Tokens are translated through ``dictionary.words``, which is keyed by the
    string form of each token. Per idea, :meth:`deserialize` reads, in order:
    the parent id (for every idea except the first), the id, the name, the
    type and the value (metadata token, length and payload). Numeric fields
    take six tokens each, see :meth:`get_numeric_value`.

    Args:
        dictionary: Object whose ``words`` mapping translates tokens to
            words. Its keys are rewritten as strings in place.
        device: Stored on ``self.device``; kept for interface compatibility.
    """

    def __init__(self, dictionary, device='cpu'):
        self.dictionary = dictionary
        if self.dictionary is not None:
            # Every lookup in this class uses ``str(token)``, so the keys are
            # normalised to strings here; integer keys would make all of
            # those lookups fail.
            self.dictionary.words = {
                str(key): value for key, value in self.dictionary.words.items()
            }
        self.value_converter = ArrayValueConverter()
        self.start_word = 1
        self.end_word = 2
        self.device = device

        # Cache IdeaMetadataValues and metadata map
        self.idea_metadata_values = IdeaMetadataValues()
        self.metadata_map = self.idea_metadata_values.get_metadata_map()

    def _word_at(self, sdr_tensor, index):
        # Dictionary word for the token stored at ``index``.
        return self.dictionary.words[str(sdr_tensor[index].item())]

    def deserialize(self, sdr_idea_array):
        """Decode ``sdr_idea_array.sdr`` into ideas and link children to parents.

        Start tokens are skipped and decoding stops at the first end token,
        so anything after it (for example padding) is never looked up.

        Args:
            sdr_idea_array: Object exposing the token tensor as ``.sdr``.

        Returns:
            The first decoded idea, with child ideas attached through
            ``child_ideas``, or ``None`` when no idea was decoded.

        Raises:
            ValueError: If ``sdr_idea_array`` or its ``sdr`` is ``None``.
        """
        if sdr_idea_array is None or sdr_idea_array.sdr is None:
            raise ValueError("SDR Idea Array is null or empty.")

        idea_relationship = {}
        idea_list = []

        # Tokens are read one scalar at a time, so work on a host copy.
        sdr_tensor = sdr_idea_array.sdr.detach().cpu()
        index = 0

        while index < len(sdr_tensor):
            token = sdr_tensor[index].item()
            if token == self.start_word:
                index += 1
                continue
            elif token == self.end_word:
                break

            idea = Idea()

            # Every idea after the first is prefixed with its parent's id.
            parent_id = None
            if idea_list:
                value, index = self.get_numeric_value(sdr_tensor, index)
                parent_id = self.get_value_according_type(value, "long")

            value, index = self.get_numeric_value(sdr_tensor, index)
            idea.id = self.get_value_according_type(value, "long")
            idea.name, index = self.get_string_value(sdr_tensor, index)

            idea_type_str, index = self.get_string_value(sdr_tensor, index)
            idea.type = int(idea_type_str)
            idea.value, index = self.get_value(sdr_tensor, index)

            if parent_id is not None:
                idea_relationship[idea.id] = parent_id

            idea_list.append(idea)

        # Create a mapping from idea IDs to ideas for quick lookup
        idea_id_map = {idea.id: idea for idea in idea_list}

        for idea_element in idea_list:
            # Attach, in decoding order, every idea whose parent is this one.
            for child_id, parent_id in idea_relationship.items():
                if parent_id != idea_element.id:
                    continue
                child_idea = idea_id_map.get(child_id)
                if child_idea:
                    idea_element.child_ideas.append(child_idea)

        return idea_list[0] if idea_list else None

    def get_numeric_value(self, sdr_tensor, index):
        """Decode the six-token number starting at ``index``.

        Layout: three digit tokens ``d0 d1 d2`` (mantissa
        ``d0 + d1 * 0.1 + d2 * 0.01``), the sign of the number, one exponent
        digit and the sign of the exponent. The result is rounded to two
        decimals.

        Returns:
            ``(value, next_index)``.
        """
        digits = [int(self._word_at(sdr_tensor, index + offset)) for offset in range(3)]
        index += 3

        value = digits[0] + digits[1] * 0.1 + digits[2] * 0.01

        signal = self._word_at(sdr_tensor, index)
        index += 1
        base = int(self._word_at(sdr_tensor, index))
        index += 1
        base_signal = self._word_at(sdr_tensor, index)
        index += 1

        exponent = base * (1 if base_signal == "+" else -1)
        value *= 10 ** exponent
        value = value if signal == "+" else -value
        value = round(value, 2)

        return value, index

    def get_local_string_value(self, index):
        """Return the dictionary word for the token ``index``."""
        value = self.dictionary.words[str(index)]
        return value

    def get_string_value(self, sdr_tensor, index):
        """Decode the single-token word at ``index``; returns ``(word, next_index)``."""
        value = self._word_at(sdr_tensor, index)
        index += 1
        return value, index

    def get_value(self, sdr_tensor, index):
        """Decode an idea value: metadata token, length, then the payload.

        The metadata word selects the payload kind through the metadata map:
        an array of ``length`` elements, a single number or a single string.

        Returns:
            ``(value, next_index)``; ``value`` is ``None`` when the metadata
            does not select any known kind.
        """
        metadata_value = int(self._word_at(sdr_tensor, index))
        index += 1

        length_value, index = self.get_numeric_value(sdr_tensor, index)
        length = self.get_value_according_type(length_value, "int")

        for clazz, metadata in self.metadata_map.items():
            if metadata == metadata_value:
                if ArrayValueValidation.is_array(clazz):
                    array_value, index = self.get_array_value(sdr_tensor, index, length, clazz)
                    return array_value, index
                elif ArrayValueValidation.is_primitive(clazz):
                    value, index = self.get_numeric_value(sdr_tensor, index)
                    return self.get_value_according_type(value, clazz), index
                elif ArrayValueValidation.is_string(clazz):
                    value, index = self.get_string_value(sdr_tensor, index)
                    return value, index
        return None, index

    def get_array_value(self, sdr_tensor, index, length, clazz):
        """Decode ``length`` consecutive elements of the list type ``clazz``.

        Returns:
            ``(list_of_values, next_index)``. Unknown list types yield an
            empty list and leave the index untouched.
        """
        array = []
        for _ in range(length):
            if clazz in ["list_double", "list_float"]:
                value, index = self.get_numeric_value(sdr_tensor, index)
                array.append(self.get_value_according_type(value, "float"))
            elif clazz == "list_int":
                value, index = self.get_numeric_value(sdr_tensor, index)
                array.append(self.get_value_according_type(value, "int"))
            elif clazz == "list_short":
                value, index = self.get_numeric_value(sdr_tensor, index)
                array.append(self.get_value_according_type(value, "short"))
            elif clazz == "list_bool":
                value, index = self.get_string_value(sdr_tensor, index)
                array.append(value == "True")
            elif clazz == "list_long":
                value, index = self.get_numeric_value(sdr_tensor, index)
                array.append(self.get_value_according_type(value, "long"))
            elif clazz == "list_str":
                value, index = self.get_string_value(sdr_tensor, index)
                array.append(value)
        return array, index

    def get_value_according_type(self, value, clazz):
        """Cast ``value`` to the Python type matching the type name ``clazz``.

        "int", "short" and "long" map to ``int``; "float" and "double" map to
        ``float``; any other name returns ``value`` unchanged.
        """
        if clazz == "int":
            return int(value)
        elif clazz == "float":
            return float(value)
        elif clazz == "short":
            return int(value)  # Python does not have a native 'short' type
        elif clazz == "long":
            return int(value)
        elif clazz == "double":
            return float(value)
        else:
            return value

    def get_metadata_type(self, metadata_value):
        """Classify a metadata value.

        Returns:
            "STRING_ARRAY", "NUM_ARRAY", "NUM_VALUE" or "STRING_VALUE", or
            ``None`` when the metadata value matches no known kind.
        """
        # Reuse the map cached in __init__ instead of rebuilding it per call.
        for clazz, metadata in self.metadata_map.items():
            if metadata == metadata_value:
                if ArrayValueValidation.is_array(clazz):
                    if "list_str" == clazz:
                        return "STRING_ARRAY"
                    else:
                        return "NUM_ARRAY"
                elif ArrayValueValidation.is_primitive(clazz):
                    return "NUM_VALUE"
                elif ArrayValueValidation.is_string(clazz):
                    return "STRING_VALUE"
        return None


In [None]:
# Load the token dictionary shipped with the dataset. The context manager
# closes the file once the JSON has been parsed.
with open("/content/dataPlanSDR/dictionary.json", encoding="utf-8") as file:
    dictionary_data = json.load(file)

print(dictionary_data)

dictionary = ArrayDictionary(**dictionary_data)

sdr_idea_serializer = SDRIdeaArraySerializer(total_of_ideas=6, total_of_values=7, default_value=0, dictionary=dictionary)
sdr_idea_deserializer = SDRIdeaArrayDeserializer(sdr_idea_serializer.dictionary, device=device)

# The deserializer looks words up by the string form of each token, so make
# sure the dictionary is keyed by strings.
sdr_idea_deserializer.dictionary.words = {str(key): value for key, value in sdr_idea_deserializer.dictionary.words.items()}


## Situated Beam Search

In [None]:
import torch
import torch.nn.functional as F
import math

class SDRIdeaArrayPredictor:
    """Beam-search decoder whose token choices are restricted by a field grammar.

    Every generated position belongs to a *state* (a field such as ``"ID"``
    or ``"NAME"``) and, within that state, to a *step* (a token category such
    as ``"NUMBER"``). ``steps_dict`` lists the steps that make up each state
    and ``step_index`` lists the token ids that are allowed at each step.
    """

    def __init__(self, sdr_idea_deserializer):
        """Keep the deserializer and build the grammar tables.

        The deserializer's ``dictionary.words`` mapping is replaced by a copy
        whose keys are all converted to ``str``.
        """
        self.sdr_idea_deserializer = sdr_idea_deserializer
        word_table = sdr_idea_deserializer.dictionary
        word_table.words = {str(key): value for key, value in word_table.words.items()}

        # Token ids allowed for each step category.
        number_tokens = list(range(6, 16))
        self.step_index = {
            "NUMBER": number_tokens,
            "SIGNAL": [4, 5],
            "STRING": [16, 19, 20, 22, 23, 24, 25, 27, 28, 29, 30, 31, 32],
            "TYPE": [17],
            "METADATA": [17, 18, 21, 26],
            "SPECIAL": [2],
            "END": [2, *number_tokens],
        }

        # Step categories that make up each state, in generation order.
        numeric_field = ["NUMBER", "NUMBER", "NUMBER", "SIGNAL", "NUMBER", "SIGNAL"]
        self.steps_dict = {
            # Same layout as a numeric field, except that the first position
            # may also be the end token.
            "PARENT_ID": ["END", *numeric_field[1:]],
            "ID": list(numeric_field),
            "NAME": ["STRING"],
            "TYPE": ["TYPE"],
            "METADATA": ["METADATA"],
            "LENGTH": list(numeric_field),
            "NUM_VALUE": list(numeric_field),
            "STRING_VALUE": ["STRING"],
            "END": ["END"],
        }

        # State order for the first idea, and for every idea after it.
        self.states = ["ID", "NAME", "TYPE", "METADATA", "LENGTH"]
        self.parent_states = ["PARENT_ID", *self.states]
        self.end_symbol = 2  # Overwritten by beam_search

    def sample_gumbel(self, shape, device, eps=1e-20):
        """Draw Gumbel(0, 1) noise of the given shape on ``device``."""
        uniform = torch.rand(shape, device=device)
        negative_log = -torch.log(uniform + eps)
        return -torch.log(negative_log + eps)

    def gumbel_softmax(self, logits, temperature, scale):
        """Softmax over ``logits`` perturbed by Gumbel noise.

        The noise is weighted by ``5 * tanh(scale)`` (so ``scale=0`` disables
        it) and the perturbed logits are divided by ``temperature``.
        """
        noise = self.sample_gumbel(logits.size(), logits.device)
        perturbed = logits + noise * 5 * math.tanh(scale)
        return F.softmax(perturbed / temperature, dim=-1)

    def beam_search(self, model, src, start_symbol, end_symbol, max_len, beam_size, temperature, sdr_idea_deserializer, device):
        """Decode one sequence from ``src`` with grammar-constrained beam search.

        Args:
            model: callable as ``model(src_batch, target_batch)``; the logits
                of the last position (``[:, -1, :]``) are used.
            src: source tensor; it is repeated once per live hypothesis.
            start_symbol: token id every hypothesis starts with.
            end_symbol: token id that marks a hypothesis as finished.
            max_len: maximum number of decoding iterations.
            beam_size: number of hypotheses kept per iteration, and the
                maximum number of finished hypotheses retained.
            temperature: softmax temperature and noise scale used for the
                sampled steps; ``0`` makes every step deterministic.
            sdr_idea_deserializer: not used by this method (the instance
                passed to the constructor is used instead).
            device: device on which tensors are created and the model is run.

        Returns:
            The token tensor of the lowest-cost finished hypothesis, or of
            the lowest-cost live hypothesis when none finished.
        """
        self.end_symbol = end_symbol
        src = src.to(device)
        start_sequence = torch.LongTensor([[start_symbol]]).type_as(src.data).to(device)

        remaining_states = self.get_states(False)
        opening_state = remaining_states.pop(0)
        # Hypothesis layout: (tokens, cost, state, remaining states, remaining steps).
        # The cost is an accumulated negative log-probability; lower is better.
        beam = [(start_sequence, 0.0, opening_state, remaining_states, self.get_steps(opening_state))]
        finished_beams = []

        model.to(device).eval()

        for _ in range(max_len):
            batch_src = src.repeat(len(beam), 1).to(device)

            # Retire hypotheses that already emitted the end symbol.
            active = []
            for hypothesis in beam:
                if hypothesis[0][0, -1].item() != end_symbol:
                    active.append(hypothesis)
                    continue
                finished_beams.append(hypothesis)
                finished_beams.sort(key=lambda entry: entry[1])
                finished_beams = finished_beams[:beam_size]

            if not active:
                break  # Every hypothesis has finished

            batch_answers_tensor = torch.cat([entry[0] for entry in active], dim=0).to(device)
            batch_src = batch_src[:batch_answers_tensor.shape[0], :]

            # One forward pass for all live hypotheses; keep last-position logits.
            with torch.no_grad():
                logits = model(batch_src, batch_answers_tensor)[:, -1, :]

            beam_candidates = []
            for row, (answer, score, state, states, steps) in enumerate(active):
                state, step, states_after, steps_after, allowed_tokens = self.get_allowed_tokens(
                    state, states.copy(), steps.copy()
                )

                allowed_ids = torch.tensor(allowed_tokens, dtype=torch.long).to(device)
                allowed_logits = logits[row, allowed_ids]

                # Listed states, SIGNAL steps and temperature 0 use a
                # noise-free, near-zero-temperature softmax; every other
                # position is sampled with Gumbel noise.
                noise_free = (
                    state in ['ID', 'PARENT_ID', 'TYPE', 'METADATA', 'LENGTH', 'END']
                    or step in ['SIGNAL']
                    or temperature == 0
                )
                if noise_free:
                    allowed_probs = self.gumbel_softmax(allowed_logits, 1e-10, 0)
                else:
                    allowed_probs = self.gumbel_softmax(allowed_logits, temperature, temperature)

                keep = min(beam_size, len(allowed_tokens))
                top_probs, positions = allowed_probs.topk(keep)
                top_tokens = allowed_ids[positions]

                for prob, token in zip(top_probs.tolist(), top_tokens.tolist()):
                    extended = torch.cat([answer, torch.tensor([[token]], device=device)], dim=1)
                    cost = score - math.log(prob + 1e-8)  # epsilon avoids log(0)

                    branch_steps = steps_after.copy()
                    branch_states = states_after.copy()
                    branch_state = self.validate_and_extend_steps(state, branch_states, branch_steps, extended)

                    beam_candidates.append((extended, cost, branch_state, branch_states, branch_steps))

            if not beam_candidates:
                break

            # Keep the cheapest hypotheses for the next iteration.
            beam_candidates.sort(key=lambda entry: entry[1])
            beam = beam_candidates[:beam_size]

        if not finished_beams:
            finished_beams = beam

        finished_beams.sort(key=lambda entry: entry[1])
        return finished_beams[0][0]

    def validate_and_extend_steps(self, current_state, states, steps, next_answer):
        """Schedule the value steps once a LENGTH field has been completed.

        Nothing happens unless ``steps`` is empty and ``current_state`` is
        ``"LENGTH"``. In that case the last six tokens of ``next_answer`` are
        decoded as the length (values <= 0 become 1, values > 10 become 10),
        the token seven positions from the end selects the metadata type, and
        ``steps`` is extended in place with that many STRING_VALUE or
        NUM_VALUE step groups. An unrecognised metadata type adds no steps.

        Returns:
            ``"VALUE"`` when the LENGTH field was just completed, otherwise
            ``current_state`` unchanged.
        """
        if steps or current_state != "LENGTH":
            return current_state

        deserializer = self.sdr_idea_deserializer
        length_value = deserializer.get_numeric_value(next_answer[0, -6:].to(next_answer.device), 0)[0]
        if length_value <= 0:
            length_value = 1
        elif length_value > 10:
            length_value = 10

        metadata_token = next_answer[0, -7].item()
        metadata_type = deserializer.get_metadata_type(
            int(deserializer.get_local_string_value(metadata_token))
        )

        if metadata_type in ["STRING_ARRAY", "STRING_VALUE"]:
            value_state = "STRING_VALUE"
        elif metadata_type in ["NUM_ARRAY", "NUM_VALUE"]:
            value_state = "NUM_VALUE"
        else:
            value_state = None

        if value_state is not None:
            steps.extend(int(length_value) * self.get_steps(value_state))

        return "VALUE"

    def get_allowed_tokens(self, current_state, states, steps):
        """Consume the next grammar step and report which tokens it allows.

        When ``steps`` is exhausted the next state is popped from ``states``
        (which is refilled with the parent-state order once it runs empty)
        and that state's steps are loaded. ``states`` and ``steps`` are
        mutated, so callers pass copies.

        Returns:
            ``(current_state, current_step, states, steps, allowed_tokens)``.
        """
        if not steps:
            current_state = states.pop(0)
            if not states:
                states = self.get_states(True)
            steps = self.get_steps(current_state)

        if current_state is None:
            current_state = states[0]

        current_step = steps.pop(0)
        return current_state, current_step, states, steps, self.get_step_index()[current_step]

    def get_steps(self, state):
        """Return a fresh copy of the step list of ``state``."""
        return list(self.steps_dict[state])

    def get_states(self, is_parent=False):
        """Return a fresh copy of the state order (parent variant if requested)."""
        if is_parent:
            return list(self.parent_states)
        return list(self.states)

    def get_step_index(self):
        """Return the step-category -> allowed-token-ids table."""
        return self.step_index


## Metric Methods

In [None]:
import json
import os
import torch
import torch.nn.functional as F
from nltk.translate.bleu_score import sentence_bleu
from scipy.stats import entropy
from sklearn.metrics import jaccard_score
from tqdm import tqdm
import numpy as np
from pympler import asizeof
import gc
from memory_profiler import memory_usage

def print_variable_size(variable, name):
    """Print the size of ``variable`` in MB as measured by ``asizeof``, labelled ``name``."""
    megabytes = asizeof.asizeof(variable) / (1024 ** 2)
    print(f"Tamanho da variável '{name}': {megabytes:.2f} MB")

def track_all_objects():
    """Print the type and ``asizeof`` size of every object the garbage collector tracks."""
    tracked = gc.get_objects()  # snapshot taken once, before any printing
    for candidate in tracked:
        size_in_bytes = asizeof.asizeof(candidate)
        print(f"Objeto: {type(candidate)}, Tamanho: {size_in_bytes} bytes")


def _safe_ratio(count, total):
    """Return ``count / total``, or 0 when ``total`` is not positive."""
    return count / total if total > 0 else 0


def _mean_or_zero(values):
    """Return the mean of ``values`` as a float, or 0.0 for an empty list."""
    return float(np.mean(values)) if values else 0.0


def _fit_to_length(sequence, length, device):
    """Truncate, or right-pad with zeros, so that ``sequence.size(1) == length``."""
    current = sequence.size(1)
    if current > length:
        return sequence[:, :length]
    if current < length:
        padding = torch.zeros((sequence.size(0), length - current), dtype=sequence.dtype, device=device)
        return torch.cat((sequence, padding), dim=1)
    return sequence


def _as_sdr_idea_array(tokens):
    """Wrap a token tensor in an ``SDRIdeaArray`` built with 6 ideas, 7 values and default 0."""
    sdr_idea_array = SDRIdeaArray(total_of_ideas=6, total_of_values=7, default_value=0)
    sdr_idea_array.sdr = tokens
    return sdr_idea_array


def test_with_deserializer(model, test_data_loader, device, temperature=1, beam_size=1, deserializer=None,
                           desired_size=374, output_dir="/content/drive/MyDrive/data/models/generators"):
    """Decode every batch with beam search and report plan-quality metrics.

    For each ``(input, label)`` pair of ``test_data_loader`` a plan is decoded
    with ``SDRIdeaArrayPredictor.beam_search``, fitted to ``desired_size``
    tokens, deserialized and validated. Running metrics are shown on the
    progress bar, and per-batch and final metrics are written as JSON files to
    ``output_dir`` and printed.

    Args:
        model: model handed to the beam search; switched to eval mode.
        test_data_loader: iterable of ``(input, label)`` tensor pairs.
            NOTE(review): the decoded result has a single row but is indexed
            once per row of ``label``, so this presumably expects a loader
            batch size of 1 - confirm.
        device: device the tensors are moved to.
        temperature: sampling temperature forwarded to the beam search.
        beam_size: beam width forwarded to the beam search.
        deserializer: deserializer used by the predictor and to rebuild the
            goal, label and predicted ideas.
        desired_size: maximum decoding length and the length every decoded
            sequence is truncated or zero-padded to.
        output_dir: directory that receives the two metrics JSON files
            (created if missing).

    With an empty loader every metric is reported as 0 rather than raising
    ``ZeroDivisionError`` or writing NaN, matching the ``else 0`` convention
    the per-batch ratios already used.
    """
    # Same call IPython generates for the ``%reload_ext memory_profiler`` line
    # magic, spelled as plain Python so the cell also parses outside IPython.
    get_ipython().run_line_magic("reload_ext", "memory_profiler")

    gc.collect()

    model.eval()
    loop = tqdm(test_data_loader)

    fully_plan_correct = 0
    correct_ideas_converted = 0
    diverse_plan_count = 0
    diverse_correct_plan_count = 0

    diversity_scores = []
    bleu_scores = []

    batch_metrics = []

    sdr_idea_array_predictor = SDRIdeaArrayPredictor(sdr_idea_deserializer=deserializer)

    total_samples = 0

    with torch.no_grad():

        # ``goal`` is the loader's input tensor (named so the builtin
        # ``input`` is not shadowed).
        for batch_idx, (goal, label) in enumerate(loop):
            goal = goal.to(device)
            label = label.to(device)

            result = sdr_idea_array_predictor.beam_search(
                model=model, src=goal, start_symbol=1, end_symbol=2, max_len=desired_size,
                beam_size=beam_size,
                sdr_idea_deserializer=deserializer, device=device,
                temperature=temperature
            )
            result = _fit_to_length(result, desired_size, device).long()

            try:
                deserialized_goal = deserializer.deserialize(_as_sdr_idea_array(goal.view(-1)))

                start_token_id_tensor = torch.tensor([1]).to(device)
                label_tokens = torch.cat([start_token_id_tensor, label.view(-1)], dim=0)
                deserialized_label = deserializer.deserialize(_as_sdr_idea_array(label_tokens))
                # NOTE(review): ``full_label`` is never read; the line is kept
                # because a label without ``child_ideas`` raises here and
                # skips the checks below for this sample - confirm intent.
                full_label = [deserialized_label] + deserialized_label.child_ideas

                result_tokens = torch.cat([start_token_id_tensor, result.view(-1)], dim=0)
                deserialized_result = deserializer.deserialize(_as_sdr_idea_array(result_tokens))

                full_plan = [deserialized_result] + deserialized_result.child_ideas

                verify_correct_idea = all(is_valid_idea(idea) for idea in full_plan)

                is_valid = False

                if verify_correct_idea:
                    correct_ideas_converted += 1
                    initial_node = next(filter(lambda x: x.name == 'initialNode', deserialized_goal.child_ideas), None)
                    goal_action = next(filter(lambda x: x.name == 'goalAction', deserialized_goal.child_ideas), None)
                    occupied_nodes = next(filter(lambda x: x.name == 'occupiedNodes', deserialized_goal.child_ideas), None)

                    if is_valid_plan_v2(action="PICK" if goal_action.value == 2 else 'PLACE',
                                        initial_node=initial_node.value if initial_node is not None else None,
                                        plan_steps=full_plan,
                                        occupiedNodes=occupied_nodes.value if occupied_nodes is not None else []):
                        fully_plan_correct += 1
                        is_valid = True

                # Check whether the plan differs from the label.
                # NOTE(review): only per-row token sums are compared, so two
                # different sequences with equal sums count as identical.
                if torch.sum(torch.sum(result, dim=1) - torch.sum(label, dim=1) != 0).item() > 0:
                    diverse_plan_count += 1  # Count the plan as diverse
                    if is_valid:
                        diverse_correct_plan_count += 1  # Count the plan as diverse and correct

            except Exception as e:
                # Best effort: a sample that cannot be deserialized or
                # validated is reported and still counted in total_samples.
                print(f"Deserialization error in batch {batch_idx + 1}: {e}")
                print(result)

            total_samples += label.size(0)

            # BLEU score calculation
            for i in range(label.size(0)):
                reference_ids = label[i].cpu().numpy().tolist()
                hypothesis_ids = result[i].cpu().numpy().tolist()
                bleu_scores.append(sentence_bleu([reference_ids], hypothesis_ids))

            # Jaccard diversity calculation
            for i in range(label.size(0)):
                diversity_scores.append(
                    jaccard_score(label[i].cpu().numpy(), result[i].cpu().numpy(), average='macro')
                )

            # Running (cumulative) metrics up to and including this batch.
            batch_correct_ideas_converted = _safe_ratio(correct_ideas_converted, total_samples)
            batch_plan_correct = _safe_ratio(fully_plan_correct, total_samples)
            batch_diverse_correct_plan = _safe_ratio(diverse_correct_plan_count, diverse_plan_count)
            running_bleu = _mean_or_zero(bleu_scores)
            running_jaccard = _mean_or_zero(diversity_scores)

            batch_metrics.append({
                "batch_idx": batch_idx + 1,
                "average_bleu_score": running_bleu,
                "average_jaccard_score": running_jaccard,
                "percentage_correct_ideas_converted": float(batch_correct_ideas_converted),
                "percentage_fully_plan_correct": float(batch_plan_correct),
                "percentage_diversity_correct_plan": float(batch_diverse_correct_plan),
            })

            loop.set_description(
                f"Batch {batch_idx + 1}/{len(test_data_loader)} - "
                f"BLEU: {running_bleu * 100:.2f}%, "
                f"Jaccard: {running_jaccard:.4f}, "
                f"Ideas Correct: {batch_correct_ideas_converted * 100:.2f}%, "
                f"Plan Correct: {batch_plan_correct * 100:.2f}%, "
                f"Diverse Correct Plan: {batch_diverse_correct_plan * 100:.2f}%, "
                f"Total of Diverse Plans: {diverse_plan_count}"
            )

    # Final figures; the helpers return 0 instead of failing on an empty run.
    ideas_percentage = _safe_ratio(correct_ideas_converted, total_samples) * 100
    plan_percentage = _safe_ratio(fully_plan_correct, total_samples) * 100
    percentage_diverse_correct_plan = _safe_ratio(diverse_correct_plan_count, diverse_plan_count) * 100
    average_bleu = _mean_or_zero(bleu_scores)
    average_jaccard = _mean_or_zero(diversity_scores)

    final_metrics = {
        "percentage_correct_ideas_converted": float(ideas_percentage),
        "percentage_fully_plan_correct": float(plan_percentage),
        "percentage_diversity_correct_plan": float(percentage_diverse_correct_plan),
        "average_bleu_score": average_bleu,
        "average_jaccard_diversity_score": average_jaccard
    }

    os.makedirs(output_dir, exist_ok=True)

    with open(os.path.join(output_dir, f"batch_metrics_temperature_{str(temperature)}_beam_size_{str(beam_size)}_generator.json"), "w") as batch_file:
        json.dump(batch_metrics, batch_file, indent=4)

    with open(os.path.join(output_dir, f"final_metrics_temperature_{str(temperature)}_beam_size_{str(beam_size)}_generator.json"), "w") as final_file:
        json.dump(final_metrics, final_file, indent=4)

    print("Final Metrics:")
    # NOTE(review): this line and "Percentage of Correct Plans" report the
    # same counter (fully_plan_correct).
    print(f"Percentage of Fully Converted Sequences: {plan_percentage:.2f}%")
    print(f"Percentage of Correctly Converted Ideas: {ideas_percentage:.2f}%")
    print(f"Percentage of Correct Plans: {plan_percentage:.2f}%")
    print(f"Percentage of Diverse Correct Plans: {percentage_diverse_correct_plan:.2f}%")
    print(f"Average BLEU Score: {average_bleu * 100:.2f}%")
    print(f"Average Jaccard Score (Diversity): {average_jaccard:.4f}")


## Execution

In [None]:
# Evaluation sweep: one cell per (beam_size, temperature) pair, covering
# beam_size 1-4 and temperature 0.25, 0.5, 0.625, 0.75 and 1. Each call writes
# metrics JSON files whose names include its temperature and beam size.
# `gen` and `test_data_loader` are expected to be defined by earlier cells.
test_with_deserializer(gen, test_data_loader, device, temperature=0.25, beam_size=1, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=1,  temperature=0.5, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=1,  temperature=0.625, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=1,  temperature=0.75, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=1,  temperature=1, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=2,  temperature=0.25, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=2,  temperature=0.5, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=2,  temperature=0.625, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=2,  temperature=0.75, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=2,  temperature=1, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=3,  temperature=0.25, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=3,  temperature=0.5, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=3,  temperature=0.625, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=3,  temperature=0.75, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=3,  temperature=1, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=4,  temperature=0.25, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=4,  temperature=0.5, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=4,  temperature=0.625, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=4,  temperature=0.75, deserializer=sdr_idea_deserializer)

In [None]:
test_with_deserializer(gen, test_data_loader, device, beam_size=4,  temperature=1, deserializer=sdr_idea_deserializer)