In [30]:
import numpy as np
import csv
from collections import OrderedDict
import copy
import pandas as pd
from time import time
from typing import Callable
from typing import Any
from typing import Dict, Tuple, List

In [31]:
t = time()

In [41]:
def readFile(filen: str) -> List[List]:
    '''
    读取文件内容
    由于首先需要获取文章数量和单词向量长度，才能计算TF矩阵
    因此要对文本内容进行两次遍历，为了避免两次读取磁盘文件，故先将文本内容保存到内存中的一个list
    '''
    fdata = []
    with open(filen) as fd:
        reader = csv.reader(fd, delimiter=' ')
        fdata = list(list(row) for row in reader)
    return fdata

In [42]:
def getTFIDF(fdata: List[List]) -> np.array:
    '''
    获取TF-IDF矩阵
    '''
    #首先获取文章数和单词向量
    #使用OrderedDict按单词出现的顺序生成单词列表
    #相比于使用list，好处在于每次判断word是否已经加入单词向量是log(n)复杂度
    word_dict = OrderedDict() 
    #文章数
    D = 0
    for row in fdata:
        D += 1
        for word in row:
            if not word in word_dict:
                word_dict[word] = 1
            else:
                word_dict[word] += 1
    #word_vec是单词向量
    word_vec = word_dict.keys()
    #word_order的键值是当前单词的序号，在生成TF矩阵时会用到
    word_order = dict(zip(word_vec,range(len(word_vec))))
    #生成TF矩阵
    TF = np.zeros((D,len(word_dict)))
    for i,row in enumerate(fdata):
        for word in row:
            TF[i][word_order[word]] += 1
        #每个文章中单词出现次数归一化
        TF[i] /= len(fdata[i])
    #生成IDF矩阵
    IDF = np.log2(D / (1 + np.array(list(word_dict.values()))))
    #生成TF-IDF矩阵
    TF_IDF = TF @ IDF
    return TF_IDF

In [43]:

#TF_IDF
print(time()-t)

41.62517595291138


In [44]:
def DisN(vec1: np.array, vec2: np.array, N: Any) -> float:
    '''
    计算N-norm
    '''
    if(N < 1):
        raise ValueError("norm should be a positive integer or np.inf")
    if np.isinf(N):
        return np.max(np.fabs(vec1 - vec2))
    else:
        return np.power(np.sum(np.power(vec1 - vec2, N)), 1.0/N)

Dis2 = lambda v1, v2: DisN(v1, v2, 2)

In [45]:
def DisInvNormAvg(distances: np.array, Y: np.array) -> np.array:
    '''
    将距离倒数归一化，返回均值
    '''
    for idx, dis in enumerate(distances):
        print(idx, dis)
        if dis == 0:
            return Y[idx]
    distances = np.array(1.0) / distances
    s = np.sum(distances)
    distances = distances / s
    print(distances)
    print(Y)
    tmp = np.diag(distances) @ Y
    print(tmp)
    if len(tmp.shape) is 1:
        return tmp
    else:
        return np.sum(tmp,  axis = (0))

In [51]:
def KNN(trainSet: Tuple[np.array, np.array],
        testVec: np.array,
        DisFunc: Callable[[np.array, np.array], float],
        K: int,
        WeightFunc: Callable[[np.array, np.array], float]) -> np.array: 
    '''
    一个通用的KNN接口
    trainSet: 二元元组，第一个元素是训练集的X，第二个是Y
    testVec: 待预测向量
    DisFunc: 距离函数
    K: K值
    WeightFunc: 依据第一个参数list<距离>,对第二个参数list<Y值>进行加权，返回预测值
    '''
    #对于多个要预测的值，逐一预测
    if len(testVec.shape) > 1:
        return [KNN(trainSet, vec, DisFunc, K, WeightFunc) for vec in testVec]
    else:
        #测量待预测向量到训练集中每个向量的距离
        #distances是一个list<tuple(index, distance)>
        distances = list(enumerate(map(lambda trainVec: DisFunc(trainVec, testVec), trainSet[0])))
        print(distances)
        #依据距离从小到大排序
        distances.sort(key=lambda t: t[1])
        #获取最临近的K个训练样本的下标和对应的距离，输出值
        tmp = list(zip(*distances[:K]))
        kNearIdx = list(tmp[0])
        kNearDis = list(tmp[1])
        kNearY   = trainSet[1][kNearIdx, :]
        #对输出值根据距离加权作为预测输出
        return [WeightFunc(kNearDis, kNearY)]

In [52]:
#trainX = np.array([[10,2],[2,3],[3,5]])
#trainY = np.array([[1,1,1], [2,2,3], [3,3,5]])
xfilen = 'lab1_data/X.txt'
yfilen = 'lab1_data/Y.txt'
xdata = readFile(xfilen)
ydata = readFile(yfilen)
x_tfidf = getTFIDF(xdata)

KNN((trainX, trainY), 
    np.array([3,3]),
    Dis2,
    1,
    DisInvNormAvg)

[(0, 7.0710678118654755), (1, 1.0), (2, 2.0)]
0 1.0
[1.]
[[2 2 3]]
[[2. 2. 3.]]


[array([2., 2., 3.])]