In [48]:
# Common imports
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Data processing and visualization imports
import string
import pandas as pd
import plotly.express as px
import tensorflow.data as tfd
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Model building imports
from sklearn.utils import class_weight
from tensorflow.keras import callbacks
from tensorflow.keras import Model, layers

In [49]:
# Define hyperparameters
num_heads = 4
embed_dim = 256
ff_dim = 128
vocab_size = 10000
max_seq_len = 40

# Set constants
learning_rate = 1e-3
epochs = 100
batch_size = 32

# Define training callbacks
callbacks = [
    keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
    keras.callbacks.ModelCheckpoint("SpamDetector.h5", save_best_only=True)
]

In [50]:
# Set up random seed for reproducibility
random_seed = 123
np.random.seed(random_seed)
tf.random.set_seed(random_seed)

In [51]:
# Specify the path to the SPAM text message dataset
data_path = "/Python notebook/dataset/spam.csv"

# Load the dataset using the load_data function
data_frame = pd.read_csv(data_path, encoding='Windows-1252')

# Print the first five rows of the dataset
data_frame.head()

Unnamed: 0,Category,Message,Unnamed: 2,Unnamed: 3,Unnamed: 4
0,ham,"Go until jurong point, crazy.. Available only ...",,,
1,ham,Ok lar... Joking wif u oni...,,,
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...,,,
3,ham,U dun say so early hor... U c already then say...,,,
4,ham,"Nah I don't think he goes to usf, he lives aro...",,,


In [52]:
# Get the counts of each class and their names
class_dis = data_frame.Category.value_counts()
class_names = class_dis.index

# Create the Pie Chart
fig = px.pie(names=class_names,
             values=class_dis,
             color=class_names,
             hole=0.4,
             labels={'value': 'Count', 'names': 'Class'},
             title='Class Distribution of Spam Text Messages')

# Customize the layout
fig.update_layout(
    margin=dict(l=10, r=10, t=60, b=10),
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
)

# Show the plot
fig.show()

In [53]:
# Data set size
N_SAMPLES = len(data_frame)

print(f"Total Number of Samples : {N_SAMPLES}")

Total Number of Samples : 5572


In [54]:
max_len = max([len(text) for text in data_frame.Message])
print(f"Maximum Length Of Input Sequence(Chars) : {max_len}")

Maximum Length Of Input Sequence(Chars) : 910


In [55]:
# Extract X and y from the data frame
X = data_frame['Message'].tolist()
y = data_frame['Category'].tolist()


# Initialize label encoder
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

# Print the first 5 elements of X and y
print(f'X[:5]: \n{X[:5]}\n')
print(f'y[:5]: {y[:5]}\n')
print(f"Label Mapping : {label_encoder.inverse_transform(y[:5])}")

X[:5]: 
['Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', 'Ok lar... Joking wif u oni...', "Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's", 'U dun say so early hor... U c already then say...', "Nah I don't think he goes to usf, he lives around here though"]

y[:5]: [0 0 1 0 0]

Label Mapping : ['ham' 'ham' 'spam' 'ham' 'ham']


# Preprocessing Text

In [56]:
from sklearn.metrics.pairwise import cosine_similarity
import nltk
import numpy as np
import string
from gensim.models import Word2Vec
from gensim.test.utils import datapath
from gensim import utils
# Loading the model
file_path = '/Python notebook/Word2Vec/model.bin'
model = Word2Vec.load(file_path)
print("Load new model completed")

class MyCorpus:
    """An iterator that yields sentences (lists of str)."""

    def __iter__(self):
        corpus_path = datapath('lee_background.cor')
        for line in open(corpus_path):
            # assume there's one document per line, tokens separated by whitespace
            yield utils.simple_preprocess(line)
sentences = MyCorpus()
model = Word2Vec(sentences, vector_size=100, window=5, min_count=1, workers=4)
# Load the Word2Vec model
model = Word2Vec.load(file_path)

# Download NLTK stopwords if you haven't done so
downloadPath ="/Python notebook/nltk_data"
nltk.data.path.append(downloadPath)  # Or any preferred path
nltk.download('stopwords', download_dir=downloadPath)
nltk.download('punkt', download_dir=downloadPath)
nltk.download('punkt_tab', download_dir=downloadPath)
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer

# Set of stopwords
stop_words = set(stopwords.words("english"))

def preprocess_text(text):
    # Lowercase the text
    text = text.lower()
    
    # Remove punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))
    
    # Tokenize the text
    tokens = word_tokenize(text)
    
    # # Create stemmer
    # stemmer = PorterStemmer()

    # # Stemming
    # tokens = [stemmer.stem(word) for word in tokens]
    
    # Create lemmatizer
    lemmatizer = WordNetLemmatizer()
    # Lemmatization
    tokens = [lemmatizer.lemmatize(word, pos="v") for word in tokens]  # pos="v" để xác định động từ

    # Remove stopwords
    tokens = [word for word in tokens if word not in stop_words]
    
    processword=""
    for word in tokens:
        processword+=word+" "
    print(processword)
    return processword

def get_sentence_vector(sentence):
    # Preprocess the user input
    tokens = preprocess_text(sentence)
    
    word_vectors = []
    for word in tokens:
        if word in model.wv.key_to_index:
            # Print each word and its vector
            # print(f"Word: {word} \nVector: {model.wv[word]}\n")
            word_vectors.append(model.wv[word])
        # else:
        #     print(f"Word '{word}' not found in the vocabulary.")
    
    # If no word vectors were found, return None or a zero vector
    if not word_vectors:
        return np.zeros(model.vector_size)
    
    # Calculate the mean of all word vectors to get a sentence vector
    sentence_vector = np.mean(word_vectors, axis=0)
    print(f"sen vector size {sentence_vector.size}")
    return sentence_vector



Load new model completed


[nltk_data] Downloading package stopwords to /Python
[nltk_data]     notebook/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /Python notebook/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /Python
[nltk_data]     notebook/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\BaLong\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [57]:
import tensorflow as tf
import string
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

# Initialize the WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# Define the preprocessing function
def preprocess_text_with_lemmatization(text: str) -> str:
    """
    Preprocess the text by removing punctuation, lowercasing, stripping whitespace,
    and applying WordNetLemmatizer.
    """
    # Remove punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))

    # Lowercase the text
    text = text.lower()

    # Tokenize and lemmatize
    tokens = word_tokenize(text)
    lemmatized_text = " ".join([lemmatizer.lemmatize(token) for token in tokens])

    return lemmatized_text

# Apply preprocessing to the dataset before feeding into TextVectorization
X_preprocessed = [preprocess_text_with_lemmatization(x) for x in X]

# Create a TextVectorization layer
text_vectorizer = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,                       # Maximum vocabulary size
    output_sequence_length=max_seq_len,          # Maximum sequence length
    standardize=None,                            # Disable built-in standardization
    output_mode='int'                            # Output integer-encoded sequences

)

# Adapt the TextVectorization layer to the preprocessed data
text_vectorizer.adapt(X_preprocessed)


In [58]:
# Compute class weights
class_weights = class_weight.compute_class_weight(class_weight='balanced', classes=data_frame.Category.unique(), y=label_encoder.inverse_transform(y))
class_weights = {number: weight for number, weight in enumerate(class_weights)}
# Show
print(f"Associated class weights: {class_weights}")

Associated class weights: {0: 0.5774093264248704, 1: 3.7295850066934406}


In [59]:
for _ in range(5):
    # Send a text to randomly.
    text_temp = X[np.random.randint(N_SAMPLES)]

    # Apply text to vectorization.
    text_vec_temp = text_vectorizer(text_temp)

    # Show the results
    print(f"Original Text: {text_temp}")
    print(f"Vectorized Text: {text_vec_temp}\n")

Original Text: I sent your maga that money yesterday oh.
Vectorized Text: [   1  198   15 2396   20  229  510    1    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]

Original Text: No need lar. Jus testing e phone card. Dunno network not gd i thk. Me waiting 4 my sis 2 finish bathing so i can bathe. Dun disturb u liao u cleaning ur room.
Vectorized Text: [   1   73    1    1 2929  161   95    1    1  400   26  690    3    1
    1  243   44   13    1   23  282 2054   25    3   29    1    1 1084
    7  353    7 2017   33    1    0    0    0    0    0    0]

Original Text: Is it ok if I stay the night here? Xavier has a sleeping bag and I'm getting tired
Vectorized Text: [   1    9   48   36    1  486    6  117    1    1    1    5  646 1201
    8    1  259  828    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]

Origi

In [60]:
# Get the vocabulary
VOCAB = text_vectorizer.get_vocabulary()

# Let's have a look at the tokens present in the vocabulary
print(f"Vocabulary size: {len(VOCAB)}")
print(f"Vocabulary: {VOCAB[150:200]}")

Vocabulary size: 9015
Vocabulary: ['let', 'tomorrow', 'already', 'after', 'ask', 'yes', 'yeah', 'said', 'really', 'min', 'doing', 'e', 'babe', '1', 'were', 'co', 'amp', 'them', 'life', 'meet', 'why', 'didnt', 'morning', 'last', 'very', 'service', 'miss', 'would', 'win', 'year', 'thanks', 'ive', 'find', 'cash', 'won', 'tone', 'lol', 'feel', 'anything', 'every', 'sure', 'pick', 'k', 'also', 'keep', 'contact', 'care', 'something', 'sent', 'over']


# Data Splitting

In [61]:
# Split the data into training and testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42, shuffle=True)

# Apply the Text Vectorization
X_train = text_vectorizer(X_train)
X_test = text_vectorizer(X_test)

# One Hot Vectors
Xoh_train = tf.one_hot(X_train, depth=10000)
Xoh_test  = tf.one_hot(X_test, depth=10000)
print(X_train)

tf.Tensor(
[[  1  19 326 ...   0   0   0]
 [  1   1   1 ...   0   0   0]
 [  1   1   3 ...   0   0   0]
 ...
 [  1   1   1 ...   0   0   0]
 [  1 475  11 ...   0   0   0]
 [  1  27  39 ...   0   0   0]], shape=(4457, 40), dtype=int64)


# Transformer Network

In [62]:
class TokenAndPositionalEmbedding(layers.Layer):
    
    def __init__(self, embedding_dims, vocab_size, seq_len, **kwargs):
        super(TokenAndPositionalEmbedding, self).__init__(**kwargs)
        
        # Initialize parameters
        self.seq_len = seq_len
        self.vocab_size = vocab_size
        self.embedding_dims = embedding_dims
        self.embed_scale = tf.math.sqrt(tf.cast(embedding_dims, tf.float32))
        
        # Define layers
        self.token_embedding = layers.Embedding(
            input_dim=vocab_size, 
            output_dim=embedding_dims,
            name="token_embedding"
        )
        
        self.positional_embedding = layers.Embedding(
            input_dim=seq_len, 
            output_dim=embedding_dims,
            name="positional_embedding"
        )
    
    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]
        
        # Token Embedding
        token_embedding = self.token_embedding(inputs)
        token_embedding *= self.embed_scale
        
        # Positional Embedding
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positional_embedding = self.positional_embedding(positions)
        
        # Add Token and Positional Embedding
        embeddings = token_embedding + positional_embedding
        
        return embeddings
        
    
    def get_config(self):
        config = super(TokenAndPositionalEmbedding, self).get_config()
        config.update({
            'embedding_dims': self.embedding_dims,
            'vocab_size': self.vocab_size,
            'seq_len': self.seq_len,
        })
        return config

In [63]:
# Let's look what the layer do.
temp_embeds = TokenAndPositionalEmbedding(embed_dim, vocab_size, max_seq_len)(X_train[:1])
temp_embeds

<tf.Tensor: shape=(1, 40, 256), dtype=float32, numpy=
array([[[ 0.3513416 ,  0.5337459 ,  0.46888822, ...,  0.05251884,
         -0.7375846 , -0.5565266 ],
        [-0.7389438 ,  0.15139118, -0.4617043 , ...,  0.53492385,
          0.64967555, -0.7062983 ],
        [-0.7125268 , -0.4429167 ,  0.18846014, ..., -0.3529785 ,
          0.36600745, -0.0687326 ],
        ...,
        [-0.11747973,  0.27032214, -0.10458586, ...,  0.68665665,
         -0.2731782 , -0.12933066],
        [-0.15750611,  0.21578164, -0.18685976, ...,  0.7183333 ,
         -0.25236303, -0.10842689],
        [-0.13502654,  0.22106877, -0.17577901, ...,  0.75590146,
         -0.19802114, -0.06136565]]], dtype=float32)>

In [64]:
class TransformerLayer(layers.Layer):
    
    def __init__(self, num_heads: int, dropout_rate: float, embedding_dims: int, ff_dim: int, **kwargs):
        super(TransformerLayer, self).__init__(**kwargs)
        
        # Initialize Parameters
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate
        self.embedding_dims = embedding_dims
        self.ff_dim = ff_dim
        
        # Initialize Layers
        self.mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embedding_dims, dropout=dropout_rate)
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='relu', kernel_initializer='he_normal'),
            layers.Dense(embedding_dims)
        ])
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)
    
    def call(self, inputs):
        """Forward pass of the Transformer Layer.
        
        Args:
            inputs: Tensor with shape `(batch_size, seq_len, embedding_dims)` representing the input sequence.
        
        Returns:
            Tensor with shape `(batch_size, seq_len, embedding_dims)` representing the output sequence after applying the Transformer Layer.
        """
        
        # Multi-Head Attention
        attention = self.mha(inputs, inputs, inputs)
        
        # Layer Normalization and Residual Connection
        normalized1 = self.ln1(attention + inputs)
        
        # Feedforward Network
        ffn_out = self.ffn(normalized1)
        
        # Layer Normalization and Residual Connection
        normalized2 = self.ln2(ffn_out + normalized1)
        
        return normalized2
    
    def get_config(self):
        """Get the configuration of the Transformer Layer.
        
        Returns:
            Dictionary with the configuration of the layer.
        """
        config = super(TransformerLayer, self).get_config()
        config.update({
            "num_heads": self.num_heads,
            "dropout_rate": self.dropout_rate,
            "embedding_dims": self.embedding_dims,
            "ff_dim": self.ff_dim
        })
        return config

In [65]:
# Transformer layers execution
TransformerLayer(num_heads=num_heads, embedding_dims=embed_dim, ff_dim=ff_dim, dropout_rate=0.1)(temp_embeds)

<tf.Tensor: shape=(1, 40, 256), dtype=float32, numpy=
array([[[-0.433056  ,  1.4694654 ,  0.47007835, ...,  0.22282219,
         -1.9480957 , -0.5588778 ],
        [-0.429655  ,  0.0070728 , -1.5603578 , ...,  0.5198499 ,
          0.62075424, -1.0693371 ],
        [-1.7511014 , -0.43700683,  1.127361  , ..., -0.51604235,
          0.8752314 ,  1.0657878 ],
        ...,
        [ 0.35437462,  0.4312159 , -0.61913925, ...,  1.847894  ,
         -0.59712225, -1.0866992 ],
        [ 0.27687258,  0.42263088, -0.7105943 , ...,  1.9161808 ,
         -0.59378994, -0.9386474 ],
        [ 0.279738  ,  0.39203042, -0.6551212 , ...,  1.8827033 ,
         -0.47296852, -0.96474624]]], dtype=float32)>

In [66]:
# Input layer
InputLayer = layers.Input(shape=(max_seq_len,), name="InputLayer")

# Embedding Layer
embeddings = TokenAndPositionalEmbedding(embed_dim, vocab_size, max_seq_len, name="EmbeddingLayer")(InputLayer)

# Transformer Layer
encodings = TransformerLayer(num_heads=num_heads, embedding_dims=embed_dim, ff_dim=ff_dim, dropout_rate=0.1, name="TransformerLayer")(embeddings)

# Classifier
gap = layers.GlobalAveragePooling1D(name="GlobalAveragePooling")(encodings)
drop = layers.Dropout(0.5, name="Dropout")(gap)
OutputLayer = layers.Dense(1, activation='sigmoid', name="OutputLayer")(drop)

# Model
model = keras.Model(InputLayer, OutputLayer, name="TransformerNet")

# Model Architecture Summary
model.summary()

Model: "TransformerNet"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 InputLayer (InputLayer)     [(None, 40)]              0         
                                                                 
 EmbeddingLayer (TokenAndPos  (None, 40, 256)          2570240   
 itionalEmbedding)                                               
                                                                 
 TransformerLayer (Transform  (None, 40, 256)          1118848   
 erLayer)                                                        
                                                                 
 GlobalAveragePooling (Globa  (None, 256)              0         
 lAveragePooling1D)                                              
                                                                 
 Dropout (Dropout)           (None, 256)               0         
                                                    

In [67]:
# Compile the Model
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=[
        keras.metrics.BinaryAccuracy(name='accuracy'),
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
        keras.metrics.AUC(name='auc'),
    ]
)

# Train Model
history = model.fit(
    X_train, y_train,
    validation_split=0.1,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    class_weight=class_weights
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100


In [68]:
# Evaluate model performance on test data
loss, acc, precision, recall, auc = model.evaluate(X_test, y_test, verbose=0)

# Show the model performance
print('Test loss      :', loss)
print('Test accuracy  :', acc*100)
print('Test precision :', precision*100)
print('Test recall    :', recall*100)
print('Test AUC       :', auc*100)

Test loss      : 0.11028499156236649
Test accuracy  : 96.95067405700684
Test precision : 91.97080135345459
Test recall    : 84.5637559890747
Test AUC       : 97.30779528617859


# Transformer Predictions

In [69]:
def decode_tokens(tokens):
    """
    This function takes in a list of tokenized integers and returns the corresponding text based on the provided vocabulary.
    
    Args:
    - tokens: A list of integers representing tokenized text.
    - vocab: A list of words in the vocabulary corresponding to each integer index.
    
    Returns:
    - text: A string of decoded text.
    """
    text = " ".join(VOCAB[int(token)] for token in tokens).strip()
    return text

In [72]:
for _ in range(10):
    # Randomly select a text from the testing data.
    index = np.random.randint(1,len(X_test))
    tokens = X_test[index-1:index]
    label = y_test[index]

    # Feed the tokens to the model
    print(f"\nModel Prediction\n{'-'*100}")
    proba = 1 if model.predict(tokens, verbose=0)[0][0]>0.5 else 0
    pred = label_encoder.inverse_transform([proba])
    print(f"Message: '{decode_tokens(tokens[0])}' | Prediction: {pred[0].title()} | True : {label_encoder.inverse_transform([label])[0].title()}\n")


Model Prediction
----------------------------------------------------------------------------------------------------
Message: '[UNK]' | Prediction: Ham | True : Ham


Model Prediction
----------------------------------------------------------------------------------------------------
Message: '[UNK] much r [UNK] willing to [UNK]' | Prediction: Ham | True : Spam


Model Prediction
----------------------------------------------------------------------------------------------------
Message: '[UNK] for any purpose å£500 [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] you been previously [UNK] [UNK] can still [UNK] [UNK] [UNK] 0800 1956669 or text back [UNK]' | Prediction: Spam | True : Spam


Model Prediction
----------------------------------------------------------------------------------------------------
Message: '[UNK] is [UNK] pick up and drop at door [UNK]' | Prediction: Ham | True : Ham


Model Prediction
---------------------------------------------------------------------------------