# divide_conquer() internal benchmark
Our goal is to detect possible bottlenecks in divide_conquer().

In [1]:
import sys
import os

project_root = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
project_root

'/Users/airdac/Documents/Uni/Second/TFM/TFM_Adria/code'

In [7]:
import numpy as np
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
import time
import pickle

from d_and_c.methods import DRMethod, get_method_function
from d_and_c.utils import plot_3D_to_2D
from d_and_c.private_d_and_c import perform_procrustes, get_partitions_for_divide_conquer

Define divide_conquer and main_divide_conquer but saving timings.

In [5]:
def _main_divide_conquer_benchmark(args: tuple) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Process a single partition in the divide and conquer algorithm.

    Parameters:
        args (tuple) : tuple of the following arguments:
        method (DRMethod): Dimensionality reduction method to use
        x_filtered (np.ndarray): Data points of the current partition.
        x_sample_1 (np.ndarray): Connecting points sampled from the first partition.
        r (int): Target dimensionality.
        original_sample_1 (np.ndarray): Projection of the anchor points from the first partition.
        plot (dict | None): Plotting arguments or None.
        kwargs (Any): Additional method-specific parameters.

    Returns:
        tuple[np.ndarray, pd.DataFrame]: The aligned projection and a DataFrame of internal timings.
        """
    internal_timings_list = []
    start_time_total_internal = time.perf_counter()

    # Renamed plot to plot_args for clarity
    method, x_filtered, x_sample_1, r, original_sample_1, plot_args, kwargs = args

    start_time_get_method = time.perf_counter()
    projection_method = get_method_function(method)
    internal_timings_list.append(
        {'operation': 'get_method_function', 'duration': time.perf_counter() - start_time_get_method})

    # Combine anchor points and partition data
    start_time_vstack = time.perf_counter()
    x_join_sample_1 = np.vstack((x_sample_1, x_filtered))
    internal_timings_list.append(
        {'operation': 'vstack_data', 'duration': time.perf_counter() - start_time_vstack})

    # Apply projection method
    start_time_projection = time.perf_counter()
    projection = projection_method(
        x_join_sample_1, r, principal_components=False, **kwargs)
    internal_timings_list.append(
        {'operation': 'projection_method', 'duration': time.perf_counter() - start_time_projection})

    # Visualize results
    time_plotting_internal = 0
    if plot_args:
        start_time_plotting_internal = time.perf_counter()
        plot_3D_to_2D(x_join_sample_1, projection,
                      str(method), **plot_args)
        time_plotting_internal = time.perf_counter() - start_time_plotting_internal
    internal_timings_list.append(
        {'operation': 'plotting_partition', 'duration': time_plotting_internal})

    # Extract results and align using Procrustes
    n_sample = x_sample_1.shape[0]
    projection_sample_1 = projection[:n_sample, :]
    projection_partition = projection[n_sample:, :]

    start_time_procrustes = time.perf_counter()
    aligned_projection = perform_procrustes(
        projection_sample_1, original_sample_1, projection_partition, translation=False)
    internal_timings_list.append(
        {'operation': 'perform_procrustes', 'duration': time.perf_counter() - start_time_procrustes})

    internal_timings_list.append({'operation': 'total_internal_time',
                                 'duration': time.perf_counter() - start_time_total_internal})

    internal_timings_df = pd.DataFrame(internal_timings_list)
    return aligned_projection, internal_timings_df

In [None]:
def divide_conquer_benchmark(method: DRMethod,
                                   x: np.ndarray,
                                   l: int,
                                   c_points: int,
                                   r: int,
                                   parallel: bool | None = False,
                                   plot: dict | None = None,
                                   # Modified return type for timings
                                   **kwargs) -> tuple[np.ndarray, pd.DataFrame]:
    """
    Apply divide and conquer to a dimensionality reduction method with benchmarking.

    Parameters:
        method (DRMethod): Dimensionality reduction method to use.
        x (np.ndarray): Input data matrix.
        l (int): Partition size.
        c_points (int): Number of common points.
        r (int): Target dimensionality.
        parallel (bool | None): Whether to run in parallel.
        plot (dict): if not None, plot the embedding. Moreover, if not None, it must be a dict with the arguments for the plot: 'color' (np.ndarray), 'dataset_name' (str).
        kwargs (Any): Additional method-specific parameters.

    Returns:
        projection (np.ndarray): Low-dimensional representation of the data.
        timings_df (pd.DataFrame): Benchmark results of each part of the main function.
    """
    main_timings_list = []  # List to store main timing dictionaries
    # List to store DataFrames from each _main_divide_conquer call
    all_internal_timings_dfs = []

    start_time_total = time.perf_counter()

    projection_method = get_method_function(method)
    n_row_x = x.shape[0]

    # For small datasets, apply the method directly
    if n_row_x <= l:
        current_timings_list = []
        start_time_direct_projection = time.perf_counter()
        result = projection_method(x, r, principal_components=True, **kwargs)
        current_timings_list.append({
            'operation': 'direct_projection',
            'duration': time.perf_counter() - start_time_direct_projection
        })
        current_timings_list.append({
            'operation': 'total_time',
            'duration': time.perf_counter() - start_time_total
        })
        timings_df = pd.DataFrame(current_timings_list)
        print("Main Timings (Direct Projection):")
        print(timings_df.to_string())
        return result, timings_df

    # Create partitions
    start_time_partitioning = time.perf_counter()
    idx_list = get_partitions_for_divide_conquer(n_row_x, l, c_points, r)
    num_partitions = len(idx_list)
    length_1 = len(idx_list[0])
    main_timings_list.append(
        {'operation': 'partitioning', 'duration': time.perf_counter() - start_time_partitioning})

    # Process first partition
    print("Projecting partition 1...")
    start_time_first_partition_projection = time.perf_counter()
    x_1 = x[idx_list[0],]
    projection_1 = projection_method(
        x_1, r, principal_components=False, **kwargs)
    main_timings_list.append({'operation': 'first_partition_projection',
                             'duration': time.perf_counter() - start_time_first_partition_projection})

    time_plotting_first_partition = 0
    if plot:
        start_time_plotting_first_partition = time.perf_counter()
        kwargs_str = [f'{key}_{value}' for key, value in kwargs.items()]
        results_path = os.path.join('d_and_c',
                                    'results',
                                    plot["dataset_name"],
                                    f'n_{n_row_x}',
                                    f'l_{l}',
                                    f'c_{c_points}',
                                    str(method),
                                    *kwargs_str,
                                    "d_and_c_partition_plots"
                                    )
        part1_plot_path = os.path.join(results_path, 'part1')
        os.makedirs(part1_plot_path, exist_ok=True)

        fig_title = f'D&C {method} on {plot["dataset_name"]} with n={n_row_x}, l={l}, c_points={c_points}, {", ".join([f'{key}={value}' for key, value in kwargs.items(
        )])}'
        plot_3D_to_2D(
            x=x_1,
            projection=projection_1,
            method=str(method),
            title=fig_title + '. Part 1',
            color=plot["color"][idx_list[0]],
            path=part1_plot_path,
            empty=True
        )
        time_plotting_first_partition = time.perf_counter(
        ) - start_time_plotting_first_partition
    main_timings_list.append(
        {'operation': 'plotting_first_partition', 'duration': time_plotting_first_partition})

    # Sample connecting points from first partition
    start_time_sampling = time.perf_counter()
    sample_1_idx = np.random.choice(length_1, size=c_points, replace=False)
    x_sample_1 = x_1[sample_1_idx, :]
    projection_sample_1 = projection_1[sample_1_idx, :]
    main_timings_list.append({'operation': 'sampling_connecting_points',
                             'duration': time.perf_counter() - start_time_sampling})

    # Process remaining partitions
    start_time_build_args = time.perf_counter()
    partition_plot_args_list = []
    if not plot:
        partition_plot_args_list = [None]*(num_partitions-1)
    else:
        # Ensure results_path is defined if plot is True (it's defined above in plotting_first_partition block)
        for i, idx in enumerate(idx_list[1:]):
            part_path = os.path.join(results_path, f'part{i + 2}')
            os.makedirs(part_path, exist_ok=True)
            partition_plot_args_list.append({"path": part_path,
                                             "color": np.concatenate((plot["color"][idx_list[0][sample_1_idx]], plot["color"][idx])),
                                             # fig_title defined above
                                             "title": fig_title + f'. Part {i + 2}'
                                             })
    args_list = [
        (
            method,
            x[idx, :],
            x_sample_1,
            r,
            projection_sample_1,
            # Handle case of single partition after first
            partition_plot_args_list[i] if num_partitions > 1 else None,
            kwargs.copy()
        )
        for i, idx in enumerate(idx_list[1:])
    ]
    main_timings_list.append({'operation': 'build_remaining_partitions_args',
                             'duration': time.perf_counter() - start_time_build_args})

    start_time_remaining_partitions = time.perf_counter()
    projections = [None] * (num_partitions - 1)
    if parallel:
        print("Projecting remaining partitions in parallel...")
        with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(_main_divide_conquer_benchmark, arg_set)
                       for arg_set in args_list]
            for i, future in enumerate(futures):
                proj_result, internal_df = future.result()
                projections[i] = proj_result
                all_internal_timings_dfs.append(internal_df)
    else:
        for i in range(num_partitions-1):
            print(f"Projecting partition {i + 2}...")
            proj_result, internal_df = _main_divide_conquer_benchmark(args_list[i])
            projections[i] = proj_result
            all_internal_timings_dfs.append(internal_df)

    main_timings_list.append({'operation': 'remaining_partitions_processing',
                             'duration': time.perf_counter() - start_time_remaining_partitions})

    # Combine all projections
    start_time_combine_projections = time.perf_counter()
    all_projections_list = [projection_1] + \
        (projections if num_partitions > 1 else [])
    combined_projection = np.vstack(all_projections_list)
    main_timings_list.append({'operation': 'combine_projections',
                             'duration': time.perf_counter() - start_time_combine_projections})

    # Reorder rows to match original data order
    start_time_reorder = time.perf_counter()
    order_idx = np.concatenate(idx_list)
    order = np.argsort(order_idx)
    combined_projection = combined_projection[order, :]
    main_timings_list.append(
        {'operation': 'reorder_rows', 'duration': time.perf_counter() - start_time_reorder})

    # Center and rotate for maximum variance
    start_time_center_rotate = time.perf_counter()
    combined_projection_mean = np.mean(combined_projection, axis=0)
    centered_projection = combined_projection - combined_projection_mean
    cov_matrix = np.cov(centered_projection, rowvar=False)
    eigenvals, eigenvecs = np.linalg.eigh(cov_matrix)
    idx_sort = np.argsort(eigenvals)[::-1]
    sorted_eigenvecs = eigenvecs[:, idx_sort]
    final_projection = centered_projection @ sorted_eigenvecs
    main_timings_list.append({'operation': 'center_rotate',
                             'duration': time.perf_counter() - start_time_center_rotate})

    main_timings_list.append(
        {'operation': 'total_time', 'duration': time.perf_counter() - start_time_total})

    main_timings_df = pd.DataFrame(main_timings_list)
    print("Main Timings:")
    print(main_timings_df.to_string())

    if all_internal_timings_dfs:
        print("\nInternal Timings per _main_divide_conquer call:")
        for i, internal_df in enumerate(all_internal_timings_dfs):
            print(f"\n  Call {i+1}:")
            print(internal_df.to_string())

    return final_projection, main_timings_df

Load MNIST 5000-points subset.

In [9]:
MNIST_path = os.path.join(project_root, 'd_and_c', 'MNIST_5000.pkl')
with open(MNIST_path, "rb") as f:
    bare_data = pickle.load(f)
    pixels = bare_data["pixels"]
    target = bare_data["target"]

Apply D&C SMACOF to MNIST subset.

In [12]:
l, c_points = 1000, 100
embedding, timings = divide_conquer_benchmark(
    DRMethod.SMACOF, pixels, l=l, c_points=c_points, r=2, verbose=0)

Projecting partition 1...
Projecting partition 2...
Projecting partition 3...
Projecting partition 4...
Projecting partition 5...
Projecting partition 6...
Main Timings:
                         operation   duration
0                     partitioning   0.000210
1       first_partition_projection   7.550072
2         plotting_first_partition   0.000000
3       sampling_connecting_points   0.000219
4  build_remaining_partitions_args   0.006455
5  remaining_partitions_processing  30.329799
6              combine_projections   0.000026
7                     reorder_rows   0.000387
8                    center_rotate   0.000334
9                       total_time  37.887548

Internal Timings per _main_divide_conquer call:

  Call 1:
             operation  duration
0  get_method_function  0.000007
1          vstack_data  0.001079
2    projection_method  6.073384
3   plotting_partition  0.000000
4   perform_procrustes  0.000085
5  total_internal_time  6.074576

  Call 2:
             operation

There is no need to repeat the benchmark many times and aggregate results, because they are clear. The bottleneck of d_and_c is the dimensionality reduction tecnique used.