In [1]:
import os
import gc
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold, GroupKFold
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm, trange
from sklearn.metrics import accuracy_score, f1_score
from transformers import get_cosine_schedule_with_warmup,get_linear_schedule_with_warmup, AutoModel, AutoTokenizer, AutoModelForMultipleChoice, AutoConfig
import wandb
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [2]:
# Category name -> integer class id.
maper = {
    'business': 0,
    'sport': 1,
    'tech': 2,
    'entertainment': 3,
}
# Reverse lookup (class id -> category name), used to decode predicted ids.
inv_maper = dict(zip(maper.values(), maper.keys()))

In [3]:
class TextDataset(Dataset):
    """Dataset that tokenizes one DataFrame row per item.

    Args:
        df: DataFrame accessed positionally; every row must provide a
            'text' entry and a 'label' entry.
        tokenizer: object exposing ``encode_plus`` whose result carries
            ``input_ids``, ``attention_mask`` and ``token_type_ids``.
    """

    def __init__(self, df, tokenizer):
        super().__init__()
        self.tokenizer = tokenizer
        self.data = df

    def __len__(self):
        """Return the number of rows in the wrapped DataFrame."""
        return len(self.data)

    def __getitem__(self, index):
        """Tokenize row ``index`` and return its model inputs plus label.

        Returns:
            dict with keys 'input_ids', 'attention_mask', 'token_type_ids'
            and 'labels' (the row's 'label' wrapped in a tensor).
        """
        record = self.data.iloc[index]

        # Every sample is padded/truncated to the same fixed length (384).
        encoded = self.tokenizer.encode_plus(
            record['text'],
            max_length=384,
            truncation=True,
            padding='max_length',
            return_tensors='pt',
        )

        # squeeze(0) drops the leading axis of each tensor the tokenizer returned.
        item = {
            field: getattr(encoded, field).squeeze(0)
            for field in ('input_ids', 'attention_mask', 'token_type_ids')
        }
        item['labels'] = torch.tensor(record['label'])
        return item

In [4]:
class MeanPooling(nn.Module):
    """Average hidden states along dim 1, counting only positions whose mask is non-zero.

    Args:
        clamp_min: lower bound applied to the mask sum so that a fully
            masked row does not divide by zero.
    """

    def __init__(self, clamp_min=1e-9):
        super().__init__()
        self.clamp_min = clamp_min

    def forward(self, last_hidden_state, attention_mask):
        """Return the mask-weighted mean of ``last_hidden_state`` over dim 1."""
        # Broadcast the mask to the hidden-state shape so it can weight every feature.
        weights = attention_mask.unsqueeze(-1).expand_as(last_hidden_state).float()
        weighted_total = (last_hidden_state * weights).sum(dim=1)
        token_count = weights.sum(dim=1).clamp(min=self.clamp_min)
        return weighted_total / token_count


class TextModel(nn.Module):
    """Pretrained encoder followed by masked mean pooling and a linear classifier.

    Attribute names (``model``, ``pooler``, ``head_drop``, ``head``) are part of
    the checkpoint format: they determine the state_dict keys.

    Args:
        model_path: location handed to ``AutoModel.from_pretrained``.
        num_labels: number of output classes of the linear head.
        dropout: dropout probability applied to the pooled embedding.
    """

    def __init__(
        self,
        model_path='/bohr/debertasmall3-nhez/v1/deberta_small/',
        num_labels=4,
        dropout=0.00,
    ):
        super().__init__()
        self.model = AutoModel.from_pretrained(model_path)
        self.pooler = MeanPooling()
        self.head_drop = nn.Dropout(dropout)
        # Size the head from the encoder's own config rather than a hard-coded
        # width, so swapping the backbone cannot silently mismatch the head.
        self.head = nn.Linear(self.model.config.hidden_size, num_labels)

    def forward(self, batch):
        """Compute class logits for a batch.

        Args:
            batch: mapping with 'input_ids' and 'attention_mask'; any other
                keys (e.g. 'token_type_ids', 'labels') are ignored here.

        Returns:
            Output of the linear head, one row of ``num_labels`` scores per
            pooled embedding.
        """
        encoder_out = self.model(batch['input_ids'], attention_mask=batch['attention_mask'])
        # encoder_out[0] is the encoder's first output, pooled over dim 1 under the mask.
        pooled = self.pooler(encoder_out[0], batch['attention_mask'])
        return self.head(self.head_drop(pooled))

In [5]:
# Tokenizer loaded from the same local directory that TextModel loads its encoder from.
tokenizer = AutoTokenizer.from_pretrained('/bohr/debertasmall3-nhez/v1/deberta_small/')

In [6]:
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate the classifier (this loads the pretrained encoder) and move it to `device`.
model = TextModel().to(device)

In [7]:
# Restore the fine-tuned weights into the model.
# map_location remaps the stored tensors onto the current `device`, so a
# checkpoint saved from a GPU run still loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(
    torch.load('/bohr/weightsV2de-xb26/v1/weightsV3.pt', map_location=device)
)

In [12]:
# Force inference onto the CPU regardless of what was selected earlier.
# From here on `device` is a plain str ('cpu'), not a torch.device.
# NOTE(review): the execution count jumps from In [7] to In [12]; confirm that
# nothing below relies on state from cells that are no longer in the notebook.
device = 'cpu'
model = model.to(device)

In [13]:
# The test process: read the unlabeled test set, predict a category for every
# row and write the predictions to submission.csv.
if os.environ.get('DATA_PATH'):
    data_path = os.environ.get("DATA_PATH") + "/"
else:
    print("Baseline运行时，因为无法读取测试集，所以会有此条报错，属于正常现象")
    print("When the baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.")
    # Stop with an explicit error; previously the next line failed with an
    # opaque NameError because `data_path` was never assigned on this branch.
    raise FileNotFoundError(
        "DATA_PATH is not set, so test_news_nolabel.csv cannot be located"
    )
test_df = pd.read_csv(data_path+"test_news_nolabel.csv")

# TextDataset reads row['label'], so the unlabeled rows get a placeholder value.
test_df['label'] = 0

# Normalise `device` (plain str or torch.device, possibly indexed like 'cuda:0')
# to the bare device type, which is what torch.autocast expects.
device_type = torch.device(device).type

test_dataset = TextDataset(test_df,tokenizer)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False,  # keep row order so predictions line up with test_df
    num_workers=4,
    pin_memory=(device_type == "cuda"),  # pinned host memory only helps copies to a GPU
)
model.eval()
predicted_labels = []
with torch.no_grad():
    for batch in tqdm(test_dataloader):
        batch = {key: value.to(device) for key, value in batch.items()}

        with torch.autocast(device_type):
            logits = model(batch)

        # Index of the highest-scoring class per row, collected as plain ints.
        predicted_labels.extend(logits.argmax(dim=1).tolist())

# Decode class ids back to category names.
test_df["category"] = predicted_labels
test_df['category'] = test_df['category'].map(inv_maper)

output_path = "submission.csv"
test_df[['text','category']].to_csv(output_path, index=False)
print("submission.csv is generated successfully")