public
Description: a Map/Reduce framework for distributed computing
Homepage: http://discoproject.org
Clone URL: git://github.com/tuulos/disco.git
Ville Tuulos (author)
Sat Dec 27 20:58:30 -0800 2008
commit  9f479c1e244c4f092bf40644034e960222712e79
tree    846bc30edad777d53320da7a7951eb1c747eb46e
parent  ba1323397e1e7fd1600ad8164694cfd8beefaf10
disco / examples / datamining / naive_linreg.py
b2f159bc » Taneli Mielikäinen 2008-09-29 perceptron linear classifie... 1 import disco
2
def estimate_map(e, params):
    """Split one input record into its feature vector and target value.

    ``e`` is indexed as ``e[0]`` (the record key, passed through unchanged)
    and ``e[1]`` (a string of whitespace-separated numbers).
    ``params.y_id`` is the position of the target column inside that row.

    Returns a one-element list ``[(key, (x, y))]`` where ``x`` is the list of
    floats with the target column removed and ``y`` is the target value.
    """
    # Build a real list: the row is indexed and mutated below, which does not
    # work on the lazy iterator that map() returns on Python 3.  split()
    # without an argument also tolerates repeated or trailing whitespace.
    row = [float(token) for token in e[1].split()]
    target = row[params.y_id]
    del row[params.y_id]

    return [(e[0], (row, target))]
9
10
def estimate_combiner(k, v, vals, done, params):
    """Accumulate the per-feature sums needed for the regression fit.

    ``v`` is an ``(x, y)`` pair: ``v[0]`` is the feature list and ``v[1]``
    the target value.  ``vals`` is a dict that is mutated in place and holds
    the running totals:

    * ``'x'``  -- per-feature sum of x
    * ``'x2'`` -- per-feature sum of x*x
    * ``'xy'`` -- per-feature sum of x*y
    * ``'y'``  -- sum of y
    * ``'c'``  -- number of records accumulated

    When ``done`` is true nothing is accumulated; the totals are returned as
    ``[(k, s)]`` where ``s`` is the reprs of x, x2, xy, y and c joined by
    single spaces.  Otherwise the record is added to the totals and the
    function returns None.  ``params`` is unused.
    """
    # Lazily size the accumulators from the first record seen.
    if not vals and v is not None:
        width = len(v[0])
        vals['x'] = [0.0] * width
        vals['x2'] = [0.0] * width
        vals['xy'] = [0.0] * width
        vals['y'] = 0.0
        vals['c'] = 0

    if done:
        if not vals:
            # No record was ever accumulated and none is available to size
            # the sums from, so there is nothing to emit.
            # NOTE(review): presumably the framework makes a final flush call
            # with done set and no record -- confirm against the disco API.
            return []
        totals = vals['x'] + vals['x2'] + vals['xy'] + [vals['y'], vals['c']]
        return [(k, ' '.join(map(repr, totals)))]

    features = v[0]
    target = v[1]
    # Iterate over the features, not over the (x, y) pair itself: len(v) is
    # always 2, which would touch only the first two feature columns.
    for i, value in enumerate(features):
        vals['x'][i] += value
        vals['x2'][i] += value * value
        vals['xy'][i] += value * target
    vals['y'] += target
    vals['c'] += 1
28
29
30
def predict_map(e, params):
    """Apply the fitted per-feature lines to one input record.

    ``e`` is indexed as ``e[0]`` (the record key, passed through unchanged)
    and ``e[1]`` (a string of whitespace-separated numbers).  ``params`` is a
    sequence of ``(a, b)`` coefficient pairs, one per feature column.

    Returns ``[(key, s)]`` where ``s`` is the reprs of ``a + b * x[i]`` for
    every coefficient pair ``i``, joined by single spaces.
    """
    # A real list is required because the row is indexed below; map() is a
    # lazy iterator on Python 3.  split() without an argument also tolerates
    # repeated or trailing whitespace.
    row = [float(token) for token in e[1].split()]
    predictions = [coeffs[0] + coeffs[1] * row[i]
                   for i, coeffs in enumerate(params)]
    return [(e[0], ' '.join(map(repr, predictions)))]
34
35
def estimate(input, y_id, host="disco://localhost", map_reader=disco.chain_reader):
    """Fit an independent least-squares line ``y = a + b * x_i`` per feature.

    Runs the 'naive_linear_regression_estimate' job with ``estimate_map`` and
    ``estimate_combiner``, merges the partial sums read back through
    ``disco.result_iterator`` and solves the closed-form simple regression
    for every feature column separately.

    ``input`` is passed to the job as ``input_files``; ``y_id`` is the index
    of the target column and is handed to the map function through
    ``disco.Params``.

    Returns a list of ``(a, b)`` tuples (intercept, slope), one per feature.
    An empty list is returned when the job produced no records.  A feature
    whose denominator ``n*sum(x^2) - sum(x)^2`` is zero (e.g. a constant
    column) gets slope 0.0, so its intercept is the mean of y.
    """
    results = disco.job(host, name='naive_linear_regression_estimate',
                        input_files=input,
                        map_reader=map_reader,
                        fun_map=estimate_map,
                        combiner=estimate_combiner,
                        params=disco.Params(y_id=y_id),
                        sort=False, clean=False)

    count = 0
    sum_y = 0.0
    n_features = None
    sum_x = None
    sum_x2 = None
    sum_xy = None

    for _key, value in disco.result_iterator(results):
        sums = [float(token) for token in value.split()]

        if n_features is None:
            # Each value is laid out as x[0..l) x2[0..l) xy[0..l) y c.
            # Floor division keeps the width an int on Python 3 as well.
            n_features = (len(sums) - 2) // 3
            sum_x = [0.0] * n_features
            sum_x2 = [0.0] * n_features
            sum_xy = [0.0] * n_features

        count += sums[-1]
        sum_y += sums[-2]
        for i in range(n_features):
            sum_x[i] += sums[i]
            sum_x2[i] += sums[n_features + i]
            sum_xy[i] += sums[2 * n_features + i]

    if n_features is None or count == 0:
        # No partial sums or no records at all: nothing to fit.
        return []

    model = []
    for i in range(n_features):
        # Least-squares slope: (n*Sxy - Sx*Sy) / (n*Sxx - Sx*Sx).  The
        # denominator is a difference, not a sum.
        denominator = count * sum_x2[i] - sum_x[i] * sum_x[i]
        if denominator == 0:
            slope = 0.0
        else:
            slope = (count * sum_xy[i] - sum_x[i] * sum_y) / denominator
        intercept = (sum_y - slope * sum_x[i]) / count
        model.append((intercept, slope))

    # A concrete list (not a lazy zip object) so that the model supports the
    # len() and indexing that predict_map performs on it.
    return model
72
73
def predict(input, model, host="disco://localhost", map_reader=disco.chain_reader):
    """Launch the prediction job and hand back whatever ``disco.job`` returns.

    ``input`` is passed to the job as ``input_files`` and ``model`` as the
    job ``params``, where ``predict_map`` reads it.  No combiner is used, and
    the job is started with sorting and cleaning switched off.
    """
    job_options = dict(
        name='naive_linear_regression_predict',
        input_files=input,
        map_reader=map_reader,
        fun_map=predict_map,
        params=model,
        sort=False,
        clean=False
    )
    return disco.job(host, **job_options)