In [3]:
class OpticalFlowTracker:
    def __init__(self):
        # Parámetros para Lucas-Kanade optical flow
        self.lk_params = dict(
            winSize=(21, 21),
            maxLevel=4,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.01)
        )
        
        self.prev_gray = None
        self.prev_points = None
        self.redetect_counter = 0
        self.max_frames_before_redetect = 12
        self.movement_history = deque(maxlen=10)  # Historial de movimiento para detectar picos
    
    def track_points(self, frame, landmarks_detector):
        """Realiza seguimiento de puntos usando flujo óptico"""
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        output_image = frame.copy()
        movement_magnitude = 0
        
        # Si no hay puntos previos o es hora de redetectar
        self.redetect_counter += 1
        if self.redetect_counter >= self.max_frames_before_redetect or self.prev_points is None:
            # Detectar landmarks usando MediaPipe
            new_points = landmarks_detector.detect_landmarks(frame)
            self.redetect_counter = 0
            
            if new_points is not None and len(new_points) > 0:
                self.prev_points = new_points
                self.prev_indices = landmarks_detector.indices_clave.copy()
                self.prev_gray = frame_gray.copy()
                
                # Visualizar puntos detectados
                output_image = landmarks_detector.visualize_landmarks(frame, new_points)
                return output_image, new_points, movement_magnitude
        
        # Calcular flujo óptico si tenemos puntos previos
        if self.prev_points is not None and len(self.prev_points) > 0:
            # Calcular flujo óptico - Lucas-Kanade method
            new_points, status, err = cv2.calcOpticalFlowPyrLK(
                self.prev_gray, frame_gray, self.prev_points, None, **self.lk_params
            )
            
            # Verificar la calidad del seguimiento
            prev_points_back, status_back, err_back = cv2.calcOpticalFlowPyrLK(
                frame_gray, self.prev_gray, new_points, None, **self.lk_params
            )
            
            prev_points_back, status_back, err_back = cv2.calcOpticalFlowPyrLK(
                frame_gray, self.prev_gray, new_points, None, **self.lk_params
                )
            
            # Calcular diferencia para eliminar puntos de baja calidad
            d = abs(self.prev_points - prev_points_back).reshape(-1, 2).max(-1)
            good = (d < 1.5) & (status.reshape(-1) == 1)
            
            # Filtrar puntos buenos y dibujar
            h, w = frame.shape[:2]
            filtered_points = []
            filtered_indices = []  # Nuevo: para mantener los índices originales
            
            # Calcular la magnitud del movimiento para detección de keyframes
            movement_vectors = []
            
            for i, (new_point, good_point) in enumerate(zip(new_points, good)):
                if good_point:
                    x, y = new_point.ravel()
                    if 0 <= int(y) < h and 0 <= int(x) < w:
                        filtered_points.append(new_point.reshape(1, 2))
                        

                        if hasattr(self, 'prev_indices') and i < len(self.prev_indices):
                            idx_original = self.prev_indices[i]
                        else:
                        # Si no tenemos índice guardado, usar el valor por defecto
                            idx_original = landmarks_detector.indices_clave[i] if i < len(landmarks_detector.indices_clave) else 0
                    
                    # Guardar el índice para la próxima iteración
                        filtered_indices.append(idx_original)
                    
                    # Obtener color según la región facial usando el índice original correcto
                        color = landmarks_detector.punto_colores.get(idx_original, (0, 0, 255))
                    


                        # Dibujar punto
                        cv2.circle(output_image, (int(x), int(y)), 2, color, -1)
                        
                        # Dibujar línea de movimiento
                        prev_x, prev_y = self.prev_points[i].ravel()
                        movement_vector = np.linalg.norm(new_point.ravel() - np.array([prev_x, prev_y]))
                        movement_vectors.append(movement_vector)
                        
                        if movement_vector < 20:  # Filtrar movimientos extremos
                            cv2.line(output_image, (int(x), int(y)), 
                                    (int(prev_x), int(prev_y)), (0, 255, 0), 1)
            
            # Calcular magnitud de movimiento promedio
            if movement_vectors:
                movement_magnitude = np.mean(movement_vectors)
                self.movement_history.append(movement_magnitude)
            
            # Comprobar si perdimos demasiados puntos
            if len(filtered_points) < len(landmarks_detector.indices_clave) * 0.6:
                # Forzar redetección si perdimos muchos puntos
                self.redetect_counter = self.max_frames_before_redetect
                self.prev_points = None
                self.prev_indices = None  # Limpiar también los índices
            else:
                # Actualizar puntos
                self.prev_points = np.array(filtered_points, dtype=np.float32).reshape(-1, 1, 2) if filtered_points else None
                self.prev_indices = filtered_indices  # Actualizar los índices filtrados
                
            # Mostrar información
            if self.prev_points is not None:
                cv2.putText(output_image, f"Puntos: {len(self.prev_points)}/291", (10, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(output_image, f"Movimiento: {movement_magnitude:.4f}", (10, 60), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        
        # Actualizar frame anterior
        self.prev_gray = frame_gray.copy()
        
        return output_image, self.prev_points, movement_magnitude
    
    def smooth_movement_history(self):
    #"""Aplica un filtro de suavizado a la historia de movimiento"""
        if len(self.movement_history) < 3:
            return
        
    # Filtro de media móvil simple
        smoothed = []
        window = 3
        history_list = list(self.movement_history)
        for i in range(len(history_list) - window + 1):
            avg = sum(history_list[i:i+window]) / window
            smoothed.append(avg)
    
    # Actualizar historia con valores suavizados
        self.movement_history = deque(smoothed, maxlen=self.movement_history.maxlen)

    def is_key_frame(self, current_movement):
   # """Determina si un frame es keyframe basado en patrones de expresión"""
        if len(self.movement_history) < 5:
            return False
    
    # Aplicar suavizado antes de detectar
        self.smooth_movement_history()
    
    # Capturar el inicio de una expresión (aumento de movimiento)
        history = list(self.movement_history)
        if current_movement > MOVEMENT_THRESHOLD:
        # Detectar el inicio de un movimiento (pendiente positiva)
            if len(history) >= 3 and current_movement > history[-2] > history[-3]:
                return True
        # Detectar el final de un movimiento (pendiente negativa después de un máximo)
            elif len(history) >= 3 and history[-2] > current_movement and history[-2] > history[-3]:
                return True
    
        return False

In [5]:
if __name__ == "__main__":
    # Código de prueba...
    tracker = OpticalFlowTracker()
    # ...