In [None]:
import ast
import shutil

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from skmultilearn.model_selection import IterativeStratification
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
from transformers import BertModel

In [None]:
# Load the augmented, labelled dataset from the spreadsheet.
df_final = pd.read_excel('Dataset_label_Augment.xlsx')
# 'target_list' is parsed from its string form into a Python list.
# ast.literal_eval only accepts Python literals, so a crafted cell in the
# spreadsheet cannot execute arbitrary code the way the built-in eval() could.
df_final['target_list'] = df_final['target_list'].apply(ast.literal_eval)

def multilabel_train_test_split(X, y, test_size=0.2, val_size=0.1, random_state=42):
    """
    Stratified train / validation / test split for multi-label data.

    Two successive IterativeStratification passes are used: the first carves
    out the test set, the second carves the validation set out of what is left.

    Args:
        X: array-like of samples (indexed with integer index arrays).
        y: array-like label matrix with one row per sample in ``X``.
        test_size: fraction of the whole dataset assigned to the test set.
        val_size: fraction of the whole dataset assigned to the validation set.
        random_state: seed that makes the split reproducible; ``None`` leaves
            the random state untouched (non-deterministic split).

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)``.

    Raises:
        ValueError: if a size is negative or ``test_size + val_size >= 1``.
    """
    if test_size < 0 or val_size < 0 or test_size + val_size >= 1:
        raise ValueError(
            f"test_size and val_size must be non-negative and sum to less than 1, "
            f"got test_size={test_size}, val_size={val_size}"
        )

    # Accept lists / Series as well as arrays: the index-array lookups below
    # need NumPy fancy indexing.
    X = np.asarray(X)
    y = np.asarray(y)

    # NOTE(review): IterativeStratification appears to draw its random
    # tie-breaks from NumPy's global RNG, so seeding that RNG is what makes
    # the split repeatable -- confirm against the installed scikit-multilearn.
    # The previous global state is restored afterwards so callers are unaffected.
    saved_rng_state = None
    if random_state is not None:
        saved_rng_state = np.random.get_state()
        np.random.seed(random_state)

    try:
        # First pass: separate the test set.
        stratifier = IterativeStratification(n_splits=2, order=2,
                                            sample_distribution_per_fold=[test_size, 1-test_size])
        train_idx, test_idx = next(stratifier.split(X, y))
        X_rest, y_rest = X[train_idx], y[train_idx]

        # Second pass: separate the validation set from the remaining data.
        # val_size is relative to the full dataset, so rescale it to the remainder.
        val_ratio = val_size / (1 - test_size)
        stratifier = IterativeStratification(n_splits=2, order=2,
                                            sample_distribution_per_fold=[val_ratio, 1-val_ratio])
        tr_idx, val_idx = next(stratifier.split(X_rest, y_rest))
    finally:
        if saved_rng_state is not None:
            np.random.set_state(saved_rng_state)

    return X_rest[tr_idx], X_rest[val_idx], X[test_idx], \
           y_rest[tr_idx], y_rest[val_idx], y[test_idx]

# Run the stratified split on the raw text array and the stacked label matrix.
texts = df_final['clean_text'].values
labels = np.array(list(df_final['target_list']))

(X_train, X_val, X_test,
 y_train, y_val, y_test) = multilabel_train_test_split(texts, labels, test_size=0.1, val_size=0.1)

# Rebuild one DataFrame per split; each label row is stored as its own array.
train_df, val_df, test_df = (
    pd.DataFrame({'clean_text': part_texts, 'target_list': list(part_labels)})
    for part_texts, part_labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

In [None]:
def calculate_positive_ratios(df, label_column='target_list', num_labels=34):
    """
    Compute the fraction of positive samples for every label.

    Args:
        df: DataFrame whose ``label_column`` holds one equal-length label
            vector per row.
        label_column: name of the column containing the label vectors.
        num_labels: expected number of labels per row (defaults to 34).

    Returns:
        pandas Series named ``'Positive_Ratio'`` indexed ``Label_0`` ..
        ``Label_{num_labels-1}``, holding the column-wise mean of the labels.

    Raises:
        AssertionError: if the label vectors do not have ``num_labels`` entries.
    """
    # Stack the per-row label vectors into a 2-D (rows x labels) array.
    labels_array = np.array(df[label_column].tolist())

    # Validate the label dimension.
    assert labels_array.shape[1] == num_labels, f"应为{num_labels}个标签，实际检测到 {labels_array.shape[1]} 个"

    # Mean over rows gives the positive ratio of each label.
    positive_ratios = labels_array.mean(axis=0)

    # Wrap the ratios in a labelled Series.
    return pd.Series(positive_ratios,
                     index=[f'Label_{i}' for i in range(num_labels)],
                     name='Positive_Ratio')


# Positive ratio of every label, measured on the training split.
ratio_series = calculate_positive_ratios(train_df)

# Per-label weight = 1 - positive ratio, capped at 0.99.
alpha_series_clipped = ratio_series.rsub(1).clip(upper=0.99)

# Plain Python list, as passed to the loss constructor.
alpha_weights = alpha_series_clipped.tolist()

In [None]:
import torch
class Mydataset(Dataset):
    """
    Torch dataset that tokenizes text rows and pairs them with label vectors.

    Args:
        df: DataFrame with a ``'clean_text'`` column and a ``'target_list'``
            column (one label vector per row).
        max_len: sequence length every sample is padded / truncated to.
        tokenizer: optional pre-loaded tokenizer to reuse across datasets;
            when omitted, the BERT tokenizer is loaded from
            ``autodl-tmp/bert-base-uncased`` as before.
    """

    def __init__(self, df, max_len=512, tokenizer=None):
        if tokenizer is None:
            tokenizer = BertTokenizer.from_pretrained("autodl-tmp/bert-base-uncased")
        self.tokenizer = tokenizer
        self.texts = df['clean_text']
        self.labels = df['target_list']
        self.max_len = max_len

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """
        Return the ``idx``-th sample as a dict with ``'input_ids'``,
        ``'attention_mask'`` (both flattened to 1-D) and float ``'labels'``.
        """
        # Positional lookup: DataLoader hands out positions 0..len-1, while
        # plain Series[idx] is label-based and fails (or returns the wrong
        # row) when the DataFrame index is not a default RangeIndex.
        text = self.texts.iloc[idx]
        label = self.labels.iloc[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
                # flatten() drops the leading batch dimension added by return_tensors='pt'
                'input_ids': encoding['input_ids'].flatten(),
                'attention_mask': encoding['attention_mask'].flatten(),
                'labels': torch.tensor(label, dtype=torch.float)
            }


In [None]:
# One dataset per split, built in train / val / test order.
train_dataset, val_dataset, test_dataset = (
    Mydataset(split_frame) for split_frame in (train_df, val_df, test_df)
)

# Only the training loader shuffles; evaluation loaders keep the row order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
val_loader, test_loader = (
    DataLoader(eval_dataset, shuffle=False, batch_size=64)
    for eval_dataset in (val_dataset, test_dataset)
)
class MyBert(nn.Module):
    """BERT encoder followed by dropout and a linear head producing one logit per label."""

    def __init__(self,num_labels):
        super().__init__()
        self.bert = BertModel.from_pretrained('autodl-tmp/bert-base-uncased')
        self.dropout = nn.Dropout(p=0.2)
        hidden_dim = self.bert.config.hidden_size
        self.classifier = nn.Linear(hidden_dim, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return raw (pre-sigmoid) logits computed from the encoder's pooled output."""
        encoded = self.bert(input_ids, attention_mask)
        regularized = self.dropout(encoded.pooler_output)
        return self.classifier(regularized)
 

In [None]:
# Build the 34-label classifier and move it to the GPU when one is available.
model = MyBert(num_labels=34)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Wrap the model for multi-GPU data parallelism only when more than one GPU is visible.
gpu_count = torch.cuda.device_count()
model = nn.DataParallel(model) if gpu_count > 1 else model

optimizer = torch.optim.Adam(model.parameters(), lr=1e-05)

In [None]:
class EarlyStopping:
    """
    Stop training when the validation loss has not improved for a while.

    Args:
        patience: number of consecutive non-improving calls tolerated before
            ``early_stop`` is set.
        delta: minimum decrease of the validation loss (relative to the best
            loss seen so far) that counts as an improvement.
        path: file the best model's ``state_dict`` is written to.

    Attributes set by calls: ``best_score`` (lowest loss so far),
    ``counter`` (current run of non-improving calls), ``early_stop`` (flag).
    """

    def __init__(self, patience=3, delta=0, path='best_model.pth'):
        self.patience = patience
        self.delta = delta
        self.path = path
        self.best_score = None
        self.early_stop = False
        self.counter = 0

    def __call__(self, val_loss, model):
        """Record one validation result; checkpoint on improvement, otherwise count it."""
        # Improvement means the loss dropped by at least `delta` below the best
        # value. The previous check (`val_loss > best + delta`) treated a loss
        # that was *worse* than the best by up to `delta` as an improvement and
        # overwrote the best checkpoint with it. Written as a positive
        # comparison so a NaN loss is never accepted as a new best.
        improved = self.best_score is None or val_loss <= self.best_score - self.delta
        if improved:
            self.best_score = val_loss
            self.save_model(model)
            self.counter = 0
        else:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True

    def save_model(self, model):
        """Write the model's state_dict to ``self.path``."""
        torch.save(model.state_dict(), self.path)
        print(f"Validation loss improved. Model saved to {self.path}")

early_stopping = EarlyStopping(patience=3, path='best_model.pth')



In [None]:
class FocalLoss(nn.Module):
    """
    Sigmoid focal loss for multi-label classification with a per-label alpha.

    Args:
        alpha_per_class: sequence or tensor with one weight per label; it is
            the weight given to *positive* targets of that label, negatives
            receive ``1 - alpha``.
        gamma: focusing exponent applied to ``(1 - p_t)``.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced element-wise loss.
    """

    def __init__(self, alpha_per_class, gamma=2.0, reduction='mean'):

        super().__init__()

        # Store alpha as a float buffer so it follows `.to(device)` calls on
        # the module and never silently promotes the loss to float64.
        alpha = torch.as_tensor(alpha_per_class, dtype=torch.float32)
        self.register_buffer('alpha', alpha)

        self.gamma = gamma
        self.reduction = reduction
        self.eps = 1e-8  # not used by forward(); kept so the attribute stays available

    def forward(self, inputs, targets):
        """
        Compute the focal loss.

        Args:
            inputs: raw logits; the last dimension indexes the labels.
            targets: 0/1 targets with the same shape as ``inputs``.
        """
        targets = targets.to(dtype=inputs.dtype)

        bce_loss = F.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none')

        # For 0/1 targets, exp(-BCE) is the probability assigned to the true outcome.
        p_t = torch.exp(-bce_loss)

        alpha = self.alpha.to(device=bce_loss.device, dtype=bce_loss.dtype)
        alpha = alpha[None, :]  # broadcast over the batch dimension
        # Target-dependent weight: alpha for positives, (1 - alpha) for
        # negatives. Previously alpha was applied to both, so it could not
        # balance positives against negatives within a label.
        alpha_t = alpha * targets + (1 - alpha) * (1 - targets)

        focal_loss = alpha_t * (1 - p_t) ** self.gamma * bce_loss
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Loss used for both training and validation, weighted by the per-label alpha list.
criterion = FocalLoss(alpha_weights, gamma=2)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, classification_report

num_epochs = 16
for epoch in range(num_epochs):
    # ---------------- training phase ----------------
    model.train()
    total_loss = 0

    for batch_idx, batch in enumerate(train_loader):
        input_ids, attention_mask, labels = (
            batch[field].to(device) for field in ('input_ids', 'attention_mask', 'labels')
        )

        # Clear stale gradients, then forward / backward / update.
        optimizer.zero_grad()
        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss = total_loss + loss.item()

        # Progress line every 100 batches.
        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch+1}, BATCH: {batch_idx}, Training Loss:  {loss.item()}')

    print('Epoch {}: Training End'.format(epoch+1))
    print('Epoch {}: Validation Start'.format(epoch+1))

    # ---------------- validation phase ----------------
    model.eval()
    predictions, true_labels = [], []
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device).float()

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(logits, labels)
            val_loss = val_loss + loss.item()

            # Threshold the sigmoid probabilities at 0.5 to get 0/1 predictions.
            probs = torch.sigmoid(logits)
            preds = (probs > 0.5).int()
            predictions += list(preds.cpu().numpy())
            true_labels += list(labels.cpu().numpy())

    # Average the summed batch losses over the number of validation batches.
    val_loss = val_loss / len(val_loader)
    accuracy = accuracy_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions, average='macro')

    print(f"Epoch {epoch + 1}, Training Loss: {total_loss / len(train_loader)}")
    print(f"Validation Loss: {val_loss}")
    print(f"Validation Accuracy: {accuracy}")
    print(f"Validation F1 Score: {f1}")

    # Checkpoint on improvement; leave the loop once patience is exhausted.
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break


In [None]:
# Restore the best checkpoint written during training. map_location makes the
# load work when the checkpoint was saved on a different device than the one
# in use now (e.g. trained on GPU, evaluated on CPU).
model.load_state_dict(torch.load('best_model.pth', map_location=device))
print("Loaded best model for testing")

# Test-set evaluation
from sklearn.metrics import hamming_loss, f1_score, average_precision_score, classification_report

model.eval()
test_probs = []
test_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # Labels are only needed for metrics, so they stay on the CPU.
        labels = batch['labels']

        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        probs = torch.sigmoid(logits)

        test_probs.append(probs.cpu().numpy())
        test_labels.append(labels.numpy())

test_probs = np.concatenate(test_probs)
test_labels = np.concatenate(test_labels)
# Same 0.5 decision threshold as used during validation.
test_preds = (test_probs > 0.5).astype(int)

# Compute evaluation metrics. zero_division=0 matches the classification
# report below and avoids a warning for every label with no predicted or
# true positives.
hl = hamming_loss(test_labels, test_preds)
micro_f1 = f1_score(test_labels, test_preds, average='micro', zero_division=0)
macro_f1 = f1_score(test_labels, test_preds, average='macro', zero_division=0)
auc_pr = average_precision_score(test_labels, test_probs, average='macro')

print("\nFinal Test Evaluation:")
print(f"Hamming Loss: {hl:.4f}")
print(f"Micro-F1: {micro_f1:.4f}")
print(f"Macro-F1: {macro_f1:.4f}")
print(f"AUC-PR (Macro): {auc_pr:.4f}")

label_columns=['ADRM','AMAN', 'ARC', 'ATM', 'BIRD', 'CABIN','CFIT', 'CTOL', 'EVAC', 'EXTL', 'F-NI', 'F-POST', 'FUEL',
'GCOL',  'GTOW',  'ICE',  'LALT',  'LOC-G',  'LOC-I', 'LOLI', 'MAC', 'OTHERS','RAMP', 'RE',
'RI',  'SCF-NP',  'SCF-PP',  'SEC',  'TURB',  'UIMC',  'UNK', 'USOS','WILD', 'WSTRW']
# Optional: per-label classification report
print("\nClassification Report:")
print(classification_report(test_labels, test_preds, target_names=label_columns, zero_division=0))