Python script for training and inference of an LGN and an FFN  

Inference and training functions based on /experiments/main.py from https://github.com/Felix-Petersen/difflogic
 
Author: Filipa Trindade

BOOLEAN ALGEBRA: FROM DIGITAL CIRCUITS TO DEEP LEARNING APPLICATIONS © 2024 by Filipa Trindade Coito is licensed under CC BY-SA 4.0

### Import libraries

In [None]:
# Import Libraries
import pandas as pd
import numpy as np
import plotly.graph_objects as go

import math
import random
import os
import time
import datetime
import json
import pickle
import statistics

import torch
import torchvision
from tqdm import tqdm
from copy import deepcopy

import torchvision.transforms as transforms
from PIL import Image
from torchvision.datasets import ImageFolder

from results_json import ResultsJSON
from difflogic import LogicLayer, GroupSum, CompiledLogicNet, PackBitsTensor
import difflogic_cuda

### Folder for training iterations and path to dataset

In [None]:
# Create a dedicated output folder for every training run.
# A timestamp (YYYYMMDD_HHMMSS), taken once, names both run folders.
time_string = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# Folder for LGN trainings
path = 'trained_models' + '/' + time_string
# exist_ok=True replaces the check-then-create pattern, which could fail if
# the folder appeared between the existence check and the creation call.
os.makedirs(path, exist_ok=True)
# Folder for FFN trainings
path_ffn = 'trained_models_ffn' + '/' + time_string
os.makedirs(path_ffn, exist_ok=True)


# Path to the dataset
datset_path = 'dataset_new/'

### Dictionary of model parameters (LGN)

In [None]:
# Hyper-parameters and run settings of the LGN experiment
args = dict(
    # Run ID
    id=10,
    # Tau parameter for GroupSum
    tau=20,
    # Batch size
    batch_size=4,
    # Learning rate of the Adam optimizer
    learning_rate=0.0001,
    # Maximum number of epochs
    epochs=200,
    # Patience of the early-stopping criterion
    patience=4,
    # Number of neurons in each logic layer
    num_neurons=90000,
    # Number of logic layers
    num_layers=4,
    # Number of pictures in the validation set
    val_size=316,
    # Number of pictures in the test set
    test_size=316,
    # Number of grey levels used to binarize a picture
    num_niveis=7,
    # Side length the dataset pictures are resized to (40x40)
    img_size=40,
    # Seed
    seed=952,
)
# Persist the settings next to the run's other artefacts (log_args.txt)
args_as_json = json.dumps(args)
with open(os.path.join(path, 'log_args.txt'), 'w') as file:
    file.write(args_as_json)

### Define seed

In [None]:
# Seed PyTorch, Python's `random` and NumPy (in that order) with the same
# configured value.
for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
    seed_fn(args['seed'])

### Loading the pictures

In [None]:
# Load and resize the pictures of every split (train / test / val).
# Each split is loaded twice: binarized for the LGN and plain greyscale
# (suffix `_ff`) for the FFN.
def trans_greyscale_in_binary(x):
    """Threshold a greyscale tensor into ``num_niveis`` binary planes.

    Plane ``i`` holds 1.0 where ``x > (i + 1) / (num_niveis + 1)`` and 0.0
    elsewhere; the planes are concatenated along dimension 0.

    A named function is used instead of a lambda because, unlike a lambda,
    it can be pickled by the standard `pickle` module.
    """
    n_levels = args['num_niveis']
    return torch.cat(
        [
            (x > ((i + 1) / (float(n_levels) + 1))).float()
            for i in range(n_levels)
        ], dim=0)


def _build_transform(binarize):
    """Return the preprocessing pipeline; append the binarization if asked."""
    steps = [
        # Resize the pictures
        transforms.Resize((args['img_size'], args['img_size'])),
        # Transform to greyscale
        transforms.Grayscale(num_output_channels=1),
        # Convert to tensor
        transforms.ToTensor(),
    ]
    if binarize:
        # Split the grey levels into binary planes (LGN input only)
        steps.append(torchvision.transforms.Lambda(trans_greyscale_in_binary))
    return transforms.Compose(steps)


def _load_split(split, binarize):
    """Open the `split` sub-folder of the dataset as an ImageFolder."""
    return ImageFolder(
        # os.path.join avoids the doubled '/' of plain string concatenation
        root=os.path.join(datset_path, split),
        transform=_build_transform(binarize),
    )


# Define train pictures
train_images = _load_split('train', binarize=True)
train_images_ff = _load_split('train', binarize=False)

# Define test pictures
test_images = _load_split('test', binarize=True)
test_images_ff = _load_split('test', binarize=False)

# Define validation pictures
val_images = _load_split('val', binarize=True)
val_images_ff = _load_split('val', binarize=False)

In [None]:
# Wrap the LGN datasets in DataLoaders.
# Test and validation share the same settings: one very large batch
# (up to 1e6 samples), no shuffling.
_lgn_eval_loader_kwargs = dict(batch_size=int(1e6), shuffle=False, num_workers=4)

train_loader = torch.utils.data.DataLoader(
    train_images,
    batch_size=args['batch_size'],
    shuffle=True,
    drop_last=True,
    num_workers=4,
)
test_loader = torch.utils.data.DataLoader(test_images, **_lgn_eval_loader_kwargs)
val_loader = torch.utils.data.DataLoader(val_images, **_lgn_eval_loader_kwargs)

# Results store for this run, keyed by the run ID
results = ResultsJSON(eid=args['id'], path='./results/')

### Define the functions for training and inference

In [None]:
# Define the functions for training and inference
def train(model, x, y, loss_fn, optimizer):
    """
    Run one optimisation step on a single batch.

    The batch `x` is fed through `model`, the output is scored against the
    targets `y` with `loss_fn`, the gradients are back-propagated and
    `optimizer` updates the parameters.

    Returns the loss of this batch as a Python float.
    """
    batch_loss = loss_fn(model(x), y)

    # Clear stale gradients before accumulating the new ones
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()

    return batch_loss.item()

def eval(model, loader, mode, device='cuda'):
    """
    Compute the classification accuracy of `model` over every sample
    yielded by `loader`.

    Each input batch is moved to `device` and rounded before being fed to
    the model; the predicted class is the argmax over the last dimension.

    Accuracy is computed as (correct predictions) / (total predictions) over
    the whole loader. This differs from averaging per-batch accuracies only
    when batches have different sizes, in which case the plain average would
    over-weight the smaller batches.

    Adapted from experiments/main.py from https://github.com/Felix-Petersen/difflogic

    Parameters:
        model:  the torch module to evaluate.
        loader: iterable of (x, y) batches.
        mode:   value passed to `model.train(mode=...)` during evaluation
                (False evaluates in eval mode).
        device: device the batches are moved to (defaults to 'cuda').

    Returns:
        The accuracy as a Python float, or NaN if `loader` yields nothing.
    """
    orig_mode = model.training
    n_correct = 0
    n_total = 0
    model.train(mode=mode)
    try:
        with torch.no_grad():
            for x, y in loader:
                predictions = model(x.to(device).round()).argmax(-1)
                hits = predictions == y.to(device)
                n_correct += hits.sum().item()
                n_total += hits.numel()
    finally:
        # Restore the caller's train/eval mode even if evaluation fails
        model.train(mode=orig_mode)
    if n_total == 0:
        return float('nan')
    return n_correct / n_total

def packbits_eval(model, loader):
    """
    Same as `eval`, but every input batch is flattened, rounded, converted
    to booleans and wrapped in a PackBitsTensor before being fed to the
    model (intended for quicker inference).

    The model is always evaluated in eval mode and its previous mode is
    restored afterwards, even if evaluation fails. Accuracy is computed as
    (correct predictions) / (total predictions) over the whole loader, so
    batches of different sizes are weighted by their sample count.

    Adapted from experiments/main.py from https://github.com/Felix-Petersen/difflogic

    Returns:
        The accuracy as a Python float, or NaN if `loader` yields nothing.
    """
    orig_mode = model.training
    n_correct = 0
    n_total = 0
    model.eval()
    try:
        with torch.no_grad():
            for x, y in loader:
                packed = PackBitsTensor(
                    x.to('cuda').reshape(x.shape[0], -1).round().bool())
                hits = model(packed).argmax(-1) == y.to('cuda')
                n_correct += hits.sum().item()
                n_total += hits.numel()
    finally:
        model.train(mode=orig_mode)
    if n_total == 0:
        return float('nan')
    return n_correct / n_total

### LGN training

In [None]:
#####################
### LGN TRAINING  ###
#####################
# Build the LGN from the hyper-parameters stored in args
llkw = dict(grad_factor=1., connections='unique')

# Length of the flattened input vector:
# num_niveis binary planes of img_size x img_size pixels
in_dim = args['num_niveis']*args['img_size']*args['img_size']
# n. of classes
class_count = 2
# Initialize list of layers
logic_layers = []

# Architecture of the model
arch = 'randomly_connected'
k = args['num_neurons']
l = args['num_layers']

# Define the Flatten layer (1st layer) of the model
logic_layers.append(torch.nn.Flatten())
# First logic layer maps the input vector to k neurons; the remaining
# l - 1 logic layers are k -> k
logic_layers.append(LogicLayer(in_dim=in_dim, out_dim=k, **llkw))
for _ in range(l - 1):
    logic_layers.append(LogicLayer(in_dim=k, out_dim=k, **llkw))
# Add the GroupSum at the end
model = torch.nn.Sequential(
    *logic_layers,
    GroupSum(class_count, args['tau'])
)

# logic_layers is [Flatten, LogicLayer, ..., LogicLayer]; GroupSum is NOT part
# of this list, so only the leading Flatten has to be skipped. Slicing with
# [1:-1] would silently leave the last LogicLayer out of the totals.
total_num_neurons = sum(layer.num_neurons for layer in logic_layers[1:])
print(f'total_num_neurons={total_num_neurons}')
total_num_weights = sum(layer.num_weights for layer in logic_layers[1:])
print(f'total_num_weights={total_num_weights}')
if args['id'] is not None:
    results.store_results({
        'total_num_neurons': total_num_neurons,
        'total_num_weights': total_num_weights,
    })

model = model.to('cuda')
model.implementation = 'cuda'

print(model)

if args['id'] is not None:
    results.store_results({'model_str': str(model)})

# Define loss function
loss_fn = torch.nn.CrossEntropyLoss()
# Define optimizer
optim = torch.optim.Adam(model.parameters(), lr=args['learning_rate'])
# Print the n. of trainable parameters
pytorch_total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print('Total trainable params:' + str(pytorch_total_params))

In [None]:
# State tracked across epochs for early stopping and logging
# Best validation accuracy seen so far (-1.0 so that the first epoch improves on it)
val_anterior = -1.0
# Number of consecutive epochs without an improvement of the validation accuracy
val_counter = 0
# Per-epoch histories saved at the end of training
val_list = []
train_list = []
test_list = []
loss_list = []
seconds_list = []
cumu_training_seconds = 0
best_epoch = 0
# Copy of the model at the best validation accuracy
best_model = None

# We loop through the whole range of epochs
for epoch in range(args['epochs']):

    start_time = time.time()
    print('Epoch: ' + str(epoch))
    # In each epoch we iterate through the whole training set 1 batch at a time
    for i, (x, y) in tqdm(
            enumerate(train_loader),
            desc='Epoch',
            total=len(train_loader),
    ):

        x = x.to(torch.float64).to('cuda')
        y = y.to('cuda')
        # For each batch we take one optimisation step and keep its loss
        loss = train(model, x, y, loss_fn, optim)

    # Only the time spent in the training batches is accumulated
    cumu_training_seconds = cumu_training_seconds + (time.time() - start_time)
    seconds_list.append(round(cumu_training_seconds,0))

    print('Computing validation accuracy...')
    val_accuracy_eval_mode = eval(model, val_loader, mode=False)
    val_list.append(val_accuracy_eval_mode)
    train_accuracy_eval_mode = eval(model, train_loader, mode=False)
    train_list.append(train_accuracy_eval_mode)
    # Loss of the last batch of the epoch
    loss_list.append(loss)

    print('Loss: ' + str(np.round(loss,4)))
    print('Val Acc: ' + str(np.round(val_accuracy_eval_mode,4)))
    print('Train Acc: ' + str(np.round(train_accuracy_eval_mode,4)))

    # Compare the current validation accuracy with the best one so far
    if val_accuracy_eval_mode > val_anterior:
        # Strict improvement: reset the patience counter and remember this model.
        # This also applies on the very last epoch, so a final improvement is kept.
        val_counter = 0
        best_model = deepcopy(model)
        best_epoch = epoch
        val_anterior = val_accuracy_eval_mode
    else:
        # No improvement (equal or lower accuracy): one more strike
        val_counter = val_counter + 1

    if (val_counter > args['patience']) or (epoch == args['epochs']-1):
        if best_model is None:
            # No epoch ever improved on the initial value (e.g. NaN accuracy):
            # fall back to the current model instead of saving nothing
            best_model = deepcopy(model)
            best_epoch = epoch
        # Stop training and save the best model found so far
        with open(os.path.join(path, 'epoch-{}'.format(best_epoch)), 'wb') as file:
            pickle.dump(best_model, file)
        # Save the log files
        with open(os.path.join(path, 'log_seconds.txt'), 'w') as file:
            file.write(json.dumps(seconds_list))
        with open(os.path.join(path, 'log_val.txt'), 'w') as file:
            file.write(json.dumps(val_list))
        with open(os.path.join(path, 'log_train.txt'), 'w') as file:
            file.write(json.dumps(train_list))
        # Terminate model training
        break

# Load the best model from file and evaluate on the test set.
# NOTE: pickle must only be used on files this script wrote itself.
with open(os.path.join(path, 'epoch-{}'.format(best_epoch)), 'rb') as file:
    model = pickle.load(file)
test_accuracy_eval_mode = eval(model, test_loader, mode=False)
print('Test Acc: ' + str(np.round(test_accuracy_eval_mode,4)))

In [None]:
# Code to load the best model from the folder and evaluate
#best_lgn_model = pickle.load(open(os.path.join('trained_models/20241117_125109/epoch-46'), 'rb'))
#eval(best_lgn_model, test_loader, mode=False)

In [None]:
# Figure for Chapter 8: evolution of train_acc and val_acc during LGN training.
# Validation accuracy is drawn against the epoch index (primary x-axis),
# train accuracy against the cumulative training seconds (secondary x-axis "x2").
x_values = list(range(len(val_list)))  # epoch indices
trace1 = go.Scatter(
    x = x_values,
    y = val_list,
    mode = 'lines',
    name = "Validation Accuracy",
)
trace2 = go.Scatter(
    x = seconds_list,
    y = train_list,
    mode = 'lines',
    name = "Train Accuracy",
    xaxis = "x2",
)
# Assemble the figure from the two curves
fig = go.Figure()
for curve in (trace1, trace2):
    fig.add_trace(curve)
# Primary x-axis: epochs, at the bottom
fig.update_xaxes(title_text = "Epochs", side = "bottom")
# Secondary x-axis: elapsed time, overlaid on the primary one and shown at the top
secondary_axis = dict(
    title = "Elapsed time [s]",
    overlaying ="x",
    side ="top",
    ticktext = seconds_list,
)
fig.update_layout(xaxis2 = secondary_axis, yaxis_title = "Accuracy")
# Write the plot to an image file in the LGN run folder
fig.write_image(os.path.join(path, 'val_train_acc.png'))

### FFN training

In [None]:
#####################
### FFN TRAINING  ###
#####################
# Hyper-parameters of the feed-forward network
args_ff = dict(
    # Maximum number of epochs
    epochs=200,
    # Batch size
    batch_size=32,
    # Width of every hidden layer
    hidden_dim=1150,
    # Number of hidden layers
    num_layers=4,
    # Learning rate of the Adam optimizer
    learning_rate=0.00001,
    # Patience of the early-stopping criterion
    patience=5,
)

In [None]:
# Wrap the FFN (non-binarized) datasets in DataLoaders.
# Test and validation share the same settings: one very large batch
# (up to 1e6 samples), no shuffling.
_ffn_eval_loader_kwargs = dict(batch_size=int(1e6), shuffle=False, num_workers=4)

train_loader_ff = torch.utils.data.DataLoader(
    train_images_ff,
    batch_size=args_ff['batch_size'],
    shuffle=True,
    drop_last=True,
    num_workers=4,
)
test_loader_ff = torch.utils.data.DataLoader(test_images_ff, **_ffn_eval_loader_kwargs)
val_loader_ff = torch.utils.data.DataLoader(val_images_ff, **_ffn_eval_loader_kwargs)

In [None]:
from torch import nn
from torch import optim


def _normal_init_linear(n_in, n_out):
    """Create a Linear layer with N(0, 1) weights and zero biases."""
    layer = nn.Linear(n_in, n_out)
    torch.nn.init.normal_(layer.weight, mean=0.0, std=1.0)
    torch.nn.init.zeros_(layer.bias)
    return layer


# Length of a flattened (single-channel) picture
dim_in = args['img_size']*args['img_size']
# n. of classes
num_classes = 2
hidden_dim = args_ff['hidden_dim']

# Layer stack: Flatten -> num_layers x (Linear + ReLU) -> Linear(num_classes).
# The first Linear maps the picture to the hidden width; it is always present.
ff_layers = [nn.Flatten(), _normal_init_linear(dim_in, hidden_dim), nn.ReLU()]
for _ in range(args_ff['num_layers'] - 1):
    ff_layers += [_normal_init_linear(hidden_dim, hidden_dim), nn.ReLU()]
# Output layer, no activation
ff_layers.append(_normal_init_linear(hidden_dim, num_classes))

model = nn.Sequential(*ff_layers).to('cuda')
# Cross-entropy loss
loss_fn = nn.CrossEntropyLoss()
# Adam optimizer
optimizer = torch.optim.Adam(model.parameters(),lr=args_ff['learning_rate'])
# Show the architecture
print(model)
# Count and print the trainable parameters
pytorch_total_params = sum(
    param.numel() for param in model.parameters() if param.requires_grad
)
print('Total trainable params:' + str(pytorch_total_params))

In [None]:
# To calculate the accuracy of the network
def eval_fn(loader, model, mode, device='cuda'):
    """
    Compute the classification accuracy of `model` over every sample
    yielded by `loader`.

    Each input batch is moved to `device` and cast to float before being
    fed to the model; the predicted class is the argmax over the last
    dimension.

    Accuracy is computed as (correct predictions) / (total predictions) over
    the whole loader. This differs from averaging per-batch accuracies only
    when batches have different sizes, in which case the plain average would
    over-weight the smaller batches.

    Parameters:
        loader: iterable of (x, y) batches.
        model:  the torch module to evaluate.
        mode:   value passed to `model.train(mode=...)` during evaluation
                (False evaluates in eval mode).
        device: device the batches are moved to (defaults to 'cuda').

    Returns:
        The accuracy as a Python float, or NaN if `loader` yields nothing.
    """
    orig_mode = model.training
    n_correct = 0
    n_total = 0
    model.train(mode=mode)
    try:
        with torch.no_grad():
            for x, y in loader:
                predictions = model(x.to(device).float()).argmax(-1)
                hits = predictions == y.to(device)
                n_correct += hits.sum().item()
                n_total += hits.numel()
    finally:
        # Restore the caller's train/eval mode even if evaluation fails
        model.train(mode=orig_mode)
    if n_total == 0:
        return float('nan')
    return n_correct / n_total

# State tracked across epochs for early stopping and logging
# Best validation accuracy seen so far (-1.0 so that the first epoch improves on it)
val_anterior = -1.0
# Per-epoch histories saved at the end of training
val_list = []
train_list = []
seconds_list = []
cumu_training_seconds = 0
best_epoch = 0
# Copy of the model at the best validation accuracy
best_model = None
# Number of consecutive epochs without an improvement of the validation accuracy
val_counter = 0

for epoch in range(args_ff['epochs']):

    start_time = time.time()
    print('Epoch: ' + str(epoch))
    for i, (x, y) in tqdm(
            enumerate(train_loader_ff),
            desc='Epoch',
            total=len(train_loader_ff),
    ):
        x = x.to('cuda')
        y = y.to('cuda')

        outputs = model(x.float())
        loss = loss_fn(outputs,y)

        optimizer.zero_grad()
        loss.backward()
        # Clip the global gradient norm to 1.0 before the update
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

    # Only the time spent in the training batches is accumulated
    cumu_training_seconds = cumu_training_seconds + (time.time() - start_time)
    seconds_list.append(round(cumu_training_seconds,0))

    val_accuracy_eval_mode = eval_fn(val_loader_ff, model, mode=False)
    val_list.append(val_accuracy_eval_mode)
    train_acc = eval_fn(train_loader_ff, model, mode=False)
    train_list.append(train_acc)

    # Report the metrics before the early-stopping check so that the epoch
    # that ends training is reported too. `loss` is the last batch's loss.
    print(f'Loss: {loss.item():.4f}')
    print(f'Val Acc: {val_accuracy_eval_mode:.4f}')
    print(f'Train Acc: {train_acc:.4f}')

    # Compare the current validation accuracy with the best one so far
    if val_accuracy_eval_mode > val_anterior:
        # Strict improvement: reset the patience counter and remember this model.
        # This also applies on the very last epoch, so a final improvement is kept.
        val_counter = 0
        best_model = deepcopy(model)
        best_epoch = epoch
        val_anterior = val_accuracy_eval_mode
    else:
        # No improvement (equal or lower accuracy): one more strike
        val_counter = val_counter + 1

    if (val_counter > args_ff['patience']) or (epoch == args_ff['epochs']-1):
        if best_model is None:
            # No epoch ever improved on the initial value (e.g. NaN accuracy):
            # fall back to the current model instead of saving nothing
            best_model = deepcopy(model)
            best_epoch = epoch
        # Stop training and save the best model found so far
        with open(os.path.join(path_ffn, 'epoch-{}'.format(best_epoch)), 'wb') as file:
            pickle.dump(best_model, file)
        # Save the log files
        with open(os.path.join(path_ffn, 'log_val.txt'), 'w') as file:
            file.write(json.dumps(val_list))
        with open(os.path.join(path_ffn, 'log_args.txt'), 'w') as file:
            file.write(json.dumps(args_ff))
        with open(os.path.join(path_ffn, 'log_train.txt'), 'w') as file:
            file.write(json.dumps(train_list))
        with open(os.path.join(path_ffn, 'log_seconds.txt'), 'w') as file:
            file.write(json.dumps(seconds_list))
        break

# Load the best model from file and evaluate on the test set.
# NOTE: pickle must only be used on files this script wrote itself.
with open(os.path.join(path_ffn, 'epoch-{}'.format(best_epoch)), 'rb') as file:
    model = pickle.load(file)
test_accuracy_eval_mode = eval_fn(test_loader_ff, model, mode=False)
print('Test Acc: ' + str(np.round(test_accuracy_eval_mode,4)))

In [None]:
# Code to load the best model from the folder and evaluate
#best_ffn_model = pickle.load(open(os.path.join('trained_models_ffn/20241117_125109/epoch-13'), 'rb'))
#eval_fn(test_loader_ff, best_ffn_model, mode=False)

In [None]:
# Figure for Chapter 8: evolution of train_acc and val_acc during FFN training.
# Validation accuracy is drawn against the epoch index (primary x-axis),
# train accuracy against the cumulative training seconds (secondary x-axis "x2").
x_values = list(range(len(val_list)))  # epoch indices
trace1 = go.Scatter(
    x = x_values,
    y = val_list,
    mode = 'lines',
    name = "Validation Accuracy",
)
trace2 = go.Scatter(
    x = seconds_list,
    y = train_list,
    mode = 'lines',
    name = "Train Accuracy",
    xaxis = "x2",
)
# Assemble the figure from the two curves
fig = go.Figure()
for curve in (trace1, trace2):
    fig.add_trace(curve)
# Primary x-axis: epochs, at the bottom
fig.update_xaxes(title_text = "Epochs", side = "bottom")
# Secondary x-axis: elapsed time, overlaid on the primary one and shown at the top
secondary_axis = dict(
    title = "Elapsed time [s]",
    overlaying ="x",
    side ="top",
    ticktext = seconds_list,
)
fig.update_layout(xaxis2 = secondary_axis, yaxis_title = "Accuracy")
# Write the plot to an image file in the FFN run folder
fig.write_image(os.path.join(path_ffn, 'val_train_acc.png'))

Python script for training and inference of an LGN and an FFN  

Inference and training functions based on /experiments/main.py from https://github.com/Felix-Petersen/difflogic
 
Author: Filipa Trindade

BOOLEAN ALGEBRA: FROM DIGITAL CIRCUITS TO DEEP LEARNING APPLICATIONS © 2024 by Filipa Trindade Coito is licensed under CC BY-SA 4.0