In [1]:
import numpy as np
import time
import matplotlib.pyplot as plt
import plotly.graph_objects as go

In [None]:
class RoboticSystemModel:
    def __init__(self, synthetic_data):
        self.position = [0, 0, 0]
        self.speed = 1.0
        self.status = "Operational"
        self.battery_level = 100
        self.synthetic_data = synthetic_data
        self.current_index = 0
        self.failure_log = []
    def analyze_failure_reasons(self, data):
        reasons = []
        
        if data['system_temperature'] > 70:
            reasons.append(f"High System Temperature ({data['system_temperature']:.2f}°C > 70°C)")
        
        if data['power_unit_temperature'] > 80:
            reasons.append(f"High Power Unit Temperature ({data['power_unit_temperature']:.2f}°C > 80°C)")
        
        if data['system_vibration'] > 2.0:
            reasons.append(f"Excessive System Vibration ({data['system_vibration']:.2f} mm/s > 2.0 mm/s)")
        
        if data['operational_hours'] > 4000:
            reasons.append(f"High Operational Hours ({data['operational_hours']:.2f} > 4000 hours)")
        
        if data['previous_failures'] > 2:
            reasons.append(f"Multiple Previous Failures ({data['previous_failures']} > 2)")
        
        if data['hydraulic_pressure'] < 1200 or data['hydraulic_pressure'] > 2800:
            reasons.append(f"Hydraulic Pressure Out of Range ({data['hydraulic_pressure']:.2f} kPa)")
        
        if self.battery_level < 20:
            reasons.append(f"Low Battery Level ({self.battery_level:.2f}%)")
        
        return reasons
    
    def move_to_position(self, target_position):
        if self.current_index >= len(self.synthetic_data):
            print("No more synthetic data available")
            return
        
        data = self.synthetic_data.iloc[self.current_index]
        row_number = self.current_index
        
        print(f"\n{'='*60}")
        print(f"MOVEMENT OPERATION - Row #{row_number}")
        print(f"{'='*60}")
        print(f"Initial System Metrics:")
        print(f"  Temperature: {data['system_temperature']:.2f}°C")
        print(f"  Current: {data['current_draw']:.2f} A")
        print(f"\nMoving from {self.position} to {target_position}...")
        
        distance = np.sqrt(sum((np.array(target_position) - np.array(self.position)) ** 2))
        time_required = distance / self.speed
        time.sleep(min(0.5, time_required))
        
        self.position = target_position
        self.battery_level -= distance * 0.1
        
        print(f"Position updated: {self.position}")
        print(f"Remaining Battery Level: {self.battery_level:.2f}%")
        print(f"Temperature: {data['system_temperature']:.2f}°C")
        print(f"Current Draw: {data['current_draw']:.2f} A")
        
        failure_reasons = self.analyze_failure_reasons(data)
        
        if data['is_failure'] == 1 or failure_reasons:
            self.status = "Maintenance Required"
            print(f"\n{'!'*60}")
            print(f"⚠️ MOVEMENT FAILURE DETECTED - Row #{row_number}")
            print(f"{'!'*60}")
            print(f"Failure Reasons:")
            for i, reason in enumerate(failure_reasons, 1):
                print(f"  {i}. {reason}")
            
            self.failure_log.append({
                'row_number': row_number,
                'operation': 'Movement',
                'position': target_position,
                'reasons': failure_reasons,
                'data': data.to_dict()
            })
        else:
            print(f"✓ Movement completed successfully - All systems nominal")
        
        print(f"\nSystem Status: {self.status}")
        self.current_index += 1
    
    def perform_task(self, task):
        if self.current_index >= len(self.synthetic_data):
            print("No more synthetic data available")
            return
        
        row_number = self.current_index
        
        print(f"\n{'='*60}")
        print(f"TASK EXECUTION: {task} - Row #{row_number}")
        print(f"{'='*60}")
        print(f"Executing task: {task}...")
        time.sleep(0.5)
        
        self.battery_level -= 5
        data = self.synthetic_data.iloc[self.current_index]
        
        print(f"Task '{task}' execution completed.")
        print(f"Remaining Battery Level: {self.battery_level:.2f}%")
        print(f"Hydraulic Pressure: {data['hydraulic_pressure']:.2f} kPa")
        print(f"System Vibration: {data['system_vibration']:.2f} mm/s")
        print(f"Power Unit Temperature: {data['power_unit_temperature']:.2f}°C")
        
        failure_reasons = self.analyze_failure_reasons(data)
        
        if data['is_failure'] == 1 or failure_reasons:
            self.status = "Maintenance Required"
            print(f"\n{'!'*60}")
            print(f"⚠️ TASK FAILURE DETECTED - Row #{row_number}")
            print(f"{'!'*60}")
            print(f"Failure Reasons:")
            for i, reason in enumerate(failure_reasons, 1):
                print(f"  {i}. {reason}")
            
            self.failure_log.append({
                'row_number': row_number,
                'operation': f'Task: {task}',
                'position': self.position,
                'reasons': failure_reasons,
                'data': data.to_dict()
            })
        else:
            print(f"✓ Task completed successfully - All systems nominal")

        print(f"\nSystem Status: {self.status}")
        self.current_index += 1

    def check_status(self):
        print(f"System Status: {self.status}")
    def analyze_tasks(self, tasks):
        failure_tasks = []
        for task in tasks:
            print(f"\nStarting task: {task['name']}")
            self.move_to_position(task['position'])
            self.perform_task(task['name'])
            if self.status == "Maintenance Required":
                failure_tasks.append(task['name'])
        plt.figure(figsize=(10, 6))
        task_names = [task['name'] for task in tasks]
        battery_levels = [self.synthetic_data.iloc[i]['current_draw'] for i in range(len(tasks))]
        plt.bar(task_names, battery_levels, color='blue')
        plt.xlabel('Tasks')
        plt.ylabel('Current Draw (A)')
        plt.title('Task Analysis: Current Draw per Task')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
        print(f"Tasks where failure occurred: {failure_tasks}")
    @staticmethod
    def visualize_3d_risk(data):
        fig = go.Figure()
        fig.add_trace(go.Scatter3d(x=data['system_temperature'], y=data['system_vibration'], z=data['hydraulic_pressure'], mode='markers',
        marker=dict(size=6, color=data['is_failure'], colorscale='Viridis', colorbar=dict(title="Failure Status"), opacity=0.8), name='Failure Data'))
        fig.update_layout(scene=dict(
        xaxis_title='System Temperature (°C)',
        yaxis_title='System Vibration (mm/s)',
        zaxis_title='Hydraulic Pressure (kPa)'),
        title='3D Visualization of Failure Risk with Failure Status',
        margin=dict(l=0, r=0, b=0, t=40))
        fig.show()