In [None]:
!python --version

Python 3.11.11


In [None]:
import nltk
# Fetch the NLTK resources this notebook relies on: WordNet (synonym lookup),
# the Punkt tokenizer models and the averaged-perceptron POS tagger.
# NOTE(review): both the legacy names and the newer '_tab' / '_eng' variants are
# requested, presumably to cover different NLTK versions — confirm which are needed.
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger_eng')

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!


True

In [None]:
import os
from nltk.corpus import wordnet as wn
from nltk import word_tokenize, pos_tag
from nltk.tokenize.treebank import TreebankWordDetokenizer
import re
import random

In [None]:
MAX_TRIES = 80 # Maximum number of attempts a strategy makes per message before giving up
ALPHA = 0.05 # Fraction of the words altered per message. Recommended between 0.05 and 0.1 depending on the original dataset size; <=2000 samples: 0.05
FOLDS = 12 # NOTE(review): presumably the overall augmentation factor; for this number of samples it should be between x8 and x16

In [None]:
# Penn Treebank tag -> WordNet part of speech. Tags that map to None belong to
# word classes for which no synonym lookup is attempted.
POS_MAP = {
    **dict.fromkeys(('NN', 'NNS', 'NNP', 'NNPS'), wn.NOUN),                    # nouns
    **dict.fromkeys(('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'), wn.VERB),      # verbs
    **dict.fromkeys(('JJ', 'JJR', 'JJS'), wn.ADJ),                             # adjectives
    **dict.fromkeys(('RB', 'RBR', 'RBS'), wn.ADV),                             # adverbs
    **dict.fromkeys(
        ('IN', 'DT', 'CC', 'CD', 'EX', 'FW', 'LS',
         'MD', 'PDT', 'POS', 'PRP', 'PRP$', 'RP', 'SYM',
         'TO', 'UH', 'WDT', 'WP', 'WP$', 'WRB',
         '.', ',', ':', '-LRB-', '-RRB-'),
        None),
}

# Tags eligible for synonym lookup: every tag with a WordNet counterpart, in
# the same order in which they appear in POS_MAP.
VALID_TAGS = [tag for tag, wn_pos in POS_MAP.items() if wn_pos is not None]

# **Funciones para la generación de msg**

In [None]:
import re
# remove email address
def token_email_address(text):
  """Mask e-mail addresses in ``text`` with the ``THIS_TOKEN_EMAIL`` placeholder.

  Returns:
    A ``(masked_text, first_email)`` tuple. ``first_email`` is the first address
    found, or ``None`` when the text contains none (the text is then returned
    untouched). Every match is masked, but only the first one is returned.
  """
  email_regex = re.compile(r"[\w\.-]+@[\w\.-]+")

  first_hit = email_regex.search(text)
  if first_hit is None:
    return text, None

  return email_regex.sub("THIS_TOKEN_EMAIL", text), first_hit.group(0)

In [None]:
def token_links(text):
  """Mask links in ``text`` with the ``THIS_TOKEN_URL`` placeholder.

  A link is anything starting with ``http``, ``www.`` or one of the listed
  URL-shortener domains, up to the next whitespace.

  Returns:
    A ``(masked_text, first_url)`` tuple. ``first_url`` is the first link found,
    or ``None`` when there is none (the text is then returned untouched). Every
    match is masked, but only the first one is returned.
  """
  link_regex = re.compile(
      r"(?:http\S+|(?:bit\.ly|goo\.gl|tinyurl\.com|is\.gd|ow\.ly|buff\.ly|adf\.ly|bit\.do|t\.co|shrtco\.de|cutt\.ly|v\.gd|lnkd\.in|rebrand\.ly|clck\.ru|www\.)\S+)")

  first_hit = link_regex.search(text)
  if first_hit is None:
    return text, None

  return link_regex.sub("THIS_TOKEN_URL", text), first_hit.group(0)

In [None]:
def get_synonym(wd, pos):
  """Return a randomly chosen WordNet synonym for ``wd``, or ``None``.

  Args:
    wd: Word to look up.
    pos: Penn Treebank POS tag of ``wd``; it is translated to a WordNet part of
      speech through ``POS_MAP``.

  Returns:
    A lemma name different from ``wd`` (multi-word lemmas use ``_`` as the
    separator), or ``None`` when WordNet has no synsets for the word or every
    lemma is the word itself.

  Raises:
    KeyError: if ``pos`` is not a key of ``POS_MAP``.
  """
  wd_synsets = wn.synsets(wd, pos=POS_MAP[pos])
  if not wd_synsets:
    print(f"No hay synsets para la palabra: {wd}")
    return None

  # Drop lemmas that are the word itself. The comparison ignores case so that
  # e.g. "Last" is never "replaced" by "last", which would not change the message.
  wd_key = wd.lower()
  candidates = [
      lemma
      for wd_synset in wd_synsets
      for lemma in wd_synset.lemma_names()
      if lemma.lower() != wd_key
  ]

  # Remove duplicates (keeping first-seen order) so that a lemma shared by
  # several synsets is not favoured by the random selection below.
  candidates = list(dict.fromkeys(candidates))

  if not candidates:
    return None

  # There is no way here to tell which synonym best fits the sense of the
  # original word. Returning candidates[0] would pick a lemma of the first
  # synset, which may be the wrong sense for a polysemous word, so a random
  # candidate is returned instead.
  if len(candidates) == 1:
    return candidates[0]
  return candidates[random_int(len(candidates) - 1)]

In [None]:
def random_int(pool):
  """Return a random integer N such that ``0 <= N <= pool`` (both ends included)."""
  # randrange excludes its stop value, hence the +1 to keep ``pool`` reachable.
  return random.randrange(0, pool + 1)


In [None]:
def random_pair(pool):
  """Return two distinct random indices taken from ``0..pool`` (inclusive).

  Args:
    pool: Highest index that may be returned.

  Returns:
    A tuple ``(a, b)`` with ``a != b``, or ``None`` when fewer than two indices
    are available (``pool < 1``). Without that guard ``pool == 0`` could never
    produce two different values and the function would loop forever.
  """
  if pool < 1:
    return None

  # Sampling without replacement yields two different indices in a single call.
  r_a, r_b = random.sample(range(pool + 1), 2)
  return r_a, r_b

In [None]:
def random_change(msg):
  """Random swap: exchange the positions of randomly chosen word pairs.

  The message is split on whitespace and ``max(1, round(ALPHA * n_words))``
  swaps are performed.

  Args:
    msg: Message to augment.

  Returns:
    The words joined by single spaces after the swaps. Messages with fewer than
    two words have nothing to swap and are returned with only that whitespace
    normalisation applied.
  """
  token_msg = msg.split()
  msg_len = len(token_msg)

  # With 0 or 1 words no pair of distinct positions exists; asking random_pair
  # for one would never succeed.
  if msg_len < 2:
    return ' '.join(token_msg)

  n = max(1, round(ALPHA * msg_len))

  for _ in range(n):
    a, b = random_pair(msg_len - 1)
    token_msg[a], token_msg[b] = token_msg[b], token_msg[a]
  return ' '.join(token_msg)

In [None]:
def random_deletion(msg):
  """Random deletion: remove randomly chosen words from the message.

  The message is split on whitespace and ``max(1, round(ALPHA * n_words))``
  words are removed, always leaving at least one word.

  Args:
    msg: Message to augment.

  Returns:
    The remaining words joined by single spaces. Messages with fewer than two
    words are returned with only that whitespace normalisation applied, so the
    result is never emptied by the deletion.
  """
  token_msg = msg.split()

  # Nothing can be deleted from an empty message, and deleting the only word
  # would produce an empty sample.
  if len(token_msg) < 2:
    return ' '.join(token_msg)

  n = max(1, round(ALPHA * len(token_msg)))
  n = min(n, len(token_msg) - 1)  # keep at least one word whatever ALPHA is

  for _ in range(n):
    # The list shrinks on every pass, so the index range is recomputed each time.
    del token_msg[random_int(len(token_msg) - 1)]

  return ' '.join(token_msg)

In [None]:
def detokenize_message(token_msg, url, email):
  """Rebuild a sentence from ``(word, tag)`` pairs and restore masked values.

  Args:
    token_msg: Iterable of ``(word, tag)`` pairs; only the words are used.
    url: Text that replaces every ``THIS_TOKEN_URL`` placeholder (skipped if falsy).
    email: Text that replaces every ``THIS_TOKEN_EMAIL`` placeholder (skipped if falsy).

  Returns:
    The detokenized sentence with the placeholders substituted.
  """
  plain_words = [token for token, _tag in token_msg]
  sentence = TreebankWordDetokenizer().detokenize(plain_words)

  # E-mail first, then URL.
  for placeholder, original_value in (('THIS_TOKEN_EMAIL', email), ('THIS_TOKEN_URL', url)):
    if original_value:
      sentence = sentence.replace(placeholder, original_value)

  return sentence

In [None]:
def random_synonym_change(msg):
  """Synonym replacement: swap randomly chosen words for WordNet synonyms.

  E-mail addresses and links are masked before tokenizing and restored at the
  end. ``max(1, round(ALPHA * n_tokens))`` replacements are attempted; for each
  one, random tokens are tried until one with a synonym is found.

  Args:
    msg: Message to augment.

  Returns:
    The detokenized message. If ``MAX_TRIES`` failed attempts accumulate while
    looking for a replaceable token, the replacements made so far are kept and
    the message is returned as it stands. A message without tokens is returned
    as is.
  """
  msg, email = token_email_address(msg)
  msg, url = token_links(msg)
  words = nltk.word_tokenize(msg)

  if not words:  # make sure there is something to work on
    print("Error: No hay palabras después de la tokenización.")
    return msg

  token_msg = nltk.pos_tag(words)
  n = max(1, round(ALPHA * len(token_msg)))

  for _ in range(n):
    synonym = None
    tries = 0  # failed attempts for this replacement
    a = 0

    while synonym is None:
      a = random_int(len(token_msg) - 1)
      wd, wd_pos = token_msg[a]

      # Only word classes that can have synonyms are looked up.
      if wd_pos in VALID_TAGS:
        synonym = get_synonym(wd, wd_pos)

      if synonym is None:
        tries += 1
        if tries >= MAX_TRIES:  # give up and return the message as it currently is
          print(f"No he encontrado sinónimos para ninguna de las palabras en :'{msg}'. Tries: {tries}. Generación abortada para este mensaje.")
          return detokenize_message(token_msg, url, email)

    # Multi-word lemmas come as "come_through": splice them in as separate
    # tokens so that later tagging and lookups see real words instead of one
    # token containing a space.
    parts = [part for part in synonym.split('_') if part] or [synonym]
    words[a:a + 1] = parts
    token_msg = nltk.pos_tag(words)  # the words changed, so the tags must be refreshed

  return detokenize_message(token_msg, url, email)

In [None]:
# Quick manual check of synonym replacement on a sample smishing message.
# NOTE(review): the saved output of this cell starts with a "Mensaje inicial:" line
# that random_synonym_change does not print — the output looks stale; re-run to refresh.
print(random_synonym_change("Last chance 2 claim ur £250 worth of discount vouchers-Text YES to 85022 now!SavaMob-member offers mobile T Cs 08717898036. £3.00 Sub. 16 ."))

Mensaje inicial: Last chance 2 claim ur £250 worth of discount vouchers-Text YES to 85022 now!SavaMob-member offers mobile T Cs 08717898036. £3.00 Sub. 16 .
Last chance 2 claim ur £250 worth of discount vouchers-Text YES to 85022 now! SavaMob-member offers mobile T caesium 08717898036 . £3.00 Sub . 16.


In [None]:
def random_synonym_insert(msg):
  """Random insertion: insert synonyms of randomly chosen words at random positions.

  E-mail addresses and links are masked before tokenizing and restored at the
  end. ``max(1, round(ALPHA * n_tokens))`` insertions are attempted; for each
  one, random tokens are tried until one with a synonym is found, and that
  synonym is inserted before a randomly chosen existing token.

  Args:
    msg: Message to augment.

  Returns:
    The detokenized message. If ``MAX_TRIES`` failed attempts accumulate while
    looking for a token with a synonym, the insertions made so far are kept and
    the message is returned as it stands. A message without tokens is returned
    as is.
  """
  msg, email = token_email_address(msg)
  msg, url = token_links(msg)
  words = nltk.word_tokenize(msg)

  if not words:  # make sure there is something to work on
    print("Error: No hay palabras después de la tokenización.")
    return msg

  token_msg = nltk.pos_tag(words)
  n = max(1, round(ALPHA * len(token_msg)))

  for _ in range(n):
    synonym = None
    tries = 0  # failed attempts for this insertion

    while synonym is None:
      # The length is re-read on every draw: the list grows after each insertion
      # and the newly created positions must stay reachable.
      a = random_int(len(token_msg) - 1)
      wd, wd_pos = token_msg[a]

      # Only word classes that can have synonyms are looked up.
      if wd_pos in VALID_TAGS:
        synonym = get_synonym(wd, wd_pos)

      if synonym is None:
        tries += 1
        if tries >= MAX_TRIES:  # give up and return the message as it currently is
          print(f"No he encontrado sinónimos para ninguna de las palabras en :'{msg}'. Tries: {tries}. Generación abortada para este mensaje.")
          return detokenize_message(token_msg, url, email)

    # Multi-word lemmas come as "come_through": insert them as separate tokens
    # so that later tagging and lookups see real words.
    parts = [part for part in synonym.split('_') if part] or [synonym]
    b = random_int(len(words) - 1)  # position of the synonym in the sentence
    words[b:b] = parts
    token_msg = nltk.pos_tag(words)  # the words changed, so the tags must be refreshed

  return detokenize_message(token_msg, url, email)

## **Pruebas**

# **Generación del Dataset**

In [None]:
import os
from time import sleep
import pandas as pd

In [None]:
# Mount Google Drive to get access to the files the notebook needs
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Location of the source dataset inside the mounted Drive; the print below is a
# sanity check that the file is reachable before trying to load it.
dataset_path = '/content/drive/MyDrive/Master/PRACTICAS/Datasets/Dataset_Smishing14_1400.xlsx'
print(os.path.exists(dataset_path))

True


In [None]:
# Load the dataset
df = pd.read_excel(dataset_path)

In [None]:
def augment_dataset(df, tk):
  """Return a copy of ``df`` whose ``TEXT`` column was augmented with technique ``tk``.

  Args:
    df: DataFrame with a ``TEXT`` column; it is not modified.
    tk: Technique code: ``'SR'`` (synonym replacement), ``'RI'`` (random
      insertion), ``'RS'`` (random swap) or ``'RD'`` (random deletion).

  Returns:
    The augmented copy. For an unknown code a message is printed and the copy
    is returned unchanged.
  """
  aug_df = df.copy()
  print('Aumentar')

  # Lambdas keep the lookup of each augmentation function lazy: only the one
  # that is actually selected has to exist when it runs.
  techniques = {
      'SR': lambda x: random_synonym_change(x),
      'RI': lambda x: random_synonym_insert(x),
      'RS': lambda x: random_change(x),
      'RD': lambda x: random_deletion(x),
  }

  transform = techniques.get(tk)
  if transform is None:
    print("No conozco esa técnica")
    return aug_df

  aug_df['TEXT'] = aug_df['TEXT'].apply(transform)
  return aug_df

In [None]:
# SR -> For Random Substitution
# RI -> For Random Insertion
# RS -> For Random Swap
# RD -> For Random Deletion

def augment_and_save(df, p, tk, path):
  """Create ``p`` augmented copies of ``df`` with technique ``tk`` and save them as CSV.

  Args:
    df: Source DataFrame (passed to ``augment_dataset``).
    p: Number of augmented copies to generate.
    tk: Technique code understood by ``augment_dataset`` ('SR', 'RI', 'RS', 'RD').
    path: Directory in which the CSV file is written.

  Returns:
    The concatenation of all augmented copies, with a fresh index. It is empty
    when ``p`` is 0 or negative.

  Note:
    The output file is named ``aug_x{p}_1400_with_{tk}.csv``; the "1400" is
    fixed in the name and does not depend on the size of ``df``.
  """
  # Generate every copy first and concatenate once: this avoids re-copying the
  # accumulated frame on each pass and never mixes in an empty placeholder frame.
  copies = [augment_dataset(df, tk) for _ in range(p)]
  result = pd.concat(copies, ignore_index=True) if copies else pd.DataFrame()

  save_path = os.path.join(path, f"aug_x{p}_1400_with_{tk}.csv")
  print(save_path)
  result.to_csv(save_path, sep=',', index=False)

  return result

In [None]:
# Each of the four techniques contributes FOLDS/4 augmented copies of the dataset,
# and each call writes its own CSV file to the Datasets folder on Drive.
X = int(FOLDS/4)
augment_and_save(df, X, 'SR', '/content/drive/MyDrive/Master/PRACTICAS/Datasets')
augment_and_save(df, X, 'RI', '/content/drive/MyDrive/Master/PRACTICAS/Datasets')
augment_and_save(df, X, 'RS', '/content/drive/MyDrive/Master/PRACTICAS/Datasets')
# Last expression of the cell: its returned DataFrame is what the notebook displays.
augment_and_save(df, X, 'RD', '/content/drive/MyDrive/Master/PRACTICAS/Datasets')

[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
No hay synsets para la palabra: RESPOND
No hay synsets para la palabra: RESPOND
No hay synsets para la palabra: Bcm
No hay synsets para la palabra: wc1n3xx
No hay synsets para la palabra: NEO69
No hay synsets para la palabra: NEO69
No hay synsets para la palabra: Hard
No hay synsets para la palabra: 03-27-31
No hay synsets para la palabra: £800
No hay synsets para la palabra: 've
No hay synsets para la palabra: Row/W1JHL
No hay synsets para la palabra: Nokia
No hay synsets para la palabra: come through
No hay synsets para la palabra: Nokia
No hay synsets para la palabra: Row/W1JHL
No hay synsets para la palabra: Bx526
No hay synsets para la palabra: Please
No hay synsets para la palabra: secure
No hay synsets para la palabra: Paytm
No hay synsets para la palabra: Though
No hay synsets para la palabra: Pls
No hay synsets para la palabra: Pls
No hay synsets para la palabra: meetins
No hay synsets para la palabra: 

Unnamed: 0,TEXT,LABEL
0,! Warning:For security reasons your account wi...,Bank
1,! BBVA Bank: From 03/04/2022 you will not be a...,Bank
2,"+1 954-283-7757 BBVA: Dear customer, your acco...",Bank
3,+34629010971 CaixaBank: We regret to inform yo...,Bank
4,11 Your account has been temporarily blocked f...,Accounts
...,...,...
4297,XCLUSIVE@CLUBSAISAI 2MOROW 28/5 SOIREE SPECIAL...,Gifts
4298,XMAS iscoming & ur awarded either £500 CD gift...,Gifts
4299,"XXXMobileMovieClub: To use your credit, click ...",Customer service
4300,Our brand new mobile music service is now live...,SMS_Service
