In [1]:
# disable pycache
import sys
sys.dont_write_bytecode = True
sys.path.append('../')
from datetime import datetime
from pudu.apis.raw.gas_api import *
from pudu.apis.raw.pudu_api import *
from pudu.app.main import App
from pudu.rds import RDSTable, RDSDatabase
from pudu.rds.utils import *
from pudu.notifications.change_detector import *
from pudu.reporting import *

In [2]:
db = RDSDatabase(
    connection_config="credentials.yaml",
    database_name="foxx_irvine_office"
)

In [3]:
import pandas as pd
from typing import Dict, List, Optional
from sqlalchemy import text

def calculate_robot_health_score(
    database_connection,
    robot_brand: str = "pudu",
    weights: Optional[Dict[str, float]] = None,
    scoring_components: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Calculate robot health score with flexible components and weights

    Args:
        robot_brand: Robot brand to filter (e.g., "pudu")
        weights: Dictionary of component weights (e.g., {"availability": 0.6, "task_success": 0.25, "efficiency": 0.15, "battery_soh": 0.0})
        scoring_components: List of components to include in scoring

    Returns:
        DataFrame with robot health scores and ratings
    """

    # Default weights if not provided
    if weights is None:
        weights = {
            "availability": 0.5,
            "task_success": 0.2,
            "efficiency": 0.2,
            "battery_soh": 0.1
        }

    # Default components if not provided
    if scoring_components is None:
        scoring_components = ["availability", "task_success", "efficiency", "battery_soh"]

    # Validate weights - only include weights for selected components
    filtered_weights = {k: v for k, v in weights.items() if k in scoring_components}
    total_weight = sum(filtered_weights.values())
    if abs(total_weight - 1.0) > 0.01 and filtered_weights:  # Allow small floating point error
        raise ValueError(f"Weights must sum to 1.0, but got {total_weight}")

    # Build SQL dynamically based on components and weights
    sql_parts = []

    # Base CTE to get Pudu robot_sn from management table
    sql_parts.append(f"""
    WITH pudu_robots AS (
        SELECT DISTINCT robot_sn
        FROM foxx_irvine_office.mnt_robots_management
        WHERE LOWER(robot_type) LIKE '%{robot_brand.lower()}%'
    )""")

    # Availability component
    if "availability" in scoring_components:
        sql_parts.append(f"""
    , robot_metrics AS (
        -- Availability Health
        SELECT
            roh.robot_sn,
            COUNT(CASE WHEN roh.status = 'online' THEN 1 END) * 100.0 / COUNT(*) as availability_score
        FROM foxx_irvine_office.mnt_robot_operation_history roh
        INNER JOIN pudu_robots pr ON roh.robot_sn = pr.robot_sn
        WHERE roh.timestamp_utc >= CURRENT_DATE - INTERVAL 7 DAY
        GROUP BY roh.robot_sn
    )""")

    # Task performance component - only include needed fields
    needs_task_metrics = "task_success" in scoring_components or "efficiency" in scoring_components
    if needs_task_metrics:
        # Build task_metrics with only the fields we need
        task_fields = []
        if "task_success" in scoring_components:
            task_fields.extend(["COUNT(*) as total_tasks", "COUNT(CASE WHEN rt.status IN ('Unknown', 'Manual', 'Task Ended', 'Not Started', 'In Progress') THEN 1 END) as successful_tasks"])
        if "efficiency" in scoring_components:
            task_fields.append("AVG(CASE WHEN rt.status IN ('Unknown', 'Manual', 'Task Ended', 'Not Started', 'In Progress') THEN rt.efficiency ELSE NULL END) as avg_efficiency")

        sql_parts.append(f"""
    , task_metrics AS (
        -- Task Performance Health
        SELECT
            rt.robot_sn,
            {', '.join(task_fields)}
        FROM foxx_irvine_office.mnt_robots_task rt
        INNER JOIN pudu_robots pr ON rt.robot_sn = pr.robot_sn
        WHERE rt.start_time >= CURRENT_DATE - INTERVAL 7 DAY
        GROUP BY rt.robot_sn
    )""")

    # Battery SOH component
    if "battery_soh" in scoring_components:
        sql_parts.append(f"""
    , battery_metrics AS (
        -- Battery Health
        SELECT
            robot_sn,
            CAST(REPLACE(REPLACE(battery_soh, '%', ''), '+', '') AS DECIMAL(5,2)) as soh_numeric
        FROM foxx_irvine_office.mnt_robot_operation_history
        WHERE timestamp_utc >= CURRENT_DATE - INTERVAL 7 DAY
          AND battery_soh IS NOT NULL
          AND battery_soh != ''
          AND robot_sn IN (SELECT robot_sn FROM pudu_robots)
        GROUP BY robot_sn
    )""")

    # Build SELECT clause
    select_parts = ["pr.robot_sn"]  # Use pr.robot_sn to ensure we get all Pudu robots
    score_calculations = []

    # Availability scoring
    if "availability" in scoring_components:
        select_parts.extend([
            "COALESCE(rm.availability_score, 0) as availability_score",
            f"""CASE
                WHEN COALESCE(rm.availability_score, 0) >= 95 THEN 'Excellent'
                WHEN COALESCE(rm.availability_score, 0) >= 85 THEN 'Good'
                WHEN COALESCE(rm.availability_score, 0) >= 60 THEN 'Fair'
                ELSE 'Poor'
            END as availability_rating"""
        ])
        score_calculations.append(f"(COALESCE(rm.availability_score, 0) * {filtered_weights['availability']})")

    # Task success scoring
    if "task_success" in scoring_components:
        select_parts.extend([
            """CASE
                WHEN COALESCE(tm.total_tasks, 0) > 0 THEN (COALESCE(tm.successful_tasks, 0) * 100.0 / tm.total_tasks)
                ELSE 0
            END as task_success_score""",
            """CASE
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 90 THEN 'Excellent'
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 80 THEN 'Good'
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 60 THEN 'Fair'
                ELSE 'Poor'
            END as task_success_rating"""
        ])
        score_calculations.append(f"(CASE WHEN COALESCE(tm.total_tasks, 0) > 0 THEN (COALESCE(tm.successful_tasks, 0) * 100.0 / tm.total_tasks) ELSE 0 END * {filtered_weights['task_success']})")

    # Efficiency scoring
    if "efficiency" in scoring_components:
        select_parts.extend([
            """CASE
                WHEN tm.avg_efficiency >= 700 THEN 100
                WHEN tm.avg_efficiency >= 600 THEN 95
                WHEN tm.avg_efficiency >= 500 THEN 85
                WHEN tm.avg_efficiency >= 400 THEN 75
                WHEN tm.avg_efficiency >= 300 THEN 60
                WHEN tm.avg_efficiency >= 200 THEN 50
                WHEN tm.avg_efficiency >= 100 THEN 30
                ELSE 10
            END as efficiency_score""",
            """CASE
                WHEN tm.avg_efficiency >= 700 THEN 'Excellent'
                WHEN tm.avg_efficiency >= 500 THEN 'Good'
                WHEN tm.avg_efficiency >= 400 THEN 'Fair'
                ELSE 'Poor'
            END as efficiency_rating"""
        ])
        score_calculations.append(f"""(CASE
            WHEN tm.avg_efficiency >= 700 THEN 100
            WHEN tm.avg_efficiency >= 600 THEN 95
            WHEN tm.avg_efficiency >= 500 THEN 85
            WHEN tm.avg_efficiency >= 400 THEN 75
            WHEN tm.avg_efficiency >= 300 THEN 60
            WHEN tm.avg_efficiency >= 200 THEN 50
            WHEN tm.avg_efficiency >= 100 THEN 30
            ELSE 10
        END * {filtered_weights['efficiency']})""")

    # Battery SOH scoring
    if "battery_soh" in scoring_components:
        select_parts.extend([
            "COALESCE(bm.soh_numeric, 0) as battery_soh_score",
            """CASE
                WHEN COALESCE(bm.soh_numeric, 0) >= 90 THEN 'Excellent'
                WHEN COALESCE(bm.soh_numeric, 0) >= 80 THEN 'Good'
                WHEN COALESCE(bm.soh_numeric, 0) >= 70 THEN 'Fair'
                ELSE 'Poor'
            END as battery_soh_rating"""
        ])
        score_calculations.append(f"(COALESCE(bm.soh_numeric, 0) * {filtered_weights['battery_soh']})")

    # Build FROM and JOIN clauses
    from_join_parts = ["FROM pudu_robots pr"]

    if "availability" in scoring_components:
        from_join_parts.append("LEFT JOIN robot_metrics rm ON pr.robot_sn = rm.robot_sn")

    if needs_task_metrics:
        from_join_parts.append("LEFT JOIN task_metrics tm ON pr.robot_sn = tm.robot_sn")

    if "battery_soh" in scoring_components:
        from_join_parts.append("LEFT JOIN battery_metrics bm ON pr.robot_sn = bm.robot_sn")

    # Final SQL assembly
    final_sql = "\n".join(sql_parts) + f"""

    SELECT
        {', '.join(select_parts)},

        -- Overall Health Score
        ROUND({' + '.join(score_calculations) if score_calculations else '0'}, 2) as overall_health_score,

        -- Overall Health Rating
        CASE
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 90 THEN 'Excellent'
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 80 THEN 'Good'
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 60 THEN 'Fair'
            ELSE 'Poor'
        END as overall_health_rating

    {chr(10).join(from_join_parts)}
    ORDER BY overall_health_score DESC;
    """
    # print("Generated SQL:")
    # print(final_sql)

    # Execute the query
    return database_connection.query_data_as_df(final_sql)

In [None]:
result1 = calculate_robot_health_score(
        database_connection=db,
        robot_brand="cc1",
        weights={"task_success": 1.0},
        scoring_components=["task_success"]
    )

  return pd.read_sql_query(query, connection)


In [None]:
result1.columns

Index(['robot_sn', 'task_success_score', 'task_success_rating',
       'overall_health_score', 'overall_health_rating'],
      dtype='object')

In [None]:
def comparison(database_connection, weight_combinations):
    """
    Ultra-simple version that just returns a pivot table
    """
    all_scores = {}

    for i, weights in enumerate(weight_combinations):
        df = calculate_robot_health_score(
            database_connection=database_connection,
            robot_brand="cc1",
            weights=weights,
            scoring_components=list(weights.keys())
        )

        strategy_name = f"S{i+1}"
        # Create a series with robot_sn as index and score as values
        scores = df.set_index('robot_sn')['overall_health_score']
        all_scores[strategy_name] = scores

    # Combine into a dataframe
    result_df = pd.DataFrame(all_scores)

    # Add weights as the first row for reference
    weights_row = pd.DataFrame({f"S{i+1}": str(w) for i, w in enumerate(weight_combinations)}, index=['Weights'])
    result_df = pd.concat([weights_row, result_df])

    return result_df

# Usage:
weight_combinations = [
    {"availability": 0.7, "task_success": 0.1, "efficiency": 0.1, "battery_soh": 0.1},
    {"availability": 0.6, "task_success": 0.1, "efficiency": 0.2, "battery_soh": 0.1},
    {"availability": 0.6, "task_success": 0.2, "efficiency": 0.1, "battery_soh": 0.1},
    {"availability": 0.5, "task_success": 0.2, "efficiency": 0.2, "battery_soh": 0.1},
    {"availability": 0.3, "task_success": 0.3, "efficiency": 0.3, "battery_soh": 0.1},
]

results = comparison(db, weight_combinations)

  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)


In [5]:
results

Unnamed: 0,S1,S2,S3,S4,S5
Weights,"{'availability': 0.7, 'task_success': 0.1, 'ef...","{'availability': 0.6, 'task_success': 0.1, 'ef...","{'availability': 0.6, 'task_success': 0.2, 'ef...","{'availability': 0.5, 'task_success': 0.2, 'ef...","{'availability': 0.3, 'task_success': 0.3, 'ef..."
811135422060216,97.4,94.9,97.4,94.9,92.4
811135422060228,93.97,91.47,90.63,88.13,82.3
8110H4802050005,1.0,2.0,1.0,2.0,3.0
8110H4802050006,1.0,2.0,1.0,2.0,3.0
8110H4B08050040,1.0,2.0,1.0,2.0,3.0
811135422060217,1.0,2.0,1.0,2.0,3.0


In [4]:
import pandas as pd
from typing import List

def analyze_area_per_charge_simple(database_connection, robot_sns: List[str] = None) -> pd.DataFrame:
    """
    Analyze approximate actual_area per charge using battery_usage from task table

    Args:
        database_connection: Database connection object
        robot_sns: List of robot serial numbers to analyze (None for all robots)

    Returns:
        DataFrame with robot_sn, mode, task_name, battery_usage, actual_area, and approximate_actual_area
    """

    # Build robot filter condition
    robot_filter = ""
    if robot_sns:
        sn_list = ",".join([f"'{sn}'" for sn in robot_sns])
        robot_filter = f"AND robot_sn IN ({sn_list})"

    # SQL query to get task data with battery usage
    tasks_sql = f"""
    SELECT
        robot_sn,
        mode,
        task_name,
        start_time,
        end_time,
        battery_usage,
        actual_area,
        progress
    FROM foxx_irvine_office.mnt_robots_task
    WHERE status IN ('Task Ended')
      AND battery_usage IS NOT NULL
      AND battery_usage > 0
      AND actual_area IS NOT NULL
      AND actual_area > 0
      {robot_filter}
      AND start_time >= CURRENT_DATE - INTERVAL 7 DAY
    ORDER BY robot_sn, start_time
    """

    # Get tasks data
    tasks_df = database_connection.query_data_as_df(tasks_sql)

    if tasks_df.empty:
        print("No tasks found with battery usage data")
        return pd.DataFrame()

    # Calculate approximate actual area per charge
    # If battery_usage is 50% and actual_area is 102, then approximate_actual_area = 102 * (100/50) = 204
    tasks_df = tasks_df.copy()
    tasks_df['approximate_actual_area'] = tasks_df['actual_area'] * (100 / tasks_df['battery_usage'])

    print(f"Found {len(tasks_df)} tasks with battery usage data")
    print(f"Robots analyzed: {tasks_df['robot_sn'].nunique()}")

    return tasks_df

def generate_summary_report(tasks_df: pd.DataFrame) -> pd.DataFrame:
    """
    Generate summary statistics per robot and mode
    """
    if tasks_df.empty:
        return pd.DataFrame()

    goals = {'Sweeping': 3000, 'Scrubbing': 2000}

    # Correct way: use .map() to apply the goal based on mode
    tasks_df['goal'] = tasks_df['mode'].map(goals)
    tasks_df['meet_goal'] = tasks_df['approximate_actual_area'] >= tasks_df['goal']

    summary = tasks_df.groupby(['robot_sn', 'mode']).agg({
        'approximate_actual_area': ['min', 'max', 'mean'],
        'meet_goal': ['sum', 'count', 'mean']
    }).round(2)

    # Flatten column names
    summary.columns = ['_'.join(col).strip() for col in summary.columns.values]
    summary = summary.reset_index()

    # Calculate meet_goal_percentage
    summary['meet_goal_percentage'] = (summary['meet_goal_sum'] / summary['meet_goal_count'] * 100).round(2)

    return summary

# Usage example:
def run_analysis(database_connection, robot_sns: List[str] = None):
    """
    Run complete analysis and return results
    """
    print("Starting area per charge analysis using battery_usage...")

    # Get task data with calculations
    results_df = analyze_area_per_charge_simple(database_connection, robot_sns)

    if results_df.empty:
        print("No data found")
        return None, None

    # Generate summary
    summary_df = generate_summary_report(results_df)

    # Display some statistics
    print(f"\n=== ANALYSIS SUMMARY ===")
    print(f"Total tasks analyzed: {len(results_df)}")
    print(f"Total robots: {results_df['robot_sn'].nunique()}")
    print(f"Total modes: {results_df['mode'].nunique()}")
    print(f"Average battery usage per task: {results_df['battery_usage'].mean():.1f}%")
    print(f"Average approximate area per charge: {results_df['approximate_actual_area'].mean():.1f}")

    # Show top tasks by efficiency
    print(f"\n=== TOP 5 MOST EFFICIENT TASKS ===")
    top_tasks = results_df.nlargest(5, 'approximate_actual_area')[
        ['robot_sn', 'mode', 'task_name', 'battery_usage', 'actual_area', 'approximate_actual_area']
    ]
    print(top_tasks.to_string(index=False))

    return results_df, summary_df[['robot_sn', 'mode', 'approximate_actual_area_min', 'approximate_actual_area_max', 'approximate_actual_area_mean', 'meet_goal_sum', 'meet_goal_percentage']]

In [7]:
results, summary = run_analysis(db, ['811135422060228', '811135422060216'])

Starting area per charge analysis using battery_usage...


  return pd.read_sql_query(query, connection)


Found 21 tasks with battery usage data
Robots analyzed: 2

=== ANALYSIS SUMMARY ===
Total tasks analyzed: 21
Total robots: 2
Total modes: 2
Average battery usage per task: 39.0%
Average approximate area per charge: 1421.1

=== TOP 5 MOST EFFICIENT TASKS ===
       robot_sn      mode            task_name  battery_usage  actual_area  approximate_actual_area
811135422060216 Scrubbing Bld_205_Grd_Task ALL           36.0       789.49              2193.027778
811135422060216 Scrubbing Bld_205_Grd_Task ALL           37.0       789.49              2133.756757
811135422060216 Scrubbing Bld_205_Grd_Task ALL           37.0       789.49              2133.756757
811135422060216 Scrubbing Bld_205_Grd_Task ALL           41.0       789.49              1925.585366
811135422060216 Scrubbing  ground_elevator_new            7.0       102.32              1461.714286


In [8]:
results

Unnamed: 0,robot_sn,mode,task_name,start_time,end_time,battery_usage,actual_area,progress,approximate_actual_area,goal,meet_goal
0,811135422060216,Scrubbing,Bld_205_Grd_Task ALL,2025-10-07 10:00:24,2025-10-07 11:45:11,36.0,789.49,100.0,2193.027778,2000,True
1,811135422060216,Scrubbing,ground_elevator_new,2025-10-07 12:00:38,2025-10-07 12:18:20,7.0,102.32,100.0,1461.714286,2000,False
2,811135422060216,Scrubbing,Bld_205_Grd_Task dock heavy,2025-10-08 13:47:03,2025-10-08 14:10:37,8.0,105.16,100.0,1314.5,2000,False
3,811135422060216,Scrubbing,ground_elevator_new,2025-10-08 14:32:39,2025-10-08 14:52:33,8.0,102.32,100.0,1279.0,2000,False
4,811135422060216,Scrubbing,ground_elevator_new,2025-10-08 15:38:22,2025-10-08 15:56:56,7.0,102.32,100.0,1461.714286,2000,False
5,811135422060216,Scrubbing,Bld_205_Grd_Task ALL,2025-10-09 10:56:08,2025-10-09 12:41:28,37.0,789.49,100.0,2133.756757,2000,True
6,811135422060216,Scrubbing,ground_elevator_new,2025-10-09 14:44:04,2025-10-09 15:03:58,8.0,102.32,100.0,1279.0,2000,False
7,811135422060216,Scrubbing,Bld_205_Grd_Task ALL,2025-10-10 09:39:20,2025-10-10 11:24:02,37.0,789.49,100.0,2133.756757,2000,True
8,811135422060216,Scrubbing,ground_elevator_new,2025-10-10 12:06:00,2025-10-10 12:24:58,8.0,102.32,100.0,1279.0,2000,False
9,811135422060216,Scrubbing,Bld_205_Grd_Task ALL,2025-10-13 09:37:24,2025-10-13 11:22:51,41.0,789.49,100.0,1925.585366,2000,False


In [9]:
summary

Unnamed: 0,robot_sn,mode,approximate_actual_area_min,approximate_actual_area_max,approximate_actual_area_mean,meet_goal_sum,meet_goal_percentage
0,811135422060216,Scrubbing,1279.0,2193.03,1612.73,3,27.27
1,811135422060228,Sweeping,890.68,1436.82,1210.37,0,0.0


In [5]:
import pandas as pd
from typing import Dict, List, Optional
from sqlalchemy import text

def calculate_robot_health_score(
    database_connection,
    robot_brand: str = "pudu",
    weights: Optional[Dict[str, float]] = None,
    scoring_components: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Calculate robot health score with flexible components and weights

    Args:
        robot_brand: Robot brand to filter (e.g., "pudu")
        weights: Dictionary of component weights (e.g., {"availability": 0.6, "task_success": 0.25, "efficiency": 0.15, "battery_soh": 0.0, "mode_performance": 0.0})
        scoring_components: List of components to include in scoring

    Returns:
        DataFrame with robot health scores and ratings
    """

    # Default weights if not provided
    if weights is None:
        weights = {
            "availability": 0.4,
            "task_success": 0.2,
            "efficiency": 0.2,
            "mode_performance": 0.1,
            "battery_soh": 0.1
        }

    # Default components if not provided
    if scoring_components is None:
        scoring_components = ["availability", "task_success", "efficiency", "mode_performance", "battery_soh"]

    # Validate weights - only include weights for selected components
    filtered_weights = {k: v for k, v in weights.items() if k in scoring_components}
    total_weight = sum(filtered_weights.values())
    if abs(total_weight - 1.0) > 0.01 and filtered_weights:  # Allow small floating point error
        raise ValueError(f"Weights must sum to 1.0, but got {total_weight}")

    # Build SQL dynamically based on components and weights
    sql_parts = []

    # Base CTE to get Pudu robot_sn from management table
    sql_parts.append(f"""
    WITH pudu_robots AS (
        SELECT DISTINCT robot_sn
        FROM foxx_irvine_office.mnt_robots_management
        WHERE LOWER(robot_type) LIKE '%{robot_brand.lower()}%'
    )""")

    # Mode performance component
    if "mode_performance" in scoring_components:
        sql_parts.append(f"""
    , area_analysis AS (
        -- Get task data with battery usage for area analysis
        SELECT
            robot_sn,
            mode,
            task_name,
            battery_usage,
            actual_area,
            actual_area * (100 / battery_usage) as approximate_actual_area
        FROM foxx_irvine_office.mnt_robots_task
        WHERE status IN ('Task Ended')
          AND battery_usage IS NOT NULL
          AND battery_usage > 0
          AND actual_area IS NOT NULL
          AND actual_area > 0
          AND start_time >= '2025-10-10 04:00:00'
          AND end_time <= '2025-10-15 03:59:59'
          AND robot_sn IN (SELECT robot_sn FROM pudu_robots)
    ),
    mode_performance AS (
    -- Calculate mode performance scores based on max approximate area vs goals
    SELECT
        robot_sn,
        AVG(
            CASE
                WHEN mode = 'Sweeping' THEN
                    CASE
                        WHEN max_area >= 2700 THEN 100  -- 90% of 3000
                        WHEN max_area >= 2400 THEN 90   -- 80% of 3000
                        WHEN max_area >= 2100 THEN 80   -- 70% of 3000
                        WHEN max_area >= 1800 THEN 70   -- 60% of 3000
                        WHEN max_area >= 1500 THEN 60   -- 50% of 3000
                        WHEN max_area >= 1200 THEN 50   -- 40% of 3000
                        WHEN max_area >= 900 THEN 40    -- 30% of 3000
                        WHEN max_area >= 600 THEN 30    -- 20% of 3000
                        WHEN max_area >= 300 THEN 20    -- 10% of 3000
                        WHEN max_area > 0 THEN 10       -- >0% of 3000
                        ELSE 0
                    END
                WHEN mode = 'Scrubbing' THEN
                    CASE
                        WHEN max_area >= 1800 THEN 100  -- 90% of 2000
                        WHEN max_area >= 1600 THEN 90   -- 80% of 2000
                        WHEN max_area >= 1400 THEN 80   -- 70% of 2000
                        WHEN max_area >= 1200 THEN 70   -- 60% of 2000
                        WHEN max_area >= 1000 THEN 60   -- 50% of 2000
                        WHEN max_area >= 800 THEN 50    -- 40% of 2000
                        WHEN max_area >= 600 THEN 40    -- 30% of 2000
                        WHEN max_area >= 400 THEN 30    -- 20% of 2000
                        WHEN max_area >= 200 THEN 20    -- 10% of 2000
                        WHEN max_area > 0 THEN 10       -- >0% of 2000
                        ELSE 0
                    END
                ELSE 0
            END
        ) as mode_performance_score
    FROM (
        SELECT
            robot_sn,
            mode,
            MAX(approximate_actual_area) as max_area
        FROM area_analysis
        GROUP BY robot_sn, mode
    ) mode_max_areas
    GROUP BY robot_sn
   )""")

    # Availability component
    if "availability" in scoring_components:
        sql_parts.append(f"""
    , robot_metrics AS (
        -- Availability Health
        SELECT
            roh.robot_sn,
            COUNT(CASE WHEN roh.status = 'online' THEN 1 END) * 100.0 / COUNT(*) as availability_score
        FROM foxx_irvine_office.mnt_robot_operation_history roh
        INNER JOIN pudu_robots pr ON roh.robot_sn = pr.robot_sn
        WHERE roh.timestamp_utc > '2025-10-10 04:00:00'
          AND roh.timestamp_utc < '2025-10-15 03:59:59'
        GROUP BY roh.robot_sn
    )""")

    # Task performance component - only include needed fields
    needs_task_metrics = "task_success" in scoring_components or "efficiency" in scoring_components
    if needs_task_metrics:
        # Build task_metrics with only the fields we need
        task_fields = []
        if "task_success" in scoring_components:
            task_fields.extend(["COUNT(*) as total_tasks", "COUNT(CASE WHEN rt.status IN ('Unknown', 'Manual', 'Task Ended', 'Not Started', 'In Progress') THEN 1 END) as successful_tasks"])
        if "efficiency" in scoring_components:
            task_fields.append("AVG(CASE WHEN rt.status IN ('Unknown', 'Manual', 'Task Ended', 'Not Started', 'In Progress') THEN rt.efficiency ELSE NULL END) as avg_efficiency")

        sql_parts.append(f"""
    , task_metrics AS (
        -- Task Performance Health
        SELECT
            rt.robot_sn,
            {', '.join(task_fields)}
        FROM foxx_irvine_office.mnt_robots_task rt
        INNER JOIN pudu_robots pr ON rt.robot_sn = pr.robot_sn
        WHERE rt.start_time >= CURRENT_DATE - INTERVAL 7 DAY
        GROUP BY rt.robot_sn
    )""")

    # Battery SOH component
    if "battery_soh" in scoring_components:
        sql_parts.append(f"""
    , battery_metrics AS (
        -- Battery Health
        SELECT
            robot_sn,
            CAST(REPLACE(REPLACE(battery_soh, '%', ''), '+', '') AS DECIMAL(5,2)) as soh_numeric
        FROM foxx_irvine_office.mnt_robot_operation_history
        WHERE timestamp_utc >= CURRENT_DATE - INTERVAL 7 DAY
          AND battery_soh IS NOT NULL
          AND battery_soh != ''
          AND robot_sn IN (SELECT robot_sn FROM pudu_robots)
        GROUP BY robot_sn
    )""")

    # Build SELECT clause
    select_parts = ["pr.robot_sn"]  # Use pr.robot_sn to ensure we get all Pudu robots
    score_calculations = []

    # Mode performance scoring
    if "mode_performance" in scoring_components:
        select_parts.extend([
            "COALESCE(mp.mode_performance_score, 0) as mode_performance_score",
            """CASE
                WHEN COALESCE(mp.mode_performance_score, 0) >= 90 THEN 'Excellent'
                WHEN COALESCE(mp.mode_performance_score, 0) >= 80 THEN 'Good'
                WHEN COALESCE(mp.mode_performance_score, 0) >= 60 THEN 'Fair'
                ELSE 'Poor'
            END as mode_performance_rating"""
        ])
        score_calculations.append(f"(COALESCE(mp.mode_performance_score, 0) * {filtered_weights['mode_performance']})")

    # Availability scoring
    if "availability" in scoring_components:
        select_parts.extend([
            "COALESCE(rm.availability_score, 0) as availability_score",
            f"""CASE
                WHEN COALESCE(rm.availability_score, 0) >= 95 THEN 'Excellent'
                WHEN COALESCE(rm.availability_score, 0) >= 85 THEN 'Good'
                WHEN COALESCE(rm.availability_score, 0) >= 60 THEN 'Fair'
                ELSE 'Poor'
            END as availability_rating"""
        ])
        score_calculations.append(f"(COALESCE(rm.availability_score, 0) * {filtered_weights['availability']})")

    # Task success scoring
    if "task_success" in scoring_components:
        select_parts.extend([
            """CASE
                WHEN COALESCE(tm.total_tasks, 0) > 0 THEN (COALESCE(tm.successful_tasks, 0) * 100.0 / tm.total_tasks)
                ELSE 0
            END as task_success_score""",
            """CASE
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 90 THEN 'Excellent'
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 80 THEN 'Good'
                WHEN COALESCE(tm.total_tasks, 0) > 0 AND (tm.successful_tasks * 100.0 / tm.total_tasks) >= 60 THEN 'Fair'
                ELSE 'Poor'
            END as task_success_rating"""
        ])
        score_calculations.append(f"(CASE WHEN COALESCE(tm.total_tasks, 0) > 0 THEN (COALESCE(tm.successful_tasks, 0) * 100.0 / tm.total_tasks) ELSE 0 END * {filtered_weights['task_success']})")

    # Efficiency scoring
    if "efficiency" in scoring_components:
        select_parts.extend([
            """CASE
                WHEN tm.avg_efficiency >= 700 THEN 100
                WHEN tm.avg_efficiency >= 600 THEN 95
                WHEN tm.avg_efficiency >= 500 THEN 85
                WHEN tm.avg_efficiency >= 400 THEN 75
                WHEN tm.avg_efficiency >= 300 THEN 60
                WHEN tm.avg_efficiency >= 200 THEN 50
                WHEN tm.avg_efficiency >= 100 THEN 30
                ELSE 10
            END as efficiency_score""",
            """CASE
                WHEN tm.avg_efficiency >= 700 THEN 'Excellent'
                WHEN tm.avg_efficiency >= 500 THEN 'Good'
                WHEN tm.avg_efficiency >= 400 THEN 'Fair'
                ELSE 'Poor'
            END as efficiency_rating"""
        ])
        score_calculations.append(f"""(CASE
            WHEN tm.avg_efficiency >= 700 THEN 100
            WHEN tm.avg_efficiency >= 600 THEN 95
            WHEN tm.avg_efficiency >= 500 THEN 85
            WHEN tm.avg_efficiency >= 400 THEN 75
            WHEN tm.avg_efficiency >= 300 THEN 60
            WHEN tm.avg_efficiency >= 200 THEN 50
            WHEN tm.avg_efficiency >= 100 THEN 30
            ELSE 10
        END * {filtered_weights['efficiency']})""")

    # Battery SOH scoring
    if "battery_soh" in scoring_components:
        select_parts.extend([
            "COALESCE(bm.soh_numeric, 0) as battery_soh_score",
            """CASE
                WHEN COALESCE(bm.soh_numeric, 0) >= 90 THEN 'Excellent'
                WHEN COALESCE(bm.soh_numeric, 0) >= 80 THEN 'Good'
                WHEN COALESCE(bm.soh_numeric, 0) >= 70 THEN 'Fair'
                ELSE 'Poor'
            END as battery_soh_rating"""
        ])
        score_calculations.append(f"(COALESCE(bm.soh_numeric, 0) * {filtered_weights['battery_soh']})")

    # Build FROM and JOIN clauses
    from_join_parts = ["FROM pudu_robots pr"]

    if "mode_performance" in scoring_components:
        from_join_parts.append("LEFT JOIN mode_performance mp ON pr.robot_sn = mp.robot_sn")

    if "availability" in scoring_components:
        from_join_parts.append("LEFT JOIN robot_metrics rm ON pr.robot_sn = rm.robot_sn")

    if needs_task_metrics:
        from_join_parts.append("LEFT JOIN task_metrics tm ON pr.robot_sn = tm.robot_sn")

    if "battery_soh" in scoring_components:
        from_join_parts.append("LEFT JOIN battery_metrics bm ON pr.robot_sn = bm.robot_sn")

    # Final SQL assembly
    final_sql = "\n".join(sql_parts) + f"""

    SELECT
        {', '.join(select_parts)},

        -- Overall Health Score
        ROUND({' + '.join(score_calculations) if score_calculations else '0'}, 2) as overall_health_score,

        -- Overall Health Rating
        CASE
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 90 THEN 'Excellent'
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 80 THEN 'Good'
            WHEN ({' + '.join(score_calculations) if score_calculations else '0'}) >= 60 THEN 'Fair'
            ELSE 'Poor'
        END as overall_health_rating

    {chr(10).join(from_join_parts)}
    ORDER BY overall_health_score DESC;
    """

    # Execute the query
    return database_connection.query_data_as_df(final_sql)

In [6]:
calculate_robot_health_score(db, robot_brand='CC1')

  return pd.read_sql_query(query, connection)


Unnamed: 0,robot_sn,mode_performance_score,mode_performance_rating,availability_score,availability_rating,task_success_score,task_success_rating,efficiency_score,efficiency_rating,battery_soh_score,battery_soh_rating,overall_health_score,overall_health_rating
0,811135422060216,100.0,Excellent,100.0,Excellent,100.0,Excellent,75,Fair,99.0,Excellent,94.9,Excellent
1,811135422060228,50.0,Poor,100.0,Excellent,66.66667,Fair,75,Fair,98.0,Excellent,83.13,Good
2,8110H4802050005,0.0,Poor,0.0,Poor,0.0,Poor,10,Poor,0.0,Poor,2.0,Poor
3,8110H4802050006,0.0,Poor,0.0,Poor,0.0,Poor,10,Poor,0.0,Poor,2.0,Poor
4,8110H4B08050040,0.0,Poor,0.0,Poor,0.0,Poor,10,Poor,0.0,Poor,2.0,Poor
5,811135422060217,0.0,Poor,0.0,Poor,0.0,Poor,10,Poor,0.0,Poor,2.0,Poor


In [19]:
def comparison(database_connection, weight_combinations):
    """
    Ultra-simple version that just returns a pivot table
    """
    all_scores = {}

    for i, weights in enumerate(weight_combinations):
        df = calculate_robot_health_score(
            database_connection=database_connection,
            robot_brand="cc1",
            weights=weights,
            scoring_components=list(weights.keys())
        )

        strategy_name = f"S{i+1}"
        # Create a series with robot_sn as index and score as values
        scores = df.set_index('robot_sn')['overall_health_score']
        all_scores[strategy_name] = scores

    # Combine into a dataframe
    result_df = pd.DataFrame(all_scores)

    # Add weights as the first row for reference
    weights_row = pd.DataFrame({f"S{i+1}": str(w) for i, w in enumerate(weight_combinations)}, index=['Weights'])
    result_df = pd.concat([weights_row, result_df])

    return result_df

weight_combinations = [
    {"availability": 0.6, "task_success": 0.1, "efficiency": 0.1, "mode_performance": 0.1, "battery_soh": 0.1},
    {"availability": 0.5, "task_success": 0.1, "efficiency": 0.2, "mode_performance": 0.1, "battery_soh": 0.1},
    {"availability": 0.5, "task_success": 0.2, "efficiency": 0.1, "mode_performance": 0.1, "battery_soh": 0.1},
    {"availability": 0.4, "task_success": 0.2, "efficiency": 0.2, "mode_performance": 0.1, "battery_soh": 0.1},
    {"availability": 0.3, "task_success": 0.2, "efficiency": 0.2, "mode_performance": 0.2, "battery_soh": 0.1},
]

results = comparison(db, weight_combinations)

  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)
  return pd.read_sql_query(query, connection)


In [20]:
results

Unnamed: 0,S1,S2,S3,S4,S5
Weights,"{'availability': 0.6, 'task_success': 0.1, 'ef...","{'availability': 0.5, 'task_success': 0.1, 'ef...","{'availability': 0.5, 'task_success': 0.2, 'ef...","{'availability': 0.4, 'task_success': 0.2, 'ef...","{'availability': 0.3, 'task_success': 0.2, 'ef..."
811135422060216,97.4,94.9,97.4,94.9,94.9
811135422060228,88.97,86.47,85.63,83.13,78.13
8110H4802050005,1.0,2.0,1.0,2.0,2.0
8110H4802050006,1.0,2.0,1.0,2.0,2.0
8110H4B08050040,1.0,2.0,1.0,2.0,2.0
811135422060217,1.0,2.0,1.0,2.0,2.0
