In [1]:
# 导入必要的库
import os
import pandas as pd
import random
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt


In [2]:
# 加载 CSV 数据
file_path = "df.csv"  # 修改为实际路径
data = pd.read_csv(file_path)

# 提取舞蹈流派、视频编号和摄像头编号
data['dance_type'] = data['images'].apply(lambda x: x.split('/')[1].split('_')[0])  # 舞蹈流派
data['video_id'] = data['images'].apply(lambda x: x.split('/')[1].split('_')[1])   # 视频编号
data['camera_id'] = data['images'].apply(lambda x: x.split('/')[1].split('_')[2])  # 摄像头编号

In [3]:
# 按舞蹈流派和视频编号统计
dance_video_stats = data.groupby(['dance_type', 'video_id']).size().reset_index(name='frame_count')

# 按流派统计视频数量
dance_type_summary = dance_video_stats.groupby('dance_type')['video_id'].count().reset_index(name='num_videos')

# 输出统计结果
print(dance_type_summary)


  dance_type  num_videos
0     HipHop           6
1       Jazz           6
2       Kata           6
3     Taichi           6


In [4]:
# 定义函数进行视频划分（按70%训练集和30%测试集划分）
def split_videos(dance_video_stats, train_ratio=0.7):
    train_videos, test_videos = {}, {}
    for dance_type in dance_video_stats['dance_type'].unique():
        videos = dance_video_stats[dance_video_stats['dance_type'] == dance_type]['video_id'].unique()
        random.shuffle(videos)  # 随机打乱
        split_idx = int(len(videos) * train_ratio)
        train_videos[dance_type] = videos[:split_idx]  # 前70%为训练集
        test_videos[dance_type] = videos[split_idx:]   # 剩余30%为测试集
    return train_videos, test_videos

# 执行划分
train_videos, test_videos = split_videos(dance_video_stats)

# 打印划分结果
print("训练集划分：", {k: len(v) for k, v in train_videos.items()})
print("测试集划分：", {k: len(v) for k, v in test_videos.items()})


训练集划分： {'HipHop': 4, 'Jazz': 4, 'Kata': 4, 'Taichi': 4}
测试集划分： {'HipHop': 2, 'Jazz': 2, 'Kata': 2, 'Taichi': 2}


In [5]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

class DanceDataset(Dataset):
    def __init__(self, data, root_dir, transform=None, num_frames=5):
        """
        Args:
            data (DataFrame): 包含 images 列和 dance_type 列的 DataFrame。
            root_dir (str): images 文件夹的根目录。
            transform (callable, optional): 对图像进行的变换操作。
            num_frames (int): 每个样本包含的帧数（时间维度）。
        """
        self.data = data
        self.root_dir = root_dir
        self.transform = transform
        self.num_frames = num_frames

        # print("Dataset Preview:")
        # print(self.data.head())  # 检查数据内容

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path = os.path.join(self.root_dir, self.data.iloc[idx]['images'])
        # 检查路径是否存在
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"文件未找到：{image_path}")

        # 加载图像
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        # 将舞蹈类别转换为整数标签
        label = self.data.iloc[idx]['dance_type']
        label_mapping = {"HipHop": 0, "Jazz": 1, "Kata": 2, "Taichi": 3}
        label = label_mapping[label]

        # 添加时间维度
        # 添加时间维度，重复图像到 num_frames
        image = image.unsqueeze(0).repeat(self.num_frames, 1, 1, 1)  # (time, height, width)
        # image = image.unsqueeze(0)  # (1, height, width)

        return image, label


In [6]:
# 展平 train_videos 和 test_videos 的所有视频 ID 列表
train_video_ids = [video for videos in train_videos.values() for video in videos]
test_video_ids = [video for videos in test_videos.values() for video in videos]

# 过滤训练集数据
train_data = data[data['video_id'].isin(train_video_ids)]
test_data = data[data['video_id'].isin(test_video_ids)]

# 打印训练集和测试集样本数
print(f"训练集样本数: {len(train_data)}")
print(f"测试集样本数: {len(test_data)}")

训练集样本数: 711
测试集样本数: 383


In [7]:
# 定义图像变换
transform = transforms.Compose([
    transforms.Resize((250, 250)),  # 调整图像大小
    transforms.ToTensor(),         # 转换为 Tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # 归一化
])

# 创建数据集
root_dir = "segmentation_full_body_mads_dataset"
train_dataset = DanceDataset(train_data, root_dir=root_dir, transform=transform)
test_dataset = DanceDataset(test_data, root_dir=root_dir, transform=transform)

# 创建数据加载器
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

In [8]:
for images, labels in train_loader:
    print(f"Batch images shape: {images.shape}")
    print(f"Batch labels: {labels}")
    break

Batch images shape: torch.Size([16, 5, 3, 250, 250])
Batch labels: tensor([1, 1, 2, 0, 3, 0, 2, 3, 3, 0, 3, 0, 0, 3, 3, 1])


In [9]:
# 检查 train_data 的图像路径拼接是否正确
for i in range(5):  # 查看前5条路径
    print(os.path.join(root_dir, train_data.iloc[i]['images']))

# 实例化数据集
train_dataset = DanceDataset(train_data, root_dir=root_dir, transform=transform)


segmentation_full_body_mads_dataset/images/HipHop_HipHop1_C0_00180.png
segmentation_full_body_mads_dataset/images/HipHop_HipHop1_C0_00225.png
segmentation_full_body_mads_dataset/images/HipHop_HipHop1_C0_00360.png
segmentation_full_body_mads_dataset/images/HipHop_HipHop1_C0_00405.png
segmentation_full_body_mads_dataset/images/HipHop_HipHop1_C0_00450.png


In [10]:
# 测试数据加载
dataset = DanceDataset(train_data, root_dir=root_dir, transform=transform)

# 检查第一个样本
image, label = dataset[0]
print(f"图像尺寸: {image.size}")  # 打印图像尺寸
print(f"标签: {label}")         # 打印标签


图像尺寸: <built-in method size of Tensor object at 0x15bd971d0>
标签: 0


In [11]:

# 检查数据加载器是否正常工作
for images, labels in train_loader:
    print(f"图像批次维度: {images.shape}")
    print(f"标签批次: {labels}")
    break


图像批次维度: torch.Size([16, 5, 3, 250, 250])
标签批次: tensor([1, 0, 0, 0, 0, 3, 1, 0, 0, 3, 0, 3, 0, 1, 0, 0])


In [12]:
import torch.nn as nn
import torch.nn.functional as F

class Dance3DCNN(nn.Module):
    def __init__(self, num_classes):
        super(Dance3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(3, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        self.pool1 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
        self.conv2 = nn.Conv3d(16, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        self.pool2 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(1, 1, 1)) 
        
        self.fc1 = nn.Linear(32 * 1 * 124 * 124, 128) 
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        # print(f"Input to conv1: {x.shape}") 
        x = F.relu(self.conv1(x))
        # print(f"After conv1: {x.shape}")
        x = self.pool1(x)
        # print(f"After pool1: {x.shape}")
        x = F.relu(self.conv2(x))
        # print(f"After conv2: {x.shape}")
        x = self.pool2(x)
        # print(f"After pool2: {x.shape}")
        x = x.view(x.size(0), -1)
        # print(f"After view: {x.shape}")
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x



In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Dance3DCNN(num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)


In [14]:
# from tqdm import tqdm

def train_model(model, train_loader, test_loader, criterion, optimizer, epochs):
    train_losses, train_accuracies = [], []
    val_losses, val_accuracies = [], []
    
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for images, labels in train_loader:
        # for images, labels in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{epochs}"):
            images, labels = images.to(device), labels.to(device)
            images = images.permute(0, 2, 1, 3, 4)  # (batch_size, channels, time, height, width)
            # (batch_size, channels*time, height, width)
            # images = images.reshape(images.size(0), 3, images.size(2), images.size(3), images.size(4))

            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        # train_loss /= len(train_loader)
        train_accuracy = 100. * correct / total
        train_losses.append(train_loss / len(train_loader))  # 平均损失
        # train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")
        
        

        # Validation
        model.eval()
        test_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            # for images, labels in tqdm(test_loader, desc=f"Validation Epoch {epoch+1}/{epochs}"):
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                
                images = images.permute(0, 2, 1, 3, 4)  # 验证也需要调整维度
                
                outputs = model(images)
                loss = criterion(outputs, labels)
                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

      
        # test_loss /= len(test_loader)
        test_accuracy = 100. * correct / total
        val_losses.append(test_loss / len(test_loader))  # 平均损失
        val_accuracies.append(test_accuracy)
        print(f"Validation Loss: {test_loss:.4f}, Validation Accuracy: {test_accuracy:.2f}%")

        # 打印学习率
        for param_group in optimizer.param_groups:
            print(f"Learning Rate: {param_group['lr']}")
            

    # 绘制 Loss 和 Accuracy 曲线
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    
    plt.figure(figsize=(10, 5))
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()


In [15]:
for images, labels in train_loader:
    print(f"Batch images shape: {images.shape}")
    print(f"Batch labels: {labels}")
    break

Batch images shape: torch.Size([16, 5, 3, 250, 250])
Batch labels: tensor([1, 1, 1, 2, 2, 3, 1, 0, 3, 1, 2, 0, 2, 2, 0, 0])


In [16]:
epochs = 5
train_model(model, train_loader, test_loader, criterion, optimizer, epochs)
torch.save(model.state_dict(),"xiaoewochaoshini.pth")

Epoch 1/5
Epoch 1/5, Train Loss: 33.5016, Train Accuracy: 69.90%
Validation Loss: 6.0508, Validation Accuracy: 89.56%
Learning Rate: 0.0001
Epoch 2/5
Epoch 2/5, Train Loss: 7.3478, Train Accuracy: 95.22%
Validation Loss: 1.4891, Validation Accuracy: 99.22%
Learning Rate: 0.0001
Epoch 3/5
Epoch 3/5, Train Loss: 2.7033, Train Accuracy: 99.02%
Validation Loss: 0.9476, Validation Accuracy: 98.96%
Learning Rate: 0.0001
Epoch 4/5
Epoch 4/5, Train Loss: 1.3725, Train Accuracy: 99.58%
Validation Loss: 3.8173, Validation Accuracy: 92.43%
Learning Rate: 0.0001
Epoch 5/5
Epoch 5/5, Train Loss: 1.2376, Train Accuracy: 99.58%
Validation Loss: 0.5194, Validation Accuracy: 99.48%
Learning Rate: 0.0001


NameError: name 'train_losses' is not defined

<Figure size 1000x500 with 0 Axes>

In [40]:
import matplotlib.pyplot as plt

# 绘制 Loss 和 Accuracy 曲线
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.plot(train_accuracies, label='Train Accuracy')
plt.plot(val_accuracies, label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

NameError: name 'train_losses' is not defined

<Figure size 1000x500 with 0 Axes>

In [None]:
-