# Recurrent neural network: spam detection

In [None]:
# Standard library
import re
from pathlib import Path

# Third party imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

RANDOM_SEED = 315

# RANDOM_SEED is passed explicitly to the scikit-learn splits, but Keras weight
# initialisation and batch shuffling draw from the global Python/NumPy/TensorFlow
# generators. Seed those too so that a Restart & Run All reproduces the results.
tf.keras.utils.set_random_seed(RANDOM_SEED)

## 1. Data loading

### 1.1. Load the data from URL

In [None]:
# Download the URL spam dataset, discard exact duplicate rows and renumber the
# index so that it is contiguous again
data_df = (
    pd.read_csv('https://raw.githubusercontent.com/4GeeksAcademy/NLP-project-tutorial/main/url_spam.csv')
    .drop_duplicates()
    .reset_index(drop=True)
)

### 1.2. Save a local copy

In [None]:
# Ensure the raw-data directory exists (creating parent folders as needed)
raw_data_dir = Path('../data/raw')
raw_data_dir.mkdir(parents=True, exist_ok=True)

# Keep a de-duplicated local copy of the download in parquet format
data_df.to_parquet('../data/raw/urls.parquet')

### 1.3. Inspect the data

In [None]:
# Preview the first rows of the de-duplicated dataset
data_df.head()

In [None]:
# Column names, dtypes and non-null counts
data_df.info()

## 2. EDA

### 2.1. Label frequency

First, let's look at how many 'spam' vs. 'not spam' URLs we have:

In [None]:
# Frequency of each label (ordered from most to least common)
label_counts = data_df['is_spam'].value_counts()

# Count each class by its label value rather than by its position in
# `label_counts`: value_counts() sorts by frequency, so position 0 is simply
# the most common label, whichever class that happens to be. Comparing the
# string form accepts both the boolean labels and their 0/1 encoding, so the
# cell gives the same answer if it is re-run after label encoding.
label_text = data_df['is_spam'].astype(str)
not_spam = int(label_text.isin(['False', '0']).sum())
spam = int(label_text.isin(['True', '1']).sum())

print(f'URLs are {(not_spam/(spam + not_spam)*100):.1f}% not spam')

The classes are imbalanced, but not extremely so, so we may not need to correct for it. Still, it is worth keeping in mind as we work through EDA and modeling.

### 2.2. URL length distribution

In [None]:
# Number of characters in every URL
data_df['URL_length'] = data_df['url'].str.len()
url_lengths = data_df['URL_length']

# Histogram of the URL lengths
fig, ax = plt.subplots()
ax.hist(url_lengths, bins=30, color='black')
ax.set_title('URL length distribution')
ax.set_xlabel('Characters')
ax.set_ylabel('URLs')
plt.show()

# Summary statistics, rounded to whole characters
print(f"URL length mean: {url_lengths.mean():.0f}")
print(f"URL length min: {url_lengths.min():.0f}")
print(f"URL length max: {url_lengths.max():.0f}")

### 2.3. Short URLs

In [None]:
# Rows whose URL is shorter than 20 characters
is_short = data_df['URL_length'].lt(20)
short_urls = data_df.loc[is_short]
short_urls

### 2.4. Long URLs

In [None]:
# Rows whose URL is longer than 200 characters
is_long = data_df['URL_length'].gt(200)
long_urls = data_df.loc[is_long]
long_urls

## 3. Data preprocessing

### 3.1. Label encoding

In [None]:
# Encode the label as an integer: go through the string form so that
# 'True'/'False' can be mapped onto '1'/'0' before the final integer cast
data_df['is_spam'] = (
    data_df['is_spam']
    .astype(str)
    .replace({'True': '1', 'False': '0'})
    .astype(int)
)

### 3.2. URL splitting

In [None]:
def domain_splitter(url: str) -> str:
    '''Splits a URL into tokens and joins them with single spaces.

    A token is a maximal run of word characters (``\\w``) or backticks;
    every other character acts as a separator and is dropped. The tokens are
    joined on a space for compatibility with the Tensorflow text vectorizer.

    Args:
        url: The URL to split.

    Returns:
        The tokens joined by single spaces (an empty string if the URL
        contains no tokens).
    '''

    # Raw string: '\w' inside a normal (or f-) string is an invalid escape
    # sequence that newer Python versions warn about.
    return ' '.join(re.findall(r'[\w`]+', url))

# Replace every raw URL with its space-separated tokens, then preview the result
data_df['url'] = data_df['url'].map(domain_splitter)
data_df.head()

### 3.3. Train-test split

In [None]:
# Hold out 30% of the rows as the test set; the split is seeded so that it is
# the same on every run
train_df, test_df = train_test_split(
    data_df,
    test_size=0.3,
    random_state=RANDOM_SEED,
)

print(f'Test data: {test_df.shape}')

### 3.4. Train-validation split

In [None]:
# Split the non-test rows again: 70% for fitting, 30% for validation
training_df, validation_df = train_test_split(
    train_df,
    test_size=0.3,
    random_state=RANDOM_SEED,
)

# Report the split that was just created (`training_df`), not its parent
# `train_df`, which still contains the validation rows
print(f'Training data: {training_df.shape}')
print(f'Validation data: {validation_df.shape}')

In [None]:
# Column names, dtypes and non-null counts of the training split
training_df.info()

## 3. RNN model

### 3.2. Text encoding

In [None]:
def domain_splitter(url: str) -> list:
    '''Splits a URL into a list of tokens.

    A token is a maximal run of word characters (``\\w``) or backticks;
    every other character acts as a separator and is dropped.

    NOTE(review): this re-uses the name of the string-returning
    `domain_splitter` defined earlier in the notebook and replaces it once
    this cell runs. It is not called in the cells visible after it, so
    consider renaming or removing it.

    Args:
        url: The URL to split.

    Returns:
        The tokens in order of appearance (an empty list if there are none).
    '''

    # Raw string: '\w' inside a normal (or f-) string is an invalid escape
    # sequence that newer Python versions warn about.
    return re.findall(r'[\w`]+', url)

In [None]:
# Space-separated URL tokens of each split as string tensors.
# Note that `train_df` is the parent of both `training_df` and `validation_df`,
# so `training_features` also covers the validation rows.
training_features = tf.convert_to_tensor(train_df['url'].to_numpy())
validation_features = tf.convert_to_tensor(validation_df['url'].to_numpy())
testing_features = tf.convert_to_tensor(test_df['url'].to_numpy())

# The encoder feeds an Embedding + LSTM stack, which consumes *sequences of
# integer token ids*. 'tf_idf' would instead emit one float weight per
# vocabulary entry, which is neither a sequence nor a valid embedding index.
encoder = tf.keras.layers.TextVectorization(
    ngrams=3,
    output_mode='int',
    split='whitespace'
)

# Learn the vocabulary from the rows reserved for fitting only, so that no
# validation or test tokens leak into it
encoder.adapt(tf.convert_to_tensor(training_df['url'].to_numpy()))

# Peek at the start of the learned vocabulary
vocab = np.array(encoder.get_vocabulary())
vocab[:20]

### 3.2. Model definition

In [None]:
# One embedding row per entry of the encoder's vocabulary
vocabulary_size = len(encoder.get_vocabulary())

# Layers, in the order the data flows through them
embedding_layer = tf.keras.layers.Embedding(
    input_dim=vocabulary_size,
    output_dim=64,
    mask_zero=True
)
recurrent_layer = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32))
hidden_layer = tf.keras.layers.Dense(8, activation='relu')
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')

# Text encoder -> embedding -> bidirectional LSTM -> dense head with a
# single sigmoid output
model = tf.keras.Sequential([
    encoder,
    embedding_layer,
    recurrent_layer,
    hidden_layer,
    output_layer
])

# Binary classification set-up: cross-entropy loss, Adam with a learning rate
# of 1e-5, accuracy as the reported metric
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-5),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=['accuracy']
)

### 3.3. Class weighting

In [None]:
# Balance the classes using the label frequencies of the rows reserved for
# fitting (`training_df`); `train_df` additionally contains the validation rows
labels_for_weights = training_df['is_spam']

total = len(labels_for_weights)
pos = int(labels_for_weights.sum())
neg = total - pos

# Scale each class by total / (2 * class count): the rarer class gets the
# larger weight and the weights would both be 1 for perfectly balanced data
weight_for_0 = (1 / neg) * (total / 2.0)
weight_for_1 = (1 / pos) * (total / 2.0)

class_weight = {0: weight_for_0, 1: weight_for_1}

print(f'Weight for class 0: {weight_for_0:.2f}')
print(f'Weight for class 1: {weight_for_1:.2f}')

### 3.3. Model training

In [None]:
%%time

training_results = model.fit(
    training_features,
    train_df['is_spam'],
    epochs=200,
    validation_data=(validation_features, validation_df['is_spam']),
    class_weight=class_weight,
    verbose=0
)

# Set-up a 1x2 figure for accuracy and binary cross-entropy
fig, axs = plt.subplots(1, 2, figsize=(8, 4))

# Add the main title
fig.suptitle('RNN training curves', size='large')

# Per-epoch metrics recorded by model.fit()
history = training_results.history

# Plot training and validation accuracy (as a percentage)
axs[0].set_title('Accuracy')
axs[0].plot(np.array(history['accuracy']) * 100, label='Training')
axs[0].plot(np.array(history['val_accuracy']) * 100, label='Validation')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Accuracy (%)')
axs[0].legend(loc='best')

# Plot training and validation binary cross-entropy; labelled like the
# accuracy panel so the two curves can be told apart
axs[1].set_title('Binary cross-entropy')
axs[1].plot(history['loss'], label='Training')
axs[1].plot(history['val_loss'], label='Validation')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Binary cross-entropy')
axs[1].legend(loc='best')

fig.tight_layout()
plt.show()

### 3.2. Regularization

In [None]:
%%time

# Add regularization to the model
regularizer=tf.keras.regularizers.L1L2(l1=0.02, l2=0.002)

model=tf.keras.Sequential([
    encoder,
    tf.keras.layers.Embedding(
        input_dim=len(encoder.get_vocabulary()),
        output_dim=128,
        mask_zero=True),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64)),
    tf.keras.layers.Dense(16, activation='relu', kernel_regularizer=regularizer),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Re-compile
model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(),
    optimizer=tf.keras.optimizers.Adam(1e-5),
    metrics=['accuracy']
)

# Re-train
training_results=model.fit(
    training_features,
    training_labels,
    epochs=500,
    validation_data=(validation_features,validation_labels),
    class_weight=class_weight,
    verbose=0
)

# Set-up a 1x2 figure for accuracy and binary cross-entropy
fig, axs = plt.subplots(1, 2, figsize=(8, 4))

# Add the main title
fig.suptitle('RNN training curves', size='large')

# Per-epoch metrics recorded by model.fit()
history = training_results.history

# Plot training and validation accuracy (as a percentage)
axs[0].set_title('Accuracy')
axs[0].plot(np.array(history['accuracy']) * 100, label='Training')
axs[0].plot(np.array(history['val_accuracy']) * 100, label='Validation')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Accuracy (%)')
axs[0].legend(loc='best')

# Plot training and validation binary cross-entropy; labelled like the
# accuracy panel so the two curves can be told apart
axs[1].set_title('Binary cross-entropy')
axs[1].plot(history['loss'], label='Training')
axs[1].plot(history['val_loss'], label='Validation')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Binary cross-entropy')
axs[1].legend(loc='best')

fig.tight_layout()
plt.show()

## 4. Model evaluation

### 4.1. Test set predictions

In [None]:
# Predicted probability above which a URL is called spam
threshold = 0.5

# Score the held-out test split. The model starts with the text encoder, so it
# takes the string tensor built in the text-encoding cell directly.
testing_labels = test_df['is_spam'].to_numpy()
probabilities = model.predict(testing_features).ravel()
predictions = (probabilities > threshold).astype(int)

# scikit-learn metrics take (y_true, y_pred), in that order
accuracy = accuracy_score(testing_labels, predictions) * 100

# Plot the confusion matrix, normalized over the true labels (rows)
cm = confusion_matrix(testing_labels, predictions, normalize='true')
cm_disp = ConfusionMatrixDisplay(confusion_matrix=cm)
_ = cm_disp.plot()

plt.title(f'Test set performance\noverall accuracy: {accuracy:.1f}%')
plt.xlabel('Predicted outcome')
plt.ylabel('True outcome')
plt.show()