In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import yaml
import glob

# Run directories to analyse. Every entry is searched recursively for
# rosbag2/metadata.yaml files; the first entry is also used to locate the
# 'maps' folder and the variant data for the map overlay.
file_paths = ['../downloaded_files/run-2025-09-17-172852/rooms-generated-p1/']
def find_metadata_files(base_dirs=None):
    """Find all rosbag2/metadata.yaml files below the given directories.

    Args:
        base_dirs: Directory, or iterable of directories, to search
            recursively. When omitted, the notebook-level ``file_paths``
            list is used, which keeps the zero-argument call working.

    Returns:
        A sorted list of matching paths without duplicates. Sorting makes the
        row order of downstream tables independent of the order in which the
        filesystem happens to enumerate directories.
    """
    if base_dirs is None:
        base_dirs = file_paths
    elif isinstance(base_dirs, (str, os.PathLike)):
        # A single path would otherwise be iterated character by character.
        base_dirs = [base_dirs]

    found = set()
    for base_dir in base_dirs:
        if not os.path.isdir(base_dir):
            # Report instead of skipping silently: a typo in the run directory
            # otherwise only surfaces later as an empty duration table.
            print(f"Warning: search directory not found, skipping: {base_dir}")
            continue
        pattern = os.path.join(base_dir, '**', 'rosbag2', 'metadata.yaml')
        found.update(glob.glob(pattern, recursive=True))

    return sorted(found)

# Collect the metadata.yaml path of every recorded test run; the duration
# extraction further down reads these files.
metadata_files = find_metadata_files()

In [None]:

from analysis_common import get_tf_poses
import itertools
import analysis_common.map_visualizer as map_visualizer
from analysis_common import get_variant_data

def _pose_marker(pose):
    """Convert a variant pose dict into the {'x', 'y', 'yaw'} dict used for drawing.

    Returns None when the pose is missing or has no 'position' entry.
    A missing or null 'orientation' falls back to a yaw of 0.
    """
    if not pose or not pose.get('position'):
        return None
    orientation = pose.get('orientation') or {}
    return {
        'x': pose['position']['x'],
        'y': pose['position']['y'],
        'yaw': orientation.get('yaw', 0),
    }


map_file = None
robot_start = None
robot_end = None

if file_paths:
    # The first run directory supplies both the map and the variant description.
    map_path = os.path.join(file_paths[0], 'maps')

    if os.path.isdir(map_path):
        # Sorted so that "the first one" is reproducible; os.listdir returns
        # entries in arbitrary order.
        map_files = sorted(
            os.path.join(map_path, f) for f in os.listdir(map_path) if f.endswith('.yaml')
        )
    else:
        # Previously os.listdir raised FileNotFoundError here.
        print(f"Map directory not found: {map_path}")
        map_files = []

    if len(map_files) > 1:
        print(f"Multiple map files found, using the first one: {map_files[0]}")
    map_file = map_files[0] if map_files else None

    variant_data = get_variant_data(file_paths[0])

    # Start and goal are looked up independently, so a variant that only
    # defines goal_poses still gets its goal marker drawn.
    if variant_data:
        start_pose = variant_data['start_pose'] if "start_pose" in variant_data else None
        goal_poses = variant_data['goal_poses'] if "goal_poses" in variant_data else None

        robot_start = _pose_marker(start_pose)
        # Only the first goal pose is visualised.
        robot_end = _pose_marker(goal_poses[0]) if goal_poses else None

# Final pose of each plotted trajectory, in file_paths order.
last_poses = []
if map_file:
    # Create map visualizer instance
    viz = map_visualizer.MapVisualizer()

    # Load the map
    if viz.load_map(map_file):
        # Create the figure
        fig, ax = viz.create_figure(figsize=(8, 6))

        # Start pose in green, goal pose in red.
        if robot_start:
            viz.add_robot_pose(robot_start['x'], robot_start['y'], theta=robot_start['yaw'], color='green', size=0.3)

        if robot_end:
            viz.add_robot_pose(robot_end['x'], robot_end['y'], theta=robot_end['yaw'], color='red', size=0.3)

        # One colour per run directory, cycling when there are more runs than colours.
        colors = itertools.cycle(['blue', 'orange', 'green', 'red', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan'])
        for file_path in file_paths:
            # NOTE(review): the frame name suggests ground-truth base_link poses - confirm.
            poses = get_tf_poses(os.path.join(file_path, 'rosbag2'), 'nav2_turtlebot4_base_link_gt')
            if poses:
                robot_path = [(pose['x'], pose['y']) for pose in poses]

                last_poses.append(poses[-1])
                # Draw the path on the map
                color = next(colors)
                viz.draw_path(robot_path, color=color, linewidth=1, label='Robot Path')

        plt.tight_layout()
        plt.show()
    else:
        print("Failed to load map")
else:
    print("No map file found; skipping map visualization")
        


In [None]:
def extract_duration_data(metadata_files):
    """Extract bag durations from rosbag2 metadata.yaml files.

    Args:
        metadata_files: Iterable of paths to metadata.yaml files located
            inside a 'rosbag2' directory.

    Returns:
        DataFrame with one row per readable file and the columns
        'test_path', 'test_name', 'duration_ns', 'duration_seconds' and
        'duration_minutes'. The columns are present even when no file could
        be read, so downstream column access does not fail with KeyError.

    Files that cannot be opened, parsed, or that lack the expected keys are
    reported on stdout and skipped (best effort).
    """
    columns = ['test_path', 'test_name', 'duration_ns', 'duration_seconds', 'duration_minutes']
    duration_data = []

    for file_path in metadata_files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)

            # Extract duration in nanoseconds
            duration_ns = data['rosbag2_bagfile_information']['duration']['nanoseconds']

            # The test name is the directory that contains the 'rosbag2'
            # folder. Use the LAST 'rosbag2' component so that an ancestor
            # directory with the same name cannot be picked up by mistake.
            path_parts = os.path.normpath(file_path).split(os.sep)
            rosbag_idx = len(path_parts) - 1 - path_parts[::-1].index('rosbag2')
            if rosbag_idx == 0:
                # index - 1 would wrap around to the file name itself.
                raise ValueError("no parent directory before 'rosbag2' in path")
            test_name = path_parts[rosbag_idx - 1]

            duration_seconds = duration_ns / 1e9
            duration_data.append({
                'test_path': file_path,
                'test_name': test_name,
                'duration_ns': duration_ns,
                'duration_seconds': duration_seconds,
                'duration_minutes': duration_seconds / 60
            })

        except Exception as e:
            # Deliberately broad: one unreadable bag must not abort the analysis.
            print(f"Error reading {file_path}: {e}")

    return pd.DataFrame(duration_data, columns=columns)

# Build the per-test duration table: one row per metadata.yaml file that
# extract_duration_data could read. The commented-out print/head lines are
# optional sanity checks kept for debugging.
#print("Extracting duration data...")
df = extract_duration_data(metadata_files)
#print(f"Successfully processed {len(df)} test files")
#print(f"Duration range: {df['duration_seconds'].min():.1f}s to {df['duration_seconds'].max():.1f}s")

# Display first few entries
#df.head()

In [None]:
# Overview of test durations: histogram, box plot and an IQR-based outlier scatter.
# This cell defines mean_duration, median_duration, Q1, Q3, IQR, lower_bound,
# upper_bound, outliers and normal_tests, which the cells below reuse.
import matplotlib.patches as patches
from matplotlib.colors import ListedColormap
from matplotlib.lines import Line2D

if df.empty:
    # Fail with a clear message instead of a KeyError / ZeroDivisionError further down.
    raise ValueError("No duration data available - check file_paths and the rosbag2/metadata.yaml files")

# Set up the plotting style
plt.style.use('default')

durations = df['duration_seconds']
mean_duration = durations.mean()
median_duration = durations.median()

# Outlier thresholds: values more than 1.5 * IQR outside the quartiles.
Q1 = durations.quantile(0.25)
Q3 = durations.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers = df[(durations < lower_bound) | (durations > upper_bound)]
normal_tests = df[(durations >= lower_bound) & (durations <= upper_bound)]

# Three panels side by side (a 3x3 grid left two thirds of the figure blank).
fig, (ax_hist, ax_box, ax_scatter) = plt.subplots(1, 3, figsize=(20, 6))

# 1. Histogram of test durations with mean and median markers
ax_hist.hist(durations, bins=50, alpha=0.7, edgecolor='black', color='skyblue')
ax_hist.axvline(mean_duration, color='red', linestyle='--', label=f'Mean: {mean_duration:.1f}s')
ax_hist.axvline(median_duration, color='green', linestyle='--', label=f'Median: {median_duration:.1f}s')
ax_hist.set_xlabel('Duration (seconds)')
ax_hist.set_ylabel('Number of Tests')
ax_hist.set_title('Distribution of Test Durations')
ax_hist.grid(True, alpha=0.3)
ax_hist.legend()

# 2. Box plot to identify outliers
box_plot = ax_box.boxplot(durations, patch_artist=True)
box_plot['boxes'][0].set_facecolor('lightblue')
ax_box.set_ylabel('Duration (seconds)')
ax_box.set_title('Box Plot - Duration Outliers')
ax_box.grid(True, alpha=0.3)

# 3. Per-test scatter: normal tests first (blue), then outliers (red, larger)
combined_df = pd.concat([normal_tests, outliers]).reset_index(drop=True)
colors = ['blue'] * len(normal_tests) + ['red'] * len(outliers)
sizes = [20] * len(normal_tests) + [50] * len(outliers)

ax_scatter.scatter(range(len(combined_df)), combined_df['duration_seconds'],
                   alpha=0.7, c=colors, s=sizes)

# Set test names as x-axis labels
ax_scatter.set_xticks(range(len(combined_df)))
ax_scatter.set_xticklabels(combined_df['test_name'], rotation=45, ha='right')
ax_scatter.set_xlabel('Test Name')
ax_scatter.set_ylabel('Duration (seconds)')
ax_scatter.set_title('Tests with Outlier Durations Highlighted')

# Custom legend: the scatter is one artist, so build proxy handles per class.
legend_elements = [Line2D([0], [0], marker='o', color='w', markerfacecolor='blue',
                          markersize=8, alpha=0.7, label=f'Normal ({len(normal_tests)} tests)'),
                   Line2D([0], [0], marker='o', color='w', markerfacecolor='red',
                          markersize=10, alpha=0.7, label=f'Outliers ({len(outliers)} tests)')]
ax_scatter.legend(handles=legend_elements)
ax_scatter.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\nDuration Statistics:")
print(f"Total tests analyzed: {len(df)}")
print(f"Mean duration: {mean_duration:.1f} seconds ({mean_duration/60:.1f} minutes)")
print(f"Median duration: {median_duration:.1f} seconds ({median_duration/60:.1f} minutes)")
print(f"Standard deviation: {durations.std():.1f} seconds")
print(f"Min duration: {durations.min():.1f} seconds")
print(f"Max duration: {durations.max():.1f} seconds")
print("\nOutlier Analysis:")
print(f"Tests with duration < {lower_bound:.1f}s: {int((durations < lower_bound).sum())}")
print(f"Tests with duration > {upper_bound:.1f}s: {int((durations > upper_bound).sum())}")
print(f"Total outliers: {len(outliers)} ({len(outliers)/len(df)*100:.1f}% of all tests)")

In [None]:
# Text report: details of every IQR outlier, followed by a breakdown of all
# tests into fixed duration ranges. Relies on outliers, mean_duration, Q1, Q3,
# lower_bound and upper_bound computed in the overview cell.
print("\n" + "="*80)
print("DETAILED ANALYSIS OF OUTLIER TESTS")
print("="*80)

if len(outliers) > 0:
    print(f"\nTESTS WITH SIGNIFICANTLY DIFFERENT DURATIONS ({len(outliers)} tests):")
    print("-" * 80)

    # Longest first for better readability
    outliers_sorted = outliers.sort_values('duration_seconds', ascending=False)

    for idx, (_, row) in enumerate(outliers_sorted.iterrows(), 1):
        print(f"\n{idx}. Test: {row['test_name']}")
        print(f"   Duration: {row['duration_seconds']:.1f} seconds ({row['duration_minutes']:.2f} minutes)")
        print(f"   Path: {row['test_path']}")

        # Signed deviation from the mean; the formula is the same for both outlier types.
        deviation = ((row['duration_seconds'] - mean_duration) / mean_duration) * 100
        outlier_type = "LONG DURATION" if row['duration_seconds'] > upper_bound else "SHORT DURATION"
        print(f"   Type: {outlier_type} ({deviation:+.1f}% vs mean)")

    # Summary statistics for outliers
    print("\nOUTLIER SUMMARY:")
    print(f"- Outlier duration range: {outliers['duration_seconds'].min():.1f}s to {outliers['duration_seconds'].max():.1f}s")
    print(f"- Normal test range (IQR): {Q1:.1f}s to {Q3:.1f}s")
    print(f"- Outlier threshold: < {lower_bound:.1f}s or > {upper_bound:.1f}s")

else:
    print("\nNo significant outliers detected using IQR method.")

# Additional analysis: tests grouped by duration range
print("\n" + "="*80)
print("DURATION DISTRIBUTION ANALYSIS")
print("="*80)

# The last edge is open-ended so the '120s+' bucket really covers everything
# above 120 s (a fixed upper edge of 200 silently dropped longer runs as NaN).
# include_lowest keeps a duration of exactly 0 in the first bucket.
duration_ranges = pd.cut(df['duration_seconds'],
                         bins=[0, 5, 10, 20, 30, 45, 60, 90, 120, np.inf],
                         labels=['0-5s', '5-10s', '10-20s', '20-30s', '30-45s',
                                 '45-60s', '60-90s', '90-120s', '120s+'],
                         include_lowest=True)

range_analysis = df.groupby(duration_ranges, observed=True).agg({
    'duration_seconds': ['count', 'mean', 'min', 'max'],
    'test_name': lambda x: list(x)[:3]  # Keep up to 3 test names as examples
}).round(1)

# Flatten the two-level column index produced by the dict-style agg.
range_analysis.columns = ['Count', 'Avg_Duration', 'Min_Duration', 'Max_Duration', 'Example_Tests']

print("\nTest count and characteristics by duration range:")
for range_name, row in range_analysis.iterrows():
    if row['Count'] > 0:
        print(f"\n{range_name}: {int(row['Count'])} tests")
        print(f"  - Average: {row['Avg_Duration']:.1f}s")
        print(f"  - Range: {row['Min_Duration']:.1f}s - {row['Max_Duration']:.1f}s")
        example_tests = row['Example_Tests'][:2]  # Show first 2 examples
        if example_tests:
            print(f"  - Examples: {', '.join(example_tests)}")

In [None]:
# Focused outlier view: every test plotted at its row position in df (left)
# and a bar chart of the outliers only (right), followed by a text summary.
# Relies on outliers, lower_bound, upper_bound, mean_duration, median_duration,
# Q1 and Q3 computed in the overview cell.
def _short_label(test_name, max_len=25):
    """Return a compact axis label for a test name.

    For names containing '/', keeps the last segment (or the last two when
    the last one has 5 characters or fewer); truncates to max_len characters
    plus '...'.
    """
    parts = test_name.split('/')
    if len(parts) >= 2:
        short_name = parts[-1] if len(parts[-1]) > 5 else '/'.join(parts[-2:])
    else:
        short_name = test_name
    if len(short_name) > max_len:
        short_name = short_name[:max_len] + '...'
    return short_name


fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Left plot: all tests, with outliers highlighted in different colors
normal_mask = (df['duration_seconds'] >= lower_bound) & (df['duration_seconds'] <= upper_bound)
short_outliers_mask = df['duration_seconds'] < lower_bound
long_outliers_mask = df['duration_seconds'] > upper_bound

# All three groups use the row position in df as x, so 'Test Index' means the
# same thing for every point and markers of different groups cannot collide.
normal_indices = np.where(normal_mask)[0]
ax1.scatter(normal_indices, df.loc[normal_mask, 'duration_seconds'],
            alpha=0.6, color='blue', label=f'Normal tests ({normal_mask.sum()})', s=20)

if short_outliers_mask.any():
    short_indices = np.where(short_outliers_mask)[0]
    ax1.scatter(short_indices, df.loc[short_outliers_mask, 'duration_seconds'],
                alpha=0.9, color='orange', label=f'Short outliers ({short_outliers_mask.sum()})', s=60, marker='^')
    # Only drawn when relevant; the lower fence is often negative and would
    # otherwise stretch the y-axis for nothing.
    ax1.axhline(y=lower_bound, color='orange', linestyle=':', alpha=0.7, label=f'Lower threshold ({lower_bound:.1f}s)')

if long_outliers_mask.any():
    long_indices = np.where(long_outliers_mask)[0]
    ax1.scatter(long_indices, df.loc[long_outliers_mask, 'duration_seconds'],
                alpha=0.9, color='red', label=f'Long outliers ({long_outliers_mask.sum()})', s=60, marker='v')

ax1.axhline(y=mean_duration, color='green', linestyle='--', alpha=0.7, label=f'Mean ({mean_duration:.1f}s)')
ax1.axhline(y=upper_bound, color='red', linestyle=':', alpha=0.7, label=f'Upper threshold ({upper_bound:.1f}s)')

ax1.set_xlabel('Test Index')
ax1.set_ylabel('Duration (seconds)')
ax1.set_title('All Tests - Outliers Highlighted')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Right plot: outliers only (if any exist), shortest at the bottom
if len(outliers) > 0:
    outliers_sorted = outliers.sort_values('duration_seconds')
    bars = ax2.barh(range(len(outliers_sorted)), outliers_sorted['duration_seconds'],
                    color=['red' if x > upper_bound else 'orange' for x in outliers_sorted['duration_seconds']])

    # Shortened labels for better readability
    labels = [_short_label(test_name) for test_name in outliers_sorted['test_name']]

    ax2.set_yticks(range(len(outliers_sorted)))
    ax2.set_yticklabels(labels, fontsize=9)
    ax2.set_xlabel('Duration (seconds)')
    ax2.set_title('Outlier Tests Detail View')
    ax2.grid(True, alpha=0.3, axis='x')

    # Add duration values next to the bars
    for bar, duration in zip(bars, outliers_sorted['duration_seconds']):
        ax2.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
                 f'{duration:.1f}s', va='center', fontsize=9)
else:
    ax2.text(0.5, 0.5, 'No outliers detected\nAll tests have similar durations',
             ha='center', va='center', transform=ax2.transAxes, fontsize=12)
    ax2.set_title('No Outliers Detected')

plt.tight_layout()
plt.show()

# Final summary for user
print("\n" + "="*60)
print("SUMMARY FOR IDENTIFYING TESTS WITH DIFFERENT DURATIONS")
print("="*60)
print(f"✓ Analyzed {len(df)} test files from rosbag2/metadata.yaml")
print(f"✓ Found {len(outliers)} tests with significantly different durations")
print(f"✓ Normal duration range: {Q1:.1f}s - {Q3:.1f}s (interquartile range)")
print(f"✓ Most tests cluster around {median_duration:.1f}s (median duration)")

if len(outliers) > 0:
    print("\n⚠️  TESTS TO INVESTIGATE:")
    for i, (_, row) in enumerate(outliers.sort_values('duration_seconds', ascending=False).iterrows(), 1):
        duration_type = "LONG" if row['duration_seconds'] > upper_bound else "SHORT"
        print(f"   {i}. {row['test_name']} ({row['duration_seconds']:.1f}s - {duration_type})")
else:
    print("✓ All tests have consistent durations - no outliers detected!")

print("\n💡 Use the visualizations above to identify patterns and investigate unusual test durations.")