In [13]:
!pip install pandas
!pip install torch
!pip install torchvision
!pip install scikit-learn




In [14]:
import os
import json
import ast
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T
import pandas as pd

# Helper that normalises the 'image' column of the DataFrame
def clean_img_name(name):
    """Return a plain value from a cell that may hold a stringified list.

    Parameters
    ----------
    name : object
        Raw cell value. Only strings that start with ``[`` and end with ``]``
        are inspected; anything else is returned unchanged.

    Returns
    -------
    object
        The first element of the parsed list when ``name`` is a valid,
        non-empty list literal (e.g. ``"['img.jpg']"`` -> ``'img.jpg'``);
        otherwise ``name`` itself.
    """
    looks_like_list = isinstance(name, str) and name.startswith('[') and name.endswith(']')
    if not looks_like_list:
        return name

    try:
        parsed = ast.literal_eval(name)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Not a valid Python literal (e.g. "[img.jpg]"): keep the raw value.
        # Only the errors literal_eval is documented to raise are caught, so
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return name

    if isinstance(parsed, list) and len(parsed) > 0:
        return parsed[0]
    return name

# Load the annotations CSV and normalise the image names
df = pd.read_csv('../dados/dataset_pose.csv')
df['image'] = df['image'].map(clean_img_name)

# One dict per DataFrame row
samples = df.to_dict('records')

# Give every distinct action an integer index, following sorted order
actions = sorted({sample['action'] for sample in samples})
action_to_idx = dict(zip(actions, range(len(actions))))


In [15]:

def collate_fn_pad(batch):
    """Collate ``(image, keypoints, label)`` samples into batched tensors.

    Keypoint tensors are zero-padded along their first dimension up to the
    largest keypoint count found in the batch, then stacked.

    Parameters
    ----------
    batch : list of tuple
        Each item is ``(image, keypoints_tensor, label)``. Images must all
        share one shape (they are stacked as-is); keypoint tensors may differ
        in their first dimension only.

    Returns
    -------
    tuple
        ``(images, keypoints_batch, labels)`` where ``images`` and
        ``keypoints_batch`` are stacked tensors and ``labels`` is a
        ``torch.long`` tensor.
    """
    # Images share a shape, so they can be stacked directly
    images = torch.stack([item[0] for item in batch])
    keypoints = [item[1] for item in batch]
    labels = [item[2] for item in batch]

    # Largest number of keypoints in this batch
    max_kps = max(kp.shape[0] for kp in keypoints)

    padded_kps = []
    for kp in keypoints:
        missing = max_kps - kp.shape[0]
        if missing > 0:
            # new_zeros inherits dtype and device from `kp`, and the trailing
            # dimensions are copied from it, so the padding is no longer tied
            # to float32 (N, 3) CPU tensors.
            filler = kp.new_zeros((missing, *kp.shape[1:]))
            kp = torch.cat([kp, filler], dim=0)
        padded_kps.append(kp)

    keypoints_batch = torch.stack(padded_kps)
    labels = torch.tensor(labels, dtype=torch.long)

    return images, keypoints_batch, labels

In [16]:
class PoseDataset(Dataset):
    """Dataset yielding ``(image, keypoints, label)`` triples.

    Each record in ``data`` is a mapping that holds an image file name, the
    keypoint annotations and an action label under configurable keys.

    Parameters
    ----------
    data : sequence of mapping
        Records, indexed by integer position.
    images_dir : str
        Directory that the image file names are joined onto.
    img_key, keypoints_key, label_key : str
        Keys used to read the file name, keypoints and label from a record.
    image_size : tuple
        Target size given to ``T.Resize`` in the default transform.
    transform : callable, optional
        Applied to the RGB ``PIL`` image. When omitted, the image is resized
        to ``image_size`` and converted to a tensor.
    max_keypoints : int
        Every keypoint tensor is zero-padded or truncated to this many rows.
    label_map : mapping, optional
        Mapping from textual labels to class indices, used when the label
        cannot be converted with ``int``. Defaults to ``DEFAULT_LABEL_MAP``.

    NOTE(review): keypoints are returned exactly as stored in the record; they
    are not rescaled when the image is resized. Confirm whether the
    coordinates should be normalised to the resized image.
    """

    # Mapping applied to textual labels when the caller does not supply `label_map`
    DEFAULT_LABEL_MAP = {'dancing': 0, 'miscellaneous': 1, 'sports': 2}

    def __init__(self, data, images_dir, img_key='image', keypoints_key='keypoints', label_key='action',
                 image_size=(256, 256), transform=None, max_keypoints=16, label_map=None):
        self.data = data
        self.images_dir = images_dir
        self.img_key = img_key
        self.keypoints_key = keypoints_key
        self.label_key = label_key
        self.image_size = image_size
        self.transform = transform
        self.max_keypoints = max_keypoints
        self.label_map = dict(self.DEFAULT_LABEL_MAP if label_map is None else label_map)

        # Without an explicit transform, resize and convert to a tensor
        if self.transform is None:
            self.transform = T.Compose([
                T.Resize(self.image_size),
                T.ToTensor(),
            ])

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load one sample.

        Returns
        -------
        tuple
            ``(image, keypoints, label)``: the transformed image, a float32
            tensor with ``max_keypoints`` rows, and a 0-dim ``torch.long``
            label tensor.

        Raises
        ------
        KeyError
            If the label is neither int-convertible nor present in
            ``label_map``.
        """
        row = self.data[idx]

        img_path = os.path.join(self.images_dir, self._image_filename(row[self.img_key]))
        # convert() returns a separate image, so the file can be closed right away
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        image = self.transform(image)

        keypoints = self._keypoints_tensor(row[self.keypoints_key])
        label = torch.tensor(self._label_index(row[self.label_key]), dtype=torch.long)

        return image, keypoints, label

    @staticmethod
    def _image_filename(value):
        """Reduce a list, or a stringified list such as "['a.jpg']", to a bare file name."""
        if isinstance(value, list):
            value = value[0]
        if isinstance(value, str) and value.startswith("[") and value.endswith("]"):
            value = value.strip("[]").replace("'", "").replace('"', "").strip()
        return value

    def _keypoints_tensor(self, keypoints):
        """Parse raw keypoints into a float32 tensor with ``max_keypoints`` rows."""
        if isinstance(keypoints, str):
            keypoints = json.loads(keypoints)

        # A list of {'x': ..., 'y': ...} dicts becomes a list of [x, y] pairs
        if isinstance(keypoints, list) and keypoints and isinstance(keypoints[0], dict):
            keypoints = [[kp['x'], kp['y']] for kp in keypoints]

        keypoints = np.array(keypoints)
        if keypoints.size == 0:
            # No annotations: start from zero rows so padding yields an all-zero tensor
            keypoints = np.zeros((0, 3))
        elif keypoints.shape[1] == 2:
            # Append a third column filled with 1 (used here as a visibility flag)
            visibility = np.ones((keypoints.shape[0], 1))
            keypoints = np.hstack((keypoints, visibility))
        keypoints = torch.tensor(keypoints, dtype=torch.float32)

        count = keypoints.shape[0]
        if count < self.max_keypoints:
            padding = keypoints.new_zeros((self.max_keypoints - count, keypoints.shape[1]))
            keypoints = torch.cat([keypoints, padding], dim=0)
        elif count > self.max_keypoints:
            keypoints = keypoints[:self.max_keypoints]
        return keypoints

    def _label_index(self, label):
        """Convert a raw label to an integer class index."""
        try:
            # Numeric labels, including numeric strings such as "2"
            return int(label)
        except (TypeError, ValueError):
            pass

        try:
            return self.label_map[label]
        except KeyError:
            raise KeyError(
                f"Unknown label {label!r}; known labels: {list(self.label_map)}"
            ) from None

In [17]:
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# 1. Load the annotations CSV
df = pd.read_csv('../dados/dataset_pose.csv')

# 2. Give every distinct action an integer index, following sorted order
actions = sorted(df['action'].unique())
action_to_idx = dict(zip(actions, range(len(actions))))

# 3. Numeric copy of 'action'; within this cell it only serves as the stratification key
df['action_idx'] = df['action'].map(action_to_idx)

# 4. Stratified 80/20 train/validation split with a fixed seed
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df['action_idx'],
)

# 5. Turn each split into a list of row dicts
train_samples = train_df.to_dict(orient='records')
val_samples = val_df.to_dict(orient='records')


In [18]:
# Both splits read their images from the same folder
mpii_images_dir = '../dados/mpii_human_pose_v1/images'
train_dataset = PoseDataset(data=train_samples, images_dir=mpii_images_dir)
val_dataset = PoseDataset(data=val_samples, images_dir=mpii_images_dir)


In [19]:
from torch.utils.data import DataLoader

# Settings shared by both loaders; only the training loader shuffles
loader_options = dict(batch_size=32, num_workers=2, collate_fn=collate_fn_pad)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)


In [23]:
from sklearn.metrics import classification_report
def evaluate_model(model, dataloader, device, class_names):
    """Evaluate a two-headed model and print a classification report.

    Parameters
    ----------
    model : nn.Module
        Called as ``model(images)``; must return ``(pred_keypoints, action_logits)``.
    dataloader : iterable
        Yields ``(images, keypoints, labels)`` batches. ``keypoints`` is
        indexed as ``[:, :, :2]`` and flattened per sample before being
        compared with the keypoint predictions.
    device : torch.device
        Device the images and keypoint targets are moved to.
    class_names : list of str
        Display names; class index ``i`` is reported as ``class_names[i]``.

    Returns
    -------
    tuple
        ``(report, avg_kp_loss)``: the text produced by
        ``classification_report`` and the mean of the per-batch keypoint MSE.

    Raises
    ------
    ValueError
        If ``dataloader`` yields no batches.
    """
    model.eval()
    mse_loss = nn.MSELoss()
    all_preds = []
    all_labels = []
    total_kp_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for images, keypoints, labels in dataloader:
            images = images.to(device)
            # Keep only the first two columns of each keypoint and flatten per sample
            keypoints = keypoints[:, :, :2].reshape(images.size(0), -1).to(device)

            pred_kps, pred_action = model(images)

            # Regression head: accumulate the per-batch keypoint MSE
            total_kp_loss += mse_loss(pred_kps, keypoints).item()
            num_batches += 1

            # Classification head: collect predictions and ground truth
            all_preds.extend(pred_action.argmax(dim=1).cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    if num_batches == 0:
        raise ValueError("evaluate_model received a dataloader that yielded no batches")

    avg_kp_loss = total_kp_loss / num_batches
    # Passing `labels` pins the report to every class in `class_names`. Without
    # it, classification_report raises when a class is absent from both the
    # ground truth and the predictions of this split.
    cls_report = classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
        zero_division=0,
    )

    print("=== Classification Report ===")
    print(cls_report)
    print(f"Average Keypoints MSE Loss: {avg_kp_loss:.4f}")

    return cls_report, avg_kp_loss

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold
import numpy as np

# Model whose convolutional width is configurable through `base_filters`
class PoseActionCNN(nn.Module):
    """CNN with a shared trunk and two linear heads.

    The trunk is three conv/ReLU/max-pool stages (``base_filters``,
    ``2 * base_filters`` and ``4 * base_filters`` channels) followed by an
    adaptive average pool to 4x4 and a 512-unit fully connected layer with
    dropout. ``forward`` returns ``(keypoints, action_logits)`` with
    ``num_keypoints * 2`` and ``num_classes`` values per sample respectively.
    """

    def __init__(self, num_keypoints=16, num_classes=3, base_filters=32):
        super().__init__()

        # Channel progression of the three convolutional stages
        widths = [3, base_filters, base_filters * 2, base_filters * 4]
        stages = []
        for in_channels, out_channels in zip(widths[:-1], widths[1:]):
            stages += [
                nn.Conv2d(in_channels, out_channels, 3, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ]
        # Fixed 4x4 spatial output: (B, base_filters*4, 4, 4)
        stages.append(nn.AdaptiveAvgPool2d((4, 4)))
        self.features = nn.Sequential(*stages)

        self.flatten = nn.Flatten()
        flat_features = widths[-1] * 4 * 4

        self.fc = nn.Sequential(
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
        )

        # Two heads on top of the shared 512-d representation
        self.fc_keypoints = nn.Linear(512, num_keypoints * 2)
        self.fc_action = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return ``(keypoint_predictions, action_logits)`` for a batch of images."""
        shared = self.fc(self.flatten(self.features(x)))
        return self.fc_keypoints(shared), self.fc_action(shared)

# Training and validation loops used by the cross-validation helpers below

def train_one_epoch(model, dataloader, optimizer, mse_loss, ce_loss, device, loss_weights=(1.0, 1.0)):
    """Train ``model`` for one pass over ``dataloader``.

    Parameters
    ----------
    model : nn.Module
        Called as ``model(images)``; must return ``(pred_keypoints, action_logits)``.
    dataloader : iterable with ``len``
        Yields ``(images, keypoints, labels)`` batches. ``keypoints`` is
        indexed as ``[:, :, :2]`` and flattened per sample to form the
        regression target.
    optimizer : torch.optim.Optimizer
        Stepped once per batch.
    mse_loss, ce_loss : callable
        Keypoint regression loss and action classification loss.
    device : torch.device
        Device every batch is moved to.
    loss_weights : tuple of float, optional
        ``(keypoint_weight, action_weight)`` applied to the two losses before
        they are summed. The default ``(1.0, 1.0)`` is the plain sum.

    Returns
    -------
    float
        Sum of the per-batch total losses divided by ``len(dataloader)``.
    """
    kp_weight, action_weight = loss_weights

    model.train()
    total_loss = 0
    for images, keypoints, labels in dataloader:
        images = images.to(device)
        # Keep only the first two columns of each keypoint and flatten per sample
        keypoints = keypoints[:, :, :2].reshape(images.size(0), -1).to(device)
        labels = labels.to(device)

        pred_kps, pred_action = model(images)

        loss_kps = mse_loss(pred_kps, keypoints)
        loss_action = ce_loss(pred_action, labels)
        loss = kp_weight * loss_kps + action_weight * loss_action

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss / len(dataloader)

def validate(model, dataloader, mse_loss, ce_loss, device):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Each batch contributes ``mse_loss + ce_loss``. The keypoint target is the
    first two columns of every keypoint, flattened per sample.

    Returns
    -------
    tuple
        ``(mean_loss, accuracy)``: the summed batch losses divided by
        ``len(dataloader)``, and the fraction of samples whose arg-max action
        prediction equals the label.
    """
    model.eval()
    batch_losses = []
    hits = 0
    seen = 0

    with torch.no_grad():
        for batch_images, batch_kps, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            target_kps = batch_kps[:, :, :2].reshape(batch_images.size(0), -1).to(device)
            batch_labels = batch_labels.to(device)

            kp_out, action_logits = model(batch_images)

            combined = mse_loss(kp_out, target_kps) + ce_loss(action_logits, batch_labels)
            batch_losses.append(combined.item())

            predicted = action_logits.argmax(dim=1)
            hits += (predicted == batch_labels).sum().item()
            seen += batch_labels.size(0)

    accuracy = hits / seen
    return sum(batch_losses) / len(dataloader), accuracy

# --- Cross-validation with a hyperparameter search ---

from torch.utils.data import Subset, DataLoader

def cross_validate(dataset, device, k=5, epochs=10, batch_size=32, param_grid=None):
    """Run k-fold cross-validation of ``PoseActionCNN`` for each parameter set.

    Parameters
    ----------
    dataset : Dataset
        Indexable dataset; each fold wraps it in ``Subset`` objects.
    device : torch.device
        Device used for the model and the batches.
    k : int
        Number of folds (shuffled ``KFold`` with ``random_state=42``).
    epochs : int
        Training epochs per fold.
    batch_size : int
        Batch size of the per-fold loaders.
    param_grid : list of dict, optional
        Each dict needs the keys ``'lr'`` and ``'base_filters'``. A two-entry
        default grid is used when omitted.

    Returns
    -------
    tuple
        ``(fold_results, reports)``. ``fold_results`` has one dict per
        (params, fold) with ``'params'`` and the last epoch's ``'val_loss'``
        and ``'val_acc'`` (``nan`` when ``epochs`` is 0). ``reports`` has one
        dict per (params, fold) with ``'classification_report'`` and
        ``'keypoints_mse'`` as returned by ``evaluate_model``.
    """
    from torch.utils.data import DataLoader, Subset
    from sklearn.model_selection import KFold

    kf = KFold(n_splits=k, shuffle=True, random_state=42)
    fold_results = []
    reports = []

    # Example grid used when the caller does not provide one
    if param_grid is None:
        param_grid = [{'lr': 0.01, 'base_filters': 32}, {'lr': 0.1, 'base_filters': 64}]

    for params in param_grid:
        print(f"\nTestando parâmetros: lr={params['lr']}, base_filters={params['base_filters']}")

        for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
            print(f"\nFold {fold+1}/{k}")

            train_subset = Subset(dataset, train_idx)
            val_subset = Subset(dataset, val_idx)

            train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=2)
            val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=2)

            # A fresh model and optimizer per fold so folds do not share weights
            model = PoseActionCNN(base_filters=params['base_filters']).to(device)
            optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
            mse_loss = nn.MSELoss()
            ce_loss = nn.CrossEntropyLoss()

            # Defined up front so the fold summary below also works when epochs == 0
            val_loss, val_acc = float('nan'), float('nan')
            for epoch in range(epochs):
                train_one_epoch(model, train_loader, optimizer, mse_loss, ce_loss, device)
                val_loss, val_acc = validate(model, val_loader, mse_loss, ce_loss, device)
                print(f"Epoch {epoch+1}/{epochs} - Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

            # Detailed report once the fold has finished training
            print(f"\nClassification Report for Fold {fold+1}:")
            report, kp_mse = evaluate_model(model, val_loader, device, class_names=['dancing', 'miscellaneous', 'sports'])
            reports.append({'classification_report': report, 'keypoints_mse': kp_mse})

            fold_results.append({'params': params, 'val_loss': val_loss, 'val_acc': val_acc})

    return fold_results, reports



# Exemplo de uso



In [26]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# 4-fold cross-validation over the training split, 5 epochs per fold
results, reports = cross_validate(dataset=train_dataset, device=device, epochs=5, k=4)



Testando parâmetros: lr=0.01, base_filters=32

Fold 1/4
Epoch 1/5 - Val Loss: 107325.1875 | Val Acc: 0.2833
Epoch 2/5 - Val Loss: 90662.0156 | Val Acc: 0.2833
Epoch 3/5 - Val Loss: 88226.3490 | Val Acc: 0.3667
Epoch 4/5 - Val Loss: 103864.2708 | Val Acc: 0.3500
Epoch 5/5 - Val Loss: 88506.7799 | Val Acc: 0.3667

Classification Report for Fold 1:
=== Classification Report ===
               precision    recall  f1-score   support

      dancing       0.00      0.00      0.00        51
miscellaneous       0.00      0.00      0.00        63
       sports       0.37      1.00      0.54        66

     accuracy                           0.37       180
    macro avg       0.12      0.33      0.18       180
 weighted avg       0.13      0.37      0.20       180

Average Keypoints MSE Loss: 88503.5690

Fold 2/4
Epoch 1/5 - Val Loss: 344597.3203 | Val Acc: 0.3611
Epoch 2/5 - Val Loss: 87907.2142 | Val Acc: 0.3611
Epoch 3/5 - Val Loss: 76998.0749 | Val Acc: 0.3778
Epoch 4/5 - Val Loss: 79481.92

In [27]:
results

[{'params': {'lr': 0.01, 'base_filters': 32},
  'val_loss': 88506.77994791667,
  'val_acc': 0.36666666666666664},
 {'params': {'lr': 0.01, 'base_filters': 32},
  'val_loss': 80858.25065104167,
  'val_acc': 0.37777777777777777},
 {'params': {'lr': 0.01, 'base_filters': 32},
  'val_loss': 79991.8125,
  'val_acc': 0.3},
 {'params': {'lr': 0.01, 'base_filters': 32},
  'val_loss': 81879.40234375,
  'val_acc': 0.3},
 {'params': {'lr': 0.1, 'base_filters': 64},
  'val_loss': 108536.04036458333,
  'val_acc': 0.36666666666666664},
 {'params': {'lr': 0.1, 'base_filters': 64},
  'val_loss': 73989.63606770833,
  'val_acc': 0.2611111111111111},
 {'params': {'lr': 0.1, 'base_filters': 64},
  'val_loss': 95060.50651041667,
  'val_acc': 0.37222222222222223},
 {'params': {'lr': 0.1, 'base_filters': 64},
  'val_loss': 81389.60481770833,
  'val_acc': 0.3}]

In [28]:
reports

[{'classification_report': '               precision    recall  f1-score   support\n\n      dancing       0.00      0.00      0.00        51\nmiscellaneous       0.00      0.00      0.00        63\n       sports       0.37      1.00      0.54        66\n\n     accuracy                           0.37       180\n    macro avg       0.12      0.33      0.18       180\n weighted avg       0.13      0.37      0.20       180\n',
  'keypoints_mse': 88503.56901041667},
 {'classification_report': '               precision    recall  f1-score   support\n\n      dancing       0.38      1.00      0.55        68\nmiscellaneous       0.00      0.00      0.00        47\n       sports       0.00      0.00      0.00        65\n\n     accuracy                           0.38       180\n    macro avg       0.13      0.33      0.18       180\n weighted avg       0.14      0.38      0.21       180\n',
  'keypoints_mse': 80853.48958333333},
 {'classification_report': '               precision    recall  f1-s

In [29]:
class YOLOInspiredNet(nn.Module):
    """Conv/BatchNorm/LeakyReLU network with a regression and a classification head.

    Four convolutional stages (16, 32, 64 and 128 channels) feed an adaptive
    average pool to 4x4 and a 512-unit fully connected layer with dropout.
    ``forward`` returns ``(keypoints, class_logits)`` with
    ``num_keypoints * 2`` and ``num_classes`` values per sample respectively.
    """

    def __init__(self, num_keypoints=16, num_classes=3):
        super().__init__()

        def conv_stage(in_channels, out_channels):
            # 3x3 convolution -> batch norm -> LeakyReLU(0.1)
            return [
                nn.Conv2d(in_channels, out_channels, 3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.LeakyReLU(0.1),
            ]

        layers = []
        # The first three stages each halve the spatial size with a 2x2 max-pool
        for in_channels, out_channels in ((3, 16), (16, 32), (32, 64)):
            layers.extend(conv_stage(in_channels, out_channels))
            layers.append(nn.MaxPool2d(2))
        # The last stage ends in a fixed 4x4 output: (B, 128, 4, 4)
        layers.extend(conv_stage(64, 128))
        layers.append(nn.AdaptiveAvgPool2d((4, 4)))
        self.conv_layers = nn.Sequential(*layers)

        self.flatten = nn.Flatten()
        self.fc = nn.Sequential(
            nn.Linear(128 * 4 * 4, 512),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.3),
        )

        self.fc_keypoints = nn.Linear(512, num_keypoints * 2)  # regression head
        self.fc_class = nn.Linear(512, num_classes)            # classification head

    def forward(self, x):
        """Return ``(keypoint_predictions, class_logits)`` for a batch of images."""
        shared = self.fc(self.flatten(self.conv_layers(x)))
        return self.fc_keypoints(shared), self.fc_class(shared)


In [30]:
def cross_validate2(dataset, device, k=5, epochs=10, batch_size=32, param_grid=None):
    """Run k-fold cross-validation of ``YOLOInspiredNet`` with class-weighted loss.

    For every fold the cross-entropy loss is weighted by the inverse class
    frequency of that fold's training labels. During training the keypoint
    loss is scaled by 0.5 and the action loss by 1.0; validation uses the
    unscaled losses.

    Parameters
    ----------
    dataset : Dataset
        Indexable dataset whose items are ``(image, keypoints, label)``.
    device : torch.device
        Device used for the model and the batches.
    k : int
        Number of folds (shuffled ``KFold`` with ``random_state=42``).
    epochs : int
        Training epochs per fold.
    batch_size : int
        Batch size of the per-fold loaders.
    param_grid : list of dict, optional
        Each dict needs the keys ``'lr'`` and ``'base_filters'``. Only
        ``'lr'`` affects training here; ``'base_filters'`` is printed but not
        passed to ``YOLOInspiredNet``.

    Returns
    -------
    tuple
        ``(fold_results, reports)`` with one entry per (params, fold):
        ``fold_results`` holds ``'params'`` and the last epoch's
        ``'val_loss'`` / ``'val_acc'`` (``nan`` when ``epochs`` is 0), and
        ``reports`` holds the output of ``evaluate_model``.
    """
    from torch.utils.data import DataLoader, Subset
    from sklearn.model_selection import KFold
    from collections import Counter

    kf = KFold(n_splits=k, shuffle=True, random_state=42)
    fold_results = []
    reports = []

    if param_grid is None:
        param_grid = [{'lr': 0.01, 'base_filters': 32}, {'lr': 0.1, 'base_filters': 64}]

    num_classes = 3
    # Relative weights of the keypoint and action losses during training
    kp_weight, action_weight = 0.5, 1.0

    for params in param_grid:
        print(f"\nTestando parâmetros: lr={params['lr']}, base_filters={params['base_filters']}")

        for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
            print(f"\nFold {fold+1}/{k}")

            train_subset = Subset(dataset, train_idx)
            val_subset = Subset(dataset, val_idx)

            train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=2)
            val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=2)

            model = YOLOInspiredNet(num_keypoints=16, num_classes=num_classes).to(device)
            optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
            mse_loss = nn.MSELoss()

            # === Class weights for this fold ===
            # int() is required: tensor labels hash by identity, so a Counter
            # over them would never match the integer class ids looked up below.
            # Indexing the dataset runs its full __getitem__ for every sample.
            train_labels = [int(dataset[i][2]) for i in train_idx]
            label_counts = Counter(train_labels)
            total = sum(label_counts.values())
            weights = [total / label_counts.get(i, 1) for i in range(num_classes)]
            weights = torch.tensor(weights, dtype=torch.float32).to(device)

            ce_loss = nn.CrossEntropyLoss(weight=weights)

            print(f"Pesos usados para CrossEntropyLoss: {weights.tolist()}")

            # train_one_epoch adds its two losses; scaling is applied by wrapping
            # them, so no extra keyword argument is needed for the weighting.
            def train_mse(pred, target, _loss=mse_loss):
                return kp_weight * _loss(pred, target)

            def train_ce(logits, target, _loss=ce_loss):
                return action_weight * _loss(logits, target)

            # Defined up front so the fold summary below also works when epochs == 0
            val_loss, val_acc = float('nan'), float('nan')
            for epoch in range(epochs):
                train_one_epoch(model, train_loader, optimizer, train_mse, train_ce, device)
                val_loss, val_acc = validate(model, val_loader, mse_loss, ce_loss, device)
                print(f"Epoch {epoch+1}/{epochs} - Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

                # Collapse diagnostic: distribution of predicted classes after the last epoch
                if epoch == epochs - 1:
                    with torch.no_grad():
                        all_preds = []
                        for images, _, _ in val_loader:
                            images = images.to(device)
                            logits = model(images)[1]
                            preds = torch.argmax(logits, dim=1)
                            all_preds.extend(preds.cpu().tolist())
                        pred_dist = Counter(all_preds)
                        print(f"Distribuição das classes previstas: {pred_dist}")

            print(f"\nClassification Report for Fold {fold+1}:")
            report, kp_mse = evaluate_model(model, val_loader, device, class_names=['dancing', 'miscellaneous', 'sports'])
            reports.append({'classification_report': report, 'keypoints_mse': kp_mse})

            fold_results.append({'params': params, 'val_loss': val_loss, 'val_acc': val_acc})

    return fold_results, reports


In [31]:
# NOTE(review): this calls cross_validate (which builds PoseActionCNN), not the
# cross_validate2 function defined in the previous cell — confirm which was intended.
results2, reports2 = cross_validate(dataset=train_dataset, device=device, epochs=5, k=4)



Testando parâmetros: lr=0.01, base_filters=32

Fold 1/4
Epoch 1/5 - Val Loss: 378660.2292 | Val Acc: 0.3833
Epoch 2/5 - Val Loss: 297631.9036 | Val Acc: 0.3667
Epoch 3/5 - Val Loss: 104242.2344 | Val Acc: 0.3500
Epoch 4/5 - Val Loss: 96989.0951 | Val Acc: 0.3500
Epoch 5/5 - Val Loss: 87297.4089 | Val Acc: 0.3667

Classification Report for Fold 1:
=== Classification Report ===
               precision    recall  f1-score   support

      dancing       0.00      0.00      0.00        51
miscellaneous       0.00      0.00      0.00        63
       sports       0.37      1.00      0.54        66

     accuracy                           0.37       180
    macro avg       0.12      0.33      0.18       180
 weighted avg       0.13      0.37      0.20       180

Average Keypoints MSE Loss: 87293.6914

Fold 2/4
Epoch 1/5 - Val Loss: 345028.8333 | Val Acc: 0.2611
Epoch 2/5 - Val Loss: 111414.1979 | Val Acc: 0.3611
Epoch 3/5 - Val Loss: 78089.7884 | Val Acc: 0.3778
Epoch 4/5 - Val Loss: 74540.