In [1]:
import os
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from tqdm import tqdm


from dataloader import read_corpus
from sentence_sim import SBERT
import criteria
from prepare_synonym_dict import read_and_clean_synonym_dict
import pymorphy2
import re
from synonym_replacement import replace_word, tokenize_ukrainian, lower_grammar_restrictions, stepwise_inflect

# Run the classifier on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Ukrainian morphological analyzer shared by the helper functions and the
# exploration cells further down.
morph = pymorphy2.MorphAnalyzer(lang='uk')

# Synonym dictionary file; loading and cleaning is delegated to the project helper.
path_to_ulif = '/home/mudryi/phd_projects/synonym_attack/synonyms_dictionaries/ulif_clean.json'
synonym_dict = read_and_clean_synonym_dict(path_to_ulif)

  from .autonotebook import tqdm as notebook_tqdm
Processing synonym dict: 100%|██████████| 256676/256676 [01:01<00:00, 4146.79it/s] 


In [2]:
def get_normal_form(word, morph):
    """Return the normal form of ``word`` according to ``morph``.

    word: the surface form to analyze.
    morph: an analyzer exposing ``parse(word)``, which returns a sequence of
        analyses that each carry a ``normal_form`` attribute.
    Returns: the ``normal_form`` of the first analysis, or ``word`` unchanged
        when the analyzer returns no analyses.
    """
    analyses = morph.parse(word)
    if not analyses:
        return word
    return analyses[0].normal_form
    
def get_all_synonyms(word):
    """Look up the synonyms of ``word`` in the module-level ``synonym_dict``.

    The lower-cased surface form is tried first. Only when it is not a key is
    the word lemmatized (via ``get_normal_form`` and the module-level ``morph``)
    and the lemma tried instead, so a direct hit costs no morphological parse.

    word: the word to look up, in any letter case.
    Returns: the list stored in ``synonym_dict`` for the matching key (the
        stored object itself, not a copy), or a new empty list when neither
        the surface form nor its normal form is a key.
    """
    word = word.lower()

    # Fast path: the surface form itself is a dictionary key.
    if word in synonym_dict:
        return synonym_dict[word]

    # Fall back to the lemma; computed lazily because parsing is the costly step.
    normal_form = get_normal_form(word, morph)
    if normal_form in synonym_dict:
        return synonym_dict[normal_form]

    return []

def is_word_token(tok):
    """Tell whether ``tok`` is a purely alphabetic token.

    Surrounding whitespace is ignored. Whitespace-only tokens, the empty
    string, and tokens containing digits or punctuation (including hyphens)
    all give False.
    """
    trimmed = tok.strip()
    return trimmed.isalpha()

In [3]:
dataset_path = "/home/mudryi/phd_projects/synonym_attack/cross_domain_uk_reviews/test_reviews.csv" # CSV with the reviews to attack; read by read_corpus below
nclasses = 5 # number of classes of the target classifier (passed as num_labels)
target_model = 'xlm-roberta-base' # Hugging Face model id; used below to load the tokenizer
target_model_path = "/home/mudryi/phd_projects/xml-roberta-finetune-reviews/trained_models/tmdk/model_tmdk_7_1000" # fine-tuned checkpoint loaded below as the target classifier

# The three paths below are set to None and are not referenced by the cells shown in this notebook.
word_embeddings_path = None # path to the word embeddings for the target model
counter_fitting_embeddings_path = None # path to the counter-fitting embeddings used to find synonyms
counter_fitting_cos_sim_path = None # pre-computed cosine similarity scores based on the counter-fitting embeddings

SBERT_path = 'sentence-transformers/paraphrase-xlm-r-multilingual-v1' # sentence-transformers model id handed to the SBERT similarity wrapper

output_dir = 'adv_results_reviews_xml_roberta' # directory where the attack results are written

## Attack hyperparameters
sim_score_window = 25 # text length (in tokens) over which semantic similarity is computed
import_score_threshold = -1 # required minimum importance score for a word to be perturbed
sim_score_threshold = 0.7 # required minimum semantic similarity score
synonym_num = 10 # number of synonyms to extract (NOTE(review): the synonym-collection cell slices with a literal 50 instead)
batch_size = 32 # batch size for predictions (NOTE(review): not referenced by predictor in this cell)
data_size = 9000 # number of samples to attack; the reviews file has 9663 records
perturb_ratio = 0 # whether to use random perturbation for the ablation study
max_seq_length = 256 # maximum sequence length for the target model (NOTE(review): not passed to the tokenizer call in predictor)

# Report a non-empty output directory instead of failing; results are still
# written into it. Otherwise make sure the directory exists.
if os.path.exists(output_dir) and os.listdir(output_dir):
    print("Output directory ({}) already exists and is not empty.".format(output_dir))
else:
    os.makedirs(output_dir, exist_ok=True)

# Load the data to attack as a list of (text, label) pairs.
texts, labels = read_corpus(dataset_path)
data = list(zip(texts, labels))

data = data[:data_size] # keep only the first data_size samples for the attack
print("Data import finished!")

# Build the target classifier from the fine-tuned checkpoint and put it in
# inference mode on the selected device. The tokenizer is loaded from the base
# model id (target_model), not from the checkpoint directory.
print("Building Model...")
model = AutoModelForSequenceClassification.from_pretrained(target_model_path, num_labels=nclasses)
model.to(device)
model.eval()
tokenizer = AutoTokenizer.from_pretrained(target_model)

def predictor(texts):
    """
    Classify one or more tokenized texts with the target model.

    texts: either a list of token lists, e.g. [["This", "is"], ["Another"]],
        or one flat list of tokens, e.g. ["This", "is", "a", "test"]. A flat
        list of strings is always treated as ONE text, never as a batch of
        separate sentences. An empty list is allowed.
    Returns: a torch.Tensor of shape (number of texts, number of classes) with
        softmax probabilities. For empty input the result has shape
        (0, model.config.num_labels).
    """
    # Nothing to score: return an empty batch instead of passing an empty
    # list on to the tokenizer and the model.
    if len(texts) == 0:
        return torch.empty((0, model.config.num_labels), device=device)

    # A flat list of strings is a single token list; wrap it into a batch of one.
    if isinstance(texts[0], str):
        texts = [texts]

    # Each token list becomes one space-separated string.
    # NOTE(review): other cells rebuild the text with ''.join(text_ls), which
    # suggests the tokens may already carry their own whitespace; joining with
    # " " here would then insert extra spaces. Confirm against read_corpus.
    joined_texts = [" ".join(tokens) for tokens in texts]

    # Tokenize with truncation/padding and move the batch to the model's device.
    # NOTE(review): truncation=True is used without max_length, so the
    # notebook-level max_seq_length setting has no effect here. Confirm intended.
    inputs = tokenizer(
        joined_texts,
        return_tensors="pt",
        truncation=True,
        padding=True
    ).to(device)

    with torch.no_grad():
        output = model(**inputs)

    # Logits of shape [batch_size, nclasses] -> probabilities along the class axis.
    return torch.softmax(output.logits, dim=1)

# Legacy alternative kept for reference: use the model's own prediction method.
# predictor = model.text_pred
print("Model built!")

# Build the semantic similarity module from the configured SBERT model id.
sbert = SBERT(SBERT_path)

# Bookkeeping for the attack run: failure counters and per-sample records.
orig_failures = 0.
adv_failures = 0.
changed_rates = []
nums_queries = []
orig_texts = []
adv_texts = []
true_labels = []
new_labels = []
# Results log, opened in append mode so repeated runs add to the same file.
# NOTE(review): the handle stays open; no close() appears in the cells shown
# here. Close it once the run is finished.
log_file = open(os.path.join(output_dir, 'results_log'), 'a')

# Stop words are excluded from the set of perturbable tokens later on.
stop_words_set = criteria.get_stopwords()
print('Start attacking!')

Output directory (adv_results_reviews_xml_roberta) already exists and is not empty.
Data import finished!
Building Model...
Model built!
Start attacking!


In [288]:
# Per-experiment overrides for the single-sample walkthrough below.
sim_predictor = sbert
import_score_threshold = -1.0

# Left disabled; the values from the configuration cell stay in effect.
# sim_score_threshold = 0.7 # "Required minimum semantic similarity score.")
# sim_score_window = 25 # "Text length or token number to compute the semantic similarity score")
# synonym_num = 100 # "Number of synonyms to extract"

In [309]:
# Pick one review to walk through by hand.
idx = 8126
text_ls = data[idx][0]
# The stored label is shifted down by one before it is compared with the
# model's argmax (presumably 1-based ratings -> 0-based class ids; confirm
# against read_corpus).
true_label = data[idx][1] - 1

# Show the reconstructed text next to its shifted label.
(''.join(text_ls), true_label)

('До звуку претензій нема, матеріал вироблення хороший. Однак гарантійний талон порожній ... Єдині дані, які в ньому є - гарантійний талон на навушники jbl і перелік моделей. Всі поля на талоні порожні. Чи можу я бути впевненою, що такий гарантій талон приймуть як дійсний? ',
 3)

In [310]:
# Rank the tokens of the selected review by how much the prediction depends on them.

# Baseline prediction on the unmodified text (one model query).
orig_probs = predictor([text_ls]).squeeze()
orig_label = torch.argmax(orig_probs)
orig_prob = orig_probs.max()

# Flag samples whose predicted label already differs from the true label.
if true_label != orig_label:
    print("Bad")

num_queries = 1
len_text = len(text_ls)
pos_ls = criteria.get_pos(text_ls)

# Only alphabetic tokens that are not stop words are candidates for replacement.
perturbable_indices = [i for i, tok in enumerate(text_ls) if is_word_token(tok) and tok not in stop_words_set]

# One variant per candidate, with that token swapped for the '<oov>' placeholder.
leave_1_texts = [text_ls[:i] + ['<oov>'] + text_ls[i+1:] for i in perturbable_indices]

words_perturb = []
if len(leave_1_texts) == 0:
    # Nothing to score: skip the model call instead of sending it an empty batch.
    print('no words')
else:
    leave_1_probs = predictor(leave_1_texts)
    num_queries += len(leave_1_texts)
    leave_1_probs_argmax = torch.argmax(leave_1_probs, dim=-1)

    # Importance of a token =
    #   drop in the probability of the original label when the token is masked
    #   + (only if masking flips the label) the gain of the new top label
    #     relative to its probability on the unmodified text.
    prob_drop = orig_prob - leave_1_probs[:, orig_label]
    label_flipped = (leave_1_probs_argmax != orig_label).float()
    flip_gain = leave_1_probs.max(dim=-1)[0] - torch.index_select(orig_probs, 0, leave_1_probs_argmax)
    import_scores = (prob_drop + label_flipped * flip_gain).detach().cpu().numpy()

    # Most important tokens first; keep those above the threshold. The index
    # variable lives only inside the comprehension, so the sample index `idx`
    # chosen earlier is not overwritten.
    ranked = sorted(zip(perturbable_indices, import_scores), key=lambda pair: pair[1], reverse=True)
    words_perturb = [
        (token_idx, text_ls[token_idx])
        for token_idx, score in ranked
        if score > import_score_threshold
    ]

In [311]:
# Working copies of the token list for the substitution experiments.
text_prime = list(text_ls)
text_cache = list(text_prime)
num_changed = 0

# For every ranked word keep at most 50 single-word synonyms from the custom
# dictionary; words with no usable synonym are dropped.
synonyms_all = []
for position, word in words_perturb:
    # TODO explore to replace word to phrase
    synonyms = [candidate for candidate in get_all_synonyms(word) if ' ' not in candidate][:50]
    if synonyms:
        synonyms_all.append((word, position, synonyms))

# Select the (word, position, synonyms) entry of the word under inspection.
# Raises IndexError when that word has no entry in synonyms_all.
idx = matching_indices = [i for i, entry in enumerate(synonyms_all) if entry[0] == 'порожній'][0]
synonyms = synonyms_all[idx]
synonyms

('порожній',
 22,
 ['легковажний',
  'шалапутний',
  'пустодзвонний',
  'безшерстий',
  'відсутній',
  'спорожнений',
  'безлюдний',
  'спорожнілий',
  'безперий',
  'голісінький',
  'порожнистий',
  'спустілий',
  'голий',
  'кручений',
  'малолюдний',
  'пустопорожній',
  'несолідний',
  'фривольний',
  'неоперений',
  'вітряний',
  'легкодухий',
  'відлюдний',
  'пустинний',
  'вітруватий',
  'лисий',
  'легкодушний',
  'легкомисний',
  'легкодумний',
  'марний',
  'плюсклий',
  'оголений',
  'беззмістовний',
  'вільний',
  'поверховий',
  'нелюдний',
  'пустельний',
  'полисілий',
  'несерйозний',
  'бездумний',
  'порожніти',
  'безпредметний',
  'нізчимний',
  'пустотілий',
  'суєтний',
  'пісний',
  'малозмістовний',
  'незайнятий',
  'нагий',
  'безпутний',
  'безволосий'])

In [312]:
# Replace the selected word (the token at the stored position) with one
# candidate synonym in the full text, with debug output enabled, and show
# the re-joined result.
''.join(
    replace_word(
        ''.join(text_prime),
        text_prime[synonyms[1]],
        "спустілий",
        morph,
        debug=True,
    )
)

bad match порожній -> спустілий
VERB None
[Parse(word='спустілий', tag=OpencorporaTag('ADJF,actv,perf masc,nomn'), normal_form='спустілий', score=1.0, methods_stack=((DictionaryAnalyzer(), 'спустілий', 739, 0),)), Parse(word='спустілий', tag=OpencorporaTag('ADJF,actv,perf masc,accs'), normal_form='спустілий', score=1.0, methods_stack=((DictionaryAnalyzer(), 'спустілий', 739, 4),)), Parse(word='спустілий', tag=OpencorporaTag('ADJF,actv,perf masc,voct'), normal_form='спустілий', score=1.0, methods_stack=((DictionaryAnalyzer(), 'спустілий', 739, 8),))]


'До звуку претензій нема, матеріал вироблення хороший. Однак гарантійний талон спустілий ... Єдині дані, які в ньому є - гарантійний талон на навушники jbl і перелік моделей. Всі поля на талоні спустілі. Чи можу я бути впевненою, що такий гарантій талон приймуть як дійсний? '

In [294]:
# Inspect every analysis the analyzer returns for one ambiguous surface form;
# the recorded output lists adjective (ADJF) analyses before the noun ones.
morph.parse("зірка")

[Parse(word='зірка', tag=OpencorporaTag('ADJF femn,nomn'), normal_form='зіркий', score=1.0, methods_stack=((DictionaryAnalyzer(), 'зірка', 5, 9),)),
 Parse(word='зірка', tag=OpencorporaTag('ADJF femn,voct'), normal_form='зіркий', score=1.0, methods_stack=((DictionaryAnalyzer(), 'зірка', 5, 15),)),
 Parse(word='зірка', tag=OpencorporaTag('NOUN,anim femn,nomn'), normal_form='зірка', score=1.0, methods_stack=((DictionaryAnalyzer(), 'зірка', 9, 0),)),
 Parse(word='зірка', tag=OpencorporaTag('NOUN,femn,inan nomn'), normal_form='зірка', score=1.0, methods_stack=((DictionaryAnalyzer(), 'зірка', 10, 0),)),
 Parse(word='зірка', tag=OpencorporaTag('NOUN,inan femn,nomn'), normal_form='зірка', score=1.0, methods_stack=((DictionaryAnalyzer(), 'зірка', 45, 0),))]

In [11]:
# Grammemes of the first analysis of the token at the selected position
# (synonyms[1] is the position stored in the (word, position, synonyms) entry).
grammemes = morph.parse(text_prime[synonyms[1]])[0].tag.grammemes
# Pass them through the project helper before inflecting. It presumably drops
# grammemes that are too strict to transfer; confirm in synonym_replacement.
cleaned_grammemes = lower_grammar_restrictions(grammemes)

# Try to inflect an adverb into that grammeme set; the recorded output is the
# unchanged ADVB parse of the word.
stepwise_inflect(morph.parse('файно')[0], cleaned_grammemes)

Parse(word='файно', tag=OpencorporaTag('ADVB'), normal_form='файно', score=1.0, methods_stack=((DictionaryAnalyzer(), 'файно', 52, 0),))

In [12]:
# Display the grammeme set that was passed to stepwise_inflect in the previous cell.
cleaned_grammemes

{'2per', 'VERB', 'impf', 'impr', 'sing'}

In [13]:
# # new_texts = [text_prime[:idx] + [synonym] + text_prime[min(idx + 1, len_text):] for synonym in synonyms]
# new_texts = [replace_word(' '.join(text_prime), text_prime[idx], synonym) for synonym in synonyms]
# new_texts = [x for x in new_texts if x is not None]

# if len(new_texts) == 0:
#     print(f"no synonyms for word {text_prime[idx], text_prime}")

# new_probs = predictor(new_texts)
# num_queries += len(new_texts)

# semantic_sims = sim_predictor.semantic_sim([' '.join(text_cache)] * len(new_texts), [''.join(text) for text in new_texts])[0]

# if len(new_probs.shape) < 2:
#     new_probs = new_probs.unsqueeze(0)

# new_probs_mask = (orig_label != torch.argmax(new_probs, dim=-1)).data.cpu().numpy()
# new_probs_mask *= (semantic_sims >= sim_score_threshold)

# synonyms_pos_ls = [criteria.get_pos(new_text[max(idx - 4, 0):idx + 5])[min(4, idx)]
#                    if len(new_text) > 10 else criteria.get_pos(new_text)[idx] for new_text in new_texts]

# pos_mask = np.array(criteria.pos_filter(pos_ls[idx], synonyms_pos_ls))
# new_probs_mask *= pos_mask

In [14]:
# if np.sum(new_probs_mask) > 0:
#     text_prime[idx] = synonyms[(new_probs_mask * semantic_sims).argmax()]
#     num_changed += 1
#     break

# else:
#     new_label_probs = new_probs[:, orig_label] + torch.from_numpy(
#             (semantic_sims < sim_score_threshold) + (1 - pos_mask).astype(float)).float().cuda()
#     new_label_prob_min, new_label_prob_argmin = torch.min(new_label_probs, dim=-1)
#     if new_label_prob_min < orig_prob:
#         text_prime[idx] = synonyms[new_label_prob_argmin]
#         num_changed += 1
#     text_cache = text_prime[:]
# return ' '.join(text_prime), num_changed, orig_label, torch.argmax(predictor([text_prime])), num_queries


In [None]:

perturbation