References:
- https://github.com/pyg-team/pytorch_geometric
- https://pytorch-geometric.readthedocs.io/en/latest/notes/introduction.html
- https://colab.research.google.com/drive/14OvFnAXggxB8vM4e8vSURUp1TaKnovzX?usp=sharing#scrollTo=imGrKO5YH11-

KNOWN_BUGS:
- Component Layer doesn't currently account for through hole components w/ pads on the opposite side. This is because I am throwing away pad info and just using component outline info.
- Board Outline is currently ignored
- Not currently exporting/using non-designator text

TODO:
- Update categorical handling to automatically handle any number of categorical features
- If I get this architecture making decent predictions, I need to predict silkscreen sizes and not just have them be static
  
NOTES:
- Cadence doesn't seem to have any standard for naming silk layers or artwork groups. For example, silkscreen top could be used as the assembly top output to gerber, or vice versa, and everything in between. There are also tens of silkscreen layer options. I currently identify the valid silk layers by searching the artwork groups for "silk" in the name, plus a couple of other variations that I've seen, but this is by no means a catch-all solution and may need to be manually edited for each board export.
- Currently not using slk-arc in training. I don't see this being necessary until predictions become more accurate, and I don't think it is a major factor holding back good predictions. Its data preprocessing is in this notebook, but it is not used in training.
- I created a tool embedding to feed to the network (cadence/altium), but it will still get created if only one tool is used.
- I was using PyGeometric's train/val/test mask attribute, but had to remove it because if you use DataLoader w/ mask, sometimes you will get batches of data w/ the entire mask set to False which will give nan predictions.
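
The nan in the last note comes from averaging over an empty selection: in PyTorch, the mean of an empty tensor is nan. The sketch below reproduces the effect with plain Python lists standing in for tensors. `masked_mse` and `has_supervised_nodes` are hypothetical helpers written for this illustration; they are not part of PyG or of this project.

```python
import math

def masked_mse(pred, target, mask):
    """Mean squared error over entries where mask is True.

    Returns nan for an all-False mask, mirroring what a mean over an
    empty tensor yields in PyTorch.
    """
    errs = [(p - t) ** 2 for p, t, m in zip(pred, target, mask) if m]
    return sum(errs) / len(errs) if errs else math.nan

def has_supervised_nodes(mask):
    """Hypothetical guard: only train on batches with at least one masked-in node."""
    return any(mask)

pred, target = [1.0, 2.0, 3.0], [1.0, 0.0, 3.0]
print(masked_mse(pred, target, [True, True, False]))    # 2.0
print(masked_mse(pred, target, [False, False, False]))  # nan
```

Skipping such batches with a guard is one option; splitting whole boards into train/val/test, as this notebook does, avoids the problem entirely.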

Tools:
- Altium export/import scripts tested w/ 22.5.1
- Cadence export script tested w/ PCB Editor v17.4

In [None]:
import sys
sys.path.insert(0, './models')

import torch
from torch_geometric.data import Data

import pandas as pd
import os
import numpy as np
import pickle
import matplotlib.pyplot as plt
import random
from time import time
from collections import Counter

# Add project directories not tracked by git
prj_dirs = ['models']
for d in ['train','test','predictions']:
    subdir = os.path.join('data',d)
    prj_dirs += [subdir]
    
for d in prj_dirs:
    # makedirs also creates the parent 'data' directory, which os.mkdir would not
    os.makedirs(d, exist_ok=True)

DATA_DIR = os.path.join('data','train')
TEST_DIR = os.path.join('data','test')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
data_files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR) if f.endswith('.csv')]
assert len(data_files) > 0, f'Add csv datasets to {DATA_DIR} before running.'
test_files = [os.path.join(TEST_DIR, f) for f in os.listdir(TEST_DIR) if f.endswith('.csv')]
data_files += test_files

In [None]:
data_files

### Data Description:

| Column     | Applies To | Description |
| ----------- | ----------- | ----------- |
| Tool      | N/A     | altium/cadence|
| Type      | N/A       | Features extracted from type Silk or Pin (x/y/L/R/T/B). If Pin Type, features may be extracted from parent Component |
| Designator      | Component       | Parent Component Designator |
| x/y      | Component or Silk       | Origin Coords |
| L/R/T/B      | Component or Silk       | Bounding Rectangle (x1,x2,y1,y2 for tracks/arcs) |
| Rotation      | Component or Silk       | Degrees |
| Layer      | Component      | Top or Bottom |
| Info      | Type Specific      | Delimited list of extra info specific to Type |
| Board      | N/A     | This is not imported data. This is an index of each board/dataset |

### Info Column Specific Data

| Tool  | Info Column     | Type | Description |
| ----- | ----------- | ----------- | ----------- |
| Cadence | OriginalLayer      | All     | Original Cadence Layer Name before filtered to Top/Bottom |
| Altium | PinName      | Pin     | Pin Name |
| Cadence | PinNumber      | Pin     | Pin Number |
| Cadence | PadstackName      | Pin     | Padstack Name |
| Cadence | PinRotation      | Pin     | Rotation in Degrees |
| Cadence | PinRelativeRotation      | Pin     | Rotation in Degrees Relative to Component |
| Cadence | IsThrough      | Pin     | Is a Through Hole pin |
| Both | NetName      | Pin     | Pin's Net Name |
| Both | PinX/Y      | Pin     | Pin's Origin Coords |
| Both | Width      | Track/Arc     | Track/Arc Width |
| Altium | Length      | Track     | Track Length |
| Both | Radius      | Arc     | Radius in mils |
| Altium | StartAngle      | Arc     | Start Angle |
| Altium | EndAngle      | Arc     | End Angle |
| Cadence | IsCircle      | Arc     | CIRCLE/UNCLOSED_ARC |
| Cadence | IsClockwise      | Arc     | TRUE/FALSE |
| Both | InComponent      | Track/Arc     | Object Part of Component Footprint (0=False,-1=True) |
| Cadence | LineType      | Track     | vertical/horizontal/odd |
| Cadence | Justify      | Des     | Text Justification (CENTER,) |
| Cadence | IsMirror      | Des     | Text is mirrored (TRUE/FALSE) |

Notes:
- All dimensions are in mils.
- Each pin row carries its own pin data (Pin/Net/PinX/PinY), while the component data (x/y/L/R/T/B/Rotation/Layer) is duplicated across all pin rows of the same component.

In [None]:
# Combine all csv datasets
df_list = []
for i, file in enumerate(data_files):
    df = pd.read_csv(file)
    df['Board'] = i
    
    # Verification
    assert all([t in set(df.Type) for t in ['pin', 'slk-des']]), 'Missing data type.'
    
    df_list.append(df)

df = pd.concat(df_list, axis=0, ignore_index=True)

### Separate Tracks/Arcs from the main dataset

In [None]:
dt = df.loc[df.Type.isin(['slk-trk','slk-arc'])].copy() # Track/Arc Dataset (copy so later in-place edits don't act on a slice of df)
df = df.loc[~df.Type.isin(['slk-trk','slk-arc'])] # Main Dataset

### Split Info Column into Multiple Columns

In [None]:
def split_info(df):
    info_cols = set([i.split(':')[0] for row in df.Info for i in row.split(';') if i.split(':')[0] != ''])
    print(f'Info Columns: {info_cols}')

    def get_info(col, info, D=';'):
        info_dict = {i.split(':')[0]:i.split(':')[-1] for i in info.split(D)}
        return info_dict.get(col, 'NA')

    for c in info_cols:
        df[c] = df.Info.apply(lambda i: get_info(c, i))

    df.drop(columns=['Info'], inplace=True)
    
    return df
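
To check the parsing on a single value, here is a minimal standalone sketch of the same idea: the Info string is treated as `;`-separated `Key:Value` items. `parse_info` is a hypothetical helper and the sample string is made up; real exports may carry different keys. Unlike `get_info` above, which keeps only the text after the last `:`, this version splits on the first `:` so values containing a colon stay whole.

```python
def parse_info(info, item_sep=';', kv_sep=':'):
    """Turn 'Key:Value;Key:Value' into a dict, skipping empty items."""
    out = {}
    for item in info.split(item_sep):
        if not item:
            continue  # trailing separator or empty Info
        key, _, value = item.partition(kv_sep)
        out[key] = value
    return out

# Hypothetical Info string for a silk track
row = 'Width:6;Length:120;InComponent:-1;'
parsed = parse_info(row)
print(parsed.get('Width', 'NA'))   # 6
print(parsed.get('Radius', 'NA'))  # NA
```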

In [None]:
dt = split_info(dt)
    
dt.drop(columns=['Rotation'], inplace=True)
dt.rename(columns={'L':'x1','R':'x2','T':'y1','B':'y2'}, inplace=True)

In [None]:
# df.loc[df.Info.isna(), 'Info'] = '' # Convert NaN to empty
# tmp = split_info(df.copy()) # Don't need the info cols for df, so just hold in tmp dataframe

# print('\nVisually verify each original layer is grouped in to the correct top or bottom layer group.')
# print('If it gets it wrong, this is defined in the cadence export script')
# tmp.loc[(tmp.Tool=='cadence'), ['Layer', 'OriginalLayer']].drop_duplicates(subset=['Layer','OriginalLayer'])

In [None]:
#del tmp

### Drop unmanufacturable track widths
Note: I don't know how Cadence decides which silk tracks are visible in the gerbers, because not all silk tracks make it to the gerber. So I'm currently assuming anything less than 4 mils wide is not in the final gerber.

In [None]:
# MIN_SLK_TRK_WIDTH = 4 # mils
#dt = dt.loc[(dt.Width.astype(float) > MIN_SLK_TRK_WIDTH) | (dt.Width.astype(float) == 0)]

In [None]:
orig_len = len(df)

# Drop duplicated silkscreen designators (I don't know a clean way to know which silk to use if duplicates exist)
df = df[~((df.Type == 'slk-des') & df.duplicated(subset=['Board','Type','Designator','Layer']))]
print(f'Dropped {orig_len - len(df)} duplicates.')

In [None]:
# Verification
assert len(df.loc[(df.Type=='slk-des') & (df.duplicated(subset=['Board','Type','Designator','Layer']))]) == 0, "Duplicate Silk Rows"

row_cnts = df.groupby(by=['Board']).Designator.count().tolist()
if len(set(df.Board)) > 1:
    assert not all(row_cnts[0] == r for r in row_cnts), 'Each board has the same number of data rows, something is probably wrong with the input data.'

In [None]:
# I'm not going to use Pin Info for this notebook (will merge duplicate pin info into component info)
df = df[~((df.Type == 'pin') & df.duplicated(subset=['Board','Type','Designator','Layer']))]

df.drop(columns=['Info'], inplace=True)
df.loc[df.Type == 'pin', 'Type'] = 'cmp'

print(len(df))

# Clean Data

In [None]:
def print_feature_distribution(feature_data, idx_to_key=None):
    probs_dict = {}
    for i, v in enumerate(list(set(feature_data))):
        probs_dict[v] = sum(feature_data == v)/len(feature_data)

    if idx_to_key:
        keys = [str(idx_to_key[i]) for i in probs_dict.keys()]
    else:
        keys = [str(k) for k in probs_dict.keys()]
        
    fig, ax = plt.subplots()
    ax.bar(keys, probs_dict.values())
    plt.show()

def categorical_encoding(data_list, thresh=0.01, drop=False):
    """
    drop: False='data values below thresh will be added to OTHER key.'
          True='data values below thresh will be excluded from encoding'
    """
    cnt = Counter(data_list) 
    valid = [k for k, v in cnt.most_common() if v/sum(cnt.values()) > thresh]
    
    if drop:
        key_to_idx = {c:i for i, c in enumerate(sorted(valid), 0)}
    else:
        key_to_idx = {} if len(cnt) == len(valid) else {'OTHER': 0}
        key_to_idx.update({c:i for i, c in enumerate(sorted(valid), len(key_to_idx))})
    
    idx_to_key = {v:k for k, v in key_to_idx.items()}
    
    if drop:
        encoding = [key_to_idx[d] for d in data_list if d in key_to_idx.keys()]
    else:
        encoding = [key_to_idx.get(d, 0) for d in data_list]
    
    return encoding, key_to_idx, idx_to_key
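
A small worked example of the thresholded encoding may help when choosing `thresh`. This is a simplified sketch, not a drop-in replacement: `encode_with_other` is a hypothetical name, and unlike `categorical_encoding` above it always reserves index 0 for `OTHER`, even when no value falls below the threshold.

```python
from collections import Counter

def encode_with_other(values, thresh=0.01):
    """Map frequent values to 1..N and everything else to 0 ('OTHER')."""
    counts = Counter(values)
    total = sum(counts.values())
    frequent = sorted(k for k, v in counts.items() if v / total > thresh)
    key_to_idx = {'OTHER': 0}
    key_to_idx.update({k: i for i, k in enumerate(frequent, 1)})
    return [key_to_idx.get(v, 0) for v in values], key_to_idx

# Toy prefixes: 'X' appears once in 10 rows, below a 15% threshold.
prefixes = ['R'] * 5 + ['C'] * 4 + ['X']
codes, mapping = encode_with_other(prefixes, thresh=0.15)
print(mapping)    # {'OTHER': 0, 'C': 1, 'R': 2}
print(codes[-1])  # 0
```

Values never seen during encoding also fall back to 0, which is the same lookup the track/arc designator feature uses with `refdes_to_class.get(r, 0)`.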

In [None]:
encoding, tool_to_class, class_to_tool = categorical_encoding(df.Tool)
df['X_Tool'] = encoding
df.drop(columns=['Tool'], inplace=True)

dt['X_Tool'] = [tool_to_class[t] for t in dt.Tool]
dt.drop(columns=['Tool'], inplace=True)

# Verification
assert set(df.X_Tool) == set(tool_to_class.values()), 'encoding mismatch'

print_feature_distribution(df.X_Tool, idx_to_key=class_to_tool)

In [None]:
# Create a categorical feature from the refdes alphabetic prefix

def create_refdes_class(refdes):
    return ''.join([c for c in refdes if c.isalpha()])

df['R_DES'] = df.Designator.apply(create_refdes_class) 
print_feature_distribution(df.R_DES)

df['X_DES'], refdes_to_class, class_to_refdes = categorical_encoding(df.R_DES)
df.drop(columns='R_DES', inplace=True)

# Verification
assert set(df.X_DES) == set(refdes_to_class.values()), 'encoding mismatch'

print_feature_distribution(df.X_DES, idx_to_key=class_to_refdes)
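
One thing to be aware of with the prefix extraction: it keeps every alphabetic character, not only the leading ones, so a designator with a letter suffix yields a different class. The sketch below shows the behaviour next to a leading-letters-only variant. `leading_prefix` is a hypothetical alternative and the designators are made-up examples.

```python
import re

def refdes_prefix(refdes):
    """Same rule as create_refdes_class: keep all alphabetic characters."""
    return ''.join(c for c in refdes if c.isalpha())

def leading_prefix(refdes):
    """Hypothetical alternative: keep only the letters before the first non-letter."""
    match = re.match(r'[A-Za-z]+', refdes)
    return match.group(0) if match else ''

for des in ['R12', 'TP104', 'U7A']:
    print(des, refdes_prefix(des), leading_prefix(des))
# R12 R R
# TP104 TP TP
# U7A UA U
```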

In [None]:
# Track & Arc Designator Feature
dt.loc[dt.Designator.isna(), 'Designator'] = 'OTHER'
dt['R_DES'] = dt.Designator.apply(create_refdes_class) 
dt['X_DES'] = [refdes_to_class.get(r, 0) for r in dt.R_DES]
dt.drop(columns='R_DES', inplace=True)

print_feature_distribution(dt.X_DES, idx_to_key=class_to_refdes)

In [None]:
print_feature_distribution(df.Layer)

encoding, layer_to_class, class_to_layer = categorical_encoding(df.Layer, drop=True)
df = df.loc[df.Layer.isin(layer_to_class.keys())].copy()
df['X_Layer'] = encoding
df.drop(columns=['Layer'], inplace=True)

# Verification
assert set(df.X_Layer) == set(layer_to_class.values()), 'encoding mismatch'

print_feature_distribution(df.X_Layer, idx_to_key=class_to_layer)

In [None]:
# Track & Arc Layer Categorical Feature
dt = dt.loc[dt.Layer.isin(layer_to_class.keys())].copy()
dt['X_Layer'] = [layer_to_class[l] for l in dt.Layer]
dt.drop(columns=['Layer'], inplace=True)

print_feature_distribution(dt.X_Layer, idx_to_key=class_to_layer)

In [None]:
# InComponent Categorical Feature
dt['X_InCmp'] = [0 if c in [0,'0'] else 1 for c in dt.InComponent]
dt.drop(columns=['InComponent'], inplace=True)

In [None]:
def round_rotations(rotation, round_degree=15):
    # Round to the nearest step, then wrap so that 360 (and e.g. 353) land in the 0 class
    return int(np.round(rotation/round_degree)*round_degree) % 360

df.Rotation = df.Rotation.apply(round_rotations)
print_feature_distribution(df.Rotation)

orig_len = len(df)

encoding, rotation_to_class, class_to_rotation = categorical_encoding(df.Rotation, drop=False, thresh=0.001)
#df = df.loc[df.Rotation.isin(rotation_to_class.keys())].copy()
df['XY_ROT'] = encoding
df.drop(columns=['Rotation'], inplace=True)

#print(f'Dropped {orig_len - len(df)} rows w/ infrequent rotations.')
print_feature_distribution(df.XY_ROT, idx_to_key=class_to_rotation)
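
Angles just below 360 deserve a quick check, because rounding to the nearest step can produce 360, which should be the same class as 0. A minimal sketch of rounding with wrap-around, using only the standard library (`round_rotation` is a hypothetical standalone name):

```python
def round_rotation(rotation, step=15):
    """Snap an angle in degrees to the nearest multiple of `step`, wrapped to [0, 360)."""
    return int(round(rotation / step) * step) % 360

for angle in [44, 353, 360, -90]:
    print(angle, '->', round_rotation(angle))
# 44 -> 45
# 353 -> 0
# 360 -> 0
# -90 -> 270
```

Taking the modulo after rounding also folds negative angles into the same range, so -90 and 270 share a class.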

In [None]:
# Separate silk and component types into their own dataframes
df_s = df.copy().loc[df.Type == 'slk-des']
df_c = df.copy().loc[df.Type == 'cmp']

# Silk Rotation is an output feature
df_s.rename(columns={'XY_ROT': 'Y_ROT'}, inplace=True)

# Cmp Layer & Rotation are input features instead of Output Labels
df_c.rename(columns={'XY_ROT': 'X_ROT'}, inplace=True)

In [None]:
orig_len = len(df_s)

# Verification
assert set(df_s.Board) == set(df_c.Board), 'Missing data.'

# Remove rows where designator is not in both silkscreen & component datasets
for b in set(df_s.Board):
    s_des = set(df_s[df_s.Board==b].Designator)
    c_des = set(df_c[df_c.Board==b].Designator)
    
    rem = s_des.difference(c_des)
    df_s = df_s.drop(df_s[(df_s.Board==b) & (df_s.Designator.isin(rem))].index)
    
    if len(rem) > 0:
        print(f'Board: {b}, {data_files[b]}')
        print(rem)
        
print(f'Removed {orig_len-len(df_s)} rows. Best case is if this is 0.')

In [None]:
# Split track & arc datasets
da = dt.loc[dt.Type == 'slk-arc'].copy()
dt = dt.loc[dt.Type == 'slk-trk'].copy()

drops = [c for c in ['Length','Type','LineType'] if c in da.columns]
da.drop(columns=drops, inplace=True) # Remove track specific data from arc dataframe
# The Info table lists IsCircle/IsClockwise (capitalized); accept both spellings
drops = [c for c in ['StartAngle','EndAngle','Radius','Type','IsCircle','IsClockwise','isCircle','isClockwise'] if c in dt.columns]
dt.drop(columns=drops, inplace=True) # ditto for arc data

# Set cadence start/end angles & length to 0 since cadence doesn't have an equivalent. 
# This is a temporary solution since this data isn't currently being used anyway
if 'StartAngle' in da.columns:
    da.loc[(da.StartAngle == 'NA'), 'StartAngle'] = 0
    da.loc[(da.EndAngle == 'NA'), 'EndAngle'] = 0
if 'Length' in dt.columns:
    dt.loc[(dt.Length == 'NA'), 'Length'] = 0

In [None]:
# Create node indexes
for b in set(df.Board):
    df_s.loc[df_s.Board==b, 'idx'] = [i for i in range(len(df_s.loc[df_s.Board==b]))]
    df_c.loc[df_c.Board==b, 'idx'] = [i for i in range(len(df_c.loc[df_c.Board==b]))]
    da.loc[da.Board == b, 'idx'] = [i for i in range(len(da.loc[da.Board==b]))]
    dt.loc[dt.Board == b, 'idx'] = [i for i in range(len(dt.loc[dt.Board==b]))]
    
df_s.idx = df_s.idx.astype(int)
df_c.idx = df_c.idx.astype(int)
da.idx = da.idx.astype(int)
dt.idx = dt.idx.astype(int)

### Normalize

In [None]:
SCALER = 10000

# Component Data
for c in ['x','y','L','R','T','B']:
    df_c[c] = df_c[c]/SCALER

# Silkscreen Data
df_s['W'] = abs(df_s['R'] - df_s['L'])/SCALER
df_s['H'] = abs(df_s['T'] - df_s['B'])/SCALER
df_s.drop(columns=['L','R','T','B','Type'], inplace=True)

# Track Data
for c in ['x','y','x1','x2','y1','y2','Width','Length']:
    if c in dt.columns:
        dt[c] = dt[c].astype(float)/SCALER
    
# Arc Data
for c in ['x','y','x1','x2','y1','y2','Width','Radius']:
    if c in da.columns:
        da[c] = da[c].astype(float)/SCALER

if 'StartAngle' in da.columns:
    da.StartAngle = da.StartAngle.astype(float)/360
    da.EndAngle = da.EndAngle.astype(float)/360

# Not sure if it's better to scale the output predictions
# df_s['x'] = df_s['x']/SCALER
# df_s['y'] = df_s['y']/SCALER
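
For reference, the scaling above amounts to dividing mil values by a fixed constant, so it can be undone exactly when exporting predictions. A minimal sketch with made-up coordinates; `bbox_size` and `unscale` are hypothetical helpers, not functions from this project:

```python
SCALER = 10000  # same divisor as the notebook; inputs are in mils

def bbox_size(left, right, top, bottom, scaler=SCALER):
    """Width and height of a bounding box, divided by `scaler`."""
    return abs(right - left) / scaler, abs(top - bottom) / scaler

def unscale(value, scaler=SCALER):
    """Convert a scaled value back to mils."""
    return value * scaler

# Hypothetical silkscreen bounding box: 250 mils wide, 40 mils tall
w, h = bbox_size(left=1200, right=1450, top=830, bottom=790)
print(w, h)  # 0.025 0.004
```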

### Define Features

In [None]:
cmp_features = ['X_Tool','X_DES','X_Layer','X_ROT','x','y','L','R','T','B']
slk_features = ['X_Tool','X_DES','W','H']
trk_features = ['X_Tool','X_DES','X_Layer','x','y','x1','x2','y1','y2','Width']
arc_features = ['X_Tool','X_DES','X_Layer','x','y','x1','x2','y1','y2','Width','StartAngle','EndAngle','Radius']
slk_lbls = ['Y_ROT','x','y']

# Categorical Features
categorical_cmp_features = [c for c in df_c.columns if 'int' in str(df_c[c].dtype) and c in cmp_features]
categorical_slk_features = [c for c in df_s.columns if 'int' in str(df_s[c].dtype) and c in slk_features]
categorical_trk_features = [c for c in dt.columns if 'int' in str(dt[c].dtype) and c in trk_features]
categorical_arc_features = [c for c in da.columns if 'int' in str(da[c].dtype) and c in arc_features]

# Non-Categorical Features
continuous_cmp_features = [c for c in df_c.columns if c not in categorical_cmp_features and c in cmp_features]
continuous_slk_features = [c for c in df_s.columns if c not in categorical_slk_features and c in slk_features]
continuous_trk_features = [c for c in dt.columns if c not in categorical_trk_features and c in trk_features]
continuous_arc_features = [c for c in da.columns if c not in categorical_arc_features and c in arc_features]

assert all([cmp_features[i] in categorical_cmp_features for i, f in enumerate(categorical_cmp_features)]), 'Categorical Features must be first in the feature columns followed by numerical features'
assert all([slk_features[i] in categorical_slk_features for i, f in enumerate(categorical_slk_features)]), 'Categorical Features must be first in the feature columns followed by numerical features'
assert all([trk_features[i] in categorical_trk_features for i, f in enumerate(categorical_trk_features)]), 'Categorical Features must be first in the feature columns followed by numerical features'
assert all([arc_features[i] in categorical_arc_features for i, f in enumerate(categorical_arc_features)]), 'Categorical Features must be first in the feature columns followed by numerical features'

# Class count for each categorical feature
categorical_dict = {c: len(set(df_c[c])) for c in categorical_cmp_features}
categorical_dict.update({c: len(set(df_s[c])) for c in categorical_slk_features}) 

print('Please verify the following are continuous features:')
print(f'Component Features: {continuous_cmp_features}')
print(f'Silk Features: {continuous_slk_features}')
print(f'Track Features: {continuous_trk_features}')
print(f'Arc Features: {continuous_arc_features}')
print('\n...and the following are categorical features:')
print(f'Component Features: {categorical_cmp_features}')
print(f'Silk Features: {categorical_slk_features}')
print(f'Track Features: {categorical_trk_features}')
print(f'Arc Features: {categorical_arc_features}')

print()
print('Categorical Dictionary: ', categorical_dict)

### Export preprocessing data

In [None]:
preprocessing = {'tool_to_class': tool_to_class,
                 'class_to_tool': class_to_tool,
                 'rotation_to_class':rotation_to_class,
                 'class_to_rotation':class_to_rotation, 
                 'refdes_to_class': refdes_to_class, 
                 'class_to_refdes':class_to_refdes, 
                 'layer_to_class':layer_to_class, 
                 'class_to_layer':class_to_layer, 
                 'categorical_dict':categorical_dict,
                 'cmp_features':cmp_features,
                 'slk_features':slk_features,
                 'trk_features':trk_features,
                 'arc_features':arc_features,
                 'slk_lbls':slk_lbls,
                 'categorical_cmp_features':categorical_cmp_features,
                 'categorical_slk_features':categorical_slk_features,
                 'categorical_trk_features':categorical_trk_features,
                 'categorical_arc_features':categorical_arc_features,
                 'scaler': SCALER} 


with open('preprocessing.pkl', 'wb') as f:
    pickle.dump(preprocessing, f)

### Create node list

In [None]:
# Verification
good = []
for b in set(df.Board):
    good += [all([i == idx for i, idx in enumerate(df_s.loc[(df_s.Board==b),'idx'])])]
    good += [all([i == idx for i, idx in enumerate(df_c.loc[(df_c.Board==b),'idx'])])]

assert all(good), 'Non contiguous index'

In [None]:
from graph import get_cmp_edge_idx, get_cmp_slk_edge_idx, get_split_mask, get_cmp_trk_edge_idx, get_trk_edge_idx
from tqdm import tqdm
from torch_geometric.data import HeteroData

LOAD_DATASET = True
dataset_filename = 'dataset.pkl'

def create_graph(s, c, t):
    data = HeteroData()

    # Define Nodes
    data['cmp'].x = torch.tensor(c[cmp_features].values, dtype=torch.float)
    data['slk'].x = torch.tensor(s[slk_features].values, dtype=torch.float)
    data['trk'].x = torch.tensor(t[trk_features].values, dtype=torch.float)
    #data['arc'].x = torch.tensor(a[arc_features].values, dtype=torch.float)
    data['slk'].y = torch.tensor(s[slk_lbls].values, dtype=torch.float)

    # Define Edges
    data['cmp','cmp-slk','slk'].edge_index = get_cmp_slk_edge_idx(c, s, ['cmp','cmp-slk','slk'])
    data['cmp','cmp-cmp','cmp'].edge_index = get_cmp_edge_idx(c, n=5)
    data['cmp','cmp-trk','trk'].edge_index = get_cmp_trk_edge_idx(c, t, ['cmp','cmp-trk','trk'])
    data['trk','trk-trk','trk'].edge_index = get_trk_edge_idx(t)
    #data['cmp','cmp-arc','arc'].edge_index = get_cmp_trk_edge_idx(c, a, ['cmp','cmp-arc','arc'])
    
    return data

if LOAD_DATASET and os.path.exists(dataset_filename):
    with open(dataset_filename, 'rb') as f:
        dataset, test_dataset = pickle.load(f)
else:
    dataset, test_dataset = [], []
    for b in tqdm(set(df.Board)):
        s = df_s.loc[df_s.Board==b] # Silkscreen Data For given Board b
        c = df_c.loc[df_c.Board==b] # Component Data For given Board b
        t = dt.loc[dt.Board==b] # Track Data For given Board b
        #a = da.loc[da.Board==b] # Arc Data For given Board b
        
        if data_files[b] in test_files:
            # Test dataset (make predictions on this data and export to csv)
            test_dataset += [create_graph(s, c, t)]
        else:
            # Train dataset
            dataset += [create_graph(s, c, t)]
    
    with open(dataset_filename, 'wb') as f:
        pickle.dump((dataset, test_dataset), f)
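
The edge builders (`get_cmp_edge_idx` and friends) live in the external `graph` module and are not shown in this notebook. If `n=5` is a neighbor count, which is an assumption on my part, a nearest-neighbor edge list could be built roughly as follows. The result uses the `[sources, targets]` layout that PyG expects for `edge_index`. `knn_edge_index` is a hypothetical stand-in, not the project's implementation.

```python
import math

def knn_edge_index(points, k=5):
    """Return [sources, targets] linking each point to its k nearest other points."""
    src, dst = [], []
    for i, p in enumerate(points):
        # Sort all other points by Euclidean distance to point i
        others = sorted((j for j in range(len(points)) if j != i),
                        key=lambda j: math.dist(p, points[j]))
        for j in others[:k]:
            src.append(i)
            dst.append(j)
    return [src, dst]

# Four made-up component origins on a line
pts = [(0.0, 0.0), (1.0, 0.0), (5.0, 0.0), (6.0, 0.0)]
print(knn_edge_index(pts, k=1))  # [[0, 1, 2, 3], [1, 0, 3, 2]]
```

This brute-force version is quadratic in the number of components, which is fine for a sketch but may be slow on large boards.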

In [None]:
# Verification
for data in dataset:
    slk_len = data.x_dict['slk'].shape[0]
    cmp_len = data.x_dict['cmp'].shape[0]
    max_cmp_idx = max(data.edge_index_dict[('cmp','cmp-slk','slk')][0].tolist())
    max_slk_idx = max(data.edge_index_dict[('cmp','cmp-slk','slk')][1].tolist())
    assert max_slk_idx < slk_len, 'Silk Error'
    assert max_cmp_idx < cmp_len, 'Cmp Error'
    
    max_cmp_idx1 = max(data.edge_index_dict[('cmp','cmp-cmp','cmp')][0].tolist())
    max_cmp_idx2 = max(data.edge_index_dict[('cmp','cmp-cmp','cmp')][1].tolist())
    assert max_cmp_idx1 < cmp_len, 'Cmp Error2'
    assert max_cmp_idx2 < cmp_len, 'Cmp Error3'

In [None]:
from torch_geometric.loader import DataLoader
from torch_geometric import seed_everything

seed_everything(457301994)

train_loader = DataLoader(dataset[1:], batch_size=4, shuffle=True)
val_loader = DataLoader(dataset[:1], batch_size=1, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Model defined in external file
from model import model, gnn_type, actFunc, dropout, gnn_channels, hidden_channels

gnn_str = ''.join([c for c in str(gnn_type).split('.')[-1] if c.isalnum()])
act_str = ''.join([c for c in str(actFunc).split('.')[-1] if c.isalnum()])
layers = f'lyrs{str(model).count("HeteroConv")}'

print(model)

In [None]:
LOAD_MODEL = False # Meaning load trained model
MODEL_SAVE_DIR = 'models'
MODEL_SAVE_NAME = 'model.pth'
if not os.path.isdir(MODEL_SAVE_DIR):
    os.mkdir(MODEL_SAVE_DIR)
model_save_path = os.path.join(MODEL_SAVE_DIR, MODEL_SAVE_NAME)
state_dict_path = os.path.join(MODEL_SAVE_DIR, 'state_dict.pth')

if LOAD_MODEL:
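    # map_location lets a model saved on GPU load on a CPU-only machine.
    # Note: recent PyTorch releases default to weights_only=True, which rejects fully
    # pickled models; pass weights_only=False there if you trust the file.
    model = torch.load(model_save_path, map_location=device)
    model.load_state_dict(torch.load(state_dict_path, map_location=device)['model_state_dict'])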
    
model.to(device)

In [None]:
lr = 0.0001
wd = 0.01
note = f'wd{wd}'

model_key = f'{gnn_str}_{lr}_{len(dataset)}_{act_str}_{dropout}_{gnn_channels}_{hidden_channels}_{layers}_{note}'.strip('_')        
print('Model Name:')
print(model_key)

In [None]:
from predict import export_predictions
import torch.nn.functional as F

optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=wd)  # Define optimizer.

EXPORT_PREDICTIONS = True # If true, make sure you have pcb datasets in the data/test/ directory

# Create dataframe from only test_data boards (pass to export_predictions)
test_board_idxs = [b for b in set(df_s.Board) if data_files[b] in test_files]
pr = df_s.loc[df_s.Board.isin(test_board_idxs)].copy()

def train():
    model.train()
    
    total_loss = 0
    for d in train_loader:  # Iterate over each mini-batch.
        d = d.to(device)

        _, rot_out, dx_out, dy_out = model(d.x_dict, d.edge_index_dict)  # Perform a single forward pass.
        dx_out = dx_out.reshape((-1,))
        dy_out = dy_out.reshape((-1,))
        
        rot_loss = F.cross_entropy(rot_out, d['slk'].y[:, slk_lbls.index('Y_ROT')].to(dtype=torch.long))
        
        dx_loss = F.mse_loss(dx_out, d['slk'].y[:, slk_lbls.index('x')])
        dy_loss = F.mse_loss(dy_out, d['slk'].y[:, slk_lbls.index('y')])

        loss = rot_loss + dx_loss + dy_loss
        total_loss += float(loss)

        optimizer.zero_grad()  # Clear gradients.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
        
    return total_loss

@torch.no_grad()  # Gradients are not needed for evaluation
def test(mode='test'):
    model.eval()
    loader = val_loader if mode == 'val' else test_loader  # Use the loader that matches mode
    
    embedding = None  # Stays None if the loader is empty
    total_x_loss, total_y_loss, total_rot_loss = 0,0,0
    total_loss = 0
    for d in loader:
        d = d.to(device)
        
        embedding, rot_out, dx_out, dy_out = model(d.x_dict, d.edge_index_dict)
        dx_out = dx_out.reshape((-1,))
        dy_out = dy_out.reshape((-1,))

        dx_loss = F.mse_loss(dx_out, d['slk'].y[:, slk_lbls.index('x')])
        dy_loss = F.mse_loss(dy_out, d['slk'].y[:, slk_lbls.index('y')])
        
        rot_loss = F.cross_entropy(rot_out, d['slk'].y[:, slk_lbls.index('Y_ROT')].to(dtype=torch.long))
        
        loss = rot_loss + dx_loss + dy_loss
        
        total_x_loss += float(dx_loss)
        total_y_loss += float(dy_loss)
        total_rot_loss += float(rot_loss)
        total_loss += float(loss)
    
    return embedding, total_loss, total_rot_loss, total_x_loss, total_y_loss

loss_list = []
val_list = []
rot_loss_list = []
dx_loss_list = []
dy_loss_list = []
times = []
start_time = time()

if os.path.exists('model_compare.pkl'):
    with open('model_compare.pkl', 'rb') as f:
        model_dict = pickle.load(f)
else:
    model_dict = {}
    
for epoch in range(1, 200000000):
    loss = train()
    
    if epoch % 1 == 0:
        embeddings, val_loss, rot_loss, dx_loss, dy_loss = test('val') 
        
        if np.isnan(rot_loss) or np.isnan(val_loss):
            break
        
        loss_list += [loss]
        val_list += [val_loss]
        rot_loss_list += [rot_loss]
        dx_loss_list += [dx_loss]
        dy_loss_list += [dy_loss]
        
        times += [time() - start_time]
    
        print(f'Epoch: {epoch:03d}, Loss: {loss:.0f}, Val: {val_loss:.0f}, Rot Loss: {rot_loss: .4f}')
        
    if epoch % 100 == 0:
        if EXPORT_PREDICTIONS and len(test_files) > 0:
            export_predictions(test_loader, model, pr, device=device, class_to_rotation=class_to_rotation,
                               export_filename=f'predictions_{epoch}.csv', 
                               export_subdir=os.path.join('data', 'predictions'))
        
        model_dict[model_key] = (loss_list, val_list, (dx_loss_list, dy_loss_list), times)
        with open('model_compare.pkl', 'wb') as f:
            pickle.dump(model_dict, f)
        
        with open('log_batch.pkl', 'wb') as f:
            pickle.dump({'loss': loss_list, 
                         'val': val_list,
                         'rot_loss': rot_loss_list,
                         'dx_loss': dx_loss_list,
                         'dy_loss': dy_loss_list,
                         'embeddings': [e.cpu() for e in embeddings]}, f)
        
        torch.save(model, model_save_path)
        torch.save({'epoch': epoch, 'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(), 'loss': loss}, state_dict_path)
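
The training loss above is an unweighted sum of a cross-entropy term (rotation class) and two MSE terms (x and y). Because the x/y labels are left in mils, the MSE terms can be several orders of magnitude larger than the rotation term. The sketch below works this out in plain Python with made-up numbers; `cross_entropy` and `mse` here are simplified single-sample stand-ins for the `torch.nn.functional` versions.

```python
import math

def cross_entropy(logits, target):
    """Cross-entropy for one sample: -log softmax(logits)[target]."""
    m = max(logits)  # subtract the max for numerical stability
    log_sum = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum - logits[target]

def mse(pred, target):
    """Mean squared error over paired lists."""
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / len(pred)

rot = cross_entropy([2.0, 0.5, 0.1], target=0)  # confident, correct class
dx_raw = mse([1100.0], [1000.0])                # 100 mil error, unscaled labels
dx_scaled = mse([0.11], [0.10])                 # same error with labels divided by 10000

print(f'rot={rot:.3f}  dx_raw={dx_raw:.1f}  dx_scaled={dx_scaled:.2e}')
```

With unscaled labels the position error dominates the sum, and with scaled labels the rotation term dominates instead, so either choice may call for explicit per-term weights.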