In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import MultivariateNormal
from collections import deque
import random
import matplotlib.pyplot as plt

class Policy(nn.Module):
    """Gaussian policy with a linear mean head and a linear value head.

    The action distribution is a diagonal multivariate normal whose mean is a
    linear function of the input and whose per-dimension standard deviation is
    ``exp(log_std)``, a learned parameter that does not depend on the input.

    Args:
        dim: Size of the input vector; the sampled action has the same size.
    """

    def __init__(self, dim):
        super().__init__()
        self.fc_mean = nn.Linear(dim, dim)
        # Log of the per-dimension standard deviation (initialised to std = 1).
        self.log_std = nn.Parameter(torch.zeros(dim))

        # Value function baseline
        self.value_net = nn.Linear(dim, 1)

    def forward(self, x):
        """Return ``(action_distribution, value_estimate)`` for input ``x``.

        Args:
            x: Tensor whose last dimension is ``dim``; leading dimensions are
                treated as batch dimensions by the linear layers.

        Returns:
            A ``MultivariateNormal`` over actions and the output of the value
            head (last dimension of size 1).
        """
        mean = self.fc_mean(x)
        # exp(log_std) is a standard deviation, so it belongs on the diagonal of
        # the Cholesky factor (scale_tril), not on the diagonal of the covariance.
        # Passing scale_tril also spares the distribution an internal Cholesky.
        scale_tril = torch.diag(torch.exp(self.log_std))
        return MultivariateNormal(mean, scale_tril=scale_tril), self.value_net(x)



class ExperienceReplay:
    """Fixed-capacity store of experiences with uniform random sampling.

    Once ``capacity`` items are held, each new push discards the oldest one.
    """

    def __init__(self, capacity):
        # A bounded deque handles eviction of the oldest entry automatically.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Number of experiences currently stored."""
        return len(self.buffer)

    def push(self, experience):
        """Append one experience, evicting the oldest if the buffer is full."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return up to ``batch_size`` distinct stored experiences, chosen at random."""
        # Never ask for more items than are stored; random.sample would raise.
        draw_count = min(batch_size, len(self.buffer))
        return random.sample(self.buffer, draw_count)

def compute_reward(v_new, v_prev, A, lambda_):
    """Score a candidate vector ``v_new`` against a target eigenvalue ``lambda_``.

    The reward is the sum of three terms:
      1. ``-|R(v_new) - lambda_|``, where ``R`` is the Rayleigh quotient
         ``(v @ A @ v) / (v @ v)``;
      2. ``0.3 *`` cosine similarity between ``v_new`` and ``v_prev``;
      3. ``0.2 *`` the reduction in Rayleigh-quotient error from ``v_prev``
         to ``v_new``.

    Args:
        v_new: Candidate vector (1-D tensor).
        v_prev: Previous vector (1-D tensor, same size as ``v_new``).
        A: Square matrix (2-D tensor).
        lambda_: Target eigenvalue (scalar).

    Returns:
        Tuple ``(reward, residual, cosine_sim)`` of Python floats, where
        ``residual`` is ``||A @ v_new - lambda_ * v_new||``.
    """
    with torch.no_grad():
        # A @ v_new is needed for both the Rayleigh quotient and the residual.
        Av_new = A @ v_new
        R = (v_new @ Av_new) / (v_new @ v_new)
        prev_R = (v_prev @ A @ v_prev) / (v_prev @ v_prev)

        # Distance of each Rayleigh quotient from the target eigenvalue.
        err_new = torch.abs(R - lambda_).item()
        err_prev = torch.abs(prev_R - lambda_).item()

        # Residual for tracking. Written as A v - lambda v so no dense identity
        # matrix is allocated (and none is pinned to the default device).
        residual = torch.linalg.norm(Av_new - v_new * lambda_).item()

    # 1. Rayleigh quotient component (closer to true eigenvalue is better)
    rayleigh_component = -err_new

    # 2. Direction consistency component
    cosine_sim = torch.nn.functional.cosine_similarity(
        v_new.unsqueeze(0), v_prev.unsqueeze(0)
    ).item()
    direction_component = 0.3 * cosine_sim

    # 3. Improvement component (compared to previous Rayleigh quotient)
    improvement_component = 0.2 * (err_prev - err_new)

    # Combined reward
    reward = rayleigh_component + direction_component + improvement_component

    return reward, residual, cosine_sim

def train(A, lambda_, policy, epochs=500, batch_size=32, replay_size=1000, dominant_v=None):
    """Train ``policy`` to propose perturbations that drive a unit vector toward
    an eigenvector of ``A`` associated with ``lambda_``.

    Each epoch samples ``batch_size`` perturbations of the current vector ``v``,
    scores them with ``compute_reward``, mixes in up to ``batch_size`` replayed
    experiences, and takes one Adam step on a clipped-surrogate policy loss plus
    a value loss and an entropy bonus. After the step, ``v`` is reset to the
    lowest-residual vector seen so far.

    Args:
        A: Square matrix (2-D tensor).
        lambda_: Target eigenvalue used by the reward.
        policy: Callable module returning ``(distribution, value_estimate)``
            for a state; its parameters are optimised in place.
        epochs: Number of optimisation steps.
        batch_size: Fresh samples per epoch (also the replay sample size).
        replay_size: Capacity of the replay buffer.
        dominant_v: Optional reference eigenvector. When given, its cosine
            similarity with the best vector is added to the progress log.

    Returns:
        The lowest-residual unit vector found during training.
    """
    optimizer = optim.Adam(policy.parameters(), lr=1e-3)
    replay_buffer = ExperienceReplay(replay_size)
    v = torch.randn(A.shape[0])
    v = v / torch.norm(v)

    # Tracking variables
    best_v = v.clone()
    best_residual = float('inf')
    moving_avg_reward = 0
    alpha = 0.1  # Smoothing factor of the logged moving-average reward

    for epoch in range(epochs):
        # Fresh samples collected this epoch
        states = []
        actions = []
        rewards = []
        values = []
        old_log_probs = []

        for _ in range(batch_size):
            v_prev = v.clone()

            # Sampling needs no gradients. Running it under no_grad also keeps
            # autograd graphs out of the replay buffer, which would otherwise
            # retain them for as long as the experience is stored.
            with torch.no_grad():
                dist, value_est = policy(v_prev)
                delta_v = dist.sample()
                # Log-probability under the policy that generated the action;
                # this is the denominator of the importance ratio below.
                old_log_prob = dist.log_prob(delta_v)

                v_new = v_prev + delta_v
                v_new = v_new / torch.norm(v_new)
                reward, residual, _ = compute_reward(v_new, v_prev, A, lambda_)

                # Update best solution found
                if residual < best_residual:
                    best_residual = residual
                    best_v = v_new.clone()

                # Exponential moving average of the reward (logging only)
                moving_avg_reward = alpha * reward + (1 - alpha) * moving_avg_reward

            # Store experience
            replay_buffer.push((v_prev, delta_v, reward, value_est, old_log_prob))

            # Store for batch update
            states.append(v_prev)
            actions.append(delta_v)
            rewards.append(reward)
            values.append(value_est)
            old_log_probs.append(old_log_prob)

        # Sample from replay buffer
        if len(replay_buffer) > batch_size:
            replay_batch = replay_buffer.sample(batch_size)
            (replay_states, replay_actions, replay_rewards,
             replay_values, replay_log_probs) = zip(*replay_batch)

            # Combine with current batch
            states.extend(replay_states)
            actions.extend(replay_actions)
            rewards.extend(replay_rewards)
            values.extend(replay_values)
            old_log_probs.extend(replay_log_probs)

        # Convert to tensors
        states = torch.stack(states)
        actions = torch.stack(actions)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        old_values = torch.cat(values)
        old_log_probs = torch.stack(old_log_probs)

        # Normalize rewards. A single sample has an undefined (NaN) standard
        # deviation, so normalization is skipped in that case.
        if rewards.numel() > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-8)

        # Compute advantages against the value estimates made at sampling time
        advantages = rewards - old_values

        # Policy loss
        dists, value_ests = policy(states)
        log_probs = dists.log_prob(actions)

        # PPO-style clipped objective. The ratio compares the current policy
        # with the one that sampled the action; using the current log-prob on
        # both sides would make it identically 1 and the clip a no-op.
        ratios = torch.exp(log_probs - old_log_probs)
        clipped_ratios = torch.clamp(ratios, 0.8, 1.2)
        policy_loss = -torch.min(ratios * advantages, clipped_ratios * advantages).mean()

        # Value loss (MSE)
        value_loss = 0.5 * (value_ests.squeeze(-1) - rewards).pow(2).mean()

        # Entropy bonus
        entropy = dists.entropy().mean()

        # Total loss
        loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

        # Update
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(policy.parameters(), 1.0)
        optimizer.step()

        # Continue the next epoch from the best candidate found so far
        v = best_v.clone()

        if epoch % 100 == 0:
            message = (
                f'Epoch {epoch}: Loss = {loss.item():.4f}, '
                f'Residual = {best_residual:.4f}, '
                f'Avg Reward = {moving_avg_reward:.4f}'
            )
            # dominant_v is optional; only report alignment when it is given.
            if dominant_v is not None:
                alignment = torch.nn.functional.cosine_similarity(
                    best_v.unsqueeze(0), dominant_v.unsqueeze(0)
                ).item()
                message += f', Cosine Similarity = {alignment:.4f}'
            print(message)

    return best_v

# Test function with improved training
def test_improved_policy(dim=5, epochs=1000):
    """Train a fresh policy on a random symmetric matrix and report alignment.

    Builds a random ``dim x dim`` symmetric matrix, finds its largest-magnitude
    eigenpair with NumPy as ground truth, trains a new ``Policy`` with
    ``train``, and prints the predicted and true eigenvectors.

    Args:
        dim: Matrix dimension.
        epochs: Number of training epochs passed to ``train``.

    Returns:
        Signed cosine similarity between the predicted vector and the true
        dominant eigenvector. Eigenvectors are only defined up to sign, so a
        value near -1 indicates alignment just as a value near +1 does.
    """
    # Generate random symmetric matrix
    A_np = np.random.randn(dim, dim)
    A_np = A_np + A_np.T
    A = torch.tensor(A_np, dtype=torch.float32)

    # Get ground truth. eigh is the symmetric-matrix solver: it always returns
    # real eigenvalues and orthonormal eigenvectors, whereas the general eig
    # solver makes no such guarantee.
    eigenvalues, eigenvectors = np.linalg.eigh(A_np)
    dominant_idx = np.argmax(np.abs(eigenvalues))
    # Plain Python float so downstream tensor arithmetic treats it as a scalar.
    dominant_lambda = float(eigenvalues[dominant_idx])
    dominant_v = torch.tensor(eigenvectors[:, dominant_idx], dtype=torch.float32)

    print(f"True dominant eigenvalue: {dominant_lambda}")

    # Train
    policy = Policy(dim)
    predicted_v = train(A, dominant_lambda, policy, epochs=epochs, dominant_v=dominant_v)

    # Compare
    cosine_sim = torch.nn.functional.cosine_similarity(
        predicted_v.unsqueeze(0), dominant_v.unsqueeze(0)
    ).item()

    print(f"\nPredicted eigenvector: {predicted_v.detach().numpy()}")
    print(f"True eigenvector: {dominant_v.numpy()}")
    print(f"Cosine similarity: {cosine_sim:.4f}")

    return cosine_sim



# Evaluating on different matrix sizes

In [None]:
# Matrix dimensions to evaluate; each is run 10 times with a fresh matrix and policy.
choices = [5, 10, 20, 50, 75, 100]
# results[row, i] holds the cosine similarity of run i for dimension choices[row].
results = np.empty((len(choices), 10))
for row, dim in enumerate(choices):
    print(f"\nTesting with dimension: {dim}")
    for i in range(10):
        print(f"Run {i + 1}")
        results[row, i] = test_improved_policy(dim=dim, epochs=1001)



Testing with dimension: 5
Run 1
True dominant eigenvalue: 4.423332709445424
Epoch 0: Loss = 0.4892, Residual = 3.0350, Avg Reward = -5.4627, Cosine Similarity = 0.4257
Epoch 100: Loss = 0.3540, Residual = 0.6296, Avg Reward = -5.3484, Cosine Similarity = 0.9924
Epoch 200: Loss = 0.2140, Residual = 0.4287, Avg Reward = -4.1241, Cosine Similarity = 0.9923
Epoch 300: Loss = 0.1668, Residual = 0.2956, Avg Reward = -2.6510, Cosine Similarity = 0.9991
Epoch 400: Loss = 0.1789, Residual = 0.2956, Avg Reward = -2.0786, Cosine Similarity = 0.9991
Epoch 500: Loss = 0.1856, Residual = 0.1921, Avg Reward = -1.2532, Cosine Similarity = 0.9992
Epoch 600: Loss = 0.1808, Residual = 0.1113, Avg Reward = -1.2476, Cosine Similarity = 0.9997
Epoch 700: Loss = 0.1897, Residual = 0.1113, Avg Reward = -0.4516, Cosine Similarity = 0.9997
Epoch 800: Loss = 0.1924, Residual = 0.0831, Avg Reward = -0.1138, Cosine Similarity = 0.9999
Epoch 900: Loss = 0.1951, Residual = 0.0831, Avg Reward = -0.1146, Cosine Simil

## Testing on a $200 \times 200$ Matrix

In [None]:
# 200 independent runs on 200 x 200 matrices; result[i] is the cosine similarity of run i.
result = np.zeros(200)
for i in range(len(result)):
    result[i] = test_improved_policy(dim=200, epochs=1001)

In [None]:
# Signed cosine similarities: one row per entry of `choices`, one column per run.
print(results)

[[-0.99989378  0.99989712 -0.99969399 -0.99979639 -0.99949962 -0.99879837
  -0.99980378 -0.99987417  0.99800885  0.99893636]
 [ 0.98868424  0.98912501  0.39573392  0.90100849  0.99213219 -0.41455376
   0.99516791 -0.99395812  0.96217597  0.9908163 ]
 [ 0.93460745 -0.74831402  0.97032142  0.0392857   0.94186842  0.09373933
   0.85724699 -0.39186513  0.12892543 -0.99045336]
 [ 0.04372105 -0.37457335 -0.13406821 -0.53124219 -0.19299236  0.7946744
   0.06214511 -0.639027    0.46845335 -0.87433743]
 [-0.2202062   0.4366408  -0.00737423  0.30844855  0.78106272  0.28384042
   0.57013911  0.56508613 -0.63855362  0.29097843]
 [ 0.10769464  0.13857806 -0.30613685 -0.31653365 -0.20451172 -0.57231629
   0.00109701 -0.25444061 -0.31986678  0.12743297]]


In [None]:
# Mean absolute cosine similarity per dimension; abs() removes the arbitrary eigenvector sign.
perf = [np.mean(np.abs(row)) for row in results[:len(choices)]]

In [None]:
# One line per matrix dimension: "<dim> :  <mean |cosine similarity|>".
for i, size in enumerate(choices):
  print(f"{size} :  {perf[i]}")

5 :  0.999689120054245
10 :  0.8939916729927063
20 :  0.8687786281108856
50 :  0.6661560848355293
75 :  0.6077921230345964
100 :  0.49395446181297303
