In [1]:
import os
import cv2


# This script processes images and corresponding YOLO label files to crop and save specific regions (faces of adults and children) into separate directories.
# 1. `read_yolo_labels` function reads YOLO label files and extracts bounding boxes and class ids.
# 2. `convert_yolo_to_bbox` converts YOLO format (center coordinates and dimensions) to traditional bounding box format (x_min, y_min, x_max, y_max).
# 3. `crop_and_save_image` crops the image based on bounding boxes and saves the cropped regions.
# 4. `process_images` iterates through images and their labels, identifies if the bounding box corresponds to an adult (class_id 1) or child (class_id 10), crops, and saves them in respective directories.


def read_yolo_labels(label_file):
    """Read YOLO label file and return a list of bounding boxes and class ids."""
    with open(label_file, 'r') as file:
        lines = file.readlines()
        bboxes = []
        for line in lines:
            parts = line.strip().split()
            class_id = int(parts[0])
            x_center, y_center, width, height = map(float, parts[1:])
            bboxes.append((class_id, x_center, y_center, width, height))
    return bboxes

def convert_yolo_to_bbox(yolo_bbox, img_width, img_height):
    """Convert YOLO format to bounding box format."""
    class_id, x_center, y_center, width, height = yolo_bbox
    x_center *= img_width
    y_center *= img_height
    width *= img_width
    height *= img_height
    x_min = int(x_center - width / 2)
    y_min = int(y_center - height / 2)
    x_max = int(x_center + width / 2)
    y_max = int(y_center + height / 2)
    return class_id, x_min, y_min, x_max, y_max

def crop_and_save_image(img, bbox, output_dir, class_name, img_name, crop_id):
    """Crop the image using the bounding box and save the cropped image."""
    _, x_min, y_min, x_max, y_max = bbox
    crop_img = img[y_min:y_max, x_min:x_max]
    output_path = os.path.join(output_dir, f"{class_name}_{img_name}_{crop_id}.jpg")
    cv2.imwrite(output_path, crop_img)

def process_images(image_dir, label_dir, output_dir_adult, output_dir_child):
    """Process each image and its corresponding label to crop and save faces."""
    if not os.path.exists(output_dir_adult):
        os.makedirs(output_dir_adult)
    if not os.path.exists(output_dir_child):
        os.makedirs(output_dir_child)

    for img_name in os.listdir(image_dir):
        img_path = os.path.join(image_dir, img_name)
        label_path = os.path.join(label_dir, os.path.splitext(img_name)[0] + '.txt')

        if not os.path.exists(label_path):
            continue

        img = cv2.imread(img_path)
        img_height, img_width = img.shape[:2]
        bboxes = read_yolo_labels(label_path)

        crop_id = 0
        for bbox in bboxes:
            class_id, x_min, y_min, x_max, y_max = convert_yolo_to_bbox(bbox, img_width, img_height)

            if class_id == 1:
                crop_and_save_image(img, (class_id, x_min, y_min, x_max, y_max), output_dir_adult, '1', os.path.splitext(img_name)[0], crop_id)
            elif class_id == 10:
                crop_and_save_image(img, (class_id, x_min, y_min, x_max, y_max), output_dir_child, '0', os.path.splitext(img_name)[0], crop_id)

            crop_id += 1

# Define your directories
image_dir = '/hdd/eldor/age_dataset/CPD_done/train/images'
label_dir = '/hdd/eldor/age_dataset/CPD_done/train/labels'
output_dir_adult = '/hdd/eldor/age_dataset/CPD_done/train/faces/1'
output_dir_child = '/hdd/eldor/age_dataset/CPD_done/train/faces/0'

# Run the processing
process_images(image_dir, label_dir, output_dir_adult, output_dir_child)


In [2]:
# Paths and parameters
data_dir = '/hdd/eldor/age_dataset/CPD_done/train/faces'
output_dir = 'resnet_models'  # Directory to save best models

In [2]:
# This script trains multiple ResNet models (ResNet18, ResNet34, ResNet50, ResNet101, and ResNet152) in parallel using multiple GPUs.
# 
# Key Components:
# 1. **Data Preparation**: 
#    - The dataset is loaded from a directory, and the images are preprocessed using transformations (resize, normalize).
#    - The dataset is split into training, validation, and test sets, ensuring stratified splits based on class labels.
# 
# 2. **Model Modification**: 
#    - The fully connected (FC) layers of the pre-trained ResNet models are modified to suit the binary classification task (adult vs. child).
# 
# 3. **Training Process**: 
#    - Each ResNet model is trained in parallel on different GPUs using multiprocessing, optimizing the network over a defined number of epochs.
#    - During training, the model evaluates performance on both training and validation sets, and the best-performing model is saved.
# 
# 4. **Evaluation**: 
#    - After training, each model is evaluated on the test set, and key metrics like accuracy, precision, recall, F1-score, and confusion matrix are computed and displayed.
# 
# 5. **Parallelism**: 
#    - The training is distributed across available GPUs using multiprocessing, allowing models to train concurrently and efficiently.



import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
from torchvision import models
import copy
import multiprocessing
from tqdm import tqdm

# Ensure 'spawn' is set for multiprocessing (important for Windows and other environments)
try:
    multiprocessing.set_start_method('spawn', force=True)
except RuntimeError:
    pass


# Paths and parameters
data_dir = '/hdd/eldor/age_dataset/CPD_done/train/faces'
output_dir = 'resnet_models'  # Directory to save best models
os.makedirs(output_dir, exist_ok=True)

batch_size = 32
num_classes = 2  # Assuming binary classification (e.g., adult vs. child)
epochs = 30

# Define the ResNet architectures
resnet_variants = {
    'resnet18': models.resnet18(pretrained=True),
    'resnet34': models.resnet34(pretrained=True),
    'resnet50': models.resnet50(pretrained=True),
    'resnet101': models.resnet101(pretrained=True),
    'resnet152': models.resnet152(pretrained=True)
}

# GPUs (You only have GPU 0 and GPU 1)
gpu_ids = [0, 1]

# Define transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load dataset
dataset = torchvision.datasets.ImageFolder(root=data_dir, transform=transform)

# Split dataset into train, validation, and test sets with stratification
train_indices, val_test_indices = train_test_split(
    np.arange(len(dataset.targets)),
    test_size=0.3,
    stratify=dataset.targets,
    random_state=42
)

val_indices, test_indices = train_test_split(
    val_test_indices,
    test_size=0.5,
    stratify=[dataset.targets[i] for i in val_test_indices],
    random_state=42
)

train_set = Subset(dataset, train_indices)
val_set = Subset(dataset, val_indices)
test_set = Subset(dataset, test_indices)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

def modify_resnet_fc(resnet_model, num_classes):
    in_features = resnet_model.fc.in_features
    resnet_model.fc = nn.Sequential(
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256, num_classes)
    )
    return resnet_model

def train_model(resnet_name, resnet_model, gpu_id, output_dir, results):
    device = torch.device(f'cuda:{gpu_id}' if torch.cuda.is_available() else 'cpu')
    print(f"Training {resnet_name} on {device}...")

    resnet_model = modify_resnet_fc(resnet_model, num_classes)
    resnet_model = resnet_model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(resnet_model.parameters(), lr=0.001, momentum=0.9)

    dataloaders = {
        'train': train_loader,
        'val': val_loader
    }

    best_model_wts = copy.deepcopy(resnet_model.state_dict())
    best_acc = 0.0
    
    for epoch in range(epochs):
        print(f'Epoch {epoch+1}/{epochs}')
        print('-' * 10)
        
        for phase in ['train', 'val']:
            if phase == 'train':
                resnet_model.train()
            else:
                resnet_model.eval()
            
            running_loss = 0.0
            running_corrects = 0
            
            with tqdm(total=len(dataloaders[phase]), desc=f'{phase.capitalize()} {resnet_name}', ncols=100) as pbar:
                for inputs, labels in dataloaders[phase]:
                    inputs, labels = inputs.to(device), labels.to(device)

                    optimizer.zero_grad()
                    
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = resnet_model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)
                        
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                    
                    pbar.update(1)
            
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            
            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(resnet_model.state_dict())
    
    resnet_model.load_state_dict(best_model_wts)

    model_save_path = os.path.join(output_dir, f"{resnet_name}_best_model.pth")
    torch.save(resnet_model.state_dict(), model_save_path)
    print(f"Best model saved for {resnet_name} at {model_save_path}")
    
    accuracy, precision, recall, f1, conf_matrix = evaluate_model(resnet_model, test_loader, device)
    
    result = {
        'Model': resnet_name,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'Confusion Matrix': conf_matrix
    }
    results.append(result)

def evaluate_model(model, dataloader, device='cpu'):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    conf_matrix = confusion_matrix(all_labels, all_preds)
    
    return accuracy, precision, recall, f1, conf_matrix

def run_parallel_training():
    processes = []
    results = multiprocessing.Manager().list()

    model_pairs = [
        ('resnet18', 'resnet34'),
        ('resnet50', 'resnet101'),
        ('resnet152', None)
    ]

    for model1, model2 in model_pairs:
        if model1:
            p1 = multiprocessing.Process(target=train_model, args=(model1, resnet_variants[model1], gpu_ids[0], output_dir, results))
            p1.start()
            processes.append(p1)
        
        if model2:
            p2 = multiprocessing.Process(target=train_model, args=(model2, resnet_variants[model2], gpu_ids[1], output_dir, results))
            p2.start()
            processes.append(p2)
        
        for p in processes:
            p.join()

    for result in results:
        print(f"Metrics for {result['Model']}:")
        for metric_name, metric_value in result.items():
            if metric_name != 'Model':
                print(f"{metric_name}: {metric_value}")
        print("\n")

if __name__ == '__main__':
    run_parallel_training()


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/eldor/miniconda3/envs/age/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/eldor/miniconda3/envs/age/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'train_model' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/eldor/miniconda3/envs/age/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/eldor/miniconda3/envs/age/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^

In [None]:
# Accuracy:
#     resnet18: 0.9954
#     resnet34: 0.9947
#     resnet50: 0.9894
#     resnet101:
#     resnet152:0.9930

In [5]:
# This script evaluates several pre-trained ResNet models (ResNet18, ResNet34, ResNet50, ResNet101, and ResNet152) on a binary classification task (e.g., adult vs. child).
# 
# Key Components:
# 
# 1. **Data Loading and Preprocessing**:
#    - The dataset is loaded from a specified directory using `ImageFolder`, and images are resized, normalized, and converted to tensors.
#    - A test subset is created using a predefined train/test split to ensure consistency with the training process.
# 
# 2. **Model Modification**:
#    - The fully connected (FC) layer of each pre-trained ResNet model is replaced with a new architecture tailored to binary classification. The FC layers include intermediate layers with ReLU activations for feature learning.
# 
# 3. **Model Evaluation**:
#    - The script loads each ResNet model from saved checkpoints (if available) and evaluates it on the test set.
#    - Metrics such as accuracy, precision, recall, F1-score, and the confusion matrix are calculated for each model to provide insights into the model's performance.
# 
# 4. **Evaluation Loop**:
#    - The script iterates over a set of ResNet models, loading each model's state from disk, modifying the architecture for the classification task, and computing evaluation metrics.
#    - If a model file is not found, the script reports it without halting execution.


import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import os
import numpy as np
from torchvision import models
import torch.nn as nn

# Paths
output_dir = 'resnet_models'
data_dir = '/hdd/eldor/age_dataset/CPD_done/train/faces'

# Parameters
batch_size = 32
num_classes = 2  # Assuming binary classification (e.g., adult vs. child)

# Define transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load dataset
dataset = torchvision.datasets.ImageFolder(root=data_dir, transform=transform)

# Assuming the same train/test split as in training
_, val_test_indices = train_test_split(
    np.arange(len(dataset.targets)),
    test_size=0.3,
    stratify=dataset.targets,
    random_state=42
)

_, test_indices = train_test_split(
    val_test_indices,
    test_size=0.5,
    stratify=[dataset.targets[i] for i in val_test_indices],
    random_state=42
)

test_set = Subset(dataset, test_indices)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# Function to modify the model's fully connected layer
def modify_resnet_fc(resnet_model, num_classes):
    in_features = resnet_model.fc.in_features
    resnet_model.fc = nn.Sequential(
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Linear(256, num_classes)
    )
    return resnet_model

# Function to evaluate the model
def evaluate_model(model, dataloader, device='cpu'):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    conf_matrix = confusion_matrix(all_labels, all_preds)

    return accuracy, precision, recall, f1, conf_matrix

# Evaluate all saved models
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_names = ['resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152']

for model_name in model_names:
    model_path = os.path.join(output_dir, f"{model_name}_best_model.pth")
    
    if os.path.exists(model_path):
        print(f"Evaluating {model_name}...")

        # Load the model
        resnet_model = getattr(models, model_name)(pretrained=False)
        resnet_model = modify_resnet_fc(resnet_model, num_classes)
        resnet_model.load_state_dict(torch.load(model_path))
        resnet_model = resnet_model.to(device)

        # Evaluate the model
        accuracy, precision, recall, f1, conf_matrix = evaluate_model(resnet_model, test_loader, device)

        # Print results
        print(f"Metrics for {model_name}:")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1:.4f}")
        print(f"Confusion Matrix:\n{conf_matrix}")
        print("\n")
    else:
        print(f"Model {model_name} not found at {model_path}")


Evaluating resnet18...




Metrics for resnet18:
Accuracy: 0.9970
Precision: 0.9970
Recall: 0.9970
F1-Score: 0.9970
Confusion Matrix:
[[1164    0]
 [   9 1845]]


Evaluating resnet34...




Metrics for resnet34:
Accuracy: 0.9964
Precision: 0.9964
Recall: 0.9964
F1-Score: 0.9964
Confusion Matrix:
[[1161    3]
 [   8 1846]]


Evaluating resnet50...




Metrics for resnet50:
Accuracy: 0.9960
Precision: 0.9960
Recall: 0.9960
F1-Score: 0.9960
Confusion Matrix:
[[1161    3]
 [   9 1845]]


Evaluating resnet101...




Metrics for resnet101:
Accuracy: 0.9973
Precision: 0.9974
Recall: 0.9973
F1-Score: 0.9974
Confusion Matrix:
[[1163    1]
 [   7 1847]]


Evaluating resnet152...




Metrics for resnet152:
Accuracy: 0.9954
Precision: 0.9954
Recall: 0.9954
F1-Score: 0.9954
Confusion Matrix:
[[1157    7]
 [   7 1847]]




In [2]:

# Metrics for resnet18:
# Accuracy: 0.9970
# Precision: 0.9970
# Recall: 0.9970
# F1-Score: 0.9970

# Metrics for resnet34:
# Accuracy: 0.9964
# Precision: 0.9964
# Recall: 0.9964
# F1-Score: 0.9964

# Metrics for resnet50:
# Accuracy: 0.9960
# Precision: 0.9960
# Recall: 0.9960
# F1-Score: 0.9960

# Metrics for resnet101:
# Accuracy: 0.9973
# Precision: 0.9974
# Recall: 0.9973
# F1-Score: 0.9974


# Metrics for resnet152:
# Accuracy: 0.9954
# Precision: 0.9954
# Recall: 0.9954
# F1-Score: 0.9954


In [3]:
# Testing the models on a video examples

# import cv2
# import torch
# import torchvision.transforms as transforms
# from torchvision import models
# import torch.nn as nn
# import os
# from facenet_pytorch import MTCNN
# from PIL import Image

# # Initialize MTCNN for face detection
# mtcnn = MTCNN(keep_all=True, device='cuda' if torch.cuda.is_available() else 'cpu')

# # Parameters
# input_video_path = 'test_3.mp4'
# output_dir = 'resnet_output_video'
# os.makedirs(output_dir, exist_ok=True)

# model_dir = 'resnet_models'
# batch_size = 32
# num_classes = 2  # Assuming binary classification (e.g., adult vs. child)

# # Define transforms for the face images
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# ])

# # Function to modify the model's fully connected layer
# def modify_resnet_fc(resnet_model, num_classes):
#     in_features = resnet_model.fc.in_features
#     resnet_model.fc = nn.Sequential(
#         nn.Linear(in_features, 512),
#         nn.ReLU(),
#         nn.Linear(512, 256),
#         nn.ReLU(),
#         nn.Linear(256, num_classes)
#     )
#     return resnet_model

# # Load models
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model_names = ['resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152']

# for model_name in model_names:
#     model_path = os.path.join(model_dir, f"{model_name}_best_model.pth")
    
#     if os.path.exists(model_path):
#         print(f"Processing video for {model_name}...")

#         # Load the model
#         resnet_model = getattr(models, model_name)(pretrained=False)
#         resnet_model = modify_resnet_fc(resnet_model, num_classes)
#         resnet_model.load_state_dict(torch.load(model_path))
#         resnet_model = resnet_model.to(device)
#         resnet_model.eval()

#         # Process the video
#         cap = cv2.VideoCapture(input_video_path)
#         fourcc = cv2.VideoWriter_fourcc(*'XVID')
#         output_video_path = os.path.join(output_dir, f"{model_name}_output_video.avi")
#         out = cv2.VideoWriter(output_video_path, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))

#         while cap.isOpened():
#             ret, frame = cap.read()
#             if not ret:
#                 break

#             # Use MTCNN to detect faces
#             boxes, _ = mtcnn.detect(frame)

#             if boxes is not None:
#                 for box in boxes:
#                     x1, y1, x2, y2 = map(int, box)

#                     # Ensure the bounding box is within the frame boundaries
#                     x1 = max(0, x1)
#                     y1 = max(0, y1)
#                     x2 = min(frame.shape[1], x2)
#                     y2 = min(frame.shape[0], y2)

#                     # Check if the bounding box is valid
#                     if x2 > x1 and y2 > y1:
#                         face_img = frame[y1:y2, x1:x2]
                        
#                         # Convert the NumPy array (OpenCV image) to a PIL image
#                         face_img_pil = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB))
                        
#                         # Apply the transformations
#                         face_tensor = transform(face_img_pil)
#                         face_tensor = face_tensor.unsqueeze(0).to(device)

#                         # Get prediction from the model
#                         with torch.no_grad():
#                             output = resnet_model(face_tensor)
#                             _, predicted_class = torch.max(output, 1)
#                             label = 'Child' if predicted_class.item() == 0 else 'Adult'

#                         # Draw bounding box and label
#                         color = (0, 255, 0) if label == 'Adult' else (255, 0, 0)
#                         cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
#                         cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

#             # Write the frame to the output video
#             out.write(frame)

#         cap.release()
#         out.release()
#         cv2.destroyAllWindows()

#         print(f"Processed video saved to {output_video_path}")
#     else:
#         print(f"Model {model_name} not found at {model_path}")


In [4]:
# import cv2
# import matplotlib.pyplot as plt
# from pycocotools.coco import COCO
# import numpy as np
# import random

# # COCO body keypoint connections (skeleton) in (start, end) format
# COCO_SKELETON = [
#     (5, 6), (5, 7), (6, 8), (7, 9), (8, 10), 
#     (5, 11), (6, 12), (11, 12), (11, 13), 
#     (12, 14), (13, 15), (14, 16)
# ]

# # A set of distinct colors for classes
# COLORS = [
#     (255, 0, 0),  # Red
#     (0, 255, 0),  # Green
#     (0, 0, 255),  # Blue
#     (255, 255, 0),  # Cyan
#     (255, 0, 255),  # Magenta
#     (0, 255, 255),  # Yellow
#     (128, 0, 128),  # Purple
#     (128, 128, 0),  # Olive
#     (0, 128, 128),  # Teal
# ]

# # Color for keypoints (distinct from class colors)
# KEYPOINT_COLOR = (255, 165, 0)  # Orange

# def get_class_color(class_id, num_classes):
#     """Assign a color to a class based on the class ID."""
#     return COLORS[class_id % len(COLORS)]  # Cycle through available colors

# def draw_keypoints(image, keypoints, skeleton=COCO_SKELETON, color=KEYPOINT_COLOR, line_color=KEYPOINT_COLOR):
#     """Draw keypoints and connect them with lines."""
#     # Draw connections (skeleton lines)
#     for connection in skeleton:
#         start_idx, end_idx = connection
#         x1, y1, v1 = keypoints[start_idx * 3:(start_idx + 1) * 3]
#         x2, y2, v2 = keypoints[end_idx * 3:(end_idx + 1) * 3]
        
#         # Only draw the connection if both keypoints are visible (v > 0)
#         if v1 > 0 and v2 > 0:
#             cv2.line(image, (int(x1), int(y1)), (int(x2), int(y2)), line_color, 2)
    
#     # Draw keypoints themselves
#     for i in range(0, len(keypoints), 3):
#         x, y, v = keypoints[i:i+3]
#         if v > 0:  # Only draw visible keypoints
#             cv2.circle(image, (int(x), int(y)), 3, color, -1)
    
#     return image

# def draw_bbox(image, bbox, color):
#     """Draw bounding box on the image."""
#     x, y, w, h = bbox
#     cv2.rectangle(image, (int(x), int(y)), (int(x + w), int(y + h)), color, 2)
#     return image

# def visualize_annotations(image_path, keypoints_json, bbox_json, output_path):
#     # Initialize COCO objects for keypoints and bbox
#     coco_keypoints = COCO(keypoints_json)
#     coco_bbox = COCO(bbox_json)

#     # Get category IDs and assign unique colors to each class
#     categories = coco_bbox.loadCats(coco_bbox.getCatIds())
#     num_classes = len(categories)
#     class_colors = {cat['id']: get_class_color(i, num_classes) for i, cat in enumerate(categories)}

#     # Get image IDs from both files (assuming they contain the same image IDs)
#     img_ids = coco_keypoints.getImgIds()

#     for img_id in img_ids:
#         img_info = coco_keypoints.loadImgs(img_id)[0]
#         image = cv2.imread(image_path + img_info['file_name'])
#         if image is None:
#             print(f"Image {img_info['file_name']} not found!")
#             continue

#         # Create copies of the image for separate visualizations
#         image_bbox = image.copy()
#         image_keypoints = image.copy()

#         # Get annotations for keypoints and bounding boxes for the same image
#         keypoint_ann_ids = coco_keypoints.getAnnIds(imgIds=img_info['id'])
#         bbox_ann_ids = coco_bbox.getAnnIds(imgIds=img_info['id'])

#         keypoint_anns = coco_keypoints.loadAnns(keypoint_ann_ids)
#         bbox_anns = coco_bbox.loadAnns(bbox_ann_ids)

#         # Draw keypoints with connections on keypoint image
#         for ann in keypoint_anns:
#             if 'keypoints' in ann:
#                 image_keypoints = draw_keypoints(image_keypoints, ann['keypoints'])

#         # Draw bounding boxes with class-specific colors on bbox image
#         for ann in bbox_anns:
#             if 'bbox' in ann:
#                 class_id = ann['category_id']
#                 class_color = class_colors.get(class_id, (255, 255, 255))  # Default to white if class color not found
#                 image_bbox = draw_bbox(image_bbox, ann['bbox'], class_color)

#         # Save the image with bounding boxes
#         bbox_output_file = output_path + "bbox_" + img_info['file_name']
#         cv2.imwrite(bbox_output_file, image_bbox)
#         print(f"Saved bounding box image: {bbox_output_file}")

#         # Save the image with keypoints
#         keypoints_output_file = output_path + "keypoints_" + img_info['file_name']
#         cv2.imwrite(keypoints_output_file, image_keypoints)
#         print(f"Saved keypoints image: {keypoints_output_file}")

# # Example usage
# image_path = "sample_data/"
# keypoints_json = "body_keypoints_coco.json"
# bbox_json = "bbox_coco.json"
# output_path = "sample_data_output/"
# visualize_annotations(image_path, keypoints_json, bbox_json, output_path)

In [5]:
# !/home/eldor/miniconda3/envs/age/bin/python -m pip install pycocotools

In [6]:
# import os
# import subprocess

# def calculate_trim_duration(input_file, target_size_mb=100):
#     # Get the file size in MB
#     current_size_mb = os.path.getsize(input_file) / (1024 * 1024)

#     # Get the duration of the video using ffprobe
#     result = subprocess.run(
#         ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', input_file],
#         stdout=subprocess.PIPE,
#         stderr=subprocess.STDOUT
#     )
#     duration = float(result.stdout)
    
#     print(f"Current video size: {current_size_mb:.2f} MB")
#     print(f"Video duration: {duration:.2f} seconds")

#     # Calculate the average bitrate (size per second)
#     average_bitrate = current_size_mb / duration
#     print(f"Average bitrate: {average_bitrate:.6f} MB/second")

#     # Calculate the duration needed to make the video less than the target size
#     target_duration = target_size_mb / average_bitrate
#     trim_duration = duration - target_duration

#     if trim_duration > 0:
#         print(f"You need to trim the video by {trim_duration:.2f} seconds to make it less than {target_size_mb} MB.")
#     else:
#         print(f"The video is already smaller than {target_size_mb} MB, no need to trim.")
    
#     return duration, trim_duration if trim_duration > 0 else 0

# def trim_video_ffmpeg(input_file, output_file, start_time, duration):
#     # Construct the ffmpeg command to trim the video
#     ffmpeg_command = [
#         'ffmpeg', '-i', input_file, '-ss', str(start_time), '-t', str(duration),
#         '-c', 'copy', output_file
#     ]

#     # Run the ffmpeg command
#     subprocess.run(ffmpeg_command)

#     # Check the final size
#     if os.path.exists(output_file):
#         final_size_mb = os.path.getsize(output_file) / (1024 * 1024)
#         print(f"Trimmed video size: {final_size_mb:.2f} MB")
#     else:
#         print(f"Trimming failed. The output file was not created.")

# # Example usage
# input_video_path = 'teleian_sportage_daytime_2024_08_26.mp4'
# output_video_path = 'teleian_sportage_daytime_2024_08_26_compressed.mp4'

# # Calculate how many seconds to trim
# duration, trim_duration = calculate_trim_duration(input_video_path, target_size_mb=90)

# # If trimming is needed, cut the video
# if trim_duration > 0:
#     trim_video_ffmpeg(input_video_path, output_video_path, start_time=0, duration=(duration - trim_duration))