In [2]:
# Imports 
import itertools

import numpy as np
from tqdm import trange,tqdm

In [3]:
# Constants
GOAL_SCORE = 100.0  # reward stored in the goal cell; is_terminal_state matches on this value
ALPHA = 0.1         # default learning rate of the temporal-difference update
GAMMA = 0.95        # default discount factor
EPSILON = 0.8       # default (initial) exploration probability
EPOCHS = 10000      # episode cap used by train_delta

# Variables
grid_rows = 3
grid_cols = 4

# One Q-value per (row, col, action index).
q_table = np.zeros((grid_rows, grid_cols, 4))

# Position i on the last axis of a Q-table corresponds to actions[i].
actions = ['up', 'down', 'left', 'right']

# Float dtype on purpose: with an integer array, assigning a non-integral
# GOAL_SCORE would be truncated and the goal cell would never compare equal to
# GOAL_SCORE again. It also matches the dtype of rewards_b below.
rewards = np.full((grid_rows, grid_cols), -1.0)
rewards[0, 3] = GOAL_SCORE  # goal
obstacles = [[1, 1]]  # obstacle

print(rewards)


# 1.b -- same layout, but with a different step cost in every cell.
q_table_b = np.zeros((grid_rows, grid_cols, 4))
rewards_b = np.array([
    [-3., -2., -1., GOAL_SCORE],
    [-4., -100., -2., -1.],
    [-5., -4., -3., -2.]
])
obstacles_b = [[1, 1]]

[[ -1  -1  -1 100]
 [ -1  -1  -1  -1]
 [ -1  -1  -1  -1]]


In [4]:
# Funciones 
def is_terminal_state(row, col, rewards=rewards):
    """Tell whether the cell (row, col) is the goal.

    A cell counts as terminal when the value stored for it in `rewards`
    equals GOAL_SCORE.

    Returns:
        True for the goal cell, False otherwise.
    """
    return bool(rewards[row, col] == GOAL_SCORE)

def get_random_location():
    """Draw a random grid cell that is neither terminal nor an obstacle.

    Cells are sampled uniformly (row first, then column) and re-drawn until
    an acceptable one comes up.

    Returns:
        (row, col) of the accepted cell.
    """
    while True:
        row = np.random.randint(grid_rows)
        col = np.random.randint(grid_cols)
        if is_terminal_state(row, col):
            continue
        if [row, col] in obstacles:
            continue
        return row, col

def get_next_action(row, col, q_table=q_table, epsilon=EPSILON):
    """Pick an action index for cell (row, col) with an epsilon-greedy rule.

    With probability `epsilon` a uniformly random index in [0, 4) is returned;
    otherwise the index of the largest Q-value stored for the cell.
    """
    explore = np.random.random() < epsilon
    if not explore:
        return np.argmax(q_table[row, col])
    return np.random.randint(4)

def _valid_moves(row, col, obstacles):
    """Map each usable action index to the cell it leads to from (row, col)."""
    targets = {
        'up': (row - 1, col),
        'down': (row + 1, col),
        'left': (row, col - 1),
        'right': (row, col + 1),
    }
    moves = {}
    for action_index, action in enumerate(actions):
        new_row, new_col = targets[action]
        on_grid = 0 <= new_row < grid_rows and 0 <= new_col < grid_cols
        if on_grid and [new_row, new_col] not in obstacles:
            moves[action_index] = (new_row, new_col)
    return moves


def get_next_state(row, col, q_table=q_table, obstacles=obstacles, epsilon=EPSILON):
    """Choose an epsilon-greedy action among the valid moves and apply it.

    A move is valid when it stays on the grid and does not enter an obstacle.
    The previous implementation re-sampled until a valid move came up, which
    never terminated when `epsilon` was 0 and the highest Q-value belonged to
    an invalid move (for instance an all-zero Q-table at the top row). The
    choice is now restricted to valid moves up front, so the function always
    returns after a single draw.

    Args:
        row, col: current cell.
        q_table: array indexed as [row, col, action index].
        obstacles: list of [row, col] cells that cannot be entered.
        epsilon: probability of picking a uniformly random valid move instead
            of the valid move with the largest Q-value.

    Returns:
        (new_row, new_col, action_index), with action_index an index into
        `actions`.

    Raises:
        ValueError: if no valid move exists from (row, col).
    """
    moves = _valid_moves(row, col, obstacles)
    if not moves:
        raise ValueError(f"no valid move from cell ({row}, {col})")

    valid_actions = list(moves)
    if np.random.random() < epsilon:
        action_index = valid_actions[np.random.randint(len(valid_actions))]
    else:
        # argmax over the valid entries only; ties resolve to the lowest action index.
        valid_q = q_table[row, col, valid_actions]
        action_index = valid_actions[int(np.argmax(valid_q))]

    new_row, new_col = moves[action_index]
    return new_row, new_col, action_index

def get_shortest_path(start_row, start_col, q_table=q_table, rewards=rewards, obstacles=obstacles):
    """Follow the greedy policy of `q_table` from the start cell to the goal.

    Moves are chosen with epsilon=0.0, so the walk is deterministic: coming
    back to an already visited cell means the policy would cycle forever.
    That situation used to hang the caller and is now reported as an error.

    Args:
        start_row, start_col: cell the walk starts from.
        q_table: Q-values that define the greedy policy.
        rewards: reward grid used to recognise the goal cell.
        obstacles: list of [row, col] cells that cannot be entered.

    Returns:
        A list of [row, col] cells from the start to the goal (both included),
        or an empty list when the start cell is already the goal.

    Raises:
        RuntimeError: if the greedy policy revisits a cell before reaching the goal.
    """
    if is_terminal_state(start_row, start_col, rewards=rewards):
        return []

    current_row, current_col = start_row, start_col
    path = [[current_row, current_col]]
    visited = {(current_row, current_col)}
    while not is_terminal_state(current_row, current_col, rewards=rewards):
        current_row, current_col, _ = get_next_state(
            current_row, current_col, q_table=q_table, obstacles=obstacles, epsilon=0.0
        )
        if (current_row, current_col) in visited:
            raise RuntimeError(
                f"greedy policy loops back to cell ({current_row}, {current_col}) "
                f"before reaching the goal; path so far: {path}"
            )
        visited.add((current_row, current_col))
        path.append([current_row, current_col])
    return path

In [None]:
# Ejecucion

def train_delta(q_table=q_table, grid=rewards, obstacles=obstacles, conv_threshold=0.001, patience=20):
    """Run Q-learning until the Q-table stops changing between episodes.

    Every episode starts at cell (2, 0) and ends when the goal cell of `grid`
    is reached. `q_table` is updated in place. After each episode the
    Euclidean norm of the change in the (flattened) Q-table is computed;
    training stops once it stays below `conv_threshold` for `patience`
    consecutive episodes, or after EPOCHS episodes.

    Args:
        q_table: array indexed as [row, col, action index]; modified in place.
            The default is the array bound to `q_table` when this function
            was defined.
        grid: reward grid. It supplies both the step rewards and the goal
            cell (the terminal check previously read the default `rewards`
            grid and ignored this argument).
        obstacles: list of [row, col] cells that cannot be entered.
        conv_threshold: upper bound on the per-episode Q-table change.
        patience: consecutive episodes below the threshold needed to stop.

    Returns:
        None.
    """
    stable_epochs = 0
    for epoch in trange(EPOCHS):
        next_row, next_col = 2, 0  # fixed start cell
        old_matrix = q_table.copy()
        while not is_terminal_state(next_row, next_col, grid):
            old_row, old_col = next_row, next_col

            next_row, next_col, action_index = get_next_state(next_row, next_col, q_table=q_table, obstacles=obstacles)

            next_reward = grid[next_row, next_col]
            old_q_value = q_table[old_row, old_col, action_index]

            # Q(s, a) += ALPHA * (r + GAMMA * max_a' Q(s', a') - Q(s, a))
            temporal_difference = next_reward + (GAMMA * np.max(q_table[next_row, next_col])) - old_q_value
            new_q_value = old_q_value + (ALPHA * temporal_difference)

            q_table[old_row, old_col, action_index] = new_q_value

        delta = np.linalg.norm(q_table - old_matrix)
        if delta < conv_threshold:
            stable_epochs += 1
            if stable_epochs >= patience:
                print(f"Converged after {epoch+1} epochs.")
                break
        else:
            # Any larger change restarts the streak.
            stable_epochs = 0
        

In [449]:
def train_std(q_table, grid, obstacles,
              alpha=ALPHA, gamma=GAMMA,
              epsilon=EPSILON, epsilon_min=0.01, epsilon_decay=0.995,
              conv_threshold=0.1, window_size=10):
    """Run Q-learning until the episode reward stabilises.

    Every episode starts at cell (2, 0) and ends at the goal cell of `grid`;
    `q_table` is updated in place. After each episode epsilon is multiplied
    by `epsilon_decay` (never dropping below `epsilon_min`). Training stops
    when the standard deviation of the last `window_size` episode rewards
    falls below `conv_threshold`, or after 100000 episodes.

    Args:
        q_table: array indexed as [row, col, action index]; modified in place.
        grid: reward grid; also defines the goal cell.
        obstacles: list of [row, col] cells that cannot be entered.
        alpha: learning rate of the temporal-difference update.
        gamma: discount factor.
        epsilon: initial exploration probability.
        epsilon_min: lower bound for the decayed epsilon.
        epsilon_decay: per-episode multiplicative decay of epsilon.
        conv_threshold: reward standard deviation below which training stops.
        window_size: number of most recent episodes in the deviation window.

    Returns:
        A list of four Q-table copies: the initial table, the ones at 33% and
        66% of the recorded snapshots, and the final table.
    """
    snapshots = [q_table.copy()]
    reward_history = []
    epoch_limit = 100000

    # The context manager closes the bar exactly once, also when training is
    # interrupted or raises.
    with tqdm(desc="Training") as pbar:
        for epoch in range(1, epoch_limit + 1):
            pbar.update(1)

            next_row, next_col = 2, 0  # fixed start cell
            total_reward = 0
            while not is_terminal_state(next_row, next_col, grid):
                old_row, old_col = next_row, next_col

                next_row, next_col, action_index = get_next_state(
                    next_row, next_col, q_table=q_table, obstacles=obstacles, epsilon=epsilon
                )

                next_reward = grid[next_row, next_col]
                old_q_value = q_table[old_row, old_col, action_index]

                # Q(s, a) += alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
                temporal_difference = next_reward + (gamma * np.max(q_table[next_row, next_col])) - old_q_value
                q_table[old_row, old_col, action_index] = old_q_value + (alpha * temporal_difference)

                total_reward += next_reward

            reward_history.append(total_reward)
            epsilon = max(epsilon_min, epsilon * epsilon_decay)
            snapshots.append(q_table.copy())

            if len(reward_history) >= window_size:
                window = reward_history[-window_size:]
                if np.std(window) < conv_threshold:
                    print("Convergio en el epoch: ", epoch)
                    break

    # snapshots[-1] is already the final table (one snapshot is stored at the
    # end of every episode), so it is not appended a second time; the extra
    # copy used to shift the 33% / 66% sample positions.
    snapshots_output = [
        snapshots[0],
        snapshots[int(len(snapshots) * 0.33)],
        snapshots[int(len(snapshots) * 0.66)],
        snapshots[-1],
    ]

    return snapshots_output


In [None]:
def train(q_table, grid, obstacles,
          alpha=ALPHA, gamma=GAMMA,
          epsilon=EPSILON, epsilon_min=0.01, epsilon_decay=0.995,
          conv_threshold=(0.1, 0.1), window_size=10, patience=10,
          epoch_limit=30000, start=(2, 0)):
    """Run Q-learning with a combined Q-table / reward convergence test.

    Every episode starts at `start` and ends at the goal cell of `grid`;
    `q_table` is updated in place. After each episode epsilon is multiplied
    by `epsilon_decay` (never dropping below `epsilon_min`). An episode counts
    as stable when both

    * the Euclidean norm of the Q-table change during the episode is below
      `conv_threshold[0]`, and
    * the standard deviation of the last `window_size` episode rewards is
      below `conv_threshold[1]` (treated as infinite until `window_size`
      episodes have been played).

    Training stops after `patience` consecutive stable episodes, or after
    `epoch_limit` episodes.

    Args:
        q_table: array indexed as [row, col, action index]; modified in place.
        grid: reward grid; also defines the goal cell.
        obstacles: list of [row, col] cells that cannot be entered.
        alpha: learning rate of the temporal-difference update.
        gamma: discount factor.
        epsilon: initial exploration probability.
        epsilon_min: lower bound for the decayed epsilon.
        epsilon_decay: per-episode multiplicative decay of epsilon.
        conv_threshold: pair (delta threshold, reward std threshold).
        window_size: number of most recent episodes in the deviation window.
        patience: consecutive stable episodes required to declare convergence.
        epoch_limit: maximum number of episodes.
        start: (row, col) every episode starts from.

    Returns:
        dict with keys "snapshots" (initial, 33%, 66% and final Q-table
        copies), "avg_reward_final" (mean reward over the last `window_size`
        episodes), "avg_reward_total", "avg_reward_100" (mean over the last
        100 episodes), "total_epochs" and "is_converged".

    Raises:
        ValueError: if `epoch_limit` is smaller than 1.
    """
    if epoch_limit < 1:
        raise ValueError("epoch_limit must be at least 1")

    delta_threshold = conv_threshold[0]
    std_threshold = conv_threshold[1]

    snapshots = [q_table.copy()]
    reward_history = []

    total_epochs = 0
    stable_epochs = 0
    is_converged = False

    for total_epochs in range(1, epoch_limit + 1):
        next_row, next_col = start
        total_reward = 0
        old_q_table = q_table.copy()

        while not is_terminal_state(next_row, next_col, grid):
            old_row, old_col = next_row, next_col

            next_row, next_col, action_index = get_next_state(
                next_row, next_col, q_table=q_table, obstacles=obstacles, epsilon=epsilon
            )

            next_reward = grid[next_row, next_col]
            old_q_value = q_table[old_row, old_col, action_index]

            # Q(s, a) += alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
            temporal_difference = next_reward + (gamma * np.max(q_table[next_row, next_col])) - old_q_value
            q_table[old_row, old_col, action_index] = old_q_value + (alpha * temporal_difference)

            total_reward += next_reward

        delta = np.linalg.norm(q_table - old_q_table)
        reward_history.append(total_reward)
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        # The last snapshot is therefore always the final table; no extra
        # copy is needed once the loop ends.
        snapshots.append(q_table.copy())

        if len(reward_history) >= window_size:
            std_window = np.std(reward_history[-window_size:])
        else:
            std_window = float('inf')

        if delta < delta_threshold and std_window < std_threshold:
            stable_epochs += 1
            if stable_epochs >= patience:
                is_converged = True
                break
        else:
            # Any unstable episode restarts the streak.
            stable_epochs = 0

    snapshots_output = [
        snapshots[0],
        snapshots[int(len(snapshots) * 0.33)],
        snapshots[int(len(snapshots) * 0.66)],
        snapshots[-1],
    ]

    # A negative-start slice already falls back to the whole history when it
    # is shorter than the requested window.
    avg_reward_final = np.mean(reward_history[-window_size:])
    avg_reward_total = np.mean(reward_history)
    avg_reward_100 = np.mean(reward_history[-100:])

    results_dict = {
        "snapshots": snapshots_output,
        "avg_reward_final": avg_reward_final,
        "avg_reward_total": avg_reward_total,
        "avg_reward_100": avg_reward_100,
        "total_epochs": total_epochs,
        "is_converged": is_converged
    }

    return results_dict

In [93]:
# Train on the `rewards` grid, starting from a fresh all-zero Q-table.
q_table = np.zeros((grid_rows, grid_cols, 4))
output = train(
    q_table=q_table,
    grid=rewards,
    obstacles=obstacles,
    conv_threshold=[0.0001, 0.01],
    window_size=20,
    patience=10,
)

  0%|          | 467/100000 [00:00<00:20, 4824.82it/s]

Convergio en el epoch:  468





In [92]:
# Let's see whether this thing works: show the trained Q-table, then the
# greedy path from the start cell (2, 0) as the cell's output.
display(q_table)
get_shortest_path(2,0, q_table, rewards, obstacles)

array([[[  0.        ,   9.39576234,   0.        ,  20.98875995],
        [  0.        ,   0.        ,   0.51227339,  63.74672578],
        [  0.        ,  62.26216957,  20.5132619 ,  99.12720364],
        [  0.        ,   0.        ,   0.        ,   0.        ]],

       [[  5.87607678,  72.16613654,   0.        ,   0.        ],
        [  0.        ,   0.        ,   0.        ,   0.        ],
        [ 89.41038685,  79.83469657,   0.        ,  94.        ],
        [100.        ,  85.89523165,  87.71817715,   0.        ]],

       [[ 64.88483149,   0.        ,   0.        ,  77.74075   ],
        [  0.        ,   0.        ,  71.91713975,  82.885     ],
        [ 88.3       ,   0.        ,  76.62256644,  86.81153301],
        [ 93.9687413 ,   0.        ,  64.34045335,   0.        ]]])

[[2, 0], [2, 1], [2, 2], [1, 2], [1, 3], [0, 3]]

In [104]:
# Same training run on the variant-b grid (`rewards_b`), again from an all-zero Q-table.
q_table_b = np.zeros((grid_rows, grid_cols, 4))
output = train(
    q_table=q_table_b,
    grid=rewards_b,
    obstacles=obstacles_b,
    conv_threshold=[0.0001, 0.01],
    window_size=20,
    patience=10,
)

  0%|          | 471/100000 [00:00<00:20, 4900.95it/s]

Convergio en el epoch:  472





In [113]:
# Show the Q-table trained on rewards_b, then the greedy path from the start
# cell (2, 0) as the cell's output.
display(q_table_b)
get_shortest_path(2,0, q_table_b, rewards_b, obstacles_b)

array([[[  0.        ,  71.08676633,   0.        ,  87.3       ],
        [  0.        ,   0.        ,  79.06725218,  94.        ],
        [  0.        ,  80.54491417,  86.08450391, 100.        ],
        [  0.        ,   0.        ,   0.        ,   0.        ]],

       [[ 79.935     ,  61.43842673,   0.        ,   0.        ],
        [  0.        ,   0.        ,   0.        ,   0.        ],
        [ 93.85215593,  40.86049533,   0.        ,  23.99911309],
        [ 74.58134172,   1.05759267,  14.14913477,   0.        ]],

       [[ 71.93825   ,   0.        ,   0.        ,  48.11605725],
        [  0.        ,   0.        ,  17.43025647,  69.69861071],
        [ 84.99264483,   0.        ,   2.64089184,   7.09764575],
        [ 29.91133017,   0.        ,   1.96343993,   0.        ]]])

[[2, 0], [1, 0], [0, 0], [0, 1], [0, 2], [0, 3]]

In [None]:
# Grid search parameters
test_alphas = [0.05, 0.1, 0.2, 0.3]
test_gammas = [0.8, 0.9, 0.95, 0.99]
test_epsilons = [0.5, 0.6, 0.7, 0.8, 0.9]
test_windows = [10, 30, 50]
test_conv_stds = [0.5, 0.3, 0.1, 0.01]
test_conv_deltas = [0.1, 0.01, 0.001, 0.00001]
test_patiences = [10, 30, 50]
test_decays = [0.99, 0.995, 0.997]
test_num_runs = 5

# Axes of the search, in the order the loop below unpacks them.
param_grid = [
    test_alphas, test_gammas, test_epsilons, test_windows,
    test_conv_stds, test_conv_deltas, test_patiences, test_decays,
]

# Total number of training runs: every combination, repeated test_num_runs times.
total_combinations = test_num_runs
for axis_values in param_grid:
    total_combinations *= len(axis_values)
print(f"Total experiments: {total_combinations:,}")

results = []
# itertools.product visits the combinations in the same order as nested
# for-loops over the axes (rightmost axis varies fastest). The context manager
# closes the progress bar even when the search is interrupted.
with tqdm(total=total_combinations, desc="Testing") as pbar:
    for alpha, gamma, epsilon, window, conv_std, conv_delta, patience, decay in itertools.product(*param_grid):
        run_results = []
        for _ in range(test_num_runs):
            # Every run starts from an all-zero Q-table.
            q_table = np.zeros((grid_rows, grid_cols, 4))
            result = train(q_table=q_table, grid=rewards, obstacles=obstacles,
                           alpha=alpha, gamma=gamma, epsilon=epsilon, epsilon_decay=decay,
                           conv_threshold=[conv_delta, conv_std], window_size=window, patience=patience)
            run_results.append({
                'is_converged': result['is_converged'],
                'convergence_epochs': result['total_epochs'],
                'path_length': len(get_shortest_path(2, 0, q_table, rewards, obstacles)),
                'avg_reward_total': result['avg_reward_total'],
                'avg_reward_final': result['avg_reward_final'],
                'avg_reward_100': result['avg_reward_100']
            })
            pbar.update(1)

        # Aggregate results for this configuration.
        # Convergence epochs are averaged over the converged runs only; None
        # when no run converged.
        converged_epochs = [r['convergence_epochs'] for r in run_results if r['is_converged']]
        avg_convergence_epochs = np.mean(converged_epochs) if converged_epochs else None
        avg_path_length = np.mean([r['path_length'] for r in run_results])
        avg_reward_total = np.mean([r['avg_reward_total'] for r in run_results])
        avg_reward_final = np.mean([r['avg_reward_final'] for r in run_results])
        avg_reward_100 = np.mean([r['avg_reward_100'] for r in run_results])
        convergence_rate = sum(r['is_converged'] for r in run_results) / test_num_runs

        results.append({
            'alpha': alpha,
            'gamma': gamma,
            'epsilon': epsilon,
            'window_size': window,
            'conv_std': conv_std,
            'conv_delta': conv_delta,
            'patience': patience,
            'decay': decay,
            'avg_convergence_epochs': avg_convergence_epochs,
            'avg_path_length': avg_path_length,
            'avg_reward_total': avg_reward_total,
            'avg_reward_final': avg_reward_final,
            'avg_reward_100': avg_reward_100,
            'convergence_rate': convergence_rate
        })

print(f"\nCompleted {len(results)} unique configurations")

Total experiments: 172,800


Testing:   0%|          | 99/172800 [00:11<5:35:50,  8.57it/s]
Testing:   1%|          | 919/172800 [01:31<4:34:15, 10.45it/s] 

In [None]:
# ============================================
# ANÁLISIS DE RESULTADOS
# ============================================
import pandas as pd
import matplotlib.pyplot as plt

def _min_max_score(values):
    """Rescale a Series linearly to [0, 1]; a constant or all-NaN Series scores 0 everywhere."""
    low = values.min()
    span = values.max() - low
    if pd.isna(span) or span == 0:
        # Nothing to discriminate on; avoid the 0/0 that would turn every score into NaN.
        return pd.Series(0.0, index=values.index)
    return (values - low) / span


def analyze_results(df):
    """
    Print ranked summaries of the grid-search results and score every configuration.

    Args:
        df: DataFrame with one row per configuration and the columns
            'alpha', 'gamma', 'epsilon', 'window_size', 'conv_std',
            'conv_delta', 'patience', 'decay', 'avg_convergence_epochs'
            (missing when no run converged), 'avg_path_length',
            'avg_reward_total', 'avg_reward_final', 'avg_reward_100' and
            'convergence_rate'.

    Returns:
        A copy of `df` with the extra columns 'score_convergence',
        'score_path', 'score_reward_final', 'score_reward_100' and
        'balanced_score'. `df` itself is not modified.
    """
    print("\n" + "="*60)
    print("📊 ANÁLISIS DE RESULTADOS")
    print("="*60)

    # The column is object-typed when every entry is None; coercing it to float
    # keeps nsmallest and the arithmetic below working in that case.
    epochs = pd.to_numeric(df['avg_convergence_epochs'], errors='coerce')
    converged_mask = epochs.notna()

    # Top 5 configurations by convergence speed
    print("\n🏆 Top 5: Convergencia más rápida")
    top_speed = df.assign(avg_convergence_epochs=epochs)[converged_mask].nsmallest(5, 'avg_convergence_epochs')[
        ['alpha', 'gamma', 'epsilon', 'window_size', 'avg_convergence_epochs', 'convergence_rate']
    ]
    print(top_speed.to_string(index=False))

    # Top 5 configurations by path length
    print("\n🎯 Top 5: Camino más corto")
    top_path = df.nsmallest(5, 'avg_path_length')[
        ['alpha', 'gamma', 'epsilon', 'avg_path_length', 'avg_reward_final', 'convergence_rate']
    ]
    print(top_path.to_string(index=False))

    # Top 5 by average final reward
    print("\n💰 Top 5: Mayor recompensa final promedio")
    top_reward = df.nlargest(5, 'avg_reward_final')[
        ['alpha', 'gamma', 'epsilon', 'avg_reward_final', 'avg_path_length', 'convergence_rate']
    ]
    print(top_reward.to_string(index=False))

    # Top 5 by reward over the last 100 epochs
    print("\n📈 Top 5: Mayor recompensa (últimos 100 epochs)")
    top_reward_100 = df.nlargest(5, 'avg_reward_100')[
        ['alpha', 'gamma', 'epsilon', 'avg_reward_100', 'avg_path_length', 'convergence_rate']
    ]
    print(top_reward_100.to_string(index=False))

    # Configurations where at least one run did not converge
    not_converged = df[df['convergence_rate'] < 1.0]
    if len(not_converged) > 0:
        print(f"\n⚠️  {len(not_converged)} configuraciones con problemas de convergencia")

    # Best balanced configuration
    print("\n" + "="*60)
    print("🌟 MEJOR CONFIGURACIÓN BALANCEADA")
    print("="*60)

    # Normalise the metrics for scoring
    df_norm = df.copy()

    # Convergence score: 1 - epochs / slowest converged configuration;
    # configurations that never converged score 0.
    if converged_mask.any():
        max_epochs = epochs[converged_mask].max()
        df_norm['score_convergence'] = (1 - epochs / max_epochs).where(converged_mask, 0.0)
    else:
        df_norm['score_convergence'] = 0.0

    # Path score: shorter is better, relative to the longest average path.
    max_path = df['avg_path_length'].max()
    if max_path > 0:
        df_norm['score_path'] = 1 - (df['avg_path_length'] / max_path)
    else:
        df_norm['score_path'] = 0.0

    df_norm['score_reward_final'] = _min_max_score(df['avg_reward_final'])
    df_norm['score_reward_100'] = _min_max_score(df['avg_reward_100'])

    # Balanced score (the weights can be adjusted)
    df_norm['balanced_score'] = (
        0.2 * df_norm['score_convergence'] +     # 20% speed
        0.4 * df_norm['score_path'] +             # 40% path quality
        0.2 * df_norm['score_reward_final'] +     # 20% final reward
        0.2 * df_norm['score_reward_100']         # 20% reward over the last 100 epochs
    ) * df_norm['convergence_rate']  # penalise configurations that do not always converge

    best = df_norm.nlargest(1, 'balanced_score').iloc[0]
    print(f"\nAlpha:       {best['alpha']}")
    print(f"Gamma:       {best['gamma']}")
    print(f"Epsilon:     {best['epsilon']}")
    print(f"Window size: {best['window_size']}")
    print(f"Conv std:    {best['conv_std']}")
    print(f"Conv delta:  {best['conv_delta']}")
    print(f"Patience:    {best['patience']}")
    print(f"Decay:       {best['decay']}")
    print(f"\nMétricas:")
    if pd.notna(best['avg_convergence_epochs']):
        print(f"  - Convergencia:      {best['avg_convergence_epochs']:.0f} epochs")
    else:
        print(f"  - Convergencia:      No convergió")
    print(f"  - Path length:       {best['avg_path_length']:.1f} pasos")
    print(f"  - Recompensa final:  {best['avg_reward_final']:.2f}")
    print(f"  - Recompensa total:  {best['avg_reward_total']:.2f}")
    print(f"  - Recompensa (100):  {best['avg_reward_100']:.2f}")
    print(f"  - Tasa convergencia: {best['convergence_rate']*100:.0f}%")

    return df_norm


# ============================================
# VISUALIZACIÓN
# ============================================
def plot_parameter_effects(df):
    """
    Plot the effect of each hyper-parameter on one summary metric.

    Draws a 2 x 4 grid of line plots. For each panel the rows of `df` are
    grouped by one hyper-parameter column and the mean of one metric column is
    plotted against the parameter values.

    Args:
        df: grid-search results with the hyper-parameter columns 'alpha',
            'gamma', 'epsilon', 'window_size', 'conv_std', 'conv_delta',
            'patience', 'decay' and the metric columns
            'avg_convergence_epochs', 'avg_path_length', 'avg_reward_final'
            and 'convergence_rate'.
    """
    # One entry per panel:
    # (group-by column, metric, x label, y label, title, line colour, log-scaled x axis)
    # A colour of None leaves the choice to matplotlib's default colour cycle.
    panels = [
        ('alpha', 'avg_convergence_epochs', 'Alpha (learning rate)', 'Epochs to converge',
         'Efecto de Alpha en Convergencia', None, False),
        ('gamma', 'avg_path_length', 'Gamma (discount factor)', 'Path length',
         'Efecto de Gamma en Path Length', 'orange', False),
        ('epsilon', 'avg_reward_final', 'Epsilon (exploration rate)', 'Average reward (final)',
         'Efecto de Epsilon en Recompensa', 'green', False),
        ('window_size', 'convergence_rate', 'Window Size', 'Convergence Rate',
         'Efecto de Window Size', 'purple', False),
        ('conv_std', 'avg_convergence_epochs', 'Conv Std Threshold', 'Epochs to converge',
         'Efecto de Conv Std', 'red', True),
        ('conv_delta', 'avg_convergence_epochs', 'Conv Delta Threshold', 'Epochs to converge',
         'Efecto de Conv Delta', 'brown', True),
        ('patience', 'convergence_rate', 'Patience', 'Convergence Rate',
         'Efecto de Patience', 'pink', False),
        ('decay', 'avg_reward_final', 'Epsilon Decay', 'Average Reward (final)',
         'Efecto de Epsilon Decay', 'cyan', False),
    ]

    fig, axes = plt.subplots(2, 4, figsize=(20, 10))

    for ax, (column, metric, x_label, y_label, title, color, log_x) in zip(axes.flatten(), panels):
        # Only the plotted metric is aggregated.
        effect = df.groupby(column)[metric].mean()
        line_style = {} if color is None else {'color': color}
        ax.plot(effect.index, effect, 'o-', **line_style)
        ax.set_xlabel(x_label)
        ax.set_ylabel(y_label)
        ax.set_title(title)
        if log_x:
            # The threshold values span several orders of magnitude.
            ax.set_xscale('log')
        ax.grid(True)

    plt.tight_layout()
    plt.show()


# Convert the grid-search `results` list to a DataFrame, print the analysis
# (keeping the scored copy in df_analyzed) and plot the parameter effects.
df_results = pd.DataFrame(results)
df_analyzed = analyze_results(df_results)
plot_parameter_effects(df_results)