In [3]:
import textattack
from textattack import AttackArgs, Attacker
from textattack.models.wrappers import HuggingFaceModelWrapper
from textattack.datasets import Dataset
from textattack.transformations import (
    WordSwapRandomCharacterDeletion, CompositeTransformation,
    WordSwapEmbedding, WordSwapWordNet, WordSwapMaskedLM
)
from textattack.constraints.pre_transformation import RepeatModification, StopwordModification
from textattack.search_methods import ParticleSwarmOptimization
from textattack.goal_functions import GoalFunction, UntargetedClassification
from textattack.goal_function_results import ClassificationGoalFunctionResult
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
import Levenshtein
import torch
from transformers import BartForConditionalGeneration, BartTokenizer
from transformers import DistilBertModel, DistilBertTokenizer
from sklearn.metrics.pairwise import cosine_similarity

# Prefer the GPU whenever torch can see one; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Sentiment classifier under attack, plus the TextAttack wrapper around it.
# The same checkpoint name is used for both the tokenizer and the weights.
model_name = "lxyuan/distilbert-base-multilingual-cased-sentiments-student"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
wrapped_model = HuggingFaceModelWrapper(model, tokenizer)


# Attack configuration.
target_sentence = "My grandmother's secret sauce is the best ever made!"  # sentence whose scores are matched
n_decimal = 3  # number of decimal places kept when rounding probabilities
min_levenshtein = 30  # minimum edit distance from target_sentence
min_length, max_length = 40, 60  # allowed character-length window for candidates




In [4]:
# Custom goal function to match target probabilities
class RoundedScoreGoal_prob(GoalFunction):
    """Goal function that is met when rounded class probabilities equal a target.

    The model's outputs are turned into probabilities with a softmax, rounded to
    ``n_decimal`` places, and compared against ``target_scores`` (rounded the
    same way). The search score is the negative Euclidean distance between the
    two rounded vectors, so a score of 0 is a perfect match.

    Args:
        model: Model wrapper forwarded unchanged to ``GoalFunction.__init__``.
        target_scores: Sequence of target probabilities, one per class.
        label_order: Names of the classes in output order. Stored on the
            instance; not used by the methods of this class.
        n_decimal: Number of decimal places used for every rounding step.

    Raises:
        ValueError: If the rounded targets do not sum to ~1 (tolerance 1e-2) or
            any rounded target lies outside ``[0, 1]``.
    """

    def __init__(self, model, target_scores, label_order=('positive', 'neutral', 'negative'), n_decimal=4):
        super().__init__(model)
        self.target_scores = np.round(np.array(target_scores), decimals=n_decimal)
        self.label_order = label_order
        self.n_decimal = n_decimal
        self._validate_target_scores()

    def _validate_target_scores(self):
        """Ensure rounded targets sum to ~1 and are valid probabilities."""
        if not np.isclose(self.target_scores.sum(), 1.0, atol=1e-2):
            raise ValueError("Rounded target scores must sum to ~1")
        if (self.target_scores < 0).any() or (self.target_scores > 1).any():
            raise ValueError("All target scores must be between 0 and 1")

    def _rounded_scores(self, model_output):
        """Return ``model_output`` as a flat NumPy array rounded to ``n_decimal`` places."""
        if isinstance(model_output, torch.Tensor):
            # detach() + cpu() make the conversion safe for tensors that track
            # gradients or live on an accelerator; a bare .numpy() raises there.
            model_output = model_output.detach().cpu().numpy()
        return np.round(np.asarray(model_output).flatten(), decimals=self.n_decimal)

    def _is_goal_complete(self, model_output, attacked_text):
        """Check if the generated text meets the target probabilities."""
        return bool(np.allclose(self._rounded_scores(model_output), self.target_scores))

    def _get_score(self, model_output, attacked_text):
        """Compute score as the negative distance from the target probabilities."""
        distance = np.linalg.norm(self._rounded_scores(model_output) - self.target_scores)
        # Plain Python float so the result does not carry a NumPy scalar type.
        return -float(distance)

    def _process_model_outputs(self, inputs, model_outputs):
        """Convert raw model outputs into per-class probabilities on the CPU."""
        if not isinstance(model_outputs, torch.Tensor):
            # Lists / arrays are coerced so softmax can be applied uniformly.
            model_outputs = torch.as_tensor(np.asarray(model_outputs), dtype=torch.float32)
        probabilities = torch.nn.functional.softmax(model_outputs, dim=-1)
        return probabilities.cpu()

    def _goal_function_result_type(self):
        """Return the result class used to report outcomes of this goal function."""
        return ClassificationGoalFunctionResult


# Function to compute sentiment scores with rounding
def get_rounded_scores(sentence, model, n_decimal=6):
    """Compute sentiment scores and round to n_decimal places.

    The sentence is encoded with the module-level ``tokenizer``, passed through
    ``model`` without gradient tracking, and the logits are converted to
    probabilities with a softmax over the class dimension.

    Args:
        sentence: Text to score. Only the first row of the model output is
            returned, so pass a single sentence.
        model: Classifier whose forward pass returns an object with ``.logits``.
        n_decimal: Number of decimal places to keep.

    Returns:
        numpy.ndarray: Rounded class probabilities for the sentence.
    """
    inputs = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)

    # Run the inputs on the same device as the model so the call also works
    # after the model has been moved to a GPU.
    model_device = getattr(model, "device", None)
    if model_device is not None:
        inputs = inputs.to(model_device)

    with torch.no_grad():
        logits = model(**inputs).logits

    # Bring the result back to the CPU before converting to NumPy.
    scores = torch.softmax(logits, dim=1)[0].cpu().numpy()
    return np.round(scores, decimals=n_decimal)

# Custom constraint for Levenshtein distance and text length
class LevenshteinConstraint(textattack.constraints.Constraint):
    """Accept a candidate only if it is long enough, short enough, and far enough
    (in Levenshtein distance) from a fixed original sentence.

    The distance is always measured against the sentence given at construction
    time, not against the reference text handed to ``_check_constraint``.
    """

    def __init__(self, original_sentence, min_distance, min_length, max_length):
        super().__init__(compare_against_original=True)
        self.original, self.min_distance = original_sentence, min_distance
        self.min_length, self.max_length = min_length, max_length

    def _check_constraint(self, transformed_text, reference_text):
        """Return True when the candidate's length and edit distance are both acceptable."""
        text = transformed_text.text
        length_ok = self.min_length <= len(text) <= self.max_length
        # The edit distance is only computed when the length check has passed.
        return length_ok and Levenshtein.distance(self.original, text) >= self.min_distance
    
# Function to automatically compute target scores and label
def compute_target_scores_and_label(sentence, model, n_decimal=3):
    """Compute the original sentiment scores and extract the target label.

    Args:
        sentence: Text whose sentiment scores serve as the attack target.
        model: Classifier passed through to ``get_rounded_scores``.
        n_decimal: Number of decimal places used when rounding the scores.
            Defaults to 3, the precision this function previously hard-coded.

    Returns:
        tuple: ``(scores, label)`` where ``scores`` is the rounded probability
        array and ``label`` is the index of its largest entry as a plain ``int``.
    """
    scores = get_rounded_scores(sentence, model, n_decimal=n_decimal)
    label = int(np.argmax(scores))  # Extract label with the highest probability and cast to int
    return scores, label



def generate_best_paraphrase(input_sentence: str,
                             min_levenshtein_distance: int = 30,
                             min_length: int = 30,
                             max_length: int = 60,
                             num_return_sequences: int = 20,
                             temperature: float = 1.5,
                             top_k: int = 50,
                             top_p: float = 0.95,
                             cosine_similarity_threshold: float = 0.8,
                             threshold_attempts: int = 300) -> str:
    """
    Generates diverse paraphrases for a given input sentence and returns the best one
    based on cosine similarity and constraints. Sampling is repeated, at most
    ``threshold_attempts`` times, until a batch contains a candidate whose cosine
    similarity to the input exceeds the threshold.

    Args:
        input_sentence (str): Sentence to paraphrase.
        min_levenshtein_distance (int): Minimum Levenshtein distance from input_sentence.
        min_length (int): Minimum length (in characters) for generated paraphrases.
        max_length (int): Maximum length (in characters) for generated paraphrases.
        num_return_sequences (int): Number of paraphrases to generate per attempt.
        temperature (float): Temperature parameter for diversity during sampling.
        top_k (int): Top-k sampling parameter.
        top_p (float): Nucleus sampling (top-p) parameter.
        cosine_similarity_threshold (float): Minimum cosine similarity score for valid paraphrase.
        threshold_attempts (int): Maximum number of sampling rounds before giving up.

    Returns:
        str: The most similar qualifying paraphrase from the first round that
        produced one, or the message "No valid paraphrase found after several
        attempts." when every round failed.
    """
    paraphrase_checkpoint = 'eugenesiow/bart-paraphrase'
    embedding_checkpoint = 'lxyuan/distilbert-base-multilingual-cased-sentiments-student'

    # Load BART paraphrase model and tokenizer
    bart_model = BartForConditionalGeneration.from_pretrained(paraphrase_checkpoint).to(device)
    bart_tokenizer = BartTokenizer.from_pretrained(paraphrase_checkpoint)

    # Load DistilBERT model for embeddings
    embedding_model = DistilBertModel.from_pretrained(embedding_checkpoint).to(device)
    embedding_tokenizer = DistilBertTokenizer.from_pretrained(embedding_checkpoint)

    def compute_embedding(sentence):
        """Return the mean of the last hidden states of `sentence` as a NumPy array."""
        inputs = embedding_tokenizer(sentence, return_tensors='pt', truncation=True, padding=True).to(device)
        with torch.no_grad():
            outputs = embedding_model(**inputs)
        return outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()

    # Both of these depend only on the input sentence, so compute them once
    # instead of on every sampling round.
    batch = bart_tokenizer(input_sentence, return_tensors='pt').to(device)
    original_embedding = compute_embedding(input_sentence)

    # A bounded for-loop guarantees termination: the previous while-loop never
    # advanced its counter and spun forever when no candidate qualified.
    for _ in range(threshold_attempts):
        # Generate diverse paraphrases
        generated_ids = bart_model.generate(
            batch['input_ids'],
            attention_mask=batch['attention_mask'],
            num_return_sequences=num_return_sequences,
            num_beams=1,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            do_sample=True,
            repetition_penalty=1.5,
        )
        paraphrases = bart_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)

        # Filter based on Levenshtein distance and length
        valid_paraphrases = [
            p for p in paraphrases
            if min_length <= len(p) <= max_length
            and Levenshtein.distance(input_sentence, p) >= min_levenshtein_distance
        ]
        if not valid_paraphrases:
            continue  # nothing usable in this batch; sample again

        # Score every surviving candidate against the original in one call.
        paraphrase_embeddings = [compute_embedding(p) for p in valid_paraphrases]
        similarities = cosine_similarity([original_embedding], paraphrase_embeddings)[0]

        # Return the most similar candidate of the batch (not merely the first
        # one above the threshold), provided it clears the threshold.
        best_index = int(np.argmax(similarities))
        if similarities[best_index] > cosine_similarity_threshold:
            return valid_paraphrases[best_index]

    return "No valid paraphrase found after several attempts."


In [5]:

import time

# Define text transformations: embedding-based swaps, masked-LM (BAE) swaps,
# and random character deletions are all offered to the search.
transformation = CompositeTransformation([
    WordSwapEmbedding(max_candidates=30),
    WordSwapMaskedLM(method="bae", max_candidates=40),
    WordSwapRandomCharacterDeletion()
])

# Define constraints: no word is modified twice, stopwords are left alone, and
# every candidate must satisfy the length / edit-distance window measured
# against the target sentence.
constraints = [
    RepeatModification(),
    StopwordModification(),
    LevenshteinConstraint(target_sentence, min_levenshtein, min_length, max_length),
]

# Define search method
search_method = ParticleSwarmOptimization(pop_size=80, max_iters=40, post_turn_check=True, max_turn_retries=10)


# Compute target scores and label dynamically.
# The classifier must be passed explicitly: `model` is a required argument.
target_scores, label = compute_target_scores_and_label(target_sentence, model)


# Instantiate goal function.
# NOTE: target_scores were rounded by compute_target_scores_and_label (3 places
# by default); keep n_decimal in agreement with that rounding.
goal_function = RoundedScoreGoal_prob(
    model=wrapped_model,
    target_scores=target_scores,
    label_order=('positive', 'neutral', 'negative'),
    n_decimal=n_decimal,
)


# Define and run attack
attack = textattack.Attack(goal_function, constraints, transformation, search_method)

# Generate paraphrase dynamically, using the same length / distance limits that
# LevenshteinConstraint enforces, so the attack starts from a sentence that
# already lies inside the allowed window.
generated_sentence = generate_best_paraphrase(
    target_sentence,
    min_levenshtein_distance=min_levenshtein,
    min_length=min_length,
    max_length=max_length,
)

# generate_best_paraphrase signals failure with this exact message instead of
# raising; stop here rather than attacking the error message itself.
if generated_sentence == "No valid paraphrase found after several attempts.":
    raise RuntimeError("Paraphrase generation failed; there is no starting sentence to attack.")

start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
attack_result = attack.attack(generated_sentence, label)
elapsed_time = time.perf_counter() - start_time
print(f"Attack took {elapsed_time:.2f} seconds.")
print(f"The new text satisfying the criteria is: {attack_result}")


BertForMaskedLM has generative capabilities, as `prepare_inputs_for_generation` is explicitly overwritten. However, it doesn't directly inherit from `GenerationMixin`. From 👉v4.50👈 onwards, `PreTrainedModel` will NOT inherit from `GenerationMixin`, and this model will lose the ability to call `generate` and other related functions.
  - If you are the owner of the model architecture code, please modify your model class such that it inherits from `GenerationMixin` (after `PreTrainedModel`, otherwise you'll get an exception).
  - If you are not the owner of the model architecture class, please contact the model code owner to update it.
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another archite

RuntimeError: CUDA error: CUDA-capable device(s) is/are busy or unavailable
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
