In [2]:
import numpy as np
import pandas as pd
from pathlib import Path

In [3]:
#6.3.2 应用简化版SMO算法处理小规模数据集

In [4]:
def loadDataSet(fileName):
    dataMat,labelMat = [],[]
    fr = open(fileName)
    for line in fr.readlines():
        lineArr = line.strip().split('\t')
        dataMat.append([float(lineArr[0]),float(lineArr[1])])
        labelMat.append(float(lineArr[2]))
    return dataMat,labelMat

In [5]:
def selectJrand(i,m):
    j = i
    while j == i:
        j = int(np.random.uniform(0,m))
    return j

def clipAlpha(aj,H,L):
    if aj > H:
        aj = H
    if aj < L:
        aj = L
    return aj

In [6]:
data_path = Path('D:\\python_algorithm\\machinelearinginaction\\《机器学习实战》Python3代码\\Ch06')
dataArr,labelArr = loadDataSet(data_path / 'testSet.txt')

In [7]:
def smoSimple(dataMatIn, classLabels, C, toler, maxIter):
    dataMatrix = np.mat(dataMatIn)
    labelMat = np.mat(classLabels).transpose()    #行向量转为列向量
    b = 0
    m,n = np.shape(dataMatrix)
    
    alphas = np.mat(np.zeros((m,1)))
    iter = 0
    
    while iter < maxIter:
        alphaPairsChanged = 0
        for i in range(m):
            fXi = float(np.multiply(alphas,labelMat).T * \
                        (dataMatrix*dataMatrix[i,:].T)) + b
            Ei = fXi - float(labelMat[i])
            
            if ((labelMat[i] * Ei) < -toler and alphas[i] < C) or \
                ((labelMat[i] * Ei) > toler and alphas[i] > 0):
                j = selectJrand(i,m)                                 #随机选择另外一个alpha
                
                fXj = float(np.multiply(alphas,labelMat).T * \
                        (dataMatrix*dataMatrix[j,:].T)) + b
                Ej = fXj - float(labelMat[j])
                
                alphaIold = alphas[i].copy()
                alphaJold = alphas[j].copy()
                
                if labelMat[i] != labelMat[j]:
                    L = max(0,alphas[j] - alphas[i])
                    H = min(C,C+alphas[j] - alphas[i])
                else:
                    L = max(0,alphas[j] + alphas[i] - C)
                    H = min(C, alphas[j] + alphas[i])
                    
                if L == H:
                    #print('L == H')
                    continue
                
                eta = 2.0 * dataMatrix[i,:] * dataMatrix[j,:].T - \
                      dataMatrix[i,:] * dataMatrix[i,:].T - \
                      dataMatrix[j,:] * dataMatrix[j,:].T
                
                if eta >= 0:
                    #print('eta >= 0')
                    continue
                
                alphas[j] -= labelMat[j] * (Ei - Ej)/eta                      #求另外一个alpha的值
                alphas[j] = clipAlpha(alphas[j], H, L)
                
                if abs(alphas[j] - alphaJold) < 0.00001:
                    #print('j not moving enough')
                    continue
                
                alphas[i] += labelMat[j] * labelMat[i] * (alphaJold - alphas[j])
                
                b1 = b - Ei - labelMat[i] * (alphas[i] - alphaIold) * \
                    dataMatrix[i,:] * dataMatrix[i,:].T - \
                    labelMat[j] * (alphas[j] - alphaJold) * \
                    dataMatrix[i,:] * dataMatrix[j,:].T
                b2 = b - Ej - labelMat[i] * (alphas[i] - alphaIold) * \
                    dataMatrix[i,:] * dataMatrix[j,:].T - \
                    labelMat[j] * (alphas[j] - alphaJold) * \
                    dataMatrix[j,:] * dataMatrix[j,:].T
                
                if alphas[i] > 0 and alphas[i] < C:
                    b = b1
                elif alphas[j] > 0 and alphas[j] < C:
                    b = b2
                else:
                    b = (b1 + b2)/2.0
                
                alphaPairsChanged += 1
                
        if alphaPairsChanged == 0:
            iter += 1
        else:
            iter = 0
        #print('iteration number: %d'%iter)
    return b,alphas
            

In [8]:
b,alphas = smoSimple(dataArr,labelArr,0.6,0.001,40)

In [9]:
b

matrix([[-3.8306937]])

In [10]:
alphas[alphas>0]

matrix([[1.24417032e-01, 3.68628739e-17, 2.42268913e-01, 3.68628739e-17,
         2.21177243e-17, 3.66685944e-01, 6.63531730e-17]])

In [11]:
for i in range(100):
    if alphas[i] > 0.0:
        print (dataArr[i],labelArr[i])

[4.658191, 3.507396] -1.0
[7.286357, 0.251077] 1.0
[3.457096, -0.082216] -1.0
[6.960661, -0.245353] 1.0
[5.286862, -2.358286] 1.0
[6.080573, 0.418886] 1.0
[6.543888, 0.433164] 1.0


In [12]:
#6.4 SMO算法的加速优化

In [13]:
class optStruct:
    def __init__(self,dataMatIn, classLabels, C, toler, kTup):
        self.X = dataMatIn
        self.labelMat = classLabels
        self.C = C
        self.tol = toler
        self.m = np.shape(dataMatIn)[0]
        self.alphas = np.mat(np.zeros((self.m,1)))
        self.b = 0
        self.eCache = np.mat(np.zeros((self.m,2)))  #第1列表示是否有效的标志位，第2列表示实际的E值
        
def calcEk(oS,k):
    fXk = float(np.multiply(oS.alphas,oS.labelMat).T * \
               (oS.X * oS.X[k,:].T)) + oS.b
    Ek = fXk - float(oS.labelMat[k])
    return Ek

def selectJ(i,oS,Ei):        #内循环的启发式方法
    maxK = -1
    maxDeltaE = 0
    Ej = 0
    
    oS.eCache[i] = [1,Ei]
    validEcacheList = np.nonzero(oS.eCache[:,0].A)[0]
    if len(validEcacheList) > 1:
        for k in validEcacheList:
            if k ==i:
                continue
            Ek = calcEk(oS,k)
            deltaE = abs(Ei - Ek)
            if deltaE > maxDeltaE:
                maxK = k
                maxDeltaE = deltaE
                Ej = Ek
        return maxK,Ej
    else:
        j = selectJrand(i,oS.m)
        Ej = calcEk(oS,j)
    return j,Ej

def updateEk(oS,k):
    Ek = calcEk(oS,k)
    oS.eCache[k] = [1,Ek]
    

In [14]:
##SMO算法中寻找决策边界的优化历程

In [15]:
def innerL(i, oS):
    Ei = calcEk(oS, i)
    if (oS.labelMat[i] * Ei < -oS.tol and oS.alphas[i] < oS.C) or \
        (oS.labelMat[i] * Ei > oS.tol and oS.alphas[i] > 0):
        j,Ej = selectJ(i,oS,Ei)
        alphaIold = oS.alphas[i].copy()
        alphaJold = oS.alphas[j].copy()
        
        if oS.labelMat[i] != oS.labelMat[j]:
            L = max(0, oS.alphas[j] - oS.alphas[i])
            H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
        else:
            L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
            H = min(oS.C, oS.alphas[j] + oS.alphas[i])
            
        if L == H:
            #print('L == H')
            return 0
        eta = 2.0 * oS.X[i,:] * oS.X[j,:].T - oS.X[i,:] * oS.X[i,:].T - \
              oS.X[j,:] * oS.X[j,:].T
        if eta >= 0:
           # print('eta >= 0')
            return 0
        oS.alphas[j] -= oS.labelMat[j] * (Ei - Ej)/eta
        oS.alphas[j] = clipAlpha(oS.alphas[j],H,L)
        updateEk(oS,j)
        
        if abs(oS.alphas[j] - alphaJold) < 0.00001:
            #print('j not moving enough')
            return 0
        oS.alphas[i] += oS.labelMat[j] * oS.labelMat[i] * (alphaJold - oS.alphas[j])
        updateEk(oS,i)
        
        b1 = oS.b - Ei - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * \
            oS.X[i,:] * oS.X[i,:].T - \
            oS.labelMat[j] * (oS.alphas[j] - alphaJold) * \
            oS.X[i,:] * oS.X[j,:].T
        b2 = oS.b - Ej - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * \
            oS.X[i,:] * oS.X[j,:].T - \
            oS.labelMat[j] * (oS.alphas[j] - alphaJold) * \
            oS.X[j,:] * oS.X[j,:].T
        if 0 < oS.alphas[i] and oS.C > oS.alphas[i]:
            oS.b = b1
        elif 0 < oS.alphas[j] and oS.C > oS.alphas[i]:
            oS.b = b2
        else:
            oS.b = (b1 + b2)/2.0
        return 1
    else:
        return 0

In [16]:
##完整的SMO外循环代码

In [47]:
def smoP(dataMatIn,classLabels, C, toler, maxIter, kTup=('lin',0)):
    oS = optStruct(np.mat(dataMatIn),np.mat(classLabels).transpose(),C,toler, kTup)
    iter = 0
    
    entireSet = True; alphaPairsChanged = 0
    while iter <maxIter and (alphaPairsChanged > 0 or entireSet):
        alphaPairsChanged = 0
        if entireSet:
            for i in range(oS.m):
                alphaPairsChanged += innerL(i,oS)
                #print("fullSet, iter: %d i:%d, pairs changed %d" % (iter,i,alphaPairsChanged))
            iter += 1
        else:
            nonBounds = np.nonzero((oS.alphas.A > 0) * (oS.alphas.A < C))[0]
            for i in nonBounds:
                alphaPairsChanged += innerL(i,oS)
                #print("non-bound, iter: %d i:%d, pairs changed %d" % (iter,i,alphaPairsChanged))
            iter += 1
        if entireSet:
            entireSet = False
        elif alphaPairsChanged == 0:
            entireSet = True
            #print("iteration number: %d" % iter)
    return oS.b,oS.alphas    

In [48]:
b,alphas = smoP(dataArr,labelArr,0.6,0.001,40)

In [49]:
def calcWs(alphas,dataArr,classLabels):
    X = np.mat(dataArr); labelMat = np.mat(classLabels).transpose()
    m,n = np.shape(X)
    w = np.zeros((n,1))
    for i in range(m):
        w += np.multiply(alphas[i]*labelMat[i],X[i,:].T)
    return w

In [50]:
ws = calcWs(alphas,dataArr,labelArr)

In [51]:
ws

array([[ 0.65307162],
       [-0.17196128]])

In [52]:
dataMat = np.mat(dataArr)

In [23]:
#测试结果
dataMat[0] * np.mat(ws) + b

matrix([[-0.82962589]])

In [24]:
labelArr[0]

-1.0

In [25]:
#利用径向基核函数映射数据到高维空间

In [40]:
def kernelTrans(X,A,kTup):
    m,n = np.shape(X)
    K = np.mat(np.zeros((m,1)))
    if kTup[0] == 'lin':
        K = X * A.T
    elif kTup[0] == 'rbf':
        for j in range(m):
            deltaRow = X[j,:] - A
            K[j] = deltaRow * deltaRow.T
        K = np.exp(K / (-1 * kTup[1] ** 2))
    else:
        raise NameError('we have a problem')
    return K

class optStruct:
    def __init__(self,dataMatIn, classLabels, C, toler, kTup):
        self.X = dataMatIn
        self.labelMat = classLabels
        self.C = C
        self.tol = toler
        self.m = np.shape(dataMatIn)[0]
        self.alphas = np.mat(np.zeros((self.m,1)))
        self.b = 0
        self.eCache = np.mat(np.zeros((self.m,2)))  #第1列表示是否有效的标志位，第2列表示实际的E值
        self.K = np.mat(np.zeros((self.m,self.m)))
        for i in range(self.m):
            self.K[:,i] = kernelTrans(self.X, self.X[i,:], kTup)

In [35]:
def calcEk(oS,k):
    fXk = float(np.multiply(oS.alphas,oS.labelMat).T * \
               oS.K[:,k] + oS.b)
    Ek = fXk - float(oS.labelMat[k])
    return Ek

In [36]:
#寻找决策边界的优化过程
def innerL(i, oS):
    Ei = calcEk(oS, i)
    if (oS.labelMat[i] * Ei < -oS.tol and oS.alphas[i] < oS.C) or \
        (oS.labelMat[i] * Ei > oS.tol and oS.alphas[i] > 0):
        j,Ej = selectJ(i,oS,Ei)
        alphaIold = oS.alphas[i].copy()
        alphaJold = oS.alphas[j].copy()
        
        if oS.labelMat[i] != oS.labelMat[j]:
            L = max(0, oS.alphas[j] - oS.alphas[i])
            H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
        else:
            L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
            H = min(oS.C, oS.alphas[j] + oS.alphas[i])
            
        if L == H:
            #print('L == H')
            return 0
        eta = 2.0 * oS.K[i,j] - oS.K[i,i] - oS.K[j,j]
        if eta >= 0:
           # print('eta >= 0')
            return 0
        oS.alphas[j] -= oS.labelMat[j] * (Ei - Ej)/eta
        oS.alphas[j] = clipAlpha(oS.alphas[j],H,L)
        updateEk(oS,j)
        
        if abs(oS.alphas[j] - alphaJold) < 0.00001:
            #print('j not moving enough')
            return 0
        oS.alphas[i] += oS.labelMat[j] * oS.labelMat[i] * (alphaJold - oS.alphas[j])
        updateEk(oS,i)
        
        b1 = oS.b - Ei - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * \
            oS.K[i,i] - \
            oS.labelMat[j] * (oS.alphas[j] - alphaJold) * \
            oS.K[i,j]
        b2 = oS.b - Ej - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * \
            oS.K[i,j] - \
            oS.labelMat[j] * (oS.alphas[j] - alphaJold) * \
            oS.K[j,j]
        if 0 < oS.alphas[i] and oS.C > oS.alphas[i]:
            oS.b = b1
        elif 0 < oS.alphas[j] and oS.C > oS.alphas[i]:
            oS.b = b2
        else:
            oS.b = (b1 + b2)/2.0
        return 1
    else:
        return 0

In [37]:
#6.5.3 测试使用核函数

In [53]:
def testRbf(k1 = 1.3):
    dataArr,labelArr = loadDataSet(data_path / 'testSetRBF.txt')
    b,alphas = smoP(dataArr,labelArr,200,0.0001,10000,('rbf',k1))
    datMat = np.mat(dataArr)
    labelMat = np.mat(labelArr).transpose()
    svInd = np.nonzero(alphas.A > 0)[0]
    sVs = dataMat[svInd]
    labelSV = labelMat[svInd]
    
    m,n = np.shape(datMat)
    errorCount = 0
    for i in range(m):
        kernelEval = kernelTrans(sVs,datMat[i,:],('rbf',k1))
        predict = kernelEval.T * np.multiply(labelSV,alphas[svInd] + b)
        if np.sign(predict) != np.sign(labelArr[i]):
            errorCount += 1
    print('the training error rate is: %f'%(float(errorCount)/m))

In [54]:
testRbf()

the training error rate is: 0.440000


In [55]:
testRbf(0.1)

the training error rate is: 0.340000


In [57]:
##6.6 手写识别问题

In [62]:
data_path = Path('D:/python_algorithm/machinelearinginaction/《机器学习实战》Python3代码/Ch06/digits')

In [63]:
data_path

WindowsPath('D:/python_algorithm/machinelearinginaction/《机器学习实战》Python3代码/Ch06/digits')

In [74]:
def img2vector(filename):
    returnVect = np.zeros((1,1024))
    fr = open(filename)
    for i in range(32):
        lineStr = fr.readline()
        for j in range(32):
            returnVect[0,32*i+j] = int(lineStr[j])
    return returnVect

def loadImages(dirName):
    from os import listdir
    hwLabels = []
    trainingFileList = listdir(dirName)           #load the training set
    m = len(trainingFileList)
    trainingMat = np.zeros((m,1024))
    for i in range(m):
        fileNameStr = trainingFileList[i]
        fileStr = fileNameStr.split('.')[0]     #take off .txt
        classNumStr = int(fileStr.split('_')[0])
        if classNumStr == 9: hwLabels.append(-1)
        else: hwLabels.append(1)
        trainingMat[i,:] = img2vector('%s/%s' % (dirName, fileNameStr))
    return trainingMat, hwLabels    

def testDigits(kTup=('rbf', 10)):
    dataArr,labelArr = loadImages(data_path / 'trainingDigits')
    b,alphas = smoP(dataArr, labelArr, 200, 0.0001, 10000, kTup)
    datMat=np.mat(dataArr); labelMat = np.mat(labelArr).transpose()
    svInd=np.nonzero(alphas.A>0)[0]
    sVs=datMat[svInd] 
    labelSV = labelMat[svInd];
    print("there are %d Support Vectors" % np.shape(sVs)[0])
    m,n = np.shape(datMat)
    errorCount = 0
    for i in range(m):
        kernelEval = kernelTrans(sVs,datMat[i,:],kTup)
        predict=kernelEval.T * np.multiply(labelSV,alphas[svInd]) + b
        if np.sign(predict)!=np.sign(labelArr[i]): errorCount += 1
    print("the training error rate is: %f" % (float(errorCount)/m))
    dataArr,labelArr = loadImages(data_path / 'testDigits')
    errorCount = 0
    datMat=np.mat(dataArr); labelMat =np.mat(labelArr).transpose()
    m,n = np.shape(datMat)
    for i in range(m):
        kernelEval = kernelTrans(sVs,datMat[i,:],kTup)
        predict=kernelEval.T * np.multiply(labelSV,alphas[svInd]) + b
        if np.sign(predict)!=np.sign(labelArr[i]): errorCount += 1    
    print("the test error rate is: %f" % (float(errorCount)/m)) 

In [75]:
testDigits(('rbf',20))

there are 60 Support Vectors
the training error rate is: 0.000000
the test error rate is: 0.005376
