In [1]:
%env PYTORCH_ENABLE_MPS_FALLBACK = 1

env: PYTORCH_ENABLE_MPS_FALLBACK=1


In [2]:
import torch
import re
import os
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

In [3]:
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only point this at a dataset file from a trusted source.
data = torch.load("data/bhive_hsw.data")
# Drop records whose field 3 (the block XML parsed by transform_xml) is missing.
# `is not None` is an identity test, so it cannot be fooled by objects that
# overload `!=`.
data = [data_point for data_point in data if data_point[3] is not None]

In [4]:
def transform_xml(block: str):
    """Split a ``<block>`` XML string into one token list per instruction.

    The outer ``<block>``/``</block>`` wrapper is stripped, the remainder is cut
    at every ``<instr>`` opener, and each non-empty piece (minus a trailing
    ``</instr>``) is split on the characters ``<`` and ``>``. Empty fragments
    are dropped, so tag names such as ``src`` and ``/src`` remain as ordinary
    tokens next to the operands.

    Args:
        block: XML text of a single basic block.

    Returns:
        A list containing, for each instruction, a list of string tokens. An
        instruction with no content yields an empty inner list.
    """
    inner = block.removeprefix("<block>").removesuffix("</block>")
    parsed = []
    for chunk in inner.split("<instr>"):
        # Splitting on the opener leaves an empty piece before the first tag.
        if chunk == "":
            continue
        body = chunk.removesuffix("</instr>")
        parsed.append(list(filter(None, re.split(r"[<>]", body))))
    return parsed

In [5]:
# Dataset-preparation knobs, named so they can be tuned in one place.
MAX_SAMPLES = 10000  # number of leading valid blocks kept for this experiment
TEST_FRACTION = 0.2  # share of the kept blocks held out for evaluation
SPLIT_SEED = 42  # fixes the train/test shuffle

# Field 1 of each record is used as the regression target, field 3 is the block XML.
y = [data_point[1] for data_point in data]
X = [transform_xml(data_point[3]) for data_point in data]

# Keep only blocks that contain at least one instruction with at least one token.
valid_indices = [i for i, block_data in enumerate(X) if any(block_data)]
X = [X[i] for i in valid_indices]
# Each target becomes a shape-(1,) float32 tensor.
y = [torch.tensor([y[i]], dtype=torch.float32) for i in valid_indices]

X = X[:MAX_SAMPLES]
y = y[:MAX_SAMPLES]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_FRACTION, random_state=SPLIT_SEED
)

In [8]:
# Every distinct token that occurs anywhere in X.
# NOTE(review): X still contains the held-out split at this point, so the
# vocabulary also covers test tokens and "<UNK>" is never produced in training.
instruction_set = {
    word for data_point in X for instruction in data_point for word in instruction
}

# Ids 0 and 1 are reserved for padding and unknown tokens.
vocab_map = {"<PAD>": 0, "<UNK>": 1}
# Assign ids in sorted order. Iterating the set directly would follow string
# hash order, which changes between interpreter sessions, so a saved embedding
# matrix would no longer line up with the vocabulary after a kernel restart.
for word in sorted(instruction_set):
    # setdefault keeps the reserved ids if the data itself contains a literal
    # "<PAD>" or "<UNK>" token.
    vocab_map.setdefault(word, len(vocab_map))
vocab_counter = len(vocab_map)  # next free id, kept for any cell that reads it
VOCAB_SIZE = len(vocab_map)
PADDING_IDX = vocab_map["<PAD>"]


def tokens_to_ids_func(tokens_list, local_vocab_map):
    """Translate a sequence of tokens into a 1-D ``torch.long`` tensor of ids.

    Args:
        tokens_list: iterable of tokens to look up.
        local_vocab_map: mapping from token to integer id; it must contain the
            key ``"<UNK>"``, whose id is used for tokens missing from the map.

    Returns:
        A tensor holding one id per input token, in input order.
    """
    ids = []
    for token in tokens_list:
        unknown_id = local_vocab_map["<UNK>"]
        ids.append(local_vocab_map.get(token, unknown_id))
    return torch.tensor(ids, dtype=torch.long)

In [None]:
# for i in range(len(X[10])):
#     print(tokens_to_ids_func(X[10][i], local_vocab_map=vocab_map))

tensor([ 25,  81,  58,  19, 180, 257,  23, 208,  24, 180, 102,  23, 106])
tensor([ 25, 305,  58,  19, 180,  99, 225, 100, 107, 288, 115,  62, 169,  23,
        208,  24, 180, 257,  23, 106])
tensor([ 25, 305,  58,  19, 180,  99, 225, 100, 107, 288, 115,  62, 169,  23,
        208,  24, 180, 187,  23, 106])
tensor([ 25, 305,  58,  19, 180,  99, 225, 100, 107, 288, 115,  62, 169,  23,
        208,  24, 180, 268,  23, 106])
tensor([ 25, 305,  58,  19, 180,  99, 225, 100, 107, 288, 115,  62, 169,  23,
        208,  24, 180, 245,  23, 106])
tensor([ 25, 148,  58,  19, 180,  99, 225, 187, 107,  90, 121, 320, 169,  23,
        208,  24, 180, 278,  23, 106])
tensor([ 25,  81,  58,  19, 180, 102,  23, 208,  24, 180, 257,  23, 106])
tensor([ 25, 108,  58,  19, 180, 278,  23, 180, 121,  23, 208,  24, 106])


In [15]:
class InstructionDataset(Dataset):
    """Map-style dataset that pairs each feature entry with the label at the same index."""

    def __init__(self, features, labels):
        # Both sequences are stored as given; nothing is copied or validated.
        self.labels = labels
        self.features = features

    def __len__(self):
        # The dataset size is taken from the features sequence alone.
        return len(self.features)

    def __getitem__(self, idx):
        # Returns a (feature, label) tuple for position idx.
        feature = self.features[idx]
        label = self.labels[idx]
        return feature, label


# Wrap each split in a dataset so samples can be fetched by index as
# (block tokens, target) pairs.
train_dataset = InstructionDataset(features=X_train, labels=y_train)
test_dataset = InstructionDataset(features=X_test, labels=y_test)

In [None]:
class Ithemal(nn.Module):
    """Two-level LSTM regressor over the tokenised instructions of one basic block.

    Each instruction's tokens are embedded and run through ``token_rnn``; the
    final hidden state is that instruction's representation. The sequence of
    instruction representations is run through ``instr_rnn``, and its final
    hidden state is mapped to a single value by ``linear``.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_size: width of each token embedding.
        hidden_size: hidden width of both LSTMs.
        padding_idx_val: embedding index treated as padding.
    """

    def __init__(self, vocab_size, embedding_size, hidden_size, padding_idx_val=0):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.padding_idx = padding_idx_val

        self.embedding = nn.Embedding(
            self.vocab_size, self.embedding_size, padding_idx=self.padding_idx
        )
        self.token_rnn = nn.LSTM(
            self.embedding_size, self.hidden_size, batch_first=True
        )
        self.instr_rnn = nn.LSTM(self.hidden_size, self.hidden_size, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, 1)

    def _empty_block_prediction(self, device):
        """Return the shape-(1,) zero prediction used when a block has no tokens.

        While gradients are enabled the tensor is created with
        ``requires_grad=True``, so a training loop can still call
        ``loss.backward()`` on a loss built from it; a plain constant tensor
        would make ``backward()`` raise. No model parameter receives a gradient
        from this value.
        """
        return torch.zeros(
            1,
            device=device,
            dtype=torch.float32,
            requires_grad=torch.is_grad_enabled(),
        )

    def forward(self, basic_block_tokens, tokens_to_ids_converter):
        """Predict one value for a single basic block.

        Args:
            basic_block_tokens: list of instructions, each a list of tokens.
                Empty instructions are skipped.
            tokens_to_ids_converter: callable turning one instruction's token
                list into a 1-D tensor of embedding indices.

        Returns:
            A tensor of shape ``(1,)``. It is zero when the block is empty or
            contains only empty instructions.
        """
        device = self.embedding.weight.device

        # Also covers None, which the loop below could not iterate.
        if not basic_block_tokens:
            return self._empty_block_prediction(device)

        instruction_representations = []
        for tokens_in_instruction in basic_block_tokens:
            if not tokens_in_instruction:
                continue

            token_ids = tokens_to_ids_converter(tokens_in_instruction).to(device)

            # (num_tokens, embedding_size) -> (1, num_tokens, embedding_size)
            embedded_tokens = self.embedding(token_ids).unsqueeze(0)
            _, (h_n_token, _) = self.token_rnn(embedded_tokens)

            # (1, 1, hidden_size) -> (hidden_size,)
            instruction_representations.append(h_n_token.squeeze(0).squeeze(0))

        # Every instruction was empty.
        if not instruction_representations:
            return self._empty_block_prediction(device)

        # Shape before unsqueeze: (num_instructions_in_block, hidden_size)
        # Add batch dim for instr_rnn: (1, num_instructions_in_block, hidden_size)
        instr_input_tensor = torch.stack(instruction_representations).unsqueeze(0)
        _, (h_n_block, _) = self.instr_rnn(instr_input_tensor)
        block_repr = h_n_block.squeeze(0).squeeze(0)
        return self.linear(block_repr)

In [10]:
class MAPE_Loss(nn.Module):
    """Mean absolute percentage error, returned in percent.

    Args:
        epsilon: lower bound applied to ``|y_true|`` in the denominator so that
            targets at or near zero do not cause a division by zero.
    """

    def __init__(self, epsilon=1e-8):
        super().__init__()
        self.epsilon = epsilon

    def forward(self, y_pred, y_true):
        """Return ``mean(|y_true - y_pred| / max(|y_true|, epsilon)) * 100``.

        Raises:
            ValueError: if ``y_pred`` and ``y_true`` have different shapes.
        """
        if y_true.shape != y_pred.shape:
            raise ValueError(
                f"y_true and y_pred must have the same shape, but got {y_true.shape} and {y_pred.shape}"
            )

        # Clamp the magnitude of the target rather than adding epsilon to the
        # signed value: `y_true + epsilon` is exactly zero for y_true == -epsilon
        # and shrinks the denominator for every negative target.
        denominator = torch.abs(y_true).clamp(min=self.epsilon)
        absolute_percentage_error = torch.abs(y_true - y_pred) / denominator
        return torch.mean(absolute_percentage_error) * 100

In [11]:
class InstructionDataset(Dataset):
    """Index-aligned (feature, label) dataset.

    NOTE(review): a class with this same name and behaviour is already declared
    in an earlier cell of this notebook; this cell re-declares it.
    """

    def __init__(self, features, labels):
        # Keep references to the caller's sequences without copying them.
        self.labels = labels
        self.features = features

    def __len__(self):
        # Length follows the features sequence.
        return len(self.features)

    def __getitem__(self, idx):
        # Look up the feature first, then its label, and return them as a tuple.
        item_features = self.features[idx]
        item_label = self.labels[idx]
        return item_features, item_label

In [None]:
def select_device():
    """Return the name of the device to run on: ``"cuda"`` when available, else ``"cpu"``.

    MPS availability is probed when CUDA is absent, but the result is still
    ``"cpu"``: the author noted that running on MPS fails for a reason not yet
    understood.
    """
    has_cuda = torch.cuda.is_available()
    if not has_cuda and torch.backends.mps.is_available():
        # mps fails (cause unknown), so use the CPU on machines that offer it
        return "cpu"
    return "cuda" if has_cuda else "cpu"

: 

In [None]:
import torch
import torch.nn as nn

EMBEDDING_SIZE = 64
HIDDEN_SIZE = 128
LEARNING_RATE = 1e-3
EPOCHS = 5
SEED = 42  # fixes weight initialisation so repeated runs are comparable


def to_ids(tokens):
    """Convert one instruction's tokens to ids with the notebook-level vocab_map."""
    return tokens_to_ids_func(tokens, vocab_map)


def _run_pass(model, dataset, criterion, device, phase_label, optimizer=None):
    """Run the model once over every sample of `dataset`, one sample at a time.

    Args:
        model: network called as ``model(block_tokens, converter)``.
        dataset: indexable collection of ``(block_tokens, target)`` pairs.
        criterion: loss called as ``criterion(prediction, target)``.
        device: device the targets are moved to.
        phase_label: text shown at the start of the progress line.
        optimizer: when given, gradients are reset, back-propagated and applied
            for every sample; when None, the pass only evaluates.

    Returns:
        The mean per-sample loss as a Python float.
    """
    total_loss = 0
    num_samples = len(dataset)
    for i in range(num_samples):
        block_token_lists, target = dataset[i]
        target = target.to(device)

        if optimizer is not None:
            optimizer.zero_grad()

        prediction = model(block_token_lists, to_ids)
        loss = criterion(prediction, target)

        if optimizer is not None:
            loss.backward()
            optimizer.step()

        loss_value = loss.item()
        total_loss += loss_value

        # "\r" rewrites the same console line instead of printing one per sample.
        print(
            f"\r{phase_label}: Sample {i+1}/{num_samples}, Loss: {loss_value:.4f}",
            end="",
        )
    return total_loss / num_samples


# Seed before the model is built so its initial weights are reproducible.
torch.manual_seed(SEED)

device = select_device()
print(f"Using device: {device}")
model = Ithemal(VOCAB_SIZE, EMBEDDING_SIZE, HIDDEN_SIZE, PADDING_IDX).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

criterion = MAPE_Loss()


print("Starting non-batch training...")
for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    model.train()
    avg_train_loss = _run_pass(
        model, train_dataset, criterion, device, "Training", optimizer
    )
    print(
        f"\rEpoch {epoch+1} Average Training Loss: {avg_train_loss:.4f}       "
    )

    model.eval()
    with torch.no_grad():
        avg_test_loss = _run_pass(model, test_dataset, criterion, device, "Testing")
    print(f"\rEpoch {epoch+1} Average Test Loss: {avg_test_loss:.4f}          ")

print("\nTraining finished.")

Using device: mps
Starting non-batch training...

Epoch 1/5
Training: Sample 9/8000, Loss: 99.8875

In [None]:
# Write the trained weights to disk.
# NOTE(review): only the state_dict is saved here; the vocab_map that gives the
# embedding rows their meaning is not persisted by this cell.
model_save_dir = "models/"
model_filename = "final_ithemal_model.pth"

# Make sure the target directory exists before composing the file path.
os.makedirs(model_save_dir, exist_ok=True)
model_save_path = os.path.join(model_save_dir, model_filename)

torch.save(model.state_dict(), model_save_path)
print(f"Final model saved to {model_save_path}")

Final model saved to models/final_ithemal_model.pth
