<br>
<font size='12px'>
Perceptron Learning Algorithm
</font>
<br>

[©2020 AI在弦上](https://www.dataml.cn)

# 概要

## PLA 核心思想

**假设空间**

$H(x) = sign(w \cdot x), w \in R_{n+1} $

**几何含义**

目标函数, 对应于$n$维空间上, 可以将正负样本分开的超平面


**知错能改, 善莫大焉**

1. 知错, 检测误分类点  
  $y_i (w_{k} \cdot x_i) < 0$  
  st. $h(x) = sign(w \cdot x)$  
  
2. 能改, 根据当前误分类点修正权重向量   
  $w_{k+1} = w_{k} + y_i x_i$
  
  $
   y_i (w_{k} \cdot x_i) = \left\{
    \begin{aligned}
      & w_{k}\cdot x_i < 0, & y_i > 0, \\
      & w_{k}\cdot x_i > 0, & y_i < 0
    \end{aligned}
  \right.
  $
  - $y_i > 0$ 时, $w \cdot x_i < 0$, 对应两者夹角 $\cos \theta < 0$, 夹角过大  
    调整的策略权重向量偏向$x_i$, $w_{k+1} = w_{k} + x_i$  
    
  - $y_i < 0$ 时, $w \cdot x_i > 0$, 对应两者夹角 $\cos \theta > 0$, 夹角过小    
    调整的策略权重向量远离$x_i$, $w_{k+1} = w_{k} - x_i$


# 符号说明

输入特征向量为$n$维, 这里将截距合入特征向量中

- 特征向量, $x = [1, x^{(1)}, x^{(2)}, \ldots, x^{(n)}]$
- 权重向量, $w = [w^{(0)}, w^{(1)}, w^{(2)}, \ldots, w^{(n)}]$

# 环境

In [1]:
import numpy as np

In [2]:
%load_ext ipython_unittest
import unittest

# 原始形式

## 实现

In [3]:
class Pla:
    
    def __init__(self, X, y, 
                 weight=None,
                 max_iterates=1000,
                 learning_rate=1,
                 method='random',
                 recycle=False,
                 debug=False):
        """
        PLA 初始化
        
        Params
        -------
        
        X: array, 样本特征
        y: array, 样本标签
        
        weight: array, 权重向量
        
        method: str, 选取错误样本的方式
                cycle: 按输入样本顺序, 循环遍历找错误样本
                random: 随机选取错误样本
        
        recycle: boolean, 当且仅当method=cycle时有用
                 True, 每次从第一个样本, 寻找下一个错误样本
                 False, 每次从上一次错误样本后面, 寻找下一个错误样本 
        """
        
        self.X = np.insert(X, 0, values=np.ones(len(X)), axis=1)
        self.y = y
                
        self.weight = weight if weight else np.zeros(np.shape(self.X)[1])
        self.method = method
        self.re_cycle = recycle if recycle else False
        
        self.max_iterates = max_iterates
        self.learning_rate = learning_rate
        
        self.debug = debug
        
        self.sample_size = len(self.X)
        self.iterates = 1
        self.error_index = -1
        self.error_x = None
        self.error_y = None        
        
    def train(self):
        while(self.iterates < self.max_iterates):          
            self.select_error_sample()
            if self.error_index != -1:
                self.update_weights()
                self.update_print()
                self.iterates += 1
            else:
                print('\n训练成功, 迭代次数:{0}, 权重: {1}'.format(self.iterates - 1, self.weight))
                break
            
        if self.iterates >= self.max_iterates:
            print('\n训练结束, 有限迭代次数内未找到合适的超平面:{0}, 权重: {1}'.format(self.iterates, self.weight))

    def predict(self, x):
        X = np.array(x)
        
        if X.ndim == 1:
            X = np.insert(X, 0, 1)
        else:
            X = np.insert(X, 0, values=np.ones(len(X)), axis=1)
        return np.sign(np.dot(X, self.weight))
    
    def calc_dist(self, index):
        return np.dot(self.X[index], self.weight) * self.y[index]

    def select_error_sample(self):
        
        if self.method == 'random':
            self.select_error_sample_random()
        elif self.method == 'cycle':
            self.select_error_sample_cycle()
        else:
            self.select_error_sample_random()
        
        if self.error_index != -1:
            self.error_x, self.error_y = self.X[self.error_index], self.y[self.error_index]
        else:
            self.error_x, self.error_y = None, None
            
        return self.error_index
    
    def select_error_sample_random(self):
        sample_index_pool = [i for i in range(self.sample_size)]
        self.error_index = -1
        
        for i in range(self.sample_size):
            un_check_smaple_size = len(sample_index_pool)
            sample_index = sample_index_pool[np.random.randint(un_check_smaple_size)]
            
            dist = self.calc_dist(sample_index)
            
            if dist < 0:
                self.error_index = sample_index
                break
            else:
                sample_index_pool.remove(sample_index)

        return self.error_index
    
    def select_error_sample_cycle(self):
        
        sample_index =  -1 if self.re_cycle else self.error_index
        self.error_index = -1
        
        for i in range(self.sample_size):
            sample_index += 1
            sample_index = sample_index % self.sample_size
            
            dist = self.calc_dist(sample_index)
            
            if dist < 0:
                self.error_index = sample_index
                break
            else:
                pass
            
        return self.error_index
    
    def update_weights(self):
        self.weight += self.learning_rate * self.error_y * np.array(self.error_x) 
        
    def update_print(self):
        self.debug_print('\n{0}\n迭代: {1}'.format('-'*50, self.iterates))
        self.debug_print('错误样本: X = {0}, y = {1}'.format(self.error_x, self.error_y))
        self.debug_print('更新后权重: {0}'.format(self.weight))
    
    def debug_print(self, msg):
        if self.debug:
            print(msg)
    

## 单元测试

In [8]:
%%unittest_main
class PlaTest(unittest.TestCase):
    
    def setUp(self):
        self.X = [[1, 1], [3, 3], [4, 3]]
        self.y = [1, -1, -1]
        self.weight = [0, 1, 2]
        
    def calc_error_points_num(self, pla):
        
        dist = np.dot(pla.X, pla.weight) * pla.y
        error_samples_index = np.argwhere(dist < 0).reshape(-1)
        return len(error_samples_index)
        
    
    def test_select_random(self):
        
        pla = Pla(self.X, self.y, weight=self.weight, method='random')
        error_samples = [pla.select_error_sample() for i in range(10)]
        sample_count = [error_samples.count(i) for i in range(3)]
        
        
        self.assertEqual(sample_count[0], 0, '第0个节点, 作为错误节点的次数应该为0次')
        self.assertGreater(sample_count[1], 0, '第0个节点, 作为错误节点的次数应该大于0次')
        self.assertGreater(sample_count[2], 0, '第0个节点, 作为错误节点的次数应该大于0次')
        
    def test_select_cycle(self):
        
        pla = Pla(self.X, self.y, weight=self.weight, method='cycle')
        error_samples = [pla.select_error_sample() for i in range(10)]      
        
        self.assertEqual(error_samples, [1, 2, 1, 2, 1, 2, 1, 2, 1, 2], '每次搜索在顺序在上一样本之后开始查找')
    
            
    def test_select_recycle(self):
        
        pla = Pla(self.X, self.y, weight=self.weight, method='cycle', recycle=True)
        error_samples = [pla.select_error_sample() for i in range(10)]      
        
        self.assertEqual(error_samples, [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], '每次搜索从第0个样本开始查找')
        
        
    def test_train_random(self):
        
        print('info, method: random')
        pla = Pla(self.X, self.y, weight=self.weight, method='random', debug=False)
        pla.train()
        
        self.assertEqual(self.calc_error_points_num(pla), 0, '成功分离, 错误点个数应该为0')
        print('='*50, '\n')
        
        
    def test_train_recycle_random(self):
        
        print('info, method: recycle')
        pla = Pla(self.X, self.y, weight=self.weight, method='cycle', recycle=True, debug=False)
        pla.train()
    
        self.assertEqual(self.calc_error_points_num(pla), 0, '成功分离, 错误点个数应该为0')
        print('='*50, '\n')
            
    def test_train_cycle_random(self):
        
        print('info, method: cycle')
        pla = Pla(self.X, self.y, weight=self.weight, method='cycle', recycle=False, debug=False)
        pla.train()
        
        self.assertEqual(self.calc_error_points_num(pla), 0, '成功分离, 错误点个数应该为0')
        print('='*50, '\n')
        
        
    def test_predict(self):
        pla = Pla(self.X, self.y, weight=self.weight, method='cycle', recycle=False, debug=False)
        pla.train()
        
        X = [5, 5]
        y = pla.predict(X)
        self.assertEqual(y, -1, '[5, 5]应该为负样本')
        
        X2 = [[-1, -1], [5, 5]]
        y2 = pla.predict(X2)
        self.assertEqual(y2.tolist(), [1, -1], '[[-1, -1], [5, 5]]应该分别为正、负样本')




训练成功, 迭代次数:6, 权重: [ 2 -1  0]
info, method: cycle

训练成功, 迭代次数:6, 权重: [ 2 -1  0]

info, method: random

训练成功, 迭代次数:6, 权重: [ 2 -2  0]

info, method: recycle

训练成功, 迭代次数:6, 权重: [ 2 -1  0]



Success

.......
----------------------------------------------------------------------
Ran 7 tests in 0.008s

OK


<unittest.runner.TextTestResult run=7 errors=0 failures=0>