In [1]:
from typing import Set
import re

In [2]:
def tokenize(text: str) -> Set[str]:
    text = text.lower()
    all_words = re.findall('[a-z0-9]+', text)
    return set(all_words)

In [3]:
print(tokenize("Data Science is science"))

{'data', 'science', 'is'}


In [4]:
from typing import NamedTuple

In [5]:
class Message(NamedTuple):
    text: str
    is_spam: bool

In [6]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

In [7]:
class NaiveBayesClassifier:
    def __init__(self, k: float=0.5) -> None:
        self.k = k
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # 메세지의 수를 증가시킨다.
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            # 단어의 빈도를 증가시킨다.
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
    
    def _probabilities(self, token: str) -> Tuple[float, float]:
        """P(단어|스팸)과 P(단어|햄)을 반환"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham
        
    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = 0.0
        log_prob_if_ham = 0.0

        # 모든 메세지 안의 각 단어를 순회한다.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            # 만약 *token*이 메세지에 나온다면
            # 단어가 등장할 로그 확률값을 더한다.
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # 그게 아니라면 단어가 등장하지 않을 로그 확률을 더한다.
            # 이는 log(1 - 등장할 확률)이다.
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [8]:
from io import BytesIO
import requests
import tarfile
BASE_URL = 'https://spamassassin.apache.org/old/publiccorpus'
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]

OUTPUT_DIR = '../DATA/spam_data'

for filename in FILES:
    # requests를 사용하여 각각의 URL에서 파일의 내용을 가져오자.
    content = requests.get(f"{BASE_URL}/{filename}").content

    fin = BytesIO(content)

    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        tf.extractall(OUTPUT_DIR)

In [9]:
import glob, re

In [10]:
path = '../DATA/spam_data/*/*'

data: List[Message] = []

for filename in glob.glob(path):
    is_spam = 'ham' not in filename

    # 메일에 잘못된 문자가 들어 있는 경우가 있다. 
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith('Subject:'):
                subject = line.lstrip('Subject: ') # 왼쪽의 'Subject: ' 문자열 제거
                data.append(Message(subject, is_spam))
                break

In [11]:
import random
from scratch.machine_learning import split_data

In [12]:
random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

In [13]:
from collections import Counter

In [15]:
predictions = [(message, model.predict(message.text))
               for message in test_messages]

# 메세지가 스팸일 확률이 0.5보다 크면 스팸이라고 하자.
# 그리고 예측된 스팸 메세지가 실제 스팸인 경우를 세어 보자.

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                           for message, spam_probability in predictions)
print(confusion_matrix)

Counter({(False, False): 673, (True, True): 87, (True, False): 39, (False, True): 26})


In [16]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)

    return prob_if_spam / (prob_if_spam + prob_if_ham)

In [17]:
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

print('spammiest_words', words[-10:])
print('hammiest_words', words[:10])

spammiest_words ['assistance', 'zzzz', 'attn', '95', 'clearance', 'per', 'money', 'sale', 'systemworks', 'adv']
hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'bliss']
