In [1]:
import pandas as pd
import logging
from functools import wraps
from typing import Optional, List, Dict
import numpy as np
from sklearn.preprocessing import LabelEncoder
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import re
from typing import Tuple
from sentence_transformers import SentenceTransformer
from typing import List, Union

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from imblearn.over_sampling import SMOTE
import optuna

  from tqdm.autonotebook import tqdm, trange


In [2]:
# Configuración del logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Descarga de recursos necesarios de NLTK
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)

def log_execution(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"Ejecutando {func.__name__}")
        result = func(*args, **kwargs)
        logger.info(f"Finalizado {func.__name__}")
        return result
    return wrapper

In [3]:
class DataLoader:
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.data: Optional[pd.DataFrame] = None

    @log_execution
    def load_data(self, encoding: str = 'utf-8') -> None:
        """
        Carga los datos desde el archivo CSV.
        
        Args:
            encoding (str): La codificación del archivo. Por defecto es 'utf-8'.
        """
        encodings_to_try = [encoding, 'iso-8859-1', 'latin1', 'cp1252']
        
        for enc in encodings_to_try:
            try:
                self.data = pd.read_csv(self.file_path, encoding=enc)
                logger.info(f"Datos cargados exitosamente desde {self.file_path} con codificación {enc}")
                return
            except UnicodeDecodeError:
                logger.warning(f"No se pudo cargar el archivo con la codificación {enc}. Probando otra...")
            except Exception as e:
                logger.error(f"Error al cargar los datos: {str(e)}")
                raise
        
        logger.error("No se pudo cargar el archivo con ninguna de las codificaciones probadas.")
        raise ValueError("No se pudo determinar la codificación correcta del archivo.")

    @log_execution
    def get_info(self) -> None:
        """Muestra información básica sobre el dataset."""
        if self.data is not None:
            logger.info("Información del dataset:")
            print(self.data.info())
        else:
            logger.warning("No hay datos cargados. Ejecute load_data() primero.")

    @log_execution
    def check_nulls(self) -> pd.DataFrame:
        """Verifica y retorna información sobre valores nulos en el dataset."""
        if self.data is not None:
            null_info = self.data.isnull().sum().reset_index()
            null_info.columns = ['Columna', 'Nulos']
            null_info['Porcentaje'] = (null_info['Nulos'] / len(self.data)) * 100
            logger.info("Información de valores nulos:")
            print(null_info)
            return null_info
        else:
            logger.warning("No hay datos cargados. Ejecute load_data() primero.")
            return pd.DataFrame()

In [4]:
# Ejemplo de uso

loader = DataLoader("../data/raw_data/SMS_raw_data.csv")
loader.load_data()

2024-07-29 18:50:57,806 - INFO - Ejecutando load_data
2024-07-29 18:50:57,811 - INFO - Datos cargados exitosamente desde ../data/raw_data/SMS_raw_data.csv con codificación iso-8859-1
2024-07-29 18:50:57,811 - INFO - Finalizado load_data


In [5]:
loader.get_info()

2024-07-29 18:50:58,330 - INFO - Ejecutando get_info
2024-07-29 18:50:58,330 - INFO - Información del dataset:
2024-07-29 18:50:58,336 - INFO - Finalizado get_info


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 957 entries, 0 to 956
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   S. No.        957 non-null    int64 
 1   Message_body  957 non-null    object
 2   Label         957 non-null    object
dtypes: int64(1), object(2)
memory usage: 22.6+ KB
None


In [6]:
loader.check_nulls()

2024-07-29 18:50:58,953 - INFO - Ejecutando check_nulls
2024-07-29 18:50:58,955 - INFO - Información de valores nulos:
2024-07-29 18:50:58,958 - INFO - Finalizado check_nulls


        Columna  Nulos  Porcentaje
0        S. No.      0         0.0
1  Message_body      0         0.0
2         Label      0         0.0


Unnamed: 0,Columna,Nulos,Porcentaje
0,S. No.,0,0.0
1,Message_body,0,0.0
2,Label,0,0.0


In [7]:
# Vemos las primeras filas del dataset

loader.data.head()

Unnamed: 0,S. No.,Message_body,Label
0,1,Rofl. Its true to its name,Non-Spam
1,2,The guy did some bitching but I acted like i'd...,Non-Spam
2,3,"Pity, * was in mood for that. So...any other s...",Non-Spam
3,4,Will ü b going to esplanade fr home?,Non-Spam
4,5,This is the 2nd time we have tried 2 contact u...,Spam


In [8]:
class DataProcessor:
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()

    @staticmethod
    def clean_text(text: str) -> str:
        """
        Limpia el texto: elimina caracteres especiales, convierte a minúsculas,
        elimina stopwords y tokeniza.
        """
        # Convertir a minúsculas
        text = text.lower()
        
        # Eliminar caracteres especiales y números
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        
        # Tokenizar
        tokens = word_tokenize(text)
        
        # Eliminar stopwords
        stop_words = set(stopwords.words('english'))
        tokens = [token for token in tokens if token not in stop_words]
        
        return ' '.join(tokens)

    def process_data(self) -> 'DataProcessor':
        """
        Procesa los datos: limpia el texto y codifica las etiquetas.
        """
        logger.info("Iniciando procesamiento de datos...")

        # Limpiar el texto de la columna 'Message_body'
        self.df['cleaned_text'] = self.df['Message_body'].apply(self.clean_text)
        logger.info("Texto limpiado exitosamente.")

        # Convertir la columna 'Label' a valores numéricos
        self.df['numeric_label'] = (self.df['Label'] == 'Spam').astype(int)
        logger.info("Etiquetas convertidas a valores numéricos.")

        # Eliminar las columnas originales
        self.df.drop(['Message_body', 'Label'], axis=1, inplace=True)
        logger.info("Columnas originales eliminadas.")

        logger.info("Procesamiento de datos completado.")
        return self

    def get_processed_data(self) -> pd.DataFrame:
        """
        Devuelve el DataFrame procesado.
        """
        if 'cleaned_text' not in self.df.columns or 'numeric_label' not in self.df.columns:
            raise ValueError("Los datos aún no han sido procesados. Ejecute process_data() primero.")
        return self.df

In [9]:
# Ejemplo de uso
    
processor = DataProcessor(loader.data)
processor = processor.process_data()

2024-07-29 18:51:06,898 - INFO - Iniciando procesamiento de datos...
2024-07-29 18:51:06,985 - INFO - Texto limpiado exitosamente.
2024-07-29 18:51:06,987 - INFO - Etiquetas convertidas a valores numéricos.
2024-07-29 18:51:06,988 - INFO - Columnas originales eliminadas.
2024-07-29 18:51:06,988 - INFO - Procesamiento de datos completado.


In [10]:
processor.df.head()

Unnamed: 0,S. No.,cleaned_text,numeric_label
0,1,rofl true name,0
1,2,guy bitching acted like id interested buying s...,0
2,3,pity mood soany suggestions,0
3,4,b going esplanade fr home,0
4,5,nd time tried contact u u pound prize claim ea...,1


In [11]:
# Guardar el DataFrame procesado
processor.df.to_csv("../data/processed_data/processed_sms_data.csv", index=False)

In [12]:
class EmbeddingProcessor:
    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)
        logger.info(f"Modelo {model_name} cargado exitosamente.")

    def generate_embeddings(self, texts: Union[List[str], pd.Series]) -> np.ndarray:
        """
        Genera embeddings para una lista de textos o una Serie de pandas.
        """
        logger.info("Generando embeddings...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        logger.info("Embeddings generados exitosamente.")
        return embeddings

    def process_dataframe(self, df: pd.DataFrame, text_column: str) -> pd.DataFrame:
        """
        Procesa un DataFrame, generando embeddings para la columna de texto especificada
        y los combina en una sola columna.
        """
        if text_column not in df.columns:
            raise ValueError(f"La columna {text_column} no existe en el DataFrame.")

        embeddings = self.generate_embeddings(df[text_column])
        
        # Convertir los embeddings a una lista de listas para almacenarlos en una sola columna
        embeddings_list = embeddings.tolist()
        
        # Añadir los embeddings como una nueva columna al DataFrame
        df['combined_embeddings'] = embeddings_list
        
        logger.info(f"DataFrame procesado. Embeddings combinados en una sola columna.")
        
        return df

    @staticmethod
    def get_embedding_dim(df: pd.DataFrame) -> int:
        """
        Obtiene la dimensión de los embeddings combinados.
        """
        if 'combined_embeddings' not in df.columns:
            raise ValueError("El DataFrame no contiene la columna 'combined_embeddings'.")
        
        # Asumimos que todos los embeddings tienen la misma dimensión
        embedding_dim = len(df['combined_embeddings'].iloc[0])
        return embedding_dim

In [13]:
# Ejemplo de uso
embedding_processor = EmbeddingProcessor()
df_with_embeddings = embedding_processor.process_dataframe(processor.df, 'cleaned_text')

2024-07-29 18:51:11,249 - INFO - Use pytorch device_name: mps
2024-07-29 18:51:11,249 - INFO - Load pretrained SentenceTransformer: all-MiniLM-L6-v2
2024-07-29 18:51:13,140 - INFO - Modelo all-MiniLM-L6-v2 cargado exitosamente.
2024-07-29 18:51:13,142 - INFO - Generando embeddings...


Batches:   0%|          | 0/30 [00:00<?, ?it/s]

2024-07-29 18:51:15,407 - INFO - Embeddings generados exitosamente.
2024-07-29 18:51:15,415 - INFO - DataFrame procesado. Embeddings combinados en una sola columna.


In [14]:
print(f"Número de características de embedding: {df_with_embeddings.filter(like='embedding_').shape[1]}")

Número de características de embedding: 0


In [15]:
df_with_embeddings.head()

Unnamed: 0,S. No.,cleaned_text,numeric_label,combined_embeddings
0,1,rofl true name,0,"[-0.044475845992565155, -0.04105318337678909, ..."
1,2,guy bitching acted like id interested buying s...,0,"[-0.055322881788015366, -0.020870674401521683,..."
2,3,pity mood soany suggestions,0,"[-0.054527826607227325, 0.035373345017433167, ..."
3,4,b going esplanade fr home,0,"[0.027759529650211334, 0.011844214983284473, 0..."
4,5,nd time tried contact u u pound prize claim ea...,1,"[-0.0534316711127758, 0.03708187863230705, 0.0..."


In [16]:
# Guardar el DataFrame con embeddings
df_with_embeddings.to_csv("../data/processed_data/embedded_sms_data.csv", index=False)

In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from imblearn.over_sampling import SMOTE
import logging
from typing import Tuple, Dict

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class AutoencoderModel(nn.Module):
    def __init__(self, input_dim: int, encoding_dim: int = 32):
        super(AutoencoderModel, self).__init__()
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, encoding_dim)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, input_dim)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [18]:
class AutoencoderTrainer:
    def __init__(self, df: pd.DataFrame, embedding_col: str = 'combined_embeddings', 
                 label_col: str = 'numeric_label', test_size: float = 0.2, 
                 random_state: int = 42, batch_size: int = 32, encoding_dim: int = 32):
        self.df = df
        self.embedding_col = embedding_col
        self.label_col = label_col
        self.test_size = test_size
        self.random_state = random_state
        self.batch_size = batch_size
        self.encoding_dim = encoding_dim
        self.model = None
        self.train_loader = None
        self.test_loader = None
        self.X_test = None
        self.y_test = None

    def prepare_data(self):
        logger.info("Preparando datos para el autoencoder...")
        
        X = np.array(self.df[self.embedding_col].tolist())
        y = self.df[self.label_col].values
        
        # Aplicar SMOTE para manejar el desequilibrio de clases
        smote = SMOTE(random_state=self.random_state)
        X_resampled, y_resampled = smote.fit_resample(X, y)
        
        X_train, self.X_test, y_train, self.y_test = train_test_split(X_resampled, y_resampled, test_size=self.test_size, random_state=self.random_state)
        
        X_train_tensor = torch.FloatTensor(X_train)
        X_test_tensor = torch.FloatTensor(self.X_test)
        y_train_tensor = torch.FloatTensor(y_train)
        y_test_tensor = torch.FloatTensor(self.y_test)
        
        # Crear un WeightedRandomSampler para manejar el desequilibrio en el entrenamiento
        class_counts = np.bincount(y_train.astype(int))
        class_weights = 1. / class_counts
        sample_weights = class_weights[y_train.astype(int)]
        sampler = WeightedRandomSampler(sample_weights, len(sample_weights))
        
        self.train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=self.batch_size, sampler=sampler)
        self.test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=self.batch_size, shuffle=False)
        
        logger.info(f"Datos preparados. Tamaño del conjunto de entrenamiento: {len(X_train)}, Tamaño del conjunto de prueba: {len(self.X_test)}")
        
        return self.train_loader, self.test_loader

    def create_model(self):
        input_dim = len(self.df[self.embedding_col].iloc[0])
        self.model = AutoencoderModel(input_dim, self.encoding_dim)
        logger.info(f"Modelo creado con dimensión de entrada {input_dim} y dimensión de codificación {self.encoding_dim}")
        return self.model

    def train_model(self, num_epochs: int = 300, learning_rate: float = 0.001):
        if self.model is None or self.train_loader is None:
            raise ValueError("El modelo no ha sido creado o los datos no han sido preparados.")
        
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        
        logger.info("Iniciando entrenamiento del modelo...")
        for epoch in range(num_epochs):
            self.model.train()
            train_loss = 0.0
            for batch_features, _ in self.train_loader:
                optimizer.zero_grad()
                outputs = self.model(batch_features)
                loss = criterion(outputs, batch_features)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            
            if (epoch + 1) % 20 == 0:
                logger.info(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss/len(self.train_loader):.4f}')
        
        logger.info("Entrenamiento completado.")

    def find_optimal_threshold(self, step: float = 0.001):
        all_losses = []
        all_labels = []
        
        self.model.eval()
        with torch.no_grad():
            for batch_features, batch_labels in self.test_loader:
                outputs = self.model(batch_features)
                loss = nn.MSELoss(reduction='none')(outputs, batch_features)
                loss = loss.mean(axis=1)
                all_losses.extend(loss.tolist())
                all_labels.extend(batch_labels.tolist())
        
        thresholds = np.arange(min(all_losses), max(all_losses), step)
        f1_scores = []
        
        for threshold in thresholds:
            predictions = [1 if loss < threshold else 0 for loss in all_losses]
            f1 = f1_score(all_labels, predictions)
            f1_scores.append(f1)
        
        optimal_idx = np.argmax(f1_scores)
        optimal_threshold = thresholds[optimal_idx]
        
        return optimal_threshold

    def evaluate_model(self, threshold: float = None):
        if self.model is None or self.test_loader is None:
            raise ValueError("El modelo no ha sido creado o entrenado, o los datos de prueba no han sido preparados.")
        
        if threshold is None:
            threshold = self.find_optimal_threshold()
        
        self.model.eval()
        all_losses = []
        all_labels = []
        
        with torch.no_grad():
            for batch_features, batch_labels in self.test_loader:
                outputs = self.model(batch_features)
                loss = nn.MSELoss(reduction='none')(outputs, batch_features)
                loss = loss.mean(axis=1)
                all_losses.extend(loss.tolist())
                all_labels.extend(batch_labels.tolist())
        
        all_predictions = [1 if loss < threshold else 0 for loss in all_losses]
        
        precision = precision_score(all_labels, all_predictions)
        recall = recall_score(all_labels, all_predictions)
        f1 = f1_score(all_labels, all_predictions)
        auc_roc = roc_auc_score(all_labels, [-loss for loss in all_losses])
        cm = confusion_matrix(all_labels, all_predictions)
        
        logger.info(f"Evaluación con umbral óptimo: {threshold:.4f}")
        logger.info(f"Precisión: {precision:.4f}")
        logger.info(f"Recall: {recall:.4f}")
        logger.info(f"F1-Score: {f1:.4f}")
        logger.info(f"AUC-ROC: {auc_roc:.4f}")
        logger.info(f"Matriz de Confusión:\n{cm}")
        
        return {
            "threshold": threshold,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "auc_roc": auc_roc,
            "confusion_matrix": cm
        }

In [19]:
trainer = AutoencoderTrainer(df_with_embeddings)
train_loader, test_loader = trainer.prepare_data()

2024-07-29 18:51:35,461 - INFO - Preparando datos para el autoencoder...
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
2024-07-29 18:51:35,538 - INFO - Datos preparados. Tamaño del conjunto de entrenamiento: 1336, Tamaño del conjunto de prueba: 334


In [20]:
model = trainer.create_model()
trainer.train_model(num_epochs=300)

2024-07-29 18:51:39,476 - INFO - Modelo creado con dimensión de entrada 384 y dimensión de codificación 32
2024-07-29 18:51:39,477 - INFO - Iniciando entrenamiento del modelo...
2024-07-29 18:51:40,907 - INFO - Epoch [20/300], Loss: 0.0013
2024-07-29 18:51:41,893 - INFO - Epoch [40/300], Loss: 0.0011
2024-07-29 18:51:42,847 - INFO - Epoch [60/300], Loss: 0.0010
2024-07-29 18:51:43,815 - INFO - Epoch [80/300], Loss: 0.0010
2024-07-29 18:51:44,888 - INFO - Epoch [100/300], Loss: 0.0010
2024-07-29 18:51:45,991 - INFO - Epoch [120/300], Loss: 0.0010
2024-07-29 18:51:47,058 - INFO - Epoch [140/300], Loss: 0.0010
2024-07-29 18:51:48,192 - INFO - Epoch [160/300], Loss: 0.0009
2024-07-29 18:51:49,270 - INFO - Epoch [180/300], Loss: 0.0010
2024-07-29 18:51:50,262 - INFO - Epoch [200/300], Loss: 0.0009
2024-07-29 18:51:51,222 - INFO - Epoch [220/300], Loss: 0.0009
2024-07-29 18:51:52,198 - INFO - Epoch [240/300], Loss: 0.0009
2024-07-29 18:51:53,195 - INFO - Epoch [260/300], Loss: 0.0009
2024-07

In [21]:
metrics = trainer.evaluate_model()
print(metrics)

2024-07-29 18:51:56,429 - INFO - Evaluación con umbral óptimo: 0.0010
2024-07-29 18:51:56,429 - INFO - Precisión: 0.9634
2024-07-29 18:51:56,429 - INFO - Recall: 0.9753
2024-07-29 18:51:56,430 - INFO - F1-Score: 0.9693
2024-07-29 18:51:56,430 - INFO - AUC-ROC: 0.9966
2024-07-29 18:51:56,430 - INFO - Matriz de Confusión:
[[166   6]
 [  4 158]]


{'threshold': np.float64(0.0010215527488762746), 'precision': np.float64(0.9634146341463414), 'recall': np.float64(0.9753086419753086), 'f1_score': np.float64(0.9693251533742331), 'auc_roc': np.float64(0.9965546942291128), 'confusion_matrix': array([[166,   6],
       [  4, 158]])}


In [22]:
# Guardar el modelo

torch.save({
    'state_dict': model.state_dict(),
    'input_dim': model.input_dim,
    'encoding_dim': model.encoding_dim
}, "../models/autoencoder_model.pth")

In [26]:
import torch
import numpy as np
from typing import List, Union
from sentence_transformers import SentenceTransformer

class PhishingPredictor:
    def __init__(self, model_path: str, embedding_model: str = 'all-MiniLM-L6-v2', threshold: float = 0.0010):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.load_model(model_path)
        self.embedding_model = SentenceTransformer(embedding_model)
        self.threshold = threshold

    def load_model(self, model_path: str):
        checkpoint = torch.load(model_path, map_location=self.device)
        input_dim = checkpoint['input_dim']
        encoding_dim = checkpoint['encoding_dim']
        model = AutoencoderModel(input_dim, encoding_dim)
        model.load_state_dict(checkpoint['state_dict'])
        model.eval()
        return model

    def preprocess_text(self, text: str) -> str:
        return text.lower()  # Este es un ejemplo simple, ajusta según tus necesidades

    def generate_embedding(self, text: str) -> np.ndarray:
        return self.embedding_model.encode([text])[0]

    def predict(self, text: Union[str, List[str]]) -> Union[bool, List[bool]]:
        if isinstance(text, str):
            text = [text]
        
        processed_texts = [self.preprocess_text(t) for t in text]
        embeddings = [self.generate_embedding(t) for t in processed_texts]
        
        with torch.no_grad():
            input_tensor = torch.FloatTensor(embeddings).to(self.device)
            outputs = self.model(input_tensor)
            losses = torch.mean(torch.pow(outputs - input_tensor, 2), dim=1)
        
        predictions = [loss.item() < self.threshold for loss in losses]
        
        return predictions[0] if len(predictions) == 1 else predictions

    def predict_with_confidence(self, text: Union[str, List[str]]) -> Union[dict, List[dict]]:
        if isinstance(text, str):
            text = [text]
        
        processed_texts = [self.preprocess_text(t) for t in text]
        embeddings = [self.generate_embedding(t) for t in processed_texts]
        
        with torch.no_grad():
            input_tensor = torch.FloatTensor(embeddings).to(self.device)
            outputs = self.model(input_tensor)
            losses = torch.mean(torch.pow(outputs - input_tensor, 2), dim=1)
        
        results = []
        for loss in losses:
            loss_value = loss.item()
            is_phishing = loss_value < self.threshold
            confidence = 1 - (loss_value / self.threshold) if is_phishing else (loss_value / self.threshold) - 1
            confidence = max(min(confidence, 1), 0)  # Clip confidence to [0, 1]
            results.append({
                "is_phishing": is_phishing,
                "confidence": confidence
            })
        
        return results[0] if len(results) == 1 else results

In [27]:
predictor = PhishingPredictor("../models/autoencoder_model.pth")

  checkpoint = torch.load(model_path, map_location=self.device)
2024-07-29 18:53:46,545 - INFO - Use pytorch device_name: mps
2024-07-29 18:53:46,545 - INFO - Load pretrained SentenceTransformer: all-MiniLM-L6-v2


In [28]:
# Predicción simple
result = predictor.predict("Este es un mensaje de prueba")
print(f"¿Es phishing? {result}")

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

¿Es phishing? False


  input_tensor = torch.FloatTensor(embeddings).to(self.device)


In [29]:
# Predicción con confianza
result_with_confidence = predictor.predict_with_confidence("Este es un mensaje de prueba")
print(f"Predicción: {result_with_confidence['is_phishing']}, Confianza: {result_with_confidence['confidence']:.2f}")

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Predicción: False, Confianza: 1.00


In [29]:
import gradio as gr
import torch
import numpy as np
from typing import List, Union
from sentence_transformers import SentenceTransformer


class PhishingDetectorInterface:
    def __init__(self, model_path: str):
        self.predictor = PhishingPredictor(model_path)

    def predict_phishing(self, message: str) -> tuple:
        result = self.predictor.predict_with_confidence(message)
        
        is_phishing = result['is_phishing']
        confidence = result['confidence']
        
        # Calcular porcentajes
        phishing_percentage = confidence * 100 if is_phishing else (1 - confidence) * 100
        not_phishing_percentage = 100 - phishing_percentage

        # Preparar el resultado para Gradio
        if is_phishing:
            label = "Phishing"
            color = "#FF0000"  # Rojo para phishing
        else:
            label = "No Phishing"
            color = "#00FF00"  # Verde para no phishing
        
        return (
            label,
            f"Phishing: {phishing_percentage:.2f}%",
            f"Not Phishing: {not_phishing_percentage:.2f}%",
            color
        )

    def launch(self):
        iface = gr.Interface(
            fn=self.predict_phishing,
            inputs=gr.Textbox(lines=5, label="Enter the message here"),
            outputs=[
                gr.Textbox(label="Prediction"),
                gr.Textbox(label="Phishing Probability"),
                gr.Textbox(label="Not Phishing Probability"),
                gr.ColorPicker(label="Indicator")
            ],
            title="Phishing Detection using Autoencoder",
            description="Enter a message to check if it's phishing or not."
        )
        iface.launch()

In [30]:
# Uso

model_path = "../models/autoencoder_model.pth"
interface = PhishingDetectorInterface(model_path)
interface.launch()

  model = torch.load(model_path, map_location=self.device)
2024-07-29 18:00:55,586 - INFO - Use pytorch device_name: mps
2024-07-29 18:00:55,587 - INFO - Load pretrained SentenceTransformer: all-MiniLM-L6-v2
2024-07-29 18:00:57,352 - INFO - HTTP Request: GET http://127.0.0.1:7862/startup-events "HTTP/1.1 200 OK"
2024-07-29 18:00:57,359 - INFO - HTTP Request: HEAD http://127.0.0.1:7862/ "HTTP/1.1 200 OK"


Running on local URL:  http://127.0.0.1:7862

To create a public link, set `share=True` in `launch()`.
