# public
# Description: a Map/Reduce framework for distributed computing
# Homepage: http://discoproject.org
# Clone URL: git://github.com/tuulos/disco.git
# Ville Tuulos (author)
# Thu Jun 18 22:00:52 -0700 2009
# commit  dfa6e76ffd729063ba3c221d2db655686fd59328
# tree    c7b84b52e704ef95d5223353592a241282b590af
# parent  c254f05ebf8a27358562e3822c38780f30e35815
# disco / examples / datamining / kmeans.py
# 100644 94 lines (70 sloc) 2.788 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import disco
 
def init_map(e, params):
    """Assign an input entry to a uniformly random cluster.

    Args:
        e: input entry; only ``e[1]`` is read and it is passed through
            unchanged as the emitted value.
        params: object exposing ``k``, the number of clusters.

    Returns:
        A one-element list ``[(cluster, e[1])]`` where ``cluster`` is a
        random integer in the inclusive range ``0 .. params.k - 1``.
    """
    # Kept as a function-local import, as in the original; presumably so the
    # function stays self-contained when run by a worker -- TODO confirm.
    import random

    cluster = random.randint(0, params.k - 1)
    return [(cluster, e[1])]
 
 
def estimate_map(e, params):
    """Assign a point to the nearest of the current centers.

    Args:
        e: input entry; ``e[1]`` is the point written as numbers separated
            by single spaces.
        params: object exposing ``centers`` (a sequence of center vectors)
            and ``dist`` (a callable ``dist(center, point)`` whose result can
            be ordered).

    Returns:
        A one-element list ``[(index, e[1])]`` where ``index`` is the
        position in ``params.centers`` of the closest center. Ties are
        resolved in favour of the lowest index.

    Raises:
        ValueError: if ``params.centers`` is empty or a coordinate cannot be
            parsed as a float.
    """
    # Parse the point once (not once per center) and into a real list, so
    # that index-based distance functions also work where ``map`` is lazy.
    point = [float(token) for token in e[1].split(' ')]

    # Comparing (distance, index) tuples makes ``min`` pick the smallest
    # distance and, on equal distances, the smallest index.
    nearest = min((params.dist(center, point), index)
                  for index, center in enumerate(params.centers))
    return [(nearest[1], e[1])]
 
 
def estimate_combiner(k, v, centers, done, params):
    """Accumulate per-cluster coordinate sums and point counts.

    Args:
        k: cluster key of the incoming point (ignored when ``done`` is true).
        v: the point as numbers separated by single spaces (ignored when
            ``done`` is true).
        centers: dict used as the accumulator and mutated in place. Each
            value is a list holding the running coordinate sums followed by
            the number of points seen for that key.
        done: when true, nothing is accumulated and the accumulator is
            emitted instead.
        params: unused.

    Returns:
        When ``done`` is true, a list of ``(key, text)`` pairs where ``text``
        is the accumulator for that key rendered with ``repr`` and joined by
        spaces (sums first, count last). Otherwise ``None``.
    """
    if done:
        return [(key, ' '.join(map(repr, acc))) for key, acc in centers.items()]

    point = [float(token) for token in v.split(' ')]
    dim = len(point)
    if k not in centers:
        # One slot per coordinate plus a trailing integer point counter.
        centers[k] = [0.0] * dim + [0]

    acc = centers[k]
    for i, coordinate in enumerate(point):
        acc[i] += coordinate
    acc[dim] += 1
 
 
def estimate_reduce(iter, out, params):
    """Merge partial sums per cluster and emit each cluster's mean.

    Args:
        iter: iterable of ``(key, text)`` pairs, where ``text`` holds
            space-separated numbers: coordinate sums followed by a point
            count (the layout produced by ``estimate_combiner``).
        out: object with an ``add(key, value)`` method that receives one
            entry per distinct key.
        params: unused.

    For every key, ``out.add`` receives the coordinate means followed by the
    total count, each rendered with ``repr`` and joined by single spaces.

    Raises:
        ZeroDivisionError: if the merged count for a key is zero.
    """
    totals = {}
    for key, text in iter:
        partial = [float(token) for token in text.split(' ')]
        if key not in totals:
            totals[key] = partial
        else:
            # Add element-wise by position. The original looped over the
            # values themselves and used them as list indices, which fails
            # as soon as a key is seen a second time.
            acc = totals[key]
            for i, amount in enumerate(partial):
                acc[i] += amount

    for key, acc in totals.items():
        count = acc[-1]
        means = [total / count for total in acc[:-1]]
        out.add(key, ' '.join(map(repr, means + [count])))
 
 
def predict_map(e, params):
    """Label an entry with the index of its nearest center.

    Args:
        e: input entry; ``e[0]`` is passed through as the output key and
            ``e[1]`` is the point written as numbers separated by single
            spaces.
        params: object exposing ``centers`` (a sequence of center vectors)
            and ``dist`` (a callable ``dist(center, point)`` whose result can
            be ordered).

    Returns:
        A one-element list ``[(e[0], index)]`` where ``index`` is the
        position in ``params.centers`` of the closest center. Ties are
        resolved in favour of the lowest index.

    Raises:
        ValueError: if ``params.centers`` is empty or a coordinate cannot be
            parsed as a float.
    """
    # A single float conversion, done once per point rather than once per
    # center, producing a real list so index-based distances work.
    point = [float(token) for token in e[1].split(' ')]

    # (distance, index) tuples: smallest distance wins, then smallest index.
    nearest = min((params.dist(center, point), index)
                  for index, center in enumerate(params.centers))
    return [(e[0], nearest[1])]
 
 
def d2(x, y):
    """Return the squared Euclidean distance between ``x`` and ``y``.

    Only the first ``len(x)`` positions are compared, so ``y`` must be at
    least as long as ``x``; extra trailing items of ``y`` are ignored.
    """
    total = 0
    for i in range(len(x)):
        total += (x[i] - y[i]) ** 2
    return total
 
 
def _collect_centers(results, k):
    """Read a finished job's output into a list of ``k`` center vectors."""
    centers = [None] * k
    for key, value in disco.result_iterator(results):
        fields = [float(token) for token in value.split(' ')]
        # The last field is the cluster's point count; the rest is the mean.
        centers[int(key)] = fields[:-1]
    # NOTE: a cluster index that never appears in the results stays None.
    return centers


def estimate(input, centers, k, iterations=10, host="disco://localhost", map_reader=disco.chain_reader, nr_reduces=None):
    """Estimate k-means cluster centers by running Disco jobs.

    Args:
        input: passed to each job as ``input_files``.
        centers: initial center vectors, or ``None`` to derive them from a
            random assignment of the input to ``k`` clusters.
        k: number of clusters; replaced by ``len(centers)`` when ``centers``
            is given.
        iterations: number of refinement jobs to run.
        host: Disco master address passed to ``disco.job``.
        map_reader: reader passed to each job as ``map_reader``.
        nr_reduces: number of reduce tasks; defaults to ``k``.

    Returns:
        The list of center vectors read back from the last job that ran.
        If no job ran (``centers`` given and ``iterations`` is 0), the
        ``centers`` argument is returned unchanged.
    """
    if centers is not None:
        k = len(centers)
    if nr_reduces is None:
        nr_reduces = k

    if centers is None:
        results = disco.job(host, name='kmeans_init',
                            input_files=input,
                            map_reader=map_reader,
                            fun_map=init_map,
                            combiner=estimate_combiner,
                            reduce=estimate_reduce,
                            nr_reduces=nr_reduces,
                            params=disco.Params(k=k),
                            sort=False, clean=True)
        centers = _collect_centers(results, k)

    for i in range(iterations):
        results = disco.job(host, name='kmeans_iterate_' + str(i),
                            input_files=input,
                            map_reader=map_reader,
                            fun_map=estimate_map,
                            combiner=estimate_combiner,
                            reduce=estimate_reduce,
                            nr_reduces=nr_reduces,
                            params=disco.Params(centers=centers, dist=d2),
                            sort=False, clean=True)
        # Read each job's output right away. Previously the final job's
        # output was never read, so the returned centers lagged one
        # iteration behind (and were the untouched input when centers were
        # supplied with iterations=1).
        centers = _collect_centers(results, k)

    return centers
 
 
def predict(input, centers, host="disco://localhost", map_reader=disco.chain_reader, nr_reduces=None):
    """Submit a Disco job that labels every input entry with a center index.

    Args:
        input: passed to the job as ``input_files``.
        centers: center vectors handed to ``predict_map`` through the job
            parameters, together with ``d2`` as the distance function.
        host: Disco master address passed to ``disco.job``.
        map_reader: reader passed to the job as ``map_reader``.
        nr_reduces: value passed as ``nr_reduces``; defaults to
            ``len(centers)``.

    Returns:
        Whatever ``disco.job`` returns for the submitted job.
    """
    reducers = len(centers) if nr_reduces is None else nr_reduces

    job_options = dict(
        name='kmeans_output',
        input_files=input,
        map_reader=map_reader,
        fun_map=predict_map,
        nr_reduces=reducers,
        params=disco.Params(centers=centers, dist=d2),
        sort=False,
        clean=True,
    )
    return disco.job(host, **job_options)