In [0]:
!tar -xvf ./20021010_spam.tar

In [0]:
!tar -xvf ./20021010_easy_ham.tar

In [0]:
!tar -xvf ./20021010_hard_ham.tar

In [0]:
import glob, re
import math
from typing import List, Tuple, Dict, Iterable, NamedTuple, TypeVar, Set
from collections import defaultdict, Counter

class Message(NamedTuple):
    text: str
    is_spam: bool

X = TypeVar('X')
def split_data(data, prob):
    """Split data into fractions [prob, 1 - prob]"""
    data = data[:]
    random.shuffle(data)
    cut = int(len(data) * prob)
    return data[:cut], data[cut:]

# 스팸 제목을 토큰화 시키기 위함
def tokenize(text: str) -> Set[str]:
    text = text.lower()                         # Convert to lowercase,
    all_words = re.findall("[a-z0-9']+", text)  # extract the words, and
    return set(all_words) 

In [0]:
path = 'Spam/*/*'

data: List[Message] = []

# 데이터 중 subject와 스팸 여부(is_spam)를 Message class에 넣는다
# 해당 Message class 를 data에 append
for filename in glob.glob(path):
    is_spam = "ham" not in filename 
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break 

In [9]:
data[:10]

[Message(text='CNET NEWS.COM: The tech side of homeland defense\n', is_spam=False),
 Message(text='Your Daily Dilbert 07/13/2002\n', is_spam=False),
 Message(text='[Lockergnome Tech Specialist]  Frequent Format\n', is_spam=False),
 Message(text='CNET DIGITAL DISPATCH: Mac zealots unite\n', is_spam=False),
 Message(text='Yahoo! News Story - Top Stories\n', is_spam=False),
 Message(text='firewalls Digest V1 #33\n', is_spam=False),
 Message(text="DayTips' Poem-a-Day: 09/13/02\n", is_spam=False),
 Message(text="Classic Novels, Aesop's Fables, Issue 49\n", is_spam=False),
 Message(text='Personal Finance: Resolutions You Can Keep\n', is_spam=False),
 Message(text='[Lockergnome Tech Specialist]  Geothermal Caffeine\n', is_spam=False)]

In [0]:
class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k  # smoothing factor

        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # 메시지 자체를 스팸 여부에 따라 count
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            # 단어 추출 뒤 스팸 여부에 따라 count
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1

    def _probabilities(self, token: str) -> Tuple[float, float]:
        # P (token | spam) 과 P(token | not spam) 을 return
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # train 된 단어들(vocabulary 에 있는 단어들)을 loop
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)

            # voca 에 있는 단어가 예측하기 위한 단어 집합(text_tokens)에 있는 경우 
            # probabilities를 통해 구한 확률에 log 처리후 더해 줌
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # text_tokens 에 있는 단어가 voca 에는 없음
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [0]:
import random
random.seed(0)

# train/test split
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

In [0]:
# 테스트 데이터에 대한 predict
predictions = [(message, model.predict(message.text)) for message in test_messages]

# confusion matrix
# 실제 스팸이 스팸이라고 예측 (True, True) (TP)
# 햄이 스팸으로 예측 (False, True) (FP)
# 햄이 햄으로 예측 (False, False) (TN)
# 실제 스팸이 햄으로 예측 (True, False) (FN)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)

In [13]:
confusion_matrix

Counter({(False, False): 675,
         (False, True): 24,
         (True, False): 51,
         (True, True): 75})

In [15]:
cm = confusion_matrix
# precision : TP / (TP + FP)
# 스팸이라고 판단 한 것 중 실제 스팸인 비율
print(cm[(True, True)] / (cm[(True, True)] + cm[(False, True)]))

# recall : TP / (TP + FN)
# 실제 스팸 중 스팸이라고 판단한 비율
print(cm[(True, True)] / (cm[(True, True)] + cm[(True, False)]))

0.7575757575757576
0.5952380952380952


In [16]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    # naive bayse 를 통해 p(스팸 | 메시지가 해당 단어를 포함) 을 계산
    prob_if_spam, prob_if_ham = model._probabilities(token)

    return prob_if_spam / (prob_if_spam + prob_if_ham)

# 스팸일 확률에 따라 메시지를 오름차순 정렬
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

# 스팸일 확률이 가장 높은 메시지
print("spammiest_words", words[-10:])

# 스팸일 확률이 가장 낮은 메시지
print("hammiest_words", words[:10])

spammiest_words ['clearance', 'account', 'per', 'sale', 'year', 'zzzz', 'systemworks', 'rates', 'money', 'adv']
hammiest_words ['spambayes', 'users', 'zzzzteana', 'razor', 'sadev', 'ouch', 'perl', 'spam', 'bliss', 'selling']
