In [None]:
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "6"

## EVALUATOR

In [None]:
import argparse
import sys
from functools import partial
from typing import Optional, Type, Tuple, Dict, Callable, List, Union
import numpy as np
import numpy.typing as npt
import pandas as pd
import torch
from sacrebleu import CHRF
from tqdm.auto import trange
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
)
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine

In [None]:
def prepare_target_label(
    model: AutoModelForSequenceClassification, target_label: Union[int, str]
) -> int:
    """
    Prepare the target label to ensure it is valid for the given model.

    Args:
        model (AutoModelForSequenceClassification): Text classification model.
        target_label (Union[int, str]): The target label to prepare.

    Returns:
        int: The prepared target label.

    Raises:
        ValueError: If the target_label is not found in model labels or ids.
    """
    if target_label in model.config.id2label:
        pass
    elif target_label in model.config.label2id:
        target_label = model.config.label2id.get(target_label)
    elif (
        isinstance(target_label, str)
        and target_label.isnumeric()
        and int(target_label) in model.config.id2label
    ):
        target_label = int(target_label)
    else:
        raise ValueError(
            f'target_label "{target_label}" not in model labels or ids: {model.config.id2label}.'
        )
    assert isinstance(target_label, int)
    return target_label

In [None]:
def classify_texts(
    model: AutoModelForSequenceClassification,
    tokenizer: AutoTokenizer,
    texts: List[str],
    target_label: Union[int, str],
    second_texts: Optional[List[str]] = None,
    batch_size: int = 32,
    raw_logits: bool = False,
    desc: Optional[str] = "Calculating STA scores",
) -> npt.NDArray[np.float64]:
    """
    Classify a list of texts using the given model and tokenizer.

    Args:
        model (AutoModelForSequenceClassification): Text classification model.
        tokenizer (AutoTokenizer): The tokenizer corresponding to the model.
        texts (List[str]): List of texts to classify.
        target_label (Union[int, str]): The target label for classification.
        second_texts (Optional[List[str]]): List of secondary texts (not needed by default).
        batch_size (int): Batch size for inference.
        raw_logits (bool): Whether to return raw logits instead of probs.
        desc (Optional[str]): Description for tqdm progress bar.

    Returns:
        npt.NDArray[np.float64]: Array of classification scores for the texts.
    """

    target_label = prepare_target_label(model, target_label)

    res = []

    for i in trange(0, len(texts), batch_size, desc=desc):
        inputs = [texts[i : i + batch_size]]

        if second_texts is not None:
            inputs.append(second_texts[i : i + batch_size])
        inputs = tokenizer(
            *inputs,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        ).to(model.device)

        with torch.no_grad():
            try:
                logits = model(**inputs).logits
                if raw_logits:
                    preds = logits[:, target_label]
                elif logits.shape[-1] > 1:
                    preds = torch.softmax(logits, -1)[:, target_label]
                else:
                    preds = torch.sigmoid(logits)[:, 0]
                preds = preds.cpu().numpy()
            except:
                print(i, i + batch_size)
                preds = [0] * len(inputs)
        res.append(preds)
    return np.concatenate(res)

In [None]:
def evaluate_sta(
    model: AutoModelForSequenceClassification,
    tokenizer: AutoTokenizer,
    texts: List[str],
    target_label: int = 1,  # 1 is polite, 0 is toxic
    batch_size: int = 32,
) -> npt.NDArray[np.float64]:
    """
    Evaluate the STA of a list of texts using the given model and tokenizer.

    Args:
        model (AutoModelForSequenceClassification): Text classification model.
        tokenizer (AutoTokenizer): The tokenizer corresponding to the model.
        texts (List[str]): List of texts to evaluate.
        target_label (int): The target label for style evaluation.
        batch_size (int): Batch size for inference.

    Returns:
        npt.NDArray[np.float64]: Array of STA scores for the texts.
    """
    target_label = prepare_target_label(model, target_label)
    scores = classify_texts(
        model, tokenizer, texts, target_label, batch_size=batch_size, desc="Style"
    )

    return scores

In [None]:
def evaluate_sim(
    model: SentenceTransformer,
    original_texts: List[str],
    rewritten_texts: List[str],
    batch_size: int = 32,
    efficient_version: bool = False,
) -> npt.NDArray[np.float64]:
    """
    Evaluate the semantic similarity between original and rewritten texts.
    Note that the subtraction is done due to the implementation of the `cosine` metric in `scipy`.
    For more details see: https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cosine.html

    Args:
        model (SentenceTransformer): The sentence transformer model.
        original_texts (List[str]): List of original texts.
        rewritten_texts (List[str]): List of rewritten texts.
        batch_size (int): Batch size for inference.
        efficient_version (bool): To use efficient calculation method.

    Returns:
        npt.NDArray[np.float64]: Array of semantic similarity scores between \
              original and rewritten texts.
    """
    similarities = []

    batch_size = min(batch_size, len(original_texts))
    for i in trange(0, len(original_texts), batch_size, desc="Calculating SIM scores"):
        original_batch = original_texts[i : i + batch_size]
        rewritten_batch = rewritten_texts[i : i + batch_size]

        embeddings = model.encode(original_batch + rewritten_batch)
        original_embeddings = embeddings[: len(original_batch)]
        rewritten_embeddings = embeddings[len(original_batch) :]

        if efficient_version:
            similarity_matrix = np.dot(original_embeddings, rewritten_embeddings.T)
            original_norms = np.linalg.norm(original_embeddings, axis=1)
            rewritten_norms = np.linalg.norm(rewritten_embeddings, axis=1)
            similarities.extend(
                1
                - similarity_matrix / (np.outer(original_norms, rewritten_norms) + 1e-9)
            )
        else:
            t = [
                1 - cosine(original_embedding, rewritten_embedding)
                for original_embedding, rewritten_embedding in zip(
                    original_embeddings, rewritten_embeddings
                )
            ]
            similarities.extend(t)
    return similarities

In [None]:
def evaluate_style_transfer(
    original_texts: List[str],
    rewritten_texts: List[str],
    style_model: AutoModelForSequenceClassification,
    style_tokenizer: AutoTokenizer,
    meaning_model: AutoModelForSequenceClassification,
    references: Optional[List[str]] = None,
    style_target_label: int = 1,
    batch_size: int = 32,
) -> Dict[str, npt.NDArray[np.float64]]:
    """
    Wrapper for calculating sub-metrics and joint metric.

    Args:
        original_texts (List[str]): List of original texts.
        rewritten_texts (List[str]): List of rewritten texts.
        style_model (AutoModelForSequenceClassification): The style classification model.
        style_tokenizer (AutoTokenizer): The tokenizer corresponding to the style model.
        meaning_model (AutoModelForSequenceClassification): The meaning classification model.
        references (Optional[List[str]]): List of reference texts (if available).
        style_target_label (int): The target label for style classification.
        batch_size (int): Batch size for inference.

    Returns:
        Dict[str, npt.NDArray[np.float64]]: Dictionary containing evaluation metrics.
    """
    accuracy = evaluate_sta(
        style_model,
        style_tokenizer,
        rewritten_texts,
        target_label=style_target_label,
        batch_size=batch_size,
    )

    similarity = evaluate_sim(
        model=meaning_model,
        original_texts=original_texts,
        rewritten_texts=rewritten_texts,
        batch_size=batch_size,
    )

    result = {
        "STA": accuracy,
        "SIM": similarity,
    }

    if references is not None:

        chrf = CHRF()

        result["CHRF"] = np.array(
            [
                chrf.sentence_score(hypothesis=rewritten, references=[reference]).score
                / 100
                for rewritten, reference in zip(rewritten_texts, references)
            ],
            dtype=np.float64,
        )

        result["J"] = result["STA"] * result["SIM"] * result["CHRF"]

    return result

In [None]:
def load_model(
    model_name: Optional[str] = None,
    model: Optional[AutoModelForSequenceClassification] = None,
    tokenizer: Optional[AutoTokenizer] = None,
    model_class: Type[
        AutoModelForSequenceClassification
    ] = AutoModelForSequenceClassification,
    use_cuda: bool = True,
) -> Tuple[AutoModelForSequenceClassification, AutoTokenizer]:
    """
    Load a pre-trained model and tokenizer from Hugging Face Hub.

    Args:
        model_name (Optional[str]): The name of the model to load.
        model (Optional[AutoModelForSequenceClassification]): A pre-loaded model instance.
        tokenizer (Optional[AutoTokenizer]): A pre-loaded tokenizer instance.
        model_class (Type[AutoModelForSequenceClassification]): The class of the model to load.
        use_cuda (bool): Whether to use CUDA for GPU acceleration.

    Returns:
        Tuple[AutoModelForSequenceClassification, AutoTokenizer]: The loaded model and tokenizer.
    """
    if model_name == "sentence-transformers/LaBSE":
        model = SentenceTransformer("sentence-transformers/LaBSE")
        return model
    if model is None:
        if model_name is None:
            raise ValueError("Either model or model_name should be provided")
        model = model_class.from_pretrained(model_name)

        if torch.cuda.is_available() and use_cuda:
            model.cuda()
    if tokenizer is None:
        if model_name is None:
            raise ValueError("Either tokenizer or model_name should be provided")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
    return model, tokenizer

In [None]:
def format_prototext(measure: str, value: str) -> str:
    """
    Format evaluation metrics into prototext format.

    Args:
        measure (str): The name of the evaluation measure.
        value (str): The value of the evaluation measure.

    Returns:
        str: The formatted prototext string.
    """
    return f'measure{{\n  key: "{measure}"\n  value: "{value}"\n}}\n'

In [None]:
def run_evaluation(
    input: str,
    prediction: str,
    output: str,
    evaluator: Callable[..., Dict[str, npt.NDArray[np.float64]]],
) -> Dict[str, npt.NDArray[np.float64]]:
    """
    Run evaluation on input data using the specified evaluator.

    Args:
        args (argparse.Namespace): Parsed command-line arguments.
        evaluator (Callable[..., Dict[str, npt.NDArray[np.float64]]]): The evaluation function.

    Returns:
        Dict[str, npt.NDArray[np.float64]]: Dictionary containing evaluation results.
    """
    df_input = pd.read_json(input, convert_dates=False, lines=True)
    df_input = df_input[["id", "text"]]
    df_input.set_index("id", inplace=True)
    df_input.rename(columns={"text": "input"}, inplace=True)

    df_prediction = pd.read_json(prediction, convert_dates=False, lines=True)
    df_prediction = df_prediction[["id", "text"]]
    df_prediction.set_index("id", inplace=True)
    df_prediction.rename(columns={"text": "prediction"}, inplace=True)

    df = df_input.join(df_prediction)

    
    assert (
        len(df) == len(df_input) == len(df_prediction)
    ), f"Dataset lengths {len(df_input)} & {len(df_prediction)} != {len(df)}"

    assert not df.isna().values.any(), "Datasets contain missing entries"

    result = evaluator(
        original_texts=df["input"].tolist(),
        rewritten_texts=df["prediction"].tolist(),
        references=None,
    )

    aggregated = {measure: np.mean(values).item() for measure, values in result.items()}

    for measure, value in aggregated.items():
        output.write(format_prototext(measure, str(value)))
    return result

In [None]:
def main() -> None:
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("rb"),
        required=True,
        help="Initial texts before style transfer",
    )
    parser.add_argument(
        "-g",
        "--golden",
        type=argparse.FileType("rb"),
        required=False,
        help="Ground truth texts after style transfer",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=argparse.FileType("w", encoding="UTF-8"),
        default=sys.stdout,
        help="Path where to write the evaluation results",
    )
    parser.add_argument(
        "--no-cuda", action="store_true", default=False, help="Disable use of CUDA"
    )
    parser.add_argument(
        "--prediction", type=argparse.FileType("rb"), help="Your model predictions"
    )

    args = parser.parse_args()

    style_model, style_tokenizer = load_model(
        "textdetox/xlmr-large-toxicity-classifier", use_cuda=not args.no_cuda
    )
    meaning_model = load_model("sentence-transformers/LaBSE", use_cuda=not args.no_cuda)

    run_evaluation(
        args,
        evaluator=partial(
            evaluate_style_transfer,
            style_model=style_model,
            style_tokenizer=style_tokenizer,
            meaning_model=meaning_model,
            style_target_label=0,
        ),
    )

In [None]:
style_model, style_tokenizer = load_model(
        "textdetox/xlmr-large-toxicity-classifier"
    )
meaning_model = load_model("sentence-transformers/LaBSE")

## LOAD DATASET

In [None]:
from datasets import load_dataset
dataset = load_dataset("textdetox/multilingual_paradetox", cache_dir="../../cache")

In [None]:
from datasets import concatenate_datasets
combined_dataset = concatenate_datasets(dataset.values())

In [None]:
lines1 = []
lines2 = []
for i, pair in enumerate(combined_dataset):
    lines1.append({"id":str(i),"text":pair["toxic_sentence"]})
    lines2.append({"id":str(i),"text":pair["neutral_sentence"]})
    #print(pair)

In [None]:
import json
with open("toxfile.jsonl", 'w') as f:
    for line in lines1:
        f.write(json.dumps(line) + '\n')

In [None]:
import json
with open("detoxfile.jsonl", 'w') as f:
    for line in lines2:
        f.write(json.dumps(line) + '\n')

## EVALUATE DATASET

In [None]:
output = run_evaluation(
        input="toxfile.jsonl",
        prediction="detoxfile.jsonl",
        output = open("output.json", "w", encoding="UTF-8"),
        evaluator=partial(
            evaluate_style_transfer,
            style_model=style_model,
            style_tokenizer=style_tokenizer,
            meaning_model=meaning_model,
            style_target_label=0,
        ),
    )

In [None]:
import matplotlib.pyplot as plt

# Example array of numbers
numbers = np.sort(output['STA'])

# Create a line plot
plt.plot(numbers)

# Add labels and title
plt.xlabel('Index')
plt.ylabel('Value')
plt.title('Array of Numbers')

# Display the plot
plt.show()

## FILTER AND PREPARE DATASET

In [None]:
from transformers import UMT5ForConditionalGeneration, AutoTokenizer, Seq2SeqTrainer, Seq2SeqTrainingArguments

In [None]:
from datasets import concatenate_datasets
from sklearn.model_selection import train_test_split

language_prompts = {
    "en": "translate from English to English: ",
    "ru": "translate from Russian to Russian: ",
    "uk": "translate from Ukrainian to Ukrainian: ",
    "de": "translate from German to German: ",
    "es": "translate from Spanish to Spanish: ",
    "am": "translate from Amharic to Amharic: ",
    "zh": "translate from Chinese to Chinese: ",
    "ar": "translate from Arabic to Arabic: ",
    "hi": "translate from Hindi to Hindi: ",
}

# Combine datasets and add language prompts
combined_datasets = {}
for lang, datasett in dataset.items():
    prompt = language_prompts[lang]
    datasett = datasett.map(lambda example: {"input_text": example["toxic_sentence"], "target_text": example["neutral_sentence"]}, remove_columns=["toxic_sentence", "neutral_sentence"])
    combined_datasets[lang] = datasett

# Concatenate all datasets
combined_dataset = concatenate_datasets(combined_datasets.values())

In [None]:
df = pd.DataFrame(combined_dataset)

In [None]:
df['lang'] = 'en'

In [None]:
df['lang'][0:400] = "en"
df['lang'][400:800] = "ru"
df['lang'][800:1200] = "uk"
df['lang'][1200:1600] = "de"
df['lang'][1600:2000] = "es"
df['lang'][2000:2400] = "am"
df['lang'][2400:2800] = "zh"
df['lang'][2800:3200] = "ar"
df['lang'][3200:] = "hi"

In [None]:
lines1 = []
lines2 = []
for i, tox in enumerate(df["input_text"]):
    lines1.append({"id":str(i),"text":tox})
    lines2.append({"id":str(i),"text":df["target_text"][i]})

In [None]:
df['sta'] = output['STA']

In [None]:
filtered_df = df[df['sta'] > 0.2]

In [None]:
for i in filtered_df.index:
    filtered_df['input_text'][i] = language_prompts[filtered_df['lang'][i]] + filtered_df['input_text'][i]

In [None]:
filtered_df

In [None]:
from datasets import Dataset
filtered_dataset = Dataset.from_pandas(filtered_df)

In [None]:
filtered_dataset

In [None]:
language_prompts = {
    "en": "translate from English to English: ",
    "ru": "translate from Russian to Russian: ",
    "uk": "translate from Ukrainian to Ukrainian: ",
    "de": "translate from German to German: ",
    "es": "translate from Spanish to Spanish: ",
    "am": "translate from Amharic to Amharic: ",
    "zh": "translate from Chinese to Chinese: ",
    "ar": "translate from Arabic to Arabic: ",
    "hi": "translate from Hindi to Hindi: ",
}

In [None]:
datasett = filtered_dataset.map(remove_columns=["lang", "sta", "__index_level_0__"])

In [None]:
datasets = datasett.train_test_split(test_size=0.15, seed=42)

In [None]:
datasets.save_to_disk("../filtered_dataset")

## TOKENIZE

In [None]:
from transformers import UMT5ForConditionalGeneration, AutoTokenizer, Seq2SeqTrainer, Seq2SeqTrainingArguments
from datasets import load_dataset

In [None]:
tokenizer = AutoTokenizer.from_pretrained("google/umt5-base", cache_dir="../../cache")

In [None]:
def tokenize_function(examples):
    return  {"input_ids": tokenizer(examples["input_text"], padding='max_length', truncation=True, max_length=512)["input_ids"], "labels": tokenizer(examples["target_text"], padding='max_length', truncation=True, max_length=128)["input_ids"]}

In [None]:
tokenized_datasets = datasets.map(tokenize_function, remove_columns=["input_text", "target_text"], num_proc=4, batched=True)

In [None]:
tokenized_datasets

In [None]:
tokenized_datasets.save_to_disk("../filtered_tokenized_datasets")