In [16]:
import csv
from datasets import load_dataset

import tensorflow_hub as hub
import tensorflow as tf
import numpy as np
import transformers
import random
import nltk

from nltk.corpus import wordnet

# Fetch the NLTK resources this notebook relies on.
# 'wordnet' backs the `nltk.corpus.wordnet` lookups in get_synonyms.
nltk.download('wordnet')
# NOTE(review): presumably the multilingual WordNet companion data - confirm it is required.
nltk.download('omw-1.4')
# NOTE(review): presumably the tagger model that nltk.pos_tag loads - confirm.
nltk.download('averaged_perceptron_tagger_eng')

[nltk_data] Downloading package wordnet to /home/raikara/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /home/raikara/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /home/raikara/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!


True

In [5]:
# When True, generate_random_adversarial_example prints its query count and outcome.
DEBUG = False

In [15]:
def get_synonyms(word):
    """
    Retrieve the single-word synonyms of a word using WordNet.

    Every lemma of every synset returned by `wordnet.synsets(word)` is
    collected; lemmas made of more than one word are discarded.

    word: string representing the target word

    return: alphabetically sorted list of unique synonyms (empty when no
            synset is found). The query word itself is not filtered out.
    """
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            # Underscores in a lemma name are treated as word separators.
            lemm = lemma.name().replace("_", " ")
            if len(lemm.split()) > 1:
                # Multi word synonyms are discarded
                continue
            synonyms.add(lemm)
    # Iteration order of a set of strings can differ between interpreter
    # runs, so a plain list(set) would make seeded `random.choice` calls
    # irreproducible. Sorting gives callers a stable order.
    return sorted(synonyms)

def replace_synonyms(adversarial_example, pos_tags, target_pos):
    """
    Swap words of an adversarial example for random synonyms, guided by
    their POS tags.

    adversarial_example: list of words; it is modified IN PLACE, position
                         by position, so the caller sees the substitutions
    pos_tags: sequence of (word, tag) pairs aligned with adversarial_example
    target_pos: set of broad POS categories (two-letter tag prefixes) whose
                words should be replaced

    return: the modified adversarial example joined into one string
    """
    for position, tagged in enumerate(pos_tags):
        token, tag = tagged
        # Only the first two characters of the tag are compared, so any
        # finer-grained tag sharing that prefix matches its broad category.
        if tag[:2] not in target_pos:
            continue
        candidates = get_synonyms(token)
        if not candidates:
            # Nothing to substitute; keep the current word.
            continue
        adversarial_example[position] = random.choice(candidates)
    return ' '.join(adversarial_example)

def generate_random_adversarial_example(x, f, max_iter=100):
    """
    Generate a random adversarial example by replacing certain words with synonyms.

    Nouns, verbs, adverbs and adjectives are swapped for random synonyms.
    While the victim model still predicts the original label, the current
    candidate is re-tagged and substituted again (each retry builds on the
    previous candidate, not on the original words). The search stops when
    the label changes, when `max_iter` candidates have been tried, or when
    no target word has a synonym.

    x: list of words (original text); it is copied, not modified
    f: victim model (a function that returns model predictions); it is
       called with a string and `f(text)[0]['label']` is read as the label
    max_iter: maximum number of candidates to generate and query

    return: tuple (adv_exmp, query_count). adv_exmp is the last candidate as
            a space-joined string; it is only adversarial if the search
            succeeded. query_count is the number of candidates generated.
    """

    # Get Part-Of-Speech (POS) tags for words in x
    pos_tags = nltk.pos_tag(x)
    adversarial_example = x[:]  # Copy the original text
    original_exmp = ' '.join(x)

    # Define POS tags to be replaced (NN: noun, VB: verb, RB: adverb, JJ: adjective)
    target_pos = {'NN', 'VB', 'RB', 'JJ'}

    # The original text never changes, so its label is queried only once
    # instead of on every loop check.
    original_label = f(original_exmp)[0]['label']

    adv_exmp = replace_synonyms(adversarial_example, pos_tags, target_pos)

    query_count = 1
    found = False
    while True:
        # Ensure the adversarial condition is met (prediction change)
        if f(adv_exmp)[0]['label'] != original_label:
            found = True
            break
        if query_count >= max_iter:
            break
        query_count += 1

        # replace_synonyms mutated adversarial_example in place, so re-tag
        # the current candidate before substituting again.
        pos_tags = nltk.pos_tag(adversarial_example)
        adv_exmp = replace_synonyms(adversarial_example, pos_tags, target_pos)

        if not any(get_synonyms(word) for word, pos in pos_tags if pos[:2] in target_pos):
            print("No synonyms found!")
            break

    if DEBUG:
        print(f"No. of queries made = {query_count}")
        # Report the tracked outcome rather than inferring it from the
        # query count, which is wrong after a no-synonyms exit or when
        # success happens on the last allowed query.
        if found:
            print("Adversarial example found!")
        else:
            if query_count >= max_iter:
                print("Max iterations reached!")
            print("No adversarial example found!")

    return adv_exmp, query_count

In [17]:
# Load the "sst2" configuration of the "glue" dataset via `datasets.load_dataset`.
dataset = load_dataset("glue", "sst2")

# Check dataset structure: prints each split with its feature names and row count.
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['sentence', 'label', 'idx'],
        num_rows: 67349
    })
    validation: Dataset({
        features: ['sentence', 'label', 'idx'],
        num_rows: 872
    })
    test: Dataset({
        features: ['sentence', 'label', 'idx'],
        num_rows: 1821
    })
})


In [27]:
class US_Encoder:
    """
    Wrapper around a TensorFlow Hub Universal Sentence Encoder (USE) model.

    The model is loaded once in the constructor and reused for every
    similarity computation.
    """
    def __init__(self, model_url="https://tfhub.dev/google/universal-sentence-encoder/4"):
        """
        Load the encoder.

        model_url: TF-Hub handle passed to `hub.load`; defaults to USE v4
        """
        self.model = hub.load(model_url)
        print("Universal Sequence Encoder model loaded!")

    def compute_similarity(self, text1, text2):
        """
        Compute the similarity score between two texts using Universal Sentence Encoder (USE).

        text1: First sentence (string)
        text2: Second sentence (string)

        return: Cosine similarity score as a Python float; 0.0 if either
                embedding has zero norm
        """
        # Encode both sentences in one call; row 0 is text1, row 1 is text2.
        embeddings = np.asarray(self.model([text1, text2]))
        first, second = embeddings[0], embeddings[1]

        # Normalize explicitly: a bare inner product equals the cosine only
        # when both embeddings already have unit length.
        norm_product = np.linalg.norm(first) * np.linalg.norm(second)
        if norm_product == 0:
            return 0.0

        return float(np.inner(first, second) / norm_product)

In [28]:
# Build the encoder once; the constructor loads the TF-Hub model, so reuse this instance.
use_encoder = US_Encoder()

Universal Sequence Encoder model loaded!


In [29]:
# Example usage: score a short sentence against a longer one that extends it.
text1 = "I enjoy working"
text2 = "I enjoy working with NLP models."

# Both strings are embedded in a single model call inside compute_similarity.
similarity_score = use_encoder.compute_similarity(text1, text2)
print(f"Similarity Score: {similarity_score:.4f}")

Similarity Score: 0.4120


In [None]:
def substitute_original_words(x, x_t, f, query_count, compute_similarity=None):
    """
    Improved Algorithm 1: Substituting Original Words Back. New implementation

    Repeatedly restores one original word in the adversarial example. In
    each round, every position where the two texts differ is tried, and the
    rollback with the highest similarity to the original text is selected.
    That single candidate is sent to the victim model: if it is still
    adversarial it is kept and the next round starts, otherwise the search
    stops.

    x: Original text (list of words)
    x_t: Adversarial example (list of words, same length as x); not modified
    f: Victim model (a function that returns model predictions); called with
       a string, and `f(text)[0]['label']` is read as the label
    query_count: Counter for model queries made so far
    compute_similarity: Function taking two strings and returning a
                        similarity score; defaults to the notebook-level
                        `use_encoder.compute_similarity`

    return: tuple (x_t, query_count): the adversarial example after
            substitution (list of words) and the updated counter. Every
            candidate sent to the victim model is counted, accepted or not;
            the single query for the original label is not counted.
    """
    if compute_similarity is None:
        # No free function of that name is defined in the notebook cells
        # above; the similarity routine is a method of the shared encoder.
        compute_similarity = use_encoder.compute_similarity

    original_text = ' '.join(x)
    # The original label is loop-invariant, so ask the victim model once.
    original_label = f(original_text)[0]['label']

    while True:
        diffs = [i for i, (orig, adv) in enumerate(zip(x, x_t)) if orig != adv]
        if not diffs:
            print("No differences remaining.")
            break

        best_sim_score = float("-inf")
        best_x_tmp = None

        for i in diffs:
            # A shallow copy suffices: the elements are immutable strings.
            x_tmp = list(x_t)
            x_tmp[i] = x[i]  # Replace adversarial word with original

            sim_score = compute_similarity(original_text, ' '.join(x_tmp))

            if sim_score > best_sim_score:
                best_sim_score = sim_score
                best_x_tmp = x_tmp

        if best_x_tmp is None:
            # No comparable score was produced (e.g. all scores were NaN).
            break

        # The victim model is queried whether or not the rollback is
        # accepted, so count the query before inspecting the result.
        query_count += 1
        if f(' '.join(best_x_tmp))[0]['label'] == original_label:
            break  # Stop if no more valid replacements can be made

        x_t = best_x_tmp  # Apply best rollback found

    return x_t, query_count
