In [2]:
import numpy as np
from sklearn.ensemble import IsolationForest
from tensorflow import keras

In [4]:
class ObstacleAvoidanceAI:
    def __init__(self):
        # Initialize models
        self.tof_model = self._init_tof_model()
        self.ultrasonic_model = self._init_ultrasonic_model()
        self.fusion_model = self._init_fusion_model()
        
        # Memory buffer for temporal analysis
        self.sensor_buffer = []
        self.buffer_size = 5
        
    def _init_tof_model(self):
        """Time-of-Flight (Laser) obstacle classifier"""
        model = keras.Sequential([
            keras.layers.Dense(8, activation='relu', input_shape=(3,)),
            keras.layers.Dense(4, activation='relu'),
            keras.layers.Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer='adam', loss='binary_crossentropy')
        return model
    
    def _init_ultrasonic_model(self):
        """Ultrasonic anomaly detector"""
        return IsolationForest(contamination=0.1)
    
    def _init_fusion_model(self):
        """Multi-sensor fusion model"""
        return keras.models.load_model('fusion_model.h5')  # Pre-trained
    
    def predict_obstacle(self, sensor_data):
        """
        Input: {'tof': [current, prev1, prev2], 
                'ultrasonic': current_distance,
                'speed': current_speed}
        Output: {'obstacle': bool, 'direction': 'left'/'right'/'back'}
        """
        # Time-of-Flight prediction
        tof_input = np.array(sensor_data['tof']).reshape(1, -1)
        tof_pred = self.tof_model.predict(tof_input)[0][0] > 0.5
        
        # Ultrasonic anomaly detection
        us_input = np.array([sensor_data['ultrasonic']])
        us_pred = self.ultrasonic_model.predict(us_input)[0] == -1
        
        # Sensor fusion
        if tof_pred or us_pred:
            fusion_input = np.array([
                sensor_data['tof'][0]/8191.0,  # Normalized TOF
                sensor_data['ultrasonic']/400.0,  # Normalized (max 4m)
                sensor_data['speed']/2.0  # Normalized (max 2m/s)
            ]).reshape(1, -1)
            
            fusion_output = self.fusion_model.predict(fusion_input)
            direction = ['left', 'right', 'back'][np.argmax(fusion_output)]
            
            return {'obstacle': True, 'direction': direction}
        
        return {'obstacle': False}

In [5]:
class AvoidanceController:
    def __init__(self):
        self.ai = ObstacleAvoidanceAI()
        self.safety_margin = 0.5  # meters
        self.last_commands = []
        
    def get_avoidance_command(self, sensor_readings):
        """
        Processes sensor data and returns motor commands
        Format: {'action': 'forward'/'left'/'right'/'back', 'speed': 0-100}
        """
        # Update sensor buffer
        self.sensor_buffer.append(sensor_readings)
        if len(self.sensor_buffer) > self.buffer_size:
            self.sensor_buffer.pop(0)
        
        # Get AI prediction
        obstacle_info = self.ai.predict_obstacle({
            'tof': [r['tof'] for r in self.sensor_buffer[-3:]],
            'ultrasonic': sensor_readings['ultrasonic'],
            'speed': sensor_readings['speed']
        })
        
        # Determine action
        if obstacle_info['obstacle']:
            if obstacle_info['direction'] == 'left':
                return {'action': 'right', 'speed': 70}
            elif obstacle_info['direction'] == 'right':
                return {'action': 'left', 'speed': 70}
            else:
                return {'action': 'back', 'speed': 50}
        else:
            return {'action': 'forward', 'speed': min(100, sensor_readings['speed']*50)}