In [2]:
import os
import tarfile
import urllib.request

Download_Root = "https://spamassassin.apache.org/old/publiccorpus/"
Ham_Url = Download_Root + "20021010_easy_ham.tar.bz2"
Spam_Url = Download_Root + "20021010_spam.tar.bz2"
Spam_Path = os.path.join("datasets", "spam")

def fetch_data(ham_url = Ham_Url, spam_url = Spam_Url, spam_path = Spam_Path):
    if not os.path.isdir(spam_path):
        os.makedirs(spam_path)
    for filename, url in (("easy_ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
        path = os.path.join(spam_path, filename)
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)
        tar_file = tarfile.open(path)
        tar_file.extractall(path = spam_path)
        tar_file.close()

In [3]:
fetch_data()

In [4]:
ham_dir = os.path.join(Spam_Path, "easy_ham")
spam_dir = os.path.join(Spam_Path, "spam")
ham_filenames = [name for name in sorted(os.listdir(ham_dir)) if len(name) > 20]
spam_filenames = [name for name in sorted(os.listdir(spam_dir)) if len(name) > 20]

In [5]:
len(ham_filenames)

2551

In [6]:
len(spam_filenames)

501

In [7]:
import email
import email.policy

def load_emails(is_spam, filename, spam_path = Spam_Path):
    directory = "spam" if is_spam else "easy_ham"
    with open(os.path.join(spam_path, directory, filename), "rb") as f:
        return email.parser.BytesParser(policy = email.policy.default).parse(f)

In [8]:
ham_emails = [load_emails(is_spam = False, filename = name) for name in ham_filenames]
spam_emails = [load_emails(is_spam = True, filename = name) for name in spam_filenames]

In [9]:
print(ham_emails[3].get_content().strip())

Klez: The Virus That Won't Die
 
Already the most prolific virus ever, Klez continues to wreak havoc.

Andrew Brandt
>>From the September 2002 issue of PC World magazine
Posted Thursday, August 01, 2002


The Klez worm is approaching its seventh month of wriggling across 
the Web, making it one of the most persistent viruses ever. And 
experts warn that it may be a harbinger of new viruses that use a 
combination of pernicious approaches to go from PC to PC.

Antivirus software makers Symantec and McAfee both report more than 
2000 new infections daily, with no sign of letup at press time. The 
British security firm MessageLabs estimates that 1 in every 300 
e-mail messages holds a variation of the Klez virus, and says that 
Klez has already surpassed last summer's SirCam as the most prolific 
virus ever.

And some newer Klez variants aren't merely nuisances--they can carry 
other viruses in them that corrupt your data.

...

http://www.pcworld.com/news/article/0,aid,103259,00.asp
____

In [10]:
print(spam_emails[3].get_content().strip())

1) Fight The Risk of Cancer!
http://www.adclick.ws/p.cfm?o=315&s=pk007

2) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.adclick.ws/p.cfm?o=249&s=pk007

3) Get the Child Support You Deserve - Free Legal Advice
http://www.adclick.ws/p.cfm?o=245&s=pk002

4) Join the Web's Fastest Growing Singles Community
http://www.adclick.ws/p.cfm?o=259&s=pk007

5) Start Your Private Photo Album Online!
http://www.adclick.ws/p.cfm?o=283&s=pk007

Have a Wonderful Day,
Offer Manager
PrizeMama













If you wish to leave this list please use the link below.
http://www.qves.com/trim/?zzzz@example.com%7C17%7C308417


In [11]:
# for understanding what isinstance function do
x = "sjus"

if isinstance(x, str):
    print("string")
else:
    print("not string")

string


In [12]:
def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        return f"multipart({', '.join([get_email_structure(sub_email) for sub_email in payload])})"
    else:
        return email.get_content_type()

In [13]:
from collections import Counter

def get_counts(emails):
    counter = Counter()
    for email in emails:
        structure = get_email_structure(email)
        counter[structure] +=1
    return counter

In [14]:
get_counts(ham_emails).most_common()

[('text/plain', 2453),
 ('multipart(text/plain, application/pgp-signature)', 72),
 ('multipart(text/plain, text/html)', 8),
 ('multipart(text/plain, text/plain)', 4),
 ('multipart(text/plain)', 3),
 ('multipart(text/plain, application/octet-stream)', 2),
 ('multipart(text/plain, text/enriched)', 1),
 ('multipart(text/plain, application/ms-tnef, text/plain)', 1),
 ('multipart(multipart(text/plain, text/plain, text/plain), application/pgp-signature)',
  1),
 ('multipart(text/plain, video/mng)', 1),
 ('multipart(text/plain, multipart(text/plain))', 1),
 ('multipart(text/plain, application/x-pkcs7-signature)', 1),
 ('multipart(text/plain, multipart(text/plain, text/plain), text/rfc822-headers)',
  1),
 ('multipart(text/plain, multipart(text/plain, text/plain), multipart(multipart(text/plain, application/x-pkcs7-signature)))',
  1),
 ('multipart(text/plain, application/x-java-applet)', 1)]

In [15]:
get_counts(spam_emails).most_common()

[('text/plain', 222),
 ('text/html', 181),
 ('multipart(text/plain, text/html)', 45),
 ('multipart(text/html)', 19),
 ('multipart(text/plain)', 19),
 ('multipart(multipart(text/html))', 5),
 ('multipart(text/plain, image/jpeg)', 3),
 ('multipart(text/html, application/octet-stream)', 2),
 ('multipart(text/plain, application/octet-stream)', 1),
 ('multipart(text/html, text/plain)', 1),
 ('multipart(multipart(text/html), application/octet-stream, image/jpeg)', 1),
 ('multipart(multipart(text/plain, text/html), image/gif)', 1),
 ('multipart/alternative', 1)]

In [16]:
for header, value in spam_emails[1].items():
    print(header,":",value)

Return-Path : <12a1mailbot1@web.de>
Delivered-To : zzzz@localhost.example.com
Received : from localhost (localhost [127.0.0.1])	by phobos.labs.example.com (Postfix) with ESMTP id 136B943C32	for <zzzz@localhost>; Thu, 22 Aug 2002 08:17:21 -0400 (EDT)
Received : from mail.webnote.net [193.120.211.219]	by localhost with POP3 (fetchmail-5.9.0)	for zzzz@localhost (single-drop); Thu, 22 Aug 2002 13:17:21 +0100 (IST)
Received : from dd_it7 ([210.97.77.167])	by webnote.net (8.9.3/8.9.3) with ESMTP id NAA04623	for <zzzz@example.com>; Thu, 22 Aug 2002 13:09:41 +0100
From : 12a1mailbot1@web.de
Received : from r-smtp.korea.com - 203.122.2.197 by dd_it7  with Microsoft SMTPSVC(5.5.1775.675.6);	 Sat, 24 Aug 2002 09:42:10 +0900
To : dcek1a1@netsgo.com
Subject : Life Insurance - Why Pay More?
Date : Wed, 21 Aug 2002 20:31:57 -1600
MIME-Version : 1.0
Message-ID : <0103c1042001882DD_IT7@dd_it7>
Content-Type : text/html; charset="iso-8859-1"
Content-Transfer-Encoding : quoted-printable


In [17]:
spam_emails[1]['Subject']

'Life Insurance - Why Pay More?'

In [18]:
spam_emails[1]['From']

'12a1mailbot1@web.de'

In [19]:
import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(ham_emails + spam_emails, dtype=object)
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state =42)

In [20]:
print(f"Length of X_train: {len(X_train)}")
print(f"Length of X_test: {len(X_test)}")

Length of X_train: 2441
Length of X_test: 611


In [21]:
import re
from html import unescape

def html_to_plain_text(html):
    text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
    text = re.sub('<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)

In [22]:
html_spam_emails = [email for email in X_train[y_train==1]
                    if get_email_structure(email) == "text/html"]
sample_html_spam = html_spam_emails[7]
print(sample_html_spam.get_content().strip()[:1000], "...")

<html>
<head>
</head>
<center>
<h1>
<b><font face="Arial Black"><font color="#0000FF"><font size=+2>&nbsp;
Free Personal and Business Grants</font></font></font></b></h1></center>

<p>&nbsp;
<center><table BORDER=0 CELLSPACING=0 CELLPADDING=10 WIDTH="419" BGCOLOR="#0000FF" >
<tr>
<td WIDTH="397" BGCOLOR="#FFFF00">
<center>
<h2>
<font face="Arial Narrow">" Qualify for <u>at least</u> $25,000 in free
grants money - Guaranteed! "</font></h2></center>
</td>
</tr>
</table></center>

<center>
<h3>
<font face="Arial"><font size=+0>Each day over One Million Dollars in Free
Government<br>
Grants&nbsp; is given away to people just like you for a wide<br>
variety of Business And Personal Needs</font></font></h3></center>
<font face="Verdana"><font size=-1>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
Dear Grant Seeker,</font></font>
<blockquote><font face="Verdana"><font size=-1>In a moment, I'll tell you
exactly <b>HOW &amp; WHERE</b> to get Grants. This <b>MONEY</b> has to
be given away, <b>WHY</b

In [23]:
print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")


 
Free Personal and Business Grants
 
" Qualify for at least $25,000 in free
grants money - Guaranteed! "
Each day over One Million Dollars in Free
Government
Grants  is given away to people just like you for a wide
variety of Business And Personal Needs
       
Dear Grant Seeker,
In a moment, I'll tell you
exactly HOW & WHERE to get Grants. This MONEY has to
be given away, WHY not to YOU?
You may be thinking, "How
can I get some of this Free Grants Money"
Maybe you think it's impossible
to get free money?
Let me tell you it's not
impossible! It's a fact, ordinary people and businesses all across the
United States are receiving millions of dollars from these Government and
Private Foundation's everyday.
Who Can Apply?
ANYONE can apply
for a Grant from 18 years old and up!
Grants from $500.00 to $50,000.00
are possible! GRANTS don't have to be paid back,
EVER! Claim
your slice of the FREE American Pie.
This money is not a loan,
Trying to get money through a conventional bank can be ver

In [28]:
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except:
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        else:
            html = content
    if html:
        return html_to_plain_text(html)

In [31]:
print(email_to_text(sample_html_spam)[:200], "...")


 
Free Personal and Business Grants
 
" Qualify for at least $25,000 in free
grants money - Guaranteed! "
Each day over One Million Dollars in Free
Government
Grants  is given away to people just lik ...


In [33]:
import nltk

stemmer = nltk.PorterStemmer()
for word in ("readable", "reader", "reading", "peaceful", "written"):
    print(word, "->", stemmer.stem(word))

readable -> readabl
reader -> reader
reading -> read
peaceful -> peac
written -> written


In [35]:
import urlextract

urlextract = urlextract.URLExtract()
print(urlextract.find_urls("you can watch the video from this link https://www.youtube.com/watch?v=u7_d4IzPlFY"))

['https://www.youtube.com/watch?v=u7_d4IzPlFY']


In [38]:
from sklearn.base import BaseEstimator, TransformerMixin

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and urlextract is not None:
                urls = list(set(urlextract.find_urls(text)))
                urls.sort(key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, " URL ")
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)

In [39]:
X_samples = X_train[:3]
X_samples_counts = EmailToWordCounterTransformer().fit_transform(X_samples)
X_samples_counts

array([Counter({'it': 4, 'pay': 3, 't': 3, 'the': 2, 'you': 2, 'without': 2, 's': 2, 'i': 2, 'a': 2, 'can': 2, 'look': 2, 'at': 2, 'if': 1, 'creator': 1, 'didnt': 1, 'say': 1, 'could': 1, 'have': 1, 'theft': 1, 'so': 1, 'simpl': 1, 'hell': 1, 'that': 1, 'even': 1, 'in': 1, 'all': 1, 'major': 1, 'holi': 1, 'book': 1, 'wow': 1, 've': 1, 'got': 1, 'great': 1, 'idea': 1, 'll': 1, 'hire': 1, 'skywrit': 1, 'to': 1, 'write': 1, 'thi': 1, 'then': 1, 'lock': 1, 'up': 1, 'everybodi': 1, 'who': 1, 'and': 1, 'didn': 1, 'fail': 1, 'jesu': 1, 'is': 1, 'on': 1, 'my': 1, 'side': 1, 'url': 1}),
       Counter({'i': 8, 'number': 7, 'to': 5, 'the': 5, 'of': 4, 'com': 3, 'we': 3, 'realli': 3, 'look': 3, 'it': 3, 'that': 3, 'with': 3, 'date': 2, 'welch': 2, 'panasa': 2, 't': 2, 'but': 2, 'is': 2, 'what': 2, 'would': 2, 'time': 2, 'for': 2, 'a': 2, 'have': 2, 'm': 2, 'not': 2, 'will': 2, 'exmh': 2, 'worker': 2, 'tue': 1, 'aug': 1, 'from': 1, 'brent': 1, 'messag': 1, 'id': 1, 'numbervaanumb': 1, 'blackcomb':

In [40]:
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}
        return self
    def transform(self, X, y=None):
        rows = []
        cols = []
        data = []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))

In [43]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_samples_counts)
X_few_vectors

<3x11 sparse matrix of type '<class 'numpy.intc'>'
	with 27 stored elements in Compressed Sparse Row format>

In [44]:
X_few_vectors.toarray()

array([[ 51,   2,   0,   2,   4,   1,   1,   3,   2,   2,   2],
       [116,   8,   7,   5,   3,   5,   3,   2,   3,   1,   2],
       [ 46,   3,   5,   0,   0,   1,   1,   0,   0,   2,   0]],
      dtype=int32)

In [45]:
vocab_transformer.vocabulary_

{'i': 1,
 'number': 2,
 'the': 3,
 'it': 4,
 'to': 5,
 'that': 6,
 't': 7,
 'look': 8,
 'at': 9,
 'a': 10}

In [47]:
from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ("emailtoword", EmailToWordCounterTransformer()),
    ("wordcountertovec", WordCounterToVectorTransformer())
])
X_transformed = pipeline.fit_transform(X_train)

In [50]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

log_model = LogisticRegression(solver='lbfgs', max_iter=1000, random_state = 78)
scores = cross_val_score(log_model, X_transformed, y_train, cv=3, verbose=3)
scores.mean()

[CV] END ................................ score: (test=0.988) total time=   0.0s
[CV] END ................................ score: (test=0.994) total time=   0.2s
[CV] END ................................ score: (test=0.985) total time=   0.2s


0.9889374446570018

In [60]:
from sklearn.metrics import precision_score, recall_score

X_test_transformed = pipeline.transform(X_test)

log_model = LogisticRegression(solver='lbfgs', max_iter = 1000, random_state = 78)
log_model.fit(X_transformed, y_train)
y_pred = log_model.predict(X_test_transformed)

ps = precision_score(y_pred, y_test)
rs = recall_score(y_pred, y_test)

print(f"Precision score: {100 * ps:.2f}%")
print(f"Recall score: {100 * rs:.2f}%")

Precision score: 95.65%
Recall score: 94.62%
