## Import Libraries

In [1]:
import os
import ast
import torch
import random
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import eigsh
from torch_geometric.data import Data, DataLoader, Batch
from torch_geometric.nn import GATv2Conv, SAGEConv, GCNConv
import torch.nn.functional as F
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


## Parse Files

In [2]:
def parse_formatted(filepath):
    """Read a formatted netlist file into node ids, net records and parameters.

    The file is scanned line by line:

    * lines mentioning ``Core Aspect Ratio``, ``Utilization`` or
      ``Place Density`` contribute a float taken from the text after the
      last ``=`` on that line;
    * ``key = value`` lines whose key is one of ``lx``, ``ly``, ``ux``,
      ``uy`` contribute a float bound;
    * lines whose stripped text starts with ``{`` are evaluated as Python
      literals and collected as net records.  Each record is expected to
      have ``record['driver']['id']`` and a ``record['sinks']`` list whose
      entries have an ``'id'``.

    Args:
        filepath: Path of the text file to read.

    Returns:
        A 4-tuple ``(node_ids, records, globals_, bounds)`` where
        ``node_ids`` is the sorted list of distinct driver/sink ids,
        ``records`` is the list of parsed record dicts, ``globals_`` maps
        the three parameter names to floats and ``bounds`` maps the bound
        keys found in the file to floats.
    """
    with open(filepath, 'r') as handle:
        text_lines = handle.read().splitlines()

    design_params = {}
    core_bounds = {}
    bound_names = ('lx', 'ly', 'ux', 'uy')

    def _rhs(text):
        # Value is whatever follows the last '=' on the line.
        return float(text.split('=')[-1].strip())

    for text in text_lines:
        mentions_density = 'Place Density' in text
        if 'Core Aspect Ratio' in text:
            design_params['Core Aspect Ratio'] = _rhs(text)
        # A line that also mentions 'Place Density' is not a utilization line.
        if 'Utilization' in text and not mentions_density:
            design_params['Utilization'] = _rhs(text)
        if mentions_density:
            design_params['Place Density'] = _rhs(text)

        name, sep, value = text.partition('=')
        name = name.strip()
        if sep and name in bound_names:
            core_bounds[name] = float(value.strip())

    net_records = []
    for text in text_lines:
        if text.strip().startswith('{'):
            net_records.append(ast.literal_eval(text))

    # Drivers first, then sinks; the set removes ids that appear in both roles.
    seen_ids = {rec['driver']['id'] for rec in net_records}
    seen_ids.update(
        sink['id'] for rec in net_records for sink in rec['sinks']
    )
    return sorted(seen_ids), net_records, design_params, core_bounds

def parse_label_formatted(label_path, node_ids, bounds):
    """Load placement labels and normalise them to the unit square.

    A row is used only if it splits into exactly three whitespace-separated
    fields and the first field is all digits: ``<id> <x> <y>``.  Coordinates
    are mapped with ``(v - lower) / (upper - lower)`` using ``bounds`` and
    then clipped to ``[0, 1]``.

    Args:
        label_path: Path of the label text file.
        node_ids: Node ids defining the row order of the result.
        bounds: Mapping with float entries ``'lx'``, ``'ly'``, ``'ux'``,
            ``'uy'``.  NOTE(review): a zero-width box (``ux == lx`` or
            ``uy == ly``) is not guarded and yields non-finite values.

    Returns:
        A float tensor of shape ``(len(node_ids), 2)``.  Nodes without a
        label row get ``(0.0, 0.0)``; a label file without any valid row
        therefore yields an all-zero tensor instead of raising.
    """
    rows = []
    with open(label_path, 'r') as f:
        for row in f:
            parts = row.split()
            if len(parts) == 3 and parts[0].isdigit():
                rows.append((int(parts[0]), float(parts[1]), float(parts[2])))

    # Pre-filled with the (0, 0) default used for unlabeled nodes.
    coords = np.zeros((len(node_ids), 2), dtype=np.float64)

    # zip(*rows) cannot be unpacked into three names when rows is empty.
    if rows:
        ids, xs, ys = zip(*rows)
        lx, ly, ux, uy = bounds['lx'], bounds['ly'], bounds['ux'], bounds['uy']

        # Normalise, then clamp anything outside the lower/upper bound.
        x_norm = np.clip((np.asarray(xs) - lx) / (ux - lx), 0.0, 1.0)
        y_norm = np.clip((np.asarray(ys) - ly) / (uy - ly), 0.0, 1.0)

        # Map back to node order; on duplicate ids the last row wins.
        id2coord = {i: (x, y) for i, x, y in zip(ids, x_norm, y_norm)}
        for pos, nid in enumerate(node_ids):
            if nid in id2coord:
                coords[pos] = id2coord[nid]

    return torch.tensor(coords, dtype=torch.float)


## Matrix/Feature Generation and Relative Loss

In [3]:
def build_edge_index(node_ids, records, bidirectional=True):
    """Turn driver -> sink connections into a ``(2, E)`` edge-index tensor.

    Node ids are mapped to their position in ``node_ids``.  For each record
    one edge is emitted from the driver to each of its sinks; when
    ``bidirectional`` is true the reverse edge follows immediately after.

    Args:
        node_ids: Sequence of node ids; position defines the node index.
        records: Iterable of dicts with ``['driver']['id']`` and a
            ``['sinks']`` list of dicts with ``['id']``.
        bidirectional: Also add the sink -> driver edge for every pair.

    Returns:
        A contiguous ``torch.long`` tensor of shape ``(2, E)``; an empty
        ``(2, 0)`` tensor when there are no edges.
    """
    index_of = {}
    for position, nid in enumerate(node_ids):
        index_of[nid] = position

    sources, targets = [], []
    for record in records:
        driver_idx = index_of[record['driver']['id']]
        for sink in record['sinks']:
            sink_idx = index_of[sink['id']]
            sources.append(driver_idx)
            targets.append(sink_idx)
            if bidirectional:
                sources.append(sink_idx)
                targets.append(driver_idx)

    if len(sources) == 0:
        return torch.empty((2, 0), dtype=torch.long)
    return torch.tensor([sources, targets], dtype=torch.long).contiguous()

def build_adjacency(N, edge_index):
    """Build an ``N x N`` sparse adjacency matrix from an edge index.

    Every column ``(src, dst)`` of ``edge_index`` contributes weight 1.0 at
    ``[src, dst]``; repeated edges accumulate.

    Args:
        N: Number of nodes (matrix is ``N x N``).
        edge_index: Tensor with two rows (sources, destinations).

    Returns:
        A ``scipy.sparse.csr_matrix`` of shape ``(N, N)``.
    """
    rows, cols = edge_index.cpu().numpy()
    unit_weights = np.ones(len(rows))
    return csr_matrix((unit_weights, (rows, cols)), shape=(N, N))

def compute_laplacian_eigenvectors(adj, k=10, normalized=True):
    """Return eigenvectors of the graph Laplacian for the smallest eigenvalues.

    The Laplacian is ``I - D^-1/2 A D^-1/2`` when ``normalized`` is true and
    ``D - A`` otherwise, where ``D`` holds the row sums of ``adj``.  The
    eigenvector of the smallest eigenvalue is dropped and the next
    ``min(k, N - 1)`` eigenvectors are returned.

    Args:
        adj: Square sparse adjacency matrix of shape ``(N, N)``.
        k: Requested number of eigenvectors.
        normalized: Use the symmetric normalised Laplacian.

    Returns:
        Array of shape ``(N, min(k, N - 1))``.  Note that this is narrower
        than ``k`` for graphs with ``N <= k`` nodes, and has zero columns
        (``float32``) when fewer than one eigenvector can be returned.
    """
    N = adj.shape[0]
    k_eff = min(k, max(N - 1, 0))
    # Nothing to compute: bail out before building any matrices.
    if k_eff < 1:
        return np.zeros((N, 0), dtype=np.float32)

    deg = np.asarray(adj.sum(axis=1)).ravel()
    diag = np.arange(N)
    if normalized:
        # Invert only positive degrees so isolated nodes neither divide by
        # zero nor emit a RuntimeWarning; their scale factor stays 0.
        inv_s = np.zeros(N, dtype=float)
        connected = deg > 0
        inv_s[connected] = 1.0 / np.sqrt(deg[connected])
        D = csr_matrix((inv_s, (diag, diag)), shape=adj.shape)
        # Sparse identity; avoids materialising a dense N x N array.
        eye = csr_matrix((np.ones(N), (diag, diag)), shape=adj.shape)
        L = eye - D @ adj @ D
    else:
        D = csr_matrix((deg, (diag, diag)), shape=adj.shape)
        L = D - adj

    n_pairs = k_eff + 1  # +1 for the smallest eigenpair, dropped below
    vecs = None
    # eigsh needs k < N; when every eigenpair is wanted go straight to the
    # dense solver instead of relying on eigsh raising.
    if n_pairs < N:
        try:
            vals, sparse_vecs = eigsh(L, k=n_pairs, which='SM')
        except Exception:
            # Best-effort fallback (e.g. ARPACK did not converge).  Unlike a
            # bare ``except`` this lets KeyboardInterrupt/SystemExit through.
            vecs = None
        else:
            # Make the ascending eigenvalue order explicit so column 0 is
            # the one belonging to the smallest eigenvalue.
            vecs = sparse_vecs[:, np.argsort(vals, kind='stable')]
    if vecs is None:
        _, vecs = np.linalg.eigh(L.toarray())
    return vecs[:, 1:k_eff + 1]

def compute_relative_loss(out, data):
    """Mean squared error between predicted and true driver-sink distances.

    For every (driver, sink) pair listed in ``data.records`` the Euclidean
    distance between the two predicted positions is compared with the
    distance between the two label positions.

    Args:
        out: Per-node predictions, indexed in ``data.node_ids`` order.
        data: Object providing ``node_ids`` (ids in row order), ``records``
            (dicts with ``['driver']['id']`` and ``['sinks'][i]['id']``)
            and ``y`` (per-node labels laid out like ``out``).

    Returns:
        A scalar tensor with the mean of
        ``(|pred_d - pred_s| - |true_d - true_s|) ** 2`` over all pairs.
        With no pairs a zero-valued tensor that is still connected to
        ``out`` is returned, so the result can always be used in
        ``backward()``.
    """
    id2idx = {nid: i for i, nid in enumerate(data.node_ids)}

    # Collect index pairs first so the distances are computed in one batched
    # operation instead of one tiny tensor op per edge.
    driver_rows, sink_rows = [], []
    for rec in data.records:
        d_i = id2idx[rec['driver']['id']]
        for sink in rec['sinks']:
            driver_rows.append(d_i)
            sink_rows.append(id2idx[sink['id']])

    pair_count = len(driver_rows)
    if pair_count == 0:
        # Keep the return type a tensor (the loop version returned a float).
        return out.sum() * 0.0

    d_idx = torch.tensor(driver_rows, dtype=torch.long, device=out.device)
    s_idx = torch.tensor(sink_rows, dtype=torch.long, device=out.device)

    # reshape(pair_count, -1) makes the per-pair norm independent of how
    # many trailing dimensions each node row has.
    pred_dist = (out[d_idx] - out[s_idx]).reshape(pair_count, -1).norm(dim=1)
    true_dist = (data.y[d_idx] - data.y[s_idx]).reshape(pair_count, -1).norm(dim=1)
    return (pred_dist - true_dist).pow(2).mean()

## Load Data

In [4]:
def load_all_data(root_dir, design_filter='gcd', batch_size=16, shuffle=True,
                  num_eigvecs=10):
    """Build one graph per design file under ``root_dir`` and wrap them in a loader.

    Every ``*_formatted.txt`` file (excluding ``*_label_formatted.txt``)
    that has a sibling ``*_label_formatted.txt`` becomes a ``Data`` object
    with:

    * ``x``: Laplacian eigenvector features, zero-padded to ``num_eigvecs``
      columns so all graphs share one feature width;
    * ``edge_index``: bidirectional driver/sink edges;
    * ``u``: a ``(1, 3)`` tensor of the shifted/scaled ``Core Aspect Ratio``,
      ``Utilization`` and ``Place Density`` values;
    * ``y``: normalised label coordinates;
    * bookkeeping attributes ``design_name``, ``node_ids``, ``bounds``,
      ``orig_coords``, ``fixed_ids`` and ``records`` (the latter is what
      ``compute_relative_loss`` reads).

    Args:
        root_dir: Directory that is walked recursively.
        design_filter: When truthy, keep only files whose name up to the
            first ``_`` equals this value.
        batch_size: Batch size handed to the ``DataLoader``.
        shuffle: Shuffle the graph list and let the loader shuffle batches.
        num_eigvecs: Number of Laplacian eigenvector features per node.

    Returns:
        A ``DataLoader`` over the graphs.  The Python-object attributes are
        excluded from collated batches and stay available on
        ``loader.dataset`` items.

    Raises:
        KeyError: If a design file lacks one of the three global
            parameters, or a record lacks the expected fields.
    """
    suffix = '_formatted.txt'
    label_suffix = '_label_formatted.txt'

    data_list = []
    for dp, _, files in os.walk(root_dir):
        for f in files:
            if not f.endswith(suffix) or f.endswith(label_suffix):
                continue
            if design_filter and f.split('_')[0] != design_filter:
                continue
            # Derive names from the file stem; str.replace on the full path
            # could also rewrite a directory component.
            stem = f[:-len(suffix)]
            fp = os.path.join(dp, f)
            lf = os.path.join(dp, stem + label_suffix)
            if not os.path.exists(lf):
                continue

            node_ids, records, globals_, bounds = parse_formatted(fp)

            orig_coords = {
                rec['driver']['id']: (rec['driver']['x'], rec['driver']['y'])
                for rec in records
            }
            for rec in records:
                for s in rec['sinks']:
                    orig_coords[s['id']] = (s['x'], s['y'])

            # Built once; used for both the spectral features and the graph.
            edges = build_edge_index(node_ids, records)
            feats = torch.tensor(
                compute_laplacian_eigenvectors(
                    build_adjacency(len(node_ids), edges), num_eigvecs
                ),
                dtype=torch.float,
            )
            # Graphs with <= num_eigvecs nodes yield fewer eigenvectors;
            # pad with zeros so batching and the model input width hold.
            missing = num_eigvecs - feats.size(1)
            if missing > 0:
                feats = F.pad(feats, (0, missing))

            labels = parse_label_formatted(lf, node_ids, bounds)

            # Hard-coded shift/scale per parameter.
            # NOTE(review): presumably the sweep range of each knob — confirm.
            u_vec = torch.tensor([
                (globals_['Core Aspect Ratio'] - 0.5) / 0.4,
                (globals_['Utilization'] - 40.0) / 28.0,
                (globals_['Place Density'] - 0.2) / 0.3,
            ], dtype=torch.float).unsqueeze(0)

            data = Data(x=feats, edge_index=edges, u=u_vec, y=labels)
            data.design_name = stem
            data.node_ids = node_ids
            data.bounds = bounds
            data.orig_coords = orig_coords
            data.records = records
            data.fixed_ids = [
                rec['driver']['id'] for rec in records
                if rec['driver'].get('is_fixed')
            ]
            data.fixed_ids += [
                s['id'] for rec in records for s in rec['sinks']
                if s.get('is_fixed')
            ]
            data_list.append(data)

    if shuffle:
        random.shuffle(data_list)
    return DataLoader(
        data_list,
        batch_size=batch_size,
        shuffle=shuffle,
        exclude_keys=['orig_coords', 'node_ids', 'bounds', 'fixed_ids', 'records'],
    )


## Model

In [5]:
class PlacementGNN(torch.nn.Module):
    """Message-passing network that predicts a 2-D output per node.

    A stack of ``num_layers`` graph convolutions (all of the type selected
    by ``conv_type``) is followed by a linear layer that fuses the node
    embedding with the graph-level vector ``u`` and a final linear layer
    with two outputs.
    """

    def __init__(self, in_channels, hidden_channels=64, num_layers=3, global_channels=3, conv_type='sage'):
        """Create the layers.

        Args:
            in_channels: Width of the input node features.
            hidden_channels: Width of every hidden representation.
            num_layers: Number of convolution layers.
            global_channels: Width of the per-graph vector ``u``.
            conv_type: ``'gat'``, ``'sage'`` or ``'gcn'``.

        Raises:
            KeyError: If ``conv_type`` is not one of the supported names.
        """
        super().__init__()
        conv_cls = {'gat': GATv2Conv, 'sage': SAGEConv, 'gcn': GCNConv}[conv_type]

        # Only the first layer sees the raw feature width.
        input_widths = [in_channels] + [hidden_channels] * max(num_layers - 1, 0)
        self.convs = torch.nn.ModuleList()
        for width in input_widths[:max(num_layers, 0)]:
            self.convs.append(conv_cls(width, hidden_channels))

        self.post_lin = torch.nn.Linear(hidden_channels + global_channels, hidden_channels)
        self.out_lin = torch.nn.Linear(hidden_channels, 2)

    def forward(self, x, edge_index, batch, u, edge_attr=None):
        """Run the network.

        Args:
            x: Node feature matrix.
            edge_index: Edge index passed to every convolution.
            batch: Per-node graph index, used to look up each node's row
                of ``u``.
            u: Per-graph vectors, one row per graph.
            edge_attr: Optional edge attributes; forwarded only to
                ``GATv2Conv`` layers.

        Returns:
            Tensor with one row of two values per node.
        """
        hidden = x
        for layer in self.convs:
            if isinstance(layer, GATv2Conv):
                conv_args = (hidden, edge_index, edge_attr)
            else:
                conv_args = (hidden, edge_index)
            hidden = F.relu(layer(*conv_args))

        # Broadcast each graph's global vector to its nodes before fusing.
        per_node_globals = u[batch]
        fused = torch.cat([hidden, per_node_globals], dim=1)
        return self.out_lin(F.relu(self.post_lin(fused)))

## Training

In [6]:
# Training cell: load the 'gcd' designs, train the GNN on per-node MSE against
# the normalised label coordinates, then save the weights.
# Defines the globals `loader`, `model`, `optimizer` and `loss_fn`.
print("Starting")
loader = load_all_data('./raw_graph', design_filter='gcd', batch_size=8)
print("Data Loaded!")
# Positional args: in_channels=10 (same count of Laplacian eigenvectors that
# load_all_data requests), hidden_channels=64, num_layers=4,
# global_channels=3 (width of `u`), conv_type='sage'.
model = PlacementGNN(10, 64, 4, 3, 'sage').to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = torch.nn.MSELoss()
print("Training")
# range(1, 2) runs exactly one epoch.
for epoch in range(1, 2):
    model.train()
    total = 0.0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad()
        out = model(data.x, data.edge_index, data.batch, data.u)
        # NOTE(review): per-batch debug print; looks like leftover debugging.
        print("x:",data.x.shape)
        loss = loss_fn(out, data.y)
        loss.backward()
        optimizer.step()
        # Weight each batch's mean loss by its number of graphs so the epoch
        # figure below is an average per graph in the dataset.
        total += loss.item() * data.num_graphs
    # NOTE(review): divides by the dataset size; raises ZeroDivisionError if
    # no design files were loaded.
    print(f'Epoch {epoch} MSE: {total/len(loader.dataset):.4f}')
print("Saving Model")
torch.save(model.state_dict(), 'gnn_all.pth')

Starting




Data Loaded!
Training
x: torch.Size([4080, 10])
x: torch.Size([3704, 10])
x: torch.Size([3892, 10])
x: torch.Size([3704, 10])
x: torch.Size([3704, 10])
x: torch.Size([4268, 10])
x: torch.Size([3516, 10])
x: torch.Size([3516, 10])
x: torch.Size([3516, 10])
x: torch.Size([3516, 10])
x: torch.Size([3516, 10])
x: torch.Size([2952, 10])
x: torch.Size([3516, 10])
x: torch.Size([3704, 10])
x: torch.Size([4080, 10])
x: torch.Size([3704, 10])
x: torch.Size([3328, 10])
x: torch.Size([3892, 10])
x: torch.Size([3516, 10])
x: torch.Size([4080, 10])
x: torch.Size([3704, 10])
x: torch.Size([3516, 10])
x: torch.Size([3892, 10])
x: torch.Size([4080, 10])
x: torch.Size([3704, 10])
Epoch 1 MSE: 0.1943
Saving Model


## Predictions 

In [13]:
# Prediction cell: run the trained model on every graph of the dataset
# individually and write one "<design_name>_predictions.csv" per design into
# the current working directory.
import csv
model.eval()
with torch.no_grad():
    # loader.dataset yields the individual (unbatched) graphs.
    for data in loader.dataset:
        # NOTE(review): assumes the design file sits directly in ./raw_graph,
        # whereas load_all_data walks sub-directories too — confirm layout.
        formatted_fp = os.path.join('./raw_graph', data.design_name + '_formatted.txt')
        # Re-parse the design file only to recover instance names below.
        _, records, _, _ = parse_formatted(formatted_fp)
        data = data.to(device)
        # Single graph: every node belongs to graph 0.
        batch_vec = torch.zeros(data.x.size(0), dtype=torch.long, device=device)
        out = model(data.x, data.edge_index, batch_vec, data.u)
        lx, ux = data.bounds['lx'], data.bounds['ux']
        ly, uy = data.bounds['ly'], data.bounds['uy']
        # Invert the label normalisation: value * (upper - lower) + lower.
        # The model output is not clamped to [0, 1] before this step.
        scale  = torch.tensor([ux - lx, uy - ly], device=out.device)
        offset = torch.tensor([lx, ly], device=out.device)
        preds  = (out * scale + offset).cpu().numpy()
        id2name = {}
        # NOTE(review): this local is never read; the filter below uses
        # data.fixed_ids instead.
        fixed_ids = []
        # Map node id -> instance name, falling back to the id as text.
        for rec in records:
            d = rec['driver']
            id2name[d['id']] = d.get('name', str(d['id']))
            for s in rec['sinks']:
                id2name[s['id']] = s.get('name', str(s['id']))
                
        node_ids   = data.node_ids.tolist() if torch.is_tensor(data.node_ids) else data.node_ids
        names = [id2name.get(nid, str(nid)) for nid in node_ids]
        # Drop rows for nodes listed in data.fixed_ids from the output.
        if hasattr(data, 'fixed_ids') and data.fixed_ids:
            mask = ~np.isin(node_ids, data.fixed_ids)
            names = [name for name, m in zip(names, mask) if m]
            preds = preds[mask]

        fname = f"{data.design_name}_predictions.csv"
        with open(fname, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["InstanceName", "x_center", "y_center"])
            for name, (xv, yv) in zip(names, preds):
                w.writerow([name, f"{xv:.4f}", f"{yv:.4f}"])
        print(f"Saved {fname}")


Saved gcd_nangate45_gcd_run_5_2_1_predictions.csv
Saved gcd_asap7_gcd_run_5_3_4_predictions.csv
Saved gcd_asap7_gcd_run_5_1_3_predictions.csv
Saved gcd_nangate45_gcd_run_2_2_2_predictions.csv
Saved gcd_asap7_gcd_run_4_2_2_predictions.csv
Saved gcd_asap7_gcd_run_1_2_3_predictions.csv
Saved gcd_asap7_gcd_run_1_4_3_predictions.csv
Saved gcd_nangate45_gcd_run_2_1_1_predictions.csv
Saved gcd_nangate45_gcd_run_2_2_1_predictions.csv
Saved gcd_nangate45_gcd_run_3_5_4_predictions.csv
Saved gcd_asap7_gcd_run_5_5_3_predictions.csv
Saved gcd_asap7_gcd_run_1_1_3_predictions.csv
Saved gcd_asap7_gcd_run_1_5_4_predictions.csv
Saved gcd_nangate45_gcd_run_3_1_2_predictions.csv
Saved gcd_asap7_gcd_run_4_5_2_predictions.csv
Saved gcd_nangate45_gcd_run_1_1_2_predictions.csv
Saved gcd_nangate45_gcd_run_1_5_1_predictions.csv
Saved gcd_nangate45_gcd_run_4_2_1_predictions.csv
Saved gcd_asap7_gcd_run_4_2_4_predictions.csv
Saved gcd_nangate45_gcd_run_5_1_2_predictions.csv
Saved gcd_asap7_gcd_run_2_3_1_prediction