# 基于Gzip和kNN的文本分类
## 简介
### 一 项目介绍
#### a 项目描述
该项目源于ACL2023的一篇Findings论文《"Low-Resource" Text Classification: A Parameter-Free Classification Method with Compressors》。\
论文地址：https://aclanthology.org/2023.findings-acl.426.pdf
官方代码：https://github.com/bazingagin/npc_gzip
该论文一经发表就引发了热议。文中表示，在没有任何训练参数的情况下，该方法的分类结果能够与多个基于DNN的方法相媲美，并在五个分布外数据集上胜过了包括BERT在内的所有方法。
作为一种DNN的轻量级替代方法，它使用无损压缩器（如gzip）和kNN相结合的方法实现文本分类的任务。该方法利用压缩长度来近似柯氏复杂性（Kolmogorov complexity），基于归一化压缩距离（Normalized Compression Distance, NCD）度量文本之间的相似度。
#### b 适学人群
大一年级及以上，算法初学者，并且有志于从事机器学习、自然语言处理相关岗位的学生。
#### c 课程基础
掌握Python语言，了解kNN算法。
### 二 任务介绍
阅读了解并学习掌握如何使用gzip和kNN算法来进行文本分类。
### 三 数据集介绍
AG_NEWS数据集中包含了4个类别的新闻文本数据集，一共有12万条训练样本和7600条测试样本。\
torchtext中的datasets模块提供了AG_NEWS的数据集获取方法。\
可以在https://huggingface.co/datasets/ag_news预览该数据集。
### 四 学习目标
通过本项目可以学习到torchtext的数据集加载、gzip的使用方法以及kNN算法的实现。
### 五 运行建议
无

## 1.导入工具包

In [1]:
from torchtext.datasets import AG_NEWS
import os
from collections import defaultdict
import numpy as np
import gzip
from tqdm import tqdm
import operator

## 2.设置默认值

In [2]:
# 中途输出的文件会存在runtime文件夹下
output_dir = './runtime'

# 下载的数据会存在dataset文件夹下
data_dir = './dataset'

# 训练集和测试集每个类别随机采样的数量
num_train = 1000
num_test = 10

# 类别个数
num_classes = 4

# 记录所采样本id的文件位置
train_idx_fn = os.path.join(output_dir, 'train_indicies_{}per_class'.format(num_train))
test_idx_fn = os.path.join(output_dir, 'test_indicies_{}per_class'.format(num_test))

In [3]:
# 从数据集ds中的每个类别随机加载n个数据到output_fn文件地址
def pick_samples(ds, n, output_fn):
    
    # 获取每个类别分别对应的样本和样本id
    label2text = defaultdict(list)
    label2idx = defaultdict(list)
    for i, (label, text) in enumerate(ds):
    label2text[label].append(text)
    label2idx[label].append(i)
    
    # 获取每个类别的样本数量
    class2count = {}
    for cl in label2text:
        class2count[cl] = len(label2text[cl])
    
    # 在每个类别中随机选取n个样本，并记录所选样本的id
    result = []
    labels = []
    recorded_idx = []
    for c in class2count:
        select_idx = np.random.choice(class2count[c], size=n, replace=False)
        select_text = np.array(label2text[c])[select_idx]
        select_text_idx = np.array(label2idx[c])[select_idx]
        recorded_idx+=list(select_text_idx)
        result+=list(select_text)
        labels+=[c]*n
    
    # 将所选取样本的id存储到文件中
    if output_fn is not None:
        np.save(output_fn, np.array(recorded_idx))
    
    return result, labels

In [None]:
# 加载全部数据（后续计算距离用于推断的时间过长）
# def load_all(ds):
#     result = []
#     labels = []
#     for i, (label, line) in enumerate(ds):
#         result.append(line)
#         labels.append(label)
#     return result, labels
# train_data, train_labels = load_all(dataset_pair[0])
# test_data, test_labels = load_all(dataset_pair[1])

## 3.加载数据集

In [5]:
dataset_pair = AG_NEWS(root=data_dir)
train_data, train_labels = pick_samples(dataset_pair[0], num_train, train_idx_fn)
test_data, test_labels = pick_samples(dataset_pair[1], num_test, test_idx_fn)

## 4. 计算文本之间距离
这里采用归一化压缩距离（NCD）作为距离指标。\
NCD的计算公式如下：
$$NCD(x,y) = \frac{C(xy)-{\rm min}\{C(x),C(y)\}}{{\rm max}\{C(x),C(y)\}}$$
其中$C(x)$为文本$x$的压缩后长度，$C(xy)$将为文本$x$和文本$y$连接后进行压缩的长度。

In [6]:
def NCD(x, y):
    c_x = len(gzip.compress(x.encode('utf-8')))
    c_y = len(gzip.compress(y.encode('utf-8')))s
    c_xy = len(gzip.compress((c_x+' '+c_y).encode('utf-8')))
    distance = (c_xy-min(c_x,c_y))/max(c_x, c_y)
    return distance

In [8]:
def cal_dist_single_multi(t1,t2_list):
    distances = []
    for j, t2 in enumerate(t2_list):
        distance = NCD(t1, t2)
        distances.append(distance)
    return distances

In [9]:
def cal_dist(test_data, train_data): 
    dis_matrix = []
    for _, t in tqdm(enumerate(test_data)):
        distances = cal_dist_single_multi(t, train_data)
        dis_matrix.append(distances)
    return dis_matrix

In [10]:
dis = cal_dist(test_data, train_data)
np.save(output_dir+'/dis.npy', np.array(dis))

40it [00:10,  3.79it/s]


In [None]:
def knn(dis, train_labels, k):
    preds = []
    for i in range(len(dis)):
        # 将第i个测试样本的距离列表排序，得到距离最近的k个训练样本
        sorted_idx = np.argsort(np.array(dis[i])) # 按距离排序
        pred_labels = defaultdict(int)
        # 统计前k个样本中的标签出现次数
        for j in range(k):
            pred_l = train_labels[sorted_idx[j]]
            pred_labels[pred_l] += 1
        sorted_pred_lab = sorted(pred_labels.items(), key=operator.itemgetter(1), reverse=True) # 按出现次数排序
        most_label = sorted_pred_lab[0][0]
        preds.append(most_label)
    return preds

In [11]:
def cal_acc(dis, test_labels, train_labels):
    compare_label = train_labels
    start = 0
    end = num_classes
    correct = []
    pred = []
    for i in range(len(dis)):
        sorted_idx = np.argsort(np.array(dis[i]))
        pred_labels = defaultdict(int)
        for j in range(start, end):
            pred_l = compare_label[sorted_idx[j]]
            pred_labels[pred_l] += 1
        sorted_pred_lab = sorted(pred_labels.items(), key=operator.itemgetter(1), reverse=True)
        most_count = sorted_pred_lab[0][1]
        if_right = 0
        most_label = sorted_pred_lab[0][0]
        most_voted_labels = []
        for pair in sorted_pred_lab:
            if pair[1] < most_count:
                break
            if pair[0] == test_labels[i]:
                if_right = 1
                most_label = pair[0]
            else:
                most_voted_labels.append(pair[0])
        pred.append(most_label)
        correct.append(if_right)
    print("Accuracy is {}".format(sum(correct)/len(correct)))
    return pred, correct

In [12]:
pred, correct = cal_acc(dis,test_labels, train_labels)

Accuracy is 0.95
