## Import Libraries

In [2]:
import sys
import os
from pathlib import Path
import torch
sys.path.append(os.getcwd())
sys.path.append(str(Path(os.getcwd()).parent))
from utils import *
from configuration import *
import numpy as np
import pandas as pd

if not os.path.exists('./evaluation_date/'):
    os.makedirs('./evaluation_date/')

In [3]:
args = parser.parse_args([])
args.DEVICE = 'cpu'
args.task = 'temporal'
args.metric = 'temporal_acc'
args.SoftEva = True
args = FormulateArgs(args)

## Results on non-augmented test set

In [None]:
args.augment = False
args.augment

In [None]:
results = torch.zeros([15])

for ds in range(15):
        args.DATASET = ds
        seed = args.SEED
        test_loader , datainfo = GetDataLoader(args, 'test',  path='../dataset/')
        print(datainfo)

        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        
        model_exist = os.path.isfile(f'./models/{modelname}')
        
        if model_exist:  
                          
            model = torch.load(f'./models/{modelname}', map_location=args.DEVICE)
            
            model.UpdateArgs(args)
            
            SetSeed(args.SEED)

            evaluator = Evaluator(args).to(args.DEVICE)

            for x,y in test_loader:
                X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)
                start_time = time.time()
                model(X_test[:1, :, :])
                end_time = time.time()
                inference_time = end_time - start_time
                results[ds] = inference_time
                break

        else:
            results[ds] = float('nan')

In [8]:
np.savetxt(f"./evaluation1/inf_time_2Order_LPF_acc.txt", results.numpy(), delimiter="\t", fmt='%.5f')

## Count devices

In [None]:
results = torch.zeros([15, 3])

for ds in range(15):
        args.DATASET = ds
        seed = args.SEED
        test_loader , datainfo = GetDataLoader(args, 'test',  path='../dataset/')
        print(datainfo)

        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        
        model_exist = os.path.isfile(f'./models/{modelname}')

        if model_exist:  
                          
            model = torch.load(f'./models/{modelname}', map_location=args.DEVICE)
            
            model.UpdateArgs(args)
            
            SetSeed(args.SEED)

            evaluator = Evaluator(args).to(args.DEVICE)

            state_dict = model.state_dict()
            num_theta = 0
            num_r = 0
            num_c = 0
            for name in state_dict:
                if "theta_" in name:
                    num_theta += state_dict[name].shape[0] * state_dict[name].shape[1]
                elif "R_" in name:
                    num_r +=1
                elif "C_" in name:
                    num_c += 1
            
            results[ds][0] = num_theta
            results[ds][1] = num_r
            results[ds][2] = num_c

        else:
            results[ds] = float('nan')

In [42]:
np.savetxt(f"./evaluation1/count_device_2Order_LPF_acc.txt", results.numpy(), delimiter="\t", fmt='%.5f')

In [None]:
results = torch.zeros([15,10,6])

for ds in range(15):
    args.DATASET = ds
    valid_loader, datainfo = GetDataLoader(args, 'valid', path='../dataset/')
    test_loader , datainfo = GetDataLoader(args, 'test',  path='../dataset/')
    print(datainfo)
    for seed in range(10):
        args.SEED = seed

        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        
        model_exist = os.path.isfile(f'./models/{modelname}')

        if model_exist:  
                          
            model = torch.load(f'./models/{modelname}', map_location=args.DEVICE)
            
            model.UpdateArgs(args)
            
            SetSeed(args.SEED)

            evaluator = Evaluator(args).to(args.DEVICE)

            for x,y in valid_loader:
                X_valid, y_valid = x.to(args.DEVICE), y.to(args.DEVICE)
            for x,y in test_loader:
                X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)

            acc_valid = evaluator(model, X_valid, y_valid)
            acc_test   = evaluator(model, X_test,  y_test)
            
            results[ds,seed,0] = acc_valid
            results[ds,seed,1] = acc_test
            
        else:
            results[ds,seed,:] = float('nan')

In [7]:
N_selected_seeds = 3
re_temp = torch.nan_to_num(results, nan=-10000.)
values, indices = torch.topk(re_temp[:,:,1], k=N_selected_seeds, dim=1)
mean_selected = torch.mean(torch.asarray(values), dim=1)
var_selected = torch.std(torch.asarray(values), dim=1)
selected_results = torch.cat([mean_selected.unsqueeze(-1), var_selected.unsqueeze(-1)], dim=1)

np.savetxt(f"./evaluation1/non_var_test_top_{N_selected_seeds}_2Order_LPF_acc.txt", selected_results.numpy(), delimiter="\t", fmt='%.5f')

In [44]:
N_selected_seeds = 3
re_temp = torch.nan_to_num(results, nan=-10000.)
values, indices = torch.topk(re_temp[:,:,3], k=N_selected_seeds, dim=1)
mean_selected = torch.mean(torch.asarray(values), dim=1)
var_selected = torch.std(torch.asarray(values), dim=1)
selected_results = torch.cat([mean_selected.unsqueeze(-1), var_selected.unsqueeze(-1)], dim=1)

np.savetxt(f"./evaluation1/non_var_test_top_{N_selected_seeds}_aug_LPF_acc.txt", selected_results.numpy(), delimiter="\t", fmt='%.5f')

## Results on augmented test set

In [None]:
args.augment = True
args.augment

## Proposed model

In [None]:
import PowerPrintedLearnableFilter as pNN
args.augment = True
all_results = []
results = torch.zeros([15,10,6])
for ds in range(15):
    args.DATASET = ds
    for seed in range(10):
        valid_loader, datainfo = GetDataLoader(args, 'valid', path='./dataset/')
        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        model_exist = os.path.isfile(f'./ConstantLearnableFilters/models/{modelname}')
        print(model_exist)
        if model_exist:     
            model = torch.load(f'./ConstantLearnableFilters/models/{modelname}', map_location=args.DEVICE)
            model.UpdateArgs(args)
        break
    break


In [5]:
pgmin = 1e-7
def g_tilde(theta_):
    # scaled conductances
    g_initial = theta_.abs()
    g_min = g_initial.min(dim=0, keepdim=True)[0]
    scaler = pgmin / g_min
    return g_initial * scaler

Rmin = torch.tensor(1e5)    # 100kOhm
Rmax = torch.tensor(1e7)    # 10MOhm
Cmin = torch.tensor(1e-7)   # 100nF
Cmax = torch.tensor(1e-4)   # 100uF

def C(C_):
    C_true = torch.sigmoid(C_) * (Cmax - Cmin) + Cmin
    return C_true
        
def R(R_):  
    R_true = torch.sigmoid(R_) * (Rmax - Rmin) + Rmin
    return R_true

In [None]:
def MAC_power(self, x, y):
    # dimensions of x: (num_batch, B, D, T)
    # the features are in last dimension
    x = x.permute(0, 1, 3, 2)
    x_extend = torch.cat([x,
                          torch.ones([x.shape[0], x.shape[1], x.shape[2], 1]).to(self.device),
                          torch.zeros([x.shape[0], x.shape[1], x.shape[2], 1]).to(self.device)], dim=3)
    x_neg = self.INV(x_extend)
    x_neg[:, :, :, -1] = 0.
    F = x_extend.shape[0]
    V = x_extend.shape[1]
    E = x_extend.shape[2]
    M = x_extend.shape[3]
    N = y.shape[3]
    positive = self.theta_noisy.clone().detach().to(self.device)
    positive[positive >= 0] = 1.
    positive[positive < 0] = 0.
    negative = 1. - positive
    Power = torch.tensor(0.).to(self.device)
    for f in range(F):
        for v in range(V):
            for m in range(M):
                for n in range(N):
                    Power += self.g_tilde[m, n] * ((x_extend[f, v, :, m]*positive[f, v, m, n] +
                                                   x_neg[f, v, :, m]*negative[f, v, m, n])-y[f, v, :, n]).pow(2.).sum()
    Power = Power / E / V / F
    return Power

In [None]:
valid_loader, datainfo = GetDataLoader(args, 'valid', path='./dataset/')
test_loader , datainfo = GetDataLoader(args, 'test',  path='./dataset/')  
for x, y in test_loader:
    print(x.shape, y.shape)

In [7]:
# Function to pad shorter lists with None or empty strings
def pad_to_length(lists, target_length):
    return [lst + [None] * (target_length - len(lst)) for lst in lists]

In [49]:
import pandas as pd
import PrintedLearnableFilter as pNN
# 5uw * count_2nd_order
# sum over every g_tild (the valid ones)
# acts
# invs 

# sum over them
def document_model(model, dataset):
    # Initialize powers
    sum_inv_power = 0
    sum_act_power = 0
    sum_crossbar_power = 0
    sum_lF_power = 0
    
    LF_POWER = 5e-6
    
    # Initialize counters
    count_g = 0
    count_act = 0
    count_inv = 0
    count_R = 0
    count_C = 0
    count_secorder_filter = 0
    count_single_filter = 0

    # Initialize lists to store values for the Excel file
    theta_values = []
    g_tilde_values = []
    mac_values = []
    inv_power_values = []
    act_power_values = []
    r_values = []
    c_values = []

    # Simulate iterating over the model (assuming 'model' and 'g_tilde' are defined elsewhere)
    for name, module in model.model.named_modules():
        if name == '0_pLayer' or name == '1_pLayer' or name == '2_pLayer':
            for name_pl, module_pl in module.named_modules():
                if name_pl == 'model.0_MAC' or name_pl == 'model.2_MAC':
                    if hasattr(module_pl, 'theta_'):
                        n_neuron = module_pl.theta_.shape[1]
                        n_inp = module_pl.theta_.shape[0]
                        theta_ = module_pl.theta_
                        
                        nonzero = module_pl.theta.clone().detach().abs()
                        nonzero[nonzero > 0] = 1.
                        num_theta = nonzero.sum()
                        count_g += num_theta.item()
                        g_tilde_ = g_tilde(theta_)
                        
                        
                        g_tilde_[torch.isnan(g_tilde_)] = 0
                        g_tilde_[torch.isinf(g_tilde_)] = 0
                        sum_crossbar_power += g_tilde_.sum().item()

                        # Store theta and g_tilde values using detach to avoid the RuntimeError
                        theta_values.append(theta_.detach().numpy())
                        g_tilde_values.append(g_tilde_.detach().numpy())

                        for col in range(n_neuron):
                            mac_values.append(g_tilde_[:, col].detach().numpy())

                        if hasattr(module_pl, 'INV'):
                            inv_power_values.append(module_pl.INV.power.detach().item())
                            nonzero = module_pl.theta.clone().detach().abs()[:-2, :]
                            nonzero[nonzero > 0] = 1.
                            count_inv += nonzero.max(0)[0].sum().item()
                            sum_inv_power += module_pl.INV.power.detach().item()

                        if hasattr(module_pl, 'ACT'):
                            act_power_values.append(module_pl.ACT.power.detach().item())
                            positive = module_pl.theta.clone().detach()[:-2, :]
                            positive[positive >= 0] = 1.
                            positive[positive < 0] = 0.
                            negative = 1. - positive
                            count_act += negative.max(1)[0].sum().item()
                            sum_act_power += module_pl.ACT.power.detach().item()

                if name_pl == 'model.1_LF':
                    # Access the FilterGroups within '1_LF'
                    for group_name, filter_group in module_pl.FilterGroups.named_modules():
                        if hasattr(filter_group, 'FilterGroup'):
                            
                            for filter_name, sec_learnable_filter in filter_group.FilterGroup.named_modules():
                                if isinstance(sec_learnable_filter, pNN.SecOLearnableFilter):
                                    # count_secorder_filter += 1
                                    # print(count_secorder_filter)
                                    if hasattr(sec_learnable_filter, 'LearnableFilters'):
                                        num_lf = 0
                                        for name, learnable_filter in sec_learnable_filter.LearnableFilters.named_modules():
                                            if isinstance(learnable_filter, pNN.LearnableFilter):
                                                if hasattr(learnable_filter, 'R_') and hasattr(learnable_filter, 'C_'):
                                                    r = R(learnable_filter.R_).detach().numpy()
                                                    r_values.append(r)
                                                    nonzero = R(learnable_filter.R_).detach().abs()
                                                    nonzero[nonzero > 0] = 1.
                                                    num_r = nonzero.sum().item()
                                                    count_R += num_r

                                                    c_values.append(C(learnable_filter.C_).detach().numpy())
                                                    nonzero = C(learnable_filter.C_).detach().abs()
                                                    nonzero[nonzero > 0] = 1.
                                                    num_c = nonzero.sum().item()
                                                    count_C += num_c
                                                    
                                                    if num_r == 1 and num_c == 1:
                                                        count_single_filter += 1
                                                        num_lf += 1
                                                    if num_lf == 2:
                                                        count_secorder_filter += 1
                                            # print(num_lf)
                                            
    sum_lF_power = LF_POWER * count_secorder_filter
    
    sum_power = sum_inv_power + sum_act_power + sum_crossbar_power + sum_lF_power
                                                        

    # Get the max length of the lists
    max_length = max(len(theta_values), len(g_tilde_values), len(mac_values), len(inv_power_values), len(act_power_values), len(r_values), len(c_values))

    # Pad all lists to the same length
    theta_values, g_tilde_values, mac_values, inv_power_values, act_power_values, r_values, c_values, count_g_list,  count_inv_list, count_act_list, count_R_list, count_C_list, count_sec_filter_list, count_sin_filter_list, sum_inv_power_ls, sum_act_power_ls, sum_crossbar_power_ls, sum_lF_power_ls, sum_power_ls = pad_to_length([theta_values, g_tilde_values, mac_values, inv_power_values, act_power_values, r_values, c_values,
        [count_g], [count_inv], [count_act], [count_R], [count_C], [count_secorder_filter], [count_single_filter],
        [sum_inv_power], [sum_act_power], [sum_crossbar_power], [sum_lF_power], [sum_power]], max_length)

    # # Broadcast counters to match the length of the lists
    # count_g_list = [count_g] * max_length
    # count_inv_list = [count_inv] * max_length
    # count_act_list = [count_act] * max_length
    # count_R_list = [count_R] * max_length
    # count_C_list = [count_C] * max_length

    # Create a DataFrame to store the extracted values
    df = pd.DataFrame({
        'g_tilde Layer wise': [str(v) for v in g_tilde_values],
        'g_tilde MAC wise': [str(v) for v in mac_values],
        'INV Power': inv_power_values,
        'ACT Power': act_power_values,
        'R Values': [str(v) for v in r_values],
        'C Values': [str(v) for v in c_values],
        'Count g': count_g_list,
        'Count INV': count_inv_list,
        'Count ACT': count_act_list,
        'Count R': count_R_list,
        'Count C': count_C_list,
        'Count Sec Filter': count_sec_filter_list,
        'Count Single Filter': count_sin_filter_list,
        'Sum ACT Power': sum_act_power_ls,
        'Sum INV Power': sum_inv_power_ls,
        'Sum Crossbar Power': sum_crossbar_power_ls,
        'Sum LF Power': sum_lF_power_ls,
        'Sum Power': sum_power_ls
    })

    # Save to Excel
    file_path = f'./{dataset}_model_parameters_fixed.xlsx'
    df.to_excel(file_path, index=False)

    # Display DataFrame to the user
    # import ace_tools as tools; tools.display_dataframe_to_user(name="Model Parameters", dataframe=df)


In [None]:
document_model(model)

In [None]:
count_single_filter

In [None]:
'Theta Values', [str(v) for v in theta_values]

In [None]:
args.augment = True
all_results = []
results = torch.zeros([15,10,6])
for ds in range(15):
    args.DATASET = ds
    valid_loader, datainfo = GetDataLoader(args, 'valid', path='../dataset/')
    test_loader , datainfo = GetDataLoader(args, 'test',  path='../dataset/')  
    print(datainfo)
    for seed in range(10):
        args.SEED = seed


        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        
        model_exist = os.path.isfile(f'./models/{modelname}')

        print(model_exist)
        
        
        if model_exist:  
                          
            model = torch.load(f'./models/{modelname}', map_location=args.DEVICE)
            model.UpdateArgs(args)

            SetSeed(args.SEED)

            evaluator = Evaluator(args).to(args.DEVICE)
            # baseline_evaluator = baselineEvaluator(baseline_args).to(baseline_args.DEVICE)

            for x,y in valid_loader:
                X_valid, y_valid = x.to(args.DEVICE), y.to(args.DEVICE)
            for x,y in test_loader:
                X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)

            acc_valid = evaluator(model, X_valid, y_valid)
            acc_test   = evaluator(model, X_test,  y_test)

            results[ds,seed,0] = acc_valid
            results[ds,seed,1] = acc_test
            
            inference_time = 0
            
            for x,y in test_loader:
                X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)
                start_time = time.time()
                model(X_test[:1, :, :])
                end_time = time.time()
                inference_time = end_time - start_time
                break
            results[ds, seed, 2] = inference_time
            
            # results[ds,seed,3] = area_test
            # results[ds,seed,4] = power_test
            
        else:
            results[ds,seed,:] = float('nan')
            
        temp_result = [datainfo['dataname'], seed, results[ds, seed, 0].item(), results[ds, seed, 1].item(), results[ds, seed, 2].item()]
        all_results.append(temp_result)

columns = ['dataset', 'seed', 'acc_valid', 'acc_test', 'average_time']
all_results.sort(key=lambda x: x[0])            
df = pd.DataFrame(all_results, columns=columns)
# Save the DataFrame to an Excel file
if not os.path.exists(f"./evaluation2_date/"):
    os.makedirs(f"./evaluation2_date/")
excel_filename = f"./evaluation2_date/augment_{args.augment}_proposed_evaluation_results_analysis.xlsx"
df.to_excel(excel_filename, index=False)
print(f"Results have been saved to {excel_filename}")

In [37]:
N_selected_seeds = 3
re_temp = torch.nan_to_num(results, nan=-10000.)
values, indices = torch.topk(re_temp[:,:,1], k=N_selected_seeds, dim=1)
mean_selected = torch.mean(torch.asarray(values), dim=1)
var_selected = torch.std(torch.asarray(values), dim=1)
selected_results = torch.cat([mean_selected.unsqueeze(-1), var_selected.unsqueeze(-1)], dim=1)

np.savetxt(f"./evaluation1/var_test_top_{N_selected_seeds}_var_LPF_acc.txt", selected_results.numpy(), delimiter="\t", fmt='%.5f')

In [38]:
N_selected_seeds = 3
re_temp = torch.nan_to_num(results, nan=-10000.)
values, indices = torch.topk(re_temp[:,:,3], k=N_selected_seeds, dim=1)
mean_selected = torch.mean(torch.asarray(values), dim=1)
var_selected = torch.std(torch.asarray(values), dim=1)
selected_results = torch.cat([mean_selected.unsqueeze(-1), var_selected.unsqueeze(-1)], dim=1)

np.savetxt(f"./evaluation1/var_test_top_{N_selected_seeds}_aug_LPF_acc.txt", selected_results.numpy(), delimiter="\t", fmt='%.5f')

In [None]:
args.augment = True
all_results = []
results = torch.zeros([15,10,6])
for ds in range(9):
    args.DATASET = ds
    valid_loader, datainfo = GetDataLoader(args, 'valid', path='../dataset/')
    test_loader , datainfo = GetDataLoader(args, 'test',  path='../dataset/')  
    print(datainfo)
    for seed in range(10):
        args.SEED = seed


        modelname = f"pLF_data_{ds:02d}_{datainfo['dataname']}_seed_{seed:02d}.model"
        
        model_exist = os.path.isfile(f'./models/{modelname}')
        # aug_model_exist = os.path.isfile(f'../AugLearnableFilters/models/{modelname}')
        # baseline_model_exist = os.path.isfile(f'../LastBaseline/LearnableFilters/models/{modelname}')
        
        # print(model_exist and aug_model_exist and baseline_model_exist)
        print(model_exist)
        
        
        if model_exist:  
                          
            model = torch.load(f'./models/{modelname}', map_location=args.DEVICE)
            # aug_model = torch.load(f'../AugLearnableFilters/models/{modelname}', map_location=args.DEVICE)
            # baseline_model = torch.load(f'../LastBaseline/LearnableFilters/models/{modelname}', map_location=baseline_args.DEVICE)
            
            model.UpdateArgs(args)
            # aug_model.UpdateArgs(args)
            # baseline_model.UpdateArgs(baseline_args)
            
            SetSeed(args.SEED)

            evaluator = Evaluator(args).to(args.DEVICE)
            # baseline_evaluator = baselineEvaluator(baseline_args).to(baseline_args.DEVICE)

            for x,y in valid_loader:
                X_valid, y_valid = x.to(args.DEVICE), y.to(args.DEVICE)
            for x,y in test_loader:
                X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)

            acc_valid = evaluator(model, X_valid, y_valid)
            acc_test   = evaluator(model, X_test,  y_test)
            
            # aug_acc_valid = evaluator(aug_model, X_valid, y_valid)
            # aug_acc_test   = evaluator(aug_model, X_test,  y_test)
            
            # baseline_acc_valid = baseline_evaluator(baseline_model, X_valid, y_valid)
            # baseline_acc_test   = baseline_evaluator(baseline_model, X_test,  y_test)

            results[ds,seed,0] = acc_valid
            results[ds,seed,1] = acc_test
            
            # results[ds,seed,2] = aug_acc_valid
            # results[ds,seed,3] = aug_acc_test
            
            # results[ds,seed,4] = baseline_acc_valid
            # results[ds,seed,5] = baseline_acc_test
        else:
            results[ds,seed,:] = float('nan')
            
        inference_time = 0
            
        for x,y in test_loader:
            X_test, y_test = x.to(args.DEVICE), y.to(args.DEVICE)
            start_time = time.time()
            model(X_test[:1, :, :])
            end_time = time.time()
            inference_time = end_time - start_time
            break
        results[ds, seed, 2] = inference_time
            
        temp_result = [datainfo['dataname'], seed, results[ds, seed, 0].item(), results[ds, seed, 1].item(), results[ds, seed, 2].item()]
        all_results.append(temp_result)

columns = ['dataset', 'seed', 'acc_valid', 'acc_test', 'average_time']
all_results.sort(key=lambda x: x[0])            
df = pd.DataFrame(all_results, columns=columns)
# Save the DataFrame to an Excel file
if not os.path.exists(f"./evaluation/"):
    os.makedirs(f"./evaluation/")
excel_filename = f"./evaluation/ELMAN_evaluation_results_analysis.xlsx"
df.to_excel(excel_filename, index=False)
print(f"Results have been saved to {excel_filename}")