**подключение библиотек**   
transforms - для быстрого умнжения.  
- hadamar
- fft

In [37]:
import numpy as np # type: ignore
import sys
import os

from scipy.linalg import qr # type: ignore
from scipy.linalg import svd # type: ignore
import numpy as np # type: ignore
import transforms # type: ignore

import time
import random

print(sys.executable)

/home/taisia/anaconda3/envs/rand_dec/bin/python


**my_matmul**
рукописное умножение для сравнений

In [38]:
def my_matmul(A, B):
  n, m = A.shape
  _, k = B.shape
  C = np.zeros((n, k))
  for i in range(0, n):
    for j in range(0, k):
      for l in range(0, m):
        C[i][j] += A[i][l] * B[l][j]
  return C

In [39]:
def matmul(A, B):
  return A @ B

**transforms_projection**  
быстрое умножение матрицы.  
**mul_mode** влияет на способ умножения:
- fast_hadamar_transform
- fft

In [40]:
def transforms_projection(A, k, mul_mode, p=5):
    m, n = A.shape
    l = k + p
    Y = A.copy()
    if mul_mode == "hadamar":
      for i in range(0, m):
        Y[i, :] = transforms.fast_hadamard_transform(Y[i, :])
    else:
      for i in range(0, m):
        Y[i, :] = transforms.fft(Y[i, :])
    random_cols = np.random.choice(n, l, replace = False)
    Y = Y[:, random_cols]
    return Y


**get_QR**  
получение матрцы Q

In [41]:
def get_QR(A, Y0, q=3):
    Q0, _ = np.linalg.qr(Y0)
    Qj = Q0
    for _ in range(q):
        Y_tilde_j = A.conj().T @ Qj
        Q_tilde_j, _ = np.linalg.qr(Y_tilde_j)

        Yj = A @ Q_tilde_j
        Qj, _ = np.linalg.qr(Yj)

    return Qj

## Далее 3 функции описывают 3 разных подхода умножения матриц в алгоритме 4.4

- **randomized_subspace_iteration_with_hadamar**  
Алгоритм 4.4 из статьи.  
но для получения Y0 = A*Ω и самой случайной матрицы Ω используем transforms_projection with hadamar_mode=True

In [42]:
def randomized_subspace_iteration_with_hadamar(A, k, p=3, q=3):
    
    Y0 = transforms_projection(A, k, "hadamar", p)
    return get_QR(A, Y0, q)


- **randomized_subspace_iteration_with_fft**  
Честный алгоритм 4.4, использует fft умножение

In [43]:
def randomized_subspace_iteration_with_fft(A, k, p, q=3):

    Y0 = transforms_projection(A, k, "fft", p)
    return get_QR(A, Y0, q)


**randomized_subspace_iteration_with_built_in**  
Алгоритм 4.4 с встроенным умножением

In [44]:
def randomized_subspace_iteration_with_built_in(A, k, p, q=3):
    
    _, n = A.shape
    l = k + p
    Omega = np.random.randn(n, l)
    Y0 = A @ Omega

    return get_QR(A, Y0, q)

### 2 функции подсчета loss  
1) использует встроенный mat_mul, честно считает ошибку
2) использует несколько случайных проекций. Возвращает максимальную ошибку.

- **get_loss**  
возвращает нормированную ошибку Q\*QT\*A - A.  
считает через встроенное умножение

In [45]:
def get_loss(A, Q):
  return np.linalg.norm(Q @ (Q.T @ A) - A) / np.linalg.norm(A)

In [46]:
accuracy = 1e-8

- **get_loss_with_random_projections**  
    - get_array_projections.  
    первый шаг - формирование проекций
    - get_loss_with_random_projections.  
    второй шаг - возврат максимальной ошибки

In [47]:
def get_array_projections(A, seed=10):
    _, n = A.shape
    projections = list()
    for _ in range(seed):
        rand_vec = np.random.randn(n)
        projection = A @ rand_vec
        projections.append(projection / np.linalg.norm(projection))
    return projections

In [48]:
def get_loss_with_random_projections(A, projections_arr, Q):
    max_loss = 0
    for projection in projections_arr:
        result = Q @ (Q.T @ projection)
        if np.linalg.norm(result - projection) > max_loss:
            max_loss = np.linalg.norm(result - projection)
    return max_loss
########## !!!!!!!!!!!!!!!!!!!!!!

In [49]:
def check_loss_rand(A, get_Q_func, projections_arr, k, p=3, q=3, max_loss = accuracy):
    Q = get_Q_func(A, k, p, q)
    loss = get_loss_with_random_projections(A, projections_arr, Q)
    return loss < max_loss

In [50]:
def check_loss_k(A, get_Q_func, k, p=3, q=3, max_loss = accuracy):
    Q = get_Q_func(A, k, p, q)
    loss = get_loss(A, Q)
    return loss < max_loss

In [51]:
def get_rank_binary_search(A, left, right, get_Q_func, p=5, q=3, max_loss = accuracy):
  while right - left > 1: 
    k = (right + left) // 2
    flag = check_loss_k(A, get_Q_func, k, p, q, max_loss)
      
    if flag:
      right = k
    else:
      left = k
  return right

In [52]:
def get_rank_binary_search_with_projections(A, left, right, get_Q_func, projections_arr, p=5, q=3, max_loss = accuracy):
  while right - left > 1: 
    k = (right + left) // 2
    flag = check_loss_rand(A, get_Q_func, projections_arr, k, p, q, max_loss)
      
    if flag:
      right = k
    else:
      left = k
  return right

**get_project_rank**  
наш поиск ранга. 
1) ищет первую степень двойки, для которой ранг точно хорошо аппроксимирует 
2) бинарным поиском находим ранг  
проверка идет по check_loss_k.

In [53]:
def get_project_rank(A, get_Q_func, p=5, q=3, debug=False, max_loss=accuracy):
  k = 1
  while True:
    Q = get_Q_func(A, k, p, q)
    loss = get_loss(A, Q)
    if debug:
      print(f'now k = {k}, now loss = {loss}')
      
    if (check_loss_k(A, get_Q_func, k, p, q, max_loss)):
      return get_rank_binary_search(A, k // 2, k, get_Q_func, p, q, max_loss)
    k *= 2


**get_project_rank_with_random_projections**  
такой же поиск ранга, но с случайными проекциями

In [54]:
def get_project_rank_with_random_projections(A, get_Q_func, p=5, q=3, seed=10, debug=False, max_loss=accuracy):
    projections_arr = get_array_projections(A, seed)
    k = 1
    while True:
        Q = get_Q_func(A, k, p, q)
        loss = get_loss(A, Q)
        if debug:
            print(f'now k = {k}, now loss = {loss}')
        if (check_loss_rand(A, get_Q_func, projections_arr, k, p, q, max_loss)):
            return get_rank_binary_search_with_projections(A, k // 2, k, get_Q_func, projections_arr, p, q, max_loss)
        k *= 2

# -------------
# **далее тесты**
# --------------

### Генерация случайной матрицы заданного ранга

In [55]:
def make_random_rank_k_matrix(m, n, k, random_state=None):
    B = np.random.randn(m, k)
    C = np.random.randn(k, n)
    A: np.matrix = B @ C
    return A

In [56]:
k = 45
p = 5
q = 3

In [57]:
m, n = 2**11, 2**11

In [58]:
A = make_random_rank_k_matrix(m, n, k)

### 3 функции запуска и получения времени  
1) get_project_time - наш алгоритм с бин поиском
2) get_project_time_with_random_projections - наш алгоритм с бин поиском и случайными прекциями 
3) get_classic_time - встроенный ранг

In [59]:
def get_project_time(A, iteration):
    time_start = time.time()
    rank = get_project_rank(A, iteration)
    time_finish = time.time()
    return (time_finish - time_start, rank)

In [60]:
def get_project_time_with_random_projections(A, iteration, seed=10):
    time_start = time.time()
    rank = get_project_rank_with_random_projections(A, iteration, seed)
    time_finish = time.time()
    return (time_finish - time_start, rank)

In [61]:
def get_classic_time(A):
    time_start = time.time()
    rank = np.linalg.matrix_rank(A)
    time_finish = time.time()
    return (time_finish - time_start, rank)

**Хотим тестировать:**
1) умножение матрицы. будем передавать используемую функцию
    - randomized_subspace_iteration_**with_hadamar**
    - randomized_subspace_iteration_**with_fft**
    - randomized_subspace_iteration_**with_built_in**
2) проверку ошибки:
    - честную
    - со случайными проекциями.  
на проверке ошибки так же используем то же умножение


In [62]:
from tqdm import tqdm

### hadamar

In [63]:
haramar_time = get_project_time(A, randomized_subspace_iteration_with_hadamar)
haramar_time_with_projections = get_project_time_with_random_projections(A, randomized_subspace_iteration_with_hadamar)

print(f'rank time with hadamar')
print(f'without projections\n{haramar_time}')
print(f'with projections\n{haramar_time_with_projections}')

rank time with hadamar
without projections
(4.0476133823394775, 40)
with projections
(3.2115979194641113, 35)


### fft

In [64]:
fft_time = get_project_time(A, randomized_subspace_iteration_with_fft)
fft_time_witch_projections = get_project_time_with_random_projections(A, randomized_subspace_iteration_with_fft)

print(f'rank time with fft.')
print(f'without projections\n{fft_time}')
print(f'with projections\n{fft_time_witch_projections}')

rank time with fft.
without projections
(8.556162595748901, 40)
with projections
(6.965434789657593, 35)


### built-in умножение

In [65]:
built_in_time = get_project_time(A, randomized_subspace_iteration_with_built_in)
built_in_time_witch_projections = get_project_time_with_random_projections(A, randomized_subspace_iteration_with_built_in)

print(f'rank time with built_in multiplication.')
print(f'without projections\n{built_in_time}')
print(f'with projections\n{built_in_time_witch_projections}')

rank time with built_in multiplication.
without projections
(2.763775587081909, 40)
with projections
(2.244253158569336, 35)


### np.linalg.rank

In [66]:
classic_time = get_classic_time(A)

print(f'rank time with np.linalg')
print(classic_time)

rank time with np.linalg
(4.358630657196045, np.int64(45))


In [67]:
def print_all_scores():
    print(f'rank time with hadamar')
    print(f'without projections\n{haramar_time}')
    print(f'with projections\n{haramar_time_with_projections}')
    print(f'rank time with fft.')
    print(f'without projections\n{fft_time}')
    print(f'with projections\n{fft_time_witch_projections}')
    print(f'rank time with built_in multiplication.')
    print(f'without projections\n{built_in_time}')
    print(f'with projections\n{built_in_time_witch_projections}')
    print(f'rank time with np.linalg')
    print(classic_time)

In [68]:
print_all_scores()

rank time with hadamar
without projections
(4.0476133823394775, 40)
with projections
(3.2115979194641113, 35)
rank time with fft.
without projections
(8.556162595748901, 40)
with projections
(6.965434789657593, 35)
rank time with built_in multiplication.
without projections
(2.763775587081909, 40)
with projections
(2.244253158569336, 35)
rank time with np.linalg
(4.358630657196045, np.int64(45))


In [69]:
# from tqdm import tqdm

# all_time_subspace_rand = 0
# all_time_classic = 0
# for i in tqdm(range(5), desc="running"):
    
#     all_time_classic += get_classic_time(A)[0]
#     all_time_subspace_rand += get_project_time_with_random_projections(A, randomized_subspace_iteration)[0]
    
# print(f"classic rank = {all_time_classic}")
# print (f"our fast rank = {all_time_subspace_rand}")


In [70]:
import pandas as pd
from datetime import datetime

In [71]:
global_results = pd.DataFrame(columns=[
    'timestamp', 'method', 'projections', 'time', 'rank', 'matrix_size', 'true_rank'
])

In [72]:

def save_results(method, use_projections, time_taken, rank, matrix_size, true_rank, 
                 filename='test_results.csv', append_to_global=True):
    """
    Сохраняет результаты теста в CSV файл и (опционально) добавляет в глобальную таблицу
    
    Parameters:
        method: str - используемый метод ('hadamar', 'fft', 'built_in', 'classic')
        use_projections: bool - использовались ли случайные проекции
        time_taken: float - время выполнения
        rank: int - найденный ранг
        matrix_size: tuple - размер матрицы (m, n)
        true_rank: int - истинный ранг матрицы
        filename: str - имя файла для сохранения
        append_to_global: bool - добавлять ли в глобальную таблицу
    """
    result = {
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'method': method,
        'projections': use_projections,
        'time': time_taken,
        'rank': rank,
        'matrix_size': f"{matrix_size[0]}x{matrix_size[1]}",
        'true_rank': true_rank
    }
    
    # Сохраняем в отдельный файл
    df = pd.DataFrame([result])
    try:
        existing = pd.read_csv(filename)
        df = pd.concat([existing, df], ignore_index=True)
    except FileNotFoundError:
        pass
    
    df.to_csv(filename, index=False)
    
    # Добавляем в глобальную таблицу (если нужно)
    if append_to_global:
        global global_results
        global_results = pd.concat([global_results, df], ignore_index=True)
    
    return df

In [73]:
def run_comparison_test(A, true_rank, num_tests=5, save_to_file=True):
    """
    Запускает сравнение всех методов и сохраняет результаты
    
    Parameters:
        A: np.array - тестовая матрица
        true_rank: int - истинный ранг матрицы
        num_tests: int - количество прогонов для усреднения
        save_to_file: bool - сохранять ли результаты в файл
    """
    methods = {
        'hadamar': randomized_subspace_iteration_with_hadamar,
        'fft': randomized_subspace_iteration_with_fft,
        'built_in': randomized_subspace_iteration_with_built_in,
        'classic': None  # особый случай
    }
    
    results = []
    matrix_size = A.shape
    
    for method_name, method_func in methods.items():
        print(f"\nTesting {method_name} method...")
        
        # Для классического метода особый случай
        if method_name == 'classic':
            times = []
            for _ in tqdm(range(num_tests), desc=f"Testing {method_name}"):
                time_taken, rank = get_classic_time(A)
                times.append(time_taken)
            
            avg_time = np.mean(times)
            results.append((method_name, False, avg_time, rank, matrix_size, true_rank))
            if save_to_file:
                save_results(method_name, False, avg_time, rank, matrix_size, true_rank)
            continue
        
        # Для остальных методов тестируем оба варианта (с проекциями и без)
        for use_proj in [False, True]:
            times = []
            ranks = []
            for _ in tqdm(range(num_tests), desc=f"Testing {method_name} (proj={use_proj})"):
                if use_proj:
                    time_taken, rank = get_project_time_with_random_projections(A, method_func)
                else:
                    time_taken, rank = get_project_time(A, method_func)
                times.append(time_taken)
                ranks.append(rank)
            
            avg_time = np.mean(times)
            avg_rank = int(np.round(np.mean(ranks)))
            results.append((method_name, use_proj, avg_time, avg_rank, matrix_size, true_rank))
            
            if save_to_file:
                save_results(method_name, use_proj, avg_time, avg_rank, matrix_size, true_rank)
    
    print("\n=== Results ===")
    result_df = pd.DataFrame(results, columns=['method', 'projections', 'time', 'rank', 'matrix_size', 'true_rank'])
    print(result_df)
    
    return result_df

In [74]:

results = run_comparison_test(A, k, num_tests=3)


Testing hadamar method...


Testing hadamar (proj=False): 100%|██████████| 3/3 [00:10<00:00,  3.48s/it]
  global_results = pd.concat([global_results, df], ignore_index=True)
Testing hadamar (proj=True): 100%|██████████| 3/3 [00:10<00:00,  3.55s/it]



Testing fft method...


Testing fft (proj=False): 100%|██████████| 3/3 [00:22<00:00,  7.39s/it]
Testing fft (proj=True): 100%|██████████| 3/3 [00:20<00:00,  6.77s/it]



Testing built_in method...


Testing built_in (proj=False): 100%|██████████| 3/3 [00:06<00:00,  2.21s/it]
Testing built_in (proj=True): 100%|██████████| 3/3 [00:07<00:00,  2.47s/it]



Testing classic method...


Testing classic: 100%|██████████| 3/3 [00:13<00:00,  4.44s/it]


=== Results ===
     method  projections      time  rank   matrix_size  true_rank
0   hadamar        False  3.481187    40  (2048, 2048)         45
1   hadamar         True  3.553735    35  (2048, 2048)         45
2       fft        False  7.393593    40  (2048, 2048)         45
3       fft         True  6.768396    35  (2048, 2048)         45
4  built_in        False  2.211542    40  (2048, 2048)         45
5  built_in         True  2.468361    35  (2048, 2048)         45
6   classic        False  4.434572    45  (2048, 2048)         45



