In [None]:
# This script simulates an interactive Imitation Learning (IL) activity for a simple robotics "Pick-and-Place" task using Behavioral Cloning.
#
# The goal is to demonstrate:
# 1. Expert Demonstration (Data Collection)
# 2. Policy Training (Supervised Learning)
# 3. Policy Execution and Failure due to Covariate Shift (Testing)
# 4. Interactive Correction and Retraining
import numpy as np
import random

# --- 1. Environment and Robot Setup ---

# Robot State (Observation) representation:
# We simplify the task into a single dimension: distance to target.
# State S = [distance_to_target]
# Action A = [movement_step]

TARGET_DISTANCE = 0.0 # The task is complete when distance is close to zero.

def get_robot_state(current_position):
    """Simulates getting the robot's current state (distance to target)."""
    return np.array([current_position - TARGET_DISTANCE])

def execute_action(current_position, action):
    """Simulates the robot moving based on the action."""
    # The new position is the current position adjusted by the action (movement step)
    new_position = current_position + action[0]
    return new_position


# --- 4. Testing & Covariate Shift Demonstration ---

def test_policy(policy, initial_position, max_steps=25): # Max steps remains 25
    """Executes the task and records the robot's performance."""
    print(f"\n[TEST START] Initial Position (Distance): {initial_position:.2f}")
    current_position = initial_position
    
    for step in range(max_steps):
        current_state = get_robot_state(current_position)
        
        # Check for task completion
        if abs(current_state[0]) < 0.1:
            print(f"--> [SUCCESS] Task complete in {step} steps. Final position: {current_position:.2f}")
            return True, current_position

        # Policy selects the action
        action = policy(current_state)
        
        # Execute the action (with a slight chance of real-world drift)
        current_position = execute_action(current_position, action)
        
        print(f"Step {step+1}: State (Dist)={current_state[0]:.2f} -> Action={action[0]:.2f} -> New Pos={current_position:.2f}")

        # Check for catastrophic failure (e.g., movement outside sensible bounds)
        if current_position < -2.0 or current_position > 20.0:
            print(f"*** [FAILURE] Robot crashed/overshot after reaching an unseen state. Policy unable to recover. ***")
            return False, current_position
            
    print(f"--- [FAILURE] Max steps reached. Task incomplete. Final position: {current_position:.2f}")
    return False, current_position

In [None]:
# --- 2. Demonstration (Data Collection - The Expert Phase) ---

# The Expert (the user) provides demonstrations for a successful run.
print("--- Step 1: Expert Demonstration (Data Collection) ---")
print("The Expert (user) provides successful movement trajectories to reach the target (10.0 -> 0.0).")
# 

# The Expert dataset D = [(State, Action), ...]
# Adjusted with more fine-grained steps near the end for better convergence
expert_trajectory = [
    (get_robot_state(10.0), np.array([-2.0])), # At dist 10, move -2.0
    (get_robot_state(8.0), np.array([-1.5])),  # At dist 8, move -1.5
    (get_robot_state(6.5), np.array([-1.0])),  # At dist 6.5, move -1.0
    (get_robot_state(5.5), np.array([-1.0])),  # At dist 5.5, move -1.0
    (get_robot_state(4.5), np.array([-0.5])),  # At dist 4.5, move -0.5
    (get_robot_state(4.0), np.array([-0.5])),  # At dist 4.0, move -0.5
    (get_robot_state(3.5), np.array([-0.5])),  # At dist 3.5, move -0.5
    (get_robot_state(3.0), np.array([-0.5])),  # At dist 3.0, move -0.5
    (get_robot_state(2.5), np.array([-0.5])),  # At dist 2.5, move -0.5
    (get_robot_state(2.0), np.array([-0.4])),  # At dist 2.0, move -0.4
    (get_robot_state(1.6), np.array([-0.3])),  # At dist 1.6, move -0.3
    (get_robot_state(1.3), np.array([-0.2])),  # At dist 1.3, move -0.2 
    (get_robot_state(1.0), np.array([-0.15])), # New fine step 1
    (get_robot_state(0.8), np.array([-0.1])),  # New fine step 2
    (get_robot_state(0.5), np.array([-0.05])), # New fine step 3 (Final positioning)
]

print(f"Expert Dataset size: {len(expert_trajectory)} observations.")
print("Example observation (State, Action):", expert_trajectory[0])
print("\n" + "="*50 + "\n")

In [None]:
# --- 3. Training (Behavioral Cloning Policy) ---

def train_policy_bc(dataset):
    """
    Simulates training a Behavioral Cloning (BC) policy using Nearest Neighbor.
    Crucially, it simulates catastrophic failure (extrapolation) if the state
    is too far from the training data (Covariate Shift).
    """
    states = [item[0][0] for item in dataset]
    actions = [item[1] for item in dataset]

    def policy(current_state):
        current_distance = current_state[0]
        min_distance = float('inf')
        best_action = np.array([0.0])

        for i, expert_dist in enumerate(states):
            # Find the state in the dataset that is closest to the current state
            diff = abs(expert_dist - current_distance)
            if diff < min_distance:
                min_distance = diff
                best_action = actions[i]
        
        # --- COVARIATE SHIFT SIMULATION LOGIC ---
        # If the nearest training example is more than 1.0 unit away, 
        # the policy is "extrapolating" too far, and it simulates a catastrophic, wrong action.
        if min_distance > 1.0:
            print(f"!!! COVARIATE SHIFT WARNING: State {current_distance:.2f} is too far from expert data. Extrapolating to a BAD action.")
            # Wildly incorrect extrapolation, moving away from the target
            return np.array([5.0]) 

        # Introduce a small amount of "noise" that all real robots have
        # NOISE REDUCED for better convergence near the target
        noise = random.uniform(-0.02, 0.02)
        best_action[0] += noise 
        
        return best_action

    return policy

# Train the policy using the expert data
trained_policy = train_policy_bc(expert_trajectory)
print("--- Step 2: Policy Training (Behavioral Cloning) ---")
# Range updated based on new expert data
print("Policy trained on the expert's demonstrations. It works well only in the range [0.5, 10.0].")
print("\n" + "="*50 + "\n")

In [None]:
print("--- Step 3: Execution (Testing the Robot) ---")
print("--- Test 3A: In-Distribution (Easy Test) ---")
# Start at a position very close to one in the training data (e.g., 9.8)
test_policy(trained_policy, initial_position=9.8)

print("\n" + "-"*30 + "\n")

print("--- Test 3B: Out-of-Distribution (The Covariate Shift Test) ---")
# Start at a position far outside the expert's initial range (e.g., 11.5).
# This is a state the policy has never seen, triggering the failure mechanism.
success, failure_pos = test_policy(trained_policy, initial_position=11.5)

print("\n" + "="*50 + "\n")

In [None]:
# --- 5. Interactive Correction and Retraining ---

if not success:
    print("--- Step 4: Expert Intervention & Retraining ---")
    print("The robot failed due to Covariate Shift! The crucial error was made at the initial out-of-distribution state.")
    print("The human Expert must now intervene and teach the correct action for that failure state.")
    
    # Target the initial out-of-distribution state (11.50)
    corrective_position = 11.50 
    failure_state = get_robot_state(corrective_position)
    
    # The expert decides a large, safe step back is needed to get back into the known range (near 8.0)
    corrective_action = np.array([-3.5]) 

    print(f"EXPERT ACTION: State (Dist)={failure_state[0]:.2f} -> Corrective Action={corrective_action[0]:.2f}")
    
    # Add the new, corrected observation to the dataset
    expert_trajectory.append((failure_state, corrective_action))
    
    # Retrain the policy with the new data
    retrained_policy = train_policy_bc(expert_trajectory)
    print("\nPolicy retrained with one crucial corrective demonstration for the boundary state (11.50).")

    print("\n--- Test 4: Re-Execution with Retrained Policy ---")
    # Test again from the problematic starting position (11.5)
    success_retrained, _ = test_policy(retrained_policy, initial_position=11.5)

    if success_retrained:
        print("\n[CONCLUSION]: The robot SUCCEEDED! By correcting the boundary state, the policy knew how to recover and re-entered the known distribution.")
    else:
        print("\n[CONCLUSION]: The robot FAILED even after correction, indicating more demonstrations might be needed.")

# Final check to ensure the file ends with the required marker