In [1]:
import operator
import random
import numpy as np
import pandas as pd

In [2]:
def load_dataset():
    """
    Read the Iris data file into a DataFrame.

    The file 'Iris.txt' (resolved against the current working directory) is
    parsed as header-less CSV and its five columns are named explicitly.

    Returns:
        df(DataFrame): the dataset with columns SepalLength, SepalWidth,
            PetalLength, PetalWidth and Species
    """
    feature_names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth']
    label_name = 'Species'
    return pd.read_csv('Iris.txt', header=None, names=feature_names + [label_name])

In [3]:
def minmaxscaler(df):
    """
    Min-max (linear) normalisation of the four Iris feature columns.

    Each feature column is rescaled with (x - min) / (max - min). A constant
    column (max == min) has no spread; it is scaled to 0.0 instead of
    dividing zero by zero, which would fill the whole column with NaN.

    Attributes:
        df(DataFrame): raw dataset; must contain the columns SepalLength,
            SepalWidth, PetalLength, PetalWidth and Species
    Returns:
        new_df(DataFrame): new frame sharing df's index, holding the scaled
            feature columns followed by the unchanged Species column;
            df itself is not modified
    """
    new_df = pd.DataFrame(index=df.index)
    columns = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth']
    for col in columns:
        data = df[col]
        low = data.min()
        span = data.max() - low
        if span == 0:
            # Constant column: dividing by 1 maps every value to 0.0 and
            # keeps any missing entries as NaN.
            span = 1
        # .tolist() makes the assignment positional rather than index-aligned.
        new_df[col] = ((data - low) / span).tolist()
    new_df['Species'] = df['Species']
    return new_df

In [4]:
def train_test_split(dataset, proportion=0.7, random_seed=None):
    """
    Randomly split a dataset into a training part and a test part.

    Rows are drawn one at a time without replacement until the training part
    holds at least len(dataset) * proportion rows; the rows never drawn form
    the test part, in their original order. Rows are selected by position,
    so the column dtypes of `dataset` are kept.

    Attributes:
        dataset(DataFrame): dataset to split
        proportion(Float): fraction of rows for the training set, in [0, 1]
        random_seed: seed for the row selection; None seeds from system state
    Returns:
        train_data(DataFrame): training rows in draw order, index reset to 0..n-1
        test_data(DataFrame): remaining rows, index reset to 0..n-1
    Raises:
        ValueError: if proportion is outside [0, 1]
    """
    if not 0 <= proportion <= 1:
        raise ValueError("proportion must be between 0 and 1, got {!r}".format(proportion))
    # A private generator produces the same draws as random.seed(random_seed)
    # followed by random.randrange(...), without reseeding the global one.
    rng = random.Random(random_seed)
    remaining = list(range(len(dataset)))
    train_size = len(dataset) * proportion
    picked = []
    while len(picked) < train_size:
        picked.append(remaining.pop(rng.randrange(len(remaining))))
    train_data = dataset.iloc[picked].reset_index(drop=True)
    test_data = dataset.iloc[remaining].reset_index(drop=True)
    return train_data, test_data

In [5]:
def kfold(dataset, k=10, random_seed=None):
    """
    Produce one shuffled train/test split in the style of k-fold validation.

    k-1 folds of int(len(dataset) / k) rows each are drawn at random without
    replacement and concatenated into the training set; every row that was
    not drawn (the last fold plus any remainder) becomes the test set.
    A single call yields a single split; use different seeds for different
    splits.

    Rows are selected by position, so column dtypes are kept as they are in
    `dataset` and no particular column names are required.

    Attributes:
        dataset(DataFrame): dataset to split
        k(int): number of folds, at least 2
        random_seed: seed for the row selection; None seeds from system state
    Returns:
        train(DataFrame): rows of the k-1 training folds in draw order,
            index reset to 0..n-1
        test(DataFrame): remaining rows in their original order,
            index reset to 0..n-1
    Raises:
        ValueError: if k < 2 or the dataset has fewer rows than k
    """
    if k < 2:
        raise ValueError("k must be at least 2, got {!r}".format(k))
    fold_size = int(len(dataset) / k)
    if fold_size == 0:
        raise ValueError("dataset has fewer rows ({}) than folds ({})".format(len(dataset), k))
    # A private generator produces the same draws as random.seed(random_seed)
    # followed by random.randrange(...), without reseeding the global one.
    rng = random.Random(random_seed)
    remaining = list(range(len(dataset)))
    train_positions = []
    for _ in range((k - 1) * fold_size):
        train_positions.append(remaining.pop(rng.randrange(len(remaining))))
    train = dataset.iloc[train_positions].reset_index(drop=True)
    test = dataset.iloc[remaining].reset_index(drop=True)
    return train, test

In [6]:
def KNeighborsClassifier(X_test, X_train, y_train, k=5, distance_type='Euclidean'):
    """
    Classify one sample by majority vote among its k nearest training samples.

    All lookups are positional, so X_train / y_train may be DataFrames or
    Series with any index, as well as plain arrays or lists.

    Attributes:
        X_test: feature vector of the sample to classify
        X_train: training samples, one row per sample
        y_train: training labels, positionally aligned with X_train
        k: hyperparameter, number of nearest neighbours that vote
        distance_type: hyperparameter, distance metric -- 'Euclidean',
            'Manhattan' or 'Chebyshev'
    Returns:
        (label, y_score): the label with the most votes (on a tie, the tied
            label whose first vote came from the closest neighbour) and a
            dict mapping each of the three Iris species names to its share
            of the k votes. Labels other than those three can win the vote
            but do not appear in y_score.
        None if distance_type is not one of the supported metrics.
    Raises:
        ValueError: if k is smaller than 1 or larger than the number of
            training samples
    """
    train_points = np.asarray(X_train, dtype=float)
    query = np.asarray(X_test, dtype=float)
    # Broadcasting subtracts the query from every training row.
    diff_mat = query - train_points
    if distance_type == 'Euclidean':
        distances = (diff_mat ** 2).sum(axis=1) ** 0.5
    elif distance_type == 'Manhattan':
        distances = np.abs(diff_mat).sum(axis=1)
    elif distance_type == 'Chebyshev':
        distances = np.abs(diff_mat).max(axis=1)
    else:
        return None
    if not 1 <= k <= train_points.shape[0]:
        raise ValueError(
            "k must be between 1 and the number of training samples ({}), got {!r}".format(
                train_points.shape[0], k))
    # argsort yields row positions; read labels by position so a non-default
    # pandas index on y_train cannot cause a KeyError or a wrong row.
    label_at = y_train.iloc if hasattr(y_train, 'iloc') else y_train
    class_count = {}
    for position in distances.argsort()[:k]:
        vote_label = label_at[position]
        class_count[vote_label] = class_count.get(vote_label, 0) + 1
    # sorted() is stable, so ties keep the order in which labels were first seen.
    sorted_class_count = sorted(class_count.items(), key=operator.itemgetter(1), reverse=True)
    y_score = {}
    for species in ['Iris-setosa', 'Iris-versicolor', 'Iris-virginica']:
        y_score[species] = class_count.get(species, 0) / float(k)
    return sorted_class_count[0][0], y_score

In [7]:
def train_model(train_data, test_data, KNN_k=5, distance_type='Euclidean'):
    """
    Predict a label for every test row with kNN over the training rows.

    The first four columns of each frame are used as features and the fifth
    column as the label.

    Attributes:
        train_data(DataFrame): training set
        test_data(DataFrame): test set
        KNN_k(int): hyperparameter, number of nearest neighbours
        distance_type: hyperparameter, distance metric passed through to
            KNeighborsClassifier
    Returns:
        y_hat(list): predicted label for each test row
        y_test(list): true label of each test row
        y_scores(list): per-row score dict returned by KNeighborsClassifier
    """
    train_features = train_data.iloc[:, 0:4]
    train_labels = train_data.iloc[:, 4]
    test_features = test_data.iloc[:, 0:4]
    test_labels = test_data.iloc[:, 4]

    # One (label, score) pair per test row, in row order.
    outcomes = [
        KNeighborsClassifier(row, train_features, train_labels, k=KNN_k, distance_type=distance_type)
        for row in np.array(test_features).tolist()
    ]
    y_hat = [label for label, _ in outcomes]
    y_scores = [score for _, score in outcomes]
    return y_hat, np.array(test_labels).tolist(), y_scores

In [8]:
def kfold_train(train_data, KNN_k, k=5, distance_type='Euclidean'):
    """
    Estimate kNN accuracy for one hyperparameter setting by repeated splits.

    Runs k rounds; round i calls kfold on the training data with
    random_seed=i*k, classifies that split's test part with train_model and
    scores it with get_accuracy. The k scores are averaged.

    Attributes:
        train_data(DataFrame): training set
        KNN_k(int): hyperparameter, number of nearest neighbours
        k(int): number of folds, also the number of rounds
        distance_type: hyperparameter, distance metric passed through to
            train_model
    Returns:
        accuracy: mean accuracy over the k rounds
    """
    total = 0.
    for round_idx in range(k):
        fit_part, held_out = kfold(dataset=train_data, k=k, random_seed=round_idx * k)
        predicted, actual, _ = train_model(
            fit_part, held_out, KNN_k=KNN_k, distance_type=distance_type)
        total = total + get_accuracy(predicted, actual)
    return total / float(k)


In [9]:
def get_accuracy(y_hat, y_test):
    """
    Fraction of predictions that equal the true label at the same position.

    Attributes:
        y_hat: predicted labels
        y_test: true labels, at least as long as y_hat
    Returns:
        number of matching positions divided by len(y_hat), as a float
    """
    total = len(y_hat)
    hits = sum(1 for idx in range(total) if y_hat[idx] == y_test[idx])
    return hits / float(total)

In [10]:
# Load the raw Iris dataset
dataset = load_dataset()

In [11]:
# Count missing values in each column
dataset.isnull().sum()

SepalLength    0
SepalWidth     0
PetalLength    0
PetalWidth     0
Species        0
dtype: int64

In [12]:
# Min-max normalise the feature columns of the dataset
dataset = minmaxscaler(dataset)

In [13]:
# Split into training and test sets (80% training, fixed seed for reproducibility)
train_data, test_data = train_test_split(dataset=dataset, proportion=0.8, random_seed=9)

In [14]:
# Model tuning: search for the best neighbour count k and distance metric
best_k = 0
best_distance_type=""
best_accuracy = 0;
# Try every neighbour count from 3 to 17 with each distance metric
for i in range(3, 18):
    for distance_type in ['Euclidean', 'Manhattan', 'Chebyshev']:
        accuracy = kfold_train(train_data, KNN_k=i, k=10, distance_type=distance_type)
        # Strict '>' keeps the first combination that reaches the top score
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_k = i
            best_distance_type = distance_type
best_k, best_accuracy, best_distance_type

(7, 0.9833333333333332, 'Euclidean')

In [15]:
# Predict on the held-out test set with the tuned hyperparameters.
# Both the neighbour count and the distance metric come from the search above,
# so the evaluation matches the configuration that was actually selected.
y_hat, y_test, y_scores = train_model(train_data, test_data, KNN_k=best_k, distance_type=best_distance_type)
accuracy = get_accuracy(y_hat, y_test)

In [16]:
# Report the test-set accuracy as a percentage with two decimals
print("Accuracy: {:.2%}".format(accuracy))

Accuracy: 96.67%
