IMPORTING ALL ESSENTIAL LIBRARIES

In order to avoid any version compatibility issues:
- run the following code
- restart the session
- run the rest of the code

- do NOT rerun the code just below after restarting the session

In [None]:

!pip uninstall -y scipy gensim
!pip install --no-cache-dir scipy==1.10.1 gensim==4.3.1 transformers==4.36.2


In [None]:
import numpy as np
import pandas as pd
import scipy
from transformers import AutoTokenizer, AutoModelForMaskedLM
from gensim.models import KeyedVectors
import torch
import spacy
import re
import nltk

nltk.download('wordnet')
nltk.download('omw-1.4')


IMPORTING FILES FROM DRIVE

In [None]:
from google.colab import drive
drive.mount('/content/MyDrive')

DEFINING ALL THE PATHS

In [None]:
# Annotated Spanish test set (TSV); read with pandas further down.
combined_labels_spa = '/content/MyDrive/MyDrive/Computational Linguistics/Lexical Simplification/multils_test_spanish_combined_labels.tsv'

# Tab-separated word-frequency list used to build word_frequency_dict.
# NOTE(review): despite the variable name, the file name looks like a Leipzig
# Corpora word list rather than SUBTLEX-ESP — confirm the actual source.
subtlex_es = '/content/MyDrive/MyDrive/Computational Linguistics/Lexical Simplification/spa-pr_web_2016_100K-words.txt'


PREPROCESSING OF THE SUBTLEX ES DICTIONARY

In [None]:
# Building a dictionary with the word frequency data we need.
#
# Every useful line of the frequency file has exactly three tab-separated
# fields; the first one is ignored, the second is the word and the third its
# raw count. Lines with any other shape are skipped.

word_frequency_dict = {}

# We only want real words, so anything containing digits, punctuation or other
# symbols is rejected here (no second filtering pass is needed afterwards).
spanish_word_pattern = re.compile(r'^[A-Za-záéíóúñüÁÉÍÓÚÑÜ]+$')

with open(subtlex_es, 'r', encoding='utf-8') as f:
  for line in f:
    parts = line.strip().split('\t')
    if len(parts) != 3:
      continue

    _, word, frequency = parts
    lemma = word.lower()
    if not spanish_word_pattern.match(lemma):
      continue

    try:
      count = int(frequency)
    except ValueError:
      # Header or malformed count: ignore the line instead of aborting the load.
      continue

    # Case variants ("Casa" / "casa") collapse onto the same lower-cased key, so
    # their counts are added up rather than letting the last line overwrite
    # the earlier ones.
    word_frequency_dict[lemma] = word_frequency_dict.get(lemma, 0) + count

In [None]:
!python -m spacy download es_core_news_sm

In [None]:
!wget https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.es.300.vec.gz
!gunzip cc.es.300.vec.gz

ORGANIZING AND PREPROCESSING THE DATA

In [None]:
import spacy
from gensim.models import KeyedVectors
from itertools import islice


# Loading the FastText model:
# NOTE(review): this parses the whole text-format vector file produced by the
# wget/gunzip cell, which is slow and memory-hungry; load_word2vec_format
# accepts a `limit` argument if a smaller vocabulary is acceptable.

fasttext_model = KeyedVectors.load_word2vec_format('cc.es.300.vec')


# Small Spanish spaCy pipeline; the notebook uses it for tokens, lemmas and POS tags.
nlp = spacy.load("es_core_news_sm")

# Lemmatization function:

def lemmatize(word):
  """Return the spaCy lemma of the first token of ``word``.

  When the pipeline produces a document without tokens, ``word`` is returned
  unchanged.
  """
  parsed = nlp(word)
  if not parsed:
    return word
  return parsed[0].lemma_

# Clean every single word:

def clean_word(word):
  """Normalize a word: lower-case it, trim surrounding whitespace and remove
  every character that is neither a word character, one of the listed
  accented letters, nor a space."""
  normalized = word.lower().strip()
  unwanted = re.compile(r'[^\wáéíóúñüÁÉÍÓÚÑÜ ]')
  return unwanted.sub('', normalized)

# Cleaning the candidate words list:

def clean_candidates(row):
    """Return the cleaned, de-duplicated substitution candidates of one row.

    Every value from position 5 onward is treated as a candidate.
    Missing values are skipped, the rest is converted to ``str`` and passed
    through ``clean_word``. Values that become empty after cleaning (for
    instance punctuation-only cells) are dropped.

    Args:
        row: one row of the raw annotation frame (as given by
            ``DataFrame.apply(..., axis=1)``).

    Returns:
        list[str]: unique cleaned candidates, in order of first appearance so
        that the result is the same on every run.
    """
    unique = {}
    for raw in row[5:]:
        if pd.isnull(raw):
            continue
        # str() protects clean_word from numeric cells parsed by pandas.
        cleaned = clean_word(str(raw))
        if cleaned:
            # dict keys keep insertion order, unlike a set.
            unique[cleaned] = None
    return list(unique)

# Load the annotated TSV; 'context', 'target' and 'complexity' are read by name below.
df_raw = pd.read_csv(combined_labels_spa, sep='\t', encoding='utf-8')

# Working frame with one row per annotated instance.
# NOTE(review): clean_candidates treats every column from position 5 onward as
# a substitution — confirm this matches the TSV column layout.
df = pd.DataFrame({
    'sentence': df_raw['context'].str.strip(),
    'target_word': df_raw['target'].apply(clean_word),
    'complexity': df_raw['complexity'],
    'candidate_words': df_raw.apply(clean_candidates, axis=1)
})

# Function to generate FastText-based substitutions for Spanish:

def gen_fasttext_subs(word, top_k=5, min_frequency=1):
  """
  Generate substitution candidates for ``word`` from its FastText neighbours.

  - the target word and every neighbour are lemmatized, and lemmas are
    lower-cased so they can be compared with each other and looked up in
    ``word_frequency_dict`` (whose keys are lower-case)
  - the target itself (same lemma or same surface form) is removed
  - neighbours are filtered by frequency, stopwords, alphabetic form and length
  - the remaining lemmas are de-duplicated, keeping the best-scoring occurrence

  Args:
    word: target word.
    top_k: maximum number of candidates returned; ``top_k * 5`` neighbours
      are requested so that enough survive the filters.
    min_frequency: minimum count a candidate lemma needs in
      ``word_frequency_dict``.

  Returns:
    list[str]: at most ``top_k`` lemmas ordered by decreasing similarity, or
    ``[]`` when the target is not in the FastText vocabulary.
  """
  query = word.lower()
  target_lemma = lemmatize(word).lower()
  # Candidates more than 20% longer than the target are rejected.
  max_length = len(word) * 1.2

  # Only the vocabulary lookup is guarded, so unrelated KeyErrors are not hidden.
  try:
    similar_words = fasttext_model.most_similar(query, topn=top_k * 5)
  except KeyError:
    return []

  candidates = []
  for candidate, score in similar_words:
    word_clean = candidate.lower()
    lemma = lemmatize(candidate).lower()

    # Same word as the target (inflected or differently cased variant).
    if lemma == target_lemma or word_clean == query:
      continue

    if word_frequency_dict.get(lemma, 0) < min_frequency:
      continue

    # STOP_WORDS is looked up at call time; it is imported in a later cell.
    if word_clean in STOP_WORDS or not word_clean.isalpha():
      continue

    if len(word_clean) > max_length:
      continue

    candidates.append((lemma, score))

  # Best score first; dict.fromkeys removes repeated lemmas and keeps the
  # first (i.e. best-scoring) occurrence.
  ranked = [lemma for lemma, _ in sorted(candidates, key=lambda pair: -pair[1])]
  return list(dict.fromkeys(ranked))[:top_k]

GENERATING SUBSTITUTION CANDIDATES

In [None]:
from spacy.lang.es.stop_words import STOP_WORDS
# Generating candidates using Pre-Trained Spanish FastText Embeddings :
# (STOP_WORDS is imported here because gen_fasttext_subs looks it up when called.)

df['generated_candidates_model_1'] = df['target_word'].apply(gen_fasttext_subs)

# Frequency filtering already happens inside gen_fasttext_subs (min_frequency);
# this step only keeps the columns needed to inspect the generated candidates.

df_with_candidates = df[['target_word', 'generated_candidates_model_1']]

df_with_candidates.head(20)

LOADING MODEL AND TOKENIZER

In [None]:
# Spanish RoBERTa (BNE) checkpoint, loaded with its masked-language-modelling head.
tokenizer = AutoTokenizer.from_pretrained('PlanTL-GOB-ES/roberta-base-bne')
model = AutoModelForMaskedLM.from_pretrained('PlanTL-GOB-ES/roberta-base-bne')
# Inference only: eval() turns off training-time behaviour such as dropout.
model.eval()

DEFINING A FUNCTION TO GENERATE SUBSTITUTION CANDIDATES

In [None]:
def gen_mlm_subs(sentence, target_word, top_k=5):
  """
  Generate substitution candidates with the masked language model.

  The first occurrence of the target word in ``sentence`` is replaced by the
  tokenizer's mask token and the model's ``top_k * 5`` most likely fillers are
  filtered by lemma, part of speech, stopwords, length and frequency.

  Args:
    sentence: sentence containing the target word.
    target_word: word to be replaced.
    top_k: maximum number of candidates returned.

  Returns:
    list[str]: up to ``top_k`` unique lower-case lemmas in decreasing model
    probability, or ``[]`` when the target is empty, cannot be found in the
    sentence, or its mask position is lost (e.g. through truncation).
  """
  clean_target = clean_word(target_word)
  # An empty target would yield the pattern r'\b\b', which matches at the
  # first word boundary and would mask a random position.
  if not clean_target:
    return []

  pattern = re.compile(rf'\b{re.escape(clean_target)}\b', re.IGNORECASE)

  if not pattern.search(sentence):
    return []

  # Replacing only the first occurrence:

  masked = pattern.sub(tokenizer.mask_token, sentence, count=1)
  # truncation=True keeps over-long sentences within the model's input limit
  # instead of failing inside the forward pass.
  inputs = tokenizer(masked, return_tensors='pt', truncation=True)
  with torch.no_grad():
    outputs = model(**inputs)

  mask_token_index = torch.where(inputs['input_ids'] == tokenizer.mask_token_id)[1]
  # The mask can be missing if truncation cut it off.
  if mask_token_index.numel() == 0:
    return []

  # Row 0 of the selected logits belongs to the first mask token.
  logits = outputs.logits[0, mask_token_index, :]
  token_ids = torch.topk(logits, top_k * 5, dim=1).indices[0].tolist()

  # POS and lemmatization filtering

  target_doc = nlp(clean_target)
  if not target_doc:
    return []

  target_lemma = target_doc[0].lemma_.lower()
  target_pos = target_doc[0].pos_

  candidates = []
  seen = set()
  for token_id in token_ids:
    # Preventing artifacts for better decoding:
    word = tokenizer.decode([token_id], skip_special_tokens=True).strip()
    word_clean = clean_word(word)
    if not word_clean:
      continue

    doc = nlp(word_clean)

    if not doc:
      continue

    lemma = doc[0].lemma_.lower()
    pos = doc[0].pos_

    if lemma == target_lemma or pos != target_pos:
      continue

    if word_clean in STOP_WORDS or not word_clean.isalpha():
      continue

    if len(word_clean) > len(clean_target) * 1.2:
      continue

    if word_frequency_dict.get(lemma, 0) < 1:
      continue

    # De-duplicate while collecting, so repeated lemmas do not use up the
    # top_k budget and shorten the final list.
    if lemma in seen:
      continue
    seen.add(lemma)
    candidates.append(lemma)

    if len(candidates) >= top_k:
      break

  return candidates[:top_k]

GENERATING...

In [None]:
# One masked-LM forward pass per row: each target is masked in its own sentence.
df['generated_candidates_model_bert'] = df.apply(lambda row: gen_mlm_subs(row['sentence'], row['target_word']), axis=1)

# Preview:

df_with_generated_bert = df[['sentence', 'target_word', 'generated_candidates_model_bert']]

df_with_generated_bert.head(20)

NOW: FOR THE TASK OF SUBSTITUTION SELECTION

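We will implement an embedding-based semantic selection approach (FastText context similarity) for the first model, and a prompt-based selection approach (a Mistral LLM re-ranks the candidates) for the second model.

The function below ranks the candidate words by the cosine similarity between each candidate's FastText vector and the mean vector of the words surrounding the target; WordNet antonyms of the target are discarded. Filtering by frequency and length (and, for the RoBERTa candidates, PoS) is already applied when the candidates are generated.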

In [None]:
from spacy.lang.es.stop_words import STOP_WORDS
from sklearn.metrics.pairwise import cosine_similarity
from nltk.corpus import wordnet

def get_antonyms(word):
  """
  Return a set of Spanish antonyms of ``word`` found through WordNet.

  Antonymy is a lexical relation recorded on the English lemmas of a synset,
  so asking a Spanish (OMW) lemma for its antonyms yields nothing. Instead,
  for every synset that contains ``word`` in Spanish we follow the antonym
  links of its English lemmas and collect the Spanish lemma names of the
  synsets those antonyms belong to.

  Args:
    word: Spanish word to look up.

  Returns:
    set[str]: lower-cased Spanish antonyms (underscores of multi-word entries
    replaced by spaces); empty when WordNet has no entry or no antonym.
  """
  antonyms = set()
  for syn in wordnet.synsets(word, lang='spa'):
    for lemma in syn.lemmas():
      for antonym in lemma.antonyms():
        for name in antonym.synset().lemma_names('spa'):
          antonyms.add(name.lower().replace('_', ' '))
  return antonyms

def select_model_1(target_word, sentence, candidates, fasttext_model, nlp, window_size=2, top_k=5):
  """
  Rank substitution candidates by how well they fit the words around the target.

  The sentence is tokenized, a window of ``window_size`` tokens on each side
  of the first occurrence of the target (target excluded) is averaged into a
  context vector, and every candidate is scored by cosine similarity to it.
  Candidates missing from the embedding model or listed as antonyms of the
  target are skipped.

  Returns the ``top_k`` best candidates, or ``[]`` when the target is not a
  token of the sentence or no context word has a vector.
  """
  excluded = get_antonyms(target_word)

  lowered_tokens = [tok.text.lower() for tok in nlp(sentence)]

  # Position of the first token equal to the target:
  needle = target_word.lower()
  if needle not in lowered_tokens:
    return []
  position = lowered_tokens.index(needle)

  # Neighbours on both sides, target itself left out:
  left = lowered_tokens[max(0, position - window_size):position]
  right = lowered_tokens[position + 1:min(len(lowered_tokens), position + window_size + 1)]

  neighbour_vectors = [fasttext_model[tok] for tok in left + right if tok in fasttext_model]
  if not neighbour_vectors:
    return []
  context_vector = np.mean(neighbour_vectors, axis=0)

  # Cosine similarity of each usable candidate to the context vector:
  ranking = []
  for candidate in candidates:
    key = candidate.lower()
    if key not in fasttext_model or key in excluded:
      continue
    candidate_row = fasttext_model[key].reshape(1, -1)
    score = cosine_similarity(context_vector.reshape(1, -1), candidate_row)[0][0]
    ranking.append((candidate, score))

  # Highest similarity first, cut to top_k:
  best_first = sorted(ranking, key=lambda pair: -pair[1])
  return [candidate for candidate, _ in best_first[:top_k]]


Now applying the function to the candidates generated by the first model:

In [None]:
# Re-rank the FastText candidates of every row against a +/-2 token context window.
df['selected_candidates_1'] = df.apply(lambda row: select_model_1(row['target_word'],
                                                                  row['sentence'],
                                                                  row['generated_candidates_model_1'],
                                                                  fasttext_model,
                                                                  nlp,
                                                                  window_size=2,
                                                                  top_k=5), axis=1)



# Side-by-side view of generated vs. selected candidates.
df_selected_1 = df[['target_word', 'generated_candidates_model_1', 'selected_candidates_1']]

df_selected_1.head(20)

Substitution Selection via Prompt Engineering

In [None]:
import os
from getpass import getpass

# SECURITY: API keys must not be stored in the notebook. The key is read from
# the MISTRAL_API_KEY environment variable; if it is not set, it is asked for
# interactively (the input is not echoed and never saved with the notebook).
# NOTE(review): a literal key used to be written on this line — it should be
# revoked and replaced, since it may already have been shared.
MISTRAL_API_KEY = os.environ.get('MISTRAL_API_KEY') or getpass('Mistral API key: ')

In [None]:
from ast import pattern
from sklearn.metrics.pairwise import cosine_similarity
from requests.models import Response
import requests
import re

def _parse_ranked_candidates(reply, candidates, antonyms):
  """Extract the candidate order from the model's reply, one candidate per line.

  For every line the candidate that appears first *as a whole word* is taken;
  antonyms are never returned and a candidate is only listed once.
  """
  matchers = [
      (candidate, re.compile(rf'(?<!\w){re.escape(candidate)}(?!\w)', re.IGNORECASE))
      for candidate in candidates
      if candidate and candidate not in antonyms
  ]

  ordered = []
  for line in reply.split('\n'):
    earliest = None
    for candidate, matcher in matchers:
      found = matcher.search(line)
      if found and (earliest is None or found.start() < earliest[0]):
        earliest = (found.start(), candidate)
    # Lines whose first candidate is already ranked (e.g. explanations) are ignored.
    if earliest is not None and earliest[1] not in ordered:
      ordered.append(earliest[1])
  return ordered


def select_model_2(sentence, target_word, candidates, top_k=5):
  """
  Ask Mistral to order the candidates by how well they fit the blanked sentence.

  The first occurrence of ``target_word`` in ``sentence`` is replaced by
  ``____`` and the candidates are sent in a Spanish prompt. The reply is
  parsed line by line into an ordered list of candidates.

  Args:
    sentence: sentence containing the target word.
    target_word: word to be replaced.
    candidates: candidate substitutions to rank.
    top_k: maximum number of candidates returned.

  Returns:
    list[str]: up to ``top_k`` candidates in the order given by the model,
    without antonyms of the target; ``[]`` when there is nothing to rank.

  Raises:
    requests.HTTPError: if the API answers with an error status.
    requests.Timeout: if the API does not answer within 60 seconds.
  """
  # Nothing to rank: skip the API call, the result would be empty anyway.
  if not candidates:
    return []

  pattern = re.compile(rf'\b{re.escape(target_word)}\b', re.IGNORECASE)
  masked_sentence = pattern.sub('____', sentence, count=1)
  antonyms = get_antonyms(target_word) or []

  # Building the prompt in Spanish (quotes around the sentence are balanced
  # and the last rank is parenthesised like the first one):

  prompt = (
      f"Dada la oración incompleta: \n\n"
      f"\"{masked_sentence}\"\n\n"
      f"Y las siguientes palabras: {candidates}\n\n"
      f"Ordena las palabras desde la más simple y adecuada (1) a la menos adecuada o la menos simple ({len(candidates)}) "
      f"según lo bien que encajarían en el espacio en blanco y según lo fácil que fuesen entendidas por un lector promedio."
  )

  # Mistral API

  url = 'https://api.mistral.ai/v1/chat/completions'
  headers = {
      'Authorization': f'Bearer {MISTRAL_API_KEY}',
      'Content-Type': 'application/json'
  }

  payload = {
      'model': 'mistral-small',
      'messages': [
          {
              'role': 'user',
              'content': prompt
          }
      ],
      'temperature': 0.0,
      'max_tokens': 200
  }

  # A timeout prevents one stalled request from blocking the whole run.
  response = requests.post(url, headers=headers, json=payload, timeout=60)
  response.raise_for_status()
  result = response.json()

  # Extracting reply

  reply = result['choices'][0]['message']['content']

  # Next, we extract the list of candidates ordered by mistral
  return _parse_ranked_candidates(reply, candidates, antonyms)[:top_k]



Now we apply to the RoBERTa MLM candidates:

In [None]:
import time

def selecting_model_2(row):
  """Rank one row's RoBERTa candidates with ``select_model_2``, then wait two
  seconds before returning so that consecutive API calls are spaced out."""
  ranked = select_model_2(
      sentence=row['sentence'],
      target_word=row['target_word'],
      candidates=row['generated_candidates_model_bert'],
  )
  time.sleep(2)
  return ranked

# One API call per row, each followed by the 2-second pause inside selecting_model_2.
df['selected_candidates_2'] = df.apply(selecting_model_2, axis=1)

# Side-by-side view of generated vs. selected candidates.
df_selected_2 = df[['target_word', 'generated_candidates_model_bert', 'selected_candidates_2']]

df_selected_2.head(20)


In [None]:
df_selected_2.to_csv("selected_candidates_mslp2024.csv", index=False, encoding='utf-8')

EVALUATION OF THE RESULTS FOR SUBSTITUTION GENERATION

DEFINING THE METRIC FUNCTIONS

In [None]:
# Re-read the annotated TSV; the gold substitutes are extracted from this frame.
gold_df = pd.read_csv(combined_labels_spa, sep='\t', encoding='utf-8')

# NOTE(review): sub_cols is not used anywhere in the visible notebook —
# extract_gold_substitutes slices each row by position (row[5:]) instead.
sub_cols = [col for col in gold_df.columns if col.startswith('substitution_')]

def extract_gold_substitutes(row):
  """Collect the gold substitutes of one row as a set of trimmed, lower-cased
  strings. Only string values from position 5 onward are considered; blank
  strings are ignored."""
  substitutes = set()
  for cell in row[5:]:
    if not isinstance(cell, str):
      continue
    if cell.strip():
      substitutes.add(str(cell).strip().lower())
  return substitutes

# One set of gold substitutes per annotated row.
gold_df['gold_substitutes'] = gold_df.apply(extract_gold_substitutes, axis=1)

# NOTE(review): 'target' is kept raw here, whereas df['target_word'] went
# through clean_word; a merge on 'target_word' therefore only matches rows
# whose raw target is already lower-case and free of punctuation.
gold_subs = gold_df[['target', 'gold_substitutes']].rename(columns={'target': 'target_word'})


In [None]:
def precision_at_k(selected, gold, k=10):
  """Share of the k slots filled with a gold substitute: the number of
  distinct hits among the first ``k`` predictions divided by ``k``
  (0.0 when ``k`` is falsy)."""
  top = selected[:k]
  if not k:
    return 0.0
  hits = set(top) & gold
  return len(hits) / k

def recall_at_k(selected, gold, k=10):
  """Recall@k: fraction of the gold substitutes found among the first ``k``
  predictions.

  Only ``selected[:k]`` is considered, in line with ``precision_at_k`` and
  ``potential_at_k``. Returns 0.0 when ``gold`` is empty.
  """
  if not gold:
    return 0.0
  return len(set(selected[:k]) & gold) / len(gold)

def f1_score_at_k(p, r):
  """Harmonic mean of precision ``p`` and recall ``r``; 0.0 when both are zero."""
  total = p + r
  if not total:
    return 0.0
  return 2 * p * r / total

def potential_at_k(selected, gold, k=10):
  """Return 1 if at least one of the first ``k`` predictions is a gold
  substitute, otherwise 0."""
  return 1 if set(selected[:k]) & gold else 0

EVALUATION FUNCTION FOR GENERATION

In [None]:
def evaluate_substitution_generation_clean(df, pred_col, gold_col='gold_substitutes', k=10):
    """Average Precision, Recall, F1 and Potential at ``k`` for generated candidates.

    Args:
        df: frame with one row per instance.
        pred_col: column holding the list of predicted candidates.
        gold_col: column holding the set/list of gold substitutes.
        k: cut-off used for every metric; it is also used in the metric
            labels, so the default returns the keys "... at 10".

    Returns:
        dict[str, float]: metric label -> mean over the evaluated rows (all
        0.0 when no row could be evaluated).

    NOTE(review): rows without gold substitutes AND rows without any
    prediction are left out of the averages; excluding the latter makes the
    scores look better than the system really is — confirm this is intended.
    """
    metric_names = [
        f"Precision at {k}",
        f"Recall at {k}",
        f"F1 Score at {k}",
        f"Potential at {k}",
    ]
    totals = [0, 0, 0, 0]
    valid_rows = 0

    for _, row in df.iterrows():
        # normalize
        predicted = [w.strip().lower() for w in row[pred_col] if isinstance(w, str)]
        if not isinstance(row[gold_col], (set, list)):
            continue
        gold = {w.strip().lower() for w in row[gold_col] if isinstance(w, str)}

        if not gold or not predicted:
            continue

        valid_rows += 1
        p = precision_at_k(predicted, gold, k)
        r = recall_at_k(predicted, gold, k)
        row_scores = (p, r, f1_score_at_k(p, r), potential_at_k(predicted, gold, k))
        totals = [total + score for total, score in zip(totals, row_scores)]

    if valid_rows == 0:
        return {name: 0.0 for name in metric_names}

    return {name: total / valid_rows for name, total in zip(metric_names, totals)}


EVALUATION FUNCTION FOR SS

In [None]:
def evaluate_selection(df, pred_col, gold_col='gold_substitutes', k=5):
    """Average Precision, Recall, F1 and MRR at ``k`` for selected candidates.

    Args:
        df: frame with one row per instance.
        pred_col: column holding the ranked list of selected candidates.
        gold_col: column holding the set/list of gold substitutes.
        k: cut-off used for every metric; it is also used in the metric
            labels, so the default returns the keys "... at 5".

    Returns:
        dict[str, float]: metric label -> mean over the evaluated rows (all
        0.0 when no row could be evaluated).

    NOTE(review): rows without gold substitutes AND rows without any
    prediction are left out of the averages; excluding the latter makes the
    scores look better than the system really is — confirm this is intended.
    """
    metric_names = [
        f"Precision at {k}",
        f"Recall at {k}",
        f"F1 Score at {k}",
        f"MRR at {k}",
    ]
    totals = [0, 0, 0, 0]
    valid_rows = 0

    for _, row in df.iterrows():
        predicted = [w.strip().lower() for w in row[pred_col] if isinstance(w, str)]
        if not isinstance(row[gold_col], (set, list)):
            continue
        gold = {w.strip().lower() for w in row[gold_col] if isinstance(w, str)}

        if not gold or not predicted:
            continue

        valid_rows += 1

        # Precision, Recall, F1
        p = precision_at_k(predicted, gold, k)
        r = recall_at_k(predicted, gold, k)
        f1 = f1_score_at_k(p, r)

        # Reciprocal rank of the first gold substitute within the top k
        # (0.0 when none of them is a gold substitute).
        reciprocal_rank = 0.0
        for rank, candidate in enumerate(predicted[:k], start=1):
            if candidate in gold:
                reciprocal_rank = 1 / rank
                break

        row_scores = (p, r, f1, reciprocal_rank)
        totals = [total + score for total, score in zip(totals, row_scores)]

    if valid_rows == 0:
        return {name: 0.0 for name in metric_names}

    return {name: total / valid_rows for name, total in zip(metric_names, totals)}


EVALUATION OF SUBSTITUTION SELECTION

In [None]:
# Makes the next cell re-runnable: remove a gold column left over from a
# previous run before the gold substitutes are attached again.
for df_ in [df_with_candidates, df_with_generated_bert, df_selected_1, df_selected_2]:
    if 'gold_substitutes' in df_.columns:
        df_.drop(columns=['gold_substitutes'], inplace=True)


In [None]:
# Attach the gold substitutes row by row through the shared row index.
#
# All four frames are column selections of `df`, which was built from the same
# TSV (same row order, same default index) as `gold_df`, so an index join pairs
# every instance with its own gold set. Merging on 'target_word' instead would
# (a) miss rows whose raw target differs from its cleaned form and
# (b) duplicate rows whenever the same target word occurs in several sentences.
# errors='ignore' keeps the cell re-runnable if the column already exists.
gold_column = gold_df[['gold_substitutes']]

df_with_candidates = df_with_candidates.drop(columns=['gold_substitutes'], errors='ignore').join(gold_column)
df_with_generated_bert = df_with_generated_bert.drop(columns=['gold_substitutes'], errors='ignore').join(gold_column)
df_selected_1 = df_selected_1.drop(columns=['gold_substitutes'], errors='ignore').join(gold_column)
df_selected_2 = df_selected_2.drop(columns=['gold_substitutes'], errors='ignore').join(gold_column)

In [None]:
for name, df_ in [('df_with_candidates', df_with_candidates),
                  ('df_with_generated_bert', df_with_generated_bert),
                  ('df_selected_1', df_selected_1),
                  ('df_selected_2', df_selected_2)]:
    print(f"{name}: gold_substitutes in columns? {'gold_substitutes' in df_.columns}")


In [None]:
# Generation Metrics (evaluated with the function's default cut-off k=10)
metrics_gen_fasttext = evaluate_substitution_generation_clean(df_with_candidates, 'generated_candidates_model_1')
metrics_gen_bert     = evaluate_substitution_generation_clean(df_with_generated_bert, 'generated_candidates_model_bert')

# Selection Metrics (evaluated with the function's default cut-off k=5)
metrics_sel_1 = evaluate_selection(df_selected_1, 'selected_candidates_1')
metrics_sel_2 = evaluate_selection(df_selected_2, 'selected_candidates_2')

# Pretty Print Function
def pretty_print_metrics(title, metrics):
    """Print a titled block with every metric shown as a percentage.

    ``metrics`` maps a label to a value in [0, 1]; labels are left-aligned in
    a 20-character column and values are printed with two decimals.
    """
    print(f"\n📊 {title}")
    underline = "=" * (len(title) + 4)
    print(underline)
    for metric_name in metrics:
        percentage = metrics[metric_name] * 100
        print(f"{metric_name:<20}: {percentage:.2f}%")


In [None]:
# Generation metrics for both candidate generators.
pretty_print_metrics("FastText Generation Metrics", metrics_gen_fasttext)
pretty_print_metrics("RoBERTa MLM Generation Metrics", metrics_gen_bert)

# Selection metrics.
# NOTE(review): metrics_sel_1 comes from the FastText context-similarity
# ranking and metrics_sel_2 from the Mistral re-ranking of the RoBERTa
# candidates; the titles below do not say so.
pretty_print_metrics("Filtering Selection Metrics", metrics_sel_1)
pretty_print_metrics("RoBERTa Selection Metrics", metrics_sel_2)


ERROR ANALYSIS: SOME OBSERVATIONS ABOUT WHAT'S GOING ON BEHIND THE SCENES.

PER-INSTANCE ERROR ANALYSIS

In [None]:
def get_prediction_outcomes(row, pred_col, gold_col='gold_substitutes', k=10):
  """Break one row's predictions down into hits and misses.

  The first ``k`` normalized (trimmed, lower-cased) string predictions are
  compared with the normalized gold substitutes. When the gold cell is not a
  set or list, the gold set is treated as empty, so every prediction counts
  as a false positive.

  Returns a ``pd.Series`` with the target word, sentence, predictions, gold
  substitutes, the true/false positive and false negative lists, their
  counts, and ``potential_hit`` (1 if there is at least one true positive).
  """
  preds = [word.strip().lower() for word in row[pred_col] if isinstance(word, str)][:k]

  gold_raw = row[gold_col]
  if isinstance(gold_raw, (set, list)):
    golds = {word.strip().lower() for word in gold_raw if isinstance(word, str)}
  else:
    golds = set()

  hits = []    # predictions that are gold substitutes
  misses = []  # predictions that are not
  for word in preds:
    (hits if word in golds else misses).append(word)
  missing = [word for word in golds if word not in preds]  # gold never predicted

  return pd.Series({
      'target_word': row['target_word'],
      'sentence': row.get('sentence', ''),
      'predictions': preds,
      'gold_substitutes': list(golds),
      'true_positives': hits,
      'false_positives': misses,
      'false_negatives': missing,
      'TP_count': len(hits),
      'FP_count': len(misses),
      'FN_count': len(missing),
      'potential_hit': int(len(hits) > 0)
  })

In [None]:
# Error analysis for generation:
# one outcome row (TP/FP/FN lists and counts) per instance and generator.

error_analysis_gen_fasttext = df_with_candidates.apply(lambda row: get_prediction_outcomes(row, 'generated_candidates_model_1'), axis=1)
error_analysis_gen_bert = df_with_generated_bert.apply(lambda row: get_prediction_outcomes(row, 'generated_candidates_model_bert'), axis=1)

# Instances where FastText produced no gold substitute at all.
error_analysis_gen_fasttext[error_analysis_gen_fasttext['TP_count'] == 0].head(20)


In [None]:
error_analysis_gen_bert[error_analysis_gen_bert['TP_count'] == 0].head(20)


In [None]:
# Error analysis for selection: same breakdown, applied to the selected candidates.
error_analysis_selec_fasttext = df_selected_1.apply(lambda row: get_prediction_outcomes(row, 'selected_candidates_1'), axis=1)
error_analysis_selec_bert = df_selected_2.apply(lambda row: get_prediction_outcomes(row, 'selected_candidates_2'), axis=1)

# Instances where the first selection approach kept no gold substitute.
error_analysis_selec_fasttext[error_analysis_selec_fasttext['TP_count'] == 0].head(20)

In [None]:
error_analysis_selec_bert[error_analysis_selec_bert['TP_count'] == 0].head(20)

TP vs. FP Confusion Matrix

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

def plot_tp_fp_heatmap(error_df, model_name="Model"):
    """Draw a heatmap of how many instances have each (TP_count, FP_count) pair.

    ``error_df`` must contain the columns ``TP_count`` and ``FP_count``;
    rows of the heatmap are true-positive counts, columns are false-positive
    counts, and combinations that never occur are shown as 0.
    """
    # Count instances per (TP, FP) combination and spread FP over the columns
    pair_counts = error_df.groupby(['TP_count', 'FP_count']).size()
    grid = pair_counts.unstack(fill_value=0)

    # Draw on an explicit figure/axes pair
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(grid, annot=True, fmt='d', cmap='YlOrRd', linewidths=0.5, ax=ax)
    ax.set_title(f"📊 TP vs FP Heatmap — {model_name}")
    ax.set_xlabel("False Positives (FP)")
    ax.set_ylabel("True Positives (TP)")
    fig.tight_layout()
    plt.show()

# Example usage:
plot_tp_fp_heatmap(error_analysis_gen_fasttext, "FastText (Generation)")


In [None]:
plot_tp_fp_heatmap(error_analysis_gen_bert, "RoBERTa (Generation)")

SOME ERROR ANALYSIS FOR SUBSTITUTION SELECTION DATA

In [None]:
# NOTE(review): these recompute the same frames as error_analysis_selec_fasttext /
# error_analysis_selec_bert from the earlier selection error-analysis cell.
error_analysis_sel_1 = df_selected_1.apply(lambda row: get_prediction_outcomes(row, 'selected_candidates_1'), axis=1)
error_analysis_sel_2 = df_selected_2.apply(lambda row: get_prediction_outcomes(row, 'selected_candidates_2'), axis=1)


In [None]:
plot_tp_fp_heatmap(error_analysis_sel_1, "FastText (Selection)")


In [None]:
plot_tp_fp_heatmap(error_analysis_sel_2, "RoBERTa (Selection)")

In [None]:
import matplotlib.pyplot as plt

# Flatten the per-row candidate lists and look up each candidate's corpus
# frequency (0 when the word is not in the frequency dictionary).
all_candidates = [candidate for row_candidates in df['generated_candidates_model_1'] for candidate in row_candidates]
freqs = [word_frequency_dict.get(candidate, 0) for candidate in all_candidates]

# Histogram with a logarithmic count axis; the dashed line marks the
# minimum frequency used when generating candidates.
fig, ax = plt.subplots()
ax.hist(freqs, bins=50, log=True)
ax.set_xlabel('Word Frequency')
ax.set_ylabel('Count (log scale)')
ax.set_title('Distribution of Frequencies in FastText Candidates')
ax.axvline(1, color='red', linestyle='--', label='min_freq=1')
ax.legend()
plt.show()
