<a href="https://colab.research.google.com/github/LeeSeungYun1020/Introduction_To_Data_Science/blob/master/classroom/Naive_Bayes_Spam_Filter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 스팸 필터

## 나이브 베이즈

1. 학습 데이터에 대한 Data Class를 만든다.
2. 문장을 단어로 분리하는 함수를 만든다.
3. 나이브 베이즈로 분류하는 Class를 만든다.
  - 클래스는 훈련, 예측 메소드를 제공한다.
  - smoothing factor와 단어 집합 및 각 단어별 스팸/햄 count를 attribute로 가진다.

In [47]:
from typing import NamedTuple, Set, List, Tuple, Dict, Iterable, TypeVar
from collections import defaultdict
import re
import math

In [3]:
class Message(NamedTuple):
  text: str
  is_spam: bool = False

In [4]:
def tokenize(text: str) -> Set[str]:
  return set(re.findall("[a-z0-9]+", text.lower()))

tokenize("DAta sciEnce is 'science'?")

{'data', 'is', 'science'}

In [41]:
class NaiveBayesClassifier:
  def __init__(self, k:float = 0.5) -> None:
    self.k = k
    self.tokens: Set[str] = set()
    self.spam_tokens = defaultdict(int)
    self.ham_tokens = defaultdict(int)
    self.spam_count = 0
    self.ham_count = 0

  def train(self, messages: Iterable[Message]) -> None:
    for message in messages:
      if message.is_spam:
        self.spam_count += 1
      else:
        self.ham_count += 1

      for word in tokenize(message.text):
        self.tokens.add(word)
        if message.is_spam:
          self.spam_tokens[word] += 1
        else:
          self.ham_tokens[word] += 1

  def _probabilities(self, token: str) -> Tuple[float, float]:
    pt_spam = (self.spam_tokens[token] + self.k) / (self.spam_count + 2 * self.k)
    pt_ham = (self.ham_tokens[token] + self.k) / (self.ham_count + 2 * self.k)
    return pt_spam, pt_ham

  def predict(self, text: str, printDetail: bool = False):
    text_tokens = tokenize(text)
    log_spam = 0.0
    log_ham = 0.0
    t_spam = []
    t_ham = []

    for token in self.tokens:
      pt_spam, pt_ham = self._probabilities(token)
      if token in text_tokens:
        log_spam += math.log(pt_spam)
        log_ham += math.log(pt_ham)
        t_spam.append(pt_spam)
        t_ham.append(pt_ham)
      else:
        log_spam += math.log(1.0 - pt_spam)
        log_ham += math.log(1.0 - pt_ham)
        t_spam.append(1.0 - pt_spam)
        t_ham.append(1.0 - pt_ham)
    if printDetail:
      print(t_spam)
      print(t_ham)
    p_spam = math.exp(log_spam)
    p_ham = math.exp(log_ham)
    return p_spam / (p_spam + p_ham)


## 유닛 테스트

앞에서 만든 나이브 베이즈 Class를 Test한다.

In [43]:
test_messages = [
                 Message("Is this spam?", False),
                 Message("This is ham", True),
                 Message("delicious spam", False)
]

model = NaiveBayesClassifier()
model.train(test_messages)

assert model.tokens == {"is", "this", "spam", "ham", "delicious"}
assert model.spam_count == 1
assert model.ham_count == 2
assert model.spam_tokens == {"this": 1, "is": 1, "ham": 1}
assert model.ham_tokens == {"this": 1, "is": 1, "spam": 2, "delicious": 1}

text = "This is delicious ham!"

probs_if_spam = [
  (1 + 0.5) / (1 + 2 * 0.5),      # is
  (1 + 0.5) / (1 + 2 * 0.5),      # this
  1 - (0 + 0.5) / (1 + 2 * 0.5),  # spam
  (1 + 0.5) / (1 + 2 * 0.5),      # ham
  (0 + 0.5) / (1 + 2 * 0.5)       # delicious
]

probs_if_ham = [
  (1 + 0.5) / (2 + 2 * 0.5),      # is
  (1 + 0.5) / (2 + 2 * 0.5),      # this
  1 - (2 + 0.5) / (2 + 2 * 0.5),  # spam
  (0 + 0.5) / (2 + 2 * 0.5),      # ham
  (1 + 0.5) / (2 + 2 * 0.5)       # delicious
]

print(probs_if_spam)
print(probs_if_ham)

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

assert model.predict(text, printDetail= True) == p_if_spam / (p_if_spam + p_if_ham)
print(model.predict(text))
print(p_if_spam / (p_if_spam + p_if_ham))

[0.75, 0.75, 0.75, 0.75, 0.25]
[0.5, 0.5, 0.16666666666666663, 0.16666666666666666, 0.5]
[0.75, 0.75, 0.75, 0.75, 0.25]
[0.5, 0.16666666666666663, 0.16666666666666666, 0.5, 0.5]
0.9579500657030223
0.9579500657030223


## 학습

1. 스팸 메일 데이터를 불러와서 확인 전처리한다.
2. 나이브 베이즈 Class에 적합한 형태로 데이터를 준비한다.
3. 데이터에 나이브 베이즈 알고리즘을 적용한다.

In [48]:
from io import BytesIO
import requests
import tarfile
import random, glob, re

In [45]:
URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]
OUTPUT_DIR = 'spam_data'

In [46]:
for filename in FILES:
  content = requests.get(f"{URL}/{filename}").content
  fin = BytesIO(content)
  with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
    tf.extractall(OUTPUT_DIR)

In [55]:
path = '/content/spam_data/*/*'
path_spam = '/content/spam_data/hard_ham/*'
path_ham = '/content/spam_data/spam/*'

with open(glob.glob(path_spam)[0], errors='ignore') as spam_file:
  print(spam_file.read())

Return-Path: <bounce-lgweb-2534373@sprocket.lockergnome.com>
Received: from lockergnome.com (sprocket.lockergnome.com [130.94.96.247])
	by dogma.slashnull.org (8.11.6/8.11.6) with SMTP id g6CJ0fJ25750
	for <qqqqqqqqqq-lg@example.com>; Fri, 12 Jul 2002 20:00:41 +0100
X-Mailer: ListManager Web Interface
Date: Fri, 12 Jul 2002 11:39:07 -0500
Subject: [Lockergnome Webmaster Weekly]  Compatible Whistles
To: qqqqqqqqqq-lg@example.com
From: Lockergnome Webmaster Weekly <subscriptions@lockergnome.com>
List-Unsubscribe: <mailto:leave-lgweb-2534373J@sprocket.lockergnome.com>
List-Subscribe: <mailto:subscribe-lgweb@sprocket.lockergnome.com>
List-Owner: <mailto:owner-lgweb@sprocket.lockergnome.com>
X-URL: <http://www.lockergnome.com/>
X-List-Host: Lockergnome <http://www.lockergnome.com/>
Reply-To: leave-lgweb-2534373J@sprocket.lockergnome.com
Sender: bounce-lgweb-2534373@sprocket.lockergnome.com
Message-Id: <LISTMANAGERSQL-2534373-1674170-2002.07.12-11.41.03--qqqqqqqqqq-lg#example.com@sprocket.lo

In [56]:
with open(glob.glob(path_ham)[0], errors='ignore') as ham_file:
  print(ham_file.read())

From KV4nWPbovS0LLVR@sky.seed.net.tw  Sat Sep  7 22:05:37 2002
Return-Path: <KV4nWPbovS0LLVR@sky.seed.net.tw>
Delivered-To: zzzz@localhost.jmason.org
Received: from localhost (jalapeno [127.0.0.1])
	by zzzzason.org (Postfix) with ESMTP id E909E16FA5
	for <zzzz@localhost>; Sat,  7 Sep 2002 21:57:32 +0100 (IST)
Received: from jalapeno [127.0.0.1]
	by localhost with IMAP (fetchmail-5.9.0)
	for zzzz@localhost (single-drop); Sat, 07 Sep 2002 21:57:32 +0100 (IST)
Received: from john000 ([202.64.208.252]) by dogma.slashnull.org
    (8.11.6/8.11.6) with SMTP id g87010C28948 for <webmaster@efi.ie>;
    Sat, 7 Sep 2002 01:01:00 +0100
Date: Sat, 7 Sep 2002 01:01:00 +0100
Received: from titan by ara.seed.net.tw with SMTP id Hj5t3pNT4kz68k;
    Sat, 07 Sep 2002 07:51:45 +0800
Message-Id: <YOxIduD@iris.seed.net.tw>
From: marketing@securepro.com.hk
To: AGRICULTURE@dogma.slashnull.org, SPAIN.TXT@dogma.slashnull.org,
	SWITZERLAND.TXT@dogma.slashnull.org, TAIWAN.TXT@dogma.slashnull.org,
	UNITED.KINGDOM.

In [58]:
data: List[Message] = []
for filename in glob.glob(path):
  is_spam = "ham" not in filename
  with open(filename, errors='ignore') as email:
    for line in email:
      if line.startswith("Subject:"):
        subject = line.lstrip("Subject: ")
        data.append(Message(subject, is_spam))
        break # subject 부분만 분석 아래 파트는 분석 안함

In [60]:
T = TypeVar("T")
def split_data(data: List[T], p: float) -> Tuple[List[T], List[T]]:
  #data = data[:]
  random.shuffle(data)
  cut = int(len(data) * p)
  return data[:cut], data[cut:]

In [69]:
random.seed(0)
train, test = split_data(data, 0.75)
print(len(train))
print(len(test))

2475
825


In [70]:
model = NaiveBayesClassifier()
model.train(train)

## 평가

1. 적용한 알고리즘을 평가한다.
2. 개선할 점이 있는지 확인한다.

In [72]:
from collections import Counter

predictions = [
               (message, model.predict(message.text))
               for message in test
]

In [74]:
confusion_matrix = Counter((message.is_spam, spam_p > 0.5)
for message, spam_p in predictions
)

print(confusion_matrix)

Counter({(False, False): 664, (True, True): 89, (True, False): 43, (False, True): 29})


In [77]:
def get_spam_p(token: str, model: NaiveBayesClassifier) -> float:
  p_spam, p_ham = model._probabilities(token)
  return p_spam / (p_spam + p_ham)

words = sorted(model.tokens, key=lambda t: get_spam_p(t, model))
print(words[-10:])
print(words[:10])

['needed', 'account', 'norton', 'attn', 'assistance', 'money', 'clearance', 'sale', 'adv', 'systemworks']
['spambayes', 'users', 'razor', 'sadev', 'zzzzteana', 'apt', 'perl', 'spamassassin', 'ouch', 'bliss']
