In [1]:
import pandas as pd
from keras import Model
from keras.src.layers import Bidirectional
from keras.src.layers import Embedding, LSTM, Dense
from keras import Sequential
from sklearn.model_selection import train_test_split
from typing import Tuple
from pathlib import Path
from app.utils import create_pad_sequences, clean_text, save_model, create_tokenizer, RANDOM_SEED, BODY_COL_NAME, LABEL_COL_NAME, save_tokenizer
from keras.src.legacy.preprocessing.text import Tokenizer

In [2]:
def read_csv(path: str, body_column_name=BODY_COL_NAME, label_column_name=LABEL_COL_NAME) -> pd.DataFrame:
    """Load the body and label columns of a CSV file under the canonical column names.

    Only the two requested columns are read from the file. They are put in
    body-first order and then relabelled to ``BODY_COL_NAME`` and
    ``LABEL_COL_NAME`` so every dataset has the same schema downstream.

    Args:
        path: Location of the CSV file.
        body_column_name: Header of the body column as it appears in the file.
        label_column_name: Header of the label column as it appears in the file.

    Returns:
        A two-column frame with ``BODY_COL_NAME`` first and ``LABEL_COL_NAME`` second.
    """
    wanted = [body_column_name, label_column_name]
    raw = pd.read_csv(path, usecols=wanted, delimiter=',', quotechar='"')
    # reindex pins the order to body-first so the positional relabelling
    # below attaches each canonical name to the right column.
    ordered = raw.reindex(columns=wanted)
    return ordered.set_axis([BODY_COL_NAME, LABEL_COL_NAME], axis=1)

In [3]:
def create_model(max_words: int = 10000, embedding_dim: int = 128, lstm_units: int = 64) -> Model:
    """Build and compile a bidirectional-LSTM binary classifier.

    The network is Embedding -> Bidirectional(LSTM) -> Dense(1, sigmoid),
    compiled with binary cross-entropy loss, the Adam optimizer and an
    accuracy metric. A summary of the model is printed before returning.

    Args:
        max_words: Vocabulary size; the embedding table gets ``max_words + 1`` rows.
        embedding_dim: Size of each embedding vector.
        lstm_units: Number of units in each direction of the LSTM.

    Returns:
        The compiled (untrained) model.
    """
    model = Sequential()
    # NOTE(review): the extra row presumably covers an index reserved by the
    # tokenizer/padding -- confirm against create_tokenizer in app.utils.
    model.add(Embedding(input_dim=max_words + 1, output_dim=embedding_dim))
    model.add(Bidirectional(LSTM(units=lstm_units)))
    model.add(Dense(units=1, activation='sigmoid'))

    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    # summary() prints the table itself and returns None, so wrapping it in
    # print() would emit a stray "None" line after the table.
    model.summary()
    return model



In [4]:
def train_model(
        model: Model,
        X_train_pad: pd.DataFrame,
        y_train: pd.DataFrame,
        X_test_pad: pd.DataFrame,
        y_test: pd.DataFrame,
        epochs=5,
        batch_size=32,
):
    """Fit ``model`` on the training data, validating on the test data each epoch.

    Args:
        model: Compiled model exposing a Keras-style ``fit`` method.
        X_train_pad: Training inputs passed to ``fit`` as-is.
        y_train: Training labels.
        X_test_pad: Inputs used as validation data during training.
        y_test: Labels used as validation data during training.
        epochs: Number of passes over the training data.
        batch_size: Number of samples per gradient update.

    Returns:
        Whatever ``model.fit`` returns (for Keras, the training history), so
        callers can inspect per-epoch metrics instead of losing them.
    """
    return model.fit(
        X_train_pad,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test_pad, y_test),
    )

In [5]:
def validate_model(model: Model, X_test: pd.DataFrame, y_test: pd.DataFrame):
    """Evaluate ``model`` on held-out data, print the accuracy and return the metrics.

    Args:
        model: Model exposing a Keras-style ``evaluate`` method.
        X_test: Evaluation inputs.
        y_test: Evaluation labels.

    Returns:
        A ``(loss, accuracy)`` tuple taken from ``model.evaluate``.
    """
    # The unpacking relies on evaluate() yielding exactly two values:
    # the loss followed by the single compiled metric.
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f'Test Accuracy: {accuracy:.2f}')
    return loss, accuracy


In [6]:
def train_test_prepare_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, Tokenizer]:
    """Clean the text, split into train/test and turn both splits into padded sequences.

    The input frame is not modified: cleaning is applied to a copy. Rows whose
    cleaned body is empty are dropped before splitting. The tokenizer is
    created from the training split only and then used for both splits.

    Args:
        df: Frame with ``BODY_COL_NAME`` and ``LABEL_COL_NAME`` columns.

    Returns:
        ``(X_train_pad, X_test_pad, y_train, y_test, tokenizer)``.
    """
    # Work on a copy so the caller's frame keeps its original text; assigning
    # into ``df`` directly would also make a second call clean the text twice.
    cleaned = df.copy()
    cleaned[BODY_COL_NAME] = cleaned[BODY_COL_NAME].apply(clean_text)
    cleaned = cleaned.loc[cleaned[BODY_COL_NAME].str.len() > 0]
    X = cleaned[BODY_COL_NAME]
    y = cleaned[LABEL_COL_NAME].values.astype(int)

    # 80/20 split with a fixed seed so the split is repeatable.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED)
    tokenizer = create_tokenizer(X_train)
    X_train_pad = create_pad_sequences(tokenizer, X_train)
    X_test_pad = create_pad_sequences(tokenizer, X_test)

    return X_train_pad, X_test_pad, y_train, y_test, tokenizer


In [7]:
# One entry per dataset: (csv path, body column header, label column header).
data = [
    ('data/original/spam.csv', 'text', LABEL_COL_NAME),
    ('data/original/fishing.csv', BODY_COL_NAME, LABEL_COL_NAME),
    ('data/original/fraud.csv', 'Body', 'Label'),
]

# Train, evaluate and persist an independent model + tokenizer for each dataset.
for file_path, body_col_name, label_col_name in data:
    df = read_csv(file_path, body_column_name=body_col_name, label_column_name=label_col_name)
    print(file_path)
    print(df)
    X_train_pad, X_test_pad, y_train, y_test, tokenizer = train_test_prepare_data(df)
    model = create_model()
    train_model(model, X_train_pad, y_train, X_test_pad, y_test, epochs=3)
    validate_model(model, X_test_pad, y_test)
    # Path.stem strips only the final suffix, whereas str.replace would
    # remove every '.csv' occurrence anywhere in the file name.
    model_name = Path(file_path).stem
    save_model(model, f'models/{model_name}')
    save_tokenizer(tokenizer, f'tokenizers/{model_name}')

