In [None]:
from analogainas.search_spaces.config_space import ConfigSpace
from analogainas.search_spaces.autoencoder.autoencoder_config_space import AutoEncoderConfigSpace
from analogainas.search_spaces.autoencoder.autoencoder_architecture import AutoEncoder
from analogainas.search_spaces.dataloaders.autoencoder_structured_dataset import AutoEncoderStructuredDataset
from analogainas.evaluators.evaluation_metrics import negative_mse_metric
from aihwkit.simulator.configs import InferenceRPUConfig
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np

In [None]:
from analogainas.analog_helpers.analog_helpers import create_rpu_config
from aihwkit.simulator.configs import InferenceRPUConfig
from aihwkit.simulator.configs.utils import WeightClipType
from aihwkit.simulator.configs.utils import BoundManagementType
from aihwkit.simulator.presets.utils import PresetIOParameters
from aihwkit.inference.noise.pcm import PCMLikeNoiseModel
from aihwkit.inference.compensation.drift import GlobalDriftCompensation
from aihwkit.nn.conversion import convert_to_analog_mapped
from aihwkit.nn import AnalogSequential
from aihwkit.optim import AnalogSGD
import aihwkit.inference.noise.pcm as pcm

import aihwkit

print(aihwkit.__version__)

In [None]:
noise_model = pcm.PCMLikeNoiseModel()
optimal_rpu_config_dict = {'g_max': 256, 'tile_size': 64, 'dac_resolution': 128, 'adc_resolution': 256}
rpu_config = create_rpu_config()
optimal_rpu_config = create_rpu_config(g_max=optimal_rpu_config_dict['g_max'], tile_size=optimal_rpu_config_dict['tile_size'], dac_res=optimal_rpu_config_dict['dac_resolution'], adc_res=optimal_rpu_config_dict['adc_resolution'])

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
CS = AutoEncoderConfigSpace()

In [None]:
CS.get_hyperparameters()

In [None]:
CS.compute_cs_size()

In [None]:
configs = CS.sample_arch_uniformly(5)

optimal_config={'embedding_dim': 256, 'encoder_convblock1_depth': 1, 'encoder_convblock1_kernel_size': 5, 'encoder_convblock1_filters': 64, 'encoder_convblock1_stride': 1, 'encoder_convblock2_depth': 1, 'encoder_convblock2_kernel_size': 3, 'encoder_convblock2_filters': 32, 'encoder_convblock2_stride': 1, 'encoder_convblock3_depth': 1, 'encoder_convblock3_kernel_size': 7, 'encoder_convblock3_filters': 16, 'encoder_convblock3_stride': 2}
suboptimal_config= {'embedding_dim': 64, 'encoder_convblock1_depth': 3, 'encoder_convblock1_kernel_size': 5, 'encoder_convblock1_filters': 16, 'encoder_convblock1_stride': 1, 'encoder_convblock2_depth': 3, 'encoder_convblock2_kernel_size': 3, 'encoder_convblock2_filters': 8, 'encoder_convblock2_stride': 2, 'encoder_convblock3_depth': 3, 'encoder_convblock3_kernel_size': 7, 'encoder_convblock3_filters': 128, 'encoder_convblock3_stride': 1}

new_optimal_config ={'embedding_dim': 2048,
 'encoder_convblock1_depth': 1,
 'encoder_convblock1_kernel_size': 3,
 'encoder_convblock1_filters': 256,
 'encoder_convblock1_stride': 1,
 'encoder_convblock2_depth': 1,
 'encoder_convblock2_kernel_size': 7,
 'encoder_convblock2_filters': 256,
 'encoder_convblock2_stride': 1,
 'encoder_convblock3_depth': 3,
 'encoder_convblock3_kernel_size': 3,
 'encoder_convblock3_filters': 32,
 'encoder_convblock3_stride': 2,
                    }

In [None]:
sample_autoencoder = AutoEncoder(optimal_config, input_channels=3, input_size=(32,32))

In [None]:
configs[0]

In [None]:
def show_performance(_autoencoder,_dataloader, normalize=True):
    images, labels = next(iter(_dataloader))
    _autoencoder = _autoencoder.to(torch.device('cpu'))
    with torch.no_grad():
        reconstructions = _autoencoder(images)
    
    def show_image(img_tensor, title="", normalize=True):
        img = img_tensor.permute(1, 2, 0).cpu().numpy()
        if normalize:
            mean = np.array([0.4914, 0.4822, 0.4465])
            std = np.array([0.2023, 0.1994, 0.2010])
            img = (img * std) + mean
        img = np.clip(img, 0, 1)
        
        plt.imshow(img)
        plt.title(title)
        plt.axis('off')
    
    fig, axes = plt.subplots(nrows=2, ncols=8, figsize=(16, 4))
    
    for i in range(8):
        plt.sca(axes[0, i])
        show_image(images[i], title="Original")
        plt.sca(axes[1, i])
        show_image(reconstructions[i], title="Reconstructed", normalize=normalize)
    
    sample_autoencoder.to(torch.device('cpu'))
    negative_mse = negative_mse_metric(_dataloader, _autoencoder)
    print("Negative MSE: ", sum(negative_mse)/len(negative_mse))
    plt.tight_layout()
    plt.show()


In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), 
                         (0.2023, 0.1994, 0.2010)) 
])

train_cifar_dataset = AutoEncoderStructuredDataset(
    torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)
)

dataloader = DataLoader(train_cifar_dataset, batch_size=8, shuffle=True)

test_cifar_dataset = AutoEncoderStructuredDataset(
    torchvision.datasets.CIFAR10(root='./data', train=False, transform=transform, download=True)
)

test_dataloader = DataLoader(test_cifar_dataset, batch_size=32, shuffle=True)

In [None]:
criterion = nn.MSELoss()
optimizer = optim.Adam(sample_autoencoder.parameters(), lr=1e-3)
optimizer = optim.SGD(sample_autoencoder.parameters(), lr=1e-3)

sample_autoencoder = sample_autoencoder.to(device)
epochs = 15
sample_autoencoder.train()

In [None]:
for epoch in range(epochs):
    for batch_idx, (images, _) in enumerate(dataloader):
        images = images.to(device)
        recon = sample_autoencoder(images)
        loss = criterion(recon, images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_idx % 50 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Step [{batch_idx}], Loss: {loss.item()}")

In [None]:
show_performance(sample_autoencoder, test_dataloader)

In [None]:
device = torch.device('cpu')
analog_sample_autoencoder = sample_autoencoder.to(torch.device('cpu'))
analog_sample_autoencoder.eval()
analog_sample_autoencoder = convert_to_analog_mapped(sample_autoencoder, rpu_config)

#analog_sample_autoencoder.drift_analog_weights(24 * 60 * 60 * 1)

In [None]:
print("One Day Performance")
show_performance(analog_sample_autoencoder, dataloader, normalize=True)

In [None]:
analog_sample_autoencoder.drift_analog_weights(24 * 60 * 60 * 30)

In [None]:
print("One Month Performance")
show_performance(analog_sample_autoencoder, test_dataloader)

In [None]:
device = torch.device('cpu')
analog_sample_autoencoder = sample_autoencoder.to(torch.device('cpu'))
analog_sample_autoencoder.eval()
analog_sample_autoencoder = convert_to_analog_mapped(sample_autoencoder, optimal_rpu_config)

analog_sample_autoencoder.drift_analog_weights(24 * 60 * 60 * 1)

In [None]:
print("One Day Performance (Optimal RPU)")
show_performance(analog_sample_autoencoder, test_dataloader)

In [None]:
analog_sample_autoencoder.drift_analog_weights(24 * 60 * 60 * 30)

In [None]:
print("One Month Performance (Optimal RPU)")
show_performance(analog_sample_autoencoder, test_dataloader)