In [None]:
!pip install numpy pandas matplotlib tensorflow gym




In [None]:
import pandas as pd

# Read the annotated actions table; missing feature cells become empty strings
file_path = 'annotated_actions_new.csv'  # Replace with the actual path to the CSV
actions_df = pd.read_csv(file_path)
actions_df['features'] = actions_df['features'].fillna('').astype(str)

# One list of feature labels per row (split on ', '); an empty cell gives an empty list
subtasks = [cell.split(', ') if cell else [] for cell in actions_df['features']]

# Define success matrix and functions (unchanged)
def aggregate_success(features, robot_type):
    """
    Average one robot type's success probability over a subtask's features.

    Args:
        features: Feature labels of the subtask; every label must be a key of
            the success matrix below ('careful', 'dexterous' or 'heavy').
        robot_type: Robot type to score ('light', 'middle' or 'heavy').

    Returns:
        The mean of the per-feature success rates, or 0 when ``features`` is empty.
    """
    # Outer key: task feature. Inner key: robot type.
    success_matrix = {
        'careful': {'light': 0.9, 'middle': 0.7, 'heavy': 0.5},
        'dexterous': {'light': 0.8, 'middle': 0.6, 'heavy': 0.4},
        'heavy': {'light': 0.5, 'middle': 0.7, 'heavy': 0.9}
    }
    if not features:
        return 0
    total = sum(success_matrix[name][robot_type] for name in features)
    return total / len(features)

def allocate_subtasks(subtasks, robots):
    """
    Compute the best achievable cumulative success score with dynamic programming.

    Every subtask is assigned to exactly one robot and there is no constraint
    linking consecutive assignments, so the result equals the sum, over all
    subtasks, of the best robot's ``aggregate_success`` for that subtask.

    Args:
        subtasks: List of subtasks with their features
            (e.g., [['careful', 'dexterous'], ['heavy'], ...]).
        robots: List of robot types (e.g., ['light', 'middle', 'heavy']).

    Returns:
        The maximum cumulative score (a sum over subtasks, so it can exceed 1).
        Returns 0 when there are no subtasks or no robots.
    """
    if len(subtasks) == 0 or len(robots) == 0:
        # Nothing to allocate; the table below would otherwise be indexed out of range.
        return 0

    # best[j]: best cumulative score so far with the current subtask on robot j.
    best = [aggregate_success(subtasks[0], robot) for robot in robots]

    for features in subtasks[1:]:
        # The best predecessor does not depend on the robot chosen now, so
        # take the maximum once instead of once per (robot, predecessor) pair.
        best_previous = max(best)
        best = [best_previous + aggregate_success(features, robot) for robot in robots]

    return max(best)

# Assuming three robot types for now
robots = ['light', 'middle', 'heavy']

# Call the function with dynamically loaded subtasks
# NOTE: allocate_subtasks returns the cumulative (summed) score over all
# subtasks, so the value printed below is not bounded by 1 despite its label.
result = allocate_subtasks(subtasks, robots)
print("Maximum Success Rate:", result)


Maximum Success Rate: 1144.9999999999866


In [None]:
import time

def q_learning_allocation(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=10):
    """
    Assign a robot to every task with tabular, epsilon-greedy Q-learning.

    Task indices are the states and robot indices the actions; the reward of
    an action is ``aggregate_success(task, robot)``. The "next state" of a
    task is the following task index, wrapping around to 0 after the last one.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names.
        alpha: Learning rate of the Q-value update.
        gamma: Discount factor applied to the best next-state Q-value.
        epsilon: Probability of picking a random robot instead of the greedy one.
        episodes: Number of passes over all tasks.

    Returns:
        ``(average_success_rate, allocations)`` where ``allocations`` is a list
        of ``(task_index, robot, success_rate)`` tuples for the greedy policy.
        An empty ``tasks`` yields ``(0.0, [])``.
    """
    num_tasks = len(tasks)
    if num_tasks == 0:
        # Nothing to allocate; also avoids the modulo / division by zero below.
        return 0.0, []

    states = range(num_tasks)
    actions = range(len(robots))
    q_table = np.zeros((num_tasks, len(robots)))

    start_time = time.time()  # Start timing
    # Log roughly every 10% of episodes. Clamp to 1: with fewer than 10
    # episodes `episodes // 10` is 0 and the modulo below would raise.
    log_interval = max(1, episodes // 10)

    for episode in range(episodes):
        for state in states:
            if random.uniform(0, 1) < epsilon:
                action = random.choice(actions)  # explore
            else:
                action = np.argmax(q_table[state])  # exploit

            reward = aggregate_success(tasks[state], robots[action])
            next_state = (state + 1) % num_tasks
            td_target = reward + gamma * np.max(q_table[next_state])
            q_table[state, action] += alpha * (td_target - q_table[state, action])

        # Log progress
        if (episode + 1) % log_interval == 0 or episode == episodes - 1:
            elapsed_time = time.time() - start_time
            avg_time_per_episode = elapsed_time / (episode + 1)
            remaining_time = avg_time_per_episode * (episodes - episode - 1)
            print(f"Episode {episode + 1}/{episodes} - Elapsed Time: {elapsed_time:.2f}s - Estimated Remaining Time: {remaining_time:.2f}s")

    # Score the greedy policy read off the learned table.
    total_success_rate = 0
    allocations = []
    for state in states:
        best_action = np.argmax(q_table[state])
        task_success_rate = aggregate_success(tasks[state], robots[best_action])
        total_success_rate += task_success_rate
        allocations.append((state, robots[best_action], task_success_rate))

    average_success_rate = total_success_rate / num_tasks
    return average_success_rate, allocations


In [None]:
def dqn_allocation(tasks, robots, episodes=10, batch_size=32):
    """
    Assign a robot to every task with a small Deep Q-Network.

    The state fed to the network is only the number of features of the task
    (shape ``(1, 1)``), and the "next state" stored with each transition is
    that same value. The reward is ``aggregate_success(task, robot)``.
    Exploration uses a fixed epsilon of 0.1 and the discount factor is 0.9.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names (one network output per robot).
        episodes: Number of passes over all tasks.
        batch_size: Number of stored transitions sampled for each training step.

    Returns:
        ``(average_success_rate, allocations)`` where ``allocations`` is a list
        of ``(task, robot, success_rate)`` tuples for the greedy policy.
        An empty ``tasks`` yields ``(0.0, [])``.
    """
    if len(tasks) == 0:
        # Nothing to allocate; avoids building a model and dividing by zero.
        return 0.0, []

    state_size = 1  # Simplified state representation (can be modified for more complex states)
    action_size = len(robots)
    model = build_dqn_model(state_size, action_size)

    gamma = 0.9
    epsilon = 0.1
    # NOTE: the replay memory is unbounded and grows by one entry per task per episode.
    memory = []

    start_time = time.time()  # Start timing
    log_interval = max(1, episodes // 10)  # Log progress every 10% of episodes

    for episode in range(episodes):
        for task in tasks:
            state = np.array([[len(task)]], dtype=np.float32)  # shape (1, state_size)

            if random.uniform(0, 1) < epsilon:
                action = random.choice(range(action_size))
            else:
                action = np.argmax(model.predict(state, verbose=0))

            reward = aggregate_success(task, robots[action])
            next_state = np.array([[len(task)]], dtype=np.float32)  # same shape as state
            memory.append((state, action, reward, next_state))

            if len(memory) >= batch_size:
                minibatch = random.sample(memory, batch_size)
                states_batch = np.array([s[0] for s, _, _, _ in minibatch], dtype=np.float32)
                actions_batch = [a for _, a, _, _ in minibatch]
                rewards_batch = np.array([r for _, _, r, _ in minibatch])
                next_states_batch = np.array([ns[0] for _, _, _, ns in minibatch], dtype=np.float32)

                # Bellman targets for the sampled transitions.
                next_q_values = model.predict(next_states_batch, verbose=0)
                targets = rewards_batch + gamma * np.max(next_q_values, axis=1)

                # Only the Q-value of the action actually taken is moved
                # towards its target; the other outputs keep their prediction.
                target_f = model.predict(states_batch, verbose=0)
                target_f[np.arange(batch_size), actions_batch] = targets

                model.fit(states_batch, target_f, epochs=1, verbose=0, batch_size=batch_size)

        # Log progress
        if (episode + 1) % log_interval == 0 or episode == episodes - 1:
            elapsed_time = time.time() - start_time
            avg_time_per_episode = elapsed_time / (episode + 1)
            remaining_time = avg_time_per_episode * (episodes - episode - 1)
            print(f"Episode {episode + 1}/{episodes} - Elapsed Time: {elapsed_time:.2f}s - Estimated Remaining Time: {remaining_time:.2f}s")

    # Evaluate the greedy policy. The state depends only on len(task) and the
    # weights are fixed now, so one prediction per distinct length suffices.
    q_values_by_length = {}
    total_success_rate = 0
    allocations = []
    for task in tasks:
        num_features = len(task)
        if num_features not in q_values_by_length:
            state = np.array([[num_features]], dtype=np.float32)
            q_values_by_length[num_features] = model.predict(state, verbose=0)
        best_action = np.argmax(q_values_by_length[num_features])
        task_success_rate = aggregate_success(task, robots[best_action])
        total_success_rate += task_success_rate
        allocations.append((task, robots[best_action], task_success_rate))

    average_success_rate = total_success_rate / len(tasks)
    return average_success_rate, allocations


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

# Define the DQN model
def build_dqn_model(input_size, output_size):
    """
    Create and compile a fully connected Q-network.

    The network has two ReLU hidden layers of 40 units each and a linear
    output layer producing one Q-value per action. It is compiled with the
    Adam optimizer (learning rate 0.001) and mean-squared-error loss.

    Args:
        input_size (int): Number of input features.
        output_size (int): Number of output Q-values (actions).

    Returns:
        model: A compiled Keras Sequential model.
    """
    hidden_units = 40
    layer_stack = [
        tf.keras.layers.Input(shape=(input_size,)),
        Dense(hidden_units, activation='relu'),
        Dense(hidden_units, activation='relu'),
        Dense(output_size, activation='linear'),  # raw Q-values, no squashing
    ]
    q_network = Sequential(layer_stack)
    q_network.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return q_network

# Example usage: a demo network with 10 input features and 5 actions
input_size, output_size = 10, 5
model = build_dqn_model(input_size, output_size)

# Print the layer-by-layer summary of the demo network
model.summary()


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import random
import gym

# Read the annotated actions table; missing feature cells become empty strings
file_path = 'annotated_actions_new.csv'  # Replace with the actual path to the CSV
actions_df = pd.read_csv(file_path)
actions_df['features'] = actions_df['features'].fillna('').astype(str)

# One list of feature labels per row (split on ', '); an empty cell gives an empty list
subtasks = [cell.split(', ') if cell else [] for cell in actions_df['features']]

# Success probabilities: outer key is the task feature, inner key the robot type
success_matrix = {
    'careful': {'light': 0.9, 'middle': 0.7, 'heavy': 0.5},
    'dexterous': {'light': 0.8, 'middle': 0.6, 'heavy': 0.4},
    'heavy': {'light': 0.5, 'middle': 0.7, 'heavy': 0.9}
}

robots = ['light', 'middle', 'heavy']

def aggregate_success(features, robot_type):
    """
    Average one robot type's success probability over a subtask's features.

    Looks each feature up in the module-level ``success_matrix``. Returns 0
    for a subtask without features.
    """
    if not features:
        return 0
    total = sum(success_matrix[name][robot_type] for name in features)
    return total / len(features)

# ------------------ Q-Learning ------------------

# def q_learning_allocation(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=1000):
#     states = range(len(tasks))
#     actions = range(len(robots))
#     q_table = np.zeros((len(states), len(actions)))

#     for _ in range(episodes):
#         for state in states:
#             if random.uniform(0, 1) < epsilon:
#                 action = random.choice(actions)
#             else:
#                 action = np.argmax(q_table[state])

#             reward = aggregate_success(tasks[state], robots[action])
#             next_state = (state + 1) % len(states)
#             q_table[state, action] += alpha * (reward + gamma * np.max(q_table[next_state]) - q_table[state, action])

#     total_success_rate = 0
#     allocations = []
#     for state in states:
#         best_action = np.argmax(q_table[state])
#         task_success_rate = aggregate_success(tasks[state], robots[best_action])
#         total_success_rate += task_success_rate
#         allocations.append((state, robots[best_action], task_success_rate))

#     average_success_rate = total_success_rate / len(tasks)
#     return average_success_rate, allocations

# ------------------ DQN ------------------

# def build_dqn_model(input_size, output_size):
#     model = Sequential([
#         Dense(24, input_dim=input_size, activation='relu'),
#         Dense(24, activation='relu'),
#         Dense(output_size, activation='linear')
#     ])
#     model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
#     return model

# def dqn_allocation(tasks, robots, episodes=10):
#     state_size = len(tasks[0])
#     action_size = len(robots)
#     model = build_dqn_model(state_size, action_size)

#     gamma = 0.9
#     epsilon = 0.1
#     batch_size = 32
#     memory = []

#     for episode in range(episodes):
#         for task in tasks:
#             state = np.array([len(task)])  # Simplified state representation
#             if random.uniform(0, 1) < epsilon:
#                 action = random.choice(range(action_size))
#             else:
#                 action = np.argmax(model.predict(state, verbose=0))

#             reward = aggregate_success(task, robots[action])
#             next_state = np.array([len(task)])  # Simplified next state
#             memory.append((state, action, reward, next_state))

#             if len(memory) > batch_size:
#                 minibatch = random.sample(memory, batch_size)
#                 for s, a, r, ns in minibatch:
#                     target = r + gamma * np.max(model.predict(ns, verbose=0))
#                     target_f = model.predict(s, verbose=0)
#                     target_f[0][a] = target
#                     model.fit(s, target_f, epochs=1, verbose=0)

#     total_success_rate = 0
#     allocations = []
#     for task in tasks:
#         state = np.array([len(task)])
#         best_action = np.argmax(model.predict(state, verbose=0))
#         task_success_rate = aggregate_success(task, robots[best_action])
#         total_success_rate += task_success_rate
#         allocations.append((task, robots[best_action], task_success_rate))

#     average_success_rate = total_success_rate / len(tasks)
#     return average_success_rate, allocations

# ------------------ Execute and Compare ------------------

# Train the tabular agent and report its average success rate
q_learning_result, q_learning_allocations = q_learning_allocation(subtasks, robots)
print(f"Q-Learning Success Rate: {q_learning_result:.2f}")

# Train the DQN agent on the same subtasks
dqn_result, dqn_allocations = dqn_allocation(subtasks, robots)
print(f"DQN Success Rate: {dqn_result:.2f}")

# Bar chart comparing the two averages on a fixed 0..1 axis
methods = ['Q-Learning', 'DQN']
success_rates = [q_learning_result, dqn_result]

fig, ax = plt.subplots()
ax.bar(methods, success_rates, color=['green', 'orange'])
ax.set_title("Comparison of Success Rates for Q-Learning and DQN")
ax.set_ylabel("Average Success Rate")
ax.set_ylim(0, 1)
plt.show()

Episode 1/10 - Elapsed Time: 0.02s - Estimated Remaining Time: 0.17s
Episode 2/10 - Elapsed Time: 0.04s - Estimated Remaining Time: 0.16s
Episode 3/10 - Elapsed Time: 0.06s - Estimated Remaining Time: 0.14s
Episode 4/10 - Elapsed Time: 0.08s - Estimated Remaining Time: 0.13s
Episode 5/10 - Elapsed Time: 0.11s - Estimated Remaining Time: 0.11s
Episode 6/10 - Elapsed Time: 0.12s - Estimated Remaining Time: 0.08s
Episode 7/10 - Elapsed Time: 0.15s - Estimated Remaining Time: 0.06s
Episode 8/10 - Elapsed Time: 0.17s - Estimated Remaining Time: 0.04s
Episode 9/10 - Elapsed Time: 0.19s - Estimated Remaining Time: 0.02s
Episode 10/10 - Elapsed Time: 0.21s - Estimated Remaining Time: 0.00s
Q-Learning Success Rate: 0.71
Episode 1/10 - Elapsed Time: 271.02s - Estimated Remaining Time: 2439.16s
Episode 2/10 - Elapsed Time: 538.20s - Estimated Remaining Time: 2152.80s


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Placeholder function to simulate Q-Learning and DQN performance
def simulate_performance_q_learning(param):
    """
    Return a synthetic Q-Learning score for ``param``.

    Placeholder only, no learning is run: the value is
    ``0.7 + 0.1 * sin(param)`` plus uniform noise drawn from [-0.02, 0.02).
    """
    baseline = np.sin(param) * 0.1 + 0.7
    jitter = np.random.uniform(-0.02, 0.02)
    return baseline + jitter

def simulate_performance_dqn(param):
    """
    Return a synthetic DQN score for ``param``.

    Placeholder only, no network is trained: the value is
    ``0.75 + 0.1 * cos(param)`` plus uniform noise drawn from [-0.02, 0.02).
    """
    baseline = np.cos(param) * 0.1 + 0.75
    jitter = np.random.uniform(-0.02, 0.02)
    return baseline + jitter

# Function to plot comparison graph
def plot_comparison(x_values, q_learning_values, dqn_values, title, xlabel):
    """
    Draw Q-Learning and DQN scores against a shared x-axis and show the figure.

    Args:
        x_values: Values of the swept parameter.
        q_learning_values: Q-Learning scores, one per x value (green circles).
        dqn_values: DQN scores, one per x value (orange squares).
        title: Figure title.
        xlabel: Label of the x-axis.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(x_values, q_learning_values, marker='o', label='Q-Learning', color='green')
    ax.plot(x_values, dqn_values, marker='s', label='DQN', color='orange')
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Average Success Rate')
    ax.legend()
    ax.grid(True)
    plt.show()

# NOTE: every curve in this cell is synthetic. simulate_performance_q_learning
# and simulate_performance_dqn only evaluate sin/cos of the swept value plus
# uniform noise; no agent is trained, so the plots do not measure the effect
# of the named parameter.

# ----------------- 1. Learning Rate (α) Variations -----------------
learning_rates = np.linspace(0.01, 0.2, 10)
q_learning_lr = [simulate_performance_q_learning(alpha) for alpha in learning_rates]
dqn_lr = [simulate_performance_dqn(alpha) for alpha in learning_rates]

plot_comparison(learning_rates, q_learning_lr, dqn_lr, 'Performance vs Learning Rate (α)', 'Learning Rate (α)')

# ----------------- 2. Discount Factor (γ) Variations -----------------
discount_factors = np.linspace(0.8, 0.99, 10)
q_learning_gamma = [simulate_performance_q_learning(gamma) for gamma in discount_factors]
dqn_gamma = [simulate_performance_dqn(gamma) for gamma in discount_factors]

plot_comparison(discount_factors, q_learning_gamma, dqn_gamma, 'Performance vs Discount Factor (γ)', 'Discount Factor (γ)')

# ----------------- 3. Number of Episodes -----------------
# The episode counts are passed straight into sin/cos here; nothing is run for that many episodes.
episodes = np.arange(500, 5001, 500)
q_learning_episodes = [simulate_performance_q_learning(ep) for ep in episodes]
dqn_episodes = [simulate_performance_dqn(ep) for ep in episodes]

plot_comparison(episodes, q_learning_episodes, dqn_episodes, 'Performance vs Number of Episodes', 'Number of Episodes')

# ----------------- 4. Batch Size -----------------
batch_sizes = np.array([16, 32, 64, 128, 256])
q_learning_batch = [simulate_performance_q_learning(bs) for bs in batch_sizes]
dqn_batch = [simulate_performance_dqn(bs) for bs in batch_sizes]

plot_comparison(batch_sizes, q_learning_batch, dqn_batch, 'Performance vs Batch Size', 'Batch Size')

# ----------------- 5. Exploration Rate (ε) Decay -----------------
# Values run from 0.99 down to 0.9, so the x-axis is in descending order.
epsilons = np.linspace(0.99, 0.9, 10)
q_learning_epsilon = [simulate_performance_q_learning(eps) for eps in epsilons]
dqn_epsilon = [simulate_performance_dqn(eps) for eps in epsilons]

plot_comparison(epsilons, q_learning_epsilon, dqn_epsilon, 'Performance vs Exploration Rate Decay (ε)', 'Exploration Rate (ε)')

# ----------------- 6. Task Complexity -----------------
task_complexity = np.array([1, 2, 3, 4, 5])  # Number of features per task
q_learning_complexity = [simulate_performance_q_learning(tc) for tc in task_complexity]
dqn_complexity = [simulate_performance_dqn(tc) for tc in task_complexity]

plot_comparison(task_complexity, q_learning_complexity, dqn_complexity, 'Performance vs Task Complexity', 'Number of Features per Task')

# ----------------- 7. Reward Scaling -----------------
reward_scaling = np.array([0.5, 1.0, 1.5, 2.0])
q_learning_reward = [simulate_performance_q_learning(rs) for rs in reward_scaling]
dqn_reward = [simulate_performance_dqn(rs) for rs in reward_scaling]

plot_comparison(reward_scaling, q_learning_reward, dqn_reward, 'Performance vs Reward Scaling', 'Reward Scaling Factor')

# ----------------- 8. Robot Capability Variations -----------------
# The scenario names are only tick labels; the simulators receive the indices 0, 1, 2.
robot_capability_scenarios = ['Light Dominant', 'Middle Balanced', 'Heavy Dominant']
indices = np.arange(len(robot_capability_scenarios))
q_learning_capability = [simulate_performance_q_learning(i) for i in indices]
dqn_capability = [simulate_performance_dqn(i) for i in indices]

# Grouped bars: Q-Learning shifted left and DQN shifted right of each tick.
plt.figure(figsize=(10, 6))
plt.bar(indices - 0.2, q_learning_capability, width=0.4, label='Q-Learning', color='green')
plt.bar(indices + 0.2, dqn_capability, width=0.4, label='DQN', color='orange')
plt.xticks(indices, robot_capability_scenarios)
plt.title('Performance vs Robot Capability Variations')
plt.xlabel('Robot Capability Scenario')
plt.ylabel('Average Success Rate')
plt.legend()
plt.grid(True)
plt.show()

# ----------------- 9. Noise and Uncertainty -----------------
noise_levels = np.linspace(0.01, 0.1, 10)
q_learning_noise = [simulate_performance_q_learning(nl) for nl in noise_levels]
dqn_noise = [simulate_performance_dqn(nl) for nl in noise_levels]

plot_comparison(noise_levels, q_learning_noise, dqn_noise, 'Performance vs Noise Level', 'Noise Standard Deviation')

# ----------------- 10. Training Time per Episode -----------------
training_times = np.linspace(0.1, 1.0, 10)
q_learning_time = [simulate_performance_q_learning(tt) for tt in training_times]
dqn_time = [simulate_performance_dqn(tt) for tt in training_times]

plot_comparison(training_times, q_learning_time, dqn_time, 'Performance vs Training Time per Episode', 'Training Time (seconds)')


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


KeyboardInterrupt: 

In [None]:
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from collections import deque

# ------------------ Load Task Data ------------------
# NOTE(review): earlier cells read 'annotated_actions_new.csv'; confirm which file is intended here.
file_path = 'annotated_actions.csv'  # Ensure this file exists
actions_df = pd.read_csv(file_path)
actions_df['features'] = actions_df['features'].fillna('').astype(str)
# One list of feature labels per row (split on ', '); an empty cell gives an empty list
subtasks = [cell.split(', ') if cell else [] for cell in actions_df['features']]

# ------------------ Define Success Matrix and Robots ------------------
# Outer key: task feature. Inner key: robot type. Value: success probability.
success_matrix = {
    'careful': {'light': 0.9, 'middle': 0.7, 'heavy': 0.5},
    'dexterous': {'light': 0.8, 'middle': 0.6, 'heavy': 0.4},
    'heavy': {'light': 0.5, 'middle': 0.7, 'heavy': 0.9}
}
robots = ['light', 'middle', 'heavy']

def aggregate_success(features, robot_type):
    """Return the mean success rate of ``robot_type`` over ``features`` (0 if there are none)."""
    if not features:
        return 0
    total = sum(success_matrix[name][robot_type] for name in features)
    return total / len(features)

# ------------------ Q-Learning ------------------
def q_learning_allocation(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100):
    """
    Train tabular, epsilon-greedy Q-learning and score its greedy allocation.

    Task indices are the states and robot indices the actions; the reward is
    ``aggregate_success(task, robot)`` and the next state is the following
    task index, wrapping around after the last task.

    NOTE: this definition returns only the average, while the earlier
    definition of the same name in this notebook returns
    ``(average, allocations)``; whichever cell ran last wins.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names.
        alpha: Learning rate of the Q-value update.
        gamma: Discount factor.
        epsilon: Probability of choosing a random robot.
        episodes: Number of passes over all tasks.

    Returns:
        Average success rate of the greedy policy; 0.0 when ``tasks`` is empty.
    """
    num_tasks = len(tasks)
    if num_tasks == 0:
        # Avoid the modulo and division by zero below; nothing to allocate.
        return 0.0

    states = range(num_tasks)
    actions = range(len(robots))
    q_table = np.zeros((num_tasks, len(robots)))

    for _ in range(episodes):
        for state in states:
            if random.uniform(0, 1) < epsilon:
                action = random.choice(actions)
            else:
                action = np.argmax(q_table[state])

            reward = aggregate_success(tasks[state], robots[action])
            next_state = (state + 1) % num_tasks
            td_target = reward + gamma * np.max(q_table[next_state])
            q_table[state, action] += alpha * (td_target - q_table[state, action])

    # Score the greedy policy read off the learned table.
    total_success_rate = 0
    for state in states:
        best_action = np.argmax(q_table[state])
        total_success_rate += aggregate_success(tasks[state], robots[best_action])

    return total_success_rate / num_tasks

# ------------------ DQN ------------------
def build_dqn_model(input_size, output_size):
    """
    Create and compile a fully connected Q-network.

    Two ReLU hidden layers of 24 units feed a linear output layer with one
    Q-value per action. Compiled with Adam (learning rate 0.001) and MSE loss.

    Args:
        input_size: Number of input features.
        output_size: Number of actions (network outputs).

    Returns:
        A compiled Keras Sequential model.
    """
    # Imported locally because this cell's import block does not bring in Input.
    from tensorflow.keras.layers import Input

    model = Sequential([
        # Declare the input with an explicit Input layer; passing input_shape
        # to the first Dense layer makes recent Keras versions emit a warning.
        Input(shape=(input_size,)),
        Dense(24, activation='relu'),
        Dense(24, activation='relu'),
        Dense(output_size, activation='linear')
    ])
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return model

def dqn_allocation(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100, batch_size=32):
    """
    Train a small Deep Q-Network and score its greedy allocation.

    The network input is only the number of features of a task (shape
    ``(1, 1)``); the stored next state is that same value. The reward is
    ``aggregate_success(task, robot)``. Replay memory keeps the most recent
    2000 transitions.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names (one network output per robot).
        alpha: Accepted for signature parity with Q-learning but NOT used:
            the optimizer keeps the learning rate set in ``build_dqn_model``.
            To make it effective, set ``model.optimizer.learning_rate = alpha``.
        gamma: Discount factor.
        epsilon: Probability of choosing a random robot.
        episodes: Number of passes over all tasks.
        batch_size: Number of transitions sampled per training step.

    Returns:
        Average success rate of the greedy policy; 0.0 when ``tasks`` is empty.
    """
    if len(tasks) == 0:
        # Nothing to allocate; avoids building a model and dividing by zero.
        return 0.0

    state_size = 1
    action_size = len(robots)
    model = build_dqn_model(state_size, action_size)
    memory = deque(maxlen=2000)

    for episode in range(episodes):
        for task in tasks:
            state = np.array([[len(task)]], dtype=np.float32)
            if random.uniform(0, 1) < epsilon:
                action = random.choice(range(action_size))
            else:
                q_values = model.predict(state, verbose=0)
                action = np.argmax(q_values[0])

            reward = aggregate_success(task, robots[action])
            next_state = np.array([[len(task)]], dtype=np.float32)
            memory.append((state, action, reward, next_state))

            if len(memory) > batch_size:
                minibatch = random.sample(memory, batch_size)
                states = np.vstack([s for s, _, _, _ in minibatch])
                next_states = np.vstack([ns for _, _, _, ns in minibatch])
                actions = [a for _, a, _, _ in minibatch]
                rewards = np.array([r for _, _, r, _ in minibatch])

                targets = model.predict(states, verbose=0)
                # One batched forward pass for all next states instead of one
                # predict() call per sampled transition.
                next_q_values = model.predict(next_states, verbose=0)

                # Move only the taken action's Q-value towards its Bellman target.
                targets[np.arange(batch_size), actions] = rewards + gamma * np.max(next_q_values, axis=1)

                model.fit(states, targets, epochs=1, verbose=0)

    # Evaluate the greedy policy. The state depends only on len(task) and the
    # weights are fixed now, so one prediction per distinct length suffices.
    q_values_by_length = {}
    total_success_rate = 0
    for task in tasks:
        num_features = len(task)
        if num_features not in q_values_by_length:
            state = np.array([[num_features]], dtype=np.float32)
            q_values_by_length[num_features] = model.predict(state, verbose=0)[0]
        best_action = np.argmax(q_values_by_length[num_features])
        total_success_rate += aggregate_success(task, robots[best_action])

    return total_success_rate / len(tasks)

# ------------------ Plotting Helper ------------------
def plot_comparison(x_values, q_learning_values, dqn_values, title, xlabel):
    """
    Plot Q-Learning and DQN scores against the same x values and show the figure.

    Args:
        x_values: Values of the swept parameter.
        q_learning_values: Q-Learning scores, one per x value.
        dqn_values: DQN scores, one per x value.
        title: Figure title.
        xlabel: Label of the x-axis.
    """
    # (y values, marker, legend label, colour) for each method, in draw order
    series = (
        (q_learning_values, 'o', 'Q-Learning', 'green'),
        (dqn_values, 's', 'DQN', 'orange'),
    )
    plt.figure(figsize=(10, 6))
    for y_values, marker, label, color in series:
        plt.plot(x_values, y_values, marker=marker, label=label, color=color)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel('Average Success Rate')
    plt.legend()
    plt.grid(True)
    plt.show()

# ------------------ Parameter Variation Simulations ------------------
# We define separate simulation functions for each parameter variation to set them correctly.

# 1. Learning Rate (α) Variations
def simulate_performance_q_learning_alpha(alpha):
    """Average Q-Learning success rate on ``subtasks`` for learning rate ``alpha`` (100 episodes)."""
    # Episode count is fixed so that only alpha varies between runs.
    return q_learning_allocation(subtasks, robots, alpha=alpha, episodes=100)

def simulate_performance_dqn_alpha(alpha):
    """Average DQN success rate on ``subtasks`` with ``alpha`` passed through (100 episodes).

    NOTE: dqn_allocation documents that it does not use ``alpha``, so this
    sweep changes nothing in the DQN runs.
    """
    return dqn_allocation(subtasks, robots, alpha=alpha, episodes=100, batch_size=32)

learning_rates = np.linspace(0.01, 0.2, 10)
q_learning_lr = list(map(simulate_performance_q_learning_alpha, learning_rates))
dqn_lr = list(map(simulate_performance_dqn_alpha, learning_rates))

plot_comparison(learning_rates, q_learning_lr, dqn_lr, 'Performance vs Learning Rate (α)', 'Learning Rate (α)')

# 2. Discount Factor (γ) Variations
def simulate_performance_q_learning_gamma(gamma):
    """Average Q-Learning success rate on ``subtasks`` for discount factor ``gamma`` (100 episodes)."""
    return q_learning_allocation(subtasks, robots, gamma=gamma, episodes=100)

def simulate_performance_dqn_gamma(gamma):
    """Average DQN success rate on ``subtasks`` for discount factor ``gamma`` (100 episodes, batch 32)."""
    return dqn_allocation(subtasks, robots, gamma=gamma, episodes=100, batch_size=32)

discount_factors = np.linspace(0.8, 0.99, 10)
q_learning_gamma = list(map(simulate_performance_q_learning_gamma, discount_factors))
dqn_gamma = list(map(simulate_performance_dqn_gamma, discount_factors))

plot_comparison(discount_factors, q_learning_gamma, dqn_gamma, 'Performance vs Discount Factor (γ)', 'Discount Factor (γ)')

# 3. Number of Episodes
def simulate_performance_q_learning_episodes(ep):
    """Average Q-Learning success rate on ``subtasks`` after ``ep`` training episodes."""
    return q_learning_allocation(subtasks, robots, episodes=ep)

def simulate_performance_dqn_episodes(ep):
    """Average DQN success rate on ``subtasks`` after ``ep`` training episodes (batch 32)."""
    return dqn_allocation(subtasks, robots, episodes=ep, batch_size=32)

# 500, 1000, ..., 5000 episodes; every value triggers a full training run per method.
episodes = np.arange(500, 5001, 500)
q_learning_episodes = list(map(simulate_performance_q_learning_episodes, episodes))
dqn_episodes = list(map(simulate_performance_dqn_episodes, episodes))

plot_comparison(episodes, q_learning_episodes, dqn_episodes, 'Performance vs Number of Episodes', 'Number of Episodes')

# 4. Batch Size (applies mainly to DQN)
def simulate_performance_q_learning_batch(bs):
    """Average Q-Learning success rate on ``subtasks``; ``bs`` is ignored.

    Tabular Q-Learning has no batch size, so every call is an independent
    repeat of the same 100-episode configuration.
    """
    return q_learning_allocation(subtasks, robots, episodes=100)

def simulate_performance_dqn_batch(bs):
    """Average DQN success rate on ``subtasks`` with replay batch size ``bs`` (100 episodes)."""
    return dqn_allocation(subtasks, robots, episodes=100, batch_size=bs)

batch_sizes = np.array([16, 32, 64, 128, 256])
q_learning_batch = list(map(simulate_performance_q_learning_batch, batch_sizes))
dqn_batch = list(map(simulate_performance_dqn_batch, batch_sizes))

plot_comparison(batch_sizes, q_learning_batch, dqn_batch, 'Performance vs Batch Size', 'Batch Size')

# 5. Exploration Rate (ε) Decay
# The swept value is used as a fixed epsilon for the whole run.
def simulate_performance_q_learning_epsilon(eps):
    """Average Q-Learning success rate on ``subtasks`` for exploration rate ``eps`` (100 episodes)."""
    return q_learning_allocation(subtasks, robots, epsilon=eps, episodes=100)

def simulate_performance_dqn_epsilon(eps):
    """Average DQN success rate on ``subtasks`` for exploration rate ``eps`` (100 episodes, batch 32)."""
    return dqn_allocation(subtasks, robots, epsilon=eps, episodes=100, batch_size=32)

# Ten values from 0.99 down to 0.9 (descending x-axis order).
epsilons = np.linspace(0.99, 0.9, 10)
q_learning_epsilon = list(map(simulate_performance_q_learning_epsilon, epsilons))
dqn_epsilon = list(map(simulate_performance_dqn_epsilon, epsilons))

plot_comparison(epsilons, q_learning_epsilon, dqn_epsilon, 'Performance vs Exploration Rate Decay (ε)', 'Exploration Rate (ε)')

# 6. Task Complexity (Assuming complexity = number of features per task)
# Here we must simulate tasks with different complexities. We'll create dummy tasks for this test.
def create_tasks_with_complexity(num_features, n_tasks=20):
    """
    Generate random dummy tasks with a fixed number of features each.

    Features are drawn with replacement from the keys of ``success_matrix``,
    so a task may contain the same feature more than once.

    Args:
        num_features: Number of feature labels per task.
        n_tasks: Number of tasks to generate.

    Returns:
        A list of ``n_tasks`` lists, each holding ``num_features`` labels.
    """
    candidate_features = list(success_matrix.keys())  # ['careful','dexterous','heavy']
    return [random.choices(candidate_features, k=num_features) for _ in range(n_tasks)]

def simulate_performance_q_learning_complexity(num_features, tasks=None):
    """
    Average Q-Learning success rate on dummy tasks of a given complexity.

    Args:
        num_features: Number of features per generated task.
        tasks: Optional pre-generated task list. When omitted, a fresh random
            set is created with ``create_tasks_with_complexity``.
    """
    complex_tasks = create_tasks_with_complexity(num_features) if tasks is None else tasks
    return q_learning_allocation(complex_tasks, robots, episodes=100)

def simulate_performance_dqn_complexity(num_features, tasks=None):
    """
    Average DQN success rate on dummy tasks of a given complexity.

    Args:
        num_features: Number of features per generated task.
        tasks: Optional pre-generated task list. When omitted, a fresh random
            set is created with ``create_tasks_with_complexity``.
    """
    complex_tasks = create_tasks_with_complexity(num_features) if tasks is None else tasks
    return dqn_allocation(complex_tasks, robots, episodes=100, batch_size=32)

task_complexity = np.array([1, 2, 3, 4, 5])
# Draw one random task set per complexity level and give it to BOTH methods,
# so the comparison is not confounded by each method seeing different tasks.
tasks_per_complexity = [create_tasks_with_complexity(tc) for tc in task_complexity]
q_learning_complexity = [
    simulate_performance_q_learning_complexity(tc, shared_tasks)
    for tc, shared_tasks in zip(task_complexity, tasks_per_complexity)
]
dqn_complexity = [
    simulate_performance_dqn_complexity(tc, shared_tasks)
    for tc, shared_tasks in zip(task_complexity, tasks_per_complexity)
]

plot_comparison(task_complexity, q_learning_complexity, dqn_complexity, 'Performance vs Task Complexity', 'Number of Features per Task')

# 7. Reward Scaling (We'll just multiply the reward by this factor)
def q_learning_allocation_scaled(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100, scale=1.0):
    """
    Tabular Q-learning allocation with the training reward multiplied by ``scale``.

    Only the reward used in the Q-value update is scaled; the returned score
    is computed from the unscaled ``aggregate_success`` values.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names.
        alpha: Learning rate of the Q-value update.
        gamma: Discount factor.
        epsilon: Probability of choosing a random robot.
        episodes: Number of passes over all tasks.
        scale: Multiplier applied to every training reward.

    Returns:
        Average (unscaled) success rate of the greedy policy; 0.0 when
        ``tasks`` is empty.
    """
    num_tasks = len(tasks)
    if num_tasks == 0:
        # Avoid the modulo and division by zero below; nothing to allocate.
        return 0.0

    states = range(num_tasks)
    actions = range(len(robots))
    q_table = np.zeros((num_tasks, len(robots)))

    for _ in range(episodes):
        for state in states:
            if random.uniform(0, 1) < epsilon:
                action = random.choice(actions)
            else:
                action = np.argmax(q_table[state])
            reward = aggregate_success(tasks[state], robots[action]) * scale
            next_state = (state + 1) % num_tasks
            td_target = reward + gamma * np.max(q_table[next_state])
            q_table[state, action] += alpha * (td_target - q_table[state, action])

    # Score the greedy policy with unscaled success rates.
    total_success_rate = 0
    for state in states:
        best_action = np.argmax(q_table[state])
        total_success_rate += aggregate_success(tasks[state], robots[best_action])
    return total_success_rate / num_tasks

def dqn_allocation_scaled(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100, batch_size=32, scale=1.0):
    """
    DQN allocation with the training reward multiplied by ``scale``.

    The network input is only the number of features of a task. Only the
    reward stored in replay memory is scaled; the returned score is computed
    from the unscaled ``aggregate_success`` values.

    Args:
        tasks: Sequence of feature lists, one per task.
        robots: Sequence of robot type names (one network output per robot).
        alpha: Accepted for signature parity; not used by this function.
        gamma: Discount factor.
        epsilon: Probability of choosing a random robot.
        episodes: Number of passes over all tasks.
        batch_size: Number of transitions sampled per training step.
        scale: Multiplier applied to every training reward.

    Returns:
        Average (unscaled) success rate of the greedy policy; 0.0 when
        ``tasks`` is empty.
    """
    if len(tasks) == 0:
        # Nothing to allocate; avoids building a model and dividing by zero.
        return 0.0

    state_size = 1
    action_size = len(robots)
    model = build_dqn_model(state_size, action_size)
    memory = deque(maxlen=2000)

    for episode in range(episodes):
        for task in tasks:
            state = np.array([[len(task)]], dtype=np.float32)
            if random.uniform(0, 1) < epsilon:
                action = random.choice(range(action_size))
            else:
                q_values = model.predict(state, verbose=0)
                action = np.argmax(q_values[0])

            reward = aggregate_success(task, robots[action]) * scale
            next_state = np.array([[len(task)]], dtype=np.float32)
            memory.append((state, action, reward, next_state))

            if len(memory) > batch_size:
                minibatch = random.sample(memory, batch_size)
                states = np.vstack([s for s, _, _, _ in minibatch])
                next_states = np.vstack([ns for _, _, _, ns in minibatch])
                actions = [a for _, a, _, _ in minibatch]
                rewards = np.array([r for _, _, r, _ in minibatch])

                targets = model.predict(states, verbose=0)
                # One batched forward pass for all next states instead of one
                # predict() call per sampled transition.
                next_q_values = model.predict(next_states, verbose=0)

                # Move only the taken action's Q-value towards its Bellman target.
                targets[np.arange(batch_size), actions] = rewards + gamma * np.max(next_q_values, axis=1)

                model.fit(states, targets, epochs=1, verbose=0)

    # Evaluate the greedy policy. The state depends only on len(task) and the
    # weights are fixed now, so one prediction per distinct length suffices.
    q_values_by_length = {}
    total_success_rate = 0
    for task in tasks:
        num_features = len(task)
        if num_features not in q_values_by_length:
            state = np.array([[num_features]], dtype=np.float32)
            q_values_by_length[num_features] = model.predict(state, verbose=0)[0]
        best_action = np.argmax(q_values_by_length[num_features])
        total_success_rate += aggregate_success(task, robots[best_action])

    return total_success_rate / len(tasks)

def simulate_performance_q_learning_reward(scale):
    """Average Q-Learning success rate on ``subtasks`` with training rewards multiplied by ``scale``."""
    return q_learning_allocation_scaled(subtasks, robots, scale=scale, episodes=100)

def simulate_performance_dqn_reward(scale):
    """Average DQN success rate on ``subtasks`` with training rewards multiplied by ``scale``."""
    return dqn_allocation_scaled(subtasks, robots, scale=scale, episodes=100, batch_size=32)

reward_scaling = np.array([0.5, 1.0, 1.5, 2.0])
q_learning_reward = list(map(simulate_performance_q_learning_reward, reward_scaling))
dqn_reward = list(map(simulate_performance_dqn_reward, reward_scaling))

plot_comparison(reward_scaling, q_learning_reward, dqn_reward, 'Performance vs Reward Scaling', 'Reward Scaling Factor')

# 8. Robot Capability Variations (This is a scenario-based test, you would adjust success_matrix or tasks)
# We'll just simulate different robot sets or success matrices. For simplicity, we will just run with indices as placeholders.
def simulate_performance_q_learning_capability(idx):
    """Run the baseline tabular Q-learning allocation for a capability scenario.

    Args:
        idx: Scenario index. It is accepted for interface symmetry but is not
            used: every scenario runs with the same module-level ``subtasks``
            and ``robots``.

    Returns:
        Whatever ``q_learning_allocation`` returns with a budget of 100 episodes.
    """
    # In a real scenario, you'd modify robots or success_matrix here based on idx.
    _ = idx  # placeholder: the scenario index does not influence the run yet
    episode_budget = 100
    return q_learning_allocation(subtasks, robots, episodes=episode_budget)

def simulate_performance_dqn_capability(idx):
    """Run the baseline DQN allocation for a capability scenario.

    Args:
        idx: Scenario index. It is accepted for interface symmetry but is not
            used: every scenario runs with the same module-level ``subtasks``
            and ``robots``.

    Returns:
        Whatever ``dqn_allocation`` returns with 100 episodes and a replay
        batch of 32.
    """
    _ = idx  # placeholder: the scenario index does not influence the run yet
    run_options = {'episodes': 100, 'batch_size': 32}
    return dqn_allocation(subtasks, robots, **run_options)

# Evaluate both learners once per named scenario (all Q-learning runs first,
# then all DQN runs) and show the results as side-by-side bars.
robot_capability_scenarios = ['Light Dominant', 'Middle Balanced', 'Heavy Dominant']
indices = np.arange(len(robot_capability_scenarios))
q_learning_capability = list(map(simulate_performance_q_learning_capability, indices))
dqn_capability = list(map(simulate_performance_dqn_capability, indices))

bar_width = 0.4  # each pair of bars is centred on its scenario tick

plt.figure(figsize=(10, 6))
plt.bar(indices - bar_width / 2, q_learning_capability, width=bar_width, label='Q-Learning', color='green')
plt.bar(indices + bar_width / 2, dqn_capability, width=bar_width, label='DQN', color='orange')
plt.xticks(indices, robot_capability_scenarios)
plt.title('Performance vs Robot Capability Variations')
plt.xlabel('Robot Capability Scenario')
plt.ylabel('Average Success Rate')
plt.legend()
plt.grid(True)
plt.show()

# 9. Noise and Uncertainty
# We can simulate noise by perturbing rewards.
def simulate_performance_q_learning_noise(noise_level):
    """Average success rate of tabular Q-learning trained with noisy rewards.

    Args:
        noise_level: Standard deviation of the Gaussian noise that
            ``q_learning_allocation_noisy`` adds to each training reward.

    Returns:
        The value returned by ``q_learning_allocation_noisy`` for the
        module-level ``subtasks`` and ``robots`` with 100 episodes.
    """
    # Noise is injected inside q_learning_allocation_noisy, so the task list is
    # passed through unchanged. The call is made unconditionally so that a
    # result is returned even when `subtasks` is empty.
    return q_learning_allocation_noisy(subtasks, robots, noise_level=noise_level, episodes=100)

def simulate_performance_dqn_noise(noise_level):
    """Run the noisy-reward DQN allocation for one noise level.

    Args:
        noise_level: Value forwarded as the ``noise_level`` keyword of
            ``dqn_allocation_noisy``.

    Returns:
        Whatever ``dqn_allocation_noisy`` returns for the module-level
        ``subtasks`` and ``robots`` with 100 episodes and a replay batch of 32.
    """
    run_options = {'noise_level': noise_level, 'episodes': 100, 'batch_size': 32}
    return dqn_allocation_noisy(subtasks, robots, **run_options)

def q_learning_allocation_noisy(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100, noise_level=0.0):
    """Tabular Q-learning task allocation trained on noise-perturbed rewards.

    Each task index is a state and each robot index is an action. During
    training the reward is ``aggregate_success(task, robot)`` plus Gaussian
    noise, clipped to ``[0, 1]``. The learned greedy policy is then scored
    with the noise-free ``aggregate_success``.

    Args:
        tasks: Sequence of tasks; ``tasks[i]`` is passed to ``aggregate_success``.
        robots: Sequence of robot identifiers; one action per entry.
        alpha: Learning rate of the Q-table update.
        gamma: Discount factor applied to the next state's best Q-value.
        epsilon: Probability of picking a random action instead of the greedy one.
        episodes: Number of full passes over ``tasks``.
        noise_level: Standard deviation of the Gaussian reward noise.

    Returns:
        Mean noise-free success rate of the greedy policy over all tasks, or
        ``0.0`` when ``tasks`` is empty (there is nothing to allocate).
    """
    num_tasks = len(tasks)
    if num_tasks == 0:
        # Avoid a ZeroDivisionError in the final average.
        return 0.0

    states = range(num_tasks)
    actions = range(len(robots))
    q_table = np.zeros((num_tasks, len(actions)))

    for _ in range(episodes):
        for state in states:
            # Epsilon-greedy action selection.
            if random.uniform(0, 1) < epsilon:
                action = random.choice(actions)
            else:
                action = np.argmax(q_table[state])
            base_reward = aggregate_success(tasks[state], robots[action])
            # Perturb the reward, then keep it a valid probability.
            reward = base_reward + np.random.normal(0, noise_level)
            reward = max(0, min(1, reward))  # clip between 0 and 1
            # Tasks are visited cyclically: the last task wraps to the first.
            next_state = (state + 1) % num_tasks
            q_table[state, action] += alpha * (reward + gamma * np.max(q_table[next_state]) - q_table[state, action])

    # Score the greedy policy with the true (noise-free) success rates.
    total_success_rate = 0
    for state in states:
        best_action = np.argmax(q_table[state])
        total_success_rate += aggregate_success(tasks[state], robots[best_action])
    return total_success_rate / num_tasks

def dqn_allocation_noisy(tasks, robots, alpha=0.1, gamma=0.9, epsilon=0.1, episodes=100, batch_size=32, noise_level=0.0):
    """DQN task allocation trained on noise-perturbed rewards.

    The state fed to the network is the single feature ``len(task)``, and the
    stored next state is that same value. During training the reward is
    ``aggregate_success(task, robot)`` plus Gaussian noise, clipped to
    ``[0, 1]``. The greedy policy is then scored with the noise-free
    ``aggregate_success``.

    Args:
        tasks: Sequence of tasks; each is passed to ``aggregate_success``.
        robots: Sequence of robot identifiers; one action per entry.
        alpha: Unused here; kept for signature parity with the tabular variant.
        gamma: Discount factor applied to the next state's best Q-value.
        epsilon: Probability of picking a random action instead of the greedy one.
        episodes: Number of full passes over ``tasks``.
        batch_size: Replay minibatch size; training starts once the replay
            memory holds more than this many transitions.
        noise_level: Standard deviation of the Gaussian reward noise.

    Returns:
        Mean noise-free success rate of the greedy policy over all tasks, or
        ``0.0`` when ``tasks`` is empty (there is nothing to allocate).
    """
    if len(tasks) == 0:
        # Avoid building a model and a ZeroDivisionError in the final average.
        return 0.0

    state_size = 1
    action_size = len(robots)
    model = build_dqn_model(state_size, action_size)
    memory = deque(maxlen=2000)

    for _ in range(episodes):
        for task in tasks:
            state = np.array([[len(task)]], dtype=np.float32)
            # Epsilon-greedy action selection.
            if random.uniform(0, 1) < epsilon:
                action = random.choice(range(action_size))
            else:
                q_values = model.predict(state, verbose=0)
                action = np.argmax(q_values[0])

            base_reward = aggregate_success(task, robots[action])
            # Perturb the reward, then keep it a valid probability.
            reward = base_reward + np.random.normal(0, noise_level)
            reward = max(0, min(1, reward))
            next_state = np.array([[len(task)]], dtype=np.float32)
            memory.append((state, action, reward, next_state))

            if len(memory) > batch_size:
                minibatch = random.sample(memory, batch_size)
                states = np.vstack([s for s, _, _, _ in minibatch])
                next_states = np.vstack([ns for _, _, _, ns in minibatch])
                targets = model.predict(states, verbose=0)
                # The weights do not change until fit() below, so all
                # next-state Q-values can come from one batched forward pass
                # instead of one predict() call per sample.
                next_q_values = model.predict(next_states, verbose=0)
                for i, (_, a, r, _) in enumerate(minibatch):
                    targets[i][a] = r + gamma * np.max(next_q_values[i])
                model.fit(states, targets, epochs=1, verbose=0)

    # Score the greedy policy with the true (noise-free) success rates.
    total_success_rate = 0
    for task in tasks:
        state = np.array([[len(task)]], dtype=np.float32)
        q_values = model.predict(state, verbose=0)
        best_action = np.argmax(q_values[0])
        total_success_rate += aggregate_success(task, robots[best_action])
    return total_success_rate / len(tasks)

# Sweep the reward-noise standard deviation: all Q-learning runs first, then all DQN runs.
noise_levels = np.linspace(0.01, 0.1, 10)
q_learning_noise = list(map(simulate_performance_q_learning_noise, noise_levels))
dqn_noise = list(map(simulate_performance_dqn_noise, noise_levels))

plot_comparison(
    noise_levels,
    q_learning_noise,
    dqn_noise,
    'Performance vs Noise Level',
    'Noise Standard Deviation',
)

# 10. Training Time per Episode (wall-clock time depends on hardware and is not measured here.)
# As a proxy, the swept parameter is the number of training episodes.
# In a real scenario, you'd measure actual time. Here each value in [0.1, 1.0] is scaled by 1000 to give 100-1000 episodes.
def simulate_performance_q_learning_time(ep):
    """Run the baseline tabular Q-learning allocation for a given episode budget.

    Args:
        ep: Number of training episodes; truncated to ``int`` before use.

    Returns:
        Whatever ``q_learning_allocation`` returns for the module-level
        ``subtasks`` and ``robots``.
    """
    episode_count = int(ep)
    return q_learning_allocation(subtasks, robots, episodes=episode_count)

def simulate_performance_dqn_time(ep):
    """Run the baseline DQN allocation for a given episode budget.

    Args:
        ep: Number of training episodes; truncated to ``int`` before use.

    Returns:
        Whatever ``dqn_allocation`` returns for the module-level ``subtasks``
        and ``robots`` with a replay batch of 32.
    """
    episode_count = int(ep)
    return dqn_allocation(subtasks, robots, episodes=episode_count, batch_size=32)

# Sweep the training budget: each value in `training_times` is scaled by 1000
# and truncated to an integer episode count. All Q-learning runs happen first,
# then all DQN runs.
training_times = np.linspace(0.1, 1.0, 10)
episode_counts = [int(tt * 1000) for tt in training_times]  # scale to episodes
q_learning_time = [simulate_performance_q_learning_time(count) for count in episode_counts]
dqn_time = [simulate_performance_dqn_time(count) for count in episode_counts]

plot_comparison(
    training_times,
    q_learning_time,
    dqn_time,
    'Performance vs Training Time per Episode',
    'Training Time (scaled as episodes)',
)


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
