In [1]:
# Install dependencies with the %pip magic so they land in the environment of the
# running kernel (a plain `!pip` runs whichever pip is first on the shell PATH).
%pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu118
%pip install opencv-python

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu118



[notice] A new release of pip available: 22.3.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable


[notice] A new release of pip available: 22.3.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip





In [2]:

import torch
from torch.utils.data import Dataset, DataLoader
import json
import numpy as np
from torchvision import models, transforms
import cv2


# Report the installed PyTorch build and what CUDA hardware this kernel can see.
cuda_ready = torch.cuda.is_available()

print(torch.__version__)
print("CUDA available:", cuda_ready)
print("CUDA version:", torch.version.cuda)
print("Number of GPUs:", torch.cuda.device_count())
# Only query the device name when a CUDA device is actually usable.
if cuda_ready:
    print("GPU Name:", torch.cuda.get_device_name(0))

2.5.0+cu118
CUDA available: True
CUDA version: 11.8
Number of GPUs: 1
GPU Name: NVIDIA GeForce RTX 3050 Laptop GPU


In [3]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class ImageDataset(Dataset):
    """Dataset of images paired with flattened keypoint coordinates.

    The annotation file is JSON that is indexed by integer position; each entry
    provides an ``"id"`` (the image is read from ``<img_dir>/<id>.png``) and
    ``"kps"`` (keypoint coordinates in the original image's pixel space).

    Each sample is ``(image_tensor, keypoints)`` where the image has been resized
    to ``input_size x input_size`` and normalized, and the keypoints are a flat
    float32 array rescaled to that same resolution.

    Args:
        img_dir: Directory containing the ``.png`` images.
        data_file: Path to the JSON annotation file.
        input_size: Side length (pixels) of the square image fed to the network.
            Defaults to 224, the value that was previously hard-coded.
    """

    def __init__(self, img_dir, data_file, input_size=224):
        self.img_dir = img_dir
        self.input_size = input_size
        self.data = self._load_data(data_file)
        self.transforms = self._build_transforms()

    def _load_data(self, data_file):
        """Read and return the parsed JSON annotations."""
        with open(data_file, "r") as f:
            return json.load(f)

    def _build_transforms(self):
        """Build the image pipeline: PIL conversion, resize, tensor, normalize."""
        return transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((self.input_size, self.input_size)),
            transforms.ToTensor(),
            # Per-channel mean/std; these are the values commonly used for
            # ImageNet-pretrained torchvision models.
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        img = self._load_image(item["id"])
        img, kps = self._process_image_and_keypoints(img, item["kps"])
        return img, kps

    def _load_image(self, img_id):
        """Load ``<img_dir>/<img_id>.png`` and return it as an RGB array.

        Raises:
            FileNotFoundError: If the file is missing or cannot be decoded.
        """
        img_path = f"{self.img_dir}/{img_id}.png"
        img = cv2.imread(img_path)
        # cv2.imread reports failure by returning None rather than raising; fail
        # here with the offending path instead of a cryptic cvtColor error.
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    def _process_image_and_keypoints(self, img, keypoints):
        """Transform the image and rescale keypoints to the resized resolution.

        Keypoints are flattened; even positions are scaled by the width ratio
        and odd positions by the height ratio (i.e. treated as x, y, x, y, ...).
        """
        # Capture the original size before the transform resizes the image.
        h, w = img.shape[:2]
        img = self.transforms(img)
        kps = np.array(keypoints, dtype=np.float32).flatten()
        kps[::2] *= self.input_size / w
        kps[1::2] *= self.input_size / h
        return img, kps



In [None]:

# Build the training and validation datasets from the same image directory but
# separate annotation files.
train_dataset = ImageDataset(img_dir="data/images", data_file="data/data_train.json")
val_dataset = ImageDataset(img_dir="data/images", data_file="data/data_val.json")

# Shuffle only the training data; validation keeps a fixed order so that
# evaluation runs are comparable from one run to the next.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [9]:
# Number of keypoints regressed per image; the head outputs one (x, y) pair each.
NUM_KEYPOINTS = 14

# `pretrained=True` is deprecated in torchvision; the explicit weights enum
# selects the same ImageNet checkpoint without the deprecation warning.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
# Replace the classification head with a regression head of 2 * NUM_KEYPOINTS outputs.
model.fc = torch.nn.Linear(model.fc.in_features, NUM_KEYPOINTS * 2)
model = model.to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)



In [10]:
epochs = 20

for epoch in range(epochs):
    # Explicitly select training mode: if the model was left in eval mode
    # (e.g. after running inference), mode-dependent layers would otherwise
    # behave incorrectly during optimisation.
    model.train()

    # Accumulate the loss on-device so the mean can be reported without forcing
    # a host synchronisation on every iteration.
    epoch_loss = torch.zeros((), device=device)
    num_batches = 0

    for i, (imgs, kps) in enumerate(train_loader):
        imgs = imgs.to(device)
        kps = kps.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, kps)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.detach()
        num_batches += 1

        if i % 10 == 0:
            print(f"Epoch {epoch}, iter {i}, loss: {loss.item()}")

    # Per-epoch summary; skipped when the loader yielded no batches.
    if num_batches:
        print(f"Epoch {epoch}, mean loss: {epoch_loss.item() / num_batches}")

    # Checkpoint after every epoch; file names are 1-based.
    torch.save(model.state_dict(), f"model_epoch_{epoch+1}.pth")
    print(f"Model saved after epoch {epoch+1}")


Epoch 0, iter 0, loss: 14656.884765625
Epoch 0, iter 10, loss: 14803.48828125
Epoch 0, iter 20, loss: 13598.6103515625
Epoch 0, iter 30, loss: 13789.7451171875
Epoch 0, iter 40, loss: 13977.4873046875
Epoch 0, iter 50, loss: 13054.314453125
Epoch 0, iter 60, loss: 13312.2431640625
Epoch 0, iter 70, loss: 12089.998046875
Epoch 0, iter 80, loss: 11847.1279296875
Epoch 0, iter 90, loss: 11584.8642578125
Epoch 0, iter 100, loss: 11149.8818359375
Epoch 0, iter 110, loss: 10382.1123046875
Epoch 0, iter 120, loss: 10886.794921875
Epoch 0, iter 130, loss: 10459.69140625
Epoch 0, iter 140, loss: 9719.8505859375
Epoch 0, iter 150, loss: 9796.7255859375
Epoch 0, iter 160, loss: 9108.037109375
Epoch 0, iter 170, loss: 8521.859375
Epoch 0, iter 180, loss: 7707.30615234375
Epoch 0, iter 190, loss: 8351.17578125
Epoch 0, iter 200, loss: 7418.123046875
Epoch 0, iter 210, loss: 7320.1533203125
Epoch 0, iter 220, loss: 7943.533203125
Epoch 0, iter 230, loss: 6699.02490234375
Epoch 0, iter 240, loss: 679

In [3]:
def load_model(model_path):
    """Rebuild the keypoint ResNet-50 and load trained weights from ``model_path``.

    The network is created without pretrained weights because every parameter
    is overwritten by the checkpoint; this avoids a needless download.

    Args:
        model_path: Path to a ``state_dict`` checkpoint saved with ``torch.save``.

    Returns:
        The model on the CPU, in evaluation mode.
    """
    model = models.resnet50(weights=None)
    # Same head as used for training: 14 keypoints * (x, y).
    model.fc = torch.nn.Linear(model.fc.in_features, 14*2)
    # weights_only=True restricts unpickling to tensors/primitive containers,
    # which is all a state_dict needs, and silences torch.load's FutureWarning.
    state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

def get_transform():
    """Return the inference preprocessing pipeline.

    Steps, in order: convert the array to a PIL image, resize to 224x224,
    convert to a tensor, then normalize each channel.
    """
    # Per-channel statistics used for normalization.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)

def predict_keypoints(model, image, transform):
    """Predict keypoints for one image, in that image's own pixel coordinates.

    Args:
        model: Callable (typically a ``torch.nn.Module``) applied to a batch of
            one preprocessed image. Its output is treated as interleaved
            ``x, y, x, y, ...`` values expressed in a 224x224 frame.
        image: Image array in BGR channel order (as returned by ``cv2.imread``
            or ``VideoCapture.read``); it is converted to RGB here, so callers
            must NOT pre-convert it.
        transform: Callable turning the RGB array into an image tensor.

    Returns:
        1-D numpy array of keypoints rescaled to the input image's width/height.
    """
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = transform(image_rgb).unsqueeze(0)

    # Run the input on whatever device the model lives on, so a CUDA model works
    # too. Plain callables without parameters are left untouched.
    get_params = getattr(model, "parameters", None)
    first_param = next(get_params(), None) if callable(get_params) else None
    if first_param is not None:
        image_tensor = image_tensor.to(first_param.device)

    with torch.no_grad():
        outputs = model(image_tensor)

    keypoints = outputs.squeeze().cpu().numpy()
    # Map predictions from the 224x224 network frame back to the original size:
    # even positions are x (scaled by width), odd positions are y (by height).
    original_h, original_w = image.shape[:2]
    keypoints[::2] *= original_w / 224.0
    keypoints[1::2] *= original_h / 224.0

    return keypoints

def draw_keypoints(image, keypoints):
    """Draw a filled green dot on ``image`` for every (x, y) pair in ``keypoints``.

    ``keypoints`` is a flat array laid out as x, y, x, y, ...; values are cast
    to integers before drawing. The image is modified in place and returned.
    """
    dot_radius = 5
    dot_color = (0, 255, 0)
    filled = -1  # negative thickness asks cv2.circle for a filled disc

    pixel_coords = keypoints.astype(int)

    pos = 0
    while pos < len(pixel_coords):
        center = (pixel_coords[pos], pixel_coords[pos + 1])
        cv2.circle(image, center, dot_radius, dot_color, filled)
        pos += 2

    return image

# Single-image inference: load a checkpoint, predict keypoints for one image and
# write the annotated result to disk.
model_path = "model_epoch_18.pth"
model = load_model(model_path)
transform = get_transform()

image_path = "test.jpg"
image = cv2.imread(image_path)
# cv2.imread returns None (instead of raising) when the file is missing or
# cannot be decoded; fail early with the path rather than deep inside inference.
if image is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")

keypoints = predict_keypoints(model, image, transform)
image_with_keypoints = draw_keypoints(image, keypoints)

# Last expression of the cell: imwrite's boolean result is shown as the cell output.
cv2.imwrite('output_with_keypoints181.jpg', image_with_keypoints)


  model.load_state_dict(torch.load(model_path, map_location='cpu'))


True

In [10]:
class KeypointsVisualizer:
    """Draws predicted keypoints on frames and renders an annotated copy of a video.

    Args:
        keypoint_color: Colour tuple passed to ``cv2.circle`` for each dot.
        keypoint_radius: Dot radius in pixels.
    """

    def __init__(self, keypoint_color=(0, 255, 0), keypoint_radius=5):
        self.keypoint_color = keypoint_color
        self.keypoint_radius = keypoint_radius

    def draw_keypoints(self, frame, keypoints):
        """Draw a filled dot for every (x, y) pair of the flat ``keypoints`` sequence.

        The frame is modified in place and returned.
        """
        for i in range(0, len(keypoints), 2):
            x, y = int(keypoints[i]), int(keypoints[i+1])
            cv2.circle(frame, (x, y), self.keypoint_radius, self.keypoint_color, -1)
        return frame

    def draw_keypoints_on_video(self, video_path, model, transform,
                                output_path='output_with_keypoints_video.mp4'):
        """Run keypoint prediction on every frame and write an annotated video.

        Args:
            video_path: Path of the input video.
            model: Model forwarded to ``predict_keypoints``.
            transform: Preprocessing callable forwarded to ``predict_keypoints``.
            output_path: Where to write the annotated video. Defaults to the
                previously hard-coded file name.

        Returns:
            ``output_path`` on success, or ``None`` if the input video could not
            be opened or the output file could not be created.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print("Error: Could not open video.")
            return None

        out = None
        try:
            # Reuse the source geometry and frame rate for the output file.
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
            if not out.isOpened():
                print("Error: Could not open output video for writing.")
                return None

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Pass the frame as read: predict_keypoints performs the
                # BGR->RGB conversion itself, so converting here as well would
                # swap the channels back and feed the model the wrong colours.
                keypoints = predict_keypoints(model, frame, transform)

                frame_with_keypoints = self.draw_keypoints(frame, keypoints)

                out.write(frame_with_keypoints)
        finally:
            # Always free the capture/writer handles, even if inference raises,
            # so the output file is finalised and the input is not left locked.
            cap.release()
            if out is not None:
                out.release()

        return output_path

In [None]:
# Video inference: load a checkpoint and render an annotated copy of the input video.
model_path = "model_epoch_18.pth"
model = load_model(model_path)
transform = get_transform()

visualizer = KeypointsVisualizer()
input_video_path = "input_videos/input_video.mp4"
output_video_path = visualizer.draw_keypoints_on_video(input_video_path, model, transform)

# draw_keypoints_on_video returns None when the video could not be processed;
# do not claim a file was saved in that case.
if output_video_path is None:
    print(f"No output video was written for: {input_video_path}")
else:
    print(f"Output video saved at: {output_video_path}")


  model.load_state_dict(torch.load(model_path, map_location='cpu'))


Output video saved at: output_with_keypoints_video.mp4
