# TensorFlow2实现kNN分类和回归

In [1]:
import sys
from collections import Iterable, Counter


import tensorflow as tf
from tensorflow.keras.datasets import mnist, boston_housing
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
import numpy as np


print("python version:", sys.version_info)
print(tf.__name__, tf.__version__)

  


python version: sys.version_info(major=3, minor=7, micro=2, releaselevel='final', serial=0)
tensorflow 2.3.0


## 1. TensorFlow实现kNN基类

In [2]:
class TFKNNBase:
    """KNN basic class with TensorFlow.
    
    Attributes:
        n_neighbors: A int number, number of neighbors.
        _metric: A method object, choose from {_manhattan_distance, _euclidean_distance, _chebyshev_distance}.
        _X_train: feature data for training. A tf.Tensor matrix of (sample_lenght, feature_lenght) shape, 
            data type must be continuous value type. 
        _y_train: label data for training. A tf.Tensor array of (sample_lenght, ) shape, 
            data type must be discrete value.
    """
    def __new__(cls):
        raise Exception("Can't instantiate an object from TFKNNBase! ") 
        
    def __init__(self, n_neighbors=5, metric="euclidean"):
        """Init method.
    
        Args:
            n_neighbors: int, optional (default = 5), the integer must greater then 0.
                Number of neighbors to use by default for :meth:`kneighbors` queries.
            metric: {"manhattan", "euclidean", "chebyshev"}, optional, default 'euclidean'.
        
        Raises:
            ValueError: metric value is out of options.
            AssertionError: n_neighbors value is not a integer or n_neighbors > 0. 
        """
        assert isinstance(n_neighbors, int) and n_neighbors > 0
        self.n_neighbors = n_neighbors
        if metric == "manhattan":
            self._metric = self._manhattan_distance
        elif metric == "euclidean":
            self._metric = self._euclidean_distance
        elif metric == "chebyshev":
            self._metric = self._chebyshev_distance
        else:
            raise ValueError(f'No such metric as {metric}, please option from: {"manhattan", "euclidean", "chebyshev"}')
        self._X_train, self._y_train = [None] * 2
    
    def fit(self, X_train, y_train):
        """method for training model. 
        
        Args:
            X_train: A matrix of (sample_lenght, feature_lenght) shape, data type must be continuous value type. 
            y_train: A array of (sample_lenght, ) shape.
        
        Raises:
            AssertionError: X_train value or y_train value with a mismatched shape.
        """
        assert isinstance(X_train, Iterable) and isinstance(y_train, Iterable)
        assert len(X_train) == len(y_train)
        self._X_train = tf.convert_to_tensor(X_train, dtype=tf.dtypes.float32)
        self._y_train = y_train if isinstance(y_train, tf.Tensor) else tf.convert_to_tensor(y_train)
    
    def predict(self, X_test):
        """predict test data.
        
        Args:
            X_test: A np.ndarray matrix of (sample_lenght, feature_lenght) shape, 
                or a np.ndarray array of (feature_lenght, ) shape.
            
        Returns:
            A list for samples predictions or a single prediction.
        
        Raises:
            ValueError: X_test value with a mismatched shape.
        """
        assert isinstance(X_test, Iterable)
        X_test = tf.convert_to_tensor(X_test, dtype=tf.dtypes.float32)
        
        if X_test.shape == (self._X_train.shape[1], ):
            y_pred = self._predict_sample(X_test)
        elif X_test.shape[1] == self._X_train.shape[1]: 
            y_pred = []
            for sample in X_test:
                y_pred.append(self._predict_sample(sample))
        else:
            raise ValueError("Mismatched shape for X_test")
        return y_pred
    
    def _manhattan_distance(self, x):
        return tf.reduce_sum(tf.abs(self._X_train - x), axis=1)    
    
    def _euclidean_distance(self, x):
        return tf.sqrt(tf.reduce_sum(tf.square(self._X_train - x), axis=1))
    
    def _chebyshev_distance(self, x):
        return tf.reduce_max(tf.abs(self._X_train - x), axis=1)
    
    def _find_k_labels(self, sample):
        distance = self._metric(sample)
        sorted_idx = tf.argsort(distance)
        k_labels = tf.gather(self._y_train, indices=sorted_idx[:self.n_neighbors])
        return k_labels
    
    def _predict_sample(self, sample):
        raise Exception("Can call predict method for NumpyKNNBase object! ")
        
    def _score_validation(self, X_test, y_test):
        assert isinstance(X_test, Iterable) and isinstance(y_test, Iterable)
        assert len(X_test) == len(y_test)
        X_test = tf.convert_to_tensor(X_test, dtype=tf.dtypes.float32)
        y_test = y_test if isinstance(y_test, tf.Tensor) else tf.convert_to_tensor(y_test)
        return X_test, y_test

## 2. TensorFlow实现kNN Classifier

In [3]:
class TFKNNClassifier(TFKNNBase):
    """kNN Classifier with TensorFlow, explicitly inherits from TFKNNBase already.
    
    Attributes:
        n_neighbors: A int number, number of neighbors.
        _metric: A method object, choose from {_manhattan_distance, _euclidean_distance, _chebyshev_distance}.
        _X_train: feature data for training. A tf.Tensor matrix of (sample_lenght, feature_lenght) shape, 
            data type must be continuous value type. 
        _y_train: label data for training. A tf.Tensor array of (sample_lenght, ) shape, 
            data type must be discrete value.
    """
    def __new__(cls):
        return object.__new__(cls)
    
    def score(self, X_test, y_test):
        """Use test dataset to evaluate the trained model.
        
        Args:
            X_test: A np.ndarray matrix of (sample_lenght, feature_lenght) shape.
            y_test: A np.ndarray array of (sample_lenght, ) shape. data type must be
                discrete value.
        Returns:
            return accuracy, a float number. accuracy = correct_count / y_test.shape[0]
        """
        X_test, y_test = self._score_validation(X_test, y_test)
        
        y_pred = self.predict(X_test)
        correct_count = tf.reduce_sum(tf.cast(y_pred == y_test, dtype=tf.dtypes.int32))
        accuracy = correct_count / y_test.shape[0]
        return accuracy.numpy()
    
    def _predict_sample(self, sample):
        k_labels = self._find_k_labels(sample)
        pred = Counter(k_labels.numpy()).most_common(1)[0][0]
        return pred

In [4]:
a = tf.convert_to_tensor([2])
a / 2

<tf.Tensor: shape=(1,), dtype=float64, numpy=array([1.])>

### 2.1 iris数据集验证算法

In [5]:
X_data, y_data = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, random_state=1, shuffle=True)
clf = TFKNNClassifier()
clf.fit(X_train, y_train)
clf.score(X_test, y_test)

1.0

### 2.2 mnist手写数字识别

https://tensorflow.google.cn/api_docs/python/tf/keras/datasets/mnist/load_data

http://yann.lecun.com/exdb/mnist/

In [6]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((60000, 28, 28), (60000,), (10000, 28, 28), (10000,))

In [7]:
X_train = X_train.reshape([60000, -1])
X_train.shape

(60000, 784)

In [8]:
X_test = X_test.reshape([10000, -1])
X_test.shape

(10000, 784)

In [9]:
clf = TFKNNClassifier()
clf.fit(X_train, y_train)
clf.score(X_test[:100], y_test[:100])

0.99

#### Numpy实现Min-Max归一化

In [10]:
class NumpyMinMaxScaler:
    def __init__(self):
        self.min_ = None
        self.max_ = None
    
    def fit(self, X_train):
        self.max_ = np.max(X_train, axis=0)
        self.min_ = np.min(X_train, axis=0)
    
    def transform(self, x):
        return (x - self.min_) / (self.max_ - self.min_ + 0.00001)
    
    def fit_transform(self, X_train):
        self.fit(X_train)
        return self.transform(X_train)

In [11]:
scaler = NumpyMinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_train.shape

(60000, 784)

In [12]:
clf = TFKNNClassifier()
clf.fit(X_train, y_train)
clf.score(X_test[:100], y_test[:100])

0.76

### 2.2 红酒数据集验证NumpyMinMaxScaler

In [13]:
from sklearn.datasets import load_wine

X_data, y_data = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, random_state=1, shuffle=True)
clf = TFKNNClassifier()
clf.fit(X_train, y_train)
clf.score(X_test, y_test)

0.6888888888888889

In [14]:
X_data, y_data = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, random_state=1, shuffle=True)

scaler = NumpyMinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

clf = TFKNNClassifier()
clf.fit(X_train, y_train)
clf.score(X_test, y_test)

0.9777777777777777

## 3. TensorFlow实现kNN Regressor

In [15]:
class TFKNNRegressor(TFKNNBase): 
    """kNN Regressor with tensorflow, explicitly inherits from TFKNNBase already.
    
    Attributes:
        n_neighbors: A int number, number of neighbors.
        _metric: A method object, choose from {_manhattan_distance, _euclidean_distance, _chebyshev_distance}.
        _X_train: feature data for training. A tf.Tensor matrix of (sample_lenght, feature_lenght) shape, 
            data type must be continuous value type. 
        _y_train: label data for training. A tf.Tensor array of (sample_lenght, ) shape, 
            data type must be discrete value.
    """
    def __new__(cls):
        return object.__new__(cls)
    
    def score(self, X_test, y_test):
        """Use test dataset to evaluate the trained model.
        
        Args:
            X_test: A np.ndarray matrix of (sample_lenght, feature_lenght) shape.
            y_test: A np.ndarray array of (sample_lenght, ) shape. data type must be
                discrete value.
        Returns:
            return R^2, R^2 = 1 - u / v. u = sum((y_pred - y_true)^2), v = sum((y_true - y_true_mean)^2)
        """
        X_test, y_test = self._score_validation(X_test, y_test)
        
        y_pred = self.predict(X_test)
        y_true_mean = tf.reduce_mean(y_test, axis=0)
        u = tf.reduce_sum(tf.square(y_pred - y_test))
        v = tf.reduce_sum(tf.square(y_test - y_true_mean))
        r_squared = 1 - u / v
        return r_squared.numpy()
    
    def _predict_sample(self, sample):
        k_labels = self._find_k_labels(sample)
        pred = tf.reduce_mean(k_labels, axis=0)
        return pred

### 3.1 boston房价数据集验证算法

In [16]:
(X_train, y_train), (X_test, y_test) = boston_housing.load_data()

In [17]:
rgs = TFKNNRegressor()
rgs.fit(X_train, y_train)
rgs.score(X_test, y_test)

0.5558913435303654

In [18]:
scaler = NumpyMinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

rgs = TFKNNRegressor()
rgs.fit(X_train, y_train)
rgs.score(X_test, y_test)

0.6401800189130427

## 4. 总结

1. TensorFlow API
    > tf.convert_to_tensor(x ,dtype=tf.dtypes.int32)  
    > tf.abs()  
    > tf.sqrt()  
    > tf.square()  
    > tf.reduce_sum()  
    > tf.reduce_max(input_tensor, axis=1)  
    > tf.argsort(distance)  
    > tf.gather(self._y_train, indices)  
2. TensorFlow的数据类型
    > tf.Tensor
    > tf.dtypes.float32  
    > tf.dtypes.int32  
3. tensorflow.keras的数据集
    > from tensorflow.keras.datasets import mnist, boston_housing
    > (X_train, y_train), (X_test, y_test) = mnist.load_data()
4. Numpy实现Min-Max Normalization
    > 分母加一个极小值， 防止分母为0  
    > Min-Max Normalization不一定能得到更好训练效果  

## 5. 作业

1. 惯例 至少敲两遍。第一遍拷贝kNN-numpy，照着改。第二遍从零开始默写。并完善注释 
2. tensorflow实现Min-Max Normalization。
3. 弄清上述tf API的功能及其参数，总结tensorflow API和numpy的区别。

## 相关链接

<a href="./1.1.kNN.ipynb" style=""> 1.1 kNN k近邻算法原理 </a>  
<a href="./1.2.kNN-sklearn.ipynb" style=""> 1.2 sklearn中使用kNN做分类、回归任务 </a>  
<a href="./1.3.kNN-numpy.ipynb" style=""> 1.3 numpy实现kNN分类和回归 </a>    
  
<a href="./1.5.kNN-torch1.ipynb"> 1.5 Pytorch1实现kNN分类和回归 </a>  

## 项目源码  
https://github.com/LossJ     
进入后点击Statistic-Machine-Learning