<a href="https://colab.research.google.com/github/Armando5347/polaridad-opinion/blob/main/polaridad_de_opinion_limpiado.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Imports a utilizar**

In [None]:
import pandas as pd
import numpy as np
import stanza
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier
from sklearn.decomposition import TruncatedSVD
import re, os
import pickle
import threading
import gc
from scipy.sparse import hstack
from scipy.sparse import csr_matrix
from scipy.sparse import vstack
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from sklearn.model_selection import StratifiedKFold
from imblearn.pipeline import make_pipeline
from spellchecker import SpellChecker
from sklearn.utils import resample

**Normalizar el texto**

In [None]:
def limpiar_texto(texto):
    texto = texto.lower()
    reemplazos = {
        'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
        'à': 'a', 'è': 'e', 'ì': 'i', 'ò': 'o', 'ù': 'u',
        'ä': 'a', 'ë': 'e', 'ï': 'i', 'ö': 'o', 'ü': 'u',
        'ñ': 'n'
    }

    for acentuada, normal in reemplazos.items():
        texto = texto.replace(acentuada, normal)

    return texto

config = {
    'processors': 'tokenize,mwt,pos,lemma',
    'lang': 'es'
}

nlp = stanza.Pipeline(**config)

def normalizarTexto(texto, limpia, quita_stopwords, lematiza):
    if limpia:
        texto = limpiar_texto(texto)

    try:
        doc = nlp(texto)
        cadenaNorm = ""
        for sent in doc.sentences:
            for token in sent.words:
                if quita_stopwords and lematiza:
                    if token.pos not in {'ADP', 'CCONJ', 'DET', 'SCONJ', 'PRON'}:
                        cadenaNorm += token.lemma + " "
                elif quita_stopwords:
                    if token.pos not in {'ADP', 'CCONJ', 'DET', 'SCONJ', 'PRON'}:
                        cadenaNorm += token.text + " "
                elif lematiza:
                    cadenaNorm += token.lemma + " "
                else:
                    cadenaNorm += token.text + " "
    except:
        cadenaNorm = ""

    return cadenaNorm

**Normalización adicional**

In [None]:
def corregir_texto(texto):

    # Inicializa el corrector para el idioma español
    spell = SpellChecker(language='es')

    # Dividir el texto en palabras, preservando la puntuación
    palabras = texto.split()
    texto_corregido = []

    for palabra in palabras:
        # Extrae signos de puntuación al inicio y al final
        inicio = ''.join(char for char in palabra if not char.isalnum())
        final = ''.join(char for char in reversed(palabra) if not char.isalnum())
        palabra_central = palabra[len(inicio):-len(final) or None]

        # Si la palabra está mal escrita, corrige
        if palabra_central and palabra_central not in spell:
            sugerencia = spell.correction(palabra_central)
            palabra_central = sugerencia if sugerencia else palabra_central

        # Reconstruye la palabra con signos de puntuación
        texto_corregido.append(f"{inicio}{palabra_central}{final}")

    # Une las palabras corregidas en el texto final
    return ' '.join(texto_corregido)



def corregir_repeticiones(texto):

    # Elimina repeticiones consecutivas de letras en una palabra (e.g., "caaaarro" -> "carro")
    def corregir_letras_repetidas(palabra):
        return re.sub(r'(.)\1{2,}', r'\1', palabra)

    # Dividir el texto en palabras
    palabras = texto.split()
    palabras_corregidas = []
    ultima_palabra = None

    for palabra in palabras:
        # Corregir letras repetidas en exceso
        palabra_corregida = corregir_letras_repetidas(palabra)

        # Eliminar palabras repetidas consecutivamente
        if palabra_corregida != ultima_palabra:
            palabras_corregidas.append(palabra_corregida)
            ultima_palabra = palabra_corregida

    # Reconstruir el texto corregido
    return ' '.join(palabras_corregidas)


def procesar_texto(texto, aplicar_repeticiones=False, aplicar_ortografia=False):

    if aplicar_repeticiones:
        texto = corregir_repeticiones(texto)
    if aplicar_ortografia:
        texto = corregir_texto(texto)
    return texto
def limpiado_adicional(texto):
  return corregir_texto(corregir_repeticiones(texto))

**Obtener el dataset**

In [None]:
if (os.path.exists('./polaridad-opinion/corpusLimpiado.csv')):
    data = pd.read_csv('./polaridad-opinion/corpusLimpiado.csv', sep='\t', index = False)
else:
  data = pd.read_csv("./polaridad-opinion/corpusNorm.csv", sep="\t")
  #data['Content'] = data['Content'].apply(limpiado_adicional)
  data.to_csv('./corpusLimpiado.csv', sep='\t', index = False)
data['Polarity'] = data['Polarity'].astype("UInt8") #optimizando espacio
train_split, test_split = train_test_split(
        data,
        test_size=0.2,  # 20% para prueba y 80% para entrenamiento
        random_state=0,  # Semilla para asegurar reproducibilidad
        stratify=data['Polarity']  # Mantener proporciones de clase
    )

X_train = train_split["Content"]
X_train_copy = X_train.copy()
y_train = train_split["Polarity"]
y_train_copy = y_train.copy()
X_test = test_split["Content"]
X_test_copy = X_test.copy()
y_test = test_split["Polarity"]
y_test_copy = y_test.copy()

0        1
1        1
2        1
3        1
4        1
        ..
30207    5
30208    5
30209    5
30210    5
30211    5
Name: Polarity, Length: 30212, dtype: UInt8


**Calcular polaridad**

In [None]:
def getSELFeatures(cadenas, lexicon_sel):
	polaridad_cadenas = []
	for cadena in cadenas:
		valor_alegria = 0.0
		valor_enojo = 0.0
		valor_miedo = 0.0
		valor_repulsion = 0.0
		valor_sorpresa = 0.0
		valor_tristeza = 0.0
		cadena_palabras = re.split('\s+', cadena)
		dic = {}
		for palabra in cadena_palabras:
			if palabra in lexicon_sel:
				caracteristicas = lexicon_sel[palabra]
				for emocion, valor in caracteristicas:
					if emocion == 'Alegría':
						valor_alegria = valor_alegria + float(valor)
					elif emocion == 'Tristeza':
						valor_tristeza = valor_tristeza + float(valor)
					elif emocion == 'Enojo':
						valor_enojo = valor_enojo + float(valor)
					elif emocion == 'Repulsión':
						valor_repulsion = valor_repulsion + float(valor)
					elif emocion == 'Miedo':
						valor_miedo = valor_miedo + float(valor)
					elif emocion == 'Sorpresa':
						valor_sorpresa = valor_sorpresa + float(valor)
		dic['__alegria__'] = valor_alegria
		dic['__tristeza__'] = valor_tristeza
		dic['__enojo__'] = valor_enojo
		dic['__repulsion__'] = valor_repulsion
		dic['__miedo__'] = valor_miedo
		dic['__sorpresa__'] = valor_sorpresa

		#Esto es para los valores acumulados del mapeo a positivo (alegría + sorpresa) y negativo (enojo + miedo + repulsión + tristeza)
		dic['acumuladopositivo'] = dic['__alegria__'] + dic['__sorpresa__']
		dic['acumuladonegative'] = dic['__enojo__'] + dic['__miedo__'] + dic['__repulsion__'] + dic['__tristeza__']

		polaridad_pos = np.array([dic['acumuladopositivo']])
		polaridad_neg = np.array([dic['acumuladonegative']])
		polaridad_cadena = np.concatenate((polaridad_pos, polaridad_neg), axis=0)
		polaridad_cadenas.append(polaridad_cadena)
		polarida_cadenas = csr_matrix(polaridad_cadenas) #pasar a matriz dispersa, para reducir espacio

	return polaridad_cadenas

if (os.path.exists('./polaridad-opinion/lexicon_sel.pkl')):
    lexicon_sel_file = open ('./polaridad-opinion/lexicon_sel.pkl','rb')
    lexicon_sel = pickle.load(lexicon_sel_file)
else:
    print("No se ha encontrado el archivo lexicon_sel.pkl")
    exit()

polaridad_train = getSELFeatures(X_train_copy, lexicon_sel)
polaridad_test = getSELFeatures(X_test_copy, lexicon_sel)
#print(polaridad_train)


**Vectorización**

In [None]:
tfidf_vectorizer = TfidfVectorizer(token_pattern= r'(?u)\w+|\w+\n|\.|\¿|\?', ngram_range=(1,1))
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train_copy)
X_test_tfidf = tfidf_vectorizer.transform(X_test_copy)

frequency_vectorizer = CountVectorizer(binary=False, ngram_range=(1, 1))
X_train_freq = frequency_vectorizer.fit_transform(X_train_copy)
X_test_freq = frequency_vectorizer.transform(X_test_copy)

X_tain_pol = None
X_test_pol = None

**Añadir polarización a la vectorización**

In [None]:
X_train_pol = hstack([X_train_tfidf, polaridad_train]).tocsr()
X_test_pol = hstack([X_test_tfidf, polaridad_test]).tocsr()
del X_train, X_train_copy, X_test, X_test_copy, X_test_tfidf
gc.collect()

0

**Vectorización con SVD**

In [None]:
svd = TruncatedSVD(n_components=3000, random_state=0)
X_train_tfidf_svd = csr_matrix(svd.fit_transform(X_train_tfidf))
X_test_tfidf_svd = csr_matrix(svd.transform(X_test_tfidf))

X_train_pol = hstack([X_train_tfidf_svd, polaridad_train]).tocsr()
X_test_pol = hstack([X_test_tfidf_svd, polaridad_test]).tocsr()
del X_train, X_train_copy, X_test, X_test_copy, X_test_tfidf, X_test_tfidf_svd, X_train_tfidf_svd
gc.collect()

**Prueba de balanceo de clases con cross validation con modelos no pesados**

Utilizando polaridad de texto, undersampling y oversampling
*Aplicando la función resample*

In [None]:
pliegues = 5
pliegues_estratificados = StratifiedKFold(n_splits=pliegues, shuffle=True, random_state=0)
results = []
modelo_prueba = SVC(random_state=0, C=10, kernel='linear', gamma="scale")
with open('./resultados_pruebas.txt','a', encoding='utf-8') as salida:
  salida.write("Pruebas con MSV (C=10, kernel = 'linear', gamma='scale') con el corpus Normalizado\nHaciendo pruebas de balanceo [3, 2.5, 2, 1.75, 0.5]\n\n")
for train_index, test_index in pliegues_estratificados.split(X_train_pol, y_train_copy):

    # Dividir los datos en entrenamiento y prueba
    X_trainn = X_train_pol[train_index]
    X_testt = X_train_pol[test_index]
    y_trainn = y_train_copy.iloc[train_index]
    y_testt = y_train_copy.iloc[test_index]


    class_counts = y_trainn.value_counts()

    oversampling_factors = { #sujetos a cambio
        1: 3,
        2: 2.5,
        3: 2,
        4: 1.75,
    }
    undersampling_factor = 0.5

    X_train_balanceado = []
    y_train_balanceado = []

    for polarity, count in class_counts.items():
      class_data = X_trainn[y_trainn == polarity]
      #y_class = y_train[y_train == polarity]

      if polarity == 5:
        target_size = int(count * undersampling_factor)
        #class_data  = resample(class_data, replace=False, n_samples=target_size, random_state=0)
        indices = resample(np.arange(X_trainn.shape[0]), replace=False, n_samples=target_size, random_state=0)
      else:
        factor = oversampling_factors.get(polarity, 1.0)
        target_size = int(count * factor)
        indices = resample(np.arange(X_trainn.shape[0]), replace=True, n_samples=target_size, random_state=0)
        #class_data = resample(class_data, replace=True, n_samples=target_size, random_state=0)

      X_balanceado_parcial = X_trainn[indices]
      y_balanceado_parcial = y_trainn.iloc[indices]
      X_train_balanceado.append(X_balanceado_parcial)
      y_train_balanceado.append(y_balanceado_parcial)
      #aqui acaba el for, que las identaciones se pusieron de comendiates

    X_train_balanceado = vstack(X_train_balanceado)
    y_train_balanceado = np.hstack(y_train_balanceado)
    modelo_prueba.fit(X_train_balanceado, y_train_balanceado)

    y_pred = modelo_prueba.predict(X_testt)
    report = classification_report(y_testt, y_pred, output_dict=True, digits=4)
    with open('./resultados_pruebas.txt','a', encoding='utf-8') as salida:
      salida.write(str(report) + '\n')
    results.append(report['macro avg']['f1-score'])

    #una vez termina todo lo de aquí, a limpiar
    del X_trainn, X_testt, y_trainn, y_testt, X_train_balanceado
    gc.collect()

average_macro_f1 = sum(results) / len(results)
printeado =f"Promedio del f1-score de 'macro avg' en todas las iteraciones: {average_macro_f1}"
y_pred = modelo_prueba.predict(X_test_pol)
with open('./resultados_pruebas.txt','a', encoding='utf-8') as salida:
      salida.write(printeado + "\n")
      salida.write(str(classification_report(y_test, y_pred, output_dict=False, digits=4)))
      salida.write(str(confusion_matrix(y_test, y_pred)) + "\n")
print(printeado)
#print(results)


Utilizando undersampling y oversampling

In [None]:
pliegues = 5
pliegues_estratificados = StratifiedKFold(n_splits=pliegues, shuffle=True, random_state=0)
results = []
modelo_prueba = LogisticRegression()

for train_index, test_index in pliegues_estratificados.split(X_train_pol, y_train_copy):


    # Dividir los datos en entrenamiento y prueba
    X_trainn, X_testt = X_train_pol[train_index], X_train_pol[test_index]
    y_trainn, y_testt = y_train_copy.iloc[train_index], y_train_copy.iloc[test_index]

    # Aplicar under-sampling y over-sampling en los datos de entrenamiento
    under_sampler = RandomUnderSampler(random_state=0)
    over_sampling = RandomOverSampler(random_state=0)

    # Paso 1: Under-sampling
    X_resampled, y_resampled = under_sampler.fit_resample(X_trainn, y_trainn)

    # Paso 2: Over-sampling
    X_resampled, y_resampled = over_sampling.fit_resample(X_resampled, y_resampled)

    # Entrenar el modelo con los datos balanceados
    modelo_prueba.fit(X_resampled, y_resampled)

    y_pred = modelo_prueba.predict(X_testt)
    report = classification_report(y_testt, y_pred, output_dict=True)
    results.append(report['macro avg']['f1-score'])
average_macro_f1 = sum(results) / len(results)
print("Promedio del f1-score de 'macro avg' en todas las iteraciones:", average_macro_f1)
#print(results)


Promedio del f1-score de 'macro avg' en todas las iteraciones: 0.41603843745733526


**Crear pipeline, junto con los grid_search_view para los clasificadores**

In [None]:
clasificadores = [SVC(random_state=0), MLPClassifier(max_iter=1000, random_state=0)]
param_grid_svc = {
                'classifier__C': [0.1, 1, 10],  # Hiperparámetro C para SVM
                'classifier__kernel': ['linear', 'rbf', 'poly'],  # Tipo de kernel
                'classifier__gamma': ['scale', 'auto']  # Parámetro gamma
            }

param_grid_mlp = {
                'classifier__hidden_layer_sizes': [(50,), (100,), (50, 50)],
                'classifier__activation': ['tanh', 'relu'], #funcion de activacion
                'classifier__alpha': [0.0001, 0.001, 0.01]
            }
def probarClasificador(clasificador, parametros, X_train, y_train, X_test, y_test, lock):



  pipe = Pipeline([('text_representation', TfidfVectorizer(token_pattern= r'(?u)\w+|\w+\n|\.|\¿|\?', ngram_range=(1,1))), ('classifier',clasificador)])
  #aqui, cv hace cross validation por su cuenta, y busca ajustar los mejores hipermarametros a partir del f1-macro
  grid_search = GridSearchCV(pipe, parametros, cv=5,scoring='f1_macro')
  if isinstance(clasificador, SVC):
    print("Resultados de la maquina de soporte vectorial")
  else:
    print("Resultados del perceptrón multicapa")
  # Entrenar el modelo con GridSearchCV
  grid_search.fit(X_train, y_train)
  y_pred = grid_search.predict(X_test)
  with lock:
    print(str(grid_search.best_params_))
    print(classification_report(y_test, y_pred))

lock = threading.Lock()

hilo_svc = threading.Thread(name="Experimento Maquina de soporte vectorial",target=probarClasificador, args=(clasificadores[0], param_grid_svc, X_train, y_train, X_test, y_test, lock))
hilo_mlp = threading.Thread(name="Experimento Perceptron multicapa",target=probarClasificador, args=(clasificadores[1], param_grid_mlp, X_train_copy, y_train_copy, X_test_copy, y_test_copy, lock))
#ejecutar hilos
hilo_svc.start()
hilo_mlp.start()
#esperar a que terminen
hilo_svc.join()
hilo_mlp.join()