In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from time import sleep
import json
import urllib

In [2]:
def regularize(xMat): #regularize by columns
    inMat = xMat.copy()
    inMeans = np.mean(inMat,0) #计算平均数，然后减去它
    inVar = np.var(inMat,0)
    inMat = (inMat-inMeans)/inVar
    return inMat

### 计算最佳拟合直线
$w'=\left(X^TX\right)^{-1}X^Ty$

In [3]:
def standRegres(xArr, yArr):
    xMat = np.mat(xArr)
    yMat = np.mat(yArr).T
    xTx = xMat.T * xMat
    if np.linalg.det(xTx) == 0.0:    # 判断行列式是否为0
        print('This matrix is singular, cannot do inverse.')
        return
    ws = xTx.I * (xMat.T * yMat)
    return ws

### 局部加权线性回归函数
$w'=\left(X^TWX\right)^{-1}X^TWy$  
高斯核：  
$w\left(i,i\right)=exp\left(\frac{\left|x^{(i)}-x\right|}{-2k^2}\right)$

In [4]:
def lwlr(testPoint, xArr, yArr, k=1.0):
    xMat = np.mat(xArr)
    yMat = np.mat(yArr).T
    m = np.shape(xMat)[0]
    weights = np.mat(np.eye((m)))    # 创建对角矩阵
    # 样本点与代预测点距离越远，权重以指数级衰减，k控制衰减速度
    for j in range(m):
        diffMat = testPoint - xMat[j, :]
        weights[j, j] = np.exp(diffMat * diffMat.T / (-2.0 * k ** 2))
    xTx = xMat.T * (weights * xMat)
    if np.linalg.det(xTx) == 0.0:    # 判断行列式是否为0
        print('This matrix is singular, cannot do inverse.')
        return
    ws = xTx.I * (xMat.T * (weights * yMat))
    return testPoint * ws

In [5]:
# 为数据集中每个点调用lwlr()
def lwlrTest(testArr, xArr, yArr, k=1.0):
    m = np.shape(testArr)[0]
    yHat = np.zeros(m)
    for i in range(m):
        yHat[i] = lwlr(testArr[i], xArr, yArr, k)
    return yHat

In [6]:
def rssError(yArr, yHatArr):
    return ((yArr - yHatArr)**2).sum()

## 岭回归（ridge regression）  
在矩阵$X^TX$上加一个$\lambda I$从而使得矩阵非奇异，进而能对$X^TX+\lambda I$求逆。  
回归系数的计算公式将变成：  
$w'=\left(X^TX+\lambda I\right)^{-1}X^Ty$  
约束条件：  
$\sum_{k=1}^nw_k^2\leq\lambda$  

### 计算回归系数

In [7]:
def ridgeRegres(xMat, yMat, lam=0.2):
    xTx = xMat.T * xMat
    denom = xTx + np.eye(np.shape(xMat)[1]) * lam
    if np.linalg.det(denom) == 0.0:    # 判断行列式是否为0
        print('This matrix is singular, cannot do inverse.')
        return
    ws = denom.I * (xMat.T * yMat)
    return ws

In [8]:
def ridgeTest(xArr, yArr):
    xMat = np.mat(xArr)
    yMat = np.mat(yArr).T
    yMean = np.mean(yMat, 0)
    # 数据标准化
    yMat = yMat - yMean
    xMeans = np.mean(xMat,0)
    xVar = np.var(xMat, 0)
    xMat = (xMat - xMeans) / xVar    # 减去均值，除以方差
    numTestPts = 30
    wMat = np.zeros((numTestPts, np.shape(xMat)[1]))
    for i in range(numTestPts):
        ws = ridgeRegres(xMat, yMat, np.exp(i-10))
        wMat[i,:] = ws.T
    return wMat

## lasso
约束条件：  
$\sum_{k=1}^n\left|w_k\right|\leq\lambda$  

## 前向逐步回归  
伪代码：  
数据标准化，使其分布满足0均值和单位方差  
在美轮迭代过程中：  
&ensp;&ensp;&ensp;&ensp;设置当前最小误差lowestError为正无穷  
&ensp;&ensp;&ensp;&ensp;对每个特征：  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;增大或缩小  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;改变一个系数得到一个新的W  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;计算新W下的误差  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;如果误差Error小于当前最小误差lowestError：  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;设置Wbest等于当前W  
&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;&ensp;将W设为新的Wbest

In [9]:
def stageWise(xArr, yArr, eps=0.01, numIt=100):    # eps:每次迭代调整的步长
    xMat = np.mat(xArr)
    yMat = np.mat(yArr).T
    yMean = np.mean(yMat, 0)
    yMat = yMat - yMean
    xMat = regularize(xMat)
    m, n = np.shape(xMat)
    returnMat = np.zeros((numIt, n))
    ws = np.zeros((n, 1))
    wsTest = ws.copy()
    wsMax = ws.copy()
    for i in range(numIt):
        print(ws.T)
        lowestError = np.inf
        for j in range(n):
            for sign in [-1, 1]:
                wsTest = ws.copy()
                wsTest[j] += eps * sign
                yTest = xMat * wsTest
                rssE = rssError(yMat.A, yTest.A)
                if rssE < lowestError:
                    lowestError = rssE
                    wsMax = wsTest
        ws = wsMax.copy()
        returnMat[i, :] = ws.T
    return returnMat

In [18]:
def searchForSet(retX, retY, setNum, yr, numPce, origPrc):
    sleep(10)
    myAPIstr = 'AIzaSyD2cR2KFyx12hXu6PFU-wrWot3NXvko8vY'
    searchURL = 'https://www.googleapis.com/shopping/search/v1/public/products?key=%s&country=US&q=lego+%d&alt=json' % (myAPIstr, setNum)
    pg = urllib.request.urlopen(searchURL)
    retDict = json.loads(pg.read())
    for i in range(len(retDict['items'])):
        try:
            currItem = retDict['items'][i]
            if currItem['product']['condition'] == 'new':
                newFlag = 1
            else: newFlag = 0
            listOfInv = currItem['product']['inventories']
            for item in listOfInv:
                sellingPrice = item['price']
                if  sellingPrice > origPrc * 0.5:
                    print("%d\t%d\t%d\t%f\t%f" % (yr,numPce,newFlag,origPrc, sellingPrice))
                    retX.append([yr, numPce, newFlag, origPrc])
                    retY.append(sellingPrice)
        except: 
            print('problem with item %d' % i)

In [21]:
def setDataCollect(retX, retY):
    searchForSet(retX, retY, 8288, 2006, 800, 49.99)
    searchForSet(retX, retY, 10030, 2002, 3096, 269.99)
    searchForSet(retX, retY, 10179, 2007, 5195, 499.99)
    searchForSet(retX, retY, 10181, 2007, 3428, 199.99)
    searchForSet(retX, retY, 10189, 2008, 5922, 299.99)
    searchForSet(retX, retY, 10196, 2009, 3263, 249.99)

### 交叉验证测试岭回归

In [24]:
def crossValidation(xArr,yArr,numVal=10):
    m = len(yArr)                           
    indexList = range(m)
    errorMat = np.zeros((numVal, 30))#create error mat 30columns numVal rows
    for i in range(numVal):
        trainX = []
        trainY = []
        testX = []
        testY = []
        np.random.shuffle(indexList)
        for j in range(m):#create training set based on first 90% of values in indexList
            if j < m * 0.9: 
                trainX.append(xArr[indexList[j]])
                trainY.append(yArr[indexList[j]])
            else:
                testX.append(xArr[indexList[j]])
                testY.append(yArr[indexList[j]])
        wMat = ridgeTest(trainX,trainY)    #get 30 weight vectors from ridge
        for k in range(30):#loop over all of the ridge estimates
            matTestX = np.mat(testX)
            matTrainX = np.mat(trainX)
            meanTrain = np.mean(matTrainX, 0)
            varTrain = np.var(matTrainX, 0)
            matTestX = (matTestX - meanTrain) / varTrain #regularize test with training params
            yEst = matTestX * np.mat(wMat[k,:]).T + np.mean(trainY)#test ridge results and store
            errorMat[i,k] = rssError(yEst.T.A, np.array(testY))
            #print errorMat[i,k]
    meanErrors = np.mean(errorMat,0)#calc avg performance of the different ridge weight vectors
    minMean = float(min(meanErrors))
    bestWeights = wMat[nonzero(meanErrors == minMean)]
    #can unregularize to get model
    #when we regularized we wrote Xreg = (x-meanX)/var(x)
    #we can now write in terms of x not Xreg:  x*w/var(x) - meanX/var(x) +meanY
    xMat = np.mat(xArr)
    yMat = np.mat(yArr).T
    meanX = np.mean(xMat, 0)
    varX = np.var(xMat, 0)
    unReg = bestWeights / varX
    print("the best model from Ridge Regression is:\n", unReg)
    print("with constant term: ", -1*sum(multiply(meanX,unReg)) + mean(yMat))