# maimai 谱面难度预测 - 基于 LSTM 的时序建模

本项目使用 LSTM 神经网络直接处理谱面的 note 序列数据，将每个 note 的时间戳和类型等信息作为时序特征输入模型，预测谱面的难度定数。

**核心思路**：将谱面视为时间序列，每个 note 包含时间戳、类型、位置等属性，通过 LSTM 学习 note 序列的时序特征来预测难度。

## 1. 导入所需库

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import csv
import os
import sys

## 2. 数据处理与序列化

数据处理分为两个主要步骤：
1. **谱面解析**：将 maidata.txt 格式解析为结构化的 note 序列数据
2. **序列预处理**：将 note 序列转换为适合 LSTM 输入的格式

**核心理念**：每个谱面是一个时间序列，包含按时间顺序排列的 note 序列。每个 note 具有时间戳、类型、位置等属性。

### 2.1 解析 maidata.txt

我们使用外部工具 `SimaiSerializerFromMajdataEdit.exe` 来将 `maidata.txt` 格式的谱面文件解析并序列化为 JSON 文件。

数据来源：maichart-converts

**使用方法:**

在终端中执行以下命令，它会将 `data\maichart-converts` 目录下的所有谱面处理并输出到 `data\serialized` 目录。


In [11]:
command = (
    r"src\serializer\src\bin\Release\net8.0\SimaiSerializerFromMajdataEdit.exe "
    r"data\maichart-converts data\serialized"
)
print(command)

src\serializer\src\bin\Release\net8.0\SimaiSerializerFromMajdataEdit.exe data\maichart-converts data\serialized


该工具的通用命令格式为： `SimaiSerializerFromMajdataEdit.exe <输入文件或目录> <输出目录>`

执行完毕后，我们将得到包含 note 序列数据的 JSON 文件，每个文件对应一个特定难度的谱面。

**TODO**：
- 运行序列化工具并检查输出结果
- 验证生成的 JSON 文件结构
- 统计不同谱面的 note 数量分布，为序列长度标准化做准备


### 2.2 处理谱面标签数据

从 maimai-songs 库的 songs.json 中提取训练标签：
- **歌曲ID**：song_id（json中为id）
- **难度序号**：level_index（在json中并未显式标明，charts中依次对应level_index 1-5的数据）
- **难度定数**：difficulty_constant（json中为level）- 这是我们的预测目标

**TODO**：
- 提取标签数据并与序列化的谱面数据进行匹配
- 处理缺失的难度定数（null值）
- 过滤掉六位数ID的宴谱数据
- 从 flevel.json 中获取拟合等级数据作为辅助信息
- 验证标签与谱面文件的一一对应关系

In [None]:
def extract_song_info(json_file_path):
    # 添加文件存在性检查
    if not os.path.exists(json_file_path):
        print(f"错误：文件不存在 - {json_file_path}")
        print("请检查：")
        print(f"1. 文件实际位置（当前工作目录：{os.getcwd()}）")
        print("2. 路径是否正确（注意大小写）")
        sys.exit(1)  # 退出程序
    
    # 读取JSON文件
    with open(json_file_path, 'r', encoding='utf-8') as json_file:
        songs_data = json.load(json_file)
    
    # 提取所需信息
    extracted_info = []
    for song in songs_data:
        song_id = song.get('id')
        charts = song.get('charts', [])
        
        for level_index, chart in enumerate(charts, start=1):
            difficulty_constant = chart.get('level')
            extracted_info.append({
                'song_id': song_id,
                'level_index': level_index,
                'difficulty_constant': difficulty_constant
            })
    
    return extracted_info

def write_to_csv(data, csv_file_path):
    # 创建输出目录（如果不存在）
    os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)
    
    # 写入CSV文件
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
        fieldnames = ['song_id', 'level_index', 'difficulty_constant']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        
        writer.writeheader()
        for item in data:
            writer.writerow(item)

# 使用跨平台路径处理
base_dir = os.path.dirname(os.path.abspath(''))  # 获取当前工作目录
json_path = os.path.join(base_dir, "data", "maimai-songs", "songs.json")  # 修正路径
csv_path = os.path.join(base_dir, "data", "song_info.csv")  # 修正路径

# print(f"尝试读取JSON文件: {json_path}")
# extracted_data = extract_song_info(json_path)
# write_to_csv(extracted_data, csv_path)
# print(f"成功提取 {len(extracted_data)} 条记录，已写入 {csv_path}")

尝试读取JSON文件: d:\wushuopei\code\BMK-mdp\data\maimai-songs\songs.json
成功提取 6442 条记录，已写入 d:\wushuopei\code\BMK-mdp\data\song_info.csv


## 3. 序列预处理与特征编码

不同于传统的特征工程方法，我们直接使用原始的 note 序列数据。主要任务是将 note 属性转换为数值向量，并处理序列长度不一致的问题。

### 3.0 构建自定义Dataset类
我们创建一个自定义Dataset，存储json文件的位置以及csv的位置。
在Dataset中，需要实现：
1. `__init__()`：初始化函数，传入serialized目录以及csv文件位置。
    - 需要存储每个json的路径
2. `__len__()`：返回数据集的长度。
3. `__getitem__()`：返回数据集中的第i个样本。直接返回tensor
    - 在`__getitem__()`中才读取json文件，并返回tensor
    - 读取json文件，然后再去csv中找对应`(song_id,level_index)`的行
    - index顺序是什么

In [None]:
import glob
class MaichartDataset(Dataset):
    def __init__(self, serialized_dir, labels_csv):
        self.serialized_dir = serialized_dir
        self.labels_data = pd.read_csv(labels_csv)

        cleaned_data = self.labels_data.dropna(subset=['song_id', 'level_index', 'difficulty_constant'])

        self.label_map = (
            cleaned_data.astype({'song_id': int, 'level_index': int, 'difficulty_constant': float})
            .set_index(['song_id', 'level_index'])['difficulty_constant']
            .to_dict()
        )

        # TouchArea映射
        self.touch_area_mapping = {" ": 0, "A": 1, "D": 2, "E": 3, "B": 4, "C": 5} # 从外到内

        # 初始化编码器
        self._setup_encoders()
        
        # 获取JSON文件路径
        self.json_paths = glob.glob(os.path.join(serialized_dir, "*.json"))

    def _setup_encoders(self):
        """设置note类型和位置的编码器"""
        # Note类型编码器
        NOTE_TYPES = ['Tap', 'Hold', 'Slide', 'Touch', 'TouchHold']
        self.note_type_encoder = OneHotEncoder(
            sparse_output=False,
            dtype=np.float32,
            handle_unknown='ignore'
            )

        self.note_type_onehot.fit(np.array(NOTE_TYPES).reshape(-1, 1))
        
        # 位置编码器（假设位置范围是1-8）
        positions = list(range(1, 9))  # maimai有8个位置
        self.position_encoder = OneHotEncoder(
            sparse_output=False,
            dtype=np.float32,
            handle_unknown='ignore'
        )

    def _encode_note_type(self, note_type):
        """将note类型编码为one-hot向量"""
        return self.note_type_encoder.transform([[note_type]])[0]
    
    def _encode_position(self, position):
        """将位置编码为one-hot向量"""
        return self.position_encoder.transform([[position]])[0]

    def __getitem__(self, index):
        json_file_name = self.json_paths[index]
        json_file_path = os.path.join(self.serialized_dir, json_file_name)
        
        with open(json_file_path, 'r', encoding='utf-8') as f:
            try:
                json_data = json.load(f)
            except json.JSONDecodeError as e:
                raise ValueError(f"JSON解析失败: {json_file_path}") from e
            
            # 获取定数标签
            song_id = int(json_data['song_id'])
            level_index = int(json_data['level_index'])
            difficulty_constant = self.label_map.get((song_id, level_index))
            if difficulty_constant is None:
                raise ValueError(f"找不到对应的难度定数: song_id={song_id}, level_index={level_index}")

            # 加载谱面数据
            note_groups = json_data.get('notes', [])
            note_features_sequence = []
            for note_group in note_groups:
                time = note_group['Time']
                notes = note_group['Notes']
                for note in notes:
                    note_type = note['Type']
                    # 将note类型转换为数值编码
                    note_type_encoded = self._encode_note_type(note_type)
                    position = note['startPosition']
                    # 将位置转换为数值编码
                    position_encoded = self._encode_position(position)
                    hold_time = note.get('holdTime', 0)
                    is_break = int(note['isBreak'])
                    is_ex = int(note['isEx'])
                    is_slide_break = int(note['isSlideBreak'])
                    slide_start_time = note['slideStartTime']
                    slide_end_time = slide_start_time + note['slideTime']
                    touch_area = self.touch_area_mapping[note['touchArea']]
                    feature_vector = np.concatenate([
                        [time],             # 1维
                        note_type_encoded,  # 5维
                        position_encoded,   # 8维
                        [hold_time],        # 1维
                        [is_break],         # 1维
                        [is_ex],            # 1维
                        [is_slide_break],   # 1维
                        [slide_start_time], # 1维
                        [slide_end_time],   # 1维
                        [touch_area]        # 1维
                    ]) # 总共 17维
                    note_features_sequence.append(feature_vector)

        # 将谱面数据转换为张量
        note_features_tensor = torch.from_numpy(np.array(note_features_sequence, dtype=np.float32))
        difficulty_constant_tensor = torch.tensor(difficulty_constant, dtype=torch.float32)
        return note_features_tensor, difficulty_constant_tensor

    def __len__(self):
        return len(self.json_paths)

In [13]:
import os

base_dir = os.path.dirname(os.path.abspath(''))  # 获取当前工作目录
serialized_dir = os.path.join(base_dir, "data", "serialized")
if not os.path.exists(serialized_dir):
	print(f"目录不存在: {serialized_dir} (当前工作目录: {os.getcwd()})")
	json_paths = []
else:
	json_paths = os.listdir(serialized_dir)
json_paths[0:10]

['100018_LOVEYOU_6.json',
 '100018_LOVEYOU_DX_6.json',
 '100022_INCHAOS_6.json',
 '100022_INCHAOS_DX_6.json',
 '100114_SPACEHARRIERMAINTHEMEREBORN_6.json',
 '100199_チルノノハアフエクトサンスウキヨウシツ_6.json',
 '100199_チルノノハアフエクトサンスウキヨウシツ_DX_6.json',
 '100206_シユワスハタイサクセン_6.json',
 '100206_シユワスハタイサクセン_DX_6.json',
 '10021_コネクト_1.json']

### 3.1 Note 属性编码

将每个 note 的属性编码为固定维度的向量：

**基础特征（每个 note）**：
- **时间特征**：相对时间戳（相对于谱面开始时间）
- **类型特征**：note 类型的 one-hot 编码（tap, hold, slide, break 等）
- **位置特征**：按键位置（1-8）的 one-hot 编码或数值编码
- **持续时间**：对于 hold 类型的 note，编码其持续时间
- **时间间隔**（可选）：与前一个 note 的时间间隔

**序列处理**：
- **序列长度标准化**：使用 padding 或截断将所有序列调整为相同长度
- **序列归一化**：对时间特征进行归一化处理（暂不处理）
- **序列排序**：确保 note 按时间顺序排列（好像不需要）

**TODO**：
- 设计 note 特征的编码方案
- 确定最佳的序列长度
- 实现序列预处理管道
- 考虑是否需要添加全局特征（如 BPM、总时长等）

## 4. LSTM 模型构建与数据准备

构建基于 LSTM 的时序模型来处理 note 序列数据。模型将接收形状为 `(batch_size, sequence_length, feature_dim)` 的输入，输出难度定数的预测值。

**模型架构设计**：
- **输入层**：接收编码后的 note 序列
- **LSTM层**：捕捉序列中的时序依赖关系
- **全连接层**：将 LSTM 输出映射到难度预测
- **输出层**：回归输出，预测难度定数

In [None]:
# # 合并特征和标签
# full_df = pd.merge(feature_df, label_df, on='song_id')
#
# # 分离特征和目标变量
# X = full_df.drop(['song_id', 'difficulty_constant'], axis=1).values
# y = full_df['difficulty_constant'].values
#
# # 数据标准化
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)
#
# # 划分训练集和测试集
# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
#
# # 转换为 PyTorch Tensors
# X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
# X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
# y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

### 4.1 定义 LSTM 模型架构

**模型设计考虑**：
- **多层 LSTM**：评估单层 vs 多层 LSTM 的效果
- **Dropout**：防止过拟合
- **Attention 机制**：突出重要的 note 序列部分

**TODO**：
- 实现基础的 LSTM 模型类
- 设计模型的超参数（hidden_size, num_layers, dropout_rate）
- 考虑添加注意力机制
- 实验不同的模型架构

In [None]:
# class DifficultyPredictor(nn.Module):
#     def __init__(self, input_features):
#         super(DifficultyPredictor, self).__init__()
#         self.layer1 = nn.Linear(input_features, 128)
#         self.layer2 = nn.Linear(128, 64)
#         self.output_layer = nn.Linear(64, 1)
#         self.relu = nn.ReLU()

#     def forward(self, x):
#         x = self.relu(self.layer1(x))
#         x = self.relu(self.layer2(x))
#         x = self.output_layer(x)
#         return x

# # model = DifficultyPredictor(X_train_tensor.shape[1])

## 5. 模型训练与优化

**训练策略**：
- **损失函数**：使用 MSE 或 MAE 损失函数（回归任务）
- **优化器**：Adam 优化器，考虑学习率调度
- **批次处理**：合理设置 batch_size 处理变长序列
- **正则化**：Dropout + L2 正则化防止过拟合

**训练监控**：
- 训练损失和验证损失曲线
- 早停机制防止过拟合
- 学习率衰减策略

**TODO**：
- 实现训练循环
- 设置验证集监控
- 实现早停和模型保存机制
- 调试序列批次处理中的 padding 问题
- 优化训练超参数

In [None]:
# # 定义损失函数和优化器
# criterion = nn.MSELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
#
# # 训练循环
# epochs = 100
# for epoch in range(epochs):
#     model.train()
#     optimizer.zero_grad()
#     outputs = model(X_train_tensor)
#     loss = criterion(outputs, y_train_tensor)
#     loss.backward()
#     optimizer.step()
#
#     if (epoch+1) % 10 == 0:
#         print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

## 6. 模型评估与性能分析

**评估指标**：
- **回归指标**：MSE, MAE, R²
- **难度区间准确性**：预测值在真实值 ±0.1, ±0.2, ±0.5 范围内的比例
- **分布分析**：预测值与真实值的分布对比

**详细分析**：
- **不同难度等级的预测准确性**：分析模型在低难度 vs 高难度谱面上的表现
- **序列长度影响**：分析谱面长度对预测准确性的影响
- **错误案例分析**：找出预测偏差较大的谱面特征

**TODO**：
- 实现全面的评估指标计算
- 可视化预测结果分布
- 分析不同难度区间的预测准确性
- 进行错误案例的深入分析
- 与传统特征工程方法进行对比

In [None]:
# model.eval()
# with torch.no_grad():
#     predictions = model(X_test_tensor)
#     test_loss = criterion(predictions, y_test_tensor)
#     print(f'Test Loss: {test_loss.item():.4f}')
#
# # 可以在这里添加更详细的评估指标，例如 MAE, R^2 等

## 7. 结果分析与模型迭代

**深度分析**：
- **时序特征的重要性**：LSTM 是否有效捕捉了时序信息
- **不同 note 类型的影响**：哪些类型的 note 对难度预测更重要
- **序列长度 vs 准确性**：最优的序列长度设置
- **模型复杂度 vs 性能**：单层 vs 多层 LSTM 的权衡

**模型优化方向**：
- **架构改进**：考虑 Transformer、CNN-LSTM 混合架构
- **特征增强**：是否需要添加手工特征作为辅助
- **数据增强**：通过时间扭曲、音符变换等方式增加训练数据
- **多任务学习**：同时预测难度和其他属性（如技巧需求）

**TODO**：
- 深入分析 LSTM 学到的时序模式
- 可视化注意力权重（如果使用了注意力机制）
- 比较不同模型架构的效果
- 设计更鲁棒的数据增强策略
- 考虑集成学习方法提升性能
- 为生产环境部署准备模型压缩和优化

分析模型的预测结果，与真实定数进行比较。

思考以下问题：
- 模型的误差主要来自哪些谱面？
- 是否有必要调整特征工程的方案？
- 是否需要更复杂的模型结构？

根据分析结果，回到前面的步骤进行迭代优化。

**关键思考问题**：

1. **时序建模的有效性**：
   - LSTM 是否真的比传统统计特征更有效？
   - 谱面的时序特征对难度的影响有多大？

2. **数据表示的完整性**：
   - 当前的 note 编码是否充分表达了游戏的复杂性？
   - 是否遗漏了重要的游戏机制信息？

3. **模型的可解释性**：
   - 如何理解模型学到的难度判断规律？
   - 能否提取出可解释的难度评估规则？

4. **实际应用价值**：
   - 模型的预测精度是否满足实际需求？
   - 如何将模型集成到谱面制作工具中？

**下一步迭代方向**：
根据实验结果，有针对性地改进数据处理、模型架构或训练策略，最终目标是构建一个既准确又实用的难度预测系统。