In [None]:
#This code was written to finetune BERT to identify hedges (binary classification), finetuning on one domain and testing on another.
!pip install datasets evaluate transformers[torch] accelerate -U

In [None]:
from google.colab import drive
import numpy as np
from transformers import AutoModelForSequenceClassification
import pandas as pd
import os
from datasets import load_dataset, Dataset
from sklearn.model_selection import KFold
from transformers import AutoModel, AutoTokenizer
import evaluate
from transformers import TrainingArguments, Trainer, TrainerCallback
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.utils import shuffle

In [None]:

# Mount Google Drive at /content/drive; later cells write their CSV and PNG outputs under that path.
drive.mount('/content/drive')

In [None]:
# Change the working directory; the relative paths used below (dataset CSVs, model folder) resolve against it.
%cd /content/your_directory

In [None]:
# Load dataset A (the fine-tuning / cross-validation data) from CSV.
# Its rows are read from the 'train' split in a later cell.
ds_a = load_dataset('csv', data_files="dataset_a.csv")

In [None]:
# Inspect what was loaded for dataset A.
print(ds_a)

In [None]:
# Tokenizer for the German cased BERT checkpoint; the cross-validation loop
# loads the model from the same checkpoint name, so change both together.
tokenizer = AutoTokenizer.from_pretrained("dbmdz/bert-base-german-cased")
# Alternative (English): tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")


In [None]:
def tokenize_function(examples):
    """Tokenize the ``"text"`` entry of ``examples`` with the module-level ``tokenizer``.

    Args:
        examples: Mapping that has a ``"text"`` entry; it is passed to
            ``Dataset.map(..., batched=True)`` in the cells below.

    Returns:
        The tokenizer's output for those texts, produced with
        ``padding="max_length"`` and ``truncation=True``.
    """
    texts = examples["text"]
    encoded = tokenizer(texts, padding="max_length", truncation=True)
    return encoded

In [None]:
# Apply tokenize_function to dataset A, passing examples in batches.
tokenized_dataset_a = ds_a.map(tokenize_function, batched=True)

In [None]:
# Inspect the tokenized dataset A.
print(tokenized_dataset_a)


In [None]:
# Take the 'train' split of the tokenized data and convert it to a pandas
# DataFrame; the K-fold loop slices this frame by row position.
dataseta2 = tokenized_dataset_a['train']
df_a = dataseta2.to_pandas()

In [None]:
# Sanity check: first rows and column names of the dataset A DataFrame.
print(df_a.head())
print(df_a.columns)

In [None]:
  # Cross-validation setup: n shuffled folds with a fixed seed, plus the
  # folder that receives one fine-tuned model per fold.
  n = 5
  kf = KFold(n_splits=n, shuffle=True, random_state=42)
  model_save_dir = "./directory_for_saving_models"
  os.makedirs(model_save_dir, exist_ok=True)

In [None]:
# Show the K-fold splitter configuration.
print(kf)

In [None]:
# Load the five evaluation metrics once (accuracy, F1, recall, Matthews
# correlation, precision); compute_metrics reuses these objects on every call.
(
    accuracy_metric,
    f1_metric,
    recall_metric,
    matthews_metric,
    precision_metric,
) = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "f1", "recall", "matthews_correlation", "precision")
)


In [None]:
def compute_metrics(eval_pred):
    """Score a batch of predictions with the preloaded metric objects.

    Args:
        eval_pred: A ``(logits, labels)`` pair. The predicted class is the
            argmax of ``logits`` over the last axis.

    Returns:
        dict with the keys ``'accuracy'``, ``'f1'``, ``'recall'``,
        ``'precision'`` and ``'mcc'``.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)

    def _score(metric, result_key):
        # Every metric object takes the same arguments and returns a
        # one-entry dict; pull out the value stored under result_key.
        return metric.compute(predictions=predicted_classes, references=labels)[result_key]

    accuracy = _score(accuracy_metric, 'accuracy')
    f1 = _score(f1_metric, 'f1')
    recall = _score(recall_metric, 'recall')
    precision = _score(precision_metric, 'precision')
    mcc = _score(matthews_metric, 'matthews_correlation')

    return {
        'accuracy': accuracy,
        'f1': f1,
        'recall': recall,
        'precision': precision,
        'mcc': mcc,
    }

In [None]:
class DetailedTrainingLogger(TrainerCallback):
    """Trainer callback that stores one summary row per logging event.

    Rows accumulate in ``self.logs`` as dicts with the keys ``"Step"``,
    ``"Training Loss"``, ``"Validation Loss"``, ``"Accuracy"``, ``"F1"``,
    ``"Recall"``, ``"Precision"`` and ``"MCC"``.
    """

    # Row label -> key looked up in the ``logs`` dict handed to ``on_log``.
    _EVAL_COLUMNS = (
        ("Validation Loss", "eval_loss"),
        ("Accuracy", "eval_accuracy"),
        ("F1", "eval_f1"),
        ("Recall", "eval_recall"),
        ("Precision", "eval_precision"),
        ("MCC", "eval_mcc"),
    )

    def __init__(self, trainer, eval_dataset):
        """Keep references to the trainer and evaluation set; start with no rows."""
        self.logs = []
        self.trainer = trainer
        self.eval_dataset = eval_dataset

    def on_log(self, args, state, control, logs=None, **kwargs):
        """Append a row built from ``logs``; do nothing when ``logs`` is empty or None."""
        if not logs:
            return
        row = {
            "Step": state.global_step,
            # Events without a "loss" entry are marked with the string "No log".
            "Training Loss": logs.get("loss", "No log"),
        }
        # Keys missing from this event are stored as None.
        for column, log_key in self._EVAL_COLUMNS:
            row[column] = logs.get(log_key, None)
        self.logs.append(row)




In [None]:
# Training configuration shared by every fold; adjust as needed.
#
# The evaluation-schedule keyword was renamed from `evaluation_strategy` to
# `eval_strategy` in newer transformers releases, and the install cell uses -U,
# so pick whichever name the installed TrainingArguments actually defines.
_eval_strategy_arg = (
    "eval_strategy"
    if "eval_strategy" in getattr(TrainingArguments, "__dataclass_fields__", {})
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    output_dir="bert_trainer",
    run_name='name',
    # Evaluate and checkpoint every 50 steps, log every 10.
    eval_steps=50,
    save_steps=50,
    logging_steps=10,
    per_device_train_batch_size=32,
    num_train_epochs=10,
    # Ask the Trainer to reload the checkpoint with the highest F1 when training ends.
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    greater_is_better=True,
    report_to="none",
    seed=42,
    **{_eval_strategy_arg: "steps"},
)

In [None]:
# Empty placeholder frame for the per-fold metrics; it is replaced once the
# cross-validation results have been collected.
metrics_df = pd.DataFrame(
    columns=[
        'fold',
        'accuracy',
        'f1',
        'recall',
        'precision',
        'mcc',
    ]
)

In [None]:
# Collects one metrics dict per cross-validation fold.
metrics_list = list()

In [None]:
# Collects one record per misclassified validation sentence.
incorrect_list = list()

In [None]:
# Collects the training-log rows of every fold.
all_logs = list()

In [None]:
# Initialize lists to accumulate predictions and true labels from all folds
all_predictions = []
all_true_labels = []

# K-fold cross-validation on dataset A. For every fold: fine-tune a fresh
# model, save it, predict on the held-out rows, and record the metrics and the
# misclassified sentences. Folds are numbered from 1.
for fold_num, (train_index, val_index) in enumerate(kf.split(df_a), start=1):
    # Start every fold from the pretrained checkpoint.
    model = AutoModelForSequenceClassification.from_pretrained("dbmdz/bert-base-german-cased", num_labels=2)
    # or model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", num_labels=2) or other model
    # No explicit model.to("cuda"): the Trainer places the model on the device
    # selected by TrainingArguments, so the cell also runs without a GPU.

    # Split the DataFrame by row position
    train_df = df_a.iloc[train_index]
    val_df = df_a.iloc[val_index]

    # Convert back into datasets for the trainer
    train_dataset = Dataset.from_pandas(train_df)
    eval_dataset = Dataset.from_pandas(val_df)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )

    # Record the training / validation logs of this fold
    detailed_logger = DetailedTrainingLogger(trainer, eval_dataset)
    trainer.add_callback(detailed_logger)

    trainer.train()

    # Add logs to the aggregated list, tagged with the fold number
    for log in detailed_logger.logs:
        log["Fold"] = fold_num
        all_logs.append(log)

    # Save the model; the dataset-B evaluation reloads it from this folder
    model_directory = f"{model_save_dir}/model_directory_{fold_num}"
    model.save_pretrained(model_directory)

    # One prediction pass over the held-out rows yields both the predictions
    # and the inputs for the metrics, so no separate trainer.evaluate() pass
    # is run.
    outputs = trainer.predict(eval_dataset)
    predictions = np.argmax(outputs.predictions, axis=1)
    true_labels = np.array(eval_dataset['label'])

    # Accumulate predictions and true labels
    all_predictions.extend(predictions)
    all_true_labels.extend(true_labels)

    # Collect metrics for this fold
    metrics = compute_metrics((outputs.predictions, true_labels))
    selected_metrics = {
        'Fold': fold_num,
        'accuracy': metrics.get('accuracy', None),
        'f1': metrics.get('f1', None),
        'recall': metrics.get('recall', None),
        'precision': metrics.get('precision', None),
        'mcc': metrics.get('mcc', None),
    }
    metrics_list.append(selected_metrics)

    # Record misclassified examples. The positions index val_df, whose row
    # order is the order the predictions were produced in.
    misclassified_indices = np.where(predictions != true_labels)[0]
    for row_pos in misclassified_indices:
        row = val_df.iloc[row_pos]
        incorrect_list.append({
            'text id': row['text id'],
            'text': row['text'],
            'True Label': true_labels[row_pos],
            'Prediction': predictions[row_pos],
        })


In [None]:
# Per-fold metrics for dataset A: one row per fold, written to Drive as CSV.
csv_file_path = '/content/drive/your_metrics_folder/name.csv'
metrics_df = pd.DataFrame(metrics_list)
metrics_df.to_csv(csv_file_path, index=False)
print(f'Metrics saved to {csv_file_path}')


In [None]:
# Combine the log rows of all folds into one table and save it as CSV.
log_csv_path = '/content/drive/your_metrics_folder/training_log.csv'
training_log_df = pd.DataFrame(all_logs)
training_log_df.to_csv(log_csv_path, index=False)

In [None]:
# Mean and standard deviation of each metric across folds.
# 'Fold' is only an identifier, so it is excluded: averaging fold numbers
# would add a meaningless row to the summary.
fold_scores = metrics_df.drop(columns=['Fold'], errors='ignore')
metrics_mean = fold_scores.mean()
metrics_std = fold_scores.std()

# One row per metric with its mean and std
summary_df = pd.DataFrame({
    'metric': metrics_mean.index,
    'mean': metrics_mean.values,
    'std': metrics_std.values
})

# Save the summary of mean and std
summary_csv_path = '/content/drive/your_metrics_folder/metrics_summary.csv'
summary_df.to_csv(summary_csv_path, index=False)

print(f'Metrics saved to {csv_file_path}')
print(f'Summary of mean and std saved to {summary_csv_path}')

In [None]:
# Empty placeholder for the misclassified sentences of dataset A.
sentences_df = pd.DataFrame(
    columns=[
        'text id',
        'text',
        'true label',
        'prediction',
    ]
)

In [None]:
# Table of misclassified dataset-A sentences (id, text, true label, prediction), saved as CSV.
csv_file_path = '/content/drive/your_metrics_folder/incorrect_sentences.csv'
sentences_df = pd.DataFrame(incorrect_list)
sentences_df.to_csv(csv_file_path, index=False)
print(f'Sentences saved to {csv_file_path}')

In [None]:

# Turn the predictions and labels accumulated over all folds into arrays
all_predictions = np.array(all_predictions)
all_true_labels = np.array(all_true_labels)

# Generate the confusion matrix. labels=[0, 1] pins it to 2x2 even when one
# class never occurs among the labels or predictions; otherwise the 2x2
# DataFrame below could not be built.
overall_cm = confusion_matrix(all_true_labels, all_predictions, labels=[0, 1])

# Rows are the true class, columns the predicted class
cm_df = pd.DataFrame(overall_cm,
                     index=['True Negative', 'True Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Save the confusion matrix to a CSV file
csv_file_path = '/content/drive/your_metrics_folder/matrix_dataseta.csv'
cm_df.to_csv(csv_file_path)
print(f"Confusion matrix saved to {csv_file_path}")

# Plot the heatmap
plt.figure(figsize=(8, 6))  # Adjust the figure size as needed
sns.heatmap(cm_df,
            annot=True,           # Annotate each cell with the numeric value
            fmt='d',              # Format for integer numbers
            cmap='Blues',         # Color map for the heatmap
            linewidths=0.5,       # Line widths between cells
            linecolor='white')    # Line color between cells

# Add labels and title
plt.title("Your Title")
plt.ylabel("Actual")
plt.xlabel("Predicted")

# Save the heatmap as an image file, then display it
heatmap_file_path = '/content/drive/your_metrics_folder/heatmap_dataseta.png'
plt.tight_layout()  # Adjust layout to avoid clipping of labels
plt.savefig(heatmap_file_path, dpi=300, bbox_inches='tight')
print(f"Heatmap saved to {heatmap_file_path}")

plt.show()


In [None]:
# Load the second dataset (dataset B) from CSV; the saved fold models are evaluated on it below.
ds_b= load_dataset('csv', data_files="datasetb.csv")

In [None]:
# Inspect what was loaded for dataset B.
print(ds_b)

In [None]:
# Apply the same tokenize_function to dataset B, passing examples in batches.
tokenized_dataset_b = ds_b.map(tokenize_function, batched=True)

In [None]:
# Inspect the tokenized dataset B.
print(tokenized_dataset_b)

In [None]:
# Take the 'train' split of tokenized dataset B. dataset_b_2 is what the
# evaluation loop below predicts on; df_b is its pandas copy.
dataset_b_2 = tokenized_dataset_b['train']
df_b = dataset_b_2.to_pandas()

In [None]:
# Shuffle the rows of df_b with a fixed seed and renumber the index.
# NOTE(review): the evaluation cells visible below read dataset_b_2 rather
# than df_b, so this shuffle does not appear to affect them; confirm whether
# it is still needed.
df_b = shuffle(df_b, random_state=42).reset_index(drop=True)

In [None]:
# Empty placeholder frame for the dataset-B metrics; it is replaced once the
# evaluation results have been collected.
metrics_df_b = pd.DataFrame(
    columns=[
        'fold',
        'accuracy',
        'f1',
        'recall',
        'precision',
        'mcc',
    ]
)

In [None]:
# Collects one metrics dict per saved fold model evaluated on dataset B.
metrics_list_b = list()

In [None]:
# Collects one record per misclassified dataset-B sentence.
incorrect_list_b = list()

In [None]:
# Evaluation-only configuration used to score the saved fold models on dataset B.
# evaluation_strategy="no" is intentionally not passed: "no" is the default, and
# the keyword was renamed to eval_strategy in newer transformers releases, so
# leaving it out keeps this cell working on both old and new versions.
training_argsb = TrainingArguments(
    output_dir="./results",
    per_device_eval_batch_size=16,
    logging_dir="./logs",
    logging_steps=10,
    save_strategy="no",   # never write checkpoints
    do_train=False,       # inference only
    report_to="none",
    disable_tqdm=False,
)

In [None]:
os.environ["WANDB_DISABLED"] = "true"  # Disable W&B for the current session


# Accumulate predictions and true labels across folds
all_predictions_b = []
all_true_labels_b = []

# Cross-domain evaluation: score every model saved during cross-validation on
# dataset B. n is the fold count used when the models were trained, so the
# loop follows it instead of a hard-coded 5.
for fold_num in range(1, n + 1):
    # Load the saved model of this fold
    model_directory = f"{model_save_dir}/model_directory_{fold_num}"
    model = AutoModelForSequenceClassification.from_pretrained(model_directory, num_labels=2)
    # No explicit model.to("cuda"): the Trainer places the model on the device
    # selected by TrainingArguments, so the cell also runs without a GPU.

    # Initialize Trainer for inference only
    trainer = Trainer(
        model=model,
        args=training_argsb,
        eval_dataset=dataset_b_2,
        compute_metrics=compute_metrics,
    )

    # One prediction pass over dataset B yields both the predictions and the
    # inputs for the metrics, so no separate trainer.evaluate() pass is run.
    outputs = trainer.predict(dataset_b_2)
    predictions = np.argmax(outputs.predictions, axis=1)
    # Array (as in the dataset-A loop) so the comparison below is elementwise
    true_labels = np.array(dataset_b_2['label'])

    # Accumulate predictions and true labels
    all_predictions_b.extend(predictions)
    all_true_labels_b.extend(true_labels)

    # Store evaluation metrics
    metrics = compute_metrics((outputs.predictions, true_labels))
    selected_metrics = {
        'Fold': fold_num,
        'accuracy': metrics.get('accuracy', None),
        'f1': metrics.get('f1', None),
        'recall': metrics.get('recall', None),
        'precision': metrics.get('precision', None),
        'mcc': metrics.get('mcc', None),
    }
    metrics_list_b.append(selected_metrics)

    # Record misclassified examples
    misclassified_indices = np.where(predictions != true_labels)[0]
    for idx in misclassified_indices:
        # The dataset is indexed with a plain Python int; fetch the row once
        row = dataset_b_2[int(idx)]
        incorrect = {
            'column 1': row['column 1'],
            'column 2': row['column 2'],
            'True Label': true_labels[idx],
            'Prediction': predictions[idx],
        }
        incorrect_list_b.append(incorrect)

In [None]:
# Per-model metrics on dataset B: one row per fold model, written to Drive as CSV.
csv_file_path = '/content/drive/your_metrics_folder/metrics_datasetb.csv'
metrics_df_b = pd.DataFrame(metrics_list_b)
metrics_df_b.to_csv(csv_file_path, index=False)
print(f'Metrics saved to {csv_file_path}')

In [None]:
# Mean and standard deviation of each metric across the fold models.
# 'Fold' is only an identifier, so it is excluded: averaging fold numbers
# would add a meaningless row to the summary.
fold_scores_b = metrics_df_b.drop(columns=['Fold'], errors='ignore')
metrics_mean_b = fold_scores_b.mean()
metrics_std_b = fold_scores_b.std()

# One row per metric with its mean and std
summary_df_b = pd.DataFrame({
    'metric': metrics_mean_b.index,
    'mean': metrics_mean_b.values,
    'std': metrics_std_b.values
})

# Save the summary of mean and std in the same folder as the other outputs
# (the path previously pointed at a misspelled 'your_metrix_folder').
summary_csv_path = '/content/drive/your_metrics_folder/metrics_summary_datasetb.csv'
summary_df_b.to_csv(summary_csv_path, index=False)

print(f'Metrics saved to {csv_file_path}')
print(f'Summary of mean and std saved to {summary_csv_path}')

In [None]:
# Empty placeholder for the misclassified sentences of dataset B. It uses the
# _b name so the dataset-A table in sentences_df is not overwritten.
sentences_df_b = pd.DataFrame(columns=['text id','text', 'true label', 'prediction',])

In [None]:
# Table of misclassified dataset-B sentences with their true and predicted labels, saved as CSV.
csv_file_path = '/content/drive/your_metrics_folder/misidentified_sentences_datsetb.csv'
sentences_df_b = pd.DataFrame(incorrect_list_b)
sentences_df_b.to_csv(csv_file_path, index=False)
print(f'Sentences saved to {csv_file_path}')

In [None]:
# Turn the predictions and labels accumulated over all fold models into arrays
all_predictions_b = np.array(all_predictions_b)
all_true_labels_b = np.array(all_true_labels_b)

# Generate the confusion matrix. labels=[0, 1] pins it to 2x2 even when one
# class never occurs among the labels or predictions; otherwise the 2x2
# DataFrame below could not be built.
overall_cmb = confusion_matrix(all_true_labels_b, all_predictions_b, labels=[0, 1])

# Rows are the true class, columns the predicted class
cm_df = pd.DataFrame(overall_cmb,
                     index=['True Negative', 'True Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Save the confusion matrix to a CSV file
csv_file_path = '/content/drive/your_metrics_folder/matrix_dtasetb.csv'
cm_df.to_csv(csv_file_path)
print(f"Confusion matrix saved to {csv_file_path}")

# Plot the heatmap
plt.figure(figsize=(8, 6))  # Adjust the figure size as needed
sns.heatmap(cm_df,
            annot=True,           # Annotate each cell with the numeric value
            fmt='d',              # Format for integer numbers
            cmap='Blues',         # Color map for the heatmap
            linewidths=0.5,       # Line widths between cells
            linecolor='white')    # Line color between cells

# Add labels and title
plt.title("Dataset B")
plt.ylabel("Actual")
plt.xlabel("Predicted")

# Show the plot
plt.tight_layout()  # Adjust layout to avoid clipping of labels
plt.show()
