# In Depth: k-Nearest Neighbors

k最近鄰居算法（簡稱KNN）是一種非常簡單的技術。

首先儲存訓練數據集，而當需要預測時，然後找到與訓練數據集中的新記錄最相似的k個記錄(k-most similar records)。<br>從這些鄰居(neighbors)中，進行了匯總預測(summarized prediction)。<br><br>

記錄之間的相似性可以用許多不同的方法來衡量。通常，使用表格數據時，建議可以使用歐幾里得距離(Euclidean distance)。<br>

(KNN可以用於分類或回歸問題。)

Reference: https://machinelearningmastery.com/tutorial-to-implement-k-nearest-neighbors-in-python-from-scratch/

## Introducing Iris Flower Species Dataset

鳶尾花數據集涉及根據花的觀測量來預測花的種類。<br>

這是一個多類分類問題。每個類別的觀察次數是平衡的。<br>有150個觀測值，其中包含4個輸入變量和1個輸出變量。變量名稱如下：

萼片長度、萼片寬度、花瓣長度、花瓣寬度、類(class)

註: 此問題的基準性能(baseline performance)約為33％。

In [1]:
import pandas as pd

df = pd.read_csv('iris.csv')
df.head()

Unnamed: 0,5.1,3.5,1.4,0.2,Iris-setosa
0,4.9,3.0,1.4,0.2,Iris-setosa
1,4.7,3.2,1.3,0.2,Iris-setosa
2,4.6,3.1,1.5,0.2,Iris-setosa
3,5.0,3.6,1.4,0.2,Iris-setosa
4,5.4,3.9,1.7,0.4,Iris-setosa


# k-Nearest Neighbors

k-Nearest Neighbors分為3部分：<br><br>

步驟1：計算歐幾里得距離。<br>
步驟2：取得最近的鄰居。<br>
步驟3：做出預測。<br><br>

這些步驟將實現k-最近鄰居算法並將其應用於分類和回歸預測建模問題。<br><br>

### 步驟1：計算歐幾里得距離<br>
第一步是計算數據集中兩行之間的距離。<br><br>

數據行(Rows of data)主要由數字組成，計算數字向量之間的距離的方法是繪製一條直線。<br><br>

我們可以使用歐幾里得距離度量來計算兩個向量之間的直線距離。<br>
它被計算為兩個向量之間平方差之和的平方根。<br><br>

Euclidean Distance = sqrt(sum i to N (x1_i – x2_i)^2)<br>
其中x1是數據的第一行，x2是數據的第二行，i是當我們跨所有列 求和時對特定列的索引。<br><br>

對於歐幾里得距離，值越小，兩條記錄越相似。值為0表示兩個記錄之間沒有差異。<br>

In [2]:
# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    distance = 0.0
    for i in range(len(row1)-1):
        distance += (row1[i] - row2[i])**2
    return sqrt(distance)

可以觀察該函數假定每行的最後一列是一個輸出值，該值將從距離計算中忽略。 <br>

我們可以使用一個人為分類數據集來測試該距離函數。 <br>
下面列出了完整的範例。

![img](https://3qeqpr26caki16dnhd19sv6by6v-wpengine.netdna-ssl.com/wp-content/uploads/2014/09/Scatter-Plot-of-the-Small-Contrived-Dataset-for-Testing-the-KNN-Algorithm2.png)

In [3]:
# Example of calculating Euclidean distance
from math import sqrt
 
# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
	distance = 0.0
	for i in range(len(row1)-1):
		distance += (row1[i] - row2[i])**2
	return sqrt(distance)
 
# Test distance function
dataset = [[2.7810836,2.550537003,0],
	[1.465489372,2.362125076,0],
	[3.396561688,4.400293529,0],
	[1.38807019,1.850220317,0],
	[3.06407232,3.005305973,0],
	[7.627531214,2.759262235,1],
	[5.332441248,2.088626775,1],
	[6.922596716,1.77106367,1],
	[8.675418651,-0.242068655,1],
	[7.673756466,3.508563011,1]]
row0 = dataset[0]
for row in dataset:
	distance = euclidean_distance(row0, row)
	print(distance)

0.0
1.3290173915275787
1.9494646655653247
1.5591439385540549
0.5356280721938492
4.850940186986411
2.592833759950511
4.214227042632867
6.522409988228337
4.985585382449795


### 步驟2：取得最近的鄰居
新數據的鄰居是k個最接近的實例(k closest instances)，這由我們的距離度量定義。<br><br>

為了在數據集中找到新數據的鄰居，我們必須首先計算數據集中每個記錄到新數據之間的距離。<br>
我們可以使用上面準備的距離函數來做到這一點。<br><br>

一旦計算出距離，我們就必鬚根據它們與新數據的距離對訓練數據集中的所有記錄進行排序。<br>
然後，我們可以選擇頂部k作為最相似的鄰居返回。<br><br>

我們可以通過以元組的形式跟踪數據集中每個記錄的距離，通過距離（按降序）對元組列表進行排序，<br>
然後檢索鄰居來做到這一點。<br><br>

下面是一個名為get_neighbors（）的函數，用於實現此功能。

In [4]:
# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = list()
    for train_row in train:
        dist = euclidean_distance(test_row, train_row)
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = list()
    for i in range(num_neighbors):
        neighbors.append(distances[i][0])
    return neighbors

您可以看到在上一步中開發的euclidean_distance（）函數用於計算每個train_row和新的test_row之間的距離。<br><br>

在使用自定義鍵的地方對train_row和distance元組的列表進行排序，<br>
以確保在排序操作中使用了元組中的第二項（tup[1]）。<br><br>

最後，列表num_neighbors最相似的鄰居test_row返回。<br><br>

我們可以使用上一部分中準備的人為數據集來測試此功能。<br>
下面列出了完整的範例。

In [5]:
from math import sqrt
 
# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    distance = 0.0
    for i in range(len(row1)-1):
        distance += (row1[i] - row2[i])**2
    return sqrt(distance)
 
# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = list()
    for train_row in train:
        dist = euclidean_distance(test_row, train_row)
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = list()
    for i in range(num_neighbors):
        neighbors.append(distances[i][0])
    return neighbors
 
# Test distance function
dataset = [[2.7810836,2.550537003,0],
    [1.465489372,2.362125076,0],
    [3.396561688,4.400293529,0],
    [1.38807019,1.850220317,0],
    [3.06407232,3.005305973,0],
    [7.627531214,2.759262235,1],
    [5.332441248,2.088626775,1],
    [6.922596716,1.77106367,1],
    [8.675418651,-0.242068655,1],
    [7.673756466,3.508563011,1]]
neighbors = get_neighbors(dataset, dataset[0], 3)
for neighbor in neighbors:
    print(neighbor)

[2.7810836, 2.550537003, 0]
[3.06407232, 3.005305973, 0]
[1.465489372, 2.362125076, 0]


此範例將按相似性順序將數據集中的3個最相似的記錄打印到第一條記錄。<br>
第一條記錄與其最相似，並且位於列表的頂部。<br>
現在我們知道瞭如何從數據集中獲取鄰居，我們可以使用它們進行預測。

### 步驟3：做出預測
從訓練數據集中收集的最相似的鄰居可以用來進行預測。<br><br>

在分類的情況下，我們可以返回最代表的類(most represented class)。<br><br>

我們可以通過對鄰居的輸出值列表執行max（）函數來實現此目的。<br>
給定在鄰居中觀察到的類值列表，max（）函數採用一組唯一的類值，<br>
並為該集中的每個類值調用類值列表中的計數。<br>


In [6]:
# Example of making predictions
from math import sqrt
 
# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    distance = 0.0
    for i in range(len(row1)-1):
        distance += (row1[i] - row2[i])**2
    return sqrt(distance)
 
# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = list()
    for train_row in train:
        dist = euclidean_distance(test_row, train_row)
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = list()
    for i in range(num_neighbors):
        neighbors.append(distances[i][0])
    return neighbors
 
# Make a classification prediction with neighbors
def predict_classification(train, test_row, num_neighbors):
    neighbors = get_neighbors(train, test_row, num_neighbors)
    output_values = [row[-1] for row in neighbors]
    prediction = max(set(output_values), key=output_values.count)
    return prediction
 
# Test distance function
dataset = [[2.7810836,2.550537003,0],
    [1.465489372,2.362125076,0],
    [3.396561688,4.400293529,0],
    [1.38807019,1.850220317,0],
    [3.06407232,3.005305973,0],
    [7.627531214,2.759262235,1],
    [5.332441248,2.088626775,1],
    [6.922596716,1.77106367,1],
    [8.675418651,-0.242068655,1],
    [7.673756466,3.508563011,1]]
prediction = predict_classification(dataset, dataset[0], 3)
print('Expected %d, Got %d.' % (dataset[0][-1], prediction))

Expected 0, Got 0.


我們可以想像如何可以更改predict_classification()函數以計算結果值的平均值。

現在，我們擁有使用KNN進行預測的所有方法。讓我們將其應用於真實數據集。

## Iris Flower Species Case Study

我們將使用5-fold 交叉驗證(k-fold cross-validation)來評估算法。<br>
即是指每個fold 中將有150/5 = 30個記錄。<br>
我們將使用輔助函數validate_algorithm（）來評估具有交叉驗證的算法，<br>
並使用precision_metric（）來計算預測的準確性。<br><br>

開發了一個名為k_nearest_neighbors（）的新函數來管理KNN算法的應用，<br>
首先從訓練數據集中學習統計信息，然後使用它們為測試數據集做出預測。

In [7]:
# k-nearest neighbors on the Iris Flowers Dataset
from random import seed
from random import randrange
from csv import reader
from math import sqrt

# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

# Find the min and max values for each column
def dataset_minmax(dataset):
    minmax = list()
    for i in range(len(dataset[0])):
        col_values = [row[i] for row in dataset]
        value_min = min(col_values)
        value_max = max(col_values)
        minmax.append([value_min, value_max])
    return minmax

# Rescale dataset columns to the range 0-1
def normalize_dataset(dataset, minmax):
    for row in dataset:
        for i in range(len(row)):
            row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])

# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
    dataset_split = list()
    dataset_copy = list(dataset)
    fold_size = int(len(dataset) / n_folds)
    for _ in range(n_folds):
        fold = list()
        while len(fold) < fold_size:
            index = randrange(len(dataset_copy))
            fold.append(dataset_copy.pop(index))
        dataset_split.append(fold)
    return dataset_split

# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
    correct = 0
    for i in range(len(actual)):
        if actual[i] == predicted[i]:
            correct += 1
    return correct / float(len(actual)) * 100.0

# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for fold in folds:
        train_set = list(folds)
        train_set.remove(fold)
        train_set = sum(train_set, [])
        test_set = list()
        for row in fold:
            row_copy = list(row)
            test_set.append(row_copy)
            row_copy[-1] = None
        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]
        accuracy = accuracy_metric(actual, predicted)
        scores.append(accuracy)
    return scores

# Calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    distance = 0.0
    for i in range(len(row1)-1):
        distance += (row1[i] - row2[i])**2
    return sqrt(distance)

# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = list()
    for train_row in train:
        dist = euclidean_distance(test_row, train_row)
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = list()
    for i in range(num_neighbors):
        neighbors.append(distances[i][0])
    return neighbors

# Make a prediction with neighbors
def predict_classification(train, test_row, num_neighbors):
    neighbors = get_neighbors(train, test_row, num_neighbors)
    output_values = [row[-1] for row in neighbors]
    prediction = max(set(output_values), key=output_values.count)
    return prediction

# kNN Algorithm
def k_nearest_neighbors(train, test, num_neighbors):
    predictions = list()
    for row in test:
        output = predict_classification(train, row, num_neighbors)
        predictions.append(output)
    return(predictions)

# Test the kNN on the Iris Flowers dataset
seed(1)
filename = 'iris.csv'
dataset = load_csv(filename)
for i in range(len(dataset[0])-1):
    str_column_to_float(dataset, i)
# convert class column to integers
str_column_to_int(dataset, len(dataset[0])-1)
# evaluate algorithm
n_folds = 5
num_neighbors = 5
scores = evaluate_algorithm(dataset, k_nearest_neighbors, n_folds, num_neighbors)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))

Scores: [96.66666666666667, 96.66666666666667, 100.0, 90.0, 100.0]
Mean Accuracy: 96.667%


我們可以看到，平均準確度約為96.6％，遠高於基線準確度33％。<br>
可以使用訓練數據集為新的觀測值（數據行）做出預測。<br>

這涉及到對predict_classification（）函數的調用，其中一行代表了我們用來預測類標籤的新觀察結果。<br>

我們也可能想知道用於預測的類標籤（字符串）。<br><br>
我們可以更新str_column_to_int（）函數以印出字串(string)類名稱映射到的整數，以便解釋模型所做的預測。

In [8]:
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
        print('[%s] => %d' % (value, i))
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

下面列出了將KNN與整個數據集結合使用並對單個觀測值進行單個預測的完整範例。

In [9]:
# Make Predictions with k-nearest neighbors on the Iris Flowers Dataset
from csv import reader
from math import sqrt

# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
        print('[%s] => %d' % (value, i))
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

# Find the min and max values for each column
def dataset_minmax(dataset):
    minmax = list()
    for i in range(len(dataset[0])):
        col_values = [row[i] for row in dataset]
        value_min = min(col_values)
        value_max = max(col_values)
        minmax.append([value_min, value_max])
    return minmax

# Rescale dataset columns to the range 0-1
def normalize_dataset(dataset, minmax):
    for row in dataset:
        for i in range(len(row)):
            row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])

# Calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    distance = 0.0
    for i in range(len(row1)-1):
        distance += (row1[i] - row2[i])**2
    return sqrt(distance)

# Locate the most similar neighbors
def get_neighbors(train, test_row, num_neighbors):
    distances = list()
    for train_row in train:
        dist = euclidean_distance(test_row, train_row)
        distances.append((train_row, dist))
    distances.sort(key=lambda tup: tup[1])
    neighbors = list()
    for i in range(num_neighbors):
        neighbors.append(distances[i][0])
    return neighbors

# Make a prediction with neighbors
def predict_classification(train, test_row, num_neighbors):
    neighbors = get_neighbors(train, test_row, num_neighbors)
    output_values = [row[-1] for row in neighbors]
    prediction = max(set(output_values), key=output_values.count)
    return prediction

# Make a prediction with KNN on Iris Dataset
filename = 'iris.csv'
dataset = load_csv(filename)
for i in range(len(dataset[0])-1):
    str_column_to_float(dataset, i)
# convert class column to integers
str_column_to_int(dataset, len(dataset[0])-1)
# define model parameter
num_neighbors = 5
# define a new record
row = [5.7,2.9,4.2,1.3]
# predict the label
label = predict_classification(dataset, row, num_neighbors)
print('Data=%s, Predicted: %s' % (row, label))

[Iris-setosa] => 0
[Iris-versicolor] => 1
[Iris-virginica] => 2
Data=[5.7, 2.9, 4.2, 1.3], Predicted: 1


運行數據首先總結了 類標籤到整數的映射，然後將模型擬合到整個數據集。<br>
然後定義一個新的觀察值（在這種情況下，我從數據集中取了一行），併計算了預測的標籤。<br>

在這種情況下，我們的觀測值預測為屬於1類，我們知道這是"iris-setosa"。