# ACG Seminar Assignment -- Neural Volume Compression  

Student: Anže Kristan  
class: Advanced Computer Graphics  
semester: Spring 2023/24  
school: Faculty of Computer and Information Science, University of Ljubljana  

#### imports and setup:

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
import pandas as pd
import torch
import torch.nn as nn
from torchsummary import summary
import numpy as np
import seaborn as sns
import pandas as pd
from scipy.ndimage import gaussian_filter
from os import path as ospath

In [3]:
folder_path = None #change path
files = {
    "tooth": {
        "file_name": "tooth_103x94x161_1x1x1_uint8.raw",
        "shape": [103, 94, 161],
        "dtype": np.uint8
    }
}

# from https://www.geeksforgeeks.org/reading-binary-files-in-python/

# Open the file in binary mode
def readRawFile(file_path, datatype, shape, doReshape=True, divideBy=2**8, gaussianFilter=False, cutoffLow=(0,0), cutoffHigh=(255,255)):
  array = np.fromfile(file_path, dtype=datatype).astype(np.float32) # read from file based on datatype
  array[array<cutoffLow[0]] = cutoffLow[1] # set values below cutoff[0] to the value of cutoff[1]
  array[array>cutoffHigh[0]] = cutoffHigh[1]
  array = np.divide(array, divideBy) # divide input by specified (to get input into range 0-1)
  if (doReshape):
    array = np.reshape(array, list(reversed(shape))).transpose() # need to make sure reshape takes the same order as the file all x->all y->all z
    if (gaussianFilter):
      array = gaussian_filter(array, sigma=0.1, radius=3)
  return array

#### Dataset class:

Dataset class to help with machine learning; sample it like an array with an index (int) and returns a sample corresponding to that index

In [4]:
class Dataset:
  def __init__(self,samples,stride=[4,4,4],kernel_size=[8,8,8], returnSurroundingIndices=True):
    self.stride = stride # [x,y,z]
    self.kernel_size = kernel_size # single value treated as length of the side of a cube
    self.returnSurroundingIndices = returnSurroundingIndices
    self.orig_shape = samples.shape
    self.samples = self.pad_to_divisible_and_kernel(samples)

    self.unfold_dims = np.array([ # num samples in each dimension
        int(np.ceil((self.orig_shape[0] - self.kernel_size[0])/self.stride[0])+1),
        int(np.ceil((self.orig_shape[1] - self.kernel_size[1])/self.stride[1])+1),
        int(np.ceil((self.orig_shape[2] - self.kernel_size[2])/self.stride[2])+1)
    ])
    self.length =  np.prod(self.unfold_dims) # total num samples

  def pad_to_divisible_and_kernel(self, arr):
    # pad the array to be divisible by kernel size
    to_pad = np.remainder(arr.shape, self.kernel_size)
    to_pad = [self.kernel_size[x] - to_pad[x] if to_pad[x] > 0 else 0 for x in range(3)]
    arr_pd = np.pad(arr, ( (0, to_pad[0]), (0, to_pad[1]), (0, to_pad[2]) ), mode='constant', constant_values='0')

    if self.returnSurroundingIndices:
      # pad an extra kernel_size around array
      extra_pad = [[self.kernel_size[x],self.kernel_size[x]] for x in range(3)]
      arr_pd2 = torch.tensor(np.pad(arr_pd, extra_pad, mode='constant', constant_values=0))
    else:
      arr_pd2 = arr_pd
    return torch.Tensor(arr_pd2)


  def __getitem__(self,index):
    arr_coord = np.multiply(np.unravel_index(index, self.unfold_dims), self.stride)
    if self.returnSurroundingIndices:
      arr_coord = np.add(arr_coord, self.kernel_size) # skip initial kernel_size of padding
    ix = 0
    if self.returnSurroundingIndices:
      surrounding_indices = [-1, 0, 1]
      ret_arr = torch.empty((27, *self.kernel_size))
    else:
      surrounding_indices = [0]
      ret_arr = torch.empty((1, *self.kernel_size))
    for z in surrounding_indices:
      for y in surrounding_indices:
        for x in surrounding_indices:
          tmp_coords = np.add(arr_coord, np.multiply([x, y, z], self.kernel_size))
          # print(ix, tmp_coords, z, y, x)
          ret_arr[ix] = self.get_sub_matrix_from_coords(tmp_coords)
          ix += 1
    return ret_arr

  def get_sub_matrix_from_coords(self, coords):
    batch_start = coords
    # print(batch_start)
    batch_end = np.add(coords, self.kernel_size)
    # print(batch_end)
    ret_arr = self.samples[batch_start[0]:batch_end[0], batch_start[1]:batch_end[1], batch_start[2]:batch_end[2]]
    # print(ret_arr.shape)
    return ret_arr

  def __len__(self):
    return self.length

  def shuffle(self):
    pass

#### Test read write:

Pass model, to see if reading/writing file gives same output as input:

In [5]:
tooth_kernel_size = [8]*3
tooth_stride = [8]*3

In [6]:
tooth_path=ospath.join(folder_path, files["tooth"]["file_name"])
tooth_array=readRawFile(tooth_path, files["tooth"]["dtype"], files["tooth"]["shape"]) #, cutoffLow=(105,0), gaussianFilter=True) # cutoff = (105,0) and gaussianFilter are good denoising parameters for the tooth model
tooth_array.shape

(103, 94, 161)

In [7]:
pd.Series(readRawFile(tooth_path, files["tooth"]["dtype"], files["tooth"]["shape"], doReshape=False, divideBy=1)).describe()

count    1.558802e+06
mean     1.031652e+02
std      4.591717e+01
min      0.000000e+00
25%      7.600000e+01
50%      7.900000e+01
75%      1.220000e+02
max      2.550000e+02
dtype: float64

In [8]:
toothPassDset = Dataset(tooth_array, stride=tooth_stride, kernel_size=tooth_kernel_size, returnSurroundingIndices=False)
print(toothPassDset.length, toothPassDset.samples.shape, toothPassDset.unfold_dims, toothPassDset.orig_shape, tooth_kernel_size, np.remainder((toothPassDset.orig_shape), (tooth_kernel_size)))

3276 torch.Size([104, 96, 168]) [13 12 21] (103, 94, 161) [8, 8, 8] [7 6 1]


In [9]:
pass_array_out = torch.empty(toothPassDset.samples.shape)
for ix in range(toothPassDset.length):
  outIst = np.multiply(np.unravel_index(ix, toothPassDset.unfold_dims), toothPassDset.stride)
  outIed = np.add(outIst, toothPassDset.kernel_size)
  pass_array_out[outIst[0]:outIed[0], outIst[1]:outIed[1], outIst[2]:outIed[2]] = toothPassDset[ix]

In [10]:
print(ix, outIst, outIed)

3275 [ 96  88 160] [104  96 168]


In [11]:
np.sum((pass_array_out == toothPassDset.samples).flatten().numpy())/np.prod(pass_array_out.shape)

1.0

In [12]:
write_array = pass_array_out[0:toothPassDset.orig_shape[0], 0:toothPassDset.orig_shape[1], 0:toothPassDset.orig_shape[2]].numpy()
mse = np.mean((write_array - tooth_array)**2) # from https://stackoverflow.com/a/18047482
print(f"MSE reconstructed - input: {mse}")
write_array = (write_array.transpose().reshape(-1)*255).astype(np.uint8)
write_array[:5]

MSE reconstructed - input: 0.0


array([73, 75, 78, 77, 81], dtype=uint8)

In [13]:
np.sum(readRawFile(tooth_path, files["tooth"]["dtype"], files["tooth"]["shape"], False) == write_array)/np.prod(files["tooth"]["shape"])

9.622774412657926e-06

In [14]:
write_array.tofile(ospath.join(folder_path, "pass_out_test.raw"), format="%<hhf")

In [15]:
read_array = np.fromfile(ospath.join(folder_path, "pass_out_test.raw"), dtype=np.uint8)
read_array.shape

(1558802,)

In [16]:
np.sum(write_array == read_array)

1558802

As it seems like the dataset, reading, writing and such works, we can move on to (neural network) models for compression

#### Fit function:

Define the fit function:

In [17]:
from torch.optim import SGD, Adam
from torch.nn import MSELoss, CrossEntropyLoss, L1Loss
import datetime

def fit(model, dset_train, num_epochs=1, train_batches=1, optimFunc=Adam, lossFunc=MSELoss, folder_path=folder_path, verbose=2):
  # with help from https://pytorch.org/tutorials/beginner/introyt/trainingyt.html

  curr_best_loss = np.Infinity
  training_losses = []
  save_path = ospath.join(folder_path, model.name)

  optimizer = optimFunc(model.parameters(), lr=0.1)
  loss = lossFunc()
  model.train(True)
  start_time = datetime.datetime.now()
  if (verbose > 0):
    print(f"starting training for model '{model.name}' at: {datetime.datetime.now()}")
  for i in range(num_epochs):
    t_losses = []

    for j in range(train_batches):
      x_train = dset_train[j]
      p = model(x_train)
      l = loss(p, x_train)

      optimizer.zero_grad()
      l.backward()
      optimizer.step()

      t_losses.append(l.item())

    curr_loss = np.mean(t_losses)
    training_losses.append(curr_loss)

    if curr_loss < curr_best_loss:
      curr_best_loss = curr_loss
      torch.save(model.state_dict(), save_path)
    if (verbose > 1):
      print(f"epoch {i+1}/{num_epochs} at time {datetime.datetime.now().time()}; current loss: {curr_loss}")
    elif (verbose == 1):
      print(f"epoch {i+1}/{num_epochs: <{10}}", end="\r")

  end_time = datetime.datetime.now()
  if (verbose > 0):
    print(f"end of training {num_epochs} epochs: {datetime.datetime.now()}; elapsed time: {end_time - start_time}")
  best_model = model
  best_model.load_state_dict(torch.load(save_path))

  return best_model, training_losses

#### Linear models:

##### AELinPass  
Try a pass linear model to make sure it works

In [18]:
class autoEncLinPass(nn.Module):

  def __init__(self, name="linae1", kernel_size=[8,8,8]):
    super(autoEncLinPass, self).__init__()
    sample_size = np.prod(kernel_size)
    self.encoder = nn.Sequential(
      nn.Flatten(),
      )
    self.decoder = nn.Sequential(
      nn.Unflatten(1, [1, *kernel_size]),
      )
    self.name = name

  def get_encoding(self, x):
    return self.encoder(x)

  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x):
    c = self.encoder(x)
    y = self.decoder(c)
    return y

In [19]:
def trainAndOutputAutoEncPassModel(folder_path=folder_path, model=autoEncLinPass, kernel_size=[4,4,4], stride_size=[4,4,4], epochs=4, name="model1lin", vol_name="tooth", verbose=True):
  # to train model
  tooth_path=ospath.join(folder_path, files[vol_name]["file_name"])
  tooth_array=readRawFile(tooth_path, files[vol_name]["dtype"], files[vol_name]["shape"])#, cutoffLow=(105,0))#, gaussianFilter=True)
  model1toothDset = Dataset(tooth_array, stride=stride_size, kernel_size=kernel_size, returnSurroundingIndices=False)
  model1 = model(name=name, kernel_size=kernel_size)

  # to output encoded representation
  if (verbose):
    print(f"starting encoding at: {datetime.datetime.now()}")
  toothEncDset = Dataset(tooth_array, stride=kernel_size, kernel_size=kernel_size, returnSurroundingIndices=False)
  model1_encoded_reps = np.array([model1.encoder(toothEncDset[x]).detach().numpy().flatten() for x in range(toothEncDset.length)])
  model1_encoded_reps.reshape(-1).tofile(ospath.join(folder_path, f"{model1.name}.enc"), format="%<f")

  # output reconstructed model
  if (verbose):
      print(f"starting decoding at: {datetime.datetime.now()}")
  model1_reconstructed = np.empty(toothEncDset.samples.shape)
  for ix, enc in enumerate(model1_encoded_reps):
    tmp_dec = model1.decoder(torch.tensor(enc).reshape((1,-1))).detach().numpy()
    ixst = np.multiply(np.unravel_index(ix, toothEncDset.unfold_dims), toothEncDset.stride)
    ixed = np.add(ixst,toothEncDset.kernel_size)
    # print(ix, ixst, ixed)
    model1_reconstructed[ixst[0]:ixed[0], ixst[1]:ixed[1], ixst[2]:ixed[2]] = tmp_dec

  model1_reconstructed = model1_reconstructed[0:toothEncDset.orig_shape[0], 0:toothEncDset.orig_shape[1], 0:toothEncDset.orig_shape[2]]
  # model1_reconstructed = gaussian_filter(model1_reconstructed, sigma=1, radius=np.divide(kernel_size,2).astype(np.int32)) # filter output to get rid of some of the blockiness
  mse = np.mean(np.power((model1_reconstructed - tooth_array),2)) # from https://stackoverflow.com/a/18047482
  print(f"MSE reconstructed - input: {mse}")
  model1_reconstructed = (model1_reconstructed*255).astype(np.uint8).transpose().reshape(-1)
  model1_reconstructed.tofile(ospath.join(folder_path, f"{model1.name}.raw"), format="%<hhf")

  return

In [20]:
model1_kernel = [8]*3
model1_stride = [8]*3
model1_name = "linae1pass_8_8"
model1 = autoEncLinPass

model1_epochs = 20 # 0 epochs just runs the encoder/decoder without training

# trainAndOutputAutoEncPassModel(folder_path=folder_path, model=model1, kernel_size=model1_kernel, stride_size=model1_stride, epochs=model1_epochs, name=model1_name, vol_name="tooth", verbose=True)

##### AELin1

In [21]:
class autoEncLin1(nn.Module):

  def __init__(self, name="linae1", kernel_size=[8,8,8], enc_size=9):
    super(autoEncLin1, self).__init__()
    sample_size = np.prod(kernel_size)
    self.encoder = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=sample_size, out_features=int(sample_size/2)),
      nn.PReLU(), #use parametric relu for activation functions
      nn.Linear(int(sample_size/2), int(sample_size/4)),
      nn.PReLU(),
      nn.Linear(int(sample_size/4), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), enc_size),
      nn.PReLU(),
      )
    self.decoder = nn.Sequential(
      nn.Linear(enc_size, int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/4)),
      nn.PReLU(),
      nn.Linear(int(sample_size/4), int(sample_size/2)),
      nn.PReLU(),
      nn.Linear(int(sample_size/2), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name

  def get_encoding(self, x):
    return self.encoder(x)

  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x):
    c = self.encoder(x)
    y = self.decoder(c)
    return y

In [40]:
def trainAndOutputAutoEnc1Model(folder_path=folder_path, model=autoEncLin1, kernel_size=[4,4,4], stride_size=[4,4,4], enc_size=4, epochs=4, name="model1lin", optim=SGD, loss=MSELoss, vol_name="tooth", verbose=0, load_model_weights=False):
  # to train model
  tooth_path=ospath.join(folder_path, files[vol_name]["file_name"])
  tooth_array=readRawFile(tooth_path, files[vol_name]["dtype"], files[vol_name]["shape"], cutoffLow=(100,0))#, gaussianFilter=True)
  model1toothDset = Dataset(tooth_array, stride=stride_size, kernel_size=kernel_size, returnSurroundingIndices=False)
  model1 = model(name=name, kernel_size=kernel_size, enc_size=enc_size)
  if (load_model_weights):
    save_path = ospath.join(folder_path, model1.name)
    model1.load_state_dict(torch.load(save_path))
  model1_best, model1_losses = fit(model1, model1toothDset, num_epochs=epochs, train_batches=model1toothDset.length-1, optimFunc=optim, lossFunc=loss, folder_path=folder_path, verbose=verbose)

  # to output encoded representation
  if (verbose > 0):
    print(f"starting encoding at: {datetime.datetime.now()}")
  toothEncDset = Dataset(tooth_array, stride=kernel_size, kernel_size=kernel_size, returnSurroundingIndices=False)
  model1_encoded_reps = np.array([model1_best.encoder(toothEncDset[x]).detach().numpy().flatten().astype(np.float16) for x in range(toothEncDset.length)])
  model1_encoded_reps.reshape(-1).tofile(ospath.join(folder_path, f"{model1.name}.enc"), format="%<hf")

  # output reconstructed model
  if (verbose>0):
      print(f"starting decoding at: {datetime.datetime.now()}")
  model1_reconstructed = np.empty(toothEncDset.samples.shape)
  for ix, enc in enumerate(model1_encoded_reps):
    tmp_dec = model1_best.decoder(torch.tensor(enc.astype(np.float32)).reshape((1,-1))).detach().numpy()
    ixst = np.multiply(np.unravel_index(ix, toothEncDset.unfold_dims), toothEncDset.stride)
    ixed = np.add(ixst,toothEncDset.kernel_size)
    # print(ix, ixst, ixed)
    model1_reconstructed[ixst[0]:ixed[0], ixst[1]:ixed[1], ixst[2]:ixed[2]] = tmp_dec
  if (verbose>0):
      print(f"finish decoding at: {datetime.datetime.now()}")

  model1_reconstructed = model1_reconstructed[0:toothEncDset.orig_shape[0], 0:toothEncDset.orig_shape[1], 0:toothEncDset.orig_shape[2]]
  # model1_reconstructed = gaussian_filter(model1_reconstructed, sigma=1, radius=np.divide(kernel_size,2).astype(np.float32)) # filter output to get rid of some of the blockiness
  mse = np.mean((model1_reconstructed - tooth_array)**2) # from https://stackoverflow.com/a/18047482
  mae = np.mean(np.abs((model1_reconstructed - tooth_array)))
  print(f"MSE reconstructed - input: {mse}; MAE: {mae}")
  model1_reconstructed = (model1_reconstructed*255).astype(np.uint8).transpose().reshape(-1)
  model1_reconstructed.tofile(ospath.join(folder_path, f"{model1.name}.raw"), format="%<hhf")

  model1_decoder = model1_best
  model1_decoder.encoder = None
  torch.save(model1_decoder.state_dict(), ospath.join(folder_path, f"{model1.name}.dec"))

  return model1_losses

In [41]:
model1_kernel = [8]*3
model1_stride = [8]*3
model1_enc_size = 8
model1_name = "linae1_8_8_8"
model1 = autoEncLin1

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputAutoEnc1Model(folder_path, model1, model1_kernel, model1_stride, model1_enc_size, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'linae1_8_8_8' at: 2024-05-26 15:38:03.882953
epoch 1/1 at time 15:38:14.897525; current loss: 0.0012940410842699234
end of training 1 epochs: 2024-05-26 15:38:14.897757; elapsed time: 0:00:11.014809
starting encoding at: 2024-05-26 15:38:14.909588
starting decoding at: 2024-05-26 15:38:16.133603
finish decoding at: 2024-05-26 15:38:17.270399
MSE reconstructed - input: 0.0017210463625708953; MAE: 0.015702462532426957


count    1.000000
mean     0.001294
std           NaN
min      0.001294
25%      0.001294
50%      0.001294
75%      0.001294
max      0.001294
dtype: float64

In [42]:
model1_kernel = [4]*3
model1_stride = [4]*3
model1_enc_size = 8
model1_name = "linae1_4_4_8"
model1 = autoEncLin1

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputAutoEnc1Model(folder_path, model1, model1_kernel, model1_stride, model1_enc_size, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'linae1_4_4_8' at: 2024-05-26 15:38:17.387903
epoch 1/1 at time 15:39:12.868775; current loss: 0.0006420350716156618
end of training 1 epochs: 2024-05-26 15:39:12.869028; elapsed time: 0:00:55.481128
starting encoding at: 2024-05-26 15:39:12.877005
starting decoding at: 2024-05-26 15:39:20.718999
finish decoding at: 2024-05-26 15:39:29.386775
MSE reconstructed - input: 0.000742868928268601; MAE: 0.009005612449657726


count    1.000000
mean     0.000642
std           NaN
min      0.000642
25%      0.000642
50%      0.000642
75%      0.000642
max      0.000642
dtype: float64

##### AELin2

In [25]:
class autoEncLin2(nn.Module):

  def __init__(self, name="linae2", kernel_size=[8,8,8], enc_size=9):
    super(autoEncLin2, self).__init__()
    sample_size = np.prod(kernel_size)
    self.encoder = nn.Sequential(
      nn.Flatten(),
      nn.Linear(in_features=sample_size, out_features=int(sample_size/8)),
      nn.PReLU(), #use parametric relu for activation functions
      nn.Linear(int(sample_size/8), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/12)),
      nn.PReLU(),
      nn.Linear(int(sample_size/12), enc_size),
      nn.PReLU(),
      )
    self.decoder = nn.Sequential(
      nn.Linear(enc_size, int(sample_size/12)),
      nn.PReLU(),
      nn.Linear(int(sample_size/12), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name

  def get_encoding(self, x):
    return self.encoder(x)

  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x):
    c = self.encoder(x)
    y = self.decoder(c)
    return y

In [44]:
model1_kernel = [8]*3
model1_stride = [8]*3
model1_enc_size = 8
model1_name = f"linae2_8_8_{model1_enc_size}"
model1 = autoEncLin2

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputAutoEnc1Model(folder_path, model1, model1_kernel, model1_stride, model1_enc_size, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'linae2_8_8_8' at: 2024-05-26 15:43:32.260149
epoch 1/1 at time 15:43:39.872894; current loss: 0.0022808388583393066
end of training 1 epochs: 2024-05-26 15:43:39.874846; elapsed time: 0:00:07.614695
starting encoding at: 2024-05-26 15:43:39.882852
starting decoding at: 2024-05-26 15:43:41.070351
finish decoding at: 2024-05-26 15:43:42.214508
MSE reconstructed - input: 0.0029273420156157907; MAE: 0.02294337057319233


count    1.000000
mean     0.002281
std           NaN
min      0.002281
25%      0.002281
50%      0.002281
75%      0.002281
max      0.002281
dtype: float64

In [45]:
model1_kernel = [4]*3
model1_stride = [4]*3
model1_enc_size = 8
model1_name = f"linae2_4_4_{model1_enc_size}"
model1 = autoEncLin2

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputAutoEnc1Model(folder_path, model1, model1_kernel, model1_stride, model1_enc_size, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'linae2_4_4_8' at: 2024-05-26 15:43:42.378243
epoch 1/1 at time 15:44:35.730839; current loss: 0.0017531924037302
end of training 1 epochs: 2024-05-26 15:44:35.731926; elapsed time: 0:00:53.353684
starting encoding at: 2024-05-26 15:44:35.737713
starting decoding at: 2024-05-26 15:44:45.032692
finish decoding at: 2024-05-26 15:44:52.306596
MSE reconstructed - input: 0.0021151287194519485; MAE: 0.016955845855715797


count    1.000000
mean     0.001753
std           NaN
min      0.001753
25%      0.001753
50%      0.001753
75%      0.001753
max      0.001753
dtype: float64

#### Non-neural encoders (or mixed models)

Try a model that uses coordinates, variances and singular values

In [50]:
from torch.optim import SGD, Adam
from torch.nn import MSELoss, CrossEntropyLoss
import datetime

def fitWithIndex(model, dset_train, num_epochs=1, train_batches=1, optimFunc=Adam, lossFunc=MSELoss, folder_path=folder_path, verbose=2):
  # with help from https://pytorch.org/tutorials/beginner/introyt/trainingyt.html

  curr_best_loss = np.Infinity
  training_losses = []
  save_path = ospath.join(folder_path, model.name)

  optimizer = optimFunc(model.parameters(), lr=0.1)
  loss = lossFunc()
  model.train(True)
  start_time = datetime.datetime.now()
  if (verbose > 0):
    print(f"starting training for model '{model.name}' at: {datetime.datetime.now()}")
  for i in range(num_epochs):
    t_losses = []

    for j in range(train_batches):
      x_train = dset_train[j]
      p = model(x_train, j)
      l = loss(p, x_train)

      optimizer.zero_grad()
      l.backward()
      optimizer.step()

      t_losses.append(l.item())

    curr_loss = np.mean(t_losses)
    training_losses.append(curr_loss)

    if curr_loss < curr_best_loss:
      curr_best_loss = curr_loss
      torch.save(model.state_dict(), save_path)
    if (verbose > 1):
      print(f"epoch {i+1}/{num_epochs} at time {datetime.datetime.now().time()}; current loss: {curr_loss}")
    elif (verbose == 1):
      print(f"epoch {i+1}/{num_epochs: <{10}}", end="\r")

  end_time = datetime.datetime.now()
  if (verbose > 0):
    print(f"end of training {num_epochs} epochs: {datetime.datetime.now()}; elapsed time: {end_time - start_time}")
  best_model = model
  best_model.load_state_dict(torch.load(save_path))

  return best_model, training_losses

##### Mixmodel1

In [51]:
class MixedModel1(nn.Module):

  def get_encoding(self, x, index):
    arr_coord = np.multiply(np.unravel_index(index, self.dset.unfold_dims), self.dset.stride)+self.kernel_size # add one kernel_size to skip padded 0s
    ret_coords = np.divide(arr_coord, self.dset.samples.shape)
    retCodeInfo = torch.empty(9, dtype=torch.float32)
    # get normalized coords
    retCodeInfo[0:3] = torch.tensor(ret_coords[0:3], dtype=torch.float32)
    # get mean variances
    retCodeInfo[3] = torch.mean(torch.var(torch.reshape(x, self.kernel_size), axis=(0)))
    retCodeInfo[4] = torch.mean(torch.var(torch.reshape(x, self.kernel_size), axis=(1)))
    retCodeInfo[5] = torch.mean(torch.var(torch.reshape(x, self.kernel_size), axis=(2)))
    # get svds
    retCodeInfo[6:9] = torch.linalg.svdvals(torch.reshape(x, self.kernel_size))[0:3,0]
    #combine into ret_array

    return retCodeInfo.reshape((1,-1))

  def __init__(self, dset, name="mxae1"):
    super(MixedModel1, self).__init__()
    self.dset = dset
    kernel_size = dset.kernel_size
    self.kernel_size = kernel_size
    sample_size = np.prod(kernel_size)
    self.encoder = self.get_encoding
    self.decoder = nn.Sequential(
      nn.Linear(9, int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/4)),
      nn.PReLU(),
      nn.Linear(int(sample_size/4), int(sample_size/2)),
      nn.PReLU(),
      nn.Linear(int(sample_size/2), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name


  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x, i):
    c = self.encoder(x, i)
    y = self.decoder(c)
    return y

In [52]:
def trainAndOutputMixedModel(folder_path=folder_path, model=MixedModel1, kernel_size=[4,4,4], stride_size=[4,4,4], epochs=4, name="model1mix", optim=SGD, loss=MSELoss, vol_name="tooth", verbose=True, load_model_weights=False):
  # to train model
  tooth_path=ospath.join(folder_path, files[vol_name]["file_name"])
  tooth_array=readRawFile(tooth_path, files[vol_name]["dtype"], files[vol_name]["shape"], cutoffLow=(105, 0))
  model1toothDset = Dataset(tooth_array, stride=stride_size, kernel_size=kernel_size, returnSurroundingIndices=False)

  model1 = model(dset=model1toothDset, name=name)
  if (load_model_weights):
    save_path = ospath.join(folder_path, model1.name)
    model1.load_state_dict(torch.load(save_path))
  model1_best, model1_losses = fitWithIndex(model1, model1toothDset, num_epochs=epochs, train_batches=model1toothDset.length-1, optimFunc=optim, lossFunc=loss, folder_path=folder_path, verbose=verbose)

  # to output encoded representation
  if (verbose):
    print(f"starting encoding at: {datetime.datetime.now()}")
  toothEncDset = Dataset(tooth_array, stride=kernel_size, kernel_size=kernel_size, returnSurroundingIndices=False)
  model1_encoded_reps = np.array([model1_best.encoder(toothEncDset[x], x).detach().numpy().flatten().astype(np.float16) for x in range(toothEncDset.length)])
  model1_encoded_reps.reshape(-1).tofile(ospath.join(folder_path, f"{model1.name}.enc"), format="%<hf")

  # output reconstructed model
  if (verbose):
      print(f"starting decoding at: {datetime.datetime.now()}")
  model1_reconstructed = np.empty(toothEncDset.samples.shape)
  for ix, enc in enumerate(model1_encoded_reps):
    tmp_dec = model1_best.decoder(torch.tensor(enc, dtype=torch.float32).reshape((1,-1))).detach().numpy()
    ixst = np.multiply(np.unravel_index(ix, toothEncDset.unfold_dims), toothEncDset.stride)
    ixed = np.add(ixst,toothEncDset.kernel_size)
    # print(ix, ixst, ixed)
    model1_reconstructed[ixst[0]:ixed[0], ixst[1]:ixed[1], ixst[2]:ixed[2]] = tmp_dec
  if (verbose>0):
      print(f"finish decoding at: {datetime.datetime.now()}")

  model1_reconstructed = model1_reconstructed[0:toothEncDset.orig_shape[0], 0:toothEncDset.orig_shape[1], 0:toothEncDset.orig_shape[2]]
  # model1_reconstructed = gaussian_filter(model1_reconstructed, sigma=1, radius=np.divide(kernel_size,2).astype(np.float32)) # filter output to get rid of some of the blockiness
  mse = np.mean((model1_reconstructed - tooth_array)**2) # from https://stackoverflow.com/a/18047482
  mae = np.mean(np.abs((model1_reconstructed - tooth_array)))
  print(f"MSE reconstructed - input: {mse}; MAE: {mae}")
  model1_reconstructed = (model1_reconstructed*255).astype(np.uint8).transpose().reshape(-1)
  model1_reconstructed.tofile(ospath.join(folder_path, f"{model1.name}.raw"), format="%<hhf")

  return model1_losses

In [53]:
model1_kernel = [4]*3
model1_stride = model1_kernel
model1_name = "coordvarsvd1_4_4"
model1 = MixedModel1

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputMixedModel(folder_path, model1, model1_kernel, model1_stride, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'coordvarsvd1_4_4' at: 2024-05-26 15:49:10.338370
epoch 1/1 at time 15:49:56.141748; current loss: 0.001932225920083854
end of training 1 epochs: 2024-05-26 15:49:56.141933; elapsed time: 0:00:45.803566
starting encoding at: 2024-05-26 15:49:56.147734
starting decoding at: 2024-05-26 15:50:05.656440
finish decoding at: 2024-05-26 15:50:13.344371
MSE reconstructed - input: 0.004003983363984769; MAE: 0.022566684042712035


count    1.000000
mean     0.001932
std           NaN
min      0.001932
25%      0.001932
50%      0.001932
75%      0.001932
max      0.001932
dtype: float64

##### Mixmodel2

In [54]:
class MixedModel2(nn.Module):

  def get_encoding(self, x, index):
    arr_coord = np.multiply(np.unravel_index(index, self.dset.unfold_dims), self.dset.stride)+self.kernel_size # add one kernel_size to skip padded 0s
    ret_coords = np.divide(arr_coord, self.dset.samples.shape)
    retCodeInfo = torch.empty(3, dtype=torch.float32)
    retCodeInfo = torch.linalg.svdvals(torch.reshape(x, self.kernel_size))[0:3,0]

    return retCodeInfo.reshape((1,-1))

  def __init__(self, dset, name="mxae2"):
    super(MixedModel2, self).__init__()
    self.dset = dset
    kernel_size = dset.kernel_size
    self.kernel_size = kernel_size
    sample_size = np.prod(kernel_size)
    self.encoder = self.get_encoding
    self.decoder = nn.Sequential(
      nn.Linear(3, int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/4)),
      nn.PReLU(),
      nn.Linear(int(sample_size/4), int(sample_size/2)),
      nn.PReLU(),
      nn.Linear(int(sample_size/2), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name


  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x, i):
    c = self.encoder(x, i)
    y = self.decoder(c)
    return y

In [55]:
model1_kernel = [4]*3
model1_stride = model1_kernel
model1_name = "svd_4_4"
model1 = MixedModel2

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputMixedModel(folder_path, model1, model1_kernel, model1_stride, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'svd_4_4' at: 2024-05-26 15:50:13.463935
epoch 1/1 at time 15:50:54.582840; current loss: 0.003970364428952854
end of training 1 epochs: 2024-05-26 15:50:54.583052; elapsed time: 0:00:41.119118
starting encoding at: 2024-05-26 15:50:54.588614
starting decoding at: 2024-05-26 15:50:59.283024
finish decoding at: 2024-05-26 15:51:08.095513
MSE reconstructed - input: 0.0049014430961756205; MAE: 0.0259280144515157


count    1.00000
mean     0.00397
std          NaN
min      0.00397
25%      0.00397
50%      0.00397
75%      0.00397
max      0.00397
dtype: float64

##### Mixmodel3

In [56]:
class MixedModel3(nn.Module):

  def get_encoding(self, x, index):
    arr_coord = np.multiply(np.unravel_index(index, self.dset.unfold_dims), self.dset.stride)+self.kernel_size # add one kernel_size to skip padded 0s
    ret_coords = np.divide(arr_coord, self.dset.samples.shape)
    retCodeInfo = torch.empty(3, dtype=torch.float32)
    retCodeInfo = torch.linalg.svdvals(torch.reshape(x, self.kernel_size))[0:3,0]

    return retCodeInfo.reshape((1,-1))

  def __init__(self, dset, name="mxae3"):
    super(MixedModel3, self).__init__()
    self.dset = dset
    kernel_size = dset.kernel_size
    self.kernel_size = kernel_size
    sample_size = np.prod(kernel_size)
    self.encoder = self.get_encoding
    self.decoder = nn.Sequential(
      nn.Linear(3, int(sample_size/12)),
      nn.PReLU(),
      nn.Linear(int(sample_size/12), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name


  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x, i):
    c = self.encoder(x, i)
    y = self.decoder(c)
    return y

In [60]:
model1_kernel = [4]*3
model1_stride = model1_kernel
model1_name = "svd2_4_4"
model1 = MixedModel3

model1_epochs = 1 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

model1_losses = trainAndOutputMixedModel(folder_path, model1, model1_kernel, model1_stride, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
pd.Series(model1_losses).describe()

starting training for model 'svd2_4_4' at: 2024-05-26 15:59:16.224574
epoch 1/1 at time 15:59:57.004942; current loss: 0.003994967876574053
end of training 1 epochs: 2024-05-26 15:59:57.005140; elapsed time: 0:00:40.780569
starting encoding at: 2024-05-26 15:59:57.011107
starting decoding at: 2024-05-26 16:00:01.841116
finish decoding at: 2024-05-26 16:00:10.194839
MSE reconstructed - input: 0.004972182177168173; MAE: 0.02673630686496637


count    1.000000
mean     0.003995
std           NaN
min      0.003995
25%      0.003995
50%      0.003995
75%      0.003995
max      0.003995
dtype: float64

##### Mixmodel4

In [58]:
class MixedModel4(nn.Module):

  def get_encoding(self, x, index):
    arr_coord = np.multiply(np.unravel_index(index, self.dset.unfold_dims), self.dset.stride)+self.kernel_size # add one kernel_size to skip padded 0s
    ret_coords = np.divide(arr_coord, self.dset.samples.shape)
    retCodeInfo = torch.tensor(ret_coords, dtype=torch.float32)

    return retCodeInfo.reshape((1,-1))

  def __init__(self, dset, name="mxae4"):
    super(MixedModel4, self).__init__()
    self.dset = dset
    kernel_size = dset.kernel_size
    self.kernel_size = kernel_size
    sample_size = np.prod(kernel_size)
    self.encoder = self.get_encoding
    self.decoder = nn.Sequential(
      nn.Linear(3, int(sample_size/12)),
      nn.PReLU(),
      nn.Linear(int(sample_size/12), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size/8)),
      nn.PReLU(),
      nn.Linear(int(sample_size/8), int(sample_size)),
      nn.Unflatten(1, kernel_size),
      nn.Sigmoid(),
      )
    self.name = name


  def get_decoding(self, c):
    return self.decoder(c)

  def forward(self, x, i):
    c = self.encoder(x, i)
    y = self.decoder(c)
    return y

In [59]:
model1_kernel = [4]*3
model1_stride = model1_kernel
model1_name = "coords2_4_4"
model1 = MixedModel4

model1_epochs = 0 # 0 epochs just runs the encoder/decoder without training
model1_optim = SGD  #Adam#SGD
model1_loss = MSELoss   #CrossEntropyLoss#MSELoss
model1_load_weights = True # if changing anything above this line (especially kernel,stride,enc_size,name), set load_model_weights to False

# model1_losses = trainAndOutputMixedModel(folder_path, model1, model1_kernel, model1_stride, model1_epochs, model1_name, model1_optim, model1_loss, vol_name="tooth", verbose=2, load_model_weights=model1_load_weights)
# pd.Series(model1_losses).describe()