# Ablation study. Classic approach

Ablation for approaches based on [Beam Search with CaleyPy](https://www.kaggle.com/code/fedimser/beam-search-with-cayleypy) notebook by [Dmytro Fedoriaka](https://www.kaggle.com/fedimser) and [Chervov, A. et al. "A Machine Learning Approach That Beats Large Rubik’s Cubes: The CayleyPy Project". arXiv:2502.13266 [cs.LG]. 2025.](https://arxiv.org/abs/2502.13266)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import time
import json
from datetime import datetime
import time
import gc
from tqdm import tqdm
from itertools import product
import shap

from sklearn.ensemble import RandomForestRegressor
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split

from cayleypy import PermutationGroups, CayleyGraph, Predictor, prepare_graph

import warnings
warnings.filterwarnings('ignore')

In [None]:
def seed_everything(seed: int = 42):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
DATA_ROOT = Path('../data')
assert DATA_ROOT.exists(), f'Dataset directory not found: {DATA_ROOT!s}'

TEST_PATH = DATA_ROOT / 'test.csv'
SUBMISSION_PATH = Path('submission.csv')  # Kaggle expects this in the working directory
TEST_PATH

In [None]:
test_df = pd.read_csv(TEST_PATH)

# Baseline

In [None]:
def pancake_sort_path(perm: list[int]) -> list[str]:
    """Return a sequence of prefix reversals that sorts `perm` to the identity permutation."""
    arr = list(perm)
    n = len(arr)
    moves: list[str] = []

    for target in range(n, 1, -1):
        desired_value = target - 1
        idx = arr.index(desired_value)

        if idx == target - 1:
            continue  # already in place

        if idx != 0:
            moves.append(f'R{idx + 1}')
            arr[: idx + 1] = reversed(arr[: idx + 1])

        moves.append(f'R{target}')
        arr[:target] = reversed(arr[:target])

    return moves

## Model

In [None]:
# model
class Net(torch.nn.Module):
    pass

## Utils

In [None]:
def train_one_epoch(model, X, y, learning_rate=0.001):
    val_ratio = 0.1
    batch_size = 512 #1024
    dataset = TensorDataset(X, y.float())
    val_size = int(len(dataset) * val_ratio)
    train_size = len(dataset)-val_size
    train_ds, val_ds = random_split(dataset, [train_size, val_size])
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size)
    
    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    model.train()
    total_train_loss = 0
    
    for xb, yb in train_loader:
        pred = model(xb)
        loss = loss_fn(pred, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item() * xb.size(0)

    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for xb, yb in val_loader:
            pred = model(xb)
            loss = loss_fn(pred, yb)
            total_val_loss += loss.item() * xb.size(0)

    avg_train_loss = total_train_loss / train_size
    avg_val_loss = total_val_loss / val_size

    return avg_train_loss, avg_val_loss

def train_n(n, width=1000, length=20, n_dim=128, epochs=30):
    print(f"train for {n} permautations")

    graph=CayleyGraph(PermutationGroups.pancake(n), device="cuda", random_seed=42)
    X, y = graph.random_walks(width=width, length=min(length, 2*n), mode="bfs")

    input_size = graph.definition.state_size
    num_classes = n

    learning_rate = 0.001

    train_losses, val_losses = [], []
    start_time = time.time()

    model = Net(input_size, num_classes, [n_dim]).to(graph.device)

    for i in range(epochs):
        avg_train_loss, avg_val_loss = train_one_epoch(model, X, y)
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        print(f"Epoch {i} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
    
    end_time = time.time()
    print(end_time - start_time)

    sns.set_theme(style="whitegrid")
    plt.figure(figsize=(10, 5))
    sns.lineplot(x=range(epochs), y=train_losses, label='Train Loss', marker='o')
    sns.lineplot(x=range(epochs), y=val_losses, label='Val Loss', marker='o')
    plt.title(f'Loss for n={n}')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    model = model.cpu()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        
    return model, graph

In [None]:
def find_prefix_length(generator_perm: list[int]) -> int:
    n = len(generator_perm)
    for i in range(1, n):
        if generator_perm[i] != generator_perm[i-1] - 1:
            return i 
    
    return n 
    
def convert_to_rk_format(internal_path: list[int], graph: CayleyGraph) -> list[str]:
    moves = []
    generators = graph.definition.generators
    
    for move_index in internal_path:
        if 0 <= move_index < len(generators):
            generator_perm = generators[move_index]

            k = find_prefix_length(generator_perm)
            moves.append(f'R{k}')
    
    return moves
   
def solve(permutation: list[int], graph: CayleyGraph, model: torch.nn.Module, beam_width: int=10000) -> str:
    start_state = np.array(permutation)
    n = len(permutation) 
    result = graph.beam_search(
        start_state=start_state,
        beam_width=beam_width,
        max_steps=3*n,
        predictor=Predictor(graph, model),
        return_path=True
    )
    
    if result.path_found:
        moves = convert_to_rk_format(result.path, graph)
        return '.'.join(moves)
    else:
        return 'UNSOLVED'

In [None]:
def measure_success(predictor, beam_width, test_start_states, heurestic_path, graph, max_steps_multiplier=2):
    success_count = 0
    sum_length = 0
    diff_length = 0
    optimal_count = 0
    
    for state, hp in tqdm(zip(test_start_states, heurestic_path), total=len(test_start_states), desc=f"Beam {beam_width}"):
        try:
            with torch.no_grad():
                result = graph.beam_search(
                    start_state=state, 
                    beam_width=beam_width, 
                    max_steps=len(state)*max_steps_multiplier, 
                    predictor=predictor,
                    return_path=True
                )
            
            if result.path_found:
                success_count += 1
                path_length = len(result.path)
                sum_length += path_length
                diff_length += path_length - len(hp)
                
                if path_length == len(hp):
                    optimal_count += 1
        except Exception as e:
            print(f"Error in beam search: {e}")
            continue
    
    success_rate = success_count / len(test_start_states) if test_start_states else 0
    avg_length = sum_length / success_count if success_count > 0 else 0
    avg_diff = diff_length / success_count if success_count > 0 else 0
    optimality_rate = optimal_count / success_count if success_count > 0 else 0
    
    return success_rate, avg_length, avg_diff, optimality_rate

In [None]:

def statistical_analysis(df):
    print("\n=== STATISTICAL ANALYSIS ===")
    
    numeric_cols = ['success_rate', 'optimality_rate', 'avg_solution_length', 
                   'width', 'length', 'n_dim', 'epochs', 'beam_width', 'n']
    numeric_df = df[numeric_cols].select_dtypes(include=[np.number])
    
    if not numeric_df.empty:
        correlation_matrix = numeric_df.cr()
        
        plt.figure(figsize=(10, 8))
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.3f')
        plt.title('Correlation Matrix')
        plt.tight_layout()
        plt.savefig('correlation_matrix.png', dpi=300, bbox_inches='tight')
        plt.show()
    
    try:
        shap.initjs()
        
        X = df[['width', 'length', 'n_dim', 'epochs', 'beam_width', 'n']]
        y_success = df['success_rate']
        
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y_success)
        
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)
        
        plt.figure(figsize=(10, 8))
        shap.summary_plot(shap_values, X, show=False)
        plt.title('SHAP Summary Plot - Success Rate')
        plt.tight_layout()
        plt.savefig('shap_summary.png', dpi=300, bbox_inches='tight')
        plt.show()
        
    except Exception as e:
        print(f"SHAP analysis failed: {e}")

In [None]:

def save_intermediate_results(results, experiment_id):
    df = pd.DataFrame([{
        **{'experiment_id': r['experiment_id'], 'n': r['n']},
        **r['hyperparameters'],
        **r['metrics']
    } for r in results])
    
    df.to_csv(f'intermediate_results_{experiment_id}.csv', index=False)

def save_final_results(results):
    df = pd.DataFrame([{
        **{'experiment_id': r['experiment_id'], 'n': r['n']},
        **r['hyperparameters'],
        **r['metrics']
    } for r in results])
    
    df.to_csv('final_ablation_results.csv', index=False)
    
    with open('final_ablation_results.json', 'w') as f:
        json.dump(results, f, indent=2)

## Ploting

In [None]:
def plot_comprehensive_analysis(df):
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    
    # 1. Success rate vs n
    for bw in df['beam_width'].unique():
        subset = df[df['beam_width'] == bw]
        success_by_n = subset.groupby('n')['success_rate'].mean()
        axes[0,0].plot(success_by_n.index, success_by_n.values, 'o-', label=f'beam={bw}')
    axes[0,0].set_xlabel('n')
    axes[0,0].set_ylabel('Success Rate')
    axes[0,0].set_title('Success Rate vs n')
    axes[0,0].legend()
    
    # 2. Effect of  width on success rate
    for n in df['n'].unique()[:3]:
        subset = df[df['n'] == n]
        axes[0,1].scatter(subset['width'], subset['success_rate'], label=f'n={n}', alpha=0.6)
    axes[0,1].set_xlabel('Training Width')
    axes[0,1].set_ylabel('Success Rate')
    axes[0,1].set_title('Effect of Training Width')
    axes[0,1].legend()
    
    # 3. Effect of  n_dim on производительность
    n_dim_performance = df.groupby('n_dim')['success_rate'].mean()
    axes[0,2].bar(n_dim_performance.index, n_dim_performance.values)
    axes[0,2].set_xlabel('Hidden Dimension')
    axes[0,2].set_ylabel('Average Success Rate')
    axes[0,2].set_title('Effect of Hidden Dimension')
    
    # 4. Heatmap: beam_width vs n
    pivot_success = df.pivot_table(values='success_rate', index='n', columns='beam_width', aggfunc='mean')
    sns.heatmap(pivot_success, annot=True, fmt='.2f', ax=axes[1,0])
    axes[1,0].set_title('Success Rate Heatmap')
    
    # 5. Optimality rate analysis
    optimal_by_n = df.groupby('n')['optimality_rate'].mean()
    axes[1,1].plot(optimal_by_n.index, optimal_by_n.values, 'o-', color='red')
    axes[1,1].set_xlabel('n')
    axes[1,1].set_ylabel('Optimality Rate')
    axes[1,1].set_title('Optimality Rate vs n')
    
    # 6. Trade-off: success rate vs solution length
    axes[1,2].scatter(df['success_rate'], df['avg_solution_length'], c=df['n'], alpha=0.6)
    axes[1,2].set_xlabel('Success Rate')
    axes[1,2].set_ylabel('Average Solution Length')
    axes[1,2].set_title('Success Rate vs Solution Length')
    plt.colorbar(axes[1,2].collections[0], ax=axes[1,2], label='n')
    
    plt.tight_layout()
    plt.savefig('comprehensive_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

def plot_baseline_comparison(baseline_results):
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    for i, result in enumerate(baseline_results):
        n = result['n']
        
        # Success rate comparison
        axes[0,0].plot(result['beam_size_range'], result['hamming_success'], 'o--', label=f'Hamming n={n}')
        axes[0,0].plot(result['beam_size_range'], result['nn_success'], 'o-', label=f'NN n={n}')
        
        # Optimality rate comparison  
        axes[0,1].plot(result['beam_size_range'], result['hamming_optimal'], 'o--', label=f'Hamming n={n}')
        axes[0,1].plot(result['beam_size_range'], result['nn_optimal'], 'o-', label=f'NN n={n}')
    
    axes[0,0].set_xlabel('Beam Size')
    axes[0,0].set_ylabel('Success Rate')
    axes[0,0].set_title('Success Rate Comparison')
    axes[0,0].legend()
    axes[0,0].set_xscale('log')
    
    axes[0,1].set_xlabel('Beam Size')
    axes[0,1].set_ylabel('Optimality Rate')
    axes[0,1].set_title('Optimality Rate Comparison')
    axes[0,1].legend()
    axes[0,1].set_xscale('log')
    
    plt.tight_layout()
    plt.savefig('baseline_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()



## Ablation

In [None]:
def ablation_study(n_range: list[int], hyperparameter_space: dict):
    seed_everything(42)
    results = []

    experiment_id = 1
    total_experiments = len(n_range) * len(list(product(*hyperparameter_space.values())))
    
    print(f"Total experiments: {total_experiments}")
    
    for n in n_range:
        print(f"\n=== Experiment for {n} ===")
        
        seed_everything(0)
        test_start_states = [np.random.permutation(n) for _ in range(100)]
        heurestic_path = [pancake_sort_path(x) for x in test_start_states]
        
        hyperparam_combinations = list(product(
            hyperparameter_space['width'],
            hyperparameter_space['length'], 
            hyperparameter_space['n_dim'],
            hyperparameter_space['epochs'],
            hyperparameter_space['beam_width']
        ))
        
        for width, length, n_dim, epochs, beam_width in tqdm(hyperparam_combinations, desc=f"n={n}"):
            try:
                model, graph = train_n(n, width, length, n_dim, epochs)
                predictor = Predictor(graph, model)
                
                success_rate, avg_length, avg_diff, optimality_rate = measure_success(
                    predictor, beam_width, test_start_states, heurestic_path, graph
                )
                
                result = {
                    'experiment_id': experiment_id,
                    'n': n,
                    'hyperparameters': {
                        'width': width,
                        'length': length, 
                        'n_dim': n_dim,
                        'epochs': epochs,
                        'beam_width': beam_width
                    },
                    'metrics': {
                        'success_rate': success_rate,
                        'avg_solution_length': avg_length,
                        'avg_diff_from_optimal': avg_diff,
                        'optimality_rate': optimality_rate
                    },
                    'timestamp': datetime.now().isoformat()
                }
                
                results.append(result)
                experiment_id += 1
                
                if experiment_id % 50 == 0:
                    save_intermediate_results(results, experiment_id)
                    
                del model, predictor
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    
            except Exception as e:
                print(f"Exception while compute {experiment_id}: {e}")
                continue
    
    save_final_results(results)
    
    analyze_results(results)
    
    return results


def compare_with_baselines(n_range, beam_size_range):
    baseline_results = []
    
    for n in n_range:
        print(f"\n=== Compare with baseline for n={n} ===")
        
        seed_everything(0)
        test_start_states = [np.random.permutation(n) for _ in range(50)]
        heurestic_path = [pancake_sort_path(x) for x in test_start_states]

        graph = CayleyGraph(PermutationGroups.pancake(n), device="cuda", random_seed=42)
        
        hamming_success_rates = []
        hamming_optimal_rates = []
        
        best_nn_success_rates = []
        best_nn_optimal_rates = []
        
        for beam_size in beam_size_range:
            hamming_predictor = Predictor(graph, "hamming")
            hamming_success, _, _, hamming_optimal = measure_success(
                hamming_predictor, beam_size, test_start_states, heurestic_path, graph
            )
            hamming_success_rates.append(hamming_success)
            hamming_optimal_rates.append(hamming_optimal)

            best_width, best_length, best_n_dim, best_epochs = 1000, 20, 128, 30
            nn_model, _ = train_n(n, best_width, best_length, best_n_dim, best_epochs)
            nn_predictor = Predictor(graph, nn_model)
            nn_success, _, _, nn_optimal = measure_success(
                nn_predictor, beam_size, test_start_states, heurestic_path, graph
            )
            best_nn_success_rates.append(nn_success)
            best_nn_optimal_rates.append(nn_optimal)
            
            del nn_model, nn_predictor
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        
        baseline_results.append({
            'n': n,
            'beam_size_range': beam_size_range,
            'hamming_success': hamming_success_rates,
            'hamming_optimal': hamming_optimal_rates,
            'nn_success': best_nn_success_rates,
            'nn_optimal': best_nn_optimal_rates
        })
    
    return baseline_results


def analyze_results(results):
    df = pd.DataFrame([{
        **{'experiment_id': r['experiment_id'], 'n': r['n']},
        **r['hyperparameters'], 
        **r['metrics']
    } for r in results])
    

    best_by_n = df.loc[df.groupby('n')['success_rate'].idxmax()]
    print("\nЛучшие результаты по n:")
    print(best_by_n[['n', 'success_rate', 'optimality_rate', 'width', 'length', 'n_dim', 'epochs', 'beam_width']])
    

    plot_comprehensive_analysis(df)
    

    statistical_analysis(df)


# Final

In [None]:
# code