In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from scipy.stats import expon, loguniform, uniform
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import make_scorer, mean_squared_error
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertTokenizer, BertModel, AdamW

In [None]:
def randomize_scores(scores, max_deviation= 0.05, rng=None):
    """Perturb each score by a random relative amount and clip the result to [0, 1].

    Every element is multiplied by ``1 + u`` where ``u`` is drawn uniformly from
    ``[-max_deviation, max_deviation]``.

    Args:
        scores: Array-like of scores (NumPy array, pandas Series, list, ...).
        max_deviation: Maximum relative deviation applied to each score.
        rng: Optional object exposing ``uniform(low, high, size=...)``, e.g.
            ``np.random.default_rng(seed)``, for reproducible noise. When
            omitted, the global ``np.random`` state is used.

    Returns:
        The perturbed scores, clipped to the closed interval [0, 1].
    """
    sampler = np.random if rng is None else rng
    # np.shape also works for inputs without a ``.shape`` attribute (e.g. lists).
    noise = sampler.uniform(-max_deviation, max_deviation, size=np.shape(scores))
    perturbed = scores * (1 + noise)
    return np.clip(perturbed, 0, 1)


# TODO: move the scoring functions below into a separate module.
def penalty_function_AwT(AwT, alpha = 1, epsilon = 1e-6):
    """Return the multiplicative factor applied for a low AwT value.

    Args:
        AwT: Scalar AwT value.
        alpha: Exponent applied to ``AwT + epsilon``.
        epsilon: Small offset added to AwT before exponentiation and division.

    Returns:
        ``1 / (AwT + epsilon) ** alpha`` when ``AwT < 0.5``, otherwise ``1``.

    NOTE(review): whenever ``0 < AwT + epsilon < 1`` and ``alpha > 0`` this
    factor is greater than 1, so multiplying a score by it increases the
    score -- confirm that this is the intended effect of a "penalty".
    """
    below_threshold = AwT < 0.5
    return 1 / (AwT + epsilon) ** alpha if below_threshold else 1

def reward_function_AwT(AwT, beta = 1):
    """Return the multiplicative factor applied for a high AwT value.

    Args:
        AwT: Scalar AwT value.
        beta: Exponent applied to ``exp(AwT - 0.5)``.

    Returns:
        ``exp(AwT - 0.5) ** beta`` when ``AwT > 0.5``, otherwise ``1``.
    """
    above_threshold = AwT > 0.5
    return (np.exp(AwT - 0.5)) ** beta if above_threshold else 1

def penalty_function_SoE(SoE, gamma=0.5, epsilon=1e-6):
    """Return the multiplicative factor applied for a low SoE value.

    Args:
        SoE: Scalar SoE value.
        gamma: Exponent applied to ``SoE + epsilon``.
        epsilon: Small offset added to SoE before exponentiation and division.

    Returns:
        ``1 / (SoE + epsilon) ** gamma`` when ``SoE < 0.5``, otherwise ``1``.

    NOTE(review): whenever ``0 < SoE + epsilon < 1`` and ``gamma > 0`` this
    factor is greater than 1, so multiplying a score by it increases the
    score -- confirm that this is the intended effect of a "penalty".
    """
    below_threshold = SoE < 0.5
    return 1 / (SoE + epsilon) ** gamma if below_threshold else 1
    
def reward_function_SoE(SoE, delta=0.5):
    """Return the multiplicative factor applied for a high SoE value.

    Args:
        SoE: Scalar SoE value.
        delta: Exponent applied to ``exp(SoE - 0.5)``.

    Returns:
        ``exp(SoE - 0.5) ** delta`` when ``SoE > 0.5``, otherwise ``1``.
    """
    above_threshold = SoE > 0.5
    return (np.exp(SoE - 0.5)) ** delta if above_threshold else 1

def calculate_final_score(SoE, AwT, w_SoE=0.4, w_AwT=0.6, alpha=1, beta=1, gamma=0.5, delta=0.5):
    """Combine SoE and AwT into one score.

    The weighted sum ``w_SoE * SoE + w_AwT * AwT`` is multiplied by one factor
    chosen from AwT and then by one factor chosen from SoE.

    Args:
        SoE: Scalar SoE value.
        AwT: Scalar AwT value.
        w_SoE: Weight of SoE in the weighted sum.
        w_AwT: Weight of AwT in the weighted sum.
        alpha: Exponent passed to ``penalty_function_AwT`` (used when AwT < 0.5).
        beta: Exponent passed to ``reward_function_AwT`` (used otherwise).
        gamma: Exponent passed to ``penalty_function_SoE`` (used when SoE < 0.5).
        delta: Exponent passed to ``reward_function_SoE`` (used otherwise).

    Returns:
        The weighted sum scaled by the AwT factor and the SoE factor.
    """
    weighted_sum = w_SoE * SoE + w_AwT * AwT

    # Exactly one of penalty/reward is evaluated per input, selected by the 0.5 threshold.
    awt_factor = (
        penalty_function_AwT(AwT, alpha) if AwT < 0.5 else reward_function_AwT(AwT, beta)
    )
    soe_factor = (
        penalty_function_SoE(SoE, gamma) if SoE < 0.5 else reward_function_SoE(SoE, delta)
    )

    return weighted_sum * awt_factor * soe_factor

def custom_scorer(y_true, y_pred):
    """Return the mean squared error between ``y_true`` and ``y_pred``.

    Thin wrapper around ``mean_squared_error`` so it can be handed to
    ``make_scorer``; a lower value means a better fit.
    """
    return mean_squared_error(y_true, y_pred)

def evaluate_model(alpha, beta, gamma, delta):
    """Build a scoring function with the four exponents fixed.

    Args:
        alpha, beta, gamma, delta: Exponents forwarded to
            ``calculate_final_score`` on every call of the returned function.

    Returns:
        A function ``model_evaluation(SoE, AwT)`` that returns
        ``calculate_final_score(SoE, AwT, ...)`` with the given exponents and
        the default weights.
    """
    exponents = dict(alpha=alpha, beta=beta, gamma=gamma, delta=delta)

    def model_evaluation(SoE, AwT):
        return calculate_final_score(SoE, AwT, **exponents)

    return model_evaluation

In [None]:
# Load your dataset.
# Assuming 'text' column contains abstracts and 'score' column contains labels
DATASET_PATH = "path_to_your_dataset.csv"  # TODO: point this at the real CSV file
dataset = pd.read_csv(DATASET_PATH)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One seed for the split and for the NumPy noise so a fresh run is repeatable.
SEED = 42
np.random.seed(SEED)

# Splitting dataset
train_data, test_data = train_test_split(dataset, test_size=0.2, random_state=SEED)
# Take explicit copies: adding a column to the frames returned by the split
# would otherwise assign into a selection of `dataset` (SettingWithCopyWarning).
train_data = train_data.copy()
test_data = test_data.copy()
test_data["randomized_score"] = randomize_scores(test_data["score"].values)
train_data["randomized_score"] = randomize_scores(train_data["score"].values)

# Initializing tokenizer
tokenizer = BertTokenizer.from_pretrained("microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext")

# Function to prepare DataLoader
def prepare_dataloader(data, score_column="randomized_score", batch_size=6, shuffle=True):
    """Tokenize the texts of ``data`` and wrap them with their labels in a DataLoader.

    Args:
        data: DataFrame with a ``"text"`` column and the column named by
            ``score_column``.
        score_column: Name of the column used as regression label.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples on every pass.
            Use ``False`` for evaluation so that predictions come back in the
            row order of ``data``.

    Returns:
        A DataLoader yielding ``(input_ids, attention_mask, labels)`` batches.
    """
    # Uses the module-level `tokenizer`; sequences are padded to the longest
    # one and truncated at 512 tokens.
    encoded = tokenizer(
        data["text"].tolist(),
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
    labels = torch.tensor(data[score_column].tolist()).float()
    tensor_dataset = TensorDataset(encoded["input_ids"], encoded["attention_mask"], labels)
    return DataLoader(tensor_dataset, batch_size=batch_size, shuffle=shuffle)

train_dataloader = prepare_dataloader(train_data)
# No shuffling for evaluation: the predictions are later compared element-wise
# with test_data["score"] and averaged across models, so every pass over the
# loader must return the samples in the same order.
test_dataloader = prepare_dataloader(test_data, score_column="score", batch_size=1, shuffle=False)



class BertForRegression(nn.Module):
    """BERT encoder followed by a single linear layer producing one value per input.

    Args:
        model_name: Identifier or path passed to ``BertModel.from_pretrained``.
        hidden_size: Input width of the linear regression head; it must match
            the width of the encoder's pooled output.
    """

    def __init__(self, model_name, hidden_size=768):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        self.regressor = nn.Linear(hidden_size, 1)

    def forward(self, input_ids, attention_mask, token_type_ids=None, return_embeddings=False):
        """Run the encoder and return either the regression output or the pooled embedding.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            token_type_ids: Optional segment ids forwarded to the encoder.
            return_embeddings: When true, return the encoder's pooled output
                instead of the regression head's output.

        Returns:
            ``pooler_output`` of the encoder if ``return_embeddings`` is true,
            otherwise the result of applying the linear head to it.
        """
        # token_type_ids used to be accepted but silently dropped; pass it on
        # so callers that supply segment ids actually get them used.
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        pooled_output = outputs.pooler_output
        if return_embeddings:
            return pooled_output
        return self.regressor(pooled_output)
    


def train_model(train_dataloader, device, epochs = 16,
                model_name="microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext",
                lr=2e-5):
    """Fine-tune a fresh ``BertForRegression`` model with an MSE objective.

    Args:
        train_dataloader: Iterable of ``(input_ids, attention_mask, labels)``
            tensor batches; must support ``len()``.
        device: Device the model and every batch are moved to.
        epochs: Number of passes over ``train_dataloader``.
        model_name: Pretrained checkpoint used to initialise the encoder.
        lr: Learning rate of the AdamW optimizer.

    Returns:
        The trained ``BertForRegression`` instance.
    """
    # Build the regression network. A tokenizer was instantiated here before,
    # which has no parameters and cannot be moved to a device or trained.
    model = BertForRegression(model_name)
    model.to(device)
    # NOTE(review): `AdamW` is the name imported from transformers at the top of
    # the file; consider torch.optim.AdamW if that import is unavailable -- confirm.
    optimizer = AdamW(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for batch in train_dataloader:
            b_input_ids, b_input_mask, b_labels = [item.to(device) for item in batch]
            optimizer.zero_grad()
            outputs = model(b_input_ids, b_input_mask)
            # squeeze(-1) drops only the trailing size-1 output dimension, so a
            # final batch holding a single sample still matches the label shape.
            loss = criterion(outputs.squeeze(-1), b_labels)
            total_loss += loss.item()
            loss.backward()
            optimizer.step()
        avg_train_loss = total_loss / len(train_dataloader)

        print(f"Epoch {epoch + 1}, Loss: {avg_train_loss}")

    return model


# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Size of the ensemble: train_model is called once per member on the same dataloader.
num_models = 5
models = [train_model(train_dataloader, device) for _ in range(num_models)]

def evaluate_models(models, test_dataloader, device):
    """Average the predictions of several models over a dataloader.

    Args:
        models: Iterable of modules called as ``model(input_ids, attention_mask)``.
        test_dataloader: Iterable of ``(input_ids, attention_mask, labels)``
            batches. It is iterated once per model and must return the samples
            in the same order each time, otherwise predictions for different
            samples are averaged together.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Array with one value per sample: the element-wise mean of all models'
        predictions.
    """
    all_predictions = []
    for model in models:
        model.eval()
        predictions = []
        with torch.no_grad():
            for batch in test_dataloader:
                b_input_ids, b_input_mask, _ = [item.to(device) for item in batch]
                outputs = model(b_input_ids, b_input_mask)
                # Flatten to 1-D explicitly: a bare squeeze() turns a batch of
                # one into a 0-d tensor, which extend() cannot iterate over.
                predictions.extend(outputs.reshape(-1).cpu().numpy())
        all_predictions.append(predictions)

    avg_predictions = np.mean(all_predictions, axis=0)
    return avg_predictions

# Score the ensemble: mean squared error of the averaged predictions against
# the unperturbed "score" column.
# NOTE(review): assumes the predictions are in the same row order as test_data --
# verify that the test dataloader does not shuffle.
avg_predictions = evaluate_models(models, test_dataloader, device)
mse = np.mean((avg_predictions - test_data["score"].values) ** 2)
print(f"Average MSE: {mse}")


class FinalScoreEstimator(RegressorMixin, BaseEstimator):
    """scikit-learn wrapper exposing the four scoring exponents as hyper-parameters.

    RandomizedSearchCV clones its estimator and sets the sampled values through
    get_params/set_params; a plain function such as ``evaluate_model`` provides
    neither, so the search needs this estimator object instead.
    """

    def __init__(self, alpha=1, beta=1, gamma=0.5, delta=0.5):
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.delta = delta

    def fit(self, X, y=None):
        """Return ``self`` unchanged; the scoring formula learns nothing from the data."""
        return self

    def predict(self, X):
        """Return the final score of every ``(SoE, AwT)`` row of ``X``."""
        score_pair = evaluate_model(self.alpha, self.beta, self.gamma, self.delta)
        return np.array([score_pair(SoE, AwT) for SoE, AwT in np.asarray(X)])


# Each dict maps every exponent to ONE distribution. Given a list of dicts,
# RandomizedSearchCV first picks a dict uniformly and then samples each value
# from it. (A list *inside* a dict is treated as a list of candidate values, so
# the frozen distribution objects themselves would have been used as exponents.)
param_distributions = [
    {
        'alpha': expon(scale=1.0),
        'beta': loguniform(1e-3, 1e1),
        'gamma': expon(scale=1.0),
        'delta': uniform(0.1, 1.9),
    },
    {
        'alpha': uniform(0.1, 1.9),
        'beta': uniform(0.1, 1.9),
        'gamma': uniform(0.1, 1.9),
        'delta': expon(scale=1.0),
    },
]

random_search = RandomizedSearchCV(
    estimator=FinalScoreEstimator(),
    param_distributions=param_distributions,
    n_iter=100,
    # MSE is a loss, hence greater_is_better=False.
    scoring=make_scorer(custom_scorer, greater_is_better=False),
    cv=5,
    random_state=42
)

# Example data
# x_train, y_train should be your training data
# x_train: array of pairs (SoE, AwT)
# y_train: corresponding true scores
# NOTE(review): x_train and y_train are not defined anywhere in the visible
# notebook; they must be created before this cell can run.

random_search.fit(x_train, y_train)