In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import math
import numpy as np
import re
import os
from torch.nn.utils.rnn import pad_sequence


os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

torch.manual_seed(23)

<torch._C.Generator at 0x7e0aaa064a70>

In [3]:
def check_gpu():
    if torch.cuda.is_available():
        print("CUDA está disponible.")
        print(f"Hay {torch.cuda.device_count()} GPU(s) disponible(s).")
        for i in range(torch.cuda.device_count()):
            print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
    else:
        print("CUDA no está disponible. No hay GPU accesible.")

check_gpu()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

CUDA está disponible.
Hay 1 GPU(s) disponible(s).
GPU 0: NVIDIA GeForce RTX 2060


In [4]:
MAX_SEQ_LEN = 128 # max num of words per phrase for translate

In [5]:
class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_seq_len = MAX_SEQ_LEN):
        super().__init__()
        self.pos_embed_matrix = torch.zeros(max_seq_len, d_model, device=device)
        token_pos = torch.arange(0, max_seq_len, dtype = torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() 
                             * (-math.log(10000.0)/d_model))
        self.pos_embed_matrix[:, 0::2] = torch.sin(token_pos * div_term)
        self.pos_embed_matrix[:, 1::2] = torch.cos(token_pos * div_term)
        self.pos_embed_matrix = self.pos_embed_matrix.unsqueeze(0).transpose(0,1)
        
    def forward(self, x):
#         print(self.pos_embed_matrix.shape)
#         print(x.shape)
        return x + self.pos_embed_matrix[:x.size(0), :]

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model = 512, num_heads = 8):
        super().__init__()
        assert d_model % num_heads == 0, 'Embedding size not compatible with num heads'
        
        self.d_v = d_model // num_heads
        self.d_k = self.d_v
        self.num_heads = num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def forward(self, Q, K, V, mask = None):
        batch_size = Q.size(0)
        '''
        Q, K, V -> [batch_size, seq_len, num_heads*d_k]
        after transpose Q, K, V -> [batch_size, num_heads, seq_len, d_k]
        '''
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2 )
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2 )
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2 )
        
        weighted_values, attention = self.scale_dot_product(Q, K, V, mask)
        weighted_values = weighted_values.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads*self.d_k)
        weighted_values = self.W_o(weighted_values)
        
        return weighted_values, attention
        
        
    def scale_dot_product(self, Q, K, V, mask = None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = F.softmax(scores, dim = -1)
        weighted_values = torch.matmul(attention, V)
        
        return weighted_values, attention
        

class PositionFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        
    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))
    
class EncoderSubLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = PositionFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.droupout1 = nn.Dropout(dropout)
        self.droupout2 = nn.Dropout(dropout)
    
    def forward(self, x, mask = None):
        attention_score, _ = self.self_attn(x, x, x, mask)
        x = x + self.droupout1(attention_score)
        x = self.norm1(x)
        x = x + self.droupout2(self.ffn(x))
        return self.norm2(x)

class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([EncoderSubLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

class DecoderSubLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        
    def forward(self, x, encoder_output, target_mask=None, encoder_mask=None):
        attention_score, _ = self.self_attn(x, x, x, target_mask)
        x = x + self.dropout1(attention_score)
        x = self.norm1(x)
        
        encoder_attn, _ = self.cross_attn(x, encoder_output, encoder_output, encoder_mask)
        x = x + self.dropout2(encoder_attn)
        x = self.norm2(x)
        
        ff_output = self.feed_forward(x)
        x = x + self.dropout3(ff_output)
        return self.norm3(x)
        
class Decoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([DecoderSubLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        
    def forward(self, x, encoder_output, target_mask, encoder_mask):
        for layer in self.layers:
            x = layer(x, encoder_output, target_mask, encoder_mask)
        return self.norm(x)

In [6]:
class Transformer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers,
                 input_vocab_size, max_len=MAX_SEQ_LEN, dropout=0.1):
        super().__init__()
        self.encoder_embedding = nn.Embedding(input_vocab_size, d_model)
        self.pos_embedding = PositionalEmbedding(d_model, max_len)
        self.encoder = Encoder(d_model, num_heads, d_ff, num_layers, dropout)

        # Embeddings
        self._cached_source_embeddings = None
        
    def forward(self, source):
        # Encoder mask
        source_mask = self.mask(source)
        # Embedding and positional Encoding
        source = self.encoder_embedding(source) * math.sqrt(self.encoder_embedding.embedding_dim)
        self._cached_source_embeddings = source
        source = self.pos_embedding(source)
        # Encoder
        encoder_output = self.encoder(source, source_mask)
        
        return encoder_output
        
    def get_embeddings(self):
        if self._cached_source_embeddings is None:
            raise ValueError("Embeddings not computed yet. Call forward() first.")
        return self._cached_source_embeddings
    
    def mask(self, source):
        source_mask = (source != 0).unsqueeze(1).unsqueeze(2)
        return source_mask

        

## Simple test

In [7]:
seq_len_source = 10
seq_len_target = 10
batch_size = 2
input_vocab_size = 50
target_vocab_size = 50

source = torch.randint(1, input_vocab_size, (batch_size, seq_len_source))

d_model = 512
num_heads = 8
d_ff = 2048
num_layers = 6

model = Transformer(d_model, num_heads, d_ff, num_layers,
                   input_vocab_size, max_len=MAX_SEQ_LEN, dropout=0.1)

model = model.to(device)
source = source.to(device)

output = model(source)
#Expected output shape -> [batch, seq_len_target, target_vocab_size] i.e [2, 10, 50]
print(f'output.shape {output.shape}')
print(source)

output.shape torch.Size([2, 10, 512])
tensor([[39, 11,  6, 31, 11, 42, 49, 35, 10, 20],
        [24, 12, 35, 20, 30,  6, 23, 39, 48,  5]], device='cuda:0')


## DATA PREPROCESSING: Creation of scripts list with all project scripts

In [8]:
import pandas as pd
import os
from zipfile import ZipFile, BadZipFile
import json

metrics = pd.read_csv('metrics_attr.csv')

# Filter by Action Genre
metrics_action = metrics[(metrics['Main Genre'] == 'Action')]
metrics_non_action = metrics[(metrics['Main Genre'] != 'Action')]


# Check the filenames for collect
filenames_action = list(metrics_action['Name'])
filenames_non_action = list(metrics_non_action['Name'])
# Create txt for made the shell script
with open('filenames_action_global.txt', 'w') as names:
     for name in filenames_action:
        names.write(name + '\n')


#for project in filenames_action:
 #   sb3_path = f'./projects_sb3/{project}'
  #  if os.path.isfile(sb3_path):
   #     shutil.copy(sb3_path, './sb3_action_global')
    #    print(f'The project {project} has been success copy')
    #else:
     #   print(f'The project {project} doesnt exists')

In [15]:
import math

def load_json_project(path_projectsb3):
    try:
        zip_file = ZipFile(path_projectsb3, "r")
        json_project = json.loads(zip_file.open("project.json").read())
        return json_project
    except BadZipFile:
        print('Bad zipfile')

def process(json_project):
    seq_num = 0
    dict_total_blocks = {}
    
    list_total_blocks = []

    for key, list_info in json_project.items():
        if key == "targets":
            for dict_target in list_info:
                target_name = dict_target.get('name')
                if target_name:
                    dict_total_blocks[target_name] = {}
                    dict_total_blocks[target_name][f'Seq_{seq_num}'] = []
                blocks = dict_target.get('blocks')
                if blocks:
                    for block_id, block_info in blocks.items():
                        if isinstance(block_info, dict):
                            topLevel = block_info.get('topLevel')
                            if topLevel:
                                seq_num += 1
                                dict_total_blocks[target_name][f'Seq_{seq_num}'] = []
                            opcode = block_info.get('opcode')
                            if opcode:
                                
                                dict_total_blocks[target_name][f'Seq_{seq_num}'].append(opcode)
                            list_total_blocks.append(block_info)
                            #dict_total_blocks[block_id] = block_info
    return dict_total_blocks


# ----------------- SCRIPTS GLOBAL ----------------------------------
dict_total_blocks = {}
scripts_global = []
list_total_blocks = []
print(len(filenames_action))
for project in filenames_action:
    sb3_path = os.path.join('.','sb3_action_global',project)
    if os.path.isfile(sb3_path):
        #print(project)
        json_project = load_json_project(sb3_path)
        dict_total_blocks = process(json_project)

        for sprite, seqs in dict_total_blocks.items():
            for idx, block_list in seqs.items():
                if block_list != []:
                    scripts_global.append(" ".join(block_list))

# ----------------- SCRIPTS TARGET ----------------------------------

scripts_train1 = []
scripts_train2 = []
for idx, project in enumerate(filenames_action):
    sb3_path = os.path.join('.','sb3_action_global',project)
    if os.path.isfile(sb3_path):
        #print(project)
        json_project = load_json_project(sb3_path)
        dict_total_blocks = process(json_project)

        for sprite, seqs in dict_total_blocks.items():
            for block_list in seqs.values():
                if block_list != []:
                    if idx < math.floor(len(filenames_action)/2):
                        scripts_train1.append(" ".join(block_list))
                    else:
                        scripts_train2.append(" ".join(block_list))


# ----------------- SCRIPTS NEGTATIVE  ----------------------------------
scripts_train3 = []
for project in filenames_non_action:
    sb3_path = os.path.join('.','sb3_non_action',project)
    if os.path.isfile(sb3_path):
        #print(project)
        json_project = load_json_project(sb3_path)
        dict_total_blocks = process(json_project)

        for sprite, seqs in dict_total_blocks.items():
            for idx, block_list in seqs.items():
                if block_list != []:
                    scripts_train3.append(" ".join(block_list))

# -------------------------------------------------------------------------


# scripts_train1 -> src
# scritps_train2 -> trg
# scritps_train3 -> negative
scripts_global = ['<sos> ' + script + ' <eos>' for script in scripts_global]
scripts_train1 = ['<sos> ' + script + ' <eos>' for script in scripts_train1]
scripts_train2 = ['<sos> ' + script + ' <eos>' for script in scripts_train2]
scripts_train3 = ['<sos> ' + script + ' <eos>' for script in scripts_train3]

print(scripts_train3)

312
['<sos> event_whenkeypressed looks_switchbackdropto looks_backdrops <eos>', '<sos> event_whenbroadcastreceived looks_switchbackdropto looks_backdrops <eos>', '<sos> event_whenbroadcastreceived looks_switchbackdropto looks_backdrops <eos>', '<sos> event_whenflagclicked motion_gotoxy <eos>', '<sos> event_whenflagclicked control_forever control_if sensing_keypressed sensing_keyoptions motion_changexby control_if sensing_keypressed sensing_keyoptions motion_changexby <eos>', '<sos> event_whenbackdropswitchesto looks_show <eos>', '<sos> event_whenbackdropswitchesto looks_hide <eos>', '<sos> event_whenbackdropswitchesto looks_hide <eos>', '<sos> event_whenflagclicked looks_switchbackdropto looks_backdrops looks_hide <eos>', '<sos> event_whenbackdropswitchesto looks_hide <eos>', '<sos> event_whenflagclicked sound_play sound_sounds_menu <eos>', '<sos> event_whenflagclicked control_forever control_if operator_lt data_setvariableto control_if operator_lt data_setvariableto <eos>', '<sos> eve

### CREATION OF DATASET FOR **TRAIN**

In [10]:
def build_vocab(scripts):
    blocks = [block for script in scripts for block in script.split() ]
    block_count = Counter(blocks)
    sorted_block_counts = sorted(block_count.items(), key=lambda x:x[1], reverse=True)
    block2idx = {block: idx for idx, (block, _) in enumerate(sorted_block_counts, 2)}
    block2idx['<pad>'] = 0
    block2idx['<unk>'] = 1
    idx2block = {idx: block for block, idx in block2idx.items()}
    return block2idx, idx2block
    
src_block2idx, src_idx2block = build_vocab(scripts_train1)
src_vocab_size = len(src_block2idx)
trg_block2idx, trg_idx2block = build_vocab(scripts_train2)
trg_vocab_size = len(trg_block2idx)
neg_block2idx, neg_idx2block = build_vocab(scripts_train1)
neg_vocab_size = len(neg_block2idx)
#print(trg_vocab_size, spa_vocab_size)

class TrgSrcDataset(Dataset):
    def __init__(self, src_sentences, trg_sentences, src_block2idx, trg_word2idx):
        self.trg_sentences = trg_sentences
        self.src_sentences = src_sentences
        self.trg_block2idx = trg_word2idx
        self.src_block2idx = src_block2idx
        
    def __len__(self):
        return len(self.trg_sentences)
    
    def __getitem__(self, idx):
        trg_sentence = self.trg_sentences[idx]
        src_sentence = self.src_sentences[idx]
        # return tokens idxs
        trg_idxs = [self.trg_block2idx.get(block, self.trg_block2idx['<unk>']) for block in trg_sentence.split()]
        src_idxs = [self.src_block2idx.get(block, self.src_block2idx['<unk>']) for block in src_sentence.split()]

        
        return torch.tensor(trg_idxs), torch.tensor(src_idxs)

class TripletDataset(Dataset):
    def __init__(self, src_sentences, trg_sentences, neg_sentences, src_block2idx, trg_block2idx, neg_block2idx):
        self.src_sentences = src_sentences  # Anchor (source)
        self.trg_sentences = trg_sentences  # Positive (target)
        self.neg_sentences = neg_sentences  # Negative
        self.src_block2idx = src_block2idx
        self.trg_block2idx = trg_block2idx
        self.neg_block2idx = neg_block2idx
        
    def __len__(self):
        return len(self.trg_sentences)  # Usamos la longitud del conjunto positivo
    
    def __getitem__(self, idx):
        # Obtener las oraciones (anchor, positive, negative)
        src_sentence = self.src_sentences[idx]
        trg_sentence = self.trg_sentences[idx]
        neg_sentence = self.neg_sentences[idx]

        # Convertir cada oración en índices
        src_idxs = [self.src_block2idx.get(block, self.src_block2idx['<unk>']) for block in src_sentence.split()]
        trg_idxs = [self.trg_block2idx.get(block, self.trg_block2idx['<unk>']) for block in trg_sentence.split()]
        neg_idxs = [self.neg_block2idx.get(block, self.neg_block2idx['<unk>']) for block in neg_sentence.split()]

        # Retornar los tensores (anchor, positive, negative)
        return torch.tensor(src_idxs), torch.tensor(trg_idxs), torch.tensor(neg_idxs)

### CREATION OF DATASET FOR **TEST**

## TRAIN FUNCTIONS

In [11]:
def collate_fn_old(batch):
    trg_batch, src_batch, neg_batch = zip(*batch)
    trg_batch = [seq[:MAX_SEQ_LEN].clone().detach() for seq in trg_batch]
    src_batch = [seq[:MAX_SEQ_LEN].clone().detach() for seq in src_batch]
    neg_batch = [seq[:MAX_SEQ_LEN].clone().detach() for seq in neg_batch]
    trg_batch = torch.nn.utils.rnn.pad_sequence(trg_batch, batch_first=True, padding_value=0)
    src_batch = torch.nn.utils.rnn.pad_sequence(src_batch, batch_first=True, padding_value=0)
    neg_batch = torch.nn.utils.rnn.pad_sequence(neg_batch, batch_first=True, padding_value=0)
    return src_batch, trg_batch, neg_batch




In [1]:
def train(model, dataloader, loss_function, optimiser, epochs):
    model.train()
    final_anchor_embeddings = []
    final_positive_embeddings = []
    final_negative_embeddings = []
    
    for epoch in range(epochs):
        total_loss = 0
        for i, (anchor_batch, positive_batch, negative_batch) in enumerate(dataloader):
            
            anchor_batch = anchor_batch.to(device)
            positive_batch = positive_batch.to(device)
            negative_batch = negative_batch.to(device)
            
            # Zero grads
            optimiser.zero_grad()

            # Forward para anchor, positive y negative
            anchor_embeddings = model(anchor_batch)
            positive_embeddings = model(positive_batch)
            negative_embeddings = model(negative_batch)
            
            # Almacenar los embeddings solo en el último epoch
            if epoch == epochs - 1:
                final_anchor_embeddings.append(anchor_embeddings.cpu().detach())
                final_positive_embeddings.append(positive_embeddings.cpu().detach())
                final_negative_embeddings.append(negative_embeddings.cpu().detach())


            

            # Calcular la pérdida de Triplet
            loss = loss_function(anchor_embeddings, positive_embeddings, negative_embeddings)

            # Backpropagation y actualización de parámetros
            loss.backward()
            optimiser.step()

            total_loss += loss.item()
            
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch: {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}')
    
    # Concatenar los embeddings del último epoch
    final_anchor_embeddings = torch.cat(final_anchor_embeddings, dim=0)
    final_positive_embeddings = torch.cat(final_positive_embeddings, dim=0)
    final_negative_embeddings = torch.cat(final_negative_embeddings, dim=0)

    return final_anchor_embeddings, final_positive_embeddings, final_negative_embeddings


def evaluate(model, dataloader):
    model.eval()  # Configura el modelo en modo de evaluación
    with torch.no_grad():
        all_anchor_embeddings = []
        all_positive_embeddings = []
        all_negative_embeddings = []
        
        # Recopila los embeddings para todos los lotes
        for anchor_batch, positive_batch, negative_batch in dataloader:
            anchor_batch = anchor_batch.to(device)
            positive_batch = positive_batch.to(device)
            negative_batch = negative_batch.to(device)
            
            # Obtener embeddings
            anchor_embeddings = model.forward(anchor_batch)
            positive_embeddings = model.forward(positive_batch)
            negative_embeddings = model.forward(negative_batch)
            
            # Añadir a las listas
            all_anchor_embeddings.append(anchor_embeddings)
            all_positive_embeddings.append(positive_embeddings)
            all_negative_embeddings.append(negative_embeddings)
        
        # Concatenar todos los embeddings
        all_anchor_embeddings = torch.cat(all_anchor_embeddings, dim=0)
        all_positive_embeddings = torch.cat(all_positive_embeddings, dim=0)
        all_negative_embeddings = torch.cat(all_negative_embeddings, dim=0)
    return anchor_embeddings, positive_embeddings, negative_embeddings

## EXEC TRAIN

In [2]:
BATCH_SIZE = 16
dataset = TripletDataset(scripts_train1, scripts_train2, scripts_train3, src_block2idx, trg_block2idx, neg_block2idx)
print(dataset.src_sentences[0])

dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

for i, (anchor_batch, positive_batch, negative_batch) in enumerate(dataloader):
    print(anchor_batch.shape)
    
print(anchor_batch[0])

NameError: name 'TripletDataset' is not defined

In [3]:
# ------ EMBEDDINGS ------------
src_embeddings = None
trg_embeddings = None
# --------------------------------

BATCH_SIZE = 16
#dataset = TrgSrcDataset(scripts_train1, scripts_train2, src_block2idx, trg_block2idx) <- WITHOUT TRIPLET
#dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

dataset = TripletDataset(scripts_train1, scripts_train2, scripts_train3, src_block2idx, trg_block2idx, neg_block2idx)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
model = Transformer(d_model=512, num_heads=8, d_ff=2048, num_layers=6,
                    input_vocab_size=src_vocab_size,
                    max_len=MAX_SEQ_LEN, dropout=0.1)
model = model.to(device)
loss_function = triplet_loss = nn.TripletMarginLoss(margin=1.0)
optimiser = optim.Adam(model.parameters(), lr=0.0001)
anchor_embeddings, positive_embeddings, negative_embeddings = train(model, dataloader, loss_function, optimiser, epochs = 20)

NameError: name 'TripletDataset' is not defined

In [16]:
# Save FULL model trained (arch and weights)
#torch.save(model, 'action_global_scratch_triplet.pth')

## EVALUATION **WITH SINGLE PROJECT**

In [None]:
# Import FULL model
model = torch.load('action_global_scratch2.pth')

# Print embeddings
print("--------** TRAINING EMBEDDINGS **----------")
train_src_embeddings, train_trg_embeddings = model.get_embeddings()
print("Source embeddings:", train_src_embeddings.shape)
print("Target embeddings:", train_trg_embeddings.shape)

In [25]:
import os
sb3_project = os.path.join("./Beat the Robot.sb3")

dict_total_blocks = {}
scripts_test = []
list_total_blocks = []

if os.path.isfile(sb3_project):
    print(project)
    json_project = load_json_project(sb3_project)
    dict_total_blocks = process(json_project)

    for sprite, seqs in dict_total_blocks.items():
        for idx, block_list in seqs.items():
            if block_list != []:
                scripts_test.append(" ".join(block_list))

#print(dict_total_blocks)
print(len(scripts_test))
for script in scripts_test:
    script = '<sos> ' + script + ' <eos>'
    print(script + '\n')

welknkmCX 'lkSD ow EVIw oP iwb'lkNSFPVON.sb3
24
<sos> event_whenflagclicked looks_switchbackdropto looks_backdrops <eos>

<sos> event_whenflagclicked sound_play sound_sounds_menu <eos>

<sos> event_whenbackdropswitchesto sound_stopallsounds <eos>

<sos> event_whenbackdropswitchesto <eos>

<sos> event_whenkeypressed motion_changeyby <eos>

<sos> event_whenkeypressed motion_changeyby <eos>

<sos> event_whenkeypressed motion_movesteps <eos>

<sos> event_whenkeypressed motion_changexby <eos>

<sos> event_whenbackdropswitchesto motion_gotoxy operator_random <eos>

<sos> event_whenflagclicked control_forever control_wait_until sensing_touchingobject sensing_touchingobjectmenu motion_gotoxy <eos>

<sos> event_whenflagclicked looks_show motion_gotoxy control_forever control_wait_until sensing_touchingobject sensing_touchingobjectmenu looks_switchbackdropto looks_backdrops looks_hide <eos>

<sos> event_whenflagclicked control_forever data_setvariableto sensing_timer <eos>

<sos> event_whenbackd

In [35]:
dummy_target = scripts_test


trg_block2idx_test, trg_idx2block_test = build_vocab(scripts_test)
trg_vocab_size_test = len(trg_block2idx_test)
src_block2idx_test, src_idx2block_test = build_vocab(dummy_target)
src_vocab_size_test = len(src_block2idx_test)

In [36]:
# ---- EVALUATION EMBEDDINGS ------
src_embeddings_test = None
trg_embeddings_test = None
# --------------------------------
BATCH_SIZE = 16
loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimiser = optim.Adam(model.parameters(), lr=0.0001)

model.eval()

dataset_test = TrgSrcDataset(scripts_test, dummy_target, src_block2idx_test, trg_block2idx_test)
dataloader_test = DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
src_embeddings_test, trg_embeddings_test = evaluate(model, dataloader_test, loss_function)

Evaluation Completed


In [39]:
print("--------** TRAINING EMBEDDINGS **----------")
#train_src_embeddings, train_trg_embeddings = model.get_embeddings()
train_src_embeddings, train_trg_embeddings = train_src_embeddings.cpu().detach().numpy(), train_trg_embeddings.cpu().detach().numpy()
print("Source embeddings:", train_src_embeddings.shape)
print("Target embeddings:", train_trg_embeddings.shape)

print("--------** EVALUATION EMBEDDINGS **----------")
src_embeddings_test, trg_embeddings_test = src_embeddings_test.cpu().detach().numpy(), trg_embeddings_test.cpu().detach().numpy()
print("Source embeddings:", src_embeddings_test.shape)
print("Target embeddings:", trg_embeddings_test.shape)

--------** TRAINING EMBEDDINGS **----------
Source embeddings: (11, 12, 512)
Target embeddings: (11, 19, 512)
--------** EVALUATION EMBEDDINGS **----------
Source embeddings: (8, 6, 512)
Target embeddings: (8, 6, 512)


In [40]:
# CHANGE TO BIDIMENSIONAL
# train embeddings
train_src_embeddings_shape, train_trg_embeddings_shape = train_src_embeddings.shape, train_trg_embeddings.shape
train_src_new_dim = train_src_embeddings_shape[0] * train_src_embeddings_shape[1]
train_trg_new_dim = train_trg_embeddings_shape[0] * train_trg_embeddings_shape[1]
train_src_embeddings, train_trg_embeddings = train_src_embeddings.reshape(train_src_new_dim, 512), train_trg_embeddings.reshape(train_trg_new_dim, 512)

# evaluation embeddings
src_embeddings_test_shape, trg_embeddings_test_shape = src_embeddings_test.shape, trg_embeddings_test.shape
src_embeddings_test_new_dim = src_embeddings_test_shape[0] * src_embeddings_test_shape[1]
trg_embeddings_test_new_dim = trg_embeddings_test_shape[0] * trg_embeddings_test_shape[1]
src_embeddings_test, trg_embeddings_test = src_embeddings_test.reshape(src_embeddings_test_new_dim, 512), trg_embeddings_test.reshape(trg_embeddings_test_new_dim, 512)



In [41]:
# PRINT RESULTS
print(train_src_embeddings.shape)
print(train_trg_embeddings.shape)

print(src_embeddings_test.shape)
print(trg_embeddings_test.shape)

(132, 512)
(209, 512)
(48, 512)
(48, 512)


In [42]:
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances

# Comparar embeddings de evaluación con los embeddings de entrenamiento
cos_sim = cosine_similarity(src_embeddings_test, train_src_embeddings)
euclidean_dist = euclidean_distances(src_embeddings_test, train_src_embeddings)

# Analizar los resultados
print(f'Similitud de coseno:\n {cos_sim.mean()}')  # Media de la similitud para una visión general
print(f'Distancia euclidiana:\n {euclidean_dist.mean()}')  # Media de la distancia para una visión general

Similitud de coseno:
 0.32013383507728577
Distancia euclidiana:
 490.69366455078125


## EVALUATION TESTS

In [1]:
for batch in dataloader:
    print([item.shape for item in batch])

NameError: name 'dataloader' is not defined