In [1]:
# 导入必要的库
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.tree import DecisionTreeClassifier
import torch
import torch.nn as nn
import numpy as np
import random
from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.model_selection import cross_val_score
from sklearn.neural_network import MLPClassifier
from sklearn import metrics
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.impute import KNNImputer
import torch.nn.functional as F
import import_ipynb
import network
from torch.nn import Parameter
from torch.nn.init import xavier_normal_

importing Jupyter notebook from network.ipynb
importing Jupyter notebook from utils.ipynb


In [2]:
# Seed every random number generator used in this notebook with one value so
# that repeated runs produce the same numbers.
seed = 42

# Python's built-in generator and NumPy's global generator.
np.random.seed(seed)
random.seed(seed)

# PyTorch generators (CPU, current CUDA device, all CUDA devices).
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)

# cuDNN: turn off benchmark mode and request deterministic algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [3]:
def cosine_distance_torch(x1, x2=None, eps=1e-8):
    """Return the pairwise cosine distance between the rows of two matrices.

    Args:
        x1: 2-D tensor whose rows are the first set of vectors.
        x2: 2-D tensor whose rows are the second set of vectors. When omitted,
            the distances are computed between the rows of ``x1`` itself.
        eps: lower bound applied to the product of the row norms so that the
            division never hits zero.

    Returns:
        Tensor of shape ``(x1.shape[0], x2.shape[0])`` holding
        ``1 - cosine_similarity`` for every pair of rows.
    """
    if x2 is None:
        x2 = x1

    norms_left = x1.norm(p=2, dim=1, keepdim=True)
    if x2 is x1:
        # Same object: reuse the norms instead of computing them again.
        norms_right = norms_left
    else:
        norms_right = x2.norm(p=2, dim=1, keepdim=True)

    dot_products = torch.mm(x1, x2.t())
    norm_products = (norms_left * norms_right.t()).clamp(min=eps)
    return 1 - dot_products / norm_products

In [4]:
def cal_adj_mat_parameter(edge_per_node, data, metric="cosine"):
    """Pick the distance threshold used to build the adjacency matrix.

    All pairwise cosine distances of ``data`` are flattened and sorted in
    ascending order; the value at position ``edge_per_node * data.shape[0]``
    is returned.

    Args:
        edge_per_node: multiplier applied to the number of rows of ``data``
            to obtain the position in the sorted distances.
        data: 2-D tensor, one sample per row.
        metric: must be ``"cosine"``.

    Returns:
        The selected distance as a squeezed NumPy value.
    """
    assert metric == "cosine", "Only cosine distance implemented"

    pairwise = cosine_distance_torch(data, data)
    n_samples = data.shape[0]
    ascending, _ = torch.sort(pairwise.reshape(-1,))
    threshold = ascending[edge_per_node * n_samples]
    return np.squeeze(threshold.data.cpu().numpy())

In [5]:
def graph_from_dist_tensor(dist, parameter, self_dist=True):
    """Threshold a distance matrix into a 0/1 float edge mask.

    Args:
        dist: distance tensor; it is cloned, so the input is left untouched.
        parameter: threshold; entries with ``dist <= parameter`` become 1.
        self_dist: when True, ``dist`` must have equal first two dimensions
            and the diagonal of the mask is cleared (no self-loops).

    Returns:
        Float tensor with the same shape as ``dist`` containing 0s and 1s.
    """
    if self_dist:
        n_rows, n_cols = dist.shape[0], dist.shape[1]
        assert n_rows == n_cols, "Input is not pairwise dist matrix"

    # Work on a detached copy and wrap the threshold in a tensor for the comparison.
    threshold = torch.tensor(parameter)
    mask = torch.le(dist.clone().detach(), threshold).float()

    if not self_dist:
        return mask

    # Remove self-connections.
    idx = torch.arange(mask.shape[0])
    mask[idx, idx] = 0
    return mask

In [6]:
def gen_adj_mat_tensor(data, parameter, metric="cosine"):
    """Build a row-normalised adjacency matrix from cosine similarity.

    Steps:
      1. compute the pairwise cosine distance of ``data``;
      2. get a 0/1 edge mask from ``graph_from_dist_tensor`` using ``parameter``
         as the threshold;
      3. weight the kept edges by cosine similarity (``1 - dist``);
      4. symmetrise by taking, for every pair, the larger of the two directed
         weights;
      5. add the identity (self-loops) and L1-normalise with ``F.normalize``.

    Args:
        data: 2-D tensor, one sample per row.
        parameter: distance threshold forwarded to ``graph_from_dist_tensor``.
        metric: must be ``"cosine"``.

    Returns:
        Tensor of shape ``(n, n)`` on the same device as ``data``.

    Raises:
        AssertionError: if ``metric`` is not ``"cosine"``.
    """
    assert metric == "cosine", "Only cosine distance implemented"

    dist = cosine_distance_torch(data, data)
    g = graph_from_dist_tensor(dist, parameter, self_dist=True)

    # Similarity weights on the retained edges only.
    adj = (1 - dist) * g
    adj_T = adj.transpose(0, 1)

    # Where the transposed weight is larger, swap it in (computed once).
    prefer_transpose = (adj_T > adj).float()
    adj = adj + adj_T * prefer_transpose - adj * prefer_transpose

    # Create the identity on adj's device; a CPU identity would fail to add
    # to a CUDA adjacency matrix.
    identity = torch.eye(adj.shape[0], device=adj.device)
    return F.normalize(adj + identity, p=1)

In [7]:
def gen_trte_adj_mat(X_train, adj_parameter):
    """Build the adjacency matrix for the training samples.

    The distance threshold is first derived from ``X_train`` itself via
    ``cal_adj_mat_parameter`` and then used by ``gen_adj_mat_tensor``.

    Args:
        X_train: 2-D tensor, one training sample per row.
        adj_parameter: value forwarded as ``edge_per_node`` to
            ``cal_adj_mat_parameter``.

    Returns:
        The adjacency tensor returned by ``gen_adj_mat_tensor``.
    """
    metric = "cosine"  # cosine distance
    threshold = cal_adj_mat_parameter(adj_parameter, X_train, metric)
    return gen_adj_mat_tensor(X_train, threshold, metric)

In [8]:
def cal_sample_weight(labels, num_class, use_sample_weight=True):
    """Compute a per-sample weight from the class frequencies.

    Each sample whose label equals class ``i`` (for ``i`` in
    ``range(num_class)``) receives the weight
    ``(number of samples labelled i) / (total number of samples)``.

    Args:
        labels: tensor of class labels. It may live on any device and may
            track gradients; it is detached and copied to the CPU first.
        num_class: number of classes; labels are compared against
            ``0 .. num_class - 1``.
        use_sample_weight: when False, uniform weights ``1 / n`` are returned.

    Returns:
        ``torch.FloatTensor`` shaped like ``labels`` when
        ``use_sample_weight`` is True. When it is False a NumPy array of
        length ``len(labels)`` is returned instead (kept as-is so existing
        callers are not affected).
    """
    # Plain .numpy() raises for CUDA tensors and for tensors requiring grad.
    labels_np = labels.detach().cpu().numpy()
    n_samples = len(labels_np)

    if not use_sample_weight:
        return np.ones(n_samples) / n_samples

    sample_weight = np.zeros(labels_np.shape)
    for cls in range(num_class):
        is_cls = labels_np == cls
        # Index by the first axis only, as rows are the samples.
        sample_weight[np.where(is_cls)[0]] = np.sum(is_cls) / n_samples
    return torch.FloatTensor(sample_weight)

In [9]:
def one_hot_tensor(y, num_dim):
    """One-hot encode a tensor of class indices.

    Args:
        y: tensor of class indices with ``y.shape[0]`` samples (shape ``(n,)``
            or ``(n, 1)``). Non-int64 values are cast to int64, so
            floating-point labels are truncated towards zero.
        num_dim: number of classes, i.e. the width of the encoding.

    Returns:
        Float tensor of shape ``(y.shape[0], num_dim)`` on the same device as
        ``y`` with a single 1 per row.
    """
    # scatter_ requires an int64 index; reshape also copes with
    # non-contiguous input where view would raise.
    index = y.reshape(-1, 1).to(torch.int64)
    # Allocate on y's device so scatter_ does not mix devices.
    y_onehot = torch.zeros(y.shape[0], num_dim, device=y.device)
    y_onehot.scatter_(1, index, 1)

    return y_onehot

In [None]:
def batch_dataframe_to_tensors(*dataframes, device=None, strict=False):
    """
    Convert several pandas DataFrames to float32 torch tensors on one device.

    Args:
        *dataframes: any number of pandas DataFrame objects.
        device (torch.device or str, optional): target device (for example
            'cpu' or 'cuda'). When None, CUDA is used if available, otherwise
            the CPU.
        strict (bool, optional): when False (default) an input that cannot be
            converted is reported with ``print`` and skipped, so the returned
            list can be shorter than the number of inputs. When True the
            error is raised instead, which keeps the output aligned with the
            inputs.

    Returns:
        list: the converted tensors, in input order.

    Raises:
        ValueError: in strict mode, if an input is not a pandas DataFrame.
        Exception: in strict mode, whatever the conversion itself raised.
    """

    # Prefer the GPU when one is available.
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    tensors = []
    for i, df in enumerate(dataframes):
        try:
            if not isinstance(df, pd.DataFrame):
                raise ValueError(f"对象 {i} 不是 pandas.DataFrame 类型，无法转换")
            # DataFrame -> NumPy array -> tensor created directly on the device.
            tensor = torch.tensor(df.to_numpy(), dtype=torch.float32, device=device)
        except Exception as e:
            if strict:
                raise
            print(f"转换对象 {i} 时出错: {e}")
            continue

        tensors.append(tensor)
        print(f"DataFrame {i} 已转换为 Tensor，并移动到 {tensor.device}")

    return tensors


In [11]:
def pad_collate(batch):
    """Zero-pad the first element of every sample to a common length.

    Args:
        batch: sequence of samples; ``sample[0]`` is the tensor to pad along
            its first dimension and the remaining entries are passed through
            unchanged.

    Returns:
        Tuple of tuples, transposed from the padded samples: the first inner
        tuple holds the padded tensors, the following ones hold the other
        fields in sample order.

    Notes:
        The padding uses torch's default dtype and is placed on the device of
        the first sample's tensor; all tensors are moved to that device.
        Trailing dimensions of the tensor are preserved, so ``(length, ...)``
        inputs are supported as well as 1-D ones.
    """
    # Longest first element in this batch.
    max_length = max(len(item[0]) for item in batch)
    device = batch[0][0].device

    padded_batch = []
    for item in batch:
        sequence = item[0].to(device)
        # Padding must share the trailing dimensions, otherwise torch.cat
        # rejects inputs that are not 1-D.
        padding = torch.zeros(max_length - len(sequence), *sequence.shape[1:], device=device)
        padded_batch.append((torch.cat([sequence, padding]), *item[1:]))

    return tuple(zip(*padded_batch))


In [12]:
class LMF(nn.Module):
    '''
    Low-rank Multimodal Fusion of three inputs (the omics input has been removed).

    Each input gets a constant 1 prepended, is projected by its own low-rank
    factor, the three projections are multiplied element-wise and the rank
    dimension is collapsed with learned fusion weights plus a bias.
    '''
    def __init__(self, input_dims, rank, use_softmax=False):
        '''
        Args:
            input_dims - either a single int used as the feature size of all
                         three inputs, or a length-3 tuple/list
                         (audio_dim, video_dim, text_dim)
            rank - int, specifying the size of rank in LMF
            use_softmax - boolean, whether to apply softmax to the final output
        '''
        super(LMF, self).__init__()

        # A scalar means all three inputs share one feature size; a sequence
        # gives one size per input.
        if isinstance(input_dims, (tuple, list)):
            self.audio_in, self.video_in, self.text_in = input_dims
        else:
            self.audio_in = self.video_in = self.text_in = input_dims

        self.output_dim = self.audio_in  # output width follows the audio input width
        self.rank = rank
        self.use_softmax = use_softmax

        # One low-rank factor per input; the "+ 1" row matches the prepended constant.
        self.audio_factor = Parameter(torch.empty(self.rank, self.audio_in + 1, self.output_dim))
        self.video_factor = Parameter(torch.empty(self.rank, self.video_in + 1, self.output_dim))
        self.text_factor = Parameter(torch.empty(self.rank, self.text_in + 1, self.output_dim))

        # Weights that combine the rank components, and the output bias.
        self.fusion_weights = Parameter(torch.empty(1, self.rank))
        self.fusion_bias = Parameter(torch.empty(1, self.output_dim))

        # Initialisation (order kept so seeded runs draw the same values).
        xavier_normal_(self.audio_factor)
        xavier_normal_(self.video_factor)
        xavier_normal_(self.text_factor)
        xavier_normal_(self.fusion_weights)
        self.fusion_bias.data.fill_(0)

    def forward(self, audio_x, video_x, text_x):
        '''
        Args:
            audio_x: tensor of shape (batch_size, audio_in)
            video_x: tensor of shape (batch_size, video_in)
            text_x: tensor of shape (batch_size, text_in)

        Returns:
            tensor of shape (batch_size, output_dim); rows are softmax
            probabilities when use_softmax is True.
        '''
        batch_size = audio_x.shape[0]

        # Constant column created on the input's device with the parameters'
        # dtype, instead of the legacy torch.cuda.FloatTensor cast which
        # always targets the current CUDA device.
        ones = torch.ones(batch_size, 1, dtype=self.audio_factor.dtype, device=audio_x.device)
        _audio_h = torch.cat((ones, audio_x), dim=1)
        _video_h = torch.cat((ones, video_x), dim=1)
        _text_h = torch.cat((ones, text_x), dim=1)

        # Per-input projections, each of shape (rank, batch_size, output_dim).
        fusion_audio = torch.matmul(_audio_h, self.audio_factor)
        fusion_video = torch.matmul(_video_h, self.video_factor)
        fusion_text = torch.matmul(_text_h, self.text_factor)

        # Low-rank fusion: element-wise product of the three projections.
        fusion_zy = fusion_audio * fusion_video * fusion_text

        # (1, rank) @ (batch, rank, out) -> (batch, 1, out). Squeeze only the
        # middle axis so a batch or output size of 1 is not collapsed too.
        output = torch.matmul(self.fusion_weights, fusion_zy.permute(1, 0, 2)).squeeze(1) + self.fusion_bias

        if self.use_softmax:
            output = F.softmax(output, dim=-1)

        return output