In [1]:
import cv2
import json
import numpy as np
import pandas as pd
from pathlib import Path
from collections import defaultdict
import os
from ultralytics import YOLO
from tensorflow.keras.utils import Sequence

DATA_ROOT = Path("C:/Users/2955352g/Desktop/pig_data_edinburgh")

class AnnotatedDataGenerator(Sequence):
    """Data generator for annotated frames to avoid memory issues"""
    
    def __init__(self, data_info, batch_size=8, target_size=(224, 224), shuffle=True):
        self.data_info = data_info
        self.batch_size = batch_size
        self.target_size = target_size
        self.shuffle = shuffle
        self.indices = np.arange(len(data_info))
        if shuffle:
            np.random.shuffle(self.indices)
    
    def __len__(self):
        return int(np.ceil(len(self.data_info) / self.batch_size))
    
    def __getitem__(self, idx):
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_data = [self.data_info.iloc[i] for i in batch_indices]
        
        X, y = self._load_batch(batch_data)
        return X, y
    
    def _load_batch(self, batch_data):
        X = []
        y = []
        
        for data in batch_data:
            try:
                # Load frame from video
                cap = cv2.VideoCapture(str(data['video_path']))
                cap.set(cv2.CAP_PROP_POS_FRAMES, data['frame_num'])
                ret, frame = cap.read()
                cap.release()
                
                if ret:
                    # Resize frame
                    frame = cv2.resize(frame, self.target_size)
                    frame = frame.astype(np.float32) / 255.0  # Normalize
                    X.append(frame)
                    y.append(data['behavior_encoded'])
                    
            except Exception as e:
                print(f"Error loading frame: {e}")
                continue
        
        return np.array(X), np.array(y)
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

class YOLOv8Detector:
    def __init__(self, model_path=None, device='auto'):
        """
        Initialize YOLOv8 detector
        :param model_path: Path to custom trained YOLOv8 model (.pt file)
        :param device: 'cpu' or 'cuda' or 'auto'
        """
        if model_path is None:
            # Load default COCO-pretrained model
            self.model = YOLO('yolov8n.pt')
        else:
            # Load custom trained model
            self.model = YOLO(model_path)
        
        # Set device
        self.device = device
    
    def train(self, data_yaml, epochs=50, imgsz=640, batch=16):
        """
        Train YOLOv8 model
        :param data_yaml: Path to data.yaml file
        :param epochs: Number of training epochs
        :param imgsz: Image size
        :param batch: Batch size
        """
        results = self.model.train(
            data=data_yaml,
            epochs=epochs,
            imgsz=imgsz,
            batch=batch,
            device=self.device,
            pretrained=True,
            optimizer='auto',
            lr0=0.01,
            patience=10
        )
        return results
    
    def detect(self, frame, conf=0.5, iou=0.4):
        """
        Detect pigs in a frame
        :param frame: Input frame (BGR format)
        :param conf: Confidence threshold
        :param iou: IoU threshold for NMS
        :return: List of detections [x1, y1, x2, y2, conf, cls]
        """
        # Convert frame to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Run inference
        results = self.model.predict(
            frame_rgb,
            conf=conf,
            iou=iou,
            device=self.device,
            verbose=False
        )
        
        # Extract bounding boxes
        detections = []
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            cls_ids = result.boxes.cls.cpu().numpy()
            
            for box, conf, cls_id in zip(boxes, confs, cls_ids):
                detections.append({
                    'bbox': [int(box[0]), int(box[1]), int(box[2]-box[0]), int(box[3]-box[1])],  # Convert to [x,y,w,h]
                    'confidence': float(conf),
                    'class_id': int(cls_id)
                })
        
        return detections

def load_annotated_data_info():
    """Load annotated data information without loading frames into memory"""
    data_info = []
    
    # Check if annotated directory exists
    annotated_dir = DATA_ROOT / "annotated"
    if not annotated_dir.exists():
        print(f"Warning: Annotated directory {annotated_dir} does not exist")
        return pd.DataFrame()
    
    json_files = list(annotated_dir.rglob("output.json"))
    if not json_files:
        print("No output.json files found in annotated directory")
        return pd.DataFrame()
    
    print(f"Found {len(json_files)} annotation files")
    
    for json_file in json_files:
        try:
            with open(json_file) as f:
                data = json.load(f)
            
            video_path = json_file.parent / "color.mp4"
            if not video_path.exists():
                print(f"Warning: Video file {video_path} does not exist")
                continue
            
            # Quick check if video can be opened
            cap = cv2.VideoCapture(str(video_path))
            if not cap.isOpened():
                print(f"Warning: Cannot open video {video_path}")
                continue
            
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()
            
            print(f"Processing {video_path} with {total_frames} frames")
            
            for obj in data.get('objects', []):
                for frame_data in obj.get('frames', []):
                    frame_num = frame_data.get('frameNumber', 0)
                    if frame_num >= total_frames:
                        continue
                    
                    # Handle different bbox formats
                    bbox = frame_data.get('bbox', [0, 0, 100, 100])
                    
                    # Debug: Print first few bbox formats
                    if len(data_info) < 3:
                        print(f"  Sample bbox: {bbox}, type: {type(bbox)}")
                    
                    # Normalize bbox format
                    try:
                        if isinstance(bbox, str):
                            import ast
                            bbox = ast.literal_eval(bbox)
                        
                        if isinstance(bbox, dict):
                            # Convert dict format to list [x, y, width, height]
                            x = bbox.get('x', 0)
                            y = bbox.get('y', 0)
                            w = bbox.get('width', bbox.get('w', 100))
                            h = bbox.get('height', bbox.get('h', 100))
                            bbox = [x, y, w, h]
                        
                        # Ensure bbox is a list with 4 elements
                        if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
                            bbox = [0, 0, 100, 100]
                    
                    except Exception as e:
                        print(f"Error processing bbox {bbox}: {e}")
                        bbox = [0, 0, 100, 100]
                    
                    data_info.append({
                        'video_path': str(video_path),
                        'behavior': frame_data.get('behaviour', 'unknown'),
                        'bbox': bbox,
                        'video_id': json_file.parent.name,
                        'date': json_file.parent.parent.name,
                        'frame_num': frame_num
                    })
                    
        except Exception as e:
            print(f"Error processing {json_file}: {e}")
            continue
    
    print(f"Found {len(data_info)} annotated frames")
    return pd.DataFrame(data_info)

def prepare_yolo_dataset(data_info, output_dir):
    """
    Prepare dataset in YOLOv8 format
    :param data_info: DataFrame with annotation info
    :param output_dir: Directory to save YOLO dataset
    """
    output_dir = Path(output_dir)
    (output_dir / 'images').mkdir(parents=True, exist_ok=True)
    (output_dir / 'labels').mkdir(parents=True, exist_ok=True)
    
    # Create data.yaml
    data_yaml = {
        'path': str(output_dir.absolute()),
        'train': 'images',
        'val': 'images',
        'names': ['pig'],
        'nc': 1
    }
    
    with open(output_dir / 'data.yaml', 'w') as f:
        yaml.dump(data_yaml, f)
    
    # Process each frame
    for idx, row in data_info.iterrows():
        try:
            # Load frame
            cap = cv2.VideoCapture(str(row['video_path']))
            cap.set(cv2.CAP_PROP_POS_FRAMES, row['frame_num'])
            ret, frame = cap.read()
            cap.release()
            
            if not ret:
                continue
                
            # Save image
            img_path = output_dir / 'images' / f"{row['video_id']}_{row['frame_num']}.jpg"
            cv2.imwrite(str(img_path), frame)
            
            # Prepare YOLO annotation
            bbox = row['bbox']
            if isinstance(bbox, str):
                bbox = ast.literal_eval(bbox)
            if isinstance(bbox, dict):
                x = bbox.get('x', 0)
                y = bbox.get('y', 0)
                w = bbox.get('width', bbox.get('w', 100))
                h = bbox.get('height', bbox.get('h', 100))
                bbox = [x, y, w, h]
            
            # Convert to YOLO format (normalized cx, cy, w, h)
            img_h, img_w = frame.shape[:2]
            x_center = (bbox[0] + bbox[2]/2) / img_w
            y_center = (bbox[1] + bbox[3]/2) / img_h
            width = bbox[2] / img_w
            height = bbox[3] / img_h
            
            # Save label
            label_path = output_dir / 'labels' / f"{row['video_id']}_{row['frame_num']}.txt"
            with open(label_path, 'w') as f:
                f.write(f"0 {x_center} {y_center} {width} {height}")
                
        except Exception as e:
            print(f"Error processing frame {idx}: {e}")
            continue

def train_yolov8_detector(data_info, output_dir='yolo_dataset', device='auto'):
    """
    Train YOLOv8 detector on pig data
    :param data_info: DataFrame with annotation info
    :param output_dir: Directory to save YOLO dataset
    :param device: 'cpu', 'cuda', or 'auto'
    :return: Trained YOLOv8 detector
    """
    # Prepare YOLO format dataset
    prepare_yolo_dataset(data_info, output_dir)
    
    # Initialize detector
    detector = YOLOv8Detector(device=device)
    
    # Train model
    print("Training YOLOv8 detector...")
    results = detector.train(
        data_yaml=str(Path(output_dir) / 'data.yaml'),
        epochs=50,
        imgsz=640,
        batch=-1  # Auto batch size
    )
    
    return detector

def run_full_pipeline(device='auto'):
    print("Starting pig behavior detection pipeline...")
    
    # Check if data directory exists
    if not DATA_ROOT.exists():
        print(f"Error: Data directory {DATA_ROOT} does not exist")
        return None, None, None
    
    print("Loading data information...")
    data_info = load_annotated_data_info()
    
    if data_info.empty:
        print("No annotated data found. Cannot proceed with training.")
        return None, None, None
    
    print("Training YOLOv8 detector...")
    detector = train_yolov8_detector(data_info, device=device)
    
    print("Initializing tracker...")
    tracker = PigTracker()
    
    print("Training behavior classifier...")
    behavior_model, behavior_mapping = train_behavior_classifier(data_info)
    
    print("Pipeline training complete!")
    return detector, tracker, behavior_model

def test_single_video(detector, behavior_model, video_path):
    """Test the trained models on a single video"""
    if not os.path.exists(video_path):
        print(f"Video file {video_path} does not exist")
        return
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Cannot open video {video_path}")
        return
    
    print(f"Processing video: {video_path}")
    
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        if frame_count % 30 == 0:  # Process every 30th frame
            # Get detections
            if detector:
                detections = detector.detect(frame)
                print(f"Frame {frame_count}: Detected {len(detections)} pigs")
                for det in detections:
                    print(f"  Bbox: {det['bbox']}, Confidence: {det['confidence']:.2f}")
            
            # Get behavior (resize frame first)
            if behavior_model:
                frame_resized = cv2.resize(frame, (224, 224))
                frame_norm = frame_resized.astype(np.float32) / 255.0
                frame_batch = np.expand_dims(frame_norm, axis=0)
                
                behavior_pred = behavior_model.predict(frame_batch, verbose=0)
                behavior_class = np.argmax(behavior_pred[0])
                confidence = np.max(behavior_pred[0])
                print(f"Frame {frame_count}: Behavior class: {behavior_class}, Confidence: {confidence:.3f}")
    
    cap.release()
    print(f"Processed {frame_count} frames")

if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--device', type=str, default='auto', help="Device to use: 'cpu', 'cuda', or 'auto'")
    args = parser.parse_args()
    
    detector, tracker, behavior_model = run_full_pipeline(device=args.device)
    
    if detector is not None:
        print("Detection model trained successfully")
    if behavior_model is not None:
        print("Behavior classification model trained successfully")
    if tracker is not None:
        print("Tracker initialized successfully")
    
    # Test on a single video if models are trained
    if detector or behavior_model:
        test_video_path = DATA_ROOT / "annotated" / "test_video.mp4"  # Adjust path as needed
        if test_video_path.exists():
            test_single_video(detector, behavior_model, str(test_video_path))

usage: ipykernel_launcher.py [-h] [--device DEVICE]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\2955352g\AppData\Roaming\jupyter\runtime\kernel-v3f6a330ff864cd2b490011f1ac843288765bd2e7e.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
