In [1]:
import numpy as np
import tensorflow as tf
import pandas as pd
import re
import tensorflow as tf
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, Bidirectional,Conv1D, MaxPooling1D, Flatten, BatchNormalization, Input, Concatenate
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model
from urllib.parse import urlparse # Number of subdomains in the URL

In [2]:
# Load the saved Keras model from "phisingmodel.keras" (a path relative to the
# working directory). The `predict` function below reads it as the global `model`.
model = tf.keras.models.load_model("phisingmodel.keras")

In [3]:
# extract sender domain 
def separate_senderdomain(text):
    """Return whatever the sender string carries between angle brackets.

    Every ``<...>`` group found in ``text`` is captured (non-greedy) and the
    captures are concatenated without a separator. If the string holds no
    such group, the placeholder ``'unknown'`` is returned instead.
    """
    bracketed = re.findall(r'<(.*?)>', text)
    return ''.join(bracketed) if bracketed else 'unknown'

def extract_url(text):
    """Split a message body into URL-free text and the URLs it contained.

    Returns a ``(text, urls)`` tuple: ``text`` is the input with every
    http/https URL replaced by the literal token ``'url'``, and ``urls`` is
    the list of matched URLs in order of appearance.
    """
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    matcher = re.compile(url_pattern)

    found_urls = matcher.findall(text)
    stripped_text = matcher.sub('url', text)
    return stripped_text, found_urls


def extract_url_features(url):
    """Describe a single URL as a list of five numeric features.

    The features, in order, are:
      0. 1 if the URL starts with "https" (case-insensitive), else 0
      1. the length of the URL string
      2. number of dot-separated labels in the network location minus two
         (i.e. excluding domain and top-level domain); this is -1 for a host
         without dots, and 0 if the URL cannot be parsed
      3. 1 if any suspicious keyword occurs anywhere in the URL, else 0
      4. the number of '?' characters in the URL
    """
    lowered = url.lower()

    uses_https = int(lowered.startswith("https"))

    try:
        host_labels = urlparse(url).netloc.split('.')
        subdomain_count = len(host_labels) - 2
    except Exception:
        # Unparseable URLs are treated as having no subdomains.
        subdomain_count = 0

    suspicious_keywords = ["login", "verify", "account", "secure", "update","password"]
    has_suspicious_keyword = 0
    for keyword in suspicious_keywords:
        if keyword in lowered:
            has_suspicious_keyword = 1
            break

    return [
        uses_https,
        len(url),
        subdomain_count,
        has_suspicious_keyword,
        url.count('?'),
    ]

def extract_features_from_urls(urls):
    """Average the per-URL feature vectors of one message into a single vector.

    Parameters
    ----------
    urls : list of str
        URLs extracted from one message (may be empty).

    Returns
    -------
    list
        One value per feature produced by ``extract_url_features``: the mean
        of that feature over all URLs. A feature whose first value is not
        numeric is reported as ``None``. When ``urls`` is empty, a vector of
        five zeros is returned so the result has the same length as in the
        non-empty case.
    """
    # Length of the vector returned by extract_url_features.
    num_url_features = 5

    all_features = [extract_url_features(url) for url in urls]

    if not all_features:
        # The original `[0*5]` evaluated to `[0]`, a one-element list.
        return [0] * num_url_features

    url_count = len(all_features)
    aggregated_features = []
    for idx, first_value in enumerate(all_features[0]):
        if isinstance(first_value, (int, float)):
            aggregated_features.append(sum(row[idx] for row in all_features) / url_count)
        else:
            aggregated_features.append(None)
    return aggregated_features

#clean body and subject
def clean_text(text):
    """Normalise a subject or body string for tokenisation.

    Applied in order: drop HTML-style ``<...>`` tags, drop anything starting
    with ``http`` or ``www`` up to the next whitespace, drop every character
    that is neither a word character nor whitespace (i.e. punctuation), then
    lower-case and trim leading/trailing whitespace.
    """
    removal_patterns = (
        r'<.*?>',            # HTML tags
        r'http\S+|www\S+',   # URLs
        r'[^\w\s]',          # punctuation / symbols
    )
    for pattern in removal_patterns:
        text = re.sub(pattern, '', text)
    return text.lower().strip()

def remove_slashN(text):
    """Return ``text`` with every newline character replaced by a single space."""
    return text.replace('\n', ' ')

In [4]:
# Two hand-written sample e-mails, one record per message, used to exercise the
# loaded model. Columns are created in the order sender, body, subject, urls.
# The 'urls' column is not read by the preprocessing or prediction code below.
testDataset = pd.DataFrame(
    [
        {
            'sender': 'cPanel <service@cpanel.com>',
            'body': 'Congratulations! You\'ve won a $1000 gift card! Click here to claim your prize: https://fakeurl.com/',
            'subject': 'YOU HAVE WON A LOTTERY',
            'urls': 1,
        },
        {
            'sender': 'MR. JAMES NGOLA. <james_ngola2002@maktoob.com>',
            'body': 'Hi Team,I hope this message finds you well. I wanted to remind you about our upcoming meeting scheduled for Thursday, March 15th at 10:00 AM. We will be discussing the latest updates on the project and addressing any questions or concerns you may have.',
            'subject': 'Meeting Reminder: Project Update',
            'urls': 0,
        },
    ]
)

In [5]:
# Preview the sample rows as constructed, before any preprocessing is applied.
testDataset.head()

Unnamed: 0,sender,body,subject,urls
0,cPanel <service@cpanel.com>,Congratulations! You've won a $1000 gift card!...,YOU HAVE WON A LOTTERY,1
1,MR. JAMES NGOLA. <james_ngola2002@maktoob.com>,"Hi Team,I hope this message finds you well. I ...",Meeting Reminder: Project Update,0


In [6]:
def prep_process(testDataset):
    """Add the derived model-input columns to ``testDataset`` in place.

    Reads the 'sender', 'body' and 'subject' columns and writes:
      - 'senderdomain'   : text found between angle brackets in 'sender'
      - 'extracted_url'  : averaged URL feature vector for the body's URLs
      - 'num_urls'       : number of URLs found in the body
      - 'clean_body'     : body with URLs replaced by 'url', cleaned, newlines
                           turned into spaces
      - 'clean_subject'  : cleaned subject
      - 'clean_body_len' : character length of 'clean_body'
      - 'combined_text'  : senderdomain + clean_subject + clean_body,
                           space-separated

    The input columns are only read, never overwritten, so calling this
    function again on the same frame (e.g. re-running the notebook cell)
    produces the same result. Previously 'body' was replaced by its
    URL-stripped form, which made a second call find no URLs at all.

    Returns None; the frame passed in is modified.
    """
    # get the sender domain
    testDataset['senderdomain'] = testDataset['sender'].apply(separate_senderdomain)

    # separate the url and body; the URL-stripped text stays in a local so the
    # raw 'body' column remains available for later calls
    split_bodies = testDataset['body'].apply(extract_url)
    body_without_urls = split_bodies.map(lambda pair: pair[0])
    url_lists = split_bodies.map(lambda pair: pair[1])

    # extracting feature of urls
    testDataset['extracted_url'] = url_lists.apply(extract_features_from_urls)
    testDataset['num_urls'] = url_lists.apply(len)

    # clean subject and body
    testDataset['clean_body'] = body_without_urls.apply(clean_text).apply(remove_slashN)
    testDataset['clean_subject'] = testDataset['subject'].apply(clean_text)

    testDataset['clean_body_len'] = testDataset['clean_body'].apply(len)
    testDataset['combined_text'] = (
        testDataset['senderdomain'] + " "
        + testDataset['clean_subject'] + " "
        + testDataset['clean_body']
    )

In [7]:
# Run the preprocessing; prep_process writes its derived columns onto testDataset in place.
prep_process(testDataset)

In [13]:
def predict(testDataset, tokenizer=None):
    """Run the loaded model on a preprocessed frame and print one label per row.

    Parameters
    ----------
    testDataset : pandas.DataFrame
        Frame carrying the 'combined_text', 'num_urls', 'clean_body_len' and
        'extracted_url' columns (the ones ``prep_process`` creates).
    tokenizer : optional
        An already-fitted tokenizer exposing ``texts_to_sequences``. Pass the
        tokenizer that was fitted on the training corpus so that the word
        indices fed to the model match those seen during training. When
        omitted, a new ``Tokenizer`` is fitted on the input texts (the
        previous behaviour) and a warning is issued, because indices derived
        from the inference data need not agree with the training vocabulary.

    Returns
    -------
    None
        The labels ("NotSpam" for scores above 0.5, otherwise "Spam") are
        printed as a list.

    Notes
    -----
    Uses the module-level ``model`` object for inference.
    """
    import warnings

    texts = testDataset['combined_text'].values
    num_urls = testDataset['num_urls'].values.reshape(-1, 1)
    body_len = testDataset['clean_body_len'].values.reshape(-1, 1)

    if tokenizer is None:
        warnings.warn(
            "predict() was called without a fitted tokenizer; fitting a new one on "
            "the input texts yields word indices that need not match the ones used "
            "in training, so the predictions may be unreliable.",
            stacklevel=2,
        )
        vocab_size = 10000
        tokenizer = Tokenizer(num_words=vocab_size, oov_token='<OOV>')
        tokenizer.fit_on_texts(texts)
    sequences = tokenizer.texts_to_sequences(texts)

    # NOTE(review): 659 is presumably the sequence length the model was
    # trained with — confirm against the training notebook.
    X_padded = pad_sequences(sequences, maxlen=659, padding='post', truncating='post')

    # Each cell is expected to hold a list of URL features; wrap scalars so
    # pad_sequences always receives a list per row.
    url_features = [
        value if isinstance(value, list) else [value]
        for value in testDataset['extracted_url']
    ]
    MAX_URL_LENGTH = 5  # number of features produced per URL vector
    extracted_url_padded = pad_sequences(url_features, maxlen=MAX_URL_LENGTH, padding='post', dtype='float32')

    y_pred = model.predict([X_padded, num_urls, body_len, extracted_url_padded])
    print(["NotSpam" if y > 0.5 else 'Spam' for y in y_pred.flatten()])

In [14]:
# Score the preprocessed sample e-mails; predict prints one label per row.
predict(testDataset)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 109ms/step
['Spam', 'NotSpam']
