In [1]:
import numpy as np

Здесь я сравниваю свои имплементации алгоритмов вычисления и применения WTT с соответствующими алгоритмами из репозитория.

Мой код:

In [2]:
def wtt_filter(input_vector, d, modes, ranks=None, eps=None, check_correctness=False):
    assert ranks is not None or eps is not None
    
    filters = []
    prod_modes = input_vector.size
    
    if check_correctness:
        assert len(modes) == d
        if ranks is not None:
            assert len(ranks) == d - 1
        if eps is not None:
            assert eps > 0
        assert prod_modes == np.prod(modes)
        
    true_ranks = []
    
    r_prev = 1
    A = input_vector
    for k in range(d):
        A = A.reshape((r_prev * modes[k], prod_modes // modes[k]), order='F')
        if A.shape[0] <= A.shape[1]:
            u, sigmas, vt = np.linalg.svd(A, full_matrices=False)
        else:
            u, sigmas, vt = np.linalg.svd(A, full_matrices=True)
            
        r_given = None if ranks is None else (1 if k == d - 1 else ranks[k])
        r_eps = None if eps is None else max(1, (sigmas >= eps).sum())
        if r_given is not None and r_eps is not None:
            r_cur = min(r_given, r_eps)
        elif r_given is not None:
            r_cur = r_given
        else:
            r_cur = r_eps
        
        filters.append(u)

        if check_correctness:
            assert u.shape[0] == u.shape[1] == r_prev * modes[k]
            if k < d - 1:
                assert r_cur <= r_prev * modes[k]

        if k < d - 1:
            A = (u.T @ A)[:r_cur,:]
            prod_modes //= modes[k]
            true_ranks.append(r_cur)
            r_prev = r_cur
    
    return filters, true_ranks

```input_vector``` --- нампаевский одномерный массив.

```d``` --- целое число.

```modes``` --- iterable из $d$ целых чисел.

```ranks``` --- iterable из $d - 1$ целого числа (необязательно; если есть, используется округление по рангу).

```eps``` --- вещественное число (необязательно; если есть, используется $\varepsilon$-округление по сингулярным числам).

In [3]:
def wtt_apply(input_vector, d, filters, modes, ranks, check_correctness=False):
    prod_modes = input_vector.size
    
    if check_correctness:
        assert len(filters) == d
        assert len(modes) == d
        assert len(ranks) == d - 1
        assert prod_modes == np.prod(modes)
        
    tails = []
    A = input_vector
    r_prev = 1
    for k in range(d):
        A = A.reshape((r_prev * modes[k], prod_modes // modes[k]), order='F')
        A = filters[k].T @ A

        if check_correctness:
            assert A.shape[0] == r_prev * modes[k]
            if k < d - 1:
                assert ranks[k] <= r_prev * modes[k]
                
        if k < d - 1:
            tails.append(A[ranks[k]:,:])
            A = A[:ranks[k],:]
            prod_modes //= modes[k]
            r_prev = ranks[k]
        
    result = A
    for k in range(d - 2, -1, -1):        
        result = np.vstack([
            result.reshape((ranks[k], prod_modes), order='F'),
            tails[k]
        ])
        prod_modes *= modes[k]
    
    return result.flatten(order='F')

In [4]:
def values(func, left, right, n):
    return func(np.linspace(left, right, n))

In [5]:
d = 10
n = 2 ** d
left = 0.
right = 1.

linspace = np.linspace(left, right, n)

sqrt_x_values = values(lambda x: np.sqrt(x), left, right, n)

modes = [2] * d
ranks = [2] * (d - 1)
eps = 1e-8

filters, ranks = wtt_filter(
    sqrt_x_values,
    d,
    modes,
    ranks=ranks,
    check_correctness=True
)
wtt_res = wtt_apply(
    sqrt_x_values,
    d,
    filters,
    modes,
    ranks,
    True
)

Код из репозитория:

In [6]:
def reshape(T, shape):
    return np.reshape(T, shape, order='F')

def computeSVDFilters(
    T, rank=1, eps=None, maxLevel=None, docopy=True, use_sv_truncation=False,
    return_sv=False, return_transformation=False
):
    '''
    Routine for computing WTT filters as left singular matrices
    
    T = np.ndarray
        Input tensor of shape n.
    rank = int / np.ndarray / list / tuple; default: 1.
        WTT rank(s); to be corrected while filters are computing
    eps = float; default: None.
        Truncation constant for singular values
    maxLevel = integer; default: None
        Maximum level of decomposition, all dimensions after this will
        be skipped.
    docopy = boolean; default: True
        Specifies whether make copy of input tensor or not.
    use_sv_truncation = boolean; default: False
        Either use truncation condition for singular values or not;
        If chosen, |s_i| < eps*|s_max| will be set to zero. It leads
        to lower rank estimation.
    return_sv = boolean; default: False
        Specifies whether return singular values associated with each filter or not.
    return_transformation = boolean; default: False
        Specifies whether return result of input transformation or not
    '''
    d = T.ndim
    n = T.shape

    if eps is not None:
        assert isinstance(eps, float), "Non-float eps parameter"
    else:
        eps = np.spacing(1.)
    if maxLevel is not None:
        assert isinstance(maxLevel, int), "Non-integer maxLevel parameter"
        assert 0 < maxLevel <= d, "Invalid maxLevel parameter"
        numberOfFilters = min(maxLevel, d)
    else:
        numberOfFilters = d
    if isinstance(rank, int):
        r = np.ones(numberOfFilters, dtype='i')
        r[1:] = rank
    elif isinstance(rank, (tuple, list)):
        r = np.array(rank)
    elif isinstance(rank, np.ndarray):
        r = rank.flatten()
        r = r.astype('i')
    assert len(r) == numberOfFilters, "There should be %d ranks, not %d" % (
        numberOfFilters, len(r)
    )
    assert r[0] == 1, "The 1st rank is always 1, not %d" % (r[0])
    
    filters = []
    if return_sv:
        singularValues = []
    
    if docopy:
        tmp = T.copy()
    else:
        tmp = T
        
    if return_transformation:
        Y = []

    
    #_relaxCTimeConst = 2.
    #_relaxCTimeConstSize = 2.
    
    for k in range(numberOfFilters):
        nRows = int(round(n[k]*r[k]))
        tmp = reshape(tmp, [nRows, -1])
        nCols = tmp.shape[1]
        fullMatricesFlag = nRows > nCols
        #if _relaxCTimeConst*nRows < nCols:
        
        u, s, vt = np.linalg.svd(tmp, full_matrices=fullMatricesFlag)
        # u, s, vt = scipy.linalg.svd(tmp, full_matrices=fullMatricesFlag)
        filters.append(u.T)
        tmp = np.dot(u.T, tmp)
        # truncate condition: |s| < eps*|s_max|
        nnzSV = np.sum(np.abs(s) >= (eps*np.abs(s).max()))
        if return_sv:
            singularValues.append(s[:nnzSV])
        if k < numberOfFilters-1:
            newRank = min(r[k+1], nRows, nCols)
            if use_sv_truncation:
                newRank = min(newRank, nnzSV)
            r[k+1] = int(newRank)
            if return_transformation:
                Y = [tmp[r[k+1]:, :]] + Y
            tmp = tmp[:r[k+1], :]
    return_values = [filters, r]
    if return_transformation:
        return_values.append(Y)
    if return_sv:
        return_values.append(singularValues)
    return tuple(return_values)

Ремарка по строчке
```nnzSV = np.sum(np.abs(s) >= (eps*np.abs(s).max()))```

Почему именно так, если сингулярные числа и так неотрицательные и располагаются в порядке невозрастания?

In [7]:
def WTT(T, directFilters, ranks, docopy=True):
    '''
    Wavelet Tensor Train transform.
    
    T = np.ndarray
        Tensorized signal to be transformed
    directFilters = list / tuple (of np.ndarrays)
        Filter bank for direct transform
    ranks = np.ndarray / list / tuple
        Ranks of WTT decomposition
    docopy = boolean; default: True
        Specifies whether make copy of input tensor or not.
    '''
    d = T.ndim
    n = T.shape
    
    numberOfFilters = len(directFilters)
    
    assert 0 < numberOfFilters <= d, "Invalid number of filters"
    
    r = np.array(ranks, dtype='i')
    assert len(r) == numberOfFilters, "There should be %d ranks, not %d" % (
        numberOfFilters, len(r)
    )
    assert r[0] == 1, "The 1st rank is always 1, not %d" % (r[0])
    
    if docopy:
        tmp = T.copy()
    else:
        tmp = T
    
    transformedT = []
    for k in range(numberOfFilters):
        nRows = int(round(n[k]*r[k]))
        tmp = reshape(tmp, [nRows, -1])
        nCols = tmp.shape[1]
        tmp = np.dot(directFilters[k], tmp)
        if k < numberOfFilters-1:
            transformedT = [tmp[r[k+1]:, :]] + transformedT
            tmp = tmp[:r[k+1], :]
        else:
            transformedT = [tmp] + transformedT
    return transformedT

def iWTT(transformedT, inverseFilters, ranks, n=None, docopy=True, result_tens=True):
    '''
    Inverse of Wavelet Tensor Train transform.
    
    transformedT = list / tuple (of np.ndarrays)
        Parts of decomposed signal
    inverseFilters = list / tuple (of np.ndarrays)
        Filter bank for inverse transform. Order of filters is direct
        (i.e., the last is related to the 1st mode).
    ranks = np.ndarray / list / tuple
        Ranks of WTT decomposition
    n = np.ndarray / list / tuple; default: None
        Mode sizes of original tensor. If it is not specified, it will be
        estimated from available filters.
    docopy = boolean; default: True
        Specifies whether make copy of input or not.
    result_tens = boolean; default: True
        Either tensorize output or not.
    '''
    numberOfFilters = len(inverseFilters)
    r = np.array(ranks, dtype='i')
    assert len(r) == numberOfFilters, "There should be %d ranks, not %d" % (
        numberOfFilters, len(r)
    )
    assert r[0] == 1, "The 1st rank is always 1, not %d" % (r[0])
    nNoneFlag = n is None
    if not nNoneFlag:
        d = len(n)
    else:
        d = numberOfFilters
        n = [int(round(inverseFilters[d-i-1].shape[0]/r[i])) for i in range(d)]
    assert 0 < numberOfFilters <= d, "Invalid number of filters"
    

    
    
    if docopy:
        T = transformedT[0].copy()
    else:
        T = transformedT[0]
    
    for k in range(numberOfFilters):
        k_reversed = numberOfFilters-k-1
        T = np.dot(inverseFilters[k], T)
        T = reshape(T, [r[k_reversed], -1])
        if k_reversed > 0:
            if r[k_reversed-1]*n[k_reversed-1] > r[k_reversed]:
                T = np.vstack([T, transformedT[k+1]])
    if result_tens:
        if nNoneFlag:
            additionalMode = int(round(T.size / np.prod(n)))
            n.append(additionalMode)
        T = reshape(T, n)
    return T

Попробую в свой iWTT.

In [8]:
def iwtt_apply(input_vector, d, filters, modes, ranks, check_correctness=False):
    prod_modes = input_vector.size
    
    if check_correctness:
        assert len(filters) == d
        assert len(modes) == d
        assert len(ranks) == d - 1
        assert prod_modes == np.prod(modes)
        
    tails = []
    A = input_vector
    r_prev = 1
    for k in range(d):
        A = A.reshape((r_prev * modes[k], prod_modes // modes[k]), order='F')

        if check_correctness:
            assert A.shape[0] == r_prev * modes[k]
            if k < d - 1:
                assert ranks[k] <= r_prev * modes[k]
                
        if k < d - 1:
            tails.append(A[ranks[k]:,:])
            A = A[:ranks[k],:]
            prod_modes //= modes[k]
            r_prev = ranks[k]
        
    #prod_modes == modes[-1] в конце
    result = A
    for k in range(d - 1, -1, -1):
        
        r_prev = 1 if k == 0 else ranks[k - 1]
        if k == d - 1:
            result = (filters[k] @ result).reshape((r_prev, prod_modes), order='F')
        else:
            result = (filters[k] @ np.vstack([
                result,
                tails[k]
            ])).reshape((r_prev, prod_modes), order='F')
        prod_modes *= modes[k]
    
    return result.flatten(order='F')

In [9]:
iwtt_res = iwtt_apply(
    wtt_res,
    d,
    filters,
    modes,
    ranks,
    True
)

In [10]:
np.linalg.norm(sqrt_x_values - iwtt_res)

1.9577947432628667e-14

Сравним, что даёт код из репозитория.

In [11]:
T = reshape(sqrt_x_values, [2] * d)
#надо тензор давать в функции

filters_r, ranks_r = computeSVDFilters(
    T,
    rank = 2
)

In [13]:
print(ranks)
print(ranks_r)
#в реализации из репозитория есть r_0, всегда равный 1

[2, 2, 2, 2, 2, 2, 2, 2, 2]
[1 2 2 2 2 2 2 2 2 2]


In [15]:
[u.shape for u in filters_r]

[(2, 2),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4),
 (4, 4)]

Как у меня. Проверим, что фильтры также совпадают (в реализации из репозитория они даются транспонированными).

In [17]:
[np.all(u1.T == u2) for u1, u2 in zip(filters, filters_r)]

[True, True, True, True, True, True, True, True, True, True]

In [18]:
wtt_result_r = WTT(T, filters_r, ranks_r)

Тут, как я понял, возвращается список из результатов на разных уровнях, а не единый итоговый вектор.

In [24]:
#wtt_result_r
concatenated = np.concatenate(
    [u.flatten(order='F') for u in wtt_result_r]
)
concatenated

array([ 2.26271567e+01,  3.69570201e-16,  8.21192713e-16, ...,
       -3.71911324e-06, -1.82856285e-04, -3.74363753e-06])

In [25]:
wtt_res

array([ 2.26271567e+01,  3.69570201e-16, -4.29187308e-03, ...,
        1.26439345e-05, -1.82856285e-04, -3.74363753e-06])

Не так...

In [37]:
concatenated = wtt_result_r[0].copy()
for k in range(1, d):
    concatenated = np.vstack([
        reshape(concatenated, (ranks_r[-k], -1)),
        wtt_result_r[k]
    ])
concatenated = concatenated.flatten(order='F')
concatenated

array([ 2.26271567e+01,  3.69570201e-16, -4.29187308e-03, ...,
        1.26439345e-05, -1.82856285e-04, -3.74363753e-06])

In [36]:
np.allclose(wtt_res, concatenated)

True

Надо по-честному всё стакать, видимо.

Теперь попробуем обратное преобразование:

In [40]:
iwtt_result_r = iWTT(
    wtt_result_r,
    [u.T for u in filters_r[::-1]],
    ranks_r
)
iwtt_result_r.shape

(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1)

In [48]:
np.allclose(iwtt_result_r.flatten(order='F'), iwtt_res)

True

Вновь сошлось.

In [49]:
iwtt_result_r = iWTT(
    wtt_result_r,
    [u.T for u in filters_r[::-1]],
    ranks_r,
    result_tens=False
)
np.allclose(iwtt_result_r, iwtt_res)

True