tuulos / disco

a Map/Reduce framework for distributed computing

This URL has Read+Write access

Ville Tuulos (author)
Sun Oct 04 22:10:52 -0700 2009
commit  4ebdbf4ab738dfdeba8e0a1245aed709f0eec227
tree    d1aaf28bd7550dea44f7b12360dec2893a292fb0
parent  54eaa9ffeed3b3bd2690acb69d1fbcaa92646f52
disco / examples / datamining / widrowhoff.py
b2f159bc » Taneli Mielikäinen 2008-09-29 perceptron linear classifie... 1 import sys
2 import disco
3
def estimate_map(e, params):
    """Map one training example to per-target weight-update vectors.

    ``e[1]`` is a space-separated string of floats. Columns whose index is
    in ``params.y_ids`` are treated as targets (y); every other column is a
    feature (x).

    Returns a list of ``(j, vector)`` pairs, one per target position ``j``:
    ``y[j] * x`` when ``params.w`` is None, otherwise ``(y[j] - w[j].x) * x``.
    """
    # Parse the row once into a list: both selections below need a full pass,
    # and a shared iterator would be exhausted by the first one.
    values = [float(t) for t in e[1].split(' ')]
    x = [v for i, v in enumerate(values) if i not in params.y_ids]
    y = [v for i, v in enumerate(values) if i in params.y_ids]

    if params.w is None:
        return [(j, [y_j * a for a in x]) for j, y_j in enumerate(y)]

    updates = []
    for j, y_j in enumerate(y):
        w_j = params.w[j]
        predicted = sum([x[i] * w_j[i] for i in range(len(x))])
        # Negated residual: positive when the prediction is too small.
        error = -(predicted - y_j)
        updates.append((j, [error * a for a in x]))
    return updates
12
def estimate_combiner(j, v, w, done, params):
    """Accumulate update vectors per key and emit them on the final call.

    While ``done`` is false, ``v`` is stored as ``w[j]`` if the key is new,
    otherwise added element-wise to the existing ``w[j]``; nothing is
    returned. When ``done`` is true, ``j`` and ``v`` are ignored and a list
    of ``(key, text)`` pairs is returned, where ``text`` is the
    space-separated ``repr`` of each accumulated component (an empty list
    when nothing was accumulated).
    """
    if done:
        return [(k, ' '.join(map(repr, w[k]))) for k in w]

    if j not in w:
        w[j] = v
    else:
        w[j] = [w[j][i] + v[i] for i in range(len(w[j]))]
20
21
def estimate_reduce(iter, out, params):
    """Sum the update vectors per key, scaled by the learning rate.

    ``iter`` yields ``(key, value)`` pairs where ``key`` converts to an int
    and ``value`` is a space-separated string of floats. Each component is
    multiplied by ``params.learning_rate`` and added element-wise into the
    running total for its key. One ``out.add(key, text)`` call is made per
    key, with ``text`` the space-separated ``repr`` of the summed components.
    """
    rate = params.learning_rate
    w = {}
    for key, value in iter:
        j = int(key)
        # Build a real list so it can be indexed and measured below.
        scaled = [rate * float(t) for t in value.split(' ')]
        if j not in w:
            w[j] = scaled
        else:
            w[j] = [w[j][i] + scaled[i] for i in range(len(scaled))]

    for j in w:
        out.add(j, ' '.join(map(repr, w[j])))
31
32
def predict_map(e, params):
    """Compute the linear prediction of every weight vector for one row.

    ``e[1]`` is a space-separated string of floats; columns whose index is
    in ``params.y_ids`` are dropped and the rest form the feature vector x.

    Returns a one-element list ``[(e[0], text)]`` where ``text`` is the
    space-separated ``repr`` of the dot product ``w[j] . x`` for each key
    ``j`` of ``params.w`` in sorted order.
    """
    values = [float(t) for t in e[1].split(' ')]
    x = [v for i, v in enumerate(values) if i not in params.y_ids]

    outputs = []
    for j in sorted(params.w.keys()):
        w_j = params.w[j]
        outputs.append(repr(sum([x[i] * w_j[i] for i in range(len(x))])))

    return [(e[0], ' '.join(outputs))]
37
38
def estimate(input, y_ids, w=None, learning_rate=1.0, iterations=10, host="disco://localhost", map_reader=disco.chain_reader):
    """Run ``iterations`` rounds of weight estimation as disco jobs.

    Each round submits one job built from ``estimate_map``,
    ``estimate_combiner`` and ``estimate_reduce`` with the current weights,
    then adds the job's results element-wise into ``w``.

    Parameters:
        input: passed to the job as ``input_files``.
        y_ids: iterable of column indices to treat as targets.
        w: optional dict mapping an int key to a list of floats; when given
            it is updated in place. When None, a new dict is created after
            the first round.
        learning_rate: passed to the jobs through ``disco.Params``.
        iterations: number of rounds to run.
        host: passed as the first argument of ``disco.job``.
        map_reader: passed to the job as ``map_reader``.

    Returns:
        The weight dict, or the original ``w`` (possibly None) when
        ``iterations`` is 0.
    """
    # Same mapping as before (index -> 1); only membership is tested on it.
    y_ids = dict.fromkeys(y_ids, 1)

    for round_no in range(iterations):
        results = disco.job(host, name='widrow_hoff_estimate_' + str(round_no),
                            input_files=input,
                            map_reader=map_reader,
                            fun_map=estimate_map,
                            combiner=estimate_combiner,
                            reduce=estimate_reduce,
                            params=disco.Params(w=w, learning_rate=learning_rate, y_ids=y_ids),
                            sort=False, clean=True)

        if w is None:
            w = {}
        for k, v in disco.result_iterator(results):
            k = int(k)
            delta = [float(t) for t in v.split(' ')]
            if k not in w:
                w[k] = delta
            else:
                # Distinct index name so the round counter is not shadowed.
                w[k] = [w[k][n] + delta[n] for n in range(len(delta))]

        # Trace the weights after each round on stderr.
        sys.stderr.write('%s\n' % (w,))

    return w
62
63
def predict(input, y_ids, w, host="disco://localhost", map_reader=disco.chain_reader):
    """Submit a disco job that applies the weights ``w`` to ``input``.

    Parameters:
        input: passed to the job as ``input_files``.
        y_ids: iterable of column indices that ``predict_map`` drops from
            each row before computing predictions.
        w: weight dict handed to ``predict_map`` through ``disco.Params``.
        host: passed as the first argument of ``disco.job``.
        map_reader: passed to the job as ``map_reader``.

    Returns:
        Whatever ``disco.job`` returns for the submitted job.
    """
    # Same mapping as before (index -> 1); only membership is tested on it.
    y_ids = dict.fromkeys(y_ids, 1)

    results = disco.job(host, name='widrow_hoff_predict',
                        input_files=input,
                        map_reader=map_reader,
                        fun_map=predict_map,
                        params=disco.Params(w=w, y_ids=y_ids),
                        sort=False, clean=False)

    return results