In [None]:
from transformers import XLMRobertaTokenizer, XLMRobertaForSequenceClassification, Trainer, TrainingArguments,EarlyStoppingCallback
from torch.utils.data import Dataset
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from sklearn.metrics import mean_squared_error
import numpy as np
from torch import nn

# Load data
data = pd.read_excel("SQP_dummyvars_data.xlsx")

# All three columns are used as regression targets further down, so a row that
# lacks any of them cannot be trained or scored on (a NaN target would
# propagate into the MSE loss / metric). Drop such rows up front.
label_columns = ["quality(q^2)", "reliability(r^2)", "validity(v^2)"]
data = data.dropna(subset=label_columns)

# Model inputs: the question text and the text of its answer options.
features = data[["Request for answer text", "Answer options text"]]
labels_quality = data["quality(q^2)"]
labels_reliability = data["reliability(r^2)"]
labels_validity = data["validity(v^2)"]

# One shared split so features and all three targets stay row-aligned.
(
    train_features, test_features,
    train_labels_quality, test_labels_quality,
    train_labels_reliability, test_labels_reliability,
    train_labels_validity, test_labels_validity,
) = train_test_split(
    features, labels_quality, labels_reliability, labels_validity, test_size=0.2, random_state=42
)

# Define a dataset class
class QualityDataset(Dataset):
    """Torch dataset over tokenized text pairs with optional float targets.

    Args:
        tokenizer: Callable invoked as
            ``tokenizer(texts[0], texts[1], truncation=True, padding=True, max_length=128)``.
            Its result must offer ``.items()`` / ``.values()`` mapping field
            names to per-example sequences.
        texts: Indexable with two entries; ``texts[0]`` and ``texts[1]`` are
            the first and second text of every pair.
        labels: Optional indexable of numeric targets, one per example. When
            ``None`` the items carry no ``'labels'`` entry.
    """

    def __init__(self, tokenizer, texts, labels=None):
        self.encodings = tokenizer(texts[0], texts[1], truncation=True, padding=True, max_length=128)
        self.labels = labels

    def __getitem__(self, idx):
        """Return the tensors of example ``idx`` (plus ``'labels'`` when available)."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples; also works for unlabeled datasets."""
        if self.labels is not None:
            return len(self.labels)
        # Without labels, fall back to the number of encoded examples
        # (previously this raised TypeError on len(None)).
        for column in self.encodings.values():
            return len(column)
        return 0

# Initialize the tokenizer from the 'xlm-roberta-base' checkpoint; it is the
# tokenizer handed to QualityDataset when the datasets are first built.
tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')

# # Prepare the datasets
# train_dataset_reliability = QualityDataset(tokenizer, (train_features["Request for answer text"].tolist(), train_features["Answer options text"].tolist()), train_labels_reliability.tolist())
# test_dataset_reliability = QualityDataset(tokenizer, (test_features["Request for answer text"].tolist(), test_features["Answer options text"].tolist()), test_labels_reliability.tolist())

# train_dataset_validity = QualityDataset(tokenizer, (train_features["Request for answer text"].tolist(), train_features["Answer options text"].tolist()), train_labels_validity.tolist())
# test_dataset_validity = QualityDataset(tokenizer, (test_features["Request for answer text"].tolist(), test_features["Answer options text"].tolist()), test_labels_validity.tolist())

In [None]:
import os
from sklearn.metrics import mean_squared_error
import pandas as pd

# Function to predict and save results with MSE
def predict_and_save(trainer, dataset, prefix):
    """Predict on ``dataset``, compute the MSE and write labels/predictions to CSV.

    Args:
        trainer: Object whose ``predict(dataset)`` returns something with a
            ``.predictions`` array.
        dataset: Dataset passed to ``trainer.predict``; its ``.labels``
            attribute supplies the ground truth.
        prefix: Prefix of the output file, which is written as
            ``{prefix}_predictions_{mse:.4f}.csv``.

    Returns:
        The mean squared error between ``dataset.labels`` and the predictions.
    """
    # reshape(-1) instead of squeeze(): squeeze() turns a single-example
    # (1, 1) prediction into a 0-d array, which can no longer be paired with
    # the one-element label list.
    predictions = trainer.predict(dataset).predictions.reshape(-1)
    labels = dataset.labels
    mse = mean_squared_error(labels, predictions)

    results = pd.DataFrame({
        "Labels": labels,
        "Predictions": predictions
    })
    results.to_csv(f"{prefix}_predictions_{mse:.4f}.csv", index=False)

    return mse

# # Redefine compute_mse to predict and save results for both train and test datasets
# def compute_mse(trainer, train_dataset, test_dataset):
#     train_mse = predict_and_save(trainer, train_dataset, "train")
#     test_mse = predict_and_save(trainer, test_dataset, "test")
    
#     return {"train_mse": train_mse, "test_mse": test_mse}

def compute_mse(p):
    """Metric hook for ``Trainer``: MSE between ``p.label_ids`` and ``p.predictions``.

    Returns a dict with the single key ``"mse"``.
    """
    # reshape(-1) keeps a 1-D array even for a single evaluation example,
    # where squeeze() would produce a 0-d array.
    return {"mse": mean_squared_error(p.label_ids, p.predictions.reshape(-1))}

class EarlyStoppingByMSE(EarlyStoppingCallback):
    """Stop training once the evaluation loss drops below a fixed threshold.

    Args:
        patience: Forwarded to ``EarlyStoppingCallback`` as
            ``early_stopping_patience``.
        min_delta: Forwarded to ``EarlyStoppingCallback`` as
            ``early_stopping_threshold``.
        verbose: When true, print a message at the moment training is stopped.
        threshold: Training stops as soon as ``eval_loss`` is strictly below
            this value (defaults to the previously hard-coded 0.015).

    NOTE(review): the inherited ``on_train_begin`` hook is left untouched; in
    transformers it validates the TrainingArguments (e.g. best-model settings)
    - confirm the arguments used with this callback satisfy it.
    """

    def __init__(self, patience=1, min_delta=0.0, verbose=False, threshold=0.015):
        # The parent constructor takes exactly these two keyword arguments;
        # passing `verbose` positionally as a third argument raised TypeError.
        super().__init__(early_stopping_patience=patience, early_stopping_threshold=min_delta)
        self.verbose = verbose
        self.threshold = threshold

    def on_evaluate(self, args, state, control, **kwargs):
        """Request a stop when the reported ``eval_loss`` is below the threshold."""
        logs = kwargs.get("metrics") or {}
        mse = logs.get("eval_loss")  # assumes eval_loss is the MSE

        if mse is not None and mse < self.threshold:
            control.should_training_stop = True
            if self.verbose:
                # Report the threshold actually applied (the old text said 0.15).
                print(f"Early stopping as MSE reached {mse:.4f}, which is below the threshold of {self.threshold}.")
        return control

# Use this function in the Trainer evaluation
# Note: You will need to pass the trainer, train_dataset, and test_dataset to this function

# Hyper-parameters shared by the Trainer instances built below
trainer_settings = dict(
    output_dir='./results',
    num_train_epochs=30,
    per_device_train_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    evaluation_strategy="steps",
    logging_strategy="steps",
    logging_steps=300,
    lr_scheduler_type='cosine',
    save_total_limit=1,
)
training_args = TrainingArguments(**trainer_settings)

# Text pairs in the (texts[0], texts[1]) layout QualityDataset expects
train_text_pair = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
test_text_pair = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)

# One train/test dataset per regression target
train_dataset_reliability = QualityDataset(tokenizer, train_text_pair, train_labels_reliability.tolist())
test_dataset_reliability = QualityDataset(tokenizer, test_text_pair, test_labels_reliability.tolist())

train_dataset_validity = QualityDataset(tokenizer, train_text_pair, train_labels_validity.tolist())
test_dataset_validity = QualityDataset(tokenizer, test_text_pair, test_labels_validity.tolist())

# One single-output XLM-RoBERTa model per target
model_reliability = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=1)
model_validity = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=1)

# The two Trainers differ only in model and datasets
shared_trainer_kwargs = dict(args=training_args, compute_metrics=compute_mse)

trainer_reliability = Trainer(
    model=model_reliability,
    train_dataset=train_dataset_reliability,
    eval_dataset=test_dataset_reliability,
    **shared_trainer_kwargs,
)

trainer_validity = Trainer(
    model=model_validity,
    train_dataset=train_dataset_validity,
    eval_dataset=test_dataset_validity,
    **shared_trainer_kwargs,
)

In [None]:
import random
import os
def seed_everything(seed):
    """Seed the NumPy, Python and PyTorch RNGs with ``seed`` and export it as PYTHONHASHSEED."""
    seeders = (
        np.random.seed,
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    os.environ['PYTHONHASHSEED'] = f"{seed}"


seed_everything(0)

In [None]:
# Start training reliability model: runs the training loop of
# trainer_reliability, which was built with train_dataset_reliability and is
# evaluated on test_dataset_reliability.
trainer_reliability.train()

In [None]:
# Start training validity model: runs the training loop of trainer_validity,
# which was built with train_dataset_validity and is evaluated on
# test_dataset_validity.
trainer_validity.train()

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments

tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')

# Re-encode the text pairs with the multilingual BERT tokenizer
validity_train_texts = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
validity_test_texts = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)
train_dataset_validity = QualityDataset(tokenizer, validity_train_texts, train_labels_validity.tolist())
test_dataset_validity = QualityDataset(tokenizer, validity_test_texts, test_labels_validity.tolist())

# Single-output multilingual BERT model for the validity target
model_validity_bert = BertForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels=1)

validity_bert_kwargs = dict(
    model=model_validity_bert,
    args=training_args,
    train_dataset=train_dataset_validity,
    eval_dataset=test_dataset_validity,
    compute_metrics=compute_mse,
)
trainer_validity_bert = Trainer(**validity_bert_kwargs)

trainer_validity_bert.train()

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')

# Re-encode the text pairs with the multilingual BERT tokenizer
reliability_train_texts = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
reliability_test_texts = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)
train_dataset_reliability = QualityDataset(tokenizer, reliability_train_texts, train_labels_reliability.tolist())
test_dataset_reliability = QualityDataset(tokenizer, reliability_test_texts, test_labels_reliability.tolist())

# Single-output multilingual BERT model for the reliability target
model_reliability_bert = BertForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels=1)

reliability_bert_kwargs = dict(
    model=model_reliability_bert,
    args=training_args,
    train_dataset=train_dataset_reliability,
    eval_dataset=test_dataset_reliability,
    compute_metrics=compute_mse,
)
trainer_reliability_bert = Trainer(**reliability_bert_kwargs)

trainer_reliability_bert.train()

In [None]:
import gc
# Run a garbage-collection pass, then ask PyTorch to release the CUDA memory
# its allocator is caching, before the next model is loaded.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# XLM-RoBERTa tokenizer for the quality regressor
tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')

# Encode the text pairs against the quality target
quality_train_texts = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
quality_test_texts = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)
train_dataset_quality = QualityDataset(tokenizer, quality_train_texts, train_labels_quality.tolist())
test_dataset_quality = QualityDataset(tokenizer, quality_test_texts, test_labels_quality.tolist())


# Single-output XLM-RoBERTa model
model_quality = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=1)

# Trainer for the quality target (the threshold-based early-stopping callback stays disabled)
quality_ro_kwargs = dict(
    model=model_quality,
    args=training_args,
    train_dataset=train_dataset_quality,
    eval_dataset=test_dataset_quality,
    compute_metrics=compute_mse,
    #callbacks=[EarlyStoppingByMSE(patience=1, verbose=True)],
)
trainer_quality_ro = Trainer(**quality_ro_kwargs)

# Run the training loop
trainer_quality_ro.train()

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
# Multilingual BERT tokenizer for the quality regressor
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')

# Encode the text pairs against the quality target
bert_quality_train_texts = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
bert_quality_test_texts = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)
train_dataset_quality = QualityDataset(tokenizer, bert_quality_train_texts, train_labels_quality.tolist())
test_dataset_quality = QualityDataset(tokenizer, bert_quality_test_texts, test_labels_quality.tolist())


# Single-output multilingual BERT model
model_quality_bert = BertForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels=1)

# Trainer for the quality target
bert_quality_kwargs = dict(
    model=model_quality_bert,
    args=training_args,
    train_dataset=train_dataset_quality,
    eval_dataset=test_dataset_quality,
    compute_metrics=compute_mse,
)
trainer_quality = Trainer(**bert_quality_kwargs)

# Run the training loop
trainer_quality.train()

In [None]:
# NOTE(review): trainer_quality.train() is also the last statement of the
# previous cell, so a top-to-bottom run calls train() a second time on the
# same Trainer object - confirm this repeat is intended.
trainer_quality.train()

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
# Initialize the tokenizer
# NOTE(review): the other BERT cells in this notebook load
# 'bert-base-multilingual-cased'; confirm that a checkpoint named
# 'bert-large-multilingual-cased' actually resolves - TODO verify.
tokenizer = BertTokenizer.from_pretrained('bert-large-multilingual-cased')

# Prepare the datasets
# (rebinds train_dataset_quality / test_dataset_quality, now encoded with the tokenizer loaded above)
train_dataset_quality = QualityDataset(tokenizer, (train_features["Request for answer text"].tolist(), train_features["Answer options text"].tolist()), train_labels_quality.tolist())
test_dataset_quality = QualityDataset(tokenizer, (test_features["Request for answer text"].tolist(), test_features["Answer options text"].tolist()), test_labels_quality.tolist())


# Load the model
# (rebinds model_quality_bert; num_labels=1 gives a single output per example)
model_quality_bert = BertForSequenceClassification.from_pretrained('bert-large-multilingual-cased', num_labels=1)

# Initialize the Trainer
# (rebinds trainer_quality, so later cells that use this name refer to this model)
trainer_quality = Trainer(
    model=model_quality_bert,
    args=training_args,
    train_dataset=train_dataset_quality,
    eval_dataset=test_dataset_quality,
    compute_metrics=compute_mse,
)

# Start training
trainer_quality.train()

In [None]:
# Collect test-set predictions for the three targets from the BERT-based trainers.
# NOTE(review): test_dataset_* and trainer_quality are rebound several times in
# the cells above; this relies on each name currently pointing at the dataset
# that was encoded for the matching trainer - verify after out-of-order runs.
reliability_predictions_test = trainer_reliability_bert.predict(test_dataset_reliability).predictions

validity_predictions_test = trainer_validity_bert.predict(test_dataset_validity).predictions

quality_predictions_test = trainer_quality.predict(test_dataset_quality).predictions

In [None]:
# Dense neural network used as the final quality regressor
class QualityModel(nn.Module):
    """MLP mapping 4 input features to 1 output through two 64-unit hidden layers.

    Each hidden layer is followed by ReLU and Dropout(0.5).
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Linear(4, 64), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(64, 64), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(64, 1),
        ]
        self.dense = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return its output."""
        output = self.dense(x)
        return output

# Prepare input for the dense neural network
def _as_column(values):
    """Return ``values`` (pandas Series, NumPy array or CPU tensor) as an (n, 1) NumPy array."""
    return np.asarray(values).reshape(-1, 1)


# Training features are the ground-truth labels; test features are the
# transformer predictions. Column order: reliability, validity, their
# product, quality.
train_reliability_col = _as_column(train_labels_reliability)
train_validity_col = _as_column(train_labels_validity)
train_inputs = np.concatenate((
    train_reliability_col,
    train_validity_col,
    train_reliability_col * train_validity_col,
    # Must be a 2-D column like the others: concatenating the raw 1-D Series
    # along axis=1 raises a dimension-mismatch error.
    _as_column(train_labels_quality),
), axis=1)

test_inputs = np.concatenate((
    reliability_predictions_test.reshape(-1, 1),
    validity_predictions_test.reshape(-1, 1),
    (reliability_predictions_test * validity_predictions_test).reshape(-1, 1),
    quality_predictions_test.reshape(-1, 1)
), axis=1)

train_inputs = torch.tensor(train_inputs, dtype=torch.float32)
test_inputs = torch.tensor(test_inputs, dtype=torch.float32)

# The loss needs tensor targets of shape (n, 1). Build them under separate
# names so train_labels_quality / test_labels_quality are not rebound, and so
# this cell works whether those names currently hold Series or tensors.
train_targets = torch.tensor(np.asarray(train_labels_quality), dtype=torch.float32).reshape(-1, 1)
test_targets = torch.tensor(np.asarray(test_labels_quality), dtype=torch.float32).reshape(-1, 1)

# Initialize and train the dense neural network
quality_model = QualityModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(quality_model.parameters(), lr=0.00005)

num_epochs = 3000
for epoch in range(num_epochs):
    # One full-batch gradient step per epoch
    quality_model.train()
    optimizer.zero_grad()
    outputs = quality_model(train_inputs)
    loss = criterion(outputs, train_targets)
    loss.backward()
    optimizer.step()

    # Evaluate on the test set (dropout disabled, no gradients)
    quality_model.eval()
    with torch.no_grad():
        test_outputs = quality_model(test_inputs)
        test_loss = criterion(test_outputs, test_targets).item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Test MSE: {test_loss:.4f}")


In [None]:
# Evaluate the final quality model
quality_model.eval()
with torch.no_grad():
    predictions = quality_model(test_inputs)

# test_labels_quality is a pandas Series after the train/test split and only
# becomes a tensor if another cell has converted it; np.asarray handles both,
# whereas calling .numpy() on a Series fails.
test_targets_np = np.asarray(test_labels_quality, dtype=np.float32).reshape(-1)
predictions_np = predictions.numpy().reshape(-1)

mse = mean_squared_error(test_targets_np, predictions_np)
print(f"MSE on test set: {mse:.4f}")

# Save the predictions to a file whose name carries the MSE
results = pd.DataFrame({
    "Test Labels Quality": test_targets_np,
    "Predictions": predictions_np
})
results.to_csv(f"quality_predictions_{mse:.4f}.csv", index=False)

In [None]:
# Training arguments (same values as the `training_args` created earlier in the notebook)
training_settings = dict(
    output_dir='./results',
    num_train_epochs=30,
    per_device_train_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    evaluation_strategy="steps",
    logging_strategy="steps",
    logging_steps=300,
    lr_scheduler_type='cosine',
    save_total_limit=1,
)
training_args = TrainingArguments(**training_settings)

# Define MSE computation for evaluation
def compute_mse(p):
    """Metric hook for ``Trainer``: MSE between ``p.label_ids`` and ``p.predictions``.

    Returns a dict with the single key ``"mse"``.
    """
    # reshape(-1) keeps a 1-D array even for a single evaluation example,
    # where squeeze() would produce a 0-d array.
    return {"mse": mean_squared_error(p.label_ids, p.predictions.reshape(-1))}

# Load the models
model_reliability = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=1)
model_validity = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=1)

# Encode the data with the XLM-RoBERTa tokenizer for these models.
# The train_dataset_* / test_dataset_* globals were last rebuilt with the
# multilingual BERT tokenizer in earlier cells, so reusing them here would
# feed BERT token ids to XLM-RoBERTa. Separate names are used so the datasets
# belonging to the BERT trainers are not overwritten.
xlmr_tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')
xlmr_train_texts = (
    train_features["Request for answer text"].tolist(),
    train_features["Answer options text"].tolist(),
)
xlmr_test_texts = (
    test_features["Request for answer text"].tolist(),
    test_features["Answer options text"].tolist(),
)
xlmr_train_dataset_reliability = QualityDataset(xlmr_tokenizer, xlmr_train_texts, train_labels_reliability.tolist())
xlmr_test_dataset_reliability = QualityDataset(xlmr_tokenizer, xlmr_test_texts, test_labels_reliability.tolist())
xlmr_train_dataset_validity = QualityDataset(xlmr_tokenizer, xlmr_train_texts, train_labels_validity.tolist())
xlmr_test_dataset_validity = QualityDataset(xlmr_tokenizer, xlmr_test_texts, test_labels_validity.tolist())

# Initialize the Trainers
trainer_reliability = Trainer(
    model=model_reliability,
    args=training_args,
    train_dataset=xlmr_train_dataset_reliability,
    eval_dataset=xlmr_test_dataset_reliability,
    compute_metrics=compute_mse,
)

trainer_validity = Trainer(
    model=model_validity,
    args=training_args,
    train_dataset=xlmr_train_dataset_validity,
    eval_dataset=xlmr_test_dataset_validity,
    compute_metrics=compute_mse,
)

# Start training reliability model
trainer_reliability.train()
# Evaluate reliability model
reliability_predictions = trainer_reliability.predict(xlmr_test_dataset_reliability).predictions

# Start training validity model
trainer_validity.train()
# Evaluate validity model
validity_predictions = trainer_validity.predict(xlmr_test_dataset_validity).predictions

# Define the dense neural network model
class QualityModel(nn.Module):
    """Two-layer MLP (``input_dim`` -> 64 -> 1) predicting quality.

    Args:
        input_dim: Number of input features. Defaults to 2, matching the
            (reliability, validity) columns this cell feeds the model; the
            previously hard-coded 130 did not match those inputs.
    """

    def __init__(self, input_dim=2):
        super().__init__()
        self.dense = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        """Return the model output for a batch ``x`` with ``input_dim`` columns."""
        return self.dense(x)

# Prepare input for the dense neural network
# Train on the ground-truth reliability/validity labels, test on the
# transformer predictions for the same two quantities.
train_inputs = np.concatenate((train_labels_reliability.values.reshape(-1, 1), train_labels_validity.values.reshape(-1, 1)), axis=1)
train_inputs = torch.tensor(train_inputs, dtype=torch.float32)
# np.asarray accepts both the original pandas Series and the tensor left
# behind by a previous run of this cell, so the cell can be re-run (the old
# `.values` access only worked on the first run).
train_labels_quality = torch.tensor(np.asarray(train_labels_quality), dtype=torch.float32).reshape(-1, 1)

test_inputs = np.concatenate((reliability_predictions, validity_predictions), axis=1)
test_inputs = torch.tensor(test_inputs, dtype=torch.float32)
test_labels_quality = torch.tensor(np.asarray(test_labels_quality), dtype=torch.float32).reshape(-1, 1)

# Initialize and train the dense neural network
quality_model = QualityModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(quality_model.parameters(), lr=0.001)

num_epochs = 30
for epoch in range(num_epochs):
    # One full-batch gradient step per epoch
    quality_model.train()
    optimizer.zero_grad()
    outputs = quality_model(train_inputs)
    loss = criterion(outputs, train_labels_quality)
    loss.backward()
    optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# Evaluate the final quality model
quality_model.eval()
with torch.no_grad():
    predictions = quality_model(test_inputs)
    mse = mean_squared_error(test_labels_quality.numpy(), predictions.numpy())
    print(f"MSE on test set: {mse:.4f}")

# Save the predictions to a file whose name carries the MSE
results = pd.DataFrame({
    "Test Labels Quality": test_labels_quality.numpy().flatten(),
    "Predictions": predictions.numpy().flatten()
})
results.to_csv(f"quality_predictions_{mse:.4f}.csv", index=False)

In [None]:
# Two example survey questions (good_questions) and, at the same list index,
# their answer scales (good_answers); every one of the seven options is labelled.
# NOTE(review): these lists are not referenced elsewhere in the visible notebook.
good_questions=["""Now I have a couple of statements about keeping up with news of current affairs.
“I’m doing a better job now than I was a year ago at keeping up with news about current affairs.” 
“I’m doing a worse job now than I was a year ago at keeping up with news about current affairs.”
Which of these represents your situation, or are you somewhere in between?
 Would you say that you are doing a lot better or just somewhat better at keeping up with the news about current affairs?""",
               """“In the country as a whole, business conditions are better now than they were a year ago.”
“In the country as a whole, business conditions are worse now than they were a year ago.
Which if these represents your opinion, or are you somwhere in between?
Would you say that business conditions are a lot better, or just somewhat better?

               """]

good_answers=["""1. A lot better
2. Somewhat better
3. A bit better
4. Hasn’t changed
5. A bit worse 
6. Somewhat worse 
7. A lot worse""",
              """1. A lot better
2. Somewhat better
3. A bit better
4. Haven’t changed
5. A bit worse
6. Somewhat worse
7. A lot worse
"""
]

In [None]:
# Two example survey questions (bad_questions) and, at the same list index,
# their answer scales (bad_answers); only options 1 and 7 of the seven carry a
# label, options 2-6 are left blank.
# NOTE(review): these lists are not referenced elsewhere in the visible notebook.
bad_questions=["""Now I have a couple of statements about keeping up with news of current affairs.
“I’m doing a better job now than I was a year ago at keeping up with news about current affairs.” 
“I’m doing a worse job now than I was a year ago at keeping up with news about current affairs.”
Which of these represents your situation, or are you somewhere in between?
 Would you say that you are doing a lot better or just somewhat better at keeping up with the news about current affairs?""",
               """“In the country as a whole, business conditions are better now than they were a year ago.”
“In the country as a whole, business conditions are worse now than they were a year ago.
Which if these represents your opinion, or are you somwhere in between?
Would you say that business conditions are a lot better, or just somewhat better?

               """]

bad_answers=["""1. A lot better
2. 
3. 
4. 
5. 
6. 
7. A lot worse""",
              """1. A lot better
2. 
3. 
4. 
5. 
6. 
7. A lot worse
"""
]