In [1]:
import json
from pathlib import Path
from sklearn.metrics import f1_score

def flatten(list_of_list):
    return [item for sublist in list_of_list for item in sublist]

path_to_training = Path("training")
path_to_test = Path("test")

def split_dataset(validate=False):
    training_set = ['ES2002', 'ES2005', 'ES2006', 'ES2007', 'ES2008', 'ES2009', 'ES2010', 'ES2012', 'ES2013', 'ES2015', 'ES2016', 'IS1000', 'IS1001', 'IS1002', 'IS1003', 'IS1004', 'IS1005', 'IS1006', 'IS1007', 'TS3005', 'TS3008', 'TS3009', 'TS3010', 'TS3011', 'TS3012']
    training_set = flatten([[m_id+s_id for s_id in 'abcd'] for m_id in training_set])
    training_set.remove('IS1002a')
    training_set.remove('IS1005d')
    training_set.remove('TS3012c')

    test_set = ['ES2003', 'ES2004', 'ES2011', 'ES2014', 'IS1008', 'IS1009', 'TS3003', 'TS3004', 'TS3006', 'TS3007']
    test_set = flatten([[m_id+s_id for s_id in 'abcd'] for m_id in test_set])
    
    if validate:
        # randomly select 10% of training set as validation set
        import random
        random.seed(6969)
        validate_set = random.choices(training_set, k=int(len(training_set)*0.1))
        training_set = list(set(training_set) - set(validate_set))
        return training_set, validate_set, test_set

    return training_set, test_set

from sentence_transformers import SentenceTransformer
import networkx as nx
bert = SentenceTransformer('all-MiniLM-L6-v2')


def get_text_feature(dataset, path, show_progress_bar=True):
    text_feature = []
    for transcription_id in dataset:
        with open(path / f"{transcription_id}.json", "r") as text_file:
            transcription = json.load(text_file)
        
        for utterance in transcription:
            text_feature.append(utterance["speaker"] + ": " + utterance["text"])

    text_feature = bert.encode(text_feature, show_progress_bar=show_progress_bar)
    return text_feature


def get_graph_feature(dataset, path, relation_mapping=None):
    graph_feature = []
    
    if relation_mapping is None:
        relation_mapping = {'nan': 0}  # 将np.nan映射为0
        next_relation_id = 1
    
    for transcription_id in dataset:       
        with open(path / f"{transcription_id}.txt", "r") as graph_file:
            edges = []
            relations = []
            for line in graph_file:
                parts = line.split()
                source, relation, target = int(parts[0]), parts[1], int(parts[2])
                
                if relation not in relation_mapping:
                    relation_mapping[relation] = next_relation_id
                    next_relation_id += 1
                
                edges.append((source, target, {'relation': relation}))
                relations.append(relation_mapping[relation])
            
        G = nx.DiGraph()
        G.add_edges_from(edges)
        
        node_degrees = dict(G.degree())
        
        # 添加中心性度量，这里以度中心性为例
        degree_centrality = nx.degree_centrality(G)
        
        # 处理叶子节点，将关系设置为nan
        for node in G.nodes:
            if G.out_degree(node) == 0:
                relations.append(0)  # 将np.nan映射为0
        
        # 组合节点的度、关系、和中心性度量
        combined_feature = list(zip(node_degrees.values(), relations, degree_centrality.values()))
        graph_feature.extend(combined_feature)
    
    return graph_feature, relation_mapping



def get_label(dataset, label_file):
    labels = []
    with open(label_file, "r") as file:
        all_labels = json.load(file)
    for transcription_id in dataset:  
        labels += all_labels[transcription_id]
    return labels

In [2]:
training_set, test_set = split_dataset()

text_feature_training = get_text_feature(training_set, path_to_training)
graph_feature_training, relation_mapping = get_graph_feature(training_set, path_to_training)
y_training = get_label(training_set, "training_labels.json")

Batches:   0%|          | 0/2270 [00:00<?, ?it/s]

In [3]:
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        # 取最后一个时间步的输出
        lstm_out = lstm_out[:, -1, :]
        output = self.fc(lstm_out)
        output = self.sigmoid(output)
        return output

In [4]:
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

# 定义模型的超参数
sequence_length = 10 # 假设你的每个对话的长度为 sequence_length
input_size = text_feature_training.shape[1]
hidden_size = 64
num_layers = 2
output_size = 1  # 二元分类任务

# 初始化模型
model = LSTMClassifier(input_size, hidden_size, num_layers, output_size)

# 定义损失函数和优化器
criterion = nn.BCELoss()  # 二元交叉熵损失
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 转换为 PyTorch 张量
X_training_tensor = torch.tensor(text_feature_training)
y_training_tensor = torch.tensor(y_training, dtype=torch.int)

# 创建 TensorDataset
train_dataset = TensorDataset(X_training_tensor, y_training_tensor)

# 使用 DataLoader 加载数据
batch_size = 64  # 你可以根据需要调整批量大小
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# 模型训练
num_epochs = 25
for epoch in range(num_epochs):
    for inputs, labels in train_dataloader:
        inputs = inputs.unsqueeze(1)
        labels = labels.unsqueeze(1)
        # 梯度清零
        optimizer.zero_grad()
        
        # 前向传播
        outputs = model(inputs)
        
        # 计算损失
        loss = criterion(outputs, labels.float())
        
        # 反向传播
        loss.backward()
        
        # 参数更新
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [1/25], Loss: 0.2735
Epoch [2/25], Loss: 0.1896
Epoch [3/25], Loss: 0.2879
Epoch [4/25], Loss: 0.2352
Epoch [5/25], Loss: 0.3023
Epoch [6/25], Loss: 0.2143
Epoch [7/25], Loss: 0.2124
Epoch [8/25], Loss: 0.2253
Epoch [9/25], Loss: 0.3078
Epoch [10/25], Loss: 0.2805
Epoch [11/25], Loss: 0.3034
Epoch [12/25], Loss: 0.2427
Epoch [13/25], Loss: 0.2101
Epoch [14/25], Loss: 0.2816
Epoch [15/25], Loss: 0.2404
Epoch [16/25], Loss: 0.2615
Epoch [17/25], Loss: 0.2608
Epoch [18/25], Loss: 0.1979
Epoch [19/25], Loss: 0.2443
Epoch [20/25], Loss: 0.1346
Epoch [21/25], Loss: 0.2300
Epoch [22/25], Loss: 0.2357
Epoch [23/25], Loss: 0.2844
Epoch [24/25], Loss: 0.2568
Epoch [25/25], Loss: 0.2332


In [5]:
from xgboost import XGBClassifier

clf = XGBClassifier(n_estimators=100, max_depth=25, objective='binary:logistic', n_jobs=-1, random_state=0)
clf.fit(graph_feature_training, y_training)


In [10]:
from sklearn.tree import DecisionTreeClassifier

clf = DecisionTreeClassifier(min_samples_leaf=1, min_samples_split=6, random_state=0)
clf.fit(graph_feature_training, y_training)

In [14]:
# 模型评估
model.eval()
y_text_pred = []
validate_dataloader = DataLoader(train_dataset, batch_size=1)

with torch.no_grad():
    for inputs, labels in validate_dataloader:
        inputs = inputs.unsqueeze(1)
        labels = labels.unsqueeze(1)
        # 预测
        outputs = model(inputs)
        predictions = (outputs >= 0.5).int()
        
        # 保存预测值和标签
        y_text_pred.extend(predictions.numpy().tolist())

: 

In [7]:
y_graph_pred = clf.predict(graph_feature_training).tolist()

In [12]:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# 将 LSTM 和 XGBoost 的预测结果水平拼接
combined_predictions = np.column_stack((y_text_pred, y_graph_pred))

# 初始化逻辑回归模型（或其他模型）
logistic_model = LogisticRegression()

# 训练逻辑回归模型
logistic_model.fit(combined_predictions, y_training)

In [10]:
text_feature_test = get_text_feature(test_set, path_to_test)
graph_feature_test, _ = get_graph_feature(test_set, path_to_test, relation_mapping)

Batches:   0%|          | 0/970 [00:00<?, ?it/s]

In [13]:
test_labels = {}
with torch.no_grad():
    for transcription_id in test_set:
        with open(path_to_test / f"{transcription_id}.json", "r") as file:
            transcription = json.load(file)
        
        text_test = []
        for utterance in transcription:
            text_test.append(utterance["speaker"] + ": " + utterance["text"])
        
        text_test = bert.encode(text_test)
        text_test = torch.tensor(text_test).unsqueeze(1)

        outputs = model(text_test)
        y_text = (outputs >= 0.5).int()
        y_text = y_text.squeeze(1).tolist()
        
        with open(path_to_test / f"{transcription_id}.txt", "r") as graph_file:
            edges = []
            relations = []
            for line in graph_file:
                parts = line.split()
                source, relation, target = int(parts[0]), parts[1], int(parts[2])
                
                edges.append((source, target, {'relation': relation}))
                relations.append(relation_mapping[relation])
            
        G = nx.DiGraph()
        G.add_edges_from(edges)
        
        node_degrees = dict(G.degree())
        
        # 添加中心性度量，这里以度中心性为例
        degree_centrality = nx.degree_centrality(G)
        
        # 处理叶子节点，将关系设置为nan
        for node in G.nodes:
            if G.out_degree(node) == 0:
                relations.append(0)  # 将np.nan映射为0
        
        # 组合节点的度、关系、和中心性度量
        graph_test = list(zip(node_degrees.values(), relations, degree_centrality.values()))
        y_graph = clf.predict(graph_test).tolist()
        
        
        y_combined = np.column_stack((y_text, y_graph))
        
        final_predictions = logistic_model.predict(y_combined)
        
        
        
        
        
        
        test_labels[transcription_id] = final_predictions.tolist()

with open("test_labels_text_baseline.json", "w") as file:
    json.dump(test_labels, file, indent=4)