# Naive Bayes Classifier

## Data Loading

Download and extract the dataset, then visualize the data as a bar graph.

In [None]:
import requests
import zipfile

def download_dataset(url, filename,
                     extract_dir='d:/Desktop/AI-ML-methods/homework/assignment2/',
                     timeout=60):
    """Download a zip archive and extract all of its members.

    Args:
        url: HTTP(S) address of the zip archive.
        filename: Local path the downloaded archive is written to.
        extract_dir: Directory the archive members are extracted into.
            Defaults to the location originally hard-coded in this notebook.
        timeout: Seconds to wait for the server before giving up, passed
            straight to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the downloaded file is not a valid zip archive.
    """
    response = requests.get(url, timeout=timeout)
    # Fail here rather than writing an HTML error page to disk and only
    # discovering the problem when ZipFile tries to parse it.
    response.raise_for_status()
    with open(filename, 'wb') as archive_file:
        archive_file.write(response.content)
    print(f"Downloading the datasets {filename} complete.")

    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(filename, 'r') as archive:
        archive.extractall(extract_dir)
    print(f"Unzipping the datasets {filename} complete.")

# Fetch Tsinghua.zip from the course server and extract it; this runs as soon
# as the cell is executed and needs network access.
url = 'http://www.nustm.cn/member/rxia/ml/data/Tsinghua.zip'
download_dataset(url, 'Tsinghua.zip')

## MultinomialNB

Multinomial Distribution Naive Bayes


In [None]:
import numpy as np
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer

class MultinomialNBC(object):
    """Multinomial Naive Bayes text classifier with additive smoothing.

    Documents are turned into term-count vectors with ``CountVectorizer``;
    per-class term frequencies are smoothed with ``alpha`` and stored as
    log-probabilities.

    After ``fit`` the following attributes are set:
        feature_names: vocabulary learned from the training documents.
        classes: sorted unique labels seen in ``y``.
        class_counts: number of training documents per class.
        feature_counts: summed term counts per class, shape
            (n_classes, n_features).
        class_prior_: empirical class probabilities.
        feature_log_prob_: smoothed log P(term | class), shape
            (n_classes, n_features).
    """

    def __init__(self, alpha=1.0):
        """Store the smoothing strength; the model itself is built by ``fit``."""
        self.alpha = alpha
        self.class_prior_ = None
        self.feature_log_prob_ = None

    def fit(self, X, y):
        """Estimate class priors and smoothed term log-probabilities.

        Args:
            X: iterable of raw text documents (anything ``CountVectorizer``
                accepts).
            y: labels, one per document; a list or an array.

        Returns:
            self, so calls can be chained.
        """
        # With a plain Python list, `y == cls_` is a single False instead of
        # an element-wise mask, which would leave every class count at zero.
        y = np.asarray(y)

        # Count occurrences of each term in each document
        count_vect = CountVectorizer()
        X_counts = count_vect.fit_transform(X)

        self.feature_names = count_vect.get_feature_names_out()
        self.classes = np.unique(y)
        n_classes = len(self.classes)
        n_features = len(self.feature_names)
        self.class_counts = np.zeros(n_classes)
        self.feature_counts = np.zeros((n_classes, n_features))

        for i, cls_ in enumerate(self.classes):
            cls_indices = np.where(y == cls_)[0]
            self.class_counts[i] = len(cls_indices)
            # Summing a sparse matrix may yield a 2-D (1, n) result; flatten it.
            self.feature_counts[i] = np.asarray(
                X_counts[cls_indices].sum(axis=0)).ravel()

        # Class priors and additively smoothed term probabilities
        self.class_prior_ = self.class_counts / np.sum(self.class_counts)
        smoothed_counts = self.feature_counts + self.alpha
        smoothed_totals = (np.sum(self.feature_counts, axis=1, keepdims=True)
                           + self.alpha * n_features)
        self.feature_log_prob_ = np.log(smoothed_counts / smoothed_totals)
        return self

    def predict(self, X):
        """Return the most probable class for each document in ``X``.

        Args:
            X: iterable of raw text documents.

        Returns:
            Array of labels taken from ``self.classes``, one per document.
        """
        # Re-use the training vocabulary; terms unseen in training are ignored.
        count_vect = CountVectorizer(vocabulary=self.feature_names)
        X_counts = count_vect.transform(X)

        # `@` dispatches to the sparse matrix product. np.dot is not aware of
        # scipy sparse matrices and does not compute the product here.
        joint_log_likelihood = np.asarray(
            X_counts @ self.feature_log_prob_.T) + np.log(self.class_prior_)

        # Predict the class with the highest joint log-likelihood
        return self.classes[np.argmax(joint_log_likelihood, axis=1)]

    

## BernoulliNB

Multi-variate Bernoulli Distribution Naive Bayes

In [None]:
class BernoulliNBC(object):
    """Multivariate Bernoulli Naive Bayes text classifier.

    Each document is reduced to a binary presence/absence vector over the
    training vocabulary. Both the presence and the absence of every term
    contribute to the class score.

    After ``fit`` the following attributes are set:
        feature_names: vocabulary learned from the training documents.
        classes: sorted unique labels seen in ``y``.
        class_counts: number of training documents per class.
        feature_counts: number of documents per class containing each term,
            shape (n_classes, n_features).
        class_prior_: empirical class probabilities.
        feature_log_prob_: smoothed log P(term present | class), shape
            (n_classes, n_features).
    """

    def __init__(self, alpha=1.0):
        """Store the smoothing strength; the model itself is built by ``fit``."""
        self.alpha = alpha
        self.class_prior_ = None
        self.feature_log_prob_ = None

    def fit(self, X, y):
        """Estimate class priors and smoothed term-presence log-probabilities.

        Args:
            X: iterable of raw text documents (anything ``CountVectorizer``
                accepts).
            y: labels, one per document; a list or an array.

        Returns:
            self, so calls can be chained.
        """
        # With a plain Python list, `y == cls_` is a single False instead of
        # an element-wise mask, which would leave every class count at zero.
        y = np.asarray(y)

        # Transform input documents into binary vectors
        count_vect = CountVectorizer(binary=True)
        X_bin = count_vect.fit_transform(X)
        self.feature_names = count_vect.get_feature_names_out()
        self.classes = np.unique(y)
        n_classes = len(self.classes)
        self.class_counts = np.zeros(n_classes)
        self.feature_counts = np.zeros((n_classes, len(self.feature_names)))

        for i, cls_ in enumerate(self.classes):
            cls_indices = np.where(y == cls_)[0]
            self.class_counts[i] = len(cls_indices)
            # Summing a sparse matrix may yield a 2-D (1, n) result; flatten it.
            self.feature_counts[i] = np.asarray(
                X_bin[cls_indices].sum(axis=0)).ravel()

        # Class priors and smoothed presence probabilities; the "+ 2 * alpha"
        # covers the two outcomes (present / absent) of each Bernoulli feature.
        self.class_prior_ = self.class_counts / np.sum(self.class_counts)
        self.feature_log_prob_ = np.log(
            (self.feature_counts + self.alpha) /
            (self.class_counts.reshape(-1, 1) + self.alpha * 2))
        return self

    def predict(self, X):
        """Return the most probable class for each document in ``X``.

        Args:
            X: iterable of raw text documents.

        Returns:
            Array of labels taken from ``self.classes``, one per document.
        """
        # Re-use the training vocabulary; terms unseen in training are ignored.
        count_vect = CountVectorizer(binary=True, vocabulary=self.feature_names)
        X_bin = count_vect.transform(X)

        log_present = self.feature_log_prob_
        # log(1 - p), computed from log p.
        log_absent = np.log1p(-np.exp(log_present))

        # Bernoulli log-likelihood: sum_j [x_j log p_j + (1 - x_j) log(1 - p_j)]
        #   = x . (log p - log(1 - p)) + sum_j log(1 - p_j)
        # The second term accounts for absent words. `@` is used because
        # np.dot is not aware of scipy sparse matrices.
        joint_log_likelihood = (
            np.asarray(X_bin @ (log_present - log_absent).T)
            + log_absent.sum(axis=1)
            + np.log(self.class_prior_))

        # Predict the class with the highest joint log-likelihood
        return self.classes[np.argmax(joint_log_likelihood, axis=1)]
    

## Dataset Preprocessing


+ 加载数据集： 从原始数据集中加载文本数据。
+ 分词处理： 使用分词工具（如 jieba）对文本数据进行分词处理，并过滤停用词。
+ 划分训练集和测试集： 将分词处理后的数据集划分为训练集和测试集。
+ 保存训练集和测试集： 将划分后的训练集和测试集保存到指定的目录中。

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

# Stop-word removal
def segment_text(text, stopwords):
    """Drop stop words from every line of ``text``.

    Each line is split on whitespace and tokens found in ``stopwords`` are
    discarded; the remaining tokens are re-joined with single spaces. Only
    whole tokens are matched, so a stop word occurring inside a longer token
    leaves that token intact.

    Args:
        text: 1-D numpy array of strings, one line per element. It is
            modified in place.
        stopwords: collection of stop-word strings (a set gives O(1) lookups).

    Returns:
        The same ``text`` array, with stop words removed.
    """
    for j in range(text.shape[0]):
        kept = [token for token in text[j].split() if token not in stopwords]
        # The filtered line is never longer than the original, so it always
        # fits the fixed-width string dtype of the array.
        text[j] = ' '.join(kept)
    return text

# Load the stop-word list
def load_stopwords(stopwords_file):
    """Read a UTF-8 stop-word file and return its entries as a set.

    Every line is stripped of surrounding whitespace and becomes one entry.
    """
    entries = set()
    with open(stopwords_file, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            entries.add(raw_line.strip())
    return entries

# Load the dataset
# Module-level dictionary mapping a label name to an integer id. load_data
# adds an entry for each label it has not seen before, so the mapping is
# shared across successive load_data calls.
# NOTE: because this is a defaultdict(int), indexing it with a missing key
# silently inserts that key with value 0; use `in` or `.get` to probe it.
label_mapping = defaultdict(int) # label mapping dictionary

def load_data(data_dir, stopwords_file):
    """Load every file in ``data_dir`` as one class of tokenised sentences.

    The file name without its extension is the class label. Labels are
    converted to integer ids through the module-level ``label_mapping``
    dictionary, which is updated in place when a new label is seen. Stop
    words are removed from every line, the ``<text>`` / ``</text>`` marker
    lines are skipped, and each remaining line is split on whitespace.

    Args:
        data_dir: directory containing one UTF-8 text file per class.
        stopwords_file: path of the UTF-8 stop-word list.

    Returns:
        A tuple ``(X, y)``: ``X`` is a list of token lists (one per line)
        and ``y`` the list of matching integer label ids.
    """
    X = []
    y = []
    stopwords = load_stopwords(stopwords_file)

    # os.listdir returns entries in arbitrary order; sorting makes the
    # label -> id assignment reproducible across runs and machines.
    for category in sorted(os.listdir(data_dir)):
        # Use the file name as the label; register it on first sight.
        label = os.path.splitext(category)[0]
        if label not in label_mapping:
            label_mapping[label] = len(label_mapping)
        label_id = label_mapping[label]

        file_path = os.path.join(data_dir, category)
        with open(file_path, 'r', encoding='utf-8') as handle:
            # One array element per line; segment_text expects a numpy array.
            text = np.array(handle.read().splitlines())
        # Remove stop words
        text = segment_text(text, stopwords)

        for sentence in text:
            # Skip the document delimiter lines. A plain comparison also
            # works for empty files, unlike a numpy mask.
            if sentence in ('<text>', '</text>'):
                continue
            # Split the sentence on whitespace into its tokens.
            X.append(sentence.split())
            y.append(label_id)
    return X, y


In [None]:

# Locations of the training files, the test files and the stop-word list,
# relative to the notebook's working directory.
path_to_train_data = '../Tsinghua/train/'
path_to_test_data = '../Tsinghua/test/'
stopwords_file = '../Tsinghua/stop_words_zh.txt'

# Load the training set first: load_data fills the shared label_mapping, so
# labels seen here keep the same ids when the test set is loaded.
X_train, y_train = load_data(path_to_train_data, stopwords_file)
X_test, y_test = load_data(path_to_test_data, stopwords_file)

# Sanity check: container types and number of samples / labels per split.
print(type(X_train), len(X_train))
print(type(y_train), len(y_train))
print(len(X_test))
print(len(y_test))

In [None]:
def create_word_vector(X, label_mapping, method):
    """Build a sentence-by-column matrix from tokenised sentences.

    Args:
        X: sequence of sentences, each a sequence of word tokens.
        label_mapping: mapping from a word to its column index. Words that
            are not keys of the mapping are ignored.
        method: ``'multinomial'`` stores how often each mapped word occurs
            in the sentence; ``'bernoulli'`` (or the legacy spelling
            ``'bernulli'``) stores 1 if it occurs at all.

    Returns:
        A float numpy array of shape ``(len(X), len(label_mapping))``.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """
    # Raise explicitly: an assert would be stripped under `python -O`.
    if method not in ('bernulli', 'bernoulli', 'multinomial'):
        raise ValueError(
            'Method must be either "bernoulli" (or "bernulli") or "multinomial"')
    binary = method != 'multinomial'

    # Shape of word_matrix: (number of sentences, number of mapping entries)
    word_matrix = np.zeros((len(X), len(label_mapping)))
    for i, sentence in enumerate(X):
        for word in sentence:
            # .get never inserts a key, even when the mapping is a defaultdict.
            column = label_mapping.get(word)
            if column is None:
                continue
            if binary:
                word_matrix[i, column] = 1
            else:
                word_matrix[i, column] += 1
    return word_matrix

# Build the matrices for both splits and both encodings. Note that the
# columns are indexed by label_mapping, i.e. by the class-label names.
# Training split: binary presence, then occurrence counts.
word_matrix_tb = create_word_vector(X_train, label_mapping, 'bernulli')
word_matrix_tm = create_word_vector(X_train, label_mapping, 'multinomial')
# Test split: binary presence, then occurrence counts.
word_matrix_pb = create_word_vector(X_test, label_mapping, 'bernulli')
word_matrix_pm = create_word_vector(X_test, label_mapping, 'multinomial')