In [None]:
# Install PyTorch first (CUDA 11.8)
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 -q

# Install PyTorch Geometric and extensions using pre-built wheels
!pip install torch-geometric -q
!pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.7.0+cu118.html -q

# Install other utilities
!pip install matplotlib numpy scipy pandas psutil -q

print("All packages installed successfully!")

In [None]:
# Core libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# PyTorch Geometric
from torch_geometric.data import Data
from torch_geometric.nn import GATConv, GCNConv
from torch_geometric.loader import DataLoader

# Numerical and visualization
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.patches import Rectangle

# System utilities
import os
import time
import gc
import psutil
from scipy.spatial import KDTree
from collections import defaultdict, Counter

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")
print(f"   PyTorch version: {torch.__version__}")

# Report the name and total memory (in GB, base 1e9) of GPU 0 when one is in use.
if device.type == 'cuda':
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
    _gpu_total_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"   GPU Memory: {_gpu_total_gb:.2f} GB")

print("\nAll libraries imported successfully!")

In [None]:
# ============================================
# CONFIGURE YOUR CIRCUITNET PATHS HERE
# ============================================

# Root folder of the CircuitNet download.
# Set the CIRCUITNET_BASE environment variable to point somewhere else without
# editing the notebook; the literal below is only the fallback.
CIRCUITNET_BASE = os.environ.get(
    "CIRCUITNET_BASE",
    r"H:\Labs\Generative Ai\Ayush1\Ayush\CircuitNet",
)

# Data paths (all derived from CIRCUITNET_BASE)
PLACEMENT_PATH = os.path.join(CIRCUITNET_BASE, "instance_placement_micron-002", "instance_placement_micron")
_GRAPH_INFO_PATH = os.path.join(CIRCUITNET_BASE, "graph_features", "graph_information")
NODE_ATTR_PATH = os.path.join(_GRAPH_INFO_PATH, "node_attr")
NET_ATTR_PATH = os.path.join(_GRAPH_INFO_PATH, "net_attr")
PIN_ATTR_PATH = os.path.join(_GRAPH_INFO_PATH, "pin_attr")
# NOTE(review): CONGESTION_PATH is defined but not part of the verification
# below - confirm whether the loader needs it before relying on the check.
CONGESTION_PATH = os.path.join(CIRCUITNET_BASE, "congestion")

# Verify paths exist
print("Verifying dataset paths...\n")

# Human-readable label -> directory that must exist before loading.
paths_to_check = {
    "Base Directory": CIRCUITNET_BASE,
    "Placement Data": PLACEMENT_PATH,
    "Node Attributes": NODE_ATTR_PATH,
    "Net Attributes": NET_ATTR_PATH,
    "Pin Attributes": PIN_ATTR_PATH,
}

# Print one status line per path and remember whether any was missing.
all_exist = True
for name, path in paths_to_check.items():
    exists = os.path.exists(path)
    status = "[OK]" if exists else "[MISSING]"
    print(f"{status} {name}: {path}")
    all_exist = all_exist and exists

print()
if all_exist:
    print("All paths verified! Ready to load data.")
else:
    print("WARNING: Some paths are missing. Please check your CircuitNet installation.")
    print("\nDownload from: https://drive.google.com/drive/folders/1GjW-1LBx1563bg3pHQGvhcEyK2A9sYUB")

In [None]:
# Load up to MAX_SAMPLES samples and split them, in load order, into train and test sets.
MAX_SAMPLES = 500  # Increased from 100 for better generalization

print(f"Loading {MAX_SAMPLES} samples from CircuitNet...\n")
circuitnet_dataset = load_circuitnet_dataset(max_samples=MAX_SAMPLES)

if not circuitnet_dataset:
    # An empty / falsy result means nothing could be loaded.
    print("Failed to load dataset. Check your paths!")
else:
    # 80/20 split: the leading 80% of samples train, the remainder test.
    split_idx = int(len(circuitnet_dataset) * 0.8)
    cn_train, cn_test = circuitnet_dataset[:split_idx], circuitnet_dataset[split_idx:]

    # Cell/edge counts are reported from the first sample only (hence the "~").
    print("\nDataset Statistics:")
    print(f"   Total samples: {len(circuitnet_dataset)}")
    print(f"   Training samples: {len(cn_train)}")
    print(f"   Test samples: {len(cn_test)}")
    print(f"   Cells per sample: ~{circuitnet_dataset[0].num_cells:,}")
    print(f"   Edges per sample: ~{circuitnet_dataset[0].edge_index.shape[1]:,}")
    print("\nDataset ready for training!")

In [None]:
class VLSIPlacementGNN(nn.Module):
    """
    Graph Attention Network that predicts an (x, y) placement coordinate per cell.

    Architecture:
    - Linear input projection -> LayerNorm -> ReLU.
    - ``num_layers`` GATConv layers with multi-head attention (heads concatenated),
      each followed by LayerNorm and wrapped in a residual connection. ReLU and
      dropout are applied on every GAT layer except the last one.
    - Two-layer MLP head producing ``output_dim`` values per node.
    - The output is clamped to [0, 1]; no Sigmoid or tanh is used.

    Note on the clamp: ``torch.clamp`` has gradient 1 inside the range and 0
    outside it, so a raw output that leaves [0, 1] receives no gradient through
    the clamp. Pass ``clamp_during_training=False`` to return the raw outputs in
    training mode (they are still clamped in eval mode). The default keeps the
    clamp in both modes.

    Args:
        input_dim: Number of input features per cell (the notebook uses 16).
        hidden_dim: Width of the hidden representation; must be a positive
            multiple of ``heads`` because each head outputs ``hidden_dim // heads``
            channels and the heads are concatenated back to ``hidden_dim``.
        output_dim: Number of predicted values per cell (2 for x and y).
        num_layers: Number of GAT layers.
        heads: Number of attention heads per GAT layer.
        clamp_during_training: If True (default), clamp the output to [0, 1] in
            training mode as well as in eval mode.

    Raises:
        ValueError: If ``heads`` is not positive or does not divide ``hidden_dim``.
    """

    def __init__(self, input_dim=16, hidden_dim=128, output_dim=2, num_layers=4, heads=4,
                 clamp_during_training=True):
        super().__init__()

        # Fail early: otherwise the concatenated GAT output width would not match
        # the LayerNorm / residual width and forward() would fail on a shape error.
        if heads < 1 or hidden_dim % heads != 0:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must be a positive multiple of heads ({heads})"
            )

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers
        self.heads = heads
        self.clamp_during_training = clamp_during_training

        # Input projection
        self.input_proj = nn.Linear(input_dim, hidden_dim)
        self.input_norm = nn.LayerNorm(hidden_dim)

        # GAT layers, each paired with a LayerNorm used before the residual add.
        self.gat_layers = nn.ModuleList()
        self.layer_norms = nn.ModuleList()
        for _ in range(num_layers):
            self.gat_layers.append(
                GATConv(hidden_dim, hidden_dim // heads, heads=heads, dropout=0.1, concat=True)
            )
            self.layer_norms.append(nn.LayerNorm(hidden_dim))

        # Output head: no activation on the final layer; the range limit is
        # applied by the clamp in forward().
        self.output_proj = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim // 2, output_dim)
        )

    def forward(self, data):
        """
        Predict one coordinate vector per node.

        Args:
            data: Object exposing ``x`` (node feature tensor) and ``edge_index``
                (graph connectivity passed to the GAT layers).

        Returns:
            Tensor with ``output_dim`` values per node, clamped to [0, 1] unless
            the module is in training mode with ``clamp_during_training=False``.
        """
        x, edge_index = data.x, data.edge_index

        # Input projection
        x = self.input_proj(x)
        x = self.input_norm(x)
        x = F.relu(x)

        # GAT layers with residual connections
        last_layer = len(self.gat_layers) - 1
        for i, (gat_layer, layer_norm) in enumerate(zip(self.gat_layers, self.layer_norms)):
            residual = x
            x = gat_layer(x, edge_index)
            x = layer_norm(x)
            # The last GAT layer skips the non-linearity and dropout.
            if i < last_layer:
                x = F.relu(x)
                x = F.dropout(x, p=0.1, training=self.training)
            x = x + residual

        # Output projection
        out = self.output_proj(x)

        # Optionally leave the raw outputs unclamped while training so that
        # out-of-range predictions still receive a gradient.
        if self.training and not self.clamp_during_training:
            return out

        # Limit the coordinates to the normalized [0, 1] range.
        return out.clamp(0.0, 1.0)

# Build the network with the notebook's configuration and move it to the selected device.
model = VLSIPlacementGNN(input_dim=16, hidden_dim=128, output_dim=2, num_layers=4, heads=4)
model = model.to(device)

# Number of trainable parameters (those with requires_grad set).
num_params = sum(param.numel() for param in model.parameters() if param.requires_grad)

# Print the module structure followed by a short summary.
print("Model Architecture (v2 - Anti-Collapse):\n")
print(model)
print("\nModel Statistics:")
print(f"   Total parameters: {num_params:,}")
print(
    "   Input features: 16 dimensions (industry-relevant)",
    "   Key improvements over v1:",
    "     - Residual connections (prevents gradient vanishing)",
    "     - LayerNorm (stabilizes training)",
    "     - No Sigmoid (prevents center collapse)",
    "     - Clamp [0,1] output (preserves gradients at boundaries)",
    sep="\n",
)
print(f"   Device: {device}")
print("\nModel created successfully!")

In [None]:
# Training loop with resume capability.
# Relies on names defined in other cells: model, optimizer, criterion, NUM_EPOCHS,
# cn_train, cn_test and device.
train_losses = []
test_losses = []
start_epoch = 0

# Checkpoint location. This must be the same file the save cell writes,
# otherwise a finished run is never found and training always restarts.
model_save_path = r"H:\Labs\Generative Ai\Ayush1\Ayush\vlsi_placement_model.pth"

if os.path.exists(model_save_path):
    print("=" * 80)
    print("LOADING EXISTING MODEL")
    print("=" * 80)

    # map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
    checkpoint = torch.load(model_save_path, map_location=device)

    # Check if saved model has same architecture (input_dim may differ).
    # Checkpoints without a stored config are treated as input_dim=10.
    saved_config = checkpoint.get('model_config', {})
    saved_input_dim = saved_config.get('input_dim', 10)
    current_input_dim = model.input_dim

    if saved_input_dim != current_input_dim:
        print(f"   WARNING: Saved model has input_dim={saved_input_dim}, current model has input_dim={current_input_dim}")
        print(f"   Architecture changed (new industry-relevant features). Training from scratch.")
        print("=" * 80)
        print()
    else:
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch']
        train_losses = checkpoint.get('train_losses', [])
        test_losses = checkpoint.get('test_losses', [])

        print(f"Model loaded from: {model_save_path}")
        print(f"   Previous epochs completed: {start_epoch}")
        print(f"   Previous train loss: {train_losses[-1]:.6f}" if train_losses else "   No previous train loss")
        print(f"   Previous test loss: {test_losses[-1]:.6f}" if test_losses else "   No previous test loss")
        print(f"   Resuming training from epoch {start_epoch + 1}")
        print("=" * 80)
        print()
else:
    print("=" * 80)
    print("STARTING TRAINING FROM SCRATCH")
    print("=" * 80)
    print()

# Fail with a clear message instead of a ZeroDivisionError when averaging losses.
if len(cn_train) == 0 or len(cn_test) == 0:
    raise ValueError(
        f"Need non-empty train and test sets (got {len(cn_train)} train, {len(cn_test)} test samples)."
    )

print("Starting Training...\n")
print("=" * 80)

start_time = time.time()
elapsed = 0.0  # stays defined for the summary even when NUM_EPOCHS is 0
end_epoch = start_epoch + NUM_EPOCHS

for epoch in range(start_epoch, end_epoch):
    # ---- Training: one optimizer step per sample ----
    model.train()
    epoch_loss = 0.0
    batch_count = 0

    for data in cn_train:
        data = data.to(device)
        optimizer.zero_grad()

        # Forward pass
        pred = model(data)
        loss = criterion(pred, data.y)

        # Backward pass
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        batch_count += 1

    avg_train_loss = epoch_loss / batch_count
    train_losses.append(avg_train_loss)

    # ---- Evaluation: same criterion, no gradient tracking ----
    model.eval()
    test_loss = 0.0
    test_count = 0

    with torch.no_grad():
        for data in cn_test:
            data = data.to(device)
            pred = model(data)
            loss = criterion(pred, data.y)
            test_loss += loss.item()
            test_count += 1

    avg_test_loss = test_loss / test_count
    test_losses.append(avg_test_loss)

    # Print progress (elapsed time is measured from the start of this cell's loop)
    elapsed = time.time() - start_time
    print(f"Epoch {epoch+1:2d}/{end_epoch} | "
          f"Train Loss: {avg_train_loss:.6f} | "
          f"Test Loss: {avg_test_loss:.6f} | "
          f"Time: {elapsed:.1f}s")

print("=" * 80)
print(f"\nTraining complete! Total time: {elapsed/60:.1f} minutes")
# The loss histories can be empty when NUM_EPOCHS is 0 and nothing was resumed.
if train_losses:
    print(f"   Final train loss: {train_losses[-1]:.6f}")
if test_losses:
    print(f"   Final test loss: {test_losses[-1]:.6f}")
print(f"   Total epochs completed: {end_epoch}")

In [None]:
# Save model checkpoint (weights, optimizer state, loss history and architecture config).
model_save_path = r"H:\Labs\Generative Ai\Ayush1\Ayush\vlsi_placement_model.pth"

# Calculate total epochs (start_epoch + NUM_EPOCHS)
total_epochs = start_epoch + NUM_EPOCHS

torch.save({
    'epoch': total_epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'train_losses': train_losses,
    'test_losses': test_losses,
    # Read the architecture from the live model instead of repeating literals, so
    # the stored config cannot disagree with the weights saved next to it
    # (the resume logic compares the stored input_dim with the current model).
    'model_config': {
        'input_dim': model.input_dim,
        'hidden_dim': model.hidden_dim,
        'output_dim': model.output_dim,
        'num_layers': model.num_layers,
        # Falls back to the notebook's construction value if the model
        # does not expose a `heads` attribute.
        'heads': getattr(model, 'heads', 4)
    }
}, model_save_path)

print(f"Model saved to: {model_save_path}")
print(f"   Total epochs completed: {total_epochs}")
print(f"   File size: {os.path.getsize(model_save_path) / 1e6:.2f} MB")
print(f"\nTo load later:")
# !r prints the path as a valid Python literal; pasting the raw Windows path
# into plain quotes would turn sequences such as "\v" into escape characters.
print(f"   checkpoint = torch.load({model_save_path!r})")
print(f"   model.load_state_dict(checkpoint['model_state_dict'])")

In [None]:
def _layout_cell_widths(data, num_cells):
    """Return one drawing width per cell, in the same units as the plot axes.

    When the node-feature matrix has more than three columns, column 3 is
    scaled by 20 and limited to [5, 50]; otherwise every cell gets width 10.
    """
    if data.x.shape[1] > 3:
        # One device-to-host copy for the whole column instead of .item() per cell.
        size_feature = data.x[:, 3].detach().cpu().numpy().astype(np.float64)
        return np.clip(size_feature * 20, 5, 50)
    return np.full(num_cells, 10.0)


def _draw_layout_panel(ax, positions, cell_widths, edge_index, face_color, title,
                       chip_width_microns, chip_height_microns):
    """Draw one layout panel: cell rectangles, a sample of edges, and axis styling."""
    ax.set_xlim(0, chip_width_microns)
    ax.set_ylim(0, chip_height_microns)
    ax.set_aspect('equal')
    ax.set_facecolor('#1a1a1a')

    # Each cell is a rectangle centred on its position; height is 0.8 x width.
    for (x, y), cell_width in zip(positions, cell_widths):
        cell_height = cell_width * 0.8
        ax.add_patch(Rectangle((x - cell_width / 2, y - cell_height / 2),
                               cell_width, cell_height,
                               facecolor=face_color, edgecolor='white',
                               alpha=0.7, linewidth=0.5))

    # Draw only a sample of connections: every 5th edge among the first 500.
    for col in range(0, min(500, edge_index.shape[1]), 5):
        src, dst = edge_index[:, col]
        ax.plot([positions[src, 0], positions[dst, 0]],
                [positions[src, 1], positions[dst, 1]],
                'yellow', alpha=0.2, linewidth=0.3)

    ax.set_title(title, fontsize=14, color='white', pad=20)
    ax.set_xlabel('X Position (µm)', fontsize=12, color='white')
    ax.set_ylabel('Y Position (µm)', fontsize=12, color='white')
    ax.tick_params(colors='white')
    ax.grid(True, alpha=0.2, color='gray')


def visualize_industry_layout(data, predictions, chip_width_microns=1000, chip_height_microns=1000, dpi=150,
                              save_path='industry_layout.png'):
    """
    Plot predicted and ground-truth layouts side by side and report error metrics.

    Args:
        data: Graph sample exposing ``x`` (node features), ``y`` (target
            coordinates) and ``edge_index``.
        predictions: Predicted coordinates, one (x, y) row per cell, as a NumPy
            array or a torch tensor. Presumably normalized to [0, 1] like
            ``data.y`` - they are multiplied by the chip dimensions.
        chip_width_microns: Width used to scale x coordinates and the x axis.
        chip_height_microns: Height used to scale y coordinates and the y axis.
        dpi: Resolution for the figure and the saved image.
        save_path: File the figure is written to; pass None to skip saving.

    Returns:
        dict with keys ``mse``, ``mae``, ``mae_x`` and ``mae_y`` (errors on the
        scaled coordinates). The same values are printed.
    """
    # Accept tensors as well as arrays; tensors are moved to host memory first.
    if torch.is_tensor(predictions):
        predictions = predictions.detach().cpu().numpy()

    # Convert normalized coordinates to microns
    scale = np.array([chip_width_microns, chip_height_microns])
    predicted_microns = np.asarray(predictions) * scale
    actual_microns = data.y.cpu().numpy() * scale

    cell_widths = _layout_cell_widths(data, len(predicted_microns))
    edge_index = data.edge_index.cpu().numpy()

    fig, axes = plt.subplots(1, 2, figsize=(16, 8), dpi=dpi)
    # Titles and labels are white, so the figure itself needs the dark
    # background too (not only the saved file) to stay readable inline.
    fig.patch.set_facecolor('#1a1a1a')

    _draw_layout_panel(axes[0], predicted_microns, cell_widths, edge_index, 'cyan',
                       'Predicted Layout (Micron Precision)',
                       chip_width_microns, chip_height_microns)
    _draw_layout_panel(axes[1], actual_microns, cell_widths, edge_index, 'lime',
                       'Actual Layout (Ground Truth)',
                       chip_width_microns, chip_height_microns)

    plt.tight_layout()
    if save_path is not None:
        plt.savefig(save_path, dpi=dpi, facecolor='#1a1a1a')
    plt.show()

    # Calculate metrics on the micron-scaled coordinates
    abs_error = np.abs(predicted_microns - actual_microns)
    metrics = {
        'mse': np.mean((predicted_microns - actual_microns) ** 2),
        'mae': np.mean(abs_error),
        'mae_x': np.mean(abs_error[:, 0]),
        'mae_y': np.mean(abs_error[:, 1]),
    }

    print(f"\nLayout Accuracy Metrics:")
    print(f"   Mean Squared Error: {metrics['mse']:.2f} µm²")
    print(f"   Mean Absolute Error: {metrics['mae']:.2f} µm")
    print(f"   Average X Error: {metrics['mae_x']:.2f} µm")
    print(f"   Average Y Error: {metrics['mae_y']:.2f} µm")

    return metrics

# Visualize with test data
print("Creating industry-grade layout visualization...")

# Preferred test sample to display. The index is limited to the last available
# sample so that a small test split does not raise IndexError.
VIS_SAMPLE_INDEX = 10
if len(cn_test) == 0:
    raise ValueError("cn_test is empty; load the dataset before visualizing.")
vis_sample_idx = min(VIS_SAMPLE_INDEX, len(cn_test) - 1)

# Inference only: eval mode and no gradient tracking.
model.eval()
with torch.no_grad():
    test_data = cn_test[vis_sample_idx].to(device)
    test_pred = model(test_data).cpu().numpy()

visualize_industry_layout(test_data, test_pred)
print("Industry-grade visualization complete!")