In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sklearn.metrics import accuracy_score, mean_absolute_error
import joblib
import warnings
warnings.filterwarnings("ignore")

# ---------------------------------------------------------------------------
# Design space: every tunable parameter mapped to the discrete values it may
# take. Key order is significant only for readability; values are NumPy arrays
# except for the branch predictor, which is a list of names.
# ---------------------------------------------------------------------------
param_space = {"cpu_clock_GHz": np.round(np.linspace(0.3, 3.0, 24), 2)}

# Power-of-two grids, described as (first exponent, stop exponent) for np.arange.
param_space.update({
    name: 2 ** np.arange(first_exp, stop_exp)
    for name, (first_exp, stop_exp) in (
        ("l1i_kb", (4, 9)),
        ("l1d_kb", (4, 9)),
        ("l1_assoc", (0, 4)),
        ("l2_kb", (7, 12)),
        ("l2_assoc", (1, 5)),
    )
})

# Six stage widths share the same 4..12 integer grid; wbWidth starts at 6.
param_space.update({
    name: np.arange(4, 13, 1, dtype=int)
    for name in (
        "fetchWidth", "decodeWidth", "renameWidth",
        "dispatchWidth", "issueWidth", "commitWidth",
    )
})
param_space["wbWidth"] = np.arange(6, 13, 1, dtype=int)

# Stepped integer grids, described as (start, stop, step) for np.arange.
param_space.update({
    name: np.arange(start, stop, step, dtype=int)
    for name, (start, stop, step) in (
        ("numROBEntries", (32, 257, 16)),
        ("numIQEntries", (16, 129, 16)),
        ("numPhysIntRegs", (64, 513, 32)),
        ("numPhysFloatRegs", (64, 513, 32)),
        ("LQEntries", (8, 65, 8)),
        ("SQEntries", (8, 65, 8)),
    )
})

param_space["branch_predictor"] = [
    "BiModeBP",
    "LocalBP",
    "TAGE",
    "TAGE_SC_L_64KB",
    "MultiperspectivePerceptron64KB",
    "TournamentBP",
]

# Targets predicted through per-target classification heads.
classification_targets = ['l1i_kb', 'l1d_kb', 'l2_kb', 'branch_predictor']

# Targets predicted jointly by the regression head.
regression_targets = [
    'fetchWidth',
    'decodeWidth',
    'renameWidth',
    'dispatchWidth',
    'issueWidth',
    'commitWidth',
    'wbWidth',
    'numROBEntries',
    'numIQEntries',
    'numPhysIntRegs',
    'numPhysFloatRegs',
    'LQEntries',
    'SQEntries',
    'cpu_clock_GHz',
]

# Classification targets first, then regression targets.
all_targets = [*classification_targets, *regression_targets]

# Keep an untouched copy of the raw table (df_original); `df` is the working
# frame whose categorical columns are overwritten with integer codes below.
df_original = pd.read_csv("/kaggle/input/gem5-results/gem5_mcpat_stats_results_cleaned_50k_v3.csv")
df = df_original.copy()

# Model inputs: the workload identifier plus the continuous measurements.
feature_columns = [
    'workload',
    'Area',
    'Peak Power',
    'Total Leakage',
    'Peak Dynamic',
    'Subthreshold Leakage',
    'Gate Leakage',
    'Runtime Dynamic',
    'ipc',
    'branch_misprediction_rate',
    'icache_miss_rate',
    'dcache_read_miss_rate',
    'dcache_write_miss_rate',
]

# Input categorical feature: fit once, then replace the column with its codes.
le_workload = LabelEncoder().fit(df["workload"])
df["workload"] = le_workload.transform(df["workload"])

# Column name -> fitted LabelEncoder, for every encoded column.
encoders = {"workload": le_workload}

# Classification target name -> number of distinct classes seen in the data.
classification_encoded = {}
for col in classification_targets:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col])
    encoders[col], classification_encoded[col] = le, len(le.classes_)
    print(f"Encoded {col}: {len(le.classes_)} classes (CLASSIFICATION)")


# Assemble the model inputs and the two families of targets.
X = df[feature_columns]
y_class = df[classification_targets]
y_reg = df[regression_targets].astype(float)

# 80/20 split with a fixed seed so the partition is reproducible.
X_train, X_test, y_class_train, y_class_test, y_reg_train, y_reg_test = train_test_split(
    X, y_class, y_reg, test_size=0.2, random_state=42
)
assert len(X_train) + len(X_test) == len(X), "train/test split lost or duplicated rows"

# The split hands back selections of `X`; take explicit copies so the column
# assignments below write to independent frames rather than relying on
# pandas' chained-assignment behaviour (the resulting SettingWithCopyWarning
# would otherwise be hidden by the global warnings filter).
X_train = X_train.copy()
X_test = X_test.copy()

categorical_features = ['workload']
continuous_features = [c for c in feature_columns if c not in categorical_features]

# Standardise continuous inputs; statistics come from the training split only.
scaler_features = StandardScaler()
X_train[continuous_features] = scaler_features.fit_transform(X_train[continuous_features])
X_test[continuous_features] = scaler_features.transform(X_test[continuous_features])

# Standardise regression targets the same way, keeping column names and the
# original row index so the frames stay aligned with X_train / X_test.
scaler_targets = StandardScaler()
y_reg_train_scaled = pd.DataFrame(
    scaler_targets.fit_transform(y_reg_train),
    columns=y_reg_train.columns,
    index=y_reg_train.index
)
y_reg_test_scaled = pd.DataFrame(
    scaler_targets.transform(y_reg_test),
    columns=y_reg_test.columns,
    index=y_reg_test.index
)

In [None]:
def generate_constraint_samples(n_samples=50000, seed=None):
    """Draw random input scenarios for the model to turn into hardware configs.

    The workload code is drawn uniformly from the distinct values present in
    ``X_train``. Each continuous feature is drawn *independently* from a
    normal distribution parameterised by that column's mean and standard
    deviation in ``df_original``, then clipped to the column's observed
    minimum and maximum. Values are in raw (unscaled) units.

    Parameters
    ----------
    n_samples : int
        Number of scenarios to generate.
    seed : int or None
        When given, a dedicated ``np.random.RandomState(seed)`` is used so the
        same samples are produced on every call. When ``None`` (default) the
        global NumPy random state is used, as before.

    Returns
    -------
    pandas.DataFrame
        One row per scenario with a ``workload`` column followed by one column
        per entry of ``continuous_features``.
    """
    print(f"\nGenerating {n_samples} random performance constraint scenarios...")

    # np.random (module) and RandomState expose the same choice/normal API.
    rng = np.random if seed is None else np.random.RandomState(seed)

    samples = {
        'workload': rng.choice(X_train['workload'].unique(), n_samples)
    }

    for col in continuous_features:
        col_original = df_original[col]
        mean, std = col_original.mean(), col_original.std()
        min_val, max_val = col_original.min(), col_original.max()

        # Clip so no sample falls outside the range observed in the data.
        samples[col] = np.clip(rng.normal(mean, std, n_samples), min_val, max_val)

    df_samples = pd.DataFrame(samples)
    print(f"Generated {len(df_samples)} samples")
    return df_samples

def _snap_to_grid(values, grid):
    """Return, for each entry of ``values``, the closest entry of ``grid``.

    The result has the dtype of ``grid`` (integer grids yield integers, the
    float clock grid yields floats).
    """
    grid = np.sort(np.asarray(grid))
    values = np.asarray(values, dtype=float)
    distances = np.abs(values[:, None] - grid[None, :].astype(float))
    return grid[distances.argmin(axis=1)]


def predict_hardware_configs(model, df_samples, scaler_features, scaler_targets, encoders):
    """Run the trained model on raw scenarios and decode its outputs.

    Parameters
    ----------
    model : torch.nn.Module
        Called as ``model(cats, conts)``; must return a list of per-target
        class logits and a regression tensor.
    df_samples : pandas.DataFrame
        Raw (unscaled) scenarios containing the ``categorical_features`` and
        ``continuous_features`` columns.
    scaler_features, scaler_targets
        Fitted scalers; the first standardises the continuous inputs, the
        second maps regression outputs back to raw units.
    encoders : dict
        Column name -> fitted encoder used to decode class indices.

    Returns
    -------
    pandas.DataFrame
        One row per scenario with the classification targets (decoded labels)
        followed by the regression targets. Every regression value is snapped
        to the nearest value allowed by ``param_space``; previously values
        were rounded to integers, which destroyed ``cpu_clock_GHz`` and left
        stepped parameters off their grid.
    """
    print("\nü§ñ Predicting hardware configurations...")

    # Nothing to predict: return an empty, correctly-shaped frame instead of
    # failing later on an empty stack.
    if len(df_samples) == 0:
        return pd.DataFrame(columns=classification_targets + regression_targets)

    # Follow the model's own device rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    X_cats = df_samples[categorical_features].values
    X_conts = scaler_features.transform(df_samples[continuous_features])

    X_cats_t = torch.tensor(X_cats, dtype=torch.long)
    X_conts_t = torch.tensor(X_conts, dtype=torch.float32)

    temp_dataset = torch.utils.data.TensorDataset(X_cats_t, X_conts_t)
    temp_loader = DataLoader(temp_dataset, batch_size=1024, shuffle=False)

    all_class_preds = []
    all_reg_preds = []

    model.eval()
    with torch.no_grad():
        for cats, conts in tqdm(temp_loader, desc="Predicting", leave=False):
            class_outputs, reg_output = model(cats.to(model_device), conts.to(model_device))

            # One predicted class index per classification head.
            class_preds = [torch.argmax(out, dim=1).cpu().numpy() for out in class_outputs]
            all_class_preds.append(np.stack(class_preds, axis=1))

            # Undo target standardisation to get raw-unit predictions.
            reg_preds = scaler_targets.inverse_transform(reg_output.cpu().numpy())
            all_reg_preds.append(reg_preds)

    y_class_pred = np.vstack(all_class_preds)
    y_reg_pred = np.vstack(all_reg_preds)

    df_class = pd.DataFrame(y_class_pred, columns=classification_targets)
    for col in classification_targets:
        df_class[col] = encoders[col].inverse_transform(df_class[col])

    df_reg = pd.DataFrame(y_reg_pred, columns=regression_targets)
    for col in regression_targets:
        df_reg[col] = _snap_to_grid(df_reg[col].to_numpy(), param_space[col])

    df_hardware = pd.concat([df_class, df_reg], axis=1)

    print(f"‚úÖ Predicted {len(df_hardware)} hardware configurations")
    return df_hardware

In [None]:
def validate_predicted_configs(df_hardware, param_space):
    """Check that every predicted value belongs to its allowed set.

    Only columns that are also keys of ``param_space`` are inspected. Float
    columns are rounded to two decimals *in place* on ``df_hardware`` before
    the membership test. A per-column count of offending rows is printed.

    Returns a truthy value when no violation was found, falsy otherwise.
    """
    rule = "="*70
    print("\n" + rule)
    print("VALIDATING PREDICTED HARDWARE CONFIGURATIONS")
    print(rule)

    violations = 0
    columns_to_check = [name for name in df_hardware.columns if name in param_space]

    for col in columns_to_check:
        # Normalise float noise so values can match the 2-decimal grids exactly.
        if df_hardware[col].dtype in ['float64', 'float32']:
            df_hardware[col] = df_hardware[col].round(2)

        n_invalid = (~df_hardware[col].isin(param_space[col])).sum()
        if n_invalid:
            violations += n_invalid
            print(f"{col}: {n_invalid} invalid predictions")

    all_valid = violations == 0
    if not all_valid:
        print(f"\nFound {violations} constraint violations")
    else:
        print("ALL PREDICTIONS ARE VALID!")
        print(f"   {len(df_hardware)} configs meet parameter space constraints")

    return all_valid

def apply_constraints_and_optimize(df_samples, df_hardware, constraints, objective, top_k=10):
    """Filter scenario/config pairs by bounds and rank the survivors.

    ``df_samples`` and ``df_hardware`` are joined column-wise by position.
    ``constraints`` maps a column name to ``(min, max)``; either bound may be
    ``None`` and constraints on columns that are not present are ignored.
    ``objective`` is one of ``'min_power'``, ``'min_area'``, ``'max_ipc'``,
    ``'pca'`` or ``'max_perf_per_watt'``.

    Returns the best ``top_k`` rows (with an added ``objective`` column), or
    ``None`` when nothing is feasible or the objective name is unknown.
    """
    rule = "="*70
    print("\n" + rule)
    print("APPLYING CONSTRAINTS AND OPTIMIZING")
    print(rule)

    df_combined = pd.concat(
        [frame.reset_index(drop=True) for frame in (df_samples, df_hardware)],
        axis=1,
    )

    mask = pd.Series([True] * len(df_combined))

    print("Constraints:")
    for col, bounds in constraints.items():
        min_val, max_val = bounds
        if col not in df_combined.columns:
            continue
        if min_val is not None:
            mask = mask & (df_combined[col] >= min_val)
            print(f"  {col} >= {min_val}")
        if max_val is not None:
            mask = mask & (df_combined[col] <= max_val)
            print(f"  {col} <= {max_val}")

    df_feasible = df_combined[mask].copy()

    if df_feasible.empty:
        print("\nNO FEASIBLE DESIGNS FOUND!")
        return None

    print(f"\nFound {len(df_feasible)} feasible designs ({len(df_feasible)/len(df_combined)*100:.1f}%)")

    print(f"\nOptimizing for: {objective}")

    # objective name -> (score function, sort ascending?)
    scorers = {
        'min_power': (lambda d: d['Peak Power'], True),
        'min_area': (lambda d: d['Area'], True),
        'max_ipc': (lambda d: d['ipc'], False),
        # The 1e-6 terms guard against division by zero.
        'pca': (lambda d: (d['Peak Power'] * d['Area']) / (d['ipc'] + 1e-6), True),
        'max_perf_per_watt': (lambda d: d['ipc'] / (d['Peak Power'] + 1e-6), False),
    }

    if objective not in scorers:
        print(f"Unknown objective: {objective}")
        return None

    score, ascending = scorers[objective]
    df_feasible['objective'] = score(df_feasible)

    return df_feasible.sort_values('objective', ascending=ascending).head(top_k)

def display_top_designs(df_top, objective_name):
    """Pretty-print each recommended design, best first.

    For every row of ``df_top`` this prints a fixed set of performance
    columns (those present in the row), then every column listed in
    ``all_targets`` that is present, and finally the row's ``objective``
    value. Nothing is returned.
    """
    heavy_rule = "="*70
    light_rule = '‚îÄ'*70
    perf_cols = ['ipc', 'Peak Power', 'Area', 'Runtime Dynamic',
                 'branch_misprediction_rate', 'icache_miss_rate']

    print("\n" + heavy_rule)
    print(f"TOP {len(df_top)} RECOMMENDED HARDWARE DESIGNS")
    print(f"Optimized for: {objective_name}")
    print(heavy_rule)

    for rank, (_, row) in enumerate(df_top.iterrows(), start=1):
        print(f"\n{light_rule}")
        print(f"DESIGN OPTION #{rank}")
        print(f"{light_rule}")

        print("\nPerformance Requirements:")
        for col in perf_cols:
            if col not in row:
                continue
            print(f"  {col:30s}: {row[col]:.4f}")

        print("\nRecommended Hardware Configuration:")
        for col in all_targets:
            if col not in row:
                continue
            print(f"  {col:30s}: {row[col]}")

        print(f"\nObjective Value: {row['objective']:.4f}")

In [None]:
class HybridCPUDataset(Dataset):
    """Tensor-backed dataset yielding (cats, conts, y_class, y_reg) per row.

    All four pieces are converted to tensors once, at construction time:
    categorical inputs and class targets as ``torch.long``, continuous inputs
    and regression targets as ``torch.float32``.
    """

    @staticmethod
    def _frame_to_tensor(frame, dtype):
        # Copy the frame's underlying values into a tensor of the given dtype.
        return torch.tensor(frame.values, dtype=dtype)

    def __init__(self, X, y_class, y_reg, cat_cols, cont_cols):
        to_tensor = self._frame_to_tensor
        self.cats = to_tensor(X[cat_cols], torch.long)
        self.conts = to_tensor(X[cont_cols], torch.float32)
        self.y_class = to_tensor(y_class, torch.long)
        self.y_reg = to_tensor(y_reg, torch.float32)

    def __len__(self):
        # Number of rows, taken from the categorical tensor.
        return self.cats.shape[0]

    def __getitem__(self, idx):
        fields = (self.cats, self.conts, self.y_class, self.y_reg)
        return tuple(field[idx] for field in fields)

# Training batches: small and reshuffled every epoch.
train_loader = DataLoader(
    dataset=HybridCPUDataset(
        X=X_train,
        y_class=y_class_train,
        y_reg=y_reg_train_scaled,
        cat_cols=categorical_features,
        cont_cols=continuous_features,
    ),
    batch_size=64,
    shuffle=True,
)

# Held-out batches: larger and in fixed order so predictions line up with rows.
test_loader = DataLoader(
    dataset=HybridCPUDataset(
        X=X_test,
        y_class=y_class_test,
        y_reg=y_reg_test_scaled,
        cat_cols=categorical_features,
        cont_cols=continuous_features,
    ),
    batch_size=256,
    shuffle=False,
)

class HybridTabTransformer(nn.Module):
    """Multi-task network over categorical and continuous inputs.

    Categorical columns are embedded and passed through a transformer
    encoder, then mean-pooled. Continuous columns go through an MLP. The two
    ``dim``-wide vectors are concatenated, reduced to 128 features by a
    shared MLP, and fed to one linear head per classification target plus a
    single regression head.

    Parameters
    ----------
    categories : iterable of int
        Cardinality of each categorical input column, in column order.
    num_continuous : int
        Number of continuous input columns.
    classification_dims : iterable of int
        Number of classes for each classification head.
    num_regression_outputs : int
        Width of the regression output.
    dim, depth, heads, dropout
        Embedding width, number of encoder layers, attention heads, and the
        dropout rate used inside the transformer encoder layers.
    """

    def __init__(self, categories, num_continuous,
                 classification_dims, num_regression_outputs,
                 dim=128, depth=4, heads=8, dropout=0.1):
        super().__init__()
        self.num_cont = num_continuous

        # One embedding table per categorical column.
        self.cat_embeds = nn.ModuleList()
        for cardinality in categories:
            self.cat_embeds.append(nn.Embedding(cardinality, dim))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=dim, nhead=heads, dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=depth)

        wide = dim * 2
        self.continuous_mlp = nn.Sequential(
            nn.Linear(num_continuous, wide),
            nn.GELU(),
            nn.LayerNorm(wide),
            nn.Dropout(0.1),
            nn.Linear(wide, dim),
        )

        # Shared trunk: input is the concatenation of two dim-wide vectors.
        self.post_mlp = nn.Sequential(
            nn.Linear(wide, 512),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.GELU(),
            nn.LayerNorm(256),
            nn.Dropout(0.1),
            nn.Linear(256, 128),
            nn.GELU(),
        )

        # One linear classifier per classification target, on the 128-d trunk.
        self.classification_heads = nn.ModuleList()
        for n_classes in classification_dims:
            self.classification_heads.append(nn.Linear(128, n_classes))

        self.regression_head = nn.Sequential(
            nn.Linear(128, 64),
            nn.GELU(),
            nn.Linear(64, num_regression_outputs),
        )

    def forward(self, x_cats, x_conts):
        """Return ``(list of per-head class logits, regression output)``."""
        # Embed each categorical column, giving one token per column.
        token_list = []
        for col_idx, embed in enumerate(self.cat_embeds):
            token_list.append(embed(x_cats[:, col_idx]))
        tokens = torch.stack(token_list, dim=1)

        # Encode the tokens and average over the token axis.
        pooled = self.transformer(tokens).mean(dim=1)

        fused = torch.cat((pooled, self.continuous_mlp(x_conts)), dim=1)
        shared = self.post_mlp(fused)

        class_outputs = []
        for head in self.classification_heads:
            class_outputs.append(head(shared))

        return class_outputs, self.regression_head(shared)

In [None]:
# Pick the accelerator when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

# Seed torch so weight initialisation and DataLoader shuffling are repeatable
# across "Restart & Run All" (the train/test split already uses seed 42).
torch.manual_seed(42)

# Size each embedding table from the fitted encoder rather than from the
# values that happen to land in the training split: codes run from 0 to
# n_classes - 1 over the whole dataset, so a category missing from X_train
# would otherwise produce an out-of-range embedding index at evaluation time.
categories = [len(encoders[col].classes_) for col in categorical_features]
classification_dims = [classification_encoded[col] for col in classification_targets]

print(f"\nInput categorical features: {categorical_features}")
print(f"Categorical cardinalities: {categories}")
print(f"\nClassification outputs: {classification_targets}")
print(f"Classification dimensions: {classification_dims}")
print(f"\nRegression outputs: {regression_targets}")
print(f"Regression dimension: {len(regression_targets)}")

model = HybridTabTransformer(
    categories=categories,
    num_continuous=len(continuous_features),
    classification_dims=classification_dims,
    num_regression_outputs=len(regression_targets),
    dim=128,
    depth=4,
    heads=8,
    dropout=0.1
).to(device)

total_params = sum(p.numel() for p in model.parameters())
print(f"\nTotal parameters: {total_params:,}")

optimizer = optim.AdamW(model.parameters(), lr=5e-4, weight_decay=1e-4)

# Cross-entropy for each classification head, MSE for the regression head.
criterion_ce = nn.CrossEntropyLoss()
criterion_mse = nn.MSELoss()


# Training budget and early-stopping state.
epochs = 300
patience = 13      # epochs without validation improvement before stopping
best_loss = np.inf
wait = 0


def _multitask_loss(class_logits, reg_pred, class_true, reg_true, class_weight):
    """Weighted sum of per-head cross-entropy and regression MSE."""
    ce_total = sum(
        criterion_ce(logits, class_true[:, head_idx])
        for head_idx, logits in enumerate(class_logits)
    )
    return class_weight * ce_total + criterion_mse(reg_pred, reg_true)


for epoch in range(epochs):
    # ---- optimisation pass over the training batches ----
    model.train()
    total_loss = 0

    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", leave=False)
    for batch in pbar:
        cats, conts, y_class, y_reg = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        class_outputs, reg_output = model(cats, conts)

        # Classification is weighted 5x relative to regression during training.
        loss = _multitask_loss(class_outputs, reg_output, y_class, y_reg, class_weight=5)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss
        pbar.set_postfix({'loss': f'{batch_loss:.4f}'})

    avg_loss = total_loss / len(train_loader)

    # ---- monitoring pass (uses test_loader as the validation set) ----
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            cats, conts, y_class, y_reg = (tensor.to(device) for tensor in batch)
            class_outputs, reg_output = model(cats, conts)

            # NOTE(review): the monitored loss weights classification 1x while
            # training uses 5x, so the two printed losses are not directly
            # comparable - confirm this is intentional.
            loss = _multitask_loss(class_outputs, reg_output, y_class, y_reg, class_weight=1)
            val_loss += loss.item()

    val_loss /= len(test_loader)

    if epoch < 5 or epoch % 10 == 0:
        print(f"Epoch {epoch+1:3d} | Train Loss: {avg_loss:.4f} | Val Loss: {val_loss:.4f}")

    # ---- early stopping / checkpointing ----
    if not (val_loss < best_loss):
        wait += 1
        if wait >= patience:
            print(f"\n‚èπÔ∏è Early stopping at epoch {epoch+1}")
            break
        continue

    # New best: reset the patience counter and persist weights + preprocessors.
    best_loss = val_loss
    wait = 0
    torch.save(model.state_dict(), "hybrid_tabtransformer_best.pt")
    joblib.dump({
        'scaler_features': scaler_features,
        'scaler_targets': scaler_targets,
        'encoders': encoders
    }, "hybrid_preprocessors.pkl")
    if epoch > 0:
        print(f"‚úÖ Best model saved at epoch {epoch+1} (val_loss: {val_loss:.4f})")

# Restore the best checkpoint written by the training loop. map_location
# keeps the load working if the notebook is re-run on a different device.
model.load_state_dict(
    torch.load("hybrid_tabtransformer_best.pt", map_location=device, weights_only=True)
)

# NOTE: joblib.load unpickles arbitrary objects; this is only appropriate
# because the file was produced by this notebook. Do not point it at
# untrusted files.
preprocessors = joblib.load("hybrid_preprocessors.pkl")
scaler_features = preprocessors['scaler_features']
scaler_targets = preprocessors['scaler_targets']
encoders = preprocessors['encoders']

model.eval()

all_class_preds, all_class_true = [], []
all_reg_preds, all_reg_true = [], []

with torch.no_grad():
    # Batch-local names are used so the y_class / y_reg DataFrames defined
    # during preprocessing are not overwritten by tensors.
    for cats, conts, y_class_batch, y_reg_batch in test_loader:
        cats, conts = cats.to(device), conts.to(device)
        class_outputs, reg_output = model(cats, conts)

        # Predicted class index for each classification head.
        class_preds = [torch.argmax(out, dim=1).cpu().numpy() for out in class_outputs]
        all_class_preds.append(np.stack(class_preds, axis=1))
        all_class_true.append(y_class_batch.numpy())

        # Map both predictions and ground truth back to raw units.
        reg_preds = scaler_targets.inverse_transform(reg_output.cpu().numpy())
        reg_true = scaler_targets.inverse_transform(y_reg_batch.numpy())
        all_reg_preds.append(reg_preds)
        all_reg_true.append(reg_true)

y_class_pred = np.vstack(all_class_preds)
y_class_true = np.vstack(all_class_true)
y_reg_pred = np.vstack(all_reg_preds)
y_reg_true = np.vstack(all_reg_true)

# Snap every regression prediction to the nearest value allowed by
# param_space. Rounding to the nearest integer (the previous behaviour)
# collapsed cpu_clock_GHz, whose grid is fractional, onto whole numbers and
# left stepped parameters between grid points.
for i, col in enumerate(regression_targets):
    grid = np.sort(np.asarray(param_space[col], dtype=float))
    nearest = np.abs(y_reg_pred[:, [i]] - grid[None, :]).argmin(axis=1)
    y_reg_pred[:, i] = grid[nearest]

# ---- Classification report: plain accuracy per target ----
report_rule = "="*80
report_dash = "-"*80

print("\n" + report_rule)
print("CLASSIFICATION TARGETS")
print(report_rule)
print(f"{'Parameter':<25} {'Accuracy':<12}")
print(report_dash)

for i, col in enumerate(classification_targets):
    acc = accuracy_score(y_class_true[:, i], y_class_pred[:, i])
    print(f"{col:<25} {acc:<12.3f}")

# ---- Regression report: MAE, and MAE as a share of the parameter's range ----
print("\n" + report_rule)
print("REGRESSION TARGETS")
print(report_rule)
print(f"{'Parameter':<25} {'MAE':<12} {'MAE%':<15}")
print(report_dash)

for i, col in enumerate(regression_targets):
    mae = mean_absolute_error(y_reg_true[:, i], y_reg_pred[:, i])

    # Normalise by the width of the allowed range (max - min) for this column.
    mae_p = mae / np.ptp(param_space[col])

    print(f"{col:<25} {mae:<12.2f} {mae_p:<15.1%}")

print(report_rule)
for closing_line in (
    "\nHYBRID MODEL SUCCESSFULLY TRAINED!",
    "   - Classification: Discrete/categorical parameters",
    "   - Regression: Integer range parameters",
):
    print(closing_line)

In [None]:
# 1) Sample random scenarios, 2) let the model propose a configuration for
# each, 3) check the proposals against the allowed parameter values.
df_samples = generate_constraint_samples(n_samples=10000)
df_hardware = predict_hardware_configs(
    model, df_samples, scaler_features, scaler_targets, encoders
)
validate_predicted_configs(df_hardware, param_space)

demo_rule = "="*70
print("\n" + demo_rule)
print("EXAMPLE : High-Performance Power-Efficient Design")
print(demo_rule)

# Each entry is column -> (lower bound, upper bound); None means unbounded.
# Units in the notes below follow the original author's annotations.
constraints_1 = dict([
    ('ipc', (1.5, None)),          # IPC >= 1.5
    ('Peak Power', (None, 50.0)),  # Peak Power <= 50 (W)
    ('Area', (None, 200.0)),       # Area <= 200 (mm^2)
])

df_top_1 = apply_constraints_and_optimize(
    df_samples,
    df_hardware,
    constraints=constraints_1,
    objective='max_perf_per_watt',
    top_k=5,
)

# The search returns None when nothing satisfies the constraints.
if df_top_1 is not None:
    display_top_designs(df_top_1, 'Max Performance per Watt')