45 changes: 41 additions & 4 deletions docs/15.大数据与MapReduce.md
@@ -143,10 +143,47 @@ Pegasos stands for the Primal Estimated sub-GrAdient Solver

> Training algorithm
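
For reference, Pegasos minimizes the usual regularized hinge-loss objective; this standard form comes from the Pegasos paper and is not spelled out in the chapter itself:

$$\min_{w} \; \frac{\lambda}{2}\lVert w \rVert^2 + \frac{1}{m}\sum_{i=1}^{m} \max\left(0,\; 1 - y_i \langle w, x_i \rangle\right)$$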

[Full code address](https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py>


Let's continue with the Python version of the implementation.
```python
def batchPegasos(dataSet, labels, lam, T, k):
    """batchPegasos()

    Args:
        dataSet  feature matrix
        labels   class labels
        lam      fixed regularization constant, leaves room for fine-tuning
        T        number of iterations
        k        size of the batch to process per iteration
    Returns:
        w        weight vector
    """
    m, n = shape(dataSet)
    w = zeros(n)
    dataIndex = range(m)
    for t in range(1, T+1):
        wDelta = mat(zeros(n)) # reset wDelta

        # eta is the learning rate: it controls how large each weight update is.
        # (It can also be read as the step size of the stochastic gradient.)
        # The inputs T and k set the number of iterations and the batch size;
        # eta has to be recomputed on each of the T iterations.
        eta = 1.0/(lam*t)
        random.shuffle(dataIndex)
        for j in range(k): # inner loop runs the batch: accumulate all misclassified samples, then update the weight vector
            i = dataIndex[j]
            p = predict(w, dataSet[i, :]) # mapper code

            # If the prediction is correct with a margin of at least 1, nothing needs to change.
            # Otherwise treat it as a mistake and fold it into the accumulated update to w.
            if labels[i]*p < 1: # mapper code
                wDelta += labels[i]*dataSet[i, :].A # accumulate the change
        # w is optimized by repeated stochastic-gradient steps
        # (1.0/t rather than 1/t avoids Python 2 integer division)
        w = (1.0 - 1.0/t)*w + (eta/k)*wDelta # apply the update at each t
        # print '-----', w
        # print '++++++', w
    return w
```
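
Spelled out, the update the inner loop above performs at each outer iteration $t$ is the following, where $\eta_t = 1/(\lambda t)$ and $A_t$ is the set of sampled indices that violate the margin, i.e. $y_i \langle w, x_i \rangle < 1$:

$$w_{t+1} = \left(1 - \frac{1}{t}\right) w_t + \frac{\eta_t}{k} \sum_{i \in A_t} y_i x_i$$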

[Full code address](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/pegasos.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/pegasos.py>

[MR version code location](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/mrSVM.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/mrSVM.py>
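
To try `batchPegasos` directly, a minimal driver is sketched below. It mirrors the one at the bottom of `pegasos.py` (same data path and parameters) and assumes Python 2 with `loadDataSet` and `batchPegasos` from that file in scope:

```python
from numpy import mat

# load the tab-separated sample data shipped with the chapter
datArr, labelList = loadDataSet('input/15.BigData_MapReduce/testSet.txt')
datMat = mat(datArr)

# lam=2 (regularization constant), T=50 iterations, batch size k=100
finalWs = batchPegasos(datMat, labelList, 2, 50, 100)
print finalWs
```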

* * *

24 changes: 13 additions & 11 deletions src/python/15.BigData_MapReduce/mrSVM.py
@@ -2,9 +2,11 @@
# coding:utf8
'''
Created on 2017-04-07
Update on 2017-06-20
MapReduce version of Pegasos SVM
Using mrjob to automate job flow
@author: Peter/ApacheCN-xy
@author: Peter/ApacheCN-xy/片刻
Machine Learning in Action update repo: https://github.com/apachecn/MachineLearning
'''
from mrjob.job import MRJob

@@ -17,14 +19,14 @@ class MRsvm(MRJob):

def __init__(self, *args, **kwargs):
super(MRsvm, self).__init__(*args, **kwargs)
self.data = pickle.load(open('input/15.BigData_MapReduce/svmDat27'))
self.data = pickle.load(open('/opt/git/MachineLearning/input/15.BigData_MapReduce/svmDat27'))
self.w = 0
self.eta = 0.69
self.dataList = []
self.k = self.options.batchsize
self.numMappers = 1
self.t = 1 # iteration number

def configure_options(self):
super(MRsvm, self).configure_options()
self.add_passthrough_option(
@@ -42,20 +44,20 @@ def map(self, mapperId, inVals): # takes 2 arguments
self.w = inVals[1]
elif inVals[0] == 'x':
            self.dataList.append(inVals[1]) # accumulate data points for the computation
elif inVals[0] == 't':
        elif inVals[0] == 't': # number of iterations
self.t = inVals[1]
else:
            self.eta = inVals # only used for debugging; eta is not used in map
            self.eta = inVals # only used for debugging; eta is not used in map

def map_fin(self):
labels = self.data[:,-1]
        X = self.data[:, 0:-1] # reshape the data into X and Y
if self.w == 0:
labels = self.data[:, -1]
        X = self.data[:, :-1] # reshape the data into X and Y
if self.w == 0:
            self.w = [0.001] * shape(X)[1] # initialize w on the first iteration
for index in self.dataList:
p = mat(self.w)*X[index, :].T # calc p=w*dataSet[key].T
p = mat(self.w)*X[index, :].T # calc p=w*dataSet[key].T
if labels[index]*p < 1.0:
                yield (1, ['u', index]) # make sure every record shares the same key
                yield (1, ['u', index]) # make sure every record shares the same key
        yield (1, ['w', self.w]) # so they all go to the same reducer
yield (1, ['t', self.t])
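        # note: all three record types ('u', 'w', 't') are emitted under the constant key 1,
        # so a single reducer sees the violating indices together with w and t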

@@ -66,7 +68,7 @@ def reduce(self, _, packedVals):
elif valArr[0] == 'w':
self.w = valArr[1]
elif valArr[0] == 't':
self.t = valArr[1]
self.t = valArr[1]

labels = self.data[:, -1]
X = self.data[:, 0:-1]
81 changes: 54 additions & 27 deletions src/python/15.BigData_MapReduce/pegasos.py
@@ -10,75 +10,102 @@


def loadDataSet(fileName):
dataMat = []; labelMat = []
dataMat = []
labelMat = []
fr = open(fileName)
for line in fr.readlines():
lineArr = line.strip().split('\t')
#dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
# dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
dataMat.append([float(lineArr[0]), float(lineArr[1])])
labelMat.append(float(lineArr[2]))
return dataMat,labelMat
return dataMat, labelMat


def seqPegasos(dataSet, labels, lam, T):
m,n = shape(dataSet); w = zeros(n)
m, n = shape(dataSet)
w = zeros(n)
for t in range(1, T+1):
i = random.randint(m)
eta = 1.0/(lam*t)
p = predict(w, dataSet[i,:])
p = predict(w, dataSet[i, :])
if labels[i]*p < 1:
w = (1.0 - 1/t)*w + eta*labels[i]*dataSet[i,:]
            w = (1.0 - 1.0/t)*w + eta*labels[i]*dataSet[i, :] # 1.0/t avoids Python 2 integer division
else:
            w = (1.0 - 1.0/t)*w
print w
return w


def predict(w, x):
return w*x.T
    return w*x.T # i.e. the predicted value of y


def batchPegasos(dataSet, labels, lam, T, k):
m,n = shape(dataSet); w = zeros(n);
"""batchPegasos()

Args:
        dataSet  feature matrix
        labels   class labels
        lam      fixed regularization constant, leaves room for fine-tuning
        T        number of iterations
        k        size of the batch to process per iteration
    Returns:
        w        weight vector
"""
m, n = shape(dataSet)
w = zeros(n)
dataIndex = range(m)
for t in range(1, T+1):
        wDelta = mat(zeros(n)) # reset wDelta
        wDelta = mat(zeros(n)) # reset wDelta

        # eta is the learning rate: it controls how large each weight update is.
        # (It can also be read as the step size of the stochastic gradient.)
        # The inputs T and k set the number of iterations and the batch size;
        # eta has to be recomputed on each of the T iterations.
eta = 1.0/(lam*t)
random.shuffle(dataIndex)
        for j in range(k):# the full training set
        for j in range(k): # inner loop runs the batch: accumulate all misclassified samples, then update the weight vector
i = dataIndex[j]
            p = predict(w, dataSet[i,:]) # mapper code
            if labels[i]*p < 1: # mapper code
                wDelta += labels[i]*dataSet[i,:].A # accumulate the change
        w = (1.0 - 1/t)*w + (eta/k)*wDelta # apply the update at each T
return w
            p = predict(w, dataSet[i, :]) # mapper code

            # If the prediction is correct with a margin of at least 1, nothing needs to change.
            # Otherwise treat it as a mistake and fold it into the accumulated update to w.
            if labels[i]*p < 1: # mapper code
                wDelta += labels[i]*dataSet[i, :].A # accumulate the change
        # w is optimized by repeated stochastic-gradient steps
        # (1.0/t rather than 1/t avoids Python 2 integer division)
        w = (1.0 - 1.0/t)*w + (eta/k)*wDelta # apply the update at each t
# print '-----', w
# print '++++++', w
return w


datArr,labelList = loadDataSet('testSet.txt')
datArr, labelList = loadDataSet('input/15.BigData_MapReduce/testSet.txt')
datMat = mat(datArr)
#finalWs = seqPegasos(datMat, labelList, 2, 5000)
# finalWs = seqPegasos(datMat, labelList, 2, 5000)
finalWs = batchPegasos(datMat, labelList, 2, 50, 100)
print finalWs

import matplotlib
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
x1=[]; y1=[]; xm1=[]; ym1=[]
x1 = []
y1 = []
xm1 = []
ym1 = []
for i in range(len(labelList)):
if labelList[i] == 1.0:
x1.append(datMat[i,0]); y1.append(datMat[i,1])
x1.append(datMat[i, 0])
y1.append(datMat[i, 1])
else:
xm1.append(datMat[i,0]); ym1.append(datMat[i,1])
xm1.append(datMat[i, 0])
ym1.append(datMat[i, 1])
ax.scatter(x1, y1, marker='s', s=90)
ax.scatter(xm1, ym1, marker='o', s=50, c='red')
x = arange(-6.0, 8.0, 0.1)
y = (-finalWs[0,0]*x - 0)/finalWs[0,1]
#y2 = (0.43799*x)/0.12316
y2 = (0.498442*x)/0.092387 #2 iterations
ax.plot(x,y)
ax.plot(x,y2,'g-.')
ax.axis([-6,8,-4,5])
ax.legend(('50 Iterations', '2 Iterations') )
y = (-finalWs[0, 0]*x - 0)/finalWs[0, 1]
# y2 = (0.43799*x)/0.12316
y2 = (0.498442*x)/0.092387 # 2 iterations
ax.plot(x, y)
ax.plot(x, y2, 'g-.')
ax.axis([-6, 8, -4, 5])
ax.legend(('50 Iterations', '2 Iterations'))
plt.show()
31 changes: 17 additions & 14 deletions src/python/15.BigData_MapReduce/proximalSVM.py
@@ -1,36 +1,39 @@
#!/usr/bin/python
# coding:utf8
'''
Created on Feb 25, 2011

@author: Peter
Created on 2011-02-25
Update on 2017-06-20
@author: Peter/ApacheCN-xy/片刻
Machine Learning in Action update repo: https://github.com/apachecn/MachineLearning
'''
import base64 # used below to encode the pickled mapper output
import pickle
import numpy

def map(key, value):
# input key= class for one training example, e.g. "-1.0"
classes = [float(item) for item in key.split(",")] # e.g. [-1.0]
D = numpy.diag(classes)

# input value = feature vector for one training example, e.g. "3.0, 7.0, 2.0"
featurematrix = [float(item) for item in value.split(",")]
A = numpy.matrix(featurematrix)

# create matrix E and vector e
e = numpy.matrix(numpy.ones(len(A)).reshape(len(A),1))
E = numpy.matrix(numpy.append(A,-e,axis=1))
e = numpy.matrix(numpy.ones(len(A)).reshape(len(A), 1))
E = numpy.matrix(numpy.append(A, -e, axis=1))

# create a tuple with the values to be used by reducer
# and encode it with base64 to avoid potential trouble with '\t' and '\n' used
# as default separators in Hadoop Streaming
producedvalue = base64.b64encode(pickle.dumps( (E.T*E, E.T*D*e) )
    producedvalue = base64.b64encode(pickle.dumps((E.T*E, E.T*D*e)))

# note: a single constant key "producedkey" sends to only one reducer
    # somewhat "atypical" due to low degree of parallelism on reducer side
print "producedkey\t%s" % (producedvalue)

def reduce(key, values, mu=0.1):
sumETE = None
sumETDe = None

# key isn't used, so ignoring it with _ (underscore).
for _, value in values:
# unpickle values
@@ -39,13 +42,13 @@ def reduce(key, values, mu=0.1):
# create the I/mu with correct dimensions
sumETE = numpy.matrix(numpy.eye(ETE.shape[1])/mu)
sumETE += ETE

    if sumETDe is None: # 'is None' avoids numpy's elementwise comparison
# create sumETDe with correct dimensions
sumETDe = ETDe
else:
sumETDe += ETDe

# note: omega = result[:-1] and gamma = result[-1]
# but printing entire vector as output
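    # sumETE was seeded with I/mu and then accumulated the mappers' E^T*E blocks, so the
    # line below solves the proximal SVM system (I/mu + sum(E^T*E)) * result = sum(E^T*D*e)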
result = sumETE.I*sumETDe