# Map ports 

In [None]:
import sys
# Make additional packages importable: every line of PATHS.txt is appended
# to the interpreter's module search path.
# NOTE(review): the file is located relative to the current working
# directory, so this only works when run from the expected folder.
with open("./../../PATHS.txt") as file:
  paths = file.read().splitlines()
sys.path.extend(paths)

In [None]:
import os
import torch
import numpy as np
import scipy.sparse as sp

from dd_nm_rom import backend as bkd
from dd_nm_rom.ops import map_nested_dict

In [None]:
# Root directory that is walked for checkpoints to convert.
# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
path_to_models = "/Users/zanardi1/Workspace/Codes/DD-NM-ROM/run/data/trained_nets/nx_480_ny_24_mu_0.1_2x_by_2y"

In [None]:
class CFG(object):
  '''
  Attribute-style view of a configuration mapping.

  Every key/value pair of the mapping given to the constructor is set as
  an instance attribute, so `cfg["ref"]` becomes `CFG(cfg).ref`.

  inputs:
  cfg: mapping whose keys are used as attribute names
  '''

  def __init__(self, cfg) -> None:
    for name, value in cfg.items():
      setattr(self, name, value)

In [None]:
# cfg = {
#   'encoder': self.best_encoder,
#   'decoder': self.best_decoder,
#   'mask': self.mask,
#   'scale': self.scale,
#   'ref': self.ref,
#   'input_dim': self.input_dim,
#   'latent_dim': self.latent_dim,
#   'output_dim': self.output_dim,
#   'encoder_hidden': self.encoder_hidden,
#   'decoder_hidden': self.decoder_hidden,
#   'act_type': self.act_type,
#   'train_time': train_time,
#   'epoch': epoch,
#   'early_stop_counter': early_stop_counter,
#   'valid_loss_hist': valid_loss_hist,
#   'train_loss_hist': train_loss_hist,
#   'best_test_loss': best_test_loss,
#   'best_train_loss': best_train_loss,
#   'best_loss_epoch': best_loss_epoch
# }

In [None]:
def convert(cfg):
  '''
  Build NumPy-based encoder and decoder configurations from a checkpoint.

  inputs:
  cfg: object exposing the attributes `ref`, `scale`, `input_dim`,
       `latent_dim`, `act_type`, `mask`, `encoder`, `decoder`,
       `encoder_hidden` and `decoder_hidden`

  returns:
  dict with the keys "encoder" and "decoder". Each entry holds "weights",
  "mask_shape", "mask_indices" and "hidden_dim" for that sub-network,
  followed by the common entries "ref", "scale", "input_dim",
  "latent_dim" and "activation". The common values are the same objects
  in both entries (they are not copied).
  '''
  shared = {
    "ref": bkd.to_numpy(cfg.ref).reshape(-1),
    "scale": bkd.to_numpy(cfg.scale).reshape(-1),
    "input_dim": int(cfg.input_dim),
    "latent_dim": int(cfg.latent_dim),
    "activation": str(cfg.act_type).lower()
  }
  config = {}
  for name in ("encoder", "decoder"):
    hidden = getattr(cfg, f"{name}_hidden")
    layer = getattr(cfg, name)
    # The decoder uses the mask as stored; the encoder uses its transpose.
    mask = cfg.mask if name == "decoder" else cfg.mask.T
    shape = tuple(mask.shape)
    entry = {
      "weights": get_weights_np(layer, shape),
      "mask_shape": shape,
      "mask_indices": np.vstack(mask.nonzero()),
      "hidden_dim": hidden
    }
    entry.update(shared)
    config[name] = entry
  return config

def get_weights_np(weights, mask_shape):
  '''
  Collect the layer weights of one sub-network as NumPy/SciPy objects.

  inputs:
  weights:    state-dict-like mapping with keys of the form `net.<k>.*`;
              it is first passed through `map_nested_dict` together with
              `bkd.to_numpy` (presumably converting every value to a
              NumPy array - confirm against `dd_nm_rom.ops`)
  mask_shape: shape given to the CSR matrix built for a sparse layer

  returns:
  dict with "W1" and "W2" (taken from `net.0` and `net.2`) and "b1"
  (taken from `net.0.bias`). A layer that provides `net.<k>.indices` is
  returned as a `csr_matrix` assembled from its `weights` and `indices`;
  otherwise its `net.<k>.weight` entry is returned as is.
  '''
  params = map_nested_dict(weights, bkd.to_numpy)
  converted = {}
  for pos, layer_id in enumerate((0, 2), start=1):
    if f'net.{layer_id}.indices' in params:
      # Sparse layer: the first two rows of `indices` are used as the
      # (row, column) coordinates of the stored values.
      idx = params[f'net.{layer_id}.indices']
      coords = (idx[0], idx[1])
      values = params[f'net.{layer_id}.weights']
      matrix = sp.csr_matrix((values, coords), shape=mask_shape)
    else:
      matrix = params[f'net.{layer_id}.weight']
    converted[f"W{pos}"] = matrix
  converted["b1"] = params['net.0.bias']
  return converted

In [None]:
# Convert every checkpoint found under `path_to_models`: load it on the CPU,
# turn it into the NumPy-based configuration returned by `convert`, and save
# the result next to the original as "<name>_numpy.p".
for subdir, dirs, files in os.walk(path_to_models):
  for file in files:
    # Skip files produced by a previous run. Only the file name is tested,
    # so a directory whose name contains "numpy" does not hide its contents.
    if ("numpy" in file):
      continue
    filename = os.path.join(subdir, file)
    cfg = torch.load(filename, map_location="cpu")
    cfg_np = convert(CFG(cfg))
    # Drop the real extension, whatever its length, instead of assuming a
    # two-character ".p" suffix.
    root, _ext = os.path.splitext(filename)
    torch.save(cfg_np, root + "_numpy.p")

In [17]:

import torch
from torch import nn
from sparselinear import SparseLinear
import scipy as sp
import numpy as np

In [18]:

class Linear(nn.Module):
  '''
  Pass-through module: `forward` returns its input unchanged.

  It holds no parameters and can be used wherever a module is required
  but no transformation should be applied.
  '''

  def __init__(self):
    super(Linear, self).__init__()

  def forward(self, x):
    # Return the very same object that was passed in.
    return x

class Identity(torch.nn.Module):
  '''
  Network made of sparse layers whose connectivity is the identity pattern.

  A single bias-free SparseLinear(dim, dim) layer is created with an
  identity connectivity pattern and all weights set to one. The same layer
  instance is placed at positions 0 and 2 of `net` (its weights are
  therefore shared), with a pass-through `Linear` module in between.

  inputs:
  dim:   number of inputs and outputs of the sparse layer
  scale: stored as `self.scale`; not used by `forward`
  ref:   stored as `self.ref`; not used by `forward`
  '''
  def __init__(self, dim, scale, ref):
    super().__init__()
    self.dim, self.ref, self.scale = dim, ref, scale
    # Identity pattern in COO format: its row/column indices define the
    # connectivity of the sparse layer.
    eye = sp.sparse.identity(self.dim).tocoo()
    connectivity = torch.LongTensor(np.vstack((eye.row, eye.col)))
    shared = SparseLinear(dim, dim, connectivity=connectivity, bias=False)
    self.net = torch.nn.Sequential(shared, Linear(), shared)
    self.net.apply(self.init_weights)

  def init_weights(self, m):
    # Only SparseLinear modules are touched: weights -> 1, bias (if any) -> 0.
    if not isinstance(m, SparseLinear):
      return
    torch.nn.init.ones_(m.weights)
    if m.bias is not None:
      torch.nn.init.zeros_(m.bias)

  def forward(self, x):
    out = self.net(x)
    return out

In [19]:
class Decoder(torch.nn.Module):
  '''
  Shallow network built from two bias-free sparse layers and an activation.

  `net` is the sequence SparseLinear -> act_fun() -> SparseLinear. Both
  sparse layers are created with `hidden_dim` inputs and `output_dim`
  outputs and take their connectivity from the row/column indices of
  `mask`; all of their weights are initialised to one.
  NOTE(review): `latent_dim` is only stored and does not size any layer,
  and the second layer receives the `output_dim`-sized output of the
  first - confirm that this is intended.

  inputs:
  latent_dim: stored as `self.latent_dim`
  hidden_dim: number of inputs of both sparse layers
  output_dim: number of outputs of both sparse layers
  mask:       sparsity mask exposing `row` and `col` (coo format)
  scale:      value the shifted input is divided by in `forward`
  ref:        value subtracted from the input in `forward`
  act_fun:    [optional] callable returning the activation module placed
              between the two layers. Default is torch.nn.Sigmoid
  '''
  def __init__(self, latent_dim, hidden_dim, output_dim, mask,
         scale, ref,
         act_fun=torch.nn.Sigmoid):
    super().__init__()
    self.latent_dim = latent_dim
    self.hidden_dim = hidden_dim
    self.output_dim = output_dim
    self.scale = scale
    self.ref = ref

    def sparse_layer():
      # Each layer gets its own connectivity tensor built from the mask.
      connectivity = torch.LongTensor(np.vstack((mask.row, mask.col)))
      return SparseLinear(hidden_dim, output_dim, connectivity=connectivity, bias=False)

    self.net = torch.nn.Sequential(sparse_layer(), act_fun(), sparse_layer())
    self.net.apply(self.init_weights)

  def init_weights(self, m):
    # Only SparseLinear modules are touched: weights -> 1, bias (if any) -> 0.
    if not isinstance(m, SparseLinear):
      return
    torch.nn.init.ones_(m.weights)
    if m.bias is not None:
      torch.nn.init.zeros_(m.bias)

  def forward(self, w):
    # Shift by `ref` and divide by `scale` before applying the network.
    normalized = (w-self.ref)/self.scale
    return self.net(normalized)

In [22]:
# Small 2-dimensional sanity check of the Identity network.
latent_dim = 2
hidden_dim = 2  # not used by the statements in this cell
output_dim = 2  # not used by the statements in this cell
mask = sp.sparse.identity(2).tocoo()  # not used by the statements in this cell
ref = torch.rand(2)
scale = torch.rand(2)

# Identity takes (dim, scale, ref); latent_dim is passed as its dimension.
iden = Identity(latent_dim, scale, ref)

# Display the entries of the state dict (names and values).
iden.state_dict()

OrderedDict([('net.0.weights', tensor([1., 1.])),
             ('net.0.indices',
              tensor([[0, 1],
                      [0, 1]])),
             ('net.2.weights', tensor([1., 1.])),
             ('net.2.indices',
              tensor([[0, 1],
                      [0, 1]]))])