In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import cv2
import numpy as np
import os
from PIL import Image
from ultralytics import YOLO
import yaml

In [4]:
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cuda


In [6]:
%cd lame_horse

[WinError 2] The system cannot find the file specified: 'lame_horse'
C:\Users\pham\lame_horse


In [8]:
video_path = 'lame horse/dataset/test/4.mp4'
if not os.path.exists(video_path):
    raise FileNotFoundError(f"The path '{video_path}' does not exist.")


FileNotFoundError: The path 'lame horse/dataset/test/4.mp4' does not exist.

In [7]:
%ls

 Volume in drive C is System
 Volume Serial Number is 56BE-7538

 Directory of C:\Users\pham\lame horse\dataset\test\lame

04/07/2024  13:17    <DIR>          .
04/07/2024  13:17    <DIR>          ..
04/07/2024  13:17    <DIR>          .ipynb_checkpoints
22/06/2024  14:31         9,930,787 4.mp4
               1 File(s)      9,930,787 bytes
               3 Dir(s)  386,411,016,192 bytes free


In [13]:
# Define the GetKps class for keypoint extraction
class GetKps:
    def __init__(self, model_path):
        self.model = YOLO(model_path).to(device_name)
        

    def extractkps(self, frame):
        results = self.model(frame)
        kps_list = []

        if results:
            for r in results:
                if hasattr(r, 'keypoints'):
                    keypoints = r.keypoints.xyn.cpu().numpy()
                    kps_list.append(keypoints)
                else:
                    print("No keypoints attribute found in the results.")
                    kps_list.append(self._get_dummy_keypoints(frame.shape))
        else:
            print("No results found.")
            kps_list.append(self._get_dummy_keypoints(frame.shape))
        return kps_list

    def _get_dummy_keypoints(self, shape):
        return np.zeros((17, 3))

# Define the PositionalEncoding class for the transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=1000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        return self.encoding[:, :x.size(1), :].to(x.device)

# Define the 3D Conv network (if not pre-trained)
class Conv3D_2L(nn.Module):
    def __init__(self):
        super(Conv3D_2L, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv3d(3, 16, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
            nn.BatchNorm3d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2)),
            nn.Conv3d(16, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
            nn.BatchNorm3d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2)),
        )
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return x

# Define the HorseLamenessClassifier class
class HorseLamenessClassifier(nn.Module):
    def __init__(self, feature_model, d_model, nhead, num_layers):
        super(HorseLamenessClassifier, self).__init__()
        self.feature_model = feature_model
        self.positional_encoding = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.classifier = nn.Linear(d_model, 2)  # 2 classes: lame or sound

    def forward(self, video_frames, keypoints=None):
        video_features = self.feature_model(video_frames)
        if keypoints is not None and len(keypoints) > 0:
            keypoints = np.array(keypoints)
            keypoints = torch.from_numpy(keypoints).float().to(device_name)
            combined_features = torch.cat((video_features, keypoints), dim=1)
        else:
            combined_features = video_features

        # Positional Encoding
        combined_features = self.positional_encoding(combined_features)

        # Transformer Encoder
        encoded_features = self.transformer_encoder(combined_features)
        encoded_features = encoded_features.mean(dim=1)  # global average pooling over the sequence

        # Classification
        logits = self.classifier(encoded_features)
        return logits

# Load YOLO model
model_path = 'best.pt'
yolo_model = GetKps(model_path)

# Load Pretrained Video Feature Extractor
model_load_path = 'lame horse'
feature_model = torch.load(model_load_path)
feature_model.fc = nn.Identity()
feature_model = feature_model.to(device_name)
feature_model.eval()

# Initialize the HorseLamenessClassifier model
model = HorseLamenessClassifier(feature_model=feature_model, d_model=32, nhead=8, num_layers=4)
model.to(device_name)
model.eval()  # Set the model to evaluation mode

# Video processing
video_path = 'dataset/test/4.mp4'
output_path = 'processed_video_transformer.mp4'

cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

if not out.isOpened():
    print('Error: Unable to open output video file')
else:
    print(f"Output video file opened, width: {width}, height: {height}, fps: {fps}")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Extract keypoints
    try: 
        keypoints = yolo_model.extractkps(frame)
        keypoints = results[0].keypoints.xy.cpu().numpy() if  hasattr(results[0], 'keypoints') else yolo_model._get_dummy_keypoints(frame.shape)
    except Exception as e:  # Catch any potential errors during keypoint detection
        print(f"Error detecting keypoints: {e}")
        keypoints = yolo_model._get_dummy_keypoints(frame.shape)
        
    # Preprocess frame and get video features
    processed_frame = preprocess_frame(frame)
    with torch.no_grad():
        video_features = feature_model(processed_frame.unsqueeze(0).to(device_name))

    # Inference
    with torch.no_grad():
        output = model(video_features, keypoints)  # Get logits
        _, predicted = torch.max(output, dim=1)  # Get class prediction

    # Draw bounding box and keypoints
    results = yolo_model.model(frame)
    for result in results:
        if hasattr(result, 'boxes'):
            boxes = result.boxes.xyxy.cpu().numpy().astype(int)
            for box in boxes:
                x1, y1, x2, y2 = box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

    if keypoints is not None and keypoints.size > 0:
        for kp in keypoints[0]:
            x, y, _ = kp
            cv2.circle(frame, (int(x * width), int(y * height)), 3, (0, 0, 255), -1)

    # Display Label
    label = "Lame" if predicted.item() == 0 else "Sound"
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_size = cv2.getTextSize(label, font, 1, 2)[0]
    text_x = int((frame.shape[1] - text_size[0]) / 2)
    text_y = int((frame.shape[0] + text_size[1]) / 2)
    cv2.putText(frame, label, (text_x, text_y), font, 1, (0, 0, 255), 2, cv2.LINE_AA)
   
    out.write(frame)  # Write the frame to the output video

cap.release()
out.release()
cv2.destroyAllWindows()

print('Processig finished!')


Error: Unable to open output video file
Processig finished!
