In [1]:
import re
import numpy as np
import random

In [2]:
def file_to_list(filename='./SMSSpamCollection.txt'):
    """
    Read a tab-separated SMS corpus and tokenize every message.

    Each non-empty line is split at its FIRST tab into a label and the
    message text. Splitting only once keeps labels and messages aligned even
    when a message itself contains a tab or a line has no message at all.

    Args:
        filename: Path to the corpus text file.

    Returns:
        class_list: One integer per message -- 0 for 'ham', 1 for 'spam',
            -1 for any other label.
        token_list: One list of tokens per message (as produced by
            content_to_token), index-aligned with class_list.
    """
    label_codes = {'ham': 0, 'spam': 1}
    class_list = []  # integer class of each message
    token_list = []  # tokens of each message, same order as class_list
    # NOTE(review): the file is opened with the platform default encoding, as
    # before; consider passing an explicit encoding once the corpus encoding
    # is confirmed.
    with open(filename) as fr:  # context manager guarantees the handle is closed
        for line in fr:
            line = line.rstrip('\n')
            if not line:
                # A blank line carries neither a label nor a message.
                continue
            label, _, content = line.partition('\t')
            class_list.append(label_codes.get(label, -1))
            token_list.append(content_to_token(content))
    return class_list, token_list

In [3]:
def content_to_token(content):
    """
    Split a message into lower-cased tokens.

    The text is cut at every non-word character (anything that is not a
    letter, digit or underscore) and the empty strings produced by adjacent
    separators are discarded. Tokens of any non-zero length are kept.

    Args:
        content: Message text.

    Returns:
        token: List of lower-cased tokens, in order of appearance.
    """
    # Raw string: in a plain literal the backslash-W pair is an invalid
    # escape sequence, which newer Python versions warn about.
    word_list = re.split(r'\W', content)
    token = [word.lower() for word in word_list if word]  # drop empty pieces only
    return token

In [4]:
def create_vocab_list(dataset):
    """
    Build the vocabulary: every distinct token that occurs in the dataset.

    Args:
        dataset: Iterable of documents, each an iterable of tokens.

    Returns:
        Sorted list of the distinct tokens. Sorting makes the word-to-column
        mapping identical from run to run, whereas the iteration order of a
        set of strings can change between interpreter sessions.
    """
    vocab_set = set()
    for document in dataset:
        # In-place update avoids building a brand new set for every document.
        vocab_set.update(document)
    return sorted(vocab_set)

In [5]:
def token_to_word_vector_euqal_weight(vocab_list, token):
    """
    Set-of-words model: mark which vocabulary words occur in a document.

    Args:
        vocab_list: List of vocabulary words; position i is column i.
        token: List of tokens of one document.

    Returns:
        word_vector: List of len(vocab_list) integers, 1 where the vocabulary
            word appears in the document at least once and 0 elsewhere.
            Tokens missing from the vocabulary are ignored.
    """
    # Map each word to its column once instead of scanning the vocabulary
    # list twice per token. setdefault keeps the FIRST position of a repeated
    # word, which is what list.index() would return.
    column_of = {}
    for column, word in enumerate(vocab_list):
        column_of.setdefault(word, column)

    word_vector = [0] * len(vocab_list)
    for word in token:
        column = column_of.get(word)
        if column is not None:
            word_vector[column] = 1
    return word_vector

In [6]:
def token_to_word_vector(vocab_list, token):
    """
    Bag-of-words model: count how often each vocabulary word occurs.

    Args:
        vocab_list: List of vocabulary words; position i is column i.
        token: List of tokens of one document.

    Returns:
        word_vector: List of len(vocab_list) integers holding the number of
            occurrences of each vocabulary word in the document. Tokens
            missing from the vocabulary are ignored.
    """
    # Map each word to its column once instead of scanning the vocabulary
    # list twice per token. setdefault keeps the FIRST position of a repeated
    # word, which is what list.index() would return.
    column_of = {}
    for column, word in enumerate(vocab_list):
        column_of.setdefault(word, column)

    word_vector = [0] * len(vocab_list)
    for word in token:
        column = column_of.get(word)
        if column is not None:
            word_vector[column] += 1
    return word_vector

In [7]:
def train_NB(train_matrix, train_category):
    """
    Fit a two-class naive Bayes model with Laplace (add-one) smoothing.

    Args:
        train_matrix: Sequence of word vectors, one per training document,
            all of the same length.
        train_category: Sequence of labels aligned with train_matrix.
            Label 1 is the positive (spam) class; every other value is
            accumulated into class 0, as before.

    Returns:
        p0_vector: numpy array of log P(word | class 0), one entry per word.
        p1_vector: numpy array of log P(word | class 1), one entry per word.
        pAbusive: Prior P(class 1) -- the fraction of training documents
            whose label equals 1.

    Raises:
        ValueError: If train_matrix is empty or the two inputs differ in
            length.
    """
    num_train = len(train_matrix)
    if num_train == 0:
        raise ValueError('train_matrix must contain at least one document')
    if len(train_category) != num_train:
        raise ValueError('train_matrix and train_category must have the same length')
    num_words = len(train_matrix[0])

    # Count the positives explicitly. Summing the raw labels is only correct
    # when every label is 0 or 1; a -1 "unknown" label would lower the prior.
    num_positive = sum(1 for label in train_category if label == 1)
    pAbusive = num_positive / float(num_train)

    # Laplace smoothing: start every word count at 1 and every class total at
    # the vocabulary size, so no word ever gets probability zero.
    p0_num = np.ones(num_words)
    p1_num = np.ones(num_words)
    p0_denom = float(num_words)
    p1_denom = float(num_words)

    for document, label in zip(train_matrix, train_category):
        counts = np.asarray(document)
        if label == 1:  # spam
            p1_num += counts
            p1_denom += counts.sum()
        else:
            p0_num += counts
            p0_denom += counts.sum()

    # Work in log space so that multiplying many small probabilities later
    # becomes a sum and cannot underflow. These are class-conditional word
    # likelihoods, not posteriors.
    p0_vector = np.log(p0_num / p0_denom)
    p1_vector = np.log(p1_num / p1_denom)
    return p0_vector, p1_vector, pAbusive

In [8]:
def train_test_split(class_list, word_vector_list, proportion=0.7, random_seed=None):
    """
    Randomly split samples and labels into a training and a test part.

    Samples are drawn one at a time, without replacement, until the training
    part holds at least len(word_vector_list) * proportion samples; whatever
    is left forms the test part. The inputs are NOT modified.

    Args:
        class_list: Labels, aligned with word_vector_list.
        word_vector_list: Samples.
        proportion: Fraction of the samples to place in the training part.
        random_seed: Seed for the draw; the same seed yields the same split.
            None seeds from system entropy.

    Returns:
        train_data, train_label, test_data, test_label: Four lists. The test
            lists keep the original relative order of the samples that were
            not drawn.

    Raises:
        ValueError: If class_list and word_vector_list differ in length.
    """
    if len(class_list) != len(word_vector_list):
        raise ValueError('class_list and word_vector_list must have the same length')

    # Work on shallow copies: popping from the caller's own lists would
    # silently shrink them and make a second call see different data.
    test_data = list(word_vector_list)
    test_label = list(class_list)
    train_data = []
    train_label = []
    train_size = len(word_vector_list) * proportion

    # A private generator gives the same sequence as seeding the module-level
    # one, without disturbing the global random state.
    rng = random.Random(random_seed)

    # The "test_data" guard stops the loop when proportion > 1 has exhausted
    # the pool, instead of asking for a random index in an empty range.
    while test_data and len(train_data) < train_size:
        picked = rng.randrange(len(test_data))
        train_data.append(test_data.pop(picked))
        train_label.append(test_label.pop(picked))
    return train_data, train_label, test_data, test_label

In [9]:
def classify_NB(word_vector, p0_vector, p1_vector, pAbusive):
    """
    Classify one document with a trained two-class naive Bayes model.

    Args:
        word_vector: Word vector of the document.
        p0_vector: Per-word log-likelihoods for class 0.
        p1_vector: Per-word log-likelihoods for class 1.
        pAbusive: Prior probability of class 1.

    Returns:
        label: 1 if the class-1 score is strictly larger, otherwise 0
            (ties go to class 0).
        prob: numpy array [score_0, score_1] of unnormalized log scores,
            i.e. log prior plus the weighted sum of word log-likelihoods.
    """
    # np.dot computes the weighted sum in one vectorized call instead of
    # looping over an element-wise product with Python's builtin sum.
    p1 = np.dot(word_vector, p1_vector) + np.log(pAbusive)
    p0 = np.dot(word_vector, p0_vector) + np.log(1. - pAbusive)
    prob = np.array([p0, p1])
    if p1 > p0:
        return 1, prob
    return 0, prob

In [10]:
def testing_NB(test_data, test_category, p0_vector, p1_vector, pAbusive):
    """
    Run classify_NB over every document of a test set.

    Args:
        test_data: Sequence of word vectors to classify.
        test_category: True labels; accepted but not used by this function.
        p0_vector: Per-word log-likelihoods for class 0.
        p1_vector: Per-word log-likelihoods for class 1.
        pAbusive: Prior probability of class 1.

    Returns:
        pred: List with the predicted label of each document.
        score: List with the score array returned by classify_NB for each
            document, in the same order.
    """
    pred, score = [], []
    for position in range(len(test_data)):
        outcome = classify_NB(test_data[position], p0_vector, p1_vector, pAbusive)
        pred.append(outcome[0])
        score.append(outcome[1])
    return pred, score

In [11]:
def calculate_metrics(pred, test_category):
    """
    Compute accuracy, precision, recall and F1 for binary predictions.

    Label 1 (spam) is the positive class; any other value counts as negative.

    Args:
        pred: Predicted labels.
        test_category: True labels; must provide at least len(pred) entries.

    Returns:
        accuracy, precision, recall, f1: Four floats. A ratio whose
            denominator is zero (no samples, no predicted positives, or no
            actual positives) is reported as 0.0 instead of raising
            ZeroDivisionError.
    """
    def _safe_ratio(numerator, denominator):
        # Undefined ratios are reported as 0.0.
        return numerator / denominator if denominator else 0.

    # spam--1, ham--0
    true_positive = 0.
    false_positive = 0.
    false_negative = 0.
    true_negative = 0.
    for i in range(len(pred)):
        predicted_positive = pred[i] == 1
        if pred[i] == test_category[i]:
            if predicted_positive:
                true_positive += 1.
            else:
                true_negative += 1.
        else:
            if predicted_positive:
                false_positive += 1.
            else:
                false_negative += 1.

    accuracy = _safe_ratio(true_positive + true_negative, len(pred))
    precision = _safe_ratio(true_positive, true_positive + false_positive)
    recall = _safe_ratio(true_positive, true_positive + false_negative)
    f1 = _safe_ratio(2 * precision * recall, precision + recall)
    return accuracy, precision, recall, f1

In [12]:
# Load the corpus from the default path: one integer label and one token list per message.
class_list, token_list = file_to_list()

In [13]:
# Vocabulary: the collection of all distinct tokens found in the tokenized messages.
vocab_list = create_vocab_list(token_list)

In [14]:
# Multinomial model: represent every message by its bag-of-words count vector.
word_vector_list = [token_to_word_vector(vocab_list, token) for token in token_list]
# Pass copies so the split can never consume class_list / word_vector_list,
# which keeps this cell safe to re-run without reloading the data.
train_data, train_label, test_data, test_label = train_test_split(
    list(class_list), list(word_vector_list), random_seed=1)

In [15]:
# # 伯努利模型
# word_vector_list = []
# for token in token_list:
#     word_vector_list.append(token_to_word_vector_euqal_weight(vocab_list, token))
# train_data, train_label, test_data, test_label = train_test_split(class_list, word_vector_list, random_seed=4)

In [16]:
# # 混合模型
# train_data_tmp, train_label, test_data_tmp, test_label = train_test_split(class_list, token_list, random_seed=4)
# train_data = []
# test_data = []
# for train_token in train_data_tmp:
#     train_data.append(token_to_word_vector(vocab_list, train_token))
# for test_token in test_data_tmp:
#     test_data.append(token_to_word_vector_euqal_weight(vocab_list, test_token))

In [17]:
# Train the model: per-class word log-likelihood vectors plus the prior of class 1.
p0_vector, p1_vector, pAbusive = train_NB(train_data, train_label)

In [18]:
# Predict: one label and one pair of class scores for every test message.
pred, scores = testing_NB(test_data, test_label, p0_vector, p1_vector, pAbusive)

In [19]:
# Score the predictions against the true test labels and print each metric
# with three significant digits.
accuracy, precision, recall, f1 = calculate_metrics(pred, test_label)
print('Accuracy:{:.3}, Precison:{:.3}, Recall:{:.3}, F1:{:.3}'.format(accuracy, precision, recall, f1))

Accuracy:0.98, Precison:0.926, Recall:0.938, F1:0.932
