<a href="https://colab.research.google.com/github/LukyLuke92/freeCodeCamp-files-LR/blob/main/MachineLearningWithPython/NeuralNetworkSMSTextClassifier/fcc_sms_text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# import libraries
#!pip install --upgrade tensorflow
#!pip install --upgrade keras

import tensorflow as tf
import keras
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

print(tf.__version__)

In [None]:
# get data files
# Download the two freeCodeCamp SMS datasets into the current working directory
!wget https://cdn.freecodecamp.org/project-data/sms/train-data.tsv
!wget https://cdn.freecodecamp.org/project-data/sms/valid-data.tsv

# Local names of the files fetched above
# Note that test_file_path points at valid-data.tsv - the data loaded from it is
#  what model.evaluate is run on further down
train_file_path = "train-data.tsv"
test_file_path = "valid-data.tsv"

In [None]:
# Read both tab-separated files (they have no header row) into dataframes
train_data = pd.read_csv(train_file_path, sep='\t', header=None)
test_data = pd.read_csv(test_file_path, sep='\t', header=None)
# Give the two positional columns descriptive names: label and text
train_data = train_data.rename(columns={0: 'label', 1: 'text'})
test_data = test_data.rename(columns={0: 'label', 1: 'text'})
# Lookup tables for going back and forth between label strings and integers
#  (ham=0, spam=1)
hamspam_to_idx = {'ham': 0, 'spam': 1}
idx_to_hamspam = ['ham', 'spam']
# Remove the label column from each dataframe and keep it, integer-encoded, as
#  a separate series
train_labels = train_data.pop('label').map(lambda label: hamspam_to_idx[label])
test_labels = test_data.pop('label').map(lambda label: hamspam_to_idx[label])

In [None]:
# Find the number of words (text separated by single spaces) in each text
# This is computed once and reused for both the histogram and the maximum,
#  instead of splitting every text twice
words_per_text = train_data['text'].apply(lambda x: len(x.split(' ')))
# Plot how many texts there are for each word count
text_words = words_per_text.value_counts()
plt.bar(text_words.index, height=text_words)
# Find the maximum number of words in a text
# Note that this is cast as an int, since it is originally a numpy int64, which
#  appears to be incompatible as an input to the textvectorization layer
max_text_len = int(words_per_text.max())
print(max_text_len)

In [None]:
# Symbolic model input: one raw string per sample
input_layer = keras.Input(shape=(1,), dtype='string')

# Size of the vector that each vocabulary entry is embedded into
embedding_dim = 256

# Layer that turns raw text into integer token ids, splitting on whitespace and
#  producing sequences of exactly max_text_len ids
text_vectorizer = keras.layers.TextVectorization(
    split='whitespace', output_sequence_length=max_text_len
)
# Build the vocabulary from the training texts
text_vectorizer.adapt(train_data)

# The embedding needs one row per vocabulary entry
vocab_size = len(text_vectorizer.get_vocabulary())

# Layer that embeds the vectorized text
embedding_layer = keras.layers.Embedding(vocab_size, embedding_dim)

In [None]:
# Assemble the word-level classifier
# The embedded words are averaged across the whole text by a global average
#  pooling 1D layer, passed through two hidden dense layers, and reduced to a
#  single sigmoid output node

model = keras.models.Sequential(
    [
        input_layer,
        text_vectorizer,
        embedding_layer,
        keras.layers.GlobalAveragePooling1D(),
        keras.layers.Flatten(),
        keras.layers.Dense(256, activation='tanh'),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(1, activation='sigmoid'),
    ]
)

In [None]:
# Configure training: Adam optimizer, binary cross-entropy loss (the model has
#  a single sigmoid output), and accuracy reported as the metric
model.compile(
    optimizer='Adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Fit the model on the training texts
# Note that using GPU speeds this up by > 50 x (~10 seconds per epoch with GPU,
#  ~10 minutes per epoch without)
# The last 20% of the training data is held out for validation
# Spam messages are significantly under-represented in the dataset, so the
#  class_weight dictionary weights spam (label 1) 25 times as heavily as ham
#  (label 0)

history = model.fit(
    x=train_data,
    y=train_labels,
    epochs=25,
    validation_split=0.2,
    class_weight={0: 1, 1: 25},
)

In [None]:
# Draw the per-epoch training accuracy and validation accuracy on one figure
training_log = history.history
plt.plot(training_log['accuracy'])
plt.plot(training_log['val_accuracy'])

In [None]:
# Score the trained model on the held-out texts and labels
model.evaluate(x=test_data, y=test_labels)

In [None]:
# function to predict messages based on model
# (should return list containing prediction and label, ex. [0.008318834938108921, 'ham'])
def predict_message(pred_text):
  """Classify a single SMS message as ham or spam.

  Args:
    pred_text: The raw message text (a single string).

  Returns:
    A two-element list [probability, label]. probability is the model's
    sigmoid output as a built-in float; label is 'ham' when that value is
    below 0.5 and 'spam' otherwise.
  """
  # The model is fed batches, so wrap the single string in a length-1 array
  model_out = model.predict(np.expand_dims(pred_text, axis=0))
  # Convert the numpy scalar to a built-in float so that the returned list
  #  matches the documented example
  spam_probability = float(model_out[0][0])
  return [spam_probability, 'ham' if spam_probability < 0.5 else 'spam']

pred_text = "you have won £1000 cash! call to claim your prize."
prediction = predict_message(pred_text)
print(prediction)

In [None]:
# Run this cell to test your function and model. Do not modify contents.
# Runs predict_message on seven fixed messages with known labels and prints
#  whether all of them were classified correctly
def test_predictions():
  test_messages = ["how are you doing today",
                   "sale today! to stop texts call 98912460324",
                   "i dont want to go. can we try it a different day? available sat",
                   "our new mobile video service is live. just install on your phone to start watching.",
                   "you have won £1000 cash! call to claim your prize.",
                   "i'll bring it tomorrow. don't forget the milk.",
                   "wow, is your arm alright. that happened to me one time too"
                  ]

  # Expected label for each message above, in the same order
  test_answers = ["ham", "spam", "ham", "spam", "spam", "ham", "ham"]
  passed = True

  # Only the label (second element of each prediction) is compared
  for msg, ans in zip(test_messages, test_answers):
    prediction = predict_message(msg)
    if prediction[1] != ans:
      passed = False
      print(f'incorrectly classified "{msg}"')

  if passed:
    print("You passed the challenge. Great job!")
  else:
    print("You haven't passed yet. Keep trying.")

test_predictions()


In [None]:
###### DO NOT USE CODE BELOW ######
# The code below is a first attempt, which split each message into individual
#  characters, embedded the resulting list, and used an LSTM. While this
#  performed very well on the test (>97% accuracy), it still fails to catch the
#  test case "our new mobile video service is live. just install on your phone
#  to start watching." It is also much more computationally expensive.
# Because that message is very subtly spam, I think the model will need to look
#  for specific words, such as 'service' and 'install'. Because of that, the
#  code above uses a text vectorization layer to split by spaces instead.
# Note that the cells below reuse names from the cells above (train_data,
#  test_data, model, predict_message, ...), so running them replaces the
#  working word-level model.

In [None]:
# get data files
# Download the two freeCodeCamp SMS datasets into the current working directory
!wget https://cdn.freecodecamp.org/project-data/sms/train-data.tsv
!wget https://cdn.freecodecamp.org/project-data/sms/valid-data.tsv

# Local names of the files fetched above
# Note that test_file_path points at valid-data.tsv
train_file_path = "train-data.tsv"
test_file_path = "valid-data.tsv"

In [None]:
# Import and clean/wrangle the data
# Import the data as dataframes
train_data = pd.read_csv(train_file_path,sep='\t',header=None)
test_data = pd.read_csv(test_file_path,sep='\t',header=None)
# Rename the columns to label and text
train_data.rename(columns={0: 'label',1: 'text'},inplace=True)
test_data.rename(columns={0: 'label',1: 'text'},inplace=True)
# Character length of every text, and how often each length occurs - computed
#  once per dataset and reused for the plots and the statistics below
train_text_lens = train_data['text'].str.len()
test_text_lens = test_data['text'].str.len()
train_len_counts = train_text_lens.value_counts()
test_len_counts = test_text_lens.value_counts()
# Plot distributions of text lengths in train_data and test_data
fig, (ax1,ax2) = plt.subplots(1,2,figsize=(16,9))
ax1.bar(train_len_counts.index,height=train_len_counts)
ax2.bar(test_len_counts.index,height=test_len_counts)
# Get the fraction of texts with length > max_length
max_length = 200
num_gr_max_train = (train_text_lens > max_length).sum()
num_gr_max_test = (test_text_lens > max_length).sum()
print(f'frac greater than {max_length} in train_data',num_gr_max_train/train_data.shape[0])
print(f'frac greater than {max_length} in test_data',num_gr_max_test/test_data.shape[0])

In [None]:
# It looks like only ~2% of texts are longer than 200 characters
# In addition, none of the texts in the test case are longer than 200
# Therefore, truncate texts to a max length of 200 - also, pad texts that are
#  less than 200, so that they are all the same length (using ljust)
# Do this using a function - any text that you'll want to make predictions on
#  later will need to be formatted using the same function
def trunc_or_pad(msg, max_len=200):
  """Return msg as a string of exactly max_len characters.

  Longer messages are cut off after max_len characters; shorter ones are
  padded on the right with spaces.

  Args:
    msg: The message text.
    max_len: Target length in characters (defaults to 200).

  Returns:
    The truncated or space-padded string.

  Raises:
    ValueError: If max_len is negative.
  """
  if max_len < 0:
    # A negative bound would make the slice below count from the end instead
    raise ValueError(f'max_len must be non-negative, got {max_len}')
  # Slicing leaves short strings untouched and ljust leaves full-length strings
  #  untouched, so the two steps cover both cases without a branch
  return msg[:max_len].ljust(max_len)

In [None]:
# Create your vocabulary by finding all unique characters in both datasets
# The texts of each dataset are joined in one pass with str.join rather than
#  with Series.sum(), which builds the corpus by repeated string concatenation
vocab = sorted(set(''.join(train_data['text'])) | set(''.join(test_data['text'])))
vocab_size = len(vocab)
# Create a dictionary that contains char as key and index as val
char_to_idx = { u: i for i, u in enumerate(vocab) }
# Create an array that has char corresponding to each index
idx_to_char = np.array(vocab)
# Do the same for ham and spam labels (ham=0, spam=1)
hamspam_to_idx = {'ham': 0, 'spam': 1}
idx_to_hamspam = ['ham','spam']

In [None]:
# Convert an input string into the integer vector that is fed into the input
#  (embedding) layer of the model
def string_to_input(text):
  """Encode text as an array of character indices.

  The text is first brought to a fixed length with trunc_or_pad, then every
  character is looked up in char_to_idx (a character missing from that
  dictionary raises KeyError).
  """
  return np.array([char_to_idx[ch] for ch in trunc_or_pad(text)])

In [None]:
# Replace every raw text in both datasets with its encoded character array
train_data['text'] = train_data['text'].apply(string_to_input)
test_data['text'] = test_data['text'].apply(string_to_input)

In [None]:
# Remove the label column from each dataframe and keep it, integer-encoded via
#  hamspam_to_idx, as a separate series
train_labels = train_data.pop('label').map(lambda label: hamspam_to_idx[label])
test_labels = test_data.pop('label').map(lambda label: hamspam_to_idx[label])

In [None]:
# Finally, create a new dataframe that has each element of the array in 'text'
#  in a separate column
# The per-row arrays are stacked into one 2-D matrix in a single step, which
#  replaces 200 separate passes and column inserts per dataframe and takes the
#  number of columns from the data instead of hard-coding it
def _text_to_columns(frame):
  """Return frame with 'text' replaced by integer-named columns 0, 1, 2, ...

  Every entry of frame['text'] must be an array of the same length; any other
  columns of frame are kept in front of the new ones.
  """
  encoded = np.stack(frame['text'].to_numpy())
  expanded = pd.DataFrame(encoded, index=frame.index)
  return pd.concat([frame.drop(columns='text'), expanded], axis=1)

train_data = _text_to_columns(train_data)
test_data = _text_to_columns(test_data)

In [None]:
# Create the model
# The first layer will be an embedding layer, followed by dropout and a layer
#  of LSTM nodes, then one hidden dense layer, and finally a dense layer
#  consisting of a single output node with a 'sigmoid' activation function

# Dimension of the dense embedding
embedding_dim = 256
RNN_units = 1024
batch_size = 200
# No longer read by build_model; kept in case another cell refers to it
timesteps = 50

def build_model(vocab_size, embedding_dim, RNN_units, batch_size):
  """Build the (uncompiled) character-level LSTM classifier.

  Args:
    vocab_size: Number of distinct characters (rows of the embedding).
    embedding_dim: Size of the vector each character is embedded into.
    RNN_units: Number of units in the LSTM layer.
    batch_size: Used as the input_shape of the embedding layer, i.e. as the
      number of character indices per sample - not as a batch dimension.

  Returns:
    A keras Sequential model with a single sigmoid output node.
  """
  model = keras.Sequential([
      keras.layers.Embedding(vocab_size,
                             embedding_dim,
                             input_shape=(batch_size,)),
      keras.layers.Dropout(0.1),
      # The LSTM takes its input shape from the layer before it, so it is not
      #  given a batch_input_shape: that argument is only meaningful on the
      #  first layer, and newer Keras versions reject it as unrecognized
      keras.layers.LSTM(RNN_units,
                        return_sequences=True,
                        recurrent_initializer='glorot_uniform'), # By default uses tanh activation function
      # return_sequences=True yields one output per position, so flatten them
      #  into a single vector for the dense layers
      keras.layers.Flatten(),
      keras.layers.Dense(64,activation='relu'),
      keras.layers.Dense(1,activation='sigmoid')
  ])
  return model

model = build_model(vocab_size, embedding_dim, RNN_units, batch_size)
model.summary()

In [None]:
# Configure training: Adam optimizer, binary cross-entropy loss (the model has
#  a single sigmoid output), and accuracy reported as the metric
model.compile(
    optimizer='Adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Fit the model on the encoded training texts
# Note that using GPU speeds this up by > 50 x (~10 seconds per epoch with GPU,
#  ~10 minutes per epoch without)
# The last 20% of the training data is held out for validation
# Spam messages are significantly under-represented in the dataset, so the
#  class_weight dictionary weights spam (label 1) 25 times as heavily as ham
#  (label 0)
history = model.fit(
    x=train_data,
    y=train_labels,
    batch_size=batch_size,
    epochs=25,
    validation_split=0.2,
    class_weight={0: 1, 1: 25},
)

In [None]:
# Draw the per-epoch training accuracy and validation accuracy on one figure
training_log = history.history
plt.plot(training_log['accuracy'])
plt.plot(training_log['val_accuracy'])

In [None]:
# Score the trained model on the held-out texts and labels, using the same
#  batch size as in training
model.evaluate(x=test_data, y=test_labels, batch_size=batch_size)

In [None]:
# function to predict messages based on model
# (should return list containing prediction and label, ex. [0.008318834938108921, 'ham'])
def predict_message(pred_text):
  """Classify a single SMS message as ham or spam.

  Args:
    pred_text: The raw message text (a single string).

  Returns:
    A two-element list [probability, label]. probability is the model's
    sigmoid output as a built-in float; label is 'ham' when that value is
    below 0.5 and 'spam' otherwise.
  """
  # string_to_input already returns a numpy array, so no extra conversion is
  #  needed before adding the batch dimension
  encoded_text = string_to_input(pred_text)
  model_out = model.predict(np.expand_dims(encoded_text, axis=0))
  # Convert the numpy scalar to a built-in float so that the returned list
  #  matches the documented example
  spam_probability = float(model_out[0][0])
  return [spam_probability, 'ham' if spam_probability < 0.5 else 'spam']

pred_text = "you have won £1000 cash! call to claim your prize."
prediction = predict_message(pred_text)
print(prediction)

In [None]:
# Run this cell to test your function and model. Do not modify contents.
# Runs predict_message on seven fixed messages with known labels and prints
#  whether all of them were classified correctly
def test_predictions():
  test_messages = ["how are you doing today",
                   "sale today! to stop texts call 98912460324",
                   "i dont want to go. can we try it a different day? available sat",
                   "our new mobile video service is live. just install on your phone to start watching.",
                   "you have won £1000 cash! call to claim your prize.",
                   "i'll bring it tomorrow. don't forget the milk.",
                   "wow, is your arm alright. that happened to me one time too"
                  ]

  # Expected label for each message above, in the same order
  test_answers = ["ham", "spam", "ham", "spam", "spam", "ham", "ham"]
  passed = True

  # Only the label (second element of each prediction) is compared
  for msg, ans in zip(test_messages, test_answers):
    prediction = predict_message(msg)
    if prediction[1] != ans:
      passed = False
      print(f'incorrectly classified "{msg}"')

  if passed:
    print("You passed the challenge. Great job!")
  else:
    print("You haven't passed yet. Keep trying.")

test_predictions()
