public
Description: a Map/Reduce framework for distributed computing
Homepage: http://discoproject.org
Clone URL: git://github.com/tuulos/disco.git
disco / examples / datamining / perceptron.py
b2f159bc » Taneli Mielikäinen 2008-09-29 perceptron linear classifie... 1 import sys
2 import disco
3
def estimate_map(e, params):
    """Map step of perceptron training: emit an update for one example.

    Parameters:
        e: indexable record; ``e[1]`` is a string of numbers separated by
            single spaces (the label and the features of one example).
        params: object exposing ``y_id`` (position of the label within the
            parsed numbers) and ``w`` (current weight list, or ``None`` when
            no weights have been estimated yet).

    Returns:
        ``[]`` when weights exist and ``y * <x, w>`` is strictly positive
        (the example is already on the right side of the hyperplane);
        otherwise a one-element list ``[('', [y * a for a in x])]`` where
        ``x`` is the feature vector with the label removed.
    """
    # Build a real list: the result is indexed and mutated below, which a
    # lazy ``map`` object (Python 3) does not support.
    x = [float(token) for token in e[1].split(' ')]
    # Pull the label out of the vector; what remains are the features.
    y = x.pop(params.y_id)

    w = params.w
    # The dot product walks over ``w`` and indexes ``x``, so features beyond
    # len(w) are ignored and a too-short ``x`` raises IndexError.
    if w is not None and y * sum(x[idx] * w_k for idx, w_k in enumerate(w)) > 0:
        return []
    return [('', [y * a for a in x])]
10
11
def estimate_combiner(k, v, w, done, params):
    """Combiner: keep a running element-wise sum of the emitted vectors.

    Parameters:
        k: key of the incoming pair (unused; everything is folded under '').
        v: incoming vector, a sequence of numbers.
        w: mutable dict used as the accumulation buffer; the running sum is
            stored under the key ``''``.
        done: when true, the buffer is flushed instead of updated.
        params: job parameters (unused here).

    Returns:
        When ``done`` is true: ``[]`` if nothing was accumulated, otherwise
        ``[('', s)]`` where ``s`` is the ``repr`` of every component of the
        sum joined by single spaces. When ``done`` is false the buffer is
        updated in place and ``None`` is returned.
    """
    if done:
        if not w:
            return []
        return [('', ' '.join(repr(component) for component in w['']))]

    if not w:
        # First vector seen: it becomes the initial sum as-is.
        w[''] = v
    else:
        acc = w['']
        # Iterates over ``v`` and indexes the accumulator, so a longer ``v``
        # raises IndexError rather than being silently truncated.
        w[''] = [acc[idx] + component for idx, component in enumerate(v)]
19
20
def estimate_reduce(iter, out, params):
    """Reduce step: sum all incoming vectors, scaled by the learning rate.

    Parameters:
        iter: iterable of ``(key, value)`` pairs; each ``value`` is a string
            of numbers separated by single spaces. (The name shadows the
            builtin but is kept because it is part of the signature.)
        out: object with an ``add(key, value)`` method receiving the result.
        params: object exposing ``learning_rate``.

    Side effects:
        If at least one pair was consumed, calls ``out.add('', s)`` once,
        where ``s`` is the ``repr`` of every component of the scaled sum
        joined by single spaces. Nothing is added for empty input.
    """
    rate = params.learning_rate
    w = None
    for key, value in iter:
        # A real list is required: it is indexed below, which a lazy ``map``
        # object (Python 3) does not support.
        v = [float(token) for token in value.split(' ')]
        if w is None:
            w = [rate * a for a in v]
        else:
            w = [w[idx] + rate * a for idx, a in enumerate(v)]

    if w is not None:
        out.add('', ' '.join(repr(component) for component in w))
29
30
def predict_map(e, params):
    """Map step of prediction: score one example against the weights.

    Parameters:
        e: indexable record; ``e[0]`` is passed through as the output key and
            ``e[1]`` is a string of numbers separated by single spaces.
        params: object exposing ``y_id`` (position of the label column, which
            is dropped before scoring) and ``w`` (the weight list).

    Returns:
        ``[(e[0], score)]`` where ``score`` is the dot product of the
        remaining features with ``params.w``.
    """
    # Build a real list: ``del`` and indexing are not supported on the lazy
    # ``map`` object returned under Python 3.
    x = [float(token) for token in e[1].split(' ')]
    del x[params.y_id]
    # Walks over ``w`` and indexes ``x``: features beyond len(w) are ignored
    # and a too-short ``x`` raises IndexError.
    score = sum(x[idx] * w_k for idx, w_k in enumerate(params.w))
    return [(e[0], score)]
35
36
def estimate(input, y_id, w=None, learning_rate=1.0, iterations=10, host="disco://localhost", map_reader=disco.chain_reader):
    """Estimate perceptron weights by running repeated Disco jobs.

    Each iteration submits one job (named ``perceptron_estimate_<i>``) that
    uses ``estimate_map``, ``estimate_combiner`` and ``estimate_reduce`` with
    the current weights, then adds every vector in the job's results to the
    weights element-wise.

    Parameters:
        input: passed to ``disco.job`` as ``input_files``.
        y_id: position of the label within each example's numbers.
        w: initial weight list, or ``None`` to start from the first result.
        learning_rate: forwarded to the job via ``disco.Params``.
        iterations: number of jobs to run.
        host: Disco master address passed to ``disco.job``.
        map_reader: reader function passed to ``disco.job``.

    Returns:
        The final weight list, or the initial ``w`` if no job produced any
        result.
    """
    for i in range(iterations):
        results = disco.job(host, name='perceptron_estimate_' + str(i),
                            input_files=input,
                            map_reader=map_reader,
                            fun_map=estimate_map,
                            combiner=estimate_combiner,
                            reduce=estimate_reduce,
                            params=disco.Params(w=w, learning_rate=learning_rate, y_id=y_id),
                            sort=False, clean=True)

        for key, value in disco.result_iterator(results):
            # Must be a real list: it may become the new ``w``, which is
            # indexed here and handed to the next job's Params.
            v = [float(token) for token in value.split(' ')]
            if w is None:
                w = v
            else:
                # ``idx`` (not ``i``) so the comprehension variable cannot
                # clobber the iteration counter under Python 2 scoping.
                w = [w_k + v[idx] for idx, w_k in enumerate(w)]

        # Progress trace of the weights after this iteration; written with
        # ``write`` so it works on both Python 2 and 3.
        # NOTE(review): original indentation was lost; per-iteration
        # placement is assumed from the blank-line layout - confirm.
        sys.stderr.write(str(w) + '\n')

    return w
56
57
def predict(input, y_id, w, host="disco://localhost", map_reader=disco.chain_reader):
    """Submit a Disco job that scores every input example with ``w``.

    Parameters:
        input: passed to ``disco.job`` as ``input_files``.
        y_id: position of the label column, forwarded via ``disco.Params``.
        w: weight list, forwarded via ``disco.Params``.
        host: Disco master address passed to ``disco.job``.
        map_reader: reader function passed to ``disco.job``.

    Returns:
        Whatever ``disco.job`` returns for the ``perceptron_predict`` job,
        which runs ``predict_map`` as its only stage.
    """
    job_options = dict(
        name='perceptron_predict',
        input_files=input,
        map_reader=map_reader,
        fun_map=predict_map,
        params=disco.Params(w=w, y_id=y_id),
        sort=False,
        clean=False,
    )
    return disco.job(host, **job_options)