# Imports

In [16]:
import sys

modules_to_reload = [
    "src.utils.method_loggers",
    "src.utils.method_runners",
    "src.utils.metrics_calculators",
    "src.utils.tensor_handlers",
    "src.utils.trackers",
    "src.utils.video_controller",
    "src.utils.optimal_rank_finders",
]

for module in modules_to_reload:
    if module in sys.modules:
        del sys.modules[module]

%load_ext memory_profiler
%load_ext autoreload
%autoreload 2

import gc
import os

import numpy as np
import plotly.graph_objects as go
import plotly.io as pio
import tensorflow as tf
import torch
from tqdm import tqdm
from scipy.signal import argrelextrema
from functools import partial
from typing import Callable
from torch.nn import Conv2d, ConvTranspose2d
import re


np.random.seed(42)
os.environ["OPENBLAS_NUM_THREADS"] = "8"
os.environ["MKL_NUM_THREADS"] = "8"

tf.random.set_seed(42)

import time
from itertools import product

import tensorly as tl
from dotenv import load_dotenv
from scipy.optimize import OptimizeResult
from scipy.optimize import minimize, differential_evolution, shgo

from src.utils.image_controller import download_image, extract_image_frames
from src.utils.metrics_calculators import IMetricCalculator
from src.utils.optimal_rank_finders import (
    find_optimal_rank_tensor_train_by_compression_ratio,
)

load_dotenv()

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


True

In [17]:
tensor_train_args = {"svd": "truncated_svd"}

# get tensor

In [18]:
cache_dir_image = "../.cache/image"

image_urls = [
    "https://i.pinimg.com/564x/04/b2/68/04b26838bdd5e2ba54d0144558685bae.jpg",
    "https://cdnstatic.rg.ru/crop620x412/uploads/images/187/94/47/iStock-644032024.jpg",
    "https://i.sstatic.net/uQggz.png",
]

images = {}

In [19]:
image_paths = [download_image(image_url, cache_dir_image) for image_url in image_urls]

Изображение уже загружено и закешировано: ../.cache/image/04b26838bdd5e2ba54d0144558685bae.jpg
Изображение уже загружено и закешировано: ../.cache/image/iStock-644032024.jpg
Изображение уже загружено и закешировано: ../.cache/image/uQggz.png


In [20]:
for image_index, image_path in enumerate(image_paths):
    image_frames = extract_image_frames(image_path)

    images[f"image-{image_index}"] = {
        "image_url": image_urls[image_index],
        "image_path": image_path,
        "frames": image_frames,
    }

    print(f"image-{image_index} - {image_frames.shape}")

image-0 - (564, 564, 3)
image-1 - (412, 620, 3)
image-2 - (689, 1195, 3)


In [21]:
tensor_name = "image-1"
example_tensor = images[tensor_name]["frames"].copy().astype(np.float32)

# get tensor - layer of NN

In [22]:
# gan = torch.hub.load('facebookresearch/pytorch_GAN_zoo:hub', 'DCGAN', pretrained=True, useGPU=False)
# 
# def SingleLayer(model):
#     for name, child in model.named_children():
#         if isinstance(child, Conv2d):
#             return child
#         elif isinstance(child, ConvTranspose2d):
#             return child
#         else:
#             return SingleLayer(child)
# 
# layer_weights_nn_in_array = SingleLayer(gan.netG).weight.detach().numpy()
# size = layer_weights_nn_in_array.shape
# layer_weights_nn_in_array = layer_weights_nn_in_array.reshape(size[0], size[1], size[2] * size[3])

In [23]:
# example_tensor = layer_weights_nn_in_array
# tensor_name = "random_reshaped_layer_from_NN"

# func for calculate bounds for tensor train factors

In [24]:
def calculate_tt_bounds(shape: tuple | list) -> list:
    """
    Calculate the bounds for TT-ranks of a tensor based on its shape.

    Parameters
    ----------
    shape : tuple[int, ...] | list[int]
        The shape of the tensor as a list or tuple of integers.
        Each element represents the size of the tensor along a corresponding dimension.

    Returns
    -------
    list[tuple[int, int]]
        A list of rank bounds for the Tensor Train (TT) decomposition.
        Each element is a tuple (r_min, r_max), where:
        - r_min is always 1.
        - r_max is the upper bound for the TT-rank at the corresponding position.

    Examples
    -------
    >>> calculate_tt_bounds((3, 4, 5))
    [(1, 1), (1, 3), (1, 12), (1, 1)]
    """
    d = len(shape)
    bounds = [(1, 1)]

    for k in range(1, d):
        prod_left = 1
        for i in range(k):
            prod_left *= shape[i]

        prod_right = 1
        for j in range(k, d):
            prod_right *= shape[j]

        rk_max = min(prod_left, prod_right)
        bounds.append((1, rk_max))

    bounds.append((1, 1))
    return bounds

# func for calculate optimal initial rank of tensor train

In [25]:
## Выходит локальный минимум ;c
# def calculate_tensor_train_initial_rank(bounds: tuple) -> list[int]:
#     return [max(1, round(max_bound / 2)) for min_bound, max_bound in bounds]

def calculate_tensor_train_initial_rank(bounds: tuple | list) -> list[int]:
    return [min_bound for min_bound, max_bound in bounds]

# calculate search area

In [26]:
tt_bounds_example_tensor = calculate_tt_bounds(example_tensor.shape)
example_tensor_initial_rank = calculate_tensor_train_initial_rank(tt_bounds_example_tensor)

print(
    tt_bounds_example_tensor,
    example_tensor_initial_rank,
    sep='\n'
)

[(1, 1), (1, 412), (1, 3), (1, 1)]
[1, 1, 1, 1]


In [27]:
target_compression_ratio_for_graphs = 50.0
frobenius_error_coef_for_graphs = 1.0
compression_ratio_coef_for_graphs = 10.0

rank_ranges = [range(bound[0], bound[1] + 1) for bound in tt_bounds_example_tensor]

tqdm_iterable = product(*rank_ranges)
tqdm_total = np.prod([len(r) for r in rank_ranges])

search_area_example_results = {}

with tl.backend_context("pytorch"):
    example_tensor_cuda = tl.tensor(example_tensor).to("cuda")

    for rank_combination in tqdm(
            iterable=tqdm_iterable, total=tqdm_total, desc="Processing Ranks"
    ):
        test_rank = list(rank_combination)
        internal_indices = test_rank[1:-1]

        try:
            method_result = tl.decomposition.tensor_train(example_tensor_cuda, rank=test_rank, **tensor_train_args)
            tt_factors = method_result
            reconstructed_tensor = tl.tt_to_tensor(tt_factors)

            frobenius_error = (
                    tl.norm(reconstructed_tensor - example_tensor_cuda) / tl.norm(example_tensor_cuda)
            ).item()
            compression_ratio = IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(
                example_tensor_cuda
            )
            compression_penalty = (target_compression_ratio_for_graphs / 100 - compression_ratio) ** 2
            loss_function_result = (
                    frobenius_error_coef_for_graphs * frobenius_error
                    + compression_ratio_coef_for_graphs * compression_penalty
            )

            search_area_example_results[tuple(internal_indices)] = {
                "rank": test_rank,
                "frobenius_error": frobenius_error,
                "compression_ratio": compression_ratio,
                "compression_penalty": compression_penalty,
                "loss_function_result": loss_function_result,
            }
        except Exception as e:
            search_area_example_results[tuple(internal_indices)] = {"rank": test_rank, "error": str(e)}
        finally:
            torch.cuda.synchronize()
            del tt_factors, reconstructed_tensor
            torch.cuda.empty_cache()
            gc.collect()

    torch.cuda.synchronize()
    del example_tensor_cuda
    torch.cuda.empty_cache()
    gc.collect()

Processing Ranks: 100%|██████████| 1236/1236 [05:19<00:00,  3.86it/s]


## Graph of search area

In [28]:
internal_indices = np.array(list(search_area_example_results.keys()))
metrics = ["frobenius_error", "compression_ratio", "compression_penalty", "loss_function_result"]

metric_dict = {tuple(idx): search_area_example_results[idx] for idx in search_area_example_results}
figs = []

for metric in metrics:
    z_values = np.array([search_area_example_results[key].get(metric, np.nan) for key in search_area_example_results])
    x_indices = internal_indices[:, 0]
    y_indices = internal_indices[:, 1]

    local_min_points = []

    for i, (x, y) in enumerate(zip(x_indices, y_indices)):
        z = z_values[i]
        neighbors = [(x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1)]
        is_local_min = all(
            (neighbor not in metric_dict or metric_dict[neighbor].get(metric, np.inf) >= z)
            for neighbor in neighbors
        )
        if is_local_min:
            local_min_points.append((x, y, z))

    x_min, y_min, z_min = zip(*local_min_points) if local_min_points else ([], [], [])

    fig = go.Figure()
    fig.add_trace(go.Scatter3d(
        x=x_indices,
        y=y_indices,
        z=z_values,
        mode="markers",
        marker={"size": 5, "color": z_values, "colorscale": "Viridis", "opacity": 0.8},
        name=metric
    ))
    fig.add_trace(go.Scatter3d(
        x=x_min,
        y=y_min,
        z=z_min,
        mode="markers+text",
        marker={"size": 6, "color": "red", "symbol": "diamond"},
        text=[f"min: {val:.6f}" for val in z_min],
        textposition="top center",
        name="Local Minima"
    ))
    fig.update_layout(
        title=f"Search area for example tensor {tensor_name} of {metric.replace('_', ' ').title()}",
        height=800,
        scene={
            "xaxis_title": "Rank Index 2",
            "yaxis_title": "Rank Index 3",
            "zaxis_title": metric.replace("_", " ").title(),
            "yaxis": {"tickmode": "array", "tickvals": list(set(y_indices.astype(int)))}
        },
        margin={"l": 0, "r": 0, "t": 40, "b": 0},
        template="plotly_white",
        showlegend=False,
    )

internal_indices = np.array(list(search_area_example_results.keys()))
metrics = ["frobenius_error", "compression_ratio", "compression_penalty", "loss_function_result"]

metric_dict = {tuple(idx): search_area_example_results[idx] for idx in search_area_example_results}

figs = []

for metric in metrics:
    z_values = np.array([search_area_example_results[key].get(metric, np.nan) for key in search_area_example_results])
    x_indices = internal_indices[:, 0]
    y_indices = internal_indices[:, 1]

    local_min_points = []

    for i, (x, y) in enumerate(zip(x_indices, y_indices)):
        z = z_values[i]

        neighbors = [
            (x - 1, y), (x + 1, y),
            (x, y - 1), (x, y + 1)
        ]

        is_local_min = all(
            (neighbor not in metric_dict or metric_dict[neighbor].get(metric, np.inf) >= z)
            for neighbor in neighbors
        )

        if is_local_min:
            local_min_points.append((x, y, z))

    x_min, y_min, z_min = zip(*local_min_points) if local_min_points else ([], [], [])

    fig = go.Figure()

    fig.add_trace(go.Scatter3d(
        x=x_indices,
        y=y_indices,
        z=z_values,
        mode="markers",
        marker={"size": 5, "color": z_values, "colorscale": "Viridis", "opacity": 0.8},
        name=metric
    ))

    fig.add_trace(go.Scatter3d(
        x=x_min,
        y=y_min,
        z=z_min,
        mode="markers+text",
        marker={"size": 6, "color": "red", "symbol": "diamond"},
        text=[f"min: {val:.6f}" for val in z_min],
        textposition="top center",
        name="Local Minima"
    ))

    fig.update_layout(
        title=f"Search area for example tensor {tensor_name} of {metric.replace('_', ' ').title()}",
        height=800,
        scene={
            "xaxis_title": "Rank Index 2",
            "yaxis_title": "Rank Index 3",
            "zaxis_title": metric.replace("_", " ").title(),
            "yaxis": {"tickmode": "array", "tickvals": list(set(y_indices.astype(int)))}
        },
        margin={"l": 0, "r": 0, "t": 40, "b": 0},
        template="plotly_white",
        showlegend=False,
    )

    figs.append(fig)

html_str = ""
for fig in figs:
    html_str += go.Figure(fig).to_html(full_html=False, include_plotlyjs=False)

html_file = f"""
<!DOCTYPE html>
<html>
<head>
    <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
</head>
<body>
<h1>Search area by some metrics</h1>
{html_str}
</body>
</html>
"""

output_path = f"../.cache/data_analyze/optimization_algs_for_tensor_train_search_area_{tensor_name}.html"
with open(output_path, "w", encoding="utf-8") as f:  # noqa: PTH123
    f.write(html_file)

# Optimization algs test

## optimize with custom algorithm

In [29]:
start_time = time.perf_counter()
best_rank, compression_ratio, frobenius_error, find_rank_logs = find_optimal_rank_tensor_train_by_compression_ratio(
    tensor=example_tensor,
    target_compression_ratio=50.0,
    initial_rank_arg=example_tensor_initial_rank,
    tensor_train_args=tensor_train_args,
    search_strategy="custom",
)
elapsed_time = time.perf_counter() - start_time

Optimal rank search process for TensorTrain:
step | rank | compression ratio (%) | frobenius error (%)
1 | [1, 2, 1, 1] | 0.269731 % | 29.621759 %
2 | [1, 3, 1, 1] | 0.404400 % | 27.258003 %
3 | [1, 3, 2, 1] | 0.647510 % | 25.136411 %
4 | [1, 4, 2, 1] | 0.863086 % | 22.779040 %
5 | [1, 5, 2, 1] | 1.078662 % | 21.445341 %
6 | [1, 6, 2, 1] | 1.294237 % | 20.110290 %
7 | [1, 7, 2, 1] | 1.509813 % | 18.870835 %
8 | [1, 8, 2, 1] | 1.725389 % | 17.982182 %
9 | [1, 9, 2, 1] | 1.940965 % | 17.207886 %
10 | [1, 10, 2, 1] | 2.156540 % | 16.597034 %
11 | [1, 11, 2, 1] | 2.372116 % | 16.011199 %
12 | [1, 12, 2, 1] | 2.587692 % | 15.534294 %
13 | [1, 13, 2, 1] | 2.803268 % | 15.124343 %
14 | [1, 14, 2, 1] | 3.018843 % | 14.732207 %
15 | [1, 15, 2, 1] | 3.234419 % | 14.346740 %
16 | [1, 16, 2, 1] | 3.449995 % | 14.027411 %
17 | [1, 17, 2, 1] | 3.665571 % | 13.729918 %
18 | [1, 18, 2, 1] | 3.881146 % | 13.445653 %
19 | [1, 19, 2, 1] | 4.096722 % | 13.194573 %
20 | [1, 20, 2, 1] | 4.312298 % | 12.9502

In [30]:
print(
    f"Tensor shape = {list(example_tensor.shape)}",
    f"Best Rank = {best_rank}",
    f"Frobenius Error = {frobenius_error:.6f}%",
    f"Compression Ratio = {compression_ratio:.6f}%",
    f"Elapsed Time = {elapsed_time:.6f} seconds",
    sep="\n",
)

# Tensor shape = [412, 620, 3]
# Best Rank = [1, 169, 3, 1]
# Frobenius Error = 3.607338%
# Compression Ratio = 50.106613%
# Elapsed Time = 46.066486 seconds

Tensor shape = [412, 620, 3]
Best Rank = [1, 169, 3, 1]
Frobenius Error = 3.607338%
Compression Ratio = 50.106613%
Elapsed Time = 47.735504 seconds


### calculate metrics

In [80]:
target_compression_ratio_for_graphs_percent = 50.0
frobenius_error_coef_for_graphs = 1.0
compression_ratio_coef_for_graphs = 10.0

(
    custom_alg_compression_ratios,
    custom_alg_frobenius_errors,
    custom_alg_compression_penalties,
    custom_alg_loss_function_results,
) = [], [], [], []
for element in find_rank_logs:
    compression_ratio = element["compression_ratio"] / 100.0
    frobenius_error = element["frobenius_error"] / 100.0
    target_compression_ratio_for_graphs = target_compression_ratio_for_graphs_percent / 100.0

    custom_alg_compression_ratios.append(compression_ratio)
    custom_alg_frobenius_errors.append(frobenius_error)

    compression_penalty = (target_compression_ratio_for_graphs - compression_ratio) ** 2
    loss_function_result = (
            frobenius_error_coef_for_graphs * frobenius_error + compression_ratio_coef_for_graphs * compression_penalty
    )

    custom_alg_compression_penalties.append(compression_penalty)
    custom_alg_loss_function_results.append(loss_function_result)

### Graph of metrics path in search area

In [81]:
figs = []

for metric, metric_data in zip(
        metrics,
        [
            custom_alg_frobenius_errors,
            custom_alg_compression_ratios,
            custom_alg_compression_penalties,
            custom_alg_loss_function_results,
        ],
        strict=False,
):
    z_values = np.array([search_area_example_results[key].get(metric, np.nan) for key in search_area_example_results])
    x_indices = internal_indices[:, 0]
    y_indices = internal_indices[:, 1]

    # Поиск локальных минимумов
    local_min_points = []

    metric_dict = {tuple(idx): search_area_example_results[idx] for idx in search_area_example_results}

    for i, (x, y) in enumerate(zip(x_indices, y_indices)):
        z = z_values[i]

        neighbors = [
            (x - 1, y), (x + 1, y),  # По оси X
            (x, y - 1), (x, y + 1)  # По оси Y
        ]

        is_local_min = all(
            (neighbor not in metric_dict or metric_dict[neighbor].get(metric, np.inf) >= z)
            for neighbor in neighbors
        )

        if is_local_min:
            local_min_points.append((x, y, z))

    x_min, y_min, z_min = zip(*local_min_points) if local_min_points else ([], [], [])

    fig = go.Figure()

    # Основные точки
    fig.add_trace(go.Scatter3d(
        x=x_indices,
        y=y_indices,
        z=z_values,
        mode="markers",
        marker={"size": 5, "color": z_values, "colorscale": "Viridis", "opacity": 0.8},
    ))

    path_x = []
    path_y = []
    path_z = []

    for i, log in enumerate(find_rank_logs):
        rank = log["rank"]
        if metric == "frobenius_error":
            z_value = custom_alg_frobenius_errors[i]
        elif metric == "compression_ratio":
            z_value = custom_alg_compression_ratios[i]
        elif metric == "compression_penalty":
            z_value = custom_alg_compression_penalties[i]
        elif metric == "loss_function_result":
            z_value = custom_alg_loss_function_results[i]

        path_x.append(rank[1])
        path_y.append(rank[2])
        path_z.append(z_value)

        fig.add_trace(
            go.Scatter3d(
                x=[rank[1]],
                y=[rank[2]],
                z=[z_value],
                mode="markers",
                marker={
                    "size": 10 if i == 0 or i == len(find_rank_logs) - 1 else 5,
                    "color": "yellow" if i == 0 or i == len(find_rank_logs) - 1 else "red",
                    "opacity": 0.8,
                },
            )
        )

    # Добавляем локальные минимумы (выделенные точки)
    fig.add_trace(go.Scatter3d(
        x=x_min,
        y=y_min,
        z=z_min,
        mode="markers+text",
        marker={"size": 8, "color": "blue", "symbol": "diamond"},
        text=[f"min: {val:.6f}" for val in z_min],
        textposition="top center",
    ))

    fig.add_trace(
        go.Scatter3d(
            x=path_x,
            y=path_y,
            z=path_z,
            mode="lines+markers",
            marker={"size": 5, "color": "red", "opacity": 0.8},
            line={"color": "red", "width": 3},
        )
    )

    fig.update_layout(
        title=f"Search area for example tensor {tensor_name} of {metric.replace('_', ' ').title()} with custom alg path",
        height=800,
        scene={
            "xaxis_title": "Rank Index 2",
            "yaxis_title": "Rank Index 3",
            "zaxis_title": metric.replace("_", " ").title(),
            "yaxis": {"tickmode": "array", "tickvals": list(set(y_indices.astype(int)))},
        },
        margin={"l": 0, "r": 0, "t": 40, "b": 0},
        template="plotly_white",
        showlegend=False,
    )

    figs.append(fig)

html_str = ""
for fig in figs:
    html_str += go.Figure(fig).to_html(full_html=False, include_plotlyjs=False)

html_file = f"""
<!DOCTYPE html>
<html>
<head>
    <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
</head>
<body>
<h1>Search area by some metrics</h1>
{html_str}
</body>
</html>
"""

output_path = f"../.cache/data_analyze/optimization_algs_for_tensor_train_search_area_{tensor_name}_with_custom_alg.html"
with open(output_path, "w", encoding="utf-8") as f:  # noqa: PTH123
    f.write(html_file)

## scipy algs

### args for algs

In [82]:
print(
    f"Tensor shape: {example_tensor.shape}",
    f"Initial rank: {example_tensor_initial_rank}",
    f"TT args: {tensor_train_args}",
    sep='\n',
)

frobenius_error_coef_algs = 1.0
compression_ratio_coef_algs = 10.0

target_compression_ratio_algs = 50.0

Tensor shape: (689, 1195, 3)
Initial rank: [1, 1, 1, 1]
TT args: {'svd': 'truncated_svd'}


In [83]:
scipy_algs_results = {}

### Funcs for check scipy algs

#### loss function

In [84]:
def loss_function(
        rank: list,
        tensor: np.ndarray,
        target_compression_ratio: float,
        tensor_train_args: dict[str, str],
        frobenius_error_coef: float = 1.0,
        compression_ratio_coef: float = 10.0,
):
    try:
        with tl.backend_context("pytorch"):
            tensor_cuda = tl.tensor(tensor).to("cuda")
            
            tt_factors = tl.decomposition.tensor_train(tensor_cuda, rank=rank, **tensor_train_args)
            reconstructed_tensor = tl.tt_to_tensor(tt_factors)
    
            frobenius_error = (tl.norm(reconstructed_tensor - tensor_cuda) / tl.norm(tensor_cuda)).item()
            compression_ratio = IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(tensor_cuda)
    
            target_compression_ratio /= 100
    
            compression_penalty = (target_compression_ratio - compression_ratio) ** 2
    
            # compression_penalty = target_compression_ratio - compression_ratio
            #
            # if compression_ratio > 1.0 or compression_ratio < 0.0 or compression_penalty < 0.0 or compression_penalty > 1.0:
            #     compression_penalty = float("inf")
    
        return frobenius_error_coef * frobenius_error + compression_ratio_coef * compression_penalty

    except Exception as e:
        print(e)
        return float("inf")
    finally:
        torch.cuda.synchronize()
        del tensor, tensor_cuda, tt_factors, reconstructed_tensor, compression_ratio, compression_penalty
        torch.cuda.empty_cache()
        gc.collect()

#### local optimization

In [85]:
def local_optimize_rank(
        tensor: np.ndarray,
        target_compression_ratio: float,
        tensor_train_args: dict[str, str],
        initial_rank: list[int],
        frobenius_error_coef: float = 1.0,
        compression_ratio_coef: float = 10.0,
        optimization_method: str = "nelder-mead",
        jac: str | None = None,
        hess: str | None = None,
):
    def loss_wrapper(free_rank: list):
        full_rank = [1] + list(np.clip(np.round(free_rank).astype(int), 1, None)) + [1]  # noqa: RUF005
        return loss_function(
            rank=full_rank,
            tensor=tensor,
            target_compression_ratio=target_compression_ratio,
            tensor_train_args=tensor_train_args,
            frobenius_error_coef=frobenius_error_coef,
            compression_ratio_coef=compression_ratio_coef,
        )

    def calculate_tt_bounds(tensor_shape: tuple | list) -> list:
        """
        Calculates the bounds for TT-ranks of a tensor based on its shape.

        Parameters
        ----------
        tensor_shape : tuple or list
            List or tuple of tensor dimensions. Each element represents the size of the tensor along that dimension.

        Returns
        -------
        list
            List of rank bounds in the format [(1, 1), (1, r1_max), ..., (1, 1)].

        """
        d = len(tensor_shape)
        bounds = [(1, 1)]

        for k in range(1, d):
            prod_left = 1
            for i in range(k):
                prod_left *= tensor_shape[i]

            prod_right = 1
            for j in range(k, d):
                prod_right *= tensor_shape[j]

            rk_max = min(prod_left, prod_right)
            bounds.append((1, rk_max))

        bounds.append((1, 1))
        return bounds

    def calculate_metrics(
            tensor: np.ndarray,
            rank: list,
            tensor_train_args: dict[str, str],
            target_compression_ratio_percent: float = 50.0,
            frobenius_error_coef: float = 1.0,
            compression_ratio_coef: float = 10.0,
    ):

        with tl.backend_context("pytorch"):
            tensor_cuda = tl.tensor(tensor).to("cuda")
            
            tt_factors = tl.decomposition.tensor_train(tensor_cuda, rank=rank, **tensor_train_args)
            reconstructed_tensor = tl.tt_to_tensor(tt_factors)
    
            target_compression_ratio_percent /= 100
    
            frobenius_error = (tl.norm(reconstructed_tensor - tensor_cuda) / tl.norm(tensor_cuda)).item()
            compression_ratio = IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(tensor_cuda)
            compression_penalty = (target_compression_ratio_percent - compression_ratio) ** 2
            loss = frobenius_error_coef * frobenius_error + compression_ratio_coef * compression_penalty
    
            metrics = {
                "frobenius_error": frobenius_error,
                "compression_ratio": compression_ratio,
                "compression_penalty": compression_penalty,
                "loss": loss,
            }
        
        torch.cuda.synchronize()
        del tensor_cuda, tensor, tt_factors, reconstructed_tensor
        torch.cuda.empty_cache()
        gc.collect()
        
        return metrics

    class OptimizationLogger:
        def __init__(
                self,
                tensor: np.ndarray,
                tensor_train_args: dict[str, str],
                target_compression_ratio: float = 50.0,
                frobenius_error_coef: float = 1.0,
                compression_ratio_coef: float = 10.0,
        ):
            self.logs = []
            self.current_iteration = -1

            self.tensor = tensor
            self.tensor_train_args = tensor_train_args
            self.target_compression_ratio = target_compression_ratio
            self.frobenius_error_coef = frobenius_error_coef
            self.compression_ratio_coef = compression_ratio_coef

        def calculate_metrics(
                self,
                rank: list,
        ) -> dict[str, float]:
            return calculate_metrics(
                tensor=self.tensor,
                rank=rank,
                tensor_train_args=self.tensor_train_args,
                target_compression_ratio_percent=self.target_compression_ratio,
                frobenius_error_coef=self.frobenius_error_coef,
                compression_ratio_coef=self.compression_ratio_coef,
            )

        def callback(self, xk):
            self.current_iteration += 1

            rank = [1] + list(np.round(xk).astype(int)) + [1]  # noqa: RUF005
            metrics = self.calculate_metrics(rank=rank)

            self.logs.append(
                {
                    "step": self.current_iteration,
                    "raw_xk": xk,
                    "rank": rank,
                    "metrics": metrics,
                }
            )
            print(f"\n=== Iteration {self.current_iteration} complete ===", f"New rank estimate: {rank}\n", sep="\n")

    optimization_logger = OptimizationLogger(
        tensor=tensor,
        tensor_train_args=tensor_train_args,
        target_compression_ratio=target_compression_ratio,
        frobenius_error_coef=frobenius_error_coef,
        compression_ratio_coef=compression_ratio_coef,
    )

    free_rank = initial_rank[1:-1]

    # params
    is_bounds_variable_usable = [
        "nelder-mead",
        "l-bfgs-b",
        "tnc",
        "slsqp",
        "powell",
        "trust-constr",
        "cobyla",
        "cobyqa",
    ]

    is_adaptive_variable_usable = ["nelder-mead"]

    is_jac_variable_usable = [
        "cg",
        "bfgs",
        "newton-cg",
        "l-bfgs-b",
        "tnc",
        "slsqp",
        "trust-ncg",
        "trust-krylov",
        "trust-exact",
        "trust-constr",
    ]

    is_hess_variable_usable = ["newton-cg", "dogleg", "trust-ncg", "trust-krylov", "trust-exact", " trust-constr"]

    is_callback_variable_not_usable = ["tnc", "slsqp", "cobyla"]

    free_bounds = calculate_tt_bounds(tensor.shape)[1:-1] if optimization_method in is_bounds_variable_usable else None

    callback_param = (
        optimization_logger.callback if optimization_method not in is_callback_variable_not_usable else None
    )

    adaptive = optimization_method.lower() in is_adaptive_variable_usable

    jac = jac if optimization_method.lower() in is_jac_variable_usable else None
    jac = None

    hess = hess if optimization_method.lower() in is_hess_variable_usable else None

    minimize_kwargs = {
        "fun": loss_wrapper,
        "x0": free_rank,
        "method": optimization_method,
        "jac": jac,
        "hess": hess,
        "bounds": free_bounds,
        "callback": callback_param,
        "options": {
            "disp": True,
            # "maxiter": 1000,
        },
    }

    if adaptive:
        minimize_kwargs["options"]["adaptive"] = adaptive

    if jac:
        minimize_kwargs["jac"] = jac

    if hess:
        minimize_kwargs["hess"] = hess

    # params

    result = minimize(**minimize_kwargs)

    optimal_rank = [1] + list(np.clip(np.round(result.x).astype(int), 1, None)) + [1]  # noqa: RUF005
    final_loss = result.fun

    return optimal_rank, final_loss, result, optimization_logger.logs

#### global optimization

In [86]:
def loss_wrapper(
        free_rank: list,
        tensor: np.ndarray,
        target_compression_ratio: float,
        frobenius_error_coef: float,
        compression_ratio_coef: float,
        tensor_train_args: dict[str, str],
) -> float:
    full_rank = [1] + list(np.clip(np.round(free_rank).astype(int), 1, None)) + [1]  # noqa: RUF005
    try:
        loss = loss_function(
            rank=full_rank,
            tensor=tensor,
            target_compression_ratio=target_compression_ratio,
            tensor_train_args=tensor_train_args,
            frobenius_error_coef=frobenius_error_coef,
            compression_ratio_coef=compression_ratio_coef,
        )
    except Exception as e:
        print(e)
        loss = float("inf")
    finally:
        return loss

In [87]:
def global_optimize_rank(
        tensor: np.ndarray,
        target_compression_ratio: float,
        tensor_train_args: dict[str, str],
        initial_rank: list[int],
        frobenius_error_coef: float = 1.0,
        compression_ratio_coef: float = 10.0,
        optimization_method: str = "differential_evolution",
        loss_function_fixed: Callable | None = None,
):
    def calculate_tt_bounds(tensor_shape: tuple | list) -> list:
        """
        Calculates the bounds for TT-ranks of a tensor based on its shape.

        Parameters
        ----------
        tensor_shape : tuple or list
            List or tuple of tensor dimensions. Each element represents the size of the tensor along that dimension.

        Returns
        -------
        list
            List of rank bounds in the format [(1, 1), (1, r1_max), ..., (1, 1)].

        """
        d = len(tensor_shape)
        bounds = [(1, 1)]

        for k in range(1, d):
            prod_left = 1
            for i in range(k):
                prod_left *= tensor_shape[i]

            prod_right = 1
            for j in range(k, d):
                prod_right *= tensor_shape[j]

            rk_max = min(prod_left, prod_right)
            bounds.append((1, rk_max))

        bounds.append((1, 1))
        return bounds

    def calculate_metrics(
            tensor: np.ndarray,
            rank: list,
            tensor_train_args: dict[str, str],
            target_compression_ratio_percent: float = 50.0,
            frobenius_error_coef: float = 1.0,
            compression_ratio_coef: float = 10.0,
    ):

        with tl.backend_context("pytorch"):
            tensor_cuda = tl.tensor(tensor).to("cuda")

            tt_factors = tl.decomposition.tensor_train(tensor_cuda, rank=rank, **tensor_train_args)
            reconstructed_tensor = tl.tt_to_tensor(tt_factors)
    
            target_compression_ratio_percent /= 100
    
            frobenius_error = (tl.norm(reconstructed_tensor - tensor_cuda) / tl.norm(tensor_cuda)).item()
            compression_ratio = IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(tensor_cuda)
            compression_penalty = (target_compression_ratio_percent - compression_ratio) ** 2
            loss = frobenius_error_coef * frobenius_error + compression_ratio_coef * compression_penalty
    
            metrics = {
                "frobenius_error": frobenius_error,
                "compression_ratio": compression_ratio,
                "compression_penalty": compression_penalty,
                "loss": loss,
            }

        torch.cuda.synchronize()
        del tensor_cuda, tensor, tt_factors, reconstructed_tensor
        torch.cuda.empty_cache()
        gc.collect()

        return metrics

    class OptimizationLogger:
        def __init__(
                self,
                tensor: np.ndarray,
                tensor_train_args: dict[str, str],
                target_compression_ratio: float = 50.0,
                frobenius_error_coef: float = 1.0,
                compression_ratio_coef: float = 10.0,
        ):
            self.logs = []
            self.current_iteration = -1

            self.tensor = tensor
            self.tensor_train_args = tensor_train_args
            self.target_compression_ratio = target_compression_ratio
            self.frobenius_error_coef = frobenius_error_coef
            self.compression_ratio_coef = compression_ratio_coef

        def calculate_metrics(
                self,
                rank: list,
        ) -> dict[str, float]:
            return calculate_metrics(
                tensor=self.tensor,
                rank=rank,
                tensor_train_args=self.tensor_train_args,
                target_compression_ratio_percent=self.target_compression_ratio,
                frobenius_error_coef=self.frobenius_error_coef,
                compression_ratio_coef=self.compression_ratio_coef,
            )

        def callback(self, intermediate_result: OptimizeResult):
            self.current_iteration += 1

            rank = [1] + list(np.round(intermediate_result.x).astype(int)) + [1]  # noqa: RUF005
            metrics = self.calculate_metrics(rank=rank)

            self.logs.append(
                {
                    "step": self.current_iteration,
                    "rank": rank,
                    "metrics": metrics,
                    "raw_results": intermediate_result,
                }
            )
            print(f"\n=== Iteration {self.current_iteration} complete ===", f"New rank estimate: {rank}\n", sep="\n")

    optimization_logger = OptimizationLogger(
        tensor=tensor,
        tensor_train_args=tensor_train_args,
        target_compression_ratio=target_compression_ratio,
        frobenius_error_coef=frobenius_error_coef,
        compression_ratio_coef=compression_ratio_coef,
    )

    loss_function_fixed = partial(
        loss_wrapper,
        tensor=tensor,
        target_compression_ratio=target_compression_ratio,
        tensor_train_args=tensor_train_args,
        frobenius_error_coef=frobenius_error_coef,
        compression_ratio_coef=compression_ratio_coef,
    )

    # params
    is_bounds_variable_usable = [
        "differential_evolution",
    ]

    is_callback_variable_not_usable = []

    free_bounds = calculate_tt_bounds(tensor.shape)[1:-1] if optimization_method in is_bounds_variable_usable else None

    callback_param = (
        optimization_logger.callback if optimization_method not in is_callback_variable_not_usable else None
    )

    if optimization_method == "differential_evolution":
        optimization_kwargs_differential_evolution = {

            "func": loss_function_fixed,
            "bounds": free_bounds,

            "strategy": "best1bin",
            "maxiter": 50,
            "popsize": 10,
            "tol": 0.01,
            "atol": 0.001,
            "mutation": (0.3, 0.7),
            "recombination": 0.9,
            "init": "latinhypercube",
            "polish": True,

            "workers": 1,
            "updating": "immediate", # {‘immediate’ - when 1 worker, ‘deferred’ - when more than 1 worker}

            "callback": callback_param,
            "disp": True,
        }

        result = differential_evolution(**optimization_kwargs_differential_evolution)

        optimal_rank = [1] + list(np.clip(np.round(result.x).astype(int), 1, None)) + [1]  # noqa: RUF005
        final_loss = result.fun

    return optimal_rank, final_loss, result, optimization_logger.logs

### check algs

https://docs.scipy.org/doc/scipy-1.15.0/tutorial/optimize.html#

#### nelder-mead

In [88]:
method = "nelder-mead"

print(
    f"Testing optimization method: {method}",
    f"Tensor shape: {example_tensor.shape}",
    f"Initial rank: {example_tensor_initial_rank}",
    sep="\n",
)
try:
    # check optimizer method
    start_time = time.perf_counter()
    optimal_rank, final_loss, minimize_result_nelder_mead, iteration_logs_nelder_mead = local_optimize_rank(
        tensor=example_tensor,
        target_compression_ratio=target_compression_ratio_algs,
        tensor_train_args=tensor_train_args,
        initial_rank=example_tensor_initial_rank,
        optimization_method=method,
        frobenius_error_coef=frobenius_error_coef_algs,
        compression_ratio_coef=compression_ratio_coef_algs,
    )
    elapsed_time = time.perf_counter() - start_time

    # check final frobenius error and compression ratio
    tt_factors = tl.decomposition.tensor_train(example_tensor, rank=optimal_rank, **tensor_train_args)
    reconstructed_tensor = tl.tt_to_tensor(tt_factors)

    frobenius_error = 100.0 * (tl.norm(reconstructed_tensor - example_tensor) / tl.norm(example_tensor)).item()
    compression_ratio = (
            100.0 * IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(example_tensor)
    )

    scipy_algs_results[method] = {
        "final_results": minimize_result_nelder_mead,
        "steps_results": iteration_logs_nelder_mead,
    }

    print(
        f"Optimal rank: {optimal_rank}",
        f"Elapsed time: {elapsed_time:.6f} seconds",
        f"Frobenius Error: {frobenius_error:.6f}%",
        f"Compression Ratio: {compression_ratio:.6f}%",
        sep="\n",
        end="\n\n",
    )
except Exception as e:
    print(f"Error with method {method}: {e}")

Testing optimization method: nelder-mead
Tensor shape: (689, 1195, 3)
Initial rank: [1, 1, 1, 1]

=== Iteration 0 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 1 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 2 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 3 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 4 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 5 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 6 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 7 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 8 complete ===
New rank estimate: [1, 1, 1, 1]


=== Iteration 9 complete ===
New rank estimate: [1, 1, 1, 1]

Optimization terminated successfully.
         Current function value: 2.908761
         Iterations: 10
         Function evaluations: 39
Optimal rank: [1, 1, 1, 1]
Elapsed time: 13.851103 seconds
Frobenius Error: 41.639394%
Compression Ratio: 0.076395%



#### powell

In [89]:
method = "powell"

print(
    f"Testing optimization method: {method}",
    f"Tensor shape: {example_tensor.shape}",
    f"Initial rank: {example_tensor_initial_rank}",
    sep="\n",
)
try:
    # check optimizer method
    start_time = time.perf_counter()
    optimal_rank, final_loss, minimize_result_powell, iteration_logs_powell = local_optimize_rank(
        tensor=example_tensor,
        target_compression_ratio=target_compression_ratio_algs,
        tensor_train_args=tensor_train_args,
        initial_rank=example_tensor_initial_rank,
        optimization_method=method,
        frobenius_error_coef=frobenius_error_coef_algs,
        compression_ratio_coef=compression_ratio_coef_algs,
    )
    elapsed_time = time.perf_counter() - start_time

    # check final frobenius error and compression ratio
    tt_factors = tl.decomposition.tensor_train(example_tensor, rank=optimal_rank, **tensor_train_args)
    reconstructed_tensor = tl.tt_to_tensor(tt_factors)

    frobenius_error = 100.0 * (tl.norm(reconstructed_tensor - example_tensor) / tl.norm(example_tensor)).item()
    compression_ratio = (
            100.0 * IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(example_tensor)
    )

    scipy_algs_results[method] = {
        "final_results": minimize_result_powell,
        "steps_results": iteration_logs_powell,
    }

    print(
        f"Optimal rank: {optimal_rank}",
        f"Elapsed time: {elapsed_time:.6f} seconds",
        f"Frobenius Error: {frobenius_error:.6f}%",
        f"Compression Ratio: {compression_ratio:.6f}%",
        sep="\n",
        end="\n\n",
    )
except Exception as e:
    print(f"Error with method {method}: {e}")

Testing optimization method: powell
Tensor shape: (689, 1195, 3)
Initial rank: [1, 1, 1, 1]

=== Iteration 0 complete ===
New rank estimate: [1, 656, 2, 1]


=== Iteration 1 complete ===
New rank estimate: [1, 401, 2, 1]


=== Iteration 2 complete ===
New rank estimate: [1, 401, 2, 1]

Optimization terminated successfully.
         Current function value: 0.215818
         Iterations: 3
         Function evaluations: 138
Optimal rank: [1, 401, 2, 1]
Elapsed time: 43.084818 seconds
Frobenius Error: 21.581759%
Compression Ratio: 49.985932%



#### SLSQP

In [90]:
method = "slsqp"

print(
    f"Testing optimization method: {method}",
    f"Tensor shape: {example_tensor.shape}",
    f"Initial rank: {example_tensor_initial_rank}",
    sep="\n",
)
try:
    # check optimizer method
    start_time = time.perf_counter()
    optimal_rank, final_loss, minimize_result_slsqp, iteration_logs_slsqp = local_optimize_rank(
        tensor=example_tensor,
        target_compression_ratio=target_compression_ratio_algs,
        tensor_train_args=tensor_train_args,
        initial_rank=example_tensor_initial_rank,
        optimization_method=method,
        frobenius_error_coef=frobenius_error_coef_algs,
        compression_ratio_coef=compression_ratio_coef_algs,
        jac=None,
    )
    elapsed_time = time.perf_counter() - start_time

    # check final frobenius error and compression ratio
    tt_factors = tl.decomposition.tensor_train(example_tensor, rank=optimal_rank, **tensor_train_args)
    reconstructed_tensor = tl.tt_to_tensor(tt_factors)

    frobenius_error = 100.0 * (tl.norm(reconstructed_tensor - example_tensor) / tl.norm(example_tensor)).item()
    compression_ratio = (
            100.0 * IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(example_tensor)
    )

    scipy_algs_results[method] = {
        "final_results": minimize_result_slsqp,
        "steps_results": iteration_logs_slsqp,
    }

    print(
        f"Optimal rank: {optimal_rank}",
        f"Elapsed time: {elapsed_time:.6f} seconds",
        f"Frobenius Error: {frobenius_error:.6f}%",
        f"Compression Ratio: {compression_ratio:.6f}%",
        sep="\n",
        end="\n\n",
    )
except Exception as e:
    print(f"Error with method {method}: {e}")

Testing optimization method: slsqp
Tensor shape: (689, 1195, 3)
Initial rank: [1, 1, 1, 1]
Optimization terminated successfully    (Exit mode 0)
            Current function value: 2.9087606543916573
            Iterations: 1
            Function evaluations: 3
            Gradient evaluations: 1
Optimal rank: [1, 1, 1, 1]
Elapsed time: 0.775754 seconds
Frobenius Error: 41.639394%
Compression Ratio: 0.076395%



#### differential_evolution

In [91]:
method = "differential_evolution"

print(
    f"Testing optimization method: {method}",
    f"Tensor shape: {example_tensor.shape}",
    sep="\n",
)
try:
    # check optimizer method
    start_time = time.perf_counter()
    optimal_rank, final_loss, minimize_result_differential_evolution, iteration_logs_differential_evolution = global_optimize_rank(
        tensor=example_tensor,
        target_compression_ratio=target_compression_ratio_algs,
        tensor_train_args=tensor_train_args,
        initial_rank=example_tensor_initial_rank,
        optimization_method=method,
        frobenius_error_coef=frobenius_error_coef_algs,
        compression_ratio_coef=compression_ratio_coef_algs,
    )
    elapsed_time = time.perf_counter() - start_time

    # check final frobenius error and compression ratio
    with tl.backend_context("pytorch"):
        tensor_cuda = tl.tensor(example_tensor).to("cuda")
        tt_factors = tl.decomposition.tensor_train(tensor_cuda, rank=optimal_rank, **tensor_train_args)
        reconstructed_tensor = tl.tt_to_tensor(tt_factors)
    
        frobenius_error = 100.0 * (tl.norm(reconstructed_tensor - tensor_cuda) / tl.norm(tensor_cuda)).item()
        compression_ratio = (
                100.0 * IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(tensor_cuda)
        )

    scipy_algs_results[method] = {
        "final_results": minimize_result_differential_evolution,
        "steps_results": iteration_logs_differential_evolution,
    }

    print(
        f"Optimal rank: {optimal_rank}",
        f"Elapsed time: {elapsed_time:.6f} seconds",
        f"Frobenius Error: {frobenius_error:.6f}%",
        f"Compression Ratio: {compression_ratio:.6f}%",
        sep="\n",
        end="\n\n",
    )
except Exception as e:
    print(f"Error with method {method}: {e}")

Testing optimization method: differential_evolution
Tensor shape: (689, 1195, 3)
differential_evolution step 1: f(x)= 0.00021813352015890506

=== Iteration 0 complete ===
New rank estimate: [1, 287, 3, 1]

differential_evolution step 2: f(x)= 0.00021813352015890506

=== Iteration 1 complete ===
New rank estimate: [1, 287, 3, 1]

differential_evolution step 3: f(x)= 0.00021813352015890506

=== Iteration 2 complete ===
New rank estimate: [1, 287, 3, 1]

differential_evolution step 4: f(x)= 0.00010292673974682591

=== Iteration 3 complete ===
New rank estimate: [1, 289, 3, 1]

differential_evolution step 5: f(x)= 0.00010292673974682591

=== Iteration 4 complete ===
New rank estimate: [1, 289, 3, 1]

differential_evolution step 6: f(x)= 0.00010292673974682591

=== Iteration 5 complete ===
New rank estimate: [1, 289, 3, 1]

Polishing solution with 'L-BFGS-B'
Optimal rank: [1, 289, 3, 1]
Elapsed time: 42.662653 seconds
Frobenius Error: 0.010288%
Compression Ratio: 50.006579%



#### SHGO

In [92]:
method = "shgo"

print(
    f"Testing optimization method: {method}",
    f"Tensor shape: {example_tensor.shape}",
    f"Initial rank: {example_tensor_initial_rank}",
    sep="\n",
)
try:
    # check optimizer method
    start_time = time.perf_counter()
    optimal_rank, final_loss, minimize_result_shgo, iteration_logs_shgo = global_optimize_rank(
        tensor=example_tensor,
        target_compression_ratio=target_compression_ratio_algs,
        tensor_train_args=tensor_train_args,
        initial_rank=example_tensor_initial_rank,
        optimization_method=method,
        frobenius_error_coef=frobenius_error_coef_algs,
        compression_ratio_coef=compression_ratio_coef_algs,
    )
    elapsed_time = time.perf_counter() - start_time

    # check final frobenius error and compression ratio
    tt_factors = tl.decomposition.tensor_train(example_tensor, rank=optimal_rank, **tensor_train_args)
    reconstructed_tensor = tl.tt_to_tensor(tt_factors)

    frobenius_error = 100.0 * (tl.norm(reconstructed_tensor - example_tensor) / tl.norm(example_tensor)).item()
    compression_ratio = (
            100.0 * IMetricCalculator.get_tensors_size(*tt_factors) / IMetricCalculator.get_tensors_size(example_tensor)
    )

    scipy_algs_results[method] = {
        "final_results": minimize_result_shgo,
        "steps_results": iteration_logs_shgo,
    }

    print(
        f"Optimal rank: {optimal_rank}",
        f"Elapsed time: {elapsed_time:.6f} seconds",
        f"Frobenius Error: {frobenius_error:.6f}%",
        f"Compression Ratio: {compression_ratio:.6f}%",
        sep="\n",
        end="\n\n",
    )
except Exception as e:
    print(f"Error with method {method}: {e}")

Testing optimization method: shgo
Tensor shape: (689, 1195, 3)
Initial rank: [1, 1, 1, 1]
Error with method shgo: cannot access local variable 'optimal_rank' where it is not associated with a value


### Path of metrics in search area

In [93]:
for method_name, method_results in scipy_algs_results.items():

    metric_names = ["frobenius_error", "compression_ratio", "compression_penalty", "loss_function_result"]
    frobenius_error_from_method, compression_ratio_from_method, compression_penalty_from_method, loss_from_method = [], [], [], []
    for method_steps_logs in method_results['steps_results']:
        frobenius_error_from_method.append(method_steps_logs["metrics"]["frobenius_error"])
        compression_ratio_from_method.append(method_steps_logs["metrics"]["compression_ratio"])
        compression_penalty_from_method.append(method_steps_logs["metrics"]["compression_penalty"])
        loss_from_method.append(method_steps_logs["metrics"]["loss"])

    figs = []

    for metric, metric_data in zip(
            metric_names,
            [
                frobenius_error_from_method,
                compression_ratio_from_method,
                compression_penalty_from_method,
                loss_from_method,
            ],
            strict=False,
    ):
        z_values = np.array(
            [search_area_example_results[key].get(metric, np.nan) for key in search_area_example_results])
        x_indices = internal_indices[:, 0]
        y_indices = internal_indices[:, 1]

        # Поиск локальных минимумов
        local_min_points = []

        metric_dict = {tuple(idx): search_area_example_results[idx] for idx in search_area_example_results}

        for i, (x, y) in enumerate(zip(x_indices, y_indices)):
            z = z_values[i]

            neighbors = [
                (x - 1, y), (x + 1, y),  # По оси X
                (x, y - 1), (x, y + 1)  # По оси Y
            ]

            is_local_min = all(
                (neighbor not in metric_dict or metric_dict[neighbor].get(metric, np.inf) >= z)
                for neighbor in neighbors
            )

            if is_local_min:
                local_min_points.append((x, y, z))

        x_min, y_min, z_min = zip(*local_min_points) if local_min_points else ([], [], [])

        fig = go.Figure()

        # Основные точки
        fig.add_trace(go.Scatter3d(
            x=x_indices,
            y=y_indices,
            z=z_values,
            mode="markers",
            marker={"size": 5, "color": z_values, "colorscale": "Viridis", "opacity": 0.8},
        ))

        path_x = []
        path_y = []
        path_z = []

        for i, log in enumerate(method_results['steps_results']):
            rank = log["rank"]
            if metric == "frobenius_error":
                z_value = frobenius_error_from_method[i]
            elif metric == "compression_ratio":
                z_value = compression_ratio_from_method[i]
            elif metric == "compression_penalty":
                z_value = compression_penalty_from_method[i]
            elif metric == "loss_function_result":
                z_value = loss_from_method[i]

            path_x.append(rank[1])
            path_y.append(rank[2])
            path_z.append(z_value)

            fig.add_trace(
                go.Scatter3d(
                    x=[rank[1]],
                    y=[rank[2]],
                    z=[z_value],
                    mode="markers",
                    marker={
                        "size": 10 if i == 0 or i == len(method_results['steps_results']) - 1 else 5,
                        "color": "yellow" if i == 0 or i == len(method_results['steps_results']) - 1 else "red",
                        "opacity": 0.8,
                    },
                )
            )

        # Добавляем локальные минимумы (выделенные точки)
        fig.add_trace(go.Scatter3d(
            x=x_min,
            y=y_min,
            z=z_min,
            mode="markers+text",
            marker={"size": 8, "color": "blue", "symbol": "diamond"},
            text=[f"min: {val:.6f}" for val in z_min],
            textposition="top center",
        ))

        fig.add_trace(
            go.Scatter3d(
                x=path_x,
                y=path_y,
                z=path_z,
                mode="lines+markers",
                marker={"size": 5, "color": "red", "opacity": 0.8},
                line={"color": "red", "width": 3},
            )
        )

        fig.update_layout(
            title=f"Search area for example tensor {tensor_name} of {metric.replace('_', ' ').title()} with {method_name} alg path",
            height=800,
            scene={
                "xaxis_title": "Rank Index 2",
                "yaxis_title": "Rank Index 3",
                "zaxis_title": metric.replace("_", " ").title(),
                "yaxis": {"tickmode": "array", "tickvals": list(set(y_indices.astype(int)))},
            },
            margin={"l": 0, "r": 0, "t": 40, "b": 0},
            template="plotly_white",
            showlegend=False,
        )

        figs.append(fig)

    html_str = ""
    for fig in figs:
        html_str += go.Figure(fig).to_html(full_html=False, include_plotlyjs=False)

    html_file = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
    </head>
    <body>
    <h1>Search area by some metrics</h1>
    {html_str}
    </body>
    </html>
    """

    output_path = f"../.cache/data_analyze/optimization_algs_for_tensor_train_search_area_{tensor_name}_with_{method_name}_alg.html"
    with open(output_path, "w", encoding="utf-8") as f:  # noqa: PTH123
        f.write(html_file)

# Optimization alg for target tensor

## calculation

In [94]:
# from src.utils.eeg_controller import create_eeg_limo_data_tensor
# 
# cache_dir_eeg = "../.cache/eeg"
# 
# target_tensor = create_eeg_limo_data_tensor(cache_dir_eeg=cache_dir_eeg)
# target_initial_rank = calculate_tensor_train_initial_rank(calculate_tt_bounds(target_tensor.shape))

In [95]:
# calculate_tt_bounds(target_tensor.shape)

In [96]:
# method = "differential_evolution"
# 
# scipy_target_tensor_results = {}
# 
# print(
#     f"Testing optimization method: {method}",
#     f"Tensor shape: {target_tensor.shape}",
#     sep="\n",
# )
# try:
#     # check optimizer method
#     start_time = time.perf_counter()
#     optimal_rank, final_loss, minimize_result_differential_evolution, iteration_logs_differential_evolution = global_optimize_rank(
#         tensor=target_tensor,
#         target_compression_ratio=target_compression_ratio_algs,
#         tensor_train_args=tensor_train_args,
#         initial_rank=target_initial_rank,
#         optimization_method=method,
#         frobenius_error_coef=frobenius_error_coef_algs,
#         compression_ratio_coef=compression_ratio_coef_algs,
#     )
#     elapsed_time = time.perf_counter() - start_time
# 
#     scipy_target_tensor_results[method] = {
#         "final_results": minimize_result_differential_evolution,
#         "steps_results": iteration_logs_differential_evolution,
#     }
# except Exception as e:
#     print(f"Error with method {method}: {e}")