# 前言

旨在在搭建模态缺失任务时的思考和测试代码

In [1]:
import os
import sys
import yaml
from pathlib import Path
import pandas as pd
import numpy as np
import torch

def add_project_root_to_sys_path():
    """动态添加项目根目录到 sys.path"""
    project_root = Path.cwd().resolve().parent
    if str(project_root) not in sys.path:
        sys.path.append(str(project_root))

add_project_root_to_sys_path()
# print('\n'.join(sys.path))

from common.utils import load_config
from data.Dataset import FeatureDataset
from data.LoadFeatures import DataFeatures
from models.Models import  MFAFESM
from common.process_graph import initialize_graph


## 寻找中间模态数据

In [2]:
config_path = Path.cwd().resolve().parent / 'config' / 'config.yaml'
config = load_config(config_path)
config['data'] = config['data']['HCI']
data = DataFeatures(
    data_path=config["data"]["data_path"],
    modalities=config["training"]["using_modalities"],
    subject_lists=config["data"]["subject_lists"],
    Norm="Z_score",
    label_type=config['data']['label_type'],
)

test_person = 0
config["training"]["missing_task"]["checkpoint_dir"] = os.path.join(
    config["training"]["missing_task"]["checkpoint_dir"],
    f"best_checkpoint_{test_person}.pth",
)

# 修改model配置
config['model'] = config['model']['MFAFESM']
config["model"]["classifier"]["nb_classes"] = config["num_classes"]
config["model"]["feature_extract"]["input_dim"] = config["data"]["input_dim"]

 # 根据模态修改输入维度
using_modality = config["training"]["using_modalities"]

input_size = config["data"]["input_size"]
new_input_size = []
if using_modality is not None:
    for modality in using_modality:
        if modality == "eeg":
            new_input_size.append(input_size[0])
        elif modality == "eye":
            new_input_size.append(input_size[1])
        elif modality == "au" or modality == "pps":
            new_input_size.append(input_size[2])
    config["training"]["using_modalities"] = using_modality

    config["data"]["input_size"] = new_input_size
    config["model"]["feature_align"]["input_size"] = new_input_size

d_model = config["model"]["fusion"]["d_model"]
# 根据模态修改模型维度（zhe'k
swell = 1
if config["model"]["type"] == "major_modality_fusion":
    if len(using_modality) > 1:
        swell = 2
elif config["model"]["type"] == "iterative_fusion":
    swell = 3
elif config["model"]["type"] == "full_compose_fusion":
    swell = 6
elif config["model"]["type"] == "add_fusion":
    swell = 1.5
config["model"]["fusion"]["d_model"] = d_model * int(swell * 2)
config["model"]["attention_encoder"]["d_model"] = d_model * int(swell * 2)
print(config['model'])

dict_keys(['eeg', 'eye', 'pps', 'arousal_label', 'valence_label', 'subject_list', 'ch_info', 'info'])
EEG: 1-70filer, 50Hz notch, 256Hz resample;
Subject : 1-30 subject, no 3,9,12,15,16,25 subject; 24 person, 20 trial, 32 EEG channel, 7 pps channels;
Labels :    Arousal: 0:["Sadness", "Disgust", "Neutral"]、1:["Joy, Happiness", "Amusement"] 2:["Surprise", "Fear", "Anger", "Anxiety"]
    Valence: 0:["Fear", "Anger", "Disgust", "Sadness", "Anxiety"] 1:["Surprise", "Neutral"] 2:["Joy, Happiness", "Amusement"]
Eye Track data : DistanceLeft，PupilLeft，ValidityLeft，Distance Right，Pupil Right，Validity Right，Fixation Index，Gaze Point X，Gaze Point Y，Fixation Duration
     PPS data : ECG, GSR，Resp，Temp，Status
开始加载眼动特征...


Processing Subjects: 100%|██████████| 24/24 [00:00<00:00, 3799.33it/s]


眼动特征加载完成。
开始加载PPS特征...


Processing Subjects: 100%|██████████| 24/24 [00:00<00:00, 1657.36it/s]

PPS特征加载完成。
{'type': 'iterative_fusion', 'feature_extract': {'input_dim': 585, 'hidden_dim': 160, 'tok': 0.5}, 'feature_align': {'input_size': [960, 38, 230], 'embed_dim': 160, 'seq_len': 10}, 'fusion': {'embed_dim': 160, 'num_heads': 8, 'd_model': 960}, 'attention_encoder': {'num_layers': 6, 'd_model': 960, 'num_heads': 8, 'd_ff': 2048, 'dropout': 0.1, 'embed_dim': 160}, 'classifier': {'nb_classes': 3, 'embed_dim': 1600}}





In [3]:
trainSet = FeatureDataset(
    data,
    ex_nums = config['data']['ex_nums'],
    mode = "train",
    test_person = test_person,
    config=config,
)

In [6]:
trainSet

<data.Dataset.FeatureDataset at 0x7f4e522f3820>

### 找到小搓代表性数据

In [None]:
labels = trainSet.labels
# print(labels)
labels = np.array(labels)
# 获取每个类别的索引
category_indices = {category: np.where(labels == category)[0] for category in np.unique(labels)}

# 设定每个类别要选取的样本数
samples_per_category = 64 // len(category_indices)  # 每个类别选取相同数量的样本

# 随机选取每个类别的索引
selected_indices = []
for category, indices in category_indices.items():
    np.random.seed(42)  # 确保每次运行结果一致
    selected_indices.extend(np.random.choice(indices, samples_per_category, replace=False))

# 输出选中的索引
selected_indices = np.array(selected_indices)
print("Selected Indices: ", selected_indices)
print(labels[selected_indices])

print(np.unique(labels[selected_indices], return_counts=True))

In [None]:
new_data = {}
for key, value in trainSet.features.items():
    new_data[key] = value[selected_indices]

for key, value in new_data.items():
    print(key, value.shape)

### 根据小搓数据加载模型中间输出数据

In [None]:
model = MFAFESM(config['model'])
model = model.cuda()
print(model)

In [None]:
checkpoint = torch.load(config["training"]["missing_task"]["checkpoint_dir"])
model.load_state_dict(checkpoint["model_state_dict"])

In [13]:
device = "cuda" if torch.cuda.is_available() else "cpu"


def process_input(inputs):
    """处理输入数据，确保其正确放到设备上，并且转换成torch.float32"""
    return_inputs = {}
    for key in using_modality:
        if inputs.get(key, None) is not None:
            if not isinstance(inputs[key], torch.Tensor):
                inputs[key] = torch.tensor(inputs[key])
            return_inputs[key] = inputs[key].to(device).to(torch.float32)
        else:
            return_inputs[key] = None
    return return_inputs.values()

In [None]:
# 根据加载的模型，获取中间层的输出
model.eval()
with torch.no_grad():
    adj, graph_indicator = initialize_graph(config['data'], data_len = 63, device = device)
    eeg, eye, au = process_input(new_data)

    result, fused_features, encoded_features = model(adj, graph_indicator, eeg, eye, au, pps=None)

In [None]:
print(result.shape)
print(fused_features.shape)
print(encoded_features.shape)

In [None]:
selected_indices

## 模型参数查看

In [None]:
# from models.Models import MFAFESM
# model = MFAFESM(config['model'], config['training'], config['data'])
import torch
model = torch.load(config["training"]["missing_task"]["checkpoint_dir"])
print(model.keys()) # dict_keys(['epoch', 'model_state_dict', 'optimizer_state_dict'])

# 这么来看的话还是需要初始化模型
model_state_dict = model['model_state_dict']
for name, param in model_state_dict.items():
    print(name, param.size())

In [None]:
# 拿到融合部分的attention块权重可视化成热图看看
fn_weights = model_state_dict['fusion.fn.weight'] + model_state_dict['fusion.fn.bias']
print(fn_weights.size())
fn_weights = fn_weights.cpu().numpy()
import matplotlib.pyplot as plt
plt.imshow(fn_weights, cmap='hot', interpolation='nearest')
plt.show()

In [None]:
# 画一下直方图
plt.hist(fn_weights.flatten(), bins=100)
plt.show()

In [None]:
# 获取浅层encoder和深层encoder的ffn层的权重
shallow_ffn_weights = model_state_dict['attention_encoder.encoder_layer.0.ff.linear_2.weight'] 
deep_ffn_weights = model_state_dict['attention_encoder.encoder_layer.5.ff.linear_2.weight'] 
# 可视化热图

plt.imshow(shallow_ffn_weights.cpu().numpy(), cmap='hot', interpolation='nearest')
plt.show()

plt.imshow(deep_ffn_weights.cpu().numpy(), cmap='hot', interpolation='nearest')
plt.show()

In [None]:
# 看下直方图
plt.hist(shallow_ffn_weights.cpu().numpy().flatten(), bins=100)
plt.hist(deep_ffn_weights.cpu().numpy().flatten(), bins=100, color='r')
plt.hist(fn_weights.flatten(), bins=100)
plt.legend(['shallow', 'deep', 'fusion'])
plt.show()


# 将两种直方图合并到一起，采用不同的颜色