In [1]:
import pandas as pd
import torch
import warnings
warnings.filterwarnings("ignore")
from transformers import logging
logging.set_verbosity_error()


In [51]:
llm = "llama"

# 1st Meta Network using just last generation informations

In [2]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 1. Load the dataset
df = pd.read_csv(f"/content/blocksworld_generations_dataset_{llm}.csv")

# 2. Solves_problem already binary (0/1), no need to map if it's consistent
# Ensure it's float
df["solves_problem"] = df["solves_problem"].astype(float)

# 3. Function to extract last attempt features based on num_attempts
def extract_last_attempt(row):
    n = int(row["num_attempts"])
    return pd.Series({
        "valid_action_percent": row[f"gen{n}_valid_action_percent"],
        "consecutive_valid_steps": row[f"gen{n}_consecutive_valid_steps"],
        "logical_violations": row[f"gen{n}_logical_violations"],
        "plan_len": row["plan_len"],  # already refers to final plan length
        "solves_problem": row["solves_problem"]
    })

# Apply extraction row-wise
df_last = df.apply(extract_last_attempt, axis=1)

# Drop rows with missing values
df_last = df_last.dropna()

# Features and target
features = ["valid_action_percent", "consecutive_valid_steps", "logical_violations", "plan_len"]
target = "solves_problem"

X = df_last[features].astype("float32").values
y = df_last[target].astype("float32").values.reshape(-1, 1)

# Train/val/test split
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# Standardize features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train).astype("float32")
X_val = scaler.transform(X_val).astype("float32")
X_test = scaler.transform(X_test).astype("float32")

# Custom dataset
class PlanningMetaDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Datasets and loaders
train_dataset = PlanningMetaDataset(X_train, y_train)
val_dataset = PlanningMetaDataset(X_val, y_val)
test_dataset = PlanningMetaDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)


In [3]:
import torch.nn as nn

class MetaConfidenceNet(nn.Module):
    def __init__(self, input_dim):
        super(MetaConfidenceNet, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)  # logit output
        )

    def forward(self, x):
        return self.model(x)


In [4]:
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float('inf')
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


In [5]:
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MetaConfidenceNet(input_dim=X.shape[1]).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
early_stopping = EarlyStopping(patience=8)

def evaluate(loader):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item() * y_batch.size(0)

            preds = (torch.sigmoid(outputs) > 0.5).float()
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)

    return total_loss / total, correct / total

# Train
n_epochs = 50
for epoch in range(n_epochs):
    model.train()
    running_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * y_batch.size(0)

    train_loss = running_loss / len(train_loader.dataset)
    val_loss, val_acc = evaluate(val_loader)

    print(f"Epoch {epoch+1}/{n_epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    early_stopping(val_loss)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break


Epoch 1/50 | Train Loss: 0.7458 | Val Loss: 0.6823 | Val Acc: 1.0000
Epoch 2/50 | Train Loss: 0.6939 | Val Loss: 0.6645 | Val Acc: 1.0000
Epoch 3/50 | Train Loss: 0.6483 | Val Loss: 0.6420 | Val Acc: 1.0000
Epoch 4/50 | Train Loss: 0.5971 | Val Loss: 0.6179 | Val Acc: 1.0000
Epoch 5/50 | Train Loss: 0.5654 | Val Loss: 0.5912 | Val Acc: 1.0000
Epoch 6/50 | Train Loss: 0.5450 | Val Loss: 0.5613 | Val Acc: 1.0000
Epoch 7/50 | Train Loss: 0.4957 | Val Loss: 0.5406 | Val Acc: 1.0000
Epoch 8/50 | Train Loss: 0.4728 | Val Loss: 0.5078 | Val Acc: 1.0000
Epoch 9/50 | Train Loss: 0.4362 | Val Loss: 0.4723 | Val Acc: 1.0000
Epoch 10/50 | Train Loss: 0.4076 | Val Loss: 0.4318 | Val Acc: 1.0000
Epoch 11/50 | Train Loss: 0.3762 | Val Loss: 0.3937 | Val Acc: 1.0000
Epoch 12/50 | Train Loss: 0.3415 | Val Loss: 0.3322 | Val Acc: 1.0000
Epoch 13/50 | Train Loss: 0.3146 | Val Loss: 0.3065 | Val Acc: 1.0000
Epoch 14/50 | Train Loss: 0.2713 | Val Loss: 0.2636 | Val Acc: 1.0000
Epoch 15/50 | Train Loss: 0.2

In [6]:
test_loss, test_acc = evaluate(test_loader)
print(f"\nTest Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")



Test Loss: 0.0048 | Test Accuracy: 1.0000


In [8]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# --- 1. Load test CSV ---
df = pd.read_csv("/content/all_generations_problem.csv")
#df = df[df["folder"] == "basic_move"]
# --- 2. Use exact same features as training ---
features = [
    'valid_action_percent',
    'consecutive_valid_steps',
    'logical_violations',
    'plan_len'
]
target = 'solves_problem'

# --- 3. Drop rows with NaNs ---
df = df.dropna(subset=features + [target])

# --- 4. Convert target to float ---
df['solves_problem'] = df['solves_problem'].replace({False: 0.0, True: 1.0})

# --- 5. Extract and scale ---
X_test = df[features].astype('float32').values
y_test = df[target].astype('float32').values.reshape(-1, 1)

# --- 6. Use pre-fitted scaler ---
X_test = scaler.transform(X_test)

# --- 7. PyTorch Dataset & DataLoader ---
class PlanningMetaDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

test_dataset = PlanningMetaDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [9]:
test_loss, test_acc = evaluate(test_loader)
print(f"\nTest Loss VALIDATION: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")



Test Loss VALIDATION: 0.4440 | Test Accuracy: 0.8367


In [16]:
with torch.no_grad():
  model.eval()
  easy_problem = test_dataset[0][0].unsqueeze(0).to(device)
  model = model.to(device)
  score = torch.sigmoid(model(easy_problem))
  print(f"Confidence score for easy basic move problem is {score.item():.2f}")

Confidence score for easy basic move problem is 0.99


In [17]:
with torch.no_grad():
  model.eval()
  hard_problem = test_dataset[33][0].unsqueeze(0).to(device)
  model = model.to(device)
  score = torch.sigmoid(model(hard_problem))
  print(f"Confidence score for hard hanoi problem is {score.item():.2f}")

Confidence score for hard hanoi problem is 0.01


# 1st Meta Network got a 55% score on validation set

# Now let' s do BERTA WITH EXTRA FEWTURES Meta Network

In [27]:
import os
import pandas as pd

# Load the dataset
df = pd.read_csv(f"/content/blocksworld_generations_dataset_{llm}.csv")

# Base folder where problem files are stored
base_path = "/content"

# Replace problem column with file contents
problem_contents = []
for prob in df["problem"]:
    file_path = os.path.join(base_path, prob+".pddl")
    try:
        with open(file_path, "r") as f:
            content = f.read()
        problem_contents.append(content)
    except FileNotFoundError:
        # If file is missing, keep the original name (or you can set None)
        problem_contents.append(prob)

df["problem"] = problem_contents


In [36]:
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['solves_problem'])

In [39]:
import pandas as pd
import torch
import numpy as np
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm



# Label encoding
df['solves_problem'] = df['solves_problem'].replace({False: 0.0, True: 1.0})
df = df.drop_duplicates(subset='problem')

# Features
extra_features = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations', 'plan_len']
df = df.dropna(subset=['problem', 'num_attempts'] + extra_features + ['solves_problem'])

# Train/Val/Test split
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42, )
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42,)

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [40]:
class BertPlanningDataset(Dataset):
    def __init__(self, df, extra_features, tokenizer, max_length=512):
        self.df = df.reset_index(drop=True)
        self.extra_features = extra_features
        self.tokenizer = tokenizer
        self.max_length = max_length

        self.labels = torch.tensor(df['solves_problem'].values, dtype=torch.float32)
        self.extra_feats = torch.tensor(df[extra_features].values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.loc[idx]
        gen_col = f'gen_{int(row["num_attempts"])}'
        problem = row['problem']
        generated = row.get(gen_col, "")

        encoding = self.tokenizer(
            problem,
            generated,
            padding='max_length',
            truncation='only_second',
            max_length=self.max_length,
            return_tensors='pt'
        )
        item = {k: v.squeeze(0) for k, v in encoding.items()}
        item['extra_feats'] = self.extra_feats[idx]
        item['labels'] = self.labels[idx]
        return item


In [41]:
class BertWithExtraFeatures(nn.Module):
    def __init__(self, bert_model_name='bert-base-uncased', extra_feat_dim=4, dropout=0.3):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Sequential(
            nn.Linear(self.bert.config.hidden_size + extra_feat_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, input_ids, attention_mask, token_type_ids, extra_feats):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        cls_output = outputs.last_hidden_state[:, 0, :]  # [CLS]
        combined = torch.cat([cls_output, extra_feats], dim=1)
        return self.classifier(combined).squeeze(1)


In [42]:
train_dataset = BertPlanningDataset(train_df, extra_features, tokenizer)
val_dataset = BertPlanningDataset(val_df, extra_features, tokenizer)
test_dataset = BertPlanningDataset(test_df, extra_features, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertWithExtraFeatures().to(device)

criterion = nn.BCELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [43]:
def evaluate(model, loader):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for batch in loader:
            inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
            labels = batch['labels'].to(device)
            outputs = model(**inputs)
            preds = (outputs > 0.5).float()
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    return correct / total

num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(**inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    val_acc = evaluate(model, val_loader)
    print(f"Epoch {epoch+1} | Loss: {running_loss:.4f} | Val Acc: {val_acc:.4f}")


Epoch 1: 100%|██████████| 6/6 [00:09<00:00,  1.56s/it]


Epoch 1 | Loss: 2.7178 | Val Acc: 0.8947


Epoch 2: 100%|██████████| 6/6 [00:09<00:00,  1.51s/it]


Epoch 2 | Loss: 1.6653 | Val Acc: 1.0000


Epoch 3: 100%|██████████| 6/6 [00:08<00:00,  1.48s/it]


Epoch 3 | Loss: 1.2920 | Val Acc: 1.0000


In [44]:
test_acc = evaluate(model, test_loader)
print(f"Test Accuracy: {test_acc:.4f}")


Test Accuracy: 0.9500


In [47]:
import pandas as pd
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer

# 1. Load validation set (replace with your path or preloaded DataFrame)
df_val = pd.read_csv("/content/all_generations_problem.csv")
#df_val = df_val[df_val["folder"] == "blocksworld"]
# 2. Ensure required fields exist and clean
required_cols = ['problem', 'solves_problem', 'num_attempts'] + [
    f'gen_{i}' for i in range(1, 5)
] + ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations']

missing_cols = [col for col in required_cols if col not in df_val.columns]
if missing_cols:
    raise ValueError(f"Missing columns in validation dataset: {missing_cols}")

# 3. Load BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 4. Define extra numeric features
extra_features = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations', 'plan_len']

# 5. Define custom dataset class
class BertPlanningDataset(torch.utils.data.Dataset):
    def __init__(self, df, extra_features, tokenizer, max_length=512):
        self.df = df.reset_index(drop=True)
        self.extra_features = extra_features
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.labels = torch.tensor(df['solves_problem'].replace({False: 0, True: 1}).values, dtype=torch.float32)
        self.extra_feats = torch.tensor(df[extra_features].fillna(0).values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.loc[idx]
        problem = row['problem']
        gen_col = f"gen_{int(row['num_attempts'])}"
        generated = str(row.get(gen_col, "")) if pd.notnull(row.get(gen_col, "")) else ""

        encoded = self.tokenizer(
        problem,
        generated,
        padding='max_length',
        truncation='longest_first',
        max_length=self.max_length,
        return_tensors='pt'
    )

        item = {key: val.squeeze(0) for key, val in encoded.items()}
        item['extra_feats'] = self.extra_feats[idx]
        item['labels'] = self.labels[idx]
        return item

# 6. Create test dataset and DataLoader
test_dataset = BertPlanningDataset(df_val, extra_features, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [48]:
test_acc = evaluate(model, test_loader)
print(f"Test Accuracy: {test_acc:.4f}")


Test Accuracy: 0.7143


# BERTA ONLY ENCODING

In [52]:
class BertPlanningDataset(Dataset):
    def __init__(self, df, extra_features, tokenizer, max_length=512):
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

        self.labels = torch.tensor(df['solves_problem'].values, dtype=torch.float32)

        # Handle optional extra features
        self.use_extra_feats = extra_features is not None and len(extra_features) > 0
        if self.use_extra_feats:
            self.extra_feats = torch.tensor(df[extra_features].values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.loc[idx]
        gen_col = f'gen_{int(row["num_attempts"])}'
        problem = row['problem']
        generated = row.get(gen_col, "")

        encoding = self.tokenizer(
            problem,
            generated,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        item = {k: v.squeeze(0) for k, v in encoding.items()}

        if self.use_extra_feats:
            item['extra_feats'] = self.extra_feats[idx]

        item['labels'] = self.labels[idx]
        return item


In [53]:
import pandas as pd
import torch
import numpy as np
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm

# Load dataset
df = pd.read_csv(f"//content/blocksworld_generations_dataset_{llm}.csv")

# Label encoding
df['solves_problem'] = df['solves_problem'].replace({False: 0.0, True: 1.0})
df = df.drop_duplicates(subset='problem')

# Features
extra_features = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations', 'plan_len']
df = df.dropna(subset=['problem', 'num_attempts'] + extra_features + ['solves_problem'])

# Train/Val/Test split
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['solves_problem'])
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42, stratify=temp_df['solves_problem'])

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")


In [54]:
import torch
import torch.nn as nn
from transformers import BertModel

class BertWithExtraFeatures(nn.Module):
    def __init__(self, bert_model_name='bert-base-uncased', extra_feat_dim=0, dropout=0.3):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.use_extra_feats = extra_feat_dim > 0
        input_dim = self.bert.config.hidden_size + extra_feat_dim if self.use_extra_feats else self.bert.config.hidden_size

        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, input_ids, attention_mask, token_type_ids, extra_feats=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        cls_output = outputs.last_hidden_state[:, 0, :]  # [CLS] token

        if self.use_extra_feats:
            if extra_feats is None:
                raise ValueError("Model was initialized to use extra features, but none were provided.")
            combined = torch.cat([cls_output, extra_feats], dim=1)
        else:
            combined = cls_output

        return self.classifier(combined).squeeze(1)


In [55]:
class BertPlanningDataset(Dataset):
    def __init__(self, df, extra_features, tokenizer, max_length=512):
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

        self.labels = torch.tensor(df['solves_problem'].values, dtype=torch.float32)

        # Handle optional extra features
        self.use_extra_feats = extra_features is not None and len(extra_features) > 0
        if self.use_extra_feats:
            self.extra_feats = torch.tensor(df[extra_features].values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.loc[idx]
        gen_col = f'gen_{int(row["num_attempts"])}'
        problem = row['problem']
        generated = row.get(gen_col, "")

        encoding = self.tokenizer(
            problem,
            generated,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        item = {k: v.squeeze(0) for k, v in encoding.items()}

        if self.use_extra_feats:
            item['extra_feats'] = self.extra_feats[idx]

        item['labels'] = self.labels[idx]
        return item


In [56]:
train_dataset = BertPlanningDataset(train_df, None, tokenizer)
val_dataset = BertPlanningDataset(val_df, None, tokenizer)
test_dataset = BertPlanningDataset(test_df, None, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertWithExtraFeatures().to(device)

criterion = nn.BCELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)


In [57]:
def evaluate(model, loader):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for batch in loader:
            inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
            labels = batch['labels'].to(device)
            outputs = model(**inputs)
            preds = (outputs > 0.5).float()
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    return correct / total

num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(**inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    val_acc = evaluate(model, val_loader)
    print(f"Epoch {epoch+1} | Loss: {running_loss:.4f} | Val Acc: {val_acc:.4f}")


Epoch 1: 100%|██████████| 6/6 [00:08<00:00,  1.46s/it]


Epoch 1 | Loss: 2.7927 | Val Acc: 0.8947


Epoch 2: 100%|██████████| 6/6 [00:08<00:00,  1.47s/it]


Epoch 2 | Loss: 1.6900 | Val Acc: 0.8947


Epoch 3: 100%|██████████| 6/6 [00:09<00:00,  1.50s/it]


Epoch 3 | Loss: 1.4536 | Val Acc: 0.8947


In [58]:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer

# 1. Load validation set
df_val = pd.read_csv("/content/all_generations_problem.csv")
# 2. Ensure required fields exist and clean
required_cols = ['problem', 'solves_problem', 'num_attempts'] + [
    f'gen_{i}' for i in range(1, 5)
]

missing_cols = [col for col in required_cols if col not in df_val.columns]
if missing_cols:
    raise ValueError(f"Missing columns in validation dataset: {missing_cols}")

# 3. Load BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 4. Define custom dataset class without extra features
class BertPlanningDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=512):
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.labels = torch.tensor(df['solves_problem'].replace({False: 0, True: 1}).values, dtype=torch.float32)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.loc[idx]
        problem = row['problem']
        gen_col = f"gen_{int(row['num_attempts'])}"
        generated = str(row.get(gen_col, "")) if pd.notnull(row.get(gen_col, "")) else ""

        encoded = self.tokenizer(
            problem,
            generated,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
            return_overflowing_tokens=False
        )

        item = {key: val.squeeze(0) for key, val in encoded.items()}
        item['labels'] = self.labels[idx]
        return item

# 5. Create test dataset and DataLoader
test_dataset = BertPlanningDataset(df_val, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [59]:
test_acc = evaluate(model, test_loader)
print(f"Test Accuracy: {test_acc:.4f}")


Test Accuracy: 0.7143


# PROGRESSION

In [60]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# 1. Load the dataset
df = pd.read_csv(f"/content/blocksworld_generations_dataset_{llm}.csv")

# 2. Replace labels
df['solves_problem'] = df['solves_problem'].replace({False: 0.0, True: 1.0})

# 4. Drop original features (redundant)
redundant = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations']
df = df.drop(columns=redundant, errors='ignore')

# 5. Define generation features
gen_features = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations']
features = []

for i in range(1, 5):
    for feat in gen_features:
        features.append(f'gen{i}_{feat}')

# 6. Optionally add 'plan_len'
features.append('plan_len')

# 8. Fill NaNs in gen features with padding (e.g., -1.0 or 0.0)
df[features] = df[features].fillna(-1000)

# 9. Convert to numpy arrays
X = df[features].astype('float32').values
y = df['solves_problem'].astype('float32').values.reshape(-1, 1)

# 10. Train-val-test split
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# 11. Normalize (optional for tree models, important for MLPs)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train).astype('float32')
X_val = scaler.transform(X_val).astype('float32')
X_test = scaler.transform(X_test).astype('float32')

# 12. Dataset definition
class PlanningGenDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 13. Dataloaders
train_dataset = PlanningGenDataset(X_train, y_train)
val_dataset = PlanningGenDataset(X_val, y_val)
test_dataset = PlanningGenDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)


In [61]:
df[features]

Unnamed: 0,gen1_valid_action_percent,gen1_consecutive_valid_steps,gen1_logical_violations,gen2_valid_action_percent,gen2_consecutive_valid_steps,gen2_logical_violations,gen3_valid_action_percent,gen3_consecutive_valid_steps,gen3_logical_violations,gen4_valid_action_percent,gen4_consecutive_valid_steps,gen4_logical_violations,plan_len
0,18.181818,4,3,9.090909,2.0,3.0,8.333333,2.0,3.0,8.000000,2.0,3.0,25
1,5.555556,2,3,4.761905,2.0,3.0,2.597403,2.0,3.0,2.040816,2.0,3.0,98
2,5.405405,2,3,6.060606,2.0,3.0,5.128205,2.0,3.0,6.451613,2.0,3.0,31
3,8.333333,2,3,6.666667,2.0,3.0,6.250000,2.0,3.0,5.882353,2.0,3.0,34
4,6.666667,2,3,6.060606,2.0,3.0,5.882353,2.0,3.0,4.545455,2.0,3.0,44
...,...,...,...,...,...,...,...,...,...,...,...,...,...
136,5.882353,1,3,4.347826,1.0,3.0,4.545455,1.0,3.0,4.545455,1.0,3.0,22
137,5.882353,2,3,8.000000,2.0,3.0,5.882353,2.0,3.0,5.882353,2.0,3.0,34
138,3.703704,1,3,3.703704,1.0,3.0,3.448276,1.0,3.0,3.448276,1.0,3.0,29
139,5.882353,2,3,9.523810,2.0,3.0,6.451613,2.0,3.0,6.666667,2.0,3.0,30


In [62]:
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MetaConfidenceNet(input_dim=X.shape[1]).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
early_stopping = EarlyStopping(patience=8)

def evaluate(loader):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            total_loss += loss.item() * y_batch.size(0)

            preds = (torch.sigmoid(outputs) > 0.5).float()
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)

    return total_loss / total, correct / total

# Train
n_epochs = 100
for epoch in range(n_epochs):
    model.train()
    running_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * y_batch.size(0)

    train_loss = running_loss / len(train_loader.dataset)
    val_loss, val_acc = evaluate(val_loader)

    print(f"Epoch {epoch+1}/{n_epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    early_stopping(val_loss)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break


Epoch 1/100 | Train Loss: 0.6551 | Val Loss: 0.6363 | Val Acc: 0.9524
Epoch 2/100 | Train Loss: 0.6154 | Val Loss: 0.6217 | Val Acc: 0.9524
Epoch 3/100 | Train Loss: 0.5900 | Val Loss: 0.6093 | Val Acc: 0.9524
Epoch 4/100 | Train Loss: 0.5716 | Val Loss: 0.5960 | Val Acc: 1.0000
Epoch 5/100 | Train Loss: 0.5380 | Val Loss: 0.5820 | Val Acc: 1.0000
Epoch 6/100 | Train Loss: 0.5182 | Val Loss: 0.5630 | Val Acc: 1.0000
Epoch 7/100 | Train Loss: 0.4975 | Val Loss: 0.5441 | Val Acc: 1.0000
Epoch 8/100 | Train Loss: 0.4872 | Val Loss: 0.5196 | Val Acc: 1.0000
Epoch 9/100 | Train Loss: 0.4540 | Val Loss: 0.4962 | Val Acc: 1.0000
Epoch 10/100 | Train Loss: 0.4383 | Val Loss: 0.4729 | Val Acc: 1.0000
Epoch 11/100 | Train Loss: 0.4141 | Val Loss: 0.4505 | Val Acc: 1.0000
Epoch 12/100 | Train Loss: 0.3986 | Val Loss: 0.4229 | Val Acc: 1.0000
Epoch 13/100 | Train Loss: 0.3775 | Val Loss: 0.3958 | Val Acc: 1.0000
Epoch 14/100 | Train Loss: 0.3487 | Val Loss: 0.3649 | Val Acc: 1.0000
Epoch 15/100 | 

In [63]:
test_loss, test_acc = evaluate(test_loader)
print(f"\nTest Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")



Test Loss: 0.0209 | Test Accuracy: 1.0000


In [70]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Load the new test CSV
df = pd.read_csv("/content/all_generations_problem.csv")

# 2. Drop redundant features if they exist
redundant = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations']
df = df.drop(columns=redundant, errors='ignore')

# 3. Define the exact same 13 features as training
gen_features = ['valid_action_percent', 'consecutive_valid_steps', 'logical_violations']
features = []

for i in range(1, 5):
    for feat in gen_features:
        features.append(f'gen{i}_{feat}')

features.append('plan_len')  # Add plan_len to make it 13 total
# 5. Replace target label
df['solves_problem'] = df['solves_problem'].replace({False: 0.0, True: 1.0})

# 6. Pad remaining NaNs in features (for early-stopping generations)
df[features] = df[features].fillna(-1000)

# 7. Extract and convert to float32
X_test = df[features].astype('float32').values
y_test = df['solves_problem'].astype('float32').values.reshape(-1, 1)

# 8. Standardize using the training scaler
X_test = scaler.transform(X_test).astype('float32')  # `scaler` must be from training

# 9. Create PyTorch Dataset and DataLoader
class PlanningGenDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

test_dataset = PlanningGenDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [72]:
test_loss, test_acc = evaluate(test_loader)
print(f"\nTest Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")



Test Loss: 0.0303 | Test Accuracy: 1.0000


In [66]:
with torch.no_grad():
  model.eval()
  easy_problem = test_dataset[0][0].unsqueeze(0).to(device)
  model = model.to(device)
  score = torch.sigmoid(model(easy_problem))
  print(f"Confidence score for easy basic move problem is {score.item():.4f}")

Confidence score for easy basic move problem is 1.0000


In [67]:
with torch.no_grad():
  model.eval()
  hard_problem = test_dataset[33][0].unsqueeze(0).to(device)
  model = model.to(device)
  score = torch.sigmoid(model(hard_problem))
  print(f"Confidence score for hard hanoi problem is {score.item():.4f}")

Confidence score for hard hanoi problem is 0.0139
