In [4]:
#load and clean

def load_and_clean(path):
    """Load the spam CSV and return the message text with an integer label.

    Parameters
    ----------
    path : str or path-like
        Location of a CSV file that contains at least the columns
        ``Category`` and ``Message``.

    Returns
    -------
    pandas.DataFrame
        Columns ``Message`` and ``Category_encoded`` on a fresh 0..n-1 index.
        ``Category_encoded`` holds the integer codes that ``LabelEncoder``
        assigns to the values of ``Category``.
    """
    raw = pd.read_csv(path)

    # Work on an explicit copy: adding a column to a column-subset of `raw`
    # can otherwise trigger pandas' SettingWithCopyWarning.
    df = raw[['Category', 'Message']].copy()

    # Rows without a label or without text cannot be used downstream
    # (label encoding and whitespace splitting both fail on NaN), so drop them
    # before renumbering the index.
    df = df.dropna(subset=['Category', 'Message']).reset_index(drop=True)

    le = LabelEncoder()
    df['Category_encoded'] = le.fit_transform(df['Category'])
    return df.drop(columns=['Category'])

def tokenize_and_pad(df,max_vocab_ratio=0.15,max_length = 100):
    """Turn the ``Message`` column into fixed-length integer sequences.

    The tokenizer's ``num_words`` cap is ``floor(total_tokens * max_vocab_ratio)``,
    where ``total_tokens`` is the total (not unique) count of single-space
    separated pieces across all messages.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the columns ``Message`` and ``Category_encoded``.
    max_vocab_ratio : float
        Fraction of the total token count used as the vocabulary cap.
    max_length : int
        Length every sequence is padded / truncated to (both at the end).

    Returns
    -------
    tuple
        ``(X, y, tokenizer, max_vocab_size)`` - padded sequences, the values
        of ``Category_encoded``, the fitted tokenizer and the vocabulary cap.
    """
    messages = df['Message']

    # Vocabulary budget derived from the corpus size.
    tokens_per_message = messages.apply(lambda text: len(text.split(' ')))
    max_vocab_size = math.floor(tokens_per_message.sum() * max_vocab_ratio)

    tokenizer = Tokenizer(
        num_words=max_vocab_size,
        oov_token="<oov>",
        filters='!@#$%^&*()_+{;}|:"<>?',
        lower=True,
    )
    tokenizer.fit_on_texts(messages)

    encoded = tokenizer.texts_to_sequences(messages)
    X = pad_sequences(
        encoded,
        maxlen=max_length,
        padding='post',
        truncating='post',
    )
    y = df['Category_encoded'].values
    return X, y, tokenizer, max_vocab_size
    
    
    
    
    

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, SpatialDropout1D
import tensorflow as tf
def build_lstm_model_1_layer(vocabulary_size,max_vocab_size,embedding_dim,max_length,lstm_units):
    """Build and compile a single-LSTM binary classifier.

    Architecture: Embedding -> SpatialDropout1D(0.2) -> LSTM -> Dense(16, relu)
    -> Dense(1, sigmoid), compiled with Adam and binary cross-entropy.

    Parameters
    ----------
    vocabulary_size : int
        Number of distinct token ids present in the data.
    max_vocab_size : int
        Upper bound on the vocabulary; the embedding uses the smaller of the
        two values as ``input_dim``.
    embedding_dim : int
        Size of each embedding vector.
    max_length : int
        Length of the (padded) input sequences.
    lstm_units : int
        Number of units in the LSTM layer.

    Returns
    -------
    A compiled and built ``Sequential`` model that outputs one probability
    per input sequence.
    """
    # Reset Keras' global state BEFORE creating any layer, so the model that is
    # returned is not built on top of state that gets cleared afterwards.
    tf.keras.backend.clear_session()

    model = Sequential([
        Embedding(input_dim=min(vocabulary_size, max_vocab_size),
                  output_dim=embedding_dim,
                  input_length=max_length),
        SpatialDropout1D(0.2),
        # return_sequences is left at its default (False): the LSTM hands only
        # its final state to the dense head, so the network emits a single
        # probability per message instead of one per timestep - which is what
        # a per-message 0/1 label requires.
        LSTM(lstm_units,
             dropout=0.2,
             recurrent_dropout=0.2),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid'),
    ])
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    model.build(input_shape=(None, max_length))

    return model

def build_lstm_model_2_layer(vocabulary_size,max_vocab_size,embedding_dim,max_length,lstm_units):
    """Build and compile a stacked two-LSTM binary classifier.

    Architecture: Embedding -> SpatialDropout1D(0.2) -> LSTM(lstm_units,
    sequences out) -> LSTM(lstm_units // 2) -> Dense(16, relu) ->
    Dense(1, sigmoid), compiled with Adam and binary cross-entropy.

    Parameters
    ----------
    vocabulary_size : int
        Number of distinct token ids present in the data.
    max_vocab_size : int
        Upper bound on the vocabulary; the embedding uses the smaller of the
        two values as ``input_dim``.
    embedding_dim : int
        Size of each embedding vector.
    max_length : int
        Length of the (padded) input sequences.
    lstm_units : int
        Units in the first LSTM; the second LSTM uses half as many
        (integer division).

    Returns
    -------
    A compiled and built ``Sequential`` model.
    """
    # Reset Keras' global state BEFORE creating any layer, so the model that is
    # returned is not built on top of state that gets cleared afterwards.
    tf.keras.backend.clear_session()

    model = Sequential([
        Embedding(input_dim=min(vocabulary_size, max_vocab_size),
                  output_dim=embedding_dim,
                  input_length=max_length),
        SpatialDropout1D(0.2),

        # First recurrent layer must emit the full sequence so that the second
        # LSTM receives one vector per timestep.
        LSTM(lstm_units,
             dropout=0.2,
             recurrent_dropout=0.2,
             return_sequences=True),

        # Second recurrent layer keeps only its final output.
        LSTM(lstm_units // 2,
             dropout=0.2,
             recurrent_dropout=0.2),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid'),
    ])
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    model.build(input_shape=(None, max_length))

    return model
    
    

In [None]:
#training utilities

from tensorflow.keras.callbacks import EarlyStopping,ReduceLROnPlateau

def get_callbacks(early_stopping_patience=40, lr_patience=10, lr_factor=0.2, min_lr=1e-6):
    """Create the training callbacks: early stopping and LR reduction on plateau.

    Both callbacks monitor ``val_loss``.

    Parameters
    ----------
    early_stopping_patience : int
        Epochs without improvement after which training is stopped.
    lr_patience : int
        Epochs without improvement after which the learning rate is reduced.
    lr_factor : float
        Multiplier applied to the learning rate on each reduction
        (``new_lr = lr * lr_factor``).
    min_lr : float
        Lower bound for the learning rate. It must be below the optimizer's
        starting learning rate, otherwise no reduction can ever happen; the
        previous hard-coded floor of 0.001 matched the default learning rate
        of the ``'adam'`` optimizer used by the model builders in this file.

    Returns
    -------
    list
        ``[early_stopping, reduce_lr]``, ready to pass to ``model.fit``.
    """
    early_stopping = EarlyStopping(
        monitor='val_loss',
        min_delta=0,  # minimum change in the monitored quantity that counts as an improvement
        patience=early_stopping_patience,
        verbose=1,  # print a message when the callback takes an action
        # One of {"auto", "min", "max"}. "min" stops when the monitored
        # quantity stops decreasing, "max" when it stops increasing, and
        # "auto" infers the direction from the name of the monitored quantity.
        mode='auto',
        restore_best_weights=True,
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=lr_factor,
        patience=lr_patience,
        verbose=1,  # print a message whenever the learning rate is updated
        mode='auto',
        min_lr=min_lr,
    )

    return [early_stopping, reduce_lr]


    
    

In [None]:
from imblearn.over_sampling import SMOTE
from sklearn.utils.class_weight import compute_class_weight

def get_class_weights(y):
    """Return ``{class_label: weight}`` using scikit-learn's 'balanced' heuristic.

    Parameters
    ----------
    y : array-like
        Class label of every training sample.

    Returns
    -------
    dict
        Maps each distinct label found in ``y`` to its balanced class weight.
    """
    classes = np.unique(y)
    weights = compute_class_weight('balanced', classes=classes, y=y)
    # Key by the actual label rather than by position: enumerate() only gives
    # the right keys when the labels happen to be exactly 0..k-1.
    # tolist() converts NumPy scalars to plain Python values.
    return dict(zip(classes.tolist(), weights.tolist()))

def apply_smote(X, y):
    """Oversample ``(X, y)`` with SMOTE using a fixed seed of 42.

    Returns the resampled features and labels as a ``(X_res, y_res)`` pair.

    NOTE(review): when X holds padded token ids, synthetic rows are
    interpolations between id vectors - confirm this is the intended use.
    """
    sampler = SMOTE(random_state=42)
    resampled_features, resampled_labels = sampler.fit_resample(X, y)
    return resampled_features, resampled_labels
def train_model(
    model, X_train, y_train, X_val, y_val, callbacks,
    batch_size=32, epochs=50, use_smote=False, use_class_weight=False
):
    """Fit ``model`` and return the object produced by ``model.fit``.

    Class imbalance can be handled in one of two mutually exclusive ways:

    * ``use_smote=True``  - the training data is resampled via ``apply_smote``
      and no class weights are passed (this option wins if both flags are set).
    * ``use_class_weight=True`` - weights from ``get_class_weights`` are passed
      to ``model.fit``.

    With neither flag set, the data is used as-is and ``class_weight`` is None.
    ``(X_val, y_val)`` is forwarded as ``validation_data``.
    """
    # Default: no re-weighting; only the class-weight branch overrides this.
    class_weight = None
    if use_smote:
        X_train, y_train = apply_smote(X_train, y_train)
    elif use_class_weight:
        class_weight = get_class_weights(y_train)

    return model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1,
        class_weight=class_weight,
    )

In [None]:
from sklearn.metrics import classification_report, confusion_matrix,accuracy_score

def evaluate_model(model,X_test,y_test,threshold=0.5):
    """Print test metrics for a binary classifier and return its predictions.

    Parameters
    ----------
    model :
        Object whose ``predict(X_test)`` returns one probability per sample.
    X_test, y_test :
        Held-out features and their true 0/1 labels.
    threshold : float, default 0.5
        Probabilities strictly greater than this value are predicted as
        class 1. The default lets callers omit the argument.

    Returns
    -------
    tuple
        ``(y_pred, y_pred_prob)`` - the flattened 0/1 predictions and the raw
        output of ``model.predict``.
    """
    y_pred_prob = model.predict(X_test)
    y_pred = (y_pred_prob > threshold).astype(int).flatten()

    print("\nTest accuracy:", accuracy_score(y_test, y_pred))
    # NOTE(review): the report labels assume class 0 is ham and class 1 is
    # spam - confirm against the label encoding used when loading the data.
    print("\nClassification report:\n", classification_report(y_test, y_pred, target_names=['Ham', 'Spam']))
    print("\nConfusion matrix:\n", confusion_matrix(y_test, y_pred))
    return y_pred, y_pred_prob

In [None]:
if __name__ == "__main__":
    # End-to-end run: load -> tokenize -> split -> train -> evaluate.

    # Model hyperparameters required by the builder; starting values, tune as needed.
    EMBEDDING_DIM = 64
    LSTM_UNITS = 64
    DECISION_THRESHOLD = 0.5

    # Load and preprocess data
    df = load_and_clean('archive/SPAM text message 20170820 - Data.csv')
    X, y, tokenizer, max_vocab_size = tokenize_and_pad(df)
    max_length = X.shape[1]
    vocab_size = min(len(tokenizer.word_index) + 1, max_vocab_size)

    # Hold out a test set, then carve a validation set out of the remaining
    # training data. The test set must not drive early stopping / LR
    # scheduling, otherwise the reported test metrics are optimistic.
    # Stratify both splits so each keeps the class ratio of its parent set.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )

    # Build and train model (class weights are computed inside train_model
    # from the training labels when use_class_weight=True).
    model = build_lstm_model_1_layer(
        vocab_size, max_vocab_size, EMBEDDING_DIM, max_length, LSTM_UNITS
    )
    callbacks = get_callbacks()
    history = train_model(
        model, X_train, y_train, X_val, y_val, callbacks,
        use_class_weight=True,
    )

    # Evaluate on the untouched test set
    evaluate_model(model, X_test, y_test, DECISION_THRESHOLD)