# From NN to ONN, different strategies comparison








Given Y=WX a standard trained NN layer, with Y the prediction, X input data, and W ALREADY TRAINED weights.

Convert the weights W into a sequence of 2x2 rotation matrices (describing MZI) and scaling vector (photonic attenuator).

Source:

[1] https://aip.scitation.org/doi/10.1063/5.0070913# Inspiration for designing the strategy 1

[2] https://github.com/Bihaqo/t3f/ T3F framework for TensorTrain arithmetics based on Tensorflow

[3] https://www.comsol.fr/blogs/analyzing-an-optical-computation-device-with-simulation/

In [239]:
np.random.seed(0)
weights = np.random.uniform(-1., +1., (4, 4))
X=np.random.uniform(-1., +1., (4, 1))

Y=np.dot(weights, X)
print("X")
print(X)
print("weights")
print(weights)
print("Y")
print(Y)

X
[[-0.95956321]
 [ 0.66523969]
 [ 0.5563135 ]
 [ 0.7400243 ]]
weights
[[ 0.09762701  0.43037873  0.20552675  0.08976637]
 [-0.1526904   0.29178823 -0.12482558  0.783546  ]
 [ 0.92732552 -0.23311696  0.58345008  0.05778984]
 [ 0.13608912  0.85119328 -0.85792788 -0.8257414 ]]
Y
[[ 0.37339233]
 [ 0.85102612]
 [-0.67755906]
 [-0.65268413]]


# Strategy 1: rank-2 TensorTrain decomposition

Illustration of the strategy:

![alt text](SVD.png "svd decomposition")



## Util functions:
* from_arr_to_tt : Converts np.ndarray into TensorTrain
* from_tt_to_arr : Converts TensorTrain format into np.ndarray
* tt_dot : Dot product between TensorTrain tensors

In [1]:
import numpy as np


def ranks(tt_cores):
    ranks = []
    for i in range(len(tt_cores)):
        s = tt_cores[i].shape[0]
        ranks.append(s)
    s = tt_cores[-1].shape[-1]
    ranks.append(s)
    return np.stack(ranks, axis=0)


class TensorTrain:
    def __init__(self, tt_cores, tt_shapes, tt_ranks):
        self.tt_cores = tt_cores
        self.tt_shapes = tt_shapes
        self.tt_ranks = tt_ranks


def from_tt_to_arr(tt:TensorTrain, original_shape:tuple) -> np.ndarray:
    """Converts a TensorTrain into a regular tensor or matrix."""
    tt_ranks = tt.tt_ranks
    res = tt.tt_cores[0]
    for i in range(1, len(tt.tt_cores)):
        res = np.reshape(res, (-1, tt_ranks[i]))
        curr_core = np.reshape(tt.tt_cores[i], (tt_ranks[i], -1))
        res = np.matmul(res, curr_core)
    return np.reshape(res, original_shape)


def _from_nd_arr_to_tt(arr:np.ndarray, max_tt_rank:int=10) -> TensorTrain:
    """Converts a given Numpy array to a TT-tensor of the same shape."""
    static_shape = list(arr.shape)
    dynamic_shape = arr.shape
    d = static_shape.__len__()
    max_tt_rank = np.array(max_tt_rank).astype(np.int32)
    if max_tt_rank.size == 1:
        max_tt_rank = (max_tt_rank * np.ones(d + 1)).astype(np.int32)
    ranks = [1] * (d + 1)
    tt_cores = []
    are_tt_ranks_defined = True
    for core_idx in range(d - 1):
        curr_mode = static_shape[core_idx]
        if curr_mode is None:
            curr_mode = dynamic_shape[core_idx]
        rows = ranks[core_idx] * curr_mode
        arr = np.reshape(arr, [rows, -1])
        columns = arr.shape[1]
        if columns is None:
            columns = np.shape(arr)[1]

        u, s, vT = np.linalg.svd(arr, full_matrices=False)
        v = vT.T.T.T  # anti-transpose

        # arr == u @ diag(s) @ vT
        if max_tt_rank[core_idx + 1] == 1:
            ranks[core_idx + 1] = 1
        else:
            ranks[core_idx + 1] = min(max_tt_rank[core_idx + 1], rows, columns)
        u = u[:, 0:ranks[core_idx + 1]]
        s = s[0:ranks[core_idx + 1]]
        v = v[:, 0:ranks[core_idx + 1]]
        core_shape = (ranks[core_idx], curr_mode, ranks[core_idx + 1])
        tt_cores.append(np.reshape(u, core_shape))
        arr = np.matmul(np.diag(s), np.transpose(v))
    last_mode = static_shape[-1]
    if last_mode is None:
        last_mode = dynamic_shape[-1]
    core_shape = (ranks[d - 1], last_mode, ranks[d])
    tt_cores.append(np.reshape(arr, core_shape))
    if not are_tt_ranks_defined:
        ranks = None
    return TensorTrain(tt_cores, static_shape, ranks)


def from_arr_to_tt(mat:np.ndarray, shape:tuple, max_tt_rank:int=10) -> TensorTrain:
    """Converts a given matrix or vector to a TT-matrix."""

    # transpose
    shape = np.array(shape)
    tens = np.reshape(mat, shape.flatten())  # Warning there:
    d = len(shape[0])
    transpose_idx = np.arange(2 * d).reshape(2, d).T.flatten()
    transpose_idx = list(transpose_idx.astype(int))
    while len(transpose_idx) < len(tens.shape):
        transpose_idx.append(len(transpose_idx))
    tens = np.transpose(tens, transpose_idx)

    new_shape = np.prod(shape, axis=0)
    tens = np.reshape(tens, new_shape)
    tt_tens = _from_nd_arr_to_tt(tens, max_tt_rank)

    tt_cores = []
    static_tt_ranks = list(tt_tens.tt_ranks)
    dynamic_tt_ranks = ranks(tt_tens.tt_cores)
    for core_idx in range(d):
        curr_core = tt_tens.tt_cores[core_idx]
        curr_rank = static_tt_ranks[core_idx]
        if curr_rank is None:
            curr_rank = dynamic_tt_ranks[core_idx]
        next_rank = static_tt_ranks[core_idx + 1]
        if next_rank is None:
            next_rank = dynamic_tt_ranks[core_idx + 1]
        curr_core_new_shape = [curr_rank, shape[0, core_idx], shape[1, core_idx], next_rank]

        # patch:
        # if max_tt_rank==2:
        # while np.prod(curr_core_new_shape) < np.prod(curr_core.shape):
        #  curr_core_new_shape.insert(1, 2)
        try:
            curr_core = np.reshape(curr_core, curr_core_new_shape)
        except:
            print("Error")

        tt_cores.append(curr_core)
    return TensorTrain(tt_cores, shape, tt_tens.tt_ranks)


def tt_dot(a: TensorTrain, b: TensorTrain) -> TensorTrain:
    """Multiplies two TT-matrices and returns the TT-matrix of the result."""
    ndims = len(a.tt_cores)
    einsum_str = 'aijb,cjkd->acikbd'
    result_cores = []
    for core_idx in range(ndims):
        a_core = a.tt_cores[core_idx]
        b_core = b.tt_cores[core_idx]

        try:
            curr_res_core = np.einsum(einsum_str, a_core, b_core) #<------------ 2x2 multiplication
        except ValueError:
            print("Einstein Sum error")

        res_left_rank = a.tt_ranks[core_idx] * b.tt_ranks[core_idx]
        res_right_rank = a.tt_ranks[core_idx + 1] * b.tt_ranks[core_idx + 1]
        left_mode = a.tt_shapes[0][core_idx]
        right_mode = b.tt_shapes[1][core_idx]

        core_shape = [res_left_rank, left_mode, right_mode, res_right_rank]
        # while np.prod(core_shape) < np.prod(curr_res_core.shape):
        #  core_shape.insert(1, 2)
        curr_res_core = np.reshape(curr_res_core, core_shape)

        result_cores.append(curr_res_core)
    res_shape = (a.tt_shapes[0], b.tt_shapes[1])
    out_ranks = [a_r * b_r for a_r, b_r in zip(a.tt_ranks, b.tt_ranks)]
    return TensorTrain(result_cores, res_shape, out_ranks)


### Conversion of weights

In [250]:
# SVD decomposition
u, s, vT = np.linalg.svd(weights, full_matrices=False)
V = vT.T.T.T # anti-transpose
s_diag=np.diag(s)
reconstructed_weights=np.dot(np.dot(u , s_diag) , vT)
print(f"Weigths reconstruction after SVD MSE: {np.mean((weights-reconstructed_weights)**2)}")

# from array to rank2 tt format
X_tt=from_arr_to_tt(X, ((2, 2), (1, 1)), max_tt_rank=2)
vt_tt=from_arr_to_tt(vT, ((2, 2), (2, 2)), max_tt_rank=2)
s_tt=from_arr_to_tt(s_diag, ((2, 2), (2, 2)), max_tt_rank=2)
u_tt=from_arr_to_tt(u, ((2, 2), (2, 2)), max_tt_rank=2)

# compute TT weights
w_tt = tt_dot(tt_dot(u_tt, s_tt), vt_tt)
#w_tt=from_arr_to_tt(weights, ((2, 2), (2, 2)), max_tt_rank=2)

# Only for checking
weights_reconstructed=from_tt_to_arr(w_tt, weights.shape)
print(f"Weigths reconstruction after 2x2 TT decomp. MSE: {np.mean((weights-weights_reconstructed)**2)}")

Weigths reconstruction after SVD MSE: 1.8262931624205774e-31
Weigths reconstruction after 2x2 TT decomp. MSE: 0.13211937943719954


## Prediction with TT cores

In [6]:
Y_tt=tt_dot(w_tt ,X_tt)
Y_reconstructed=from_tt_to_arr(Y_tt, Y.shape)

print(Y_reconstructed)
print(f"Prediction MSE: {np.mean((Y-Y_reconstructed)**2)}")

[[ 0.47324387]
 [ 0.67756381]
 [-0.49314283]
 [-1.02924294]]
Prediction MSE: 0.0539663482169657


### TODO: convert unitary TT-cores into 2x2 rotation TT-cores

## Strategy 2: Tiled 2x2 matrices decomposition

Steps:
* Split NxN matrix into 2x2 tiles
* Using eiven values/vectors decomposition to compute 2x2 rotation matrices 2d scaling vectors
* Using those matrices/vectors, to compute phase shift in MZIs/attenuators based on arccos/arcsin

## Utils
* from_arr_to_tiles
* from_tiles_to_arr
* dot_tile_vec

In [437]:
def from_arr_to_tiles(matrix):
    n = matrix.shape[0]
    if n % 2 != 0:
        raise ValueError("Matrix must be of even size")

    block_matrices = []
    for i in range(0, n, 2):
        for j in range(0, n, 2):
            block = matrix[i:i+2, j:j+2]
            block_matrices.append(block)
    return block_matrices

def from_tiles_to_arr(block_matrices):
    n = len(block_matrices)
    size = int(np.sqrt(n))
    if size ** 2 != n:
        raise ValueError("Number of block matrices must be a perfect square")

    C = np.zeros((size*2, size*2))
    for i in range(size):
        for j in range(size):
            C[i*2:i*2+2, j*2:j*2+2] = block_matrices[i*size+j]
    return C

def dot_tile_vec(block_matrices, x):
    """tiled matrix and normal vector multiplication """
    n = len(block_matrices)
    size = int(np.sqrt(n))
    if size ** 2 != n:
        raise ValueError("Number of block matrices must be a perfect square")
    if x.shape[0] != size*2:
        raise ValueError("Input vector must have the same size as the matrix")
    y = x.copy()
    result = np.zeros(y.shape)
    for i in range(size):
        for j in range(size):
            b1=block_matrices[i*size+j]
            b2=y[j*2:j*2+2]
            result[i*2:i*2+2] += np.dot(b1, b2) #<----- 2x2 dot between matrix and vector
    return result

def tiled_prediction(tiled_mat_a, tiled_vec_b, vec_x):
    """tiled computation between: (a.x)+b. 
    With a tiled matrix format and b a tiled vector format. x is a standard vector. """
    n = len(tiled_mat_a)
    size = int(np.sqrt(n))
    y = vec_x.copy()
    result = np.zeros(y.shape)
    for i in range(size):
        for j in range(size):
            b1=tiled_mat_a[i * size + j] # <---- 2x2 matrix
            b2=y[j*2:j*2+2] # <------ 1x2 numpy array
            b3=np.array([tiled_vec_b[i * size + j]]) # <------- 1x2 numpy array
            result[i*2:i*2+2] += np.dot(b1, b2) + np.diag(b3) #<----- The core computing is here
    return result


def from_random_to_rotation(x):
    eigenvalues, eigenvectors = np.linalg.eig(x)
    
    # rotation matrix are not unique. Different runs may produces different rotation. However, the dot product of them is unique.
    rot0=[[eigenvectors[0][0], -eigenvectors[1][0]],
          [eigenvectors[1][0], eigenvectors[0][0]]]
    rot1=[[eigenvectors[0][1], -eigenvectors[1][1]],
          [eigenvectors[1][1], eigenvectors[0][1]]]
    rot=np.dot(rot0,rot1)
    
    make_real=lambda x: x.real
    rot[0][0]=make_real(rot[0][0])
    rot[1][0]=make_real(rot[1][0])
    rot[0][1]=make_real(rot[0][1])
    rot[1][1]=make_real(rot[1][1])
    
    eigenvalues[0]=make_real(eigenvalues[0])
    eigenvalues[1]=make_real(eigenvalues[1])
    
    return rot.astype(np.float32), eigenvalues.astype(np.float32)

def check_and_validate_2x2_rot_mat(x):
    epsilon=1e-2
    if x.shape != (2, 2):
        raise ValueError(f"Matrix {x} is not 2x2")
    det = np.linalg.det(x)
    if abs(det - 1) > epsilon: #The determinant of a rotation matrix is always 1, because the matrix preserves the volume of the space it acts on. 
        raise ValueError(f"Matrix {x} is not a rotation matrix. Determinant: {det}")
    if np.allclose(np.dot(x, x.T),np.eye(2),atol=epsilon): #
        raise ValueError(f"Matrix {x} is not orthogonal. x.xT = {I}")

def rotation_mzi_angle(x):
    #check_and_validate_2x2_rot_mat(x)

    clipped_x=np.clip(x, -1, 1)
    angle = np.arccos(clipped_x[0][0])
    # revert the angle if needed
    if clipped_x[1,0] < 0:
        angle = 2*np.pi - angle 
    return angle

def rotation_2attenuators_angle(x):
    clipped_x=np.clip(x, -1, 1)
    return np.arccos(clipped_x)

## Converts NxN random weights -> 2x2 rotation matrices and 2d scaling vectors

In [438]:
# Tiling weights 4x4 into 4 2x2 matrices
w_t=from_arr_to_tiles(weights)

# From 2x2 matrix to 2x2 rotation matrix
w_rot_t=[]
w_val_t=[]
for w_ti in w_t:
    rot_matrix, eigen_values=from_random_to_rotation(w_ti)
    w_rot_t.append(rot_matrix)
    w_val_t.append(eigen_values)

  return rot.astype(np.float32), eigenvalues.astype(np.float32)


Predicting with 2x2 rotation matrices and 2d vectors

## Prediction with the 2x2 rotation matrices and 2D scaling vec.

In [439]:
# Predicting with 2x2 rotations and 2d scaling vectors 
reconstructed_y=tiled_prediction(w_rot_t, w_val_t, X)
print(f"Prediction MSE: {np.mean((Y-Y_reconstructed)**2)}")

Prediction MSE: 1.3019286424057714e-31


## MZI and attenuators angles of the phase shift (radian)

In [440]:
# MZI angles
for i in range(len(w_rot_t)):
    mzi_theta=rotation_mzi_angle(w_rot_t[i])
    atts_theta=rotation_2attenuators_angle(w_val_t[i])
    print(f"MZI#{i} rotation angle = {str(round(mzi_theta,7))} , associated attenuators: {np.round(atts_theta,7)}")


MZI#0 rotation angle = 1.074408 , associated attenuators: [1.3748369 1.3748369]
MZI#1 rotation angle = 1.6313761 , associated attenuators: [1.3432273 0.7021385]
MZI#2 rotation angle = 1.3048719 , associated attenuators: [0.4750728 0.4750728]
MZI#3 rotation angle = 1.0543903 , associated attenuators: [0.9916114 2.4810073]
