<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>lib/nanite/streaming.rb</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -3,12 +3,12 @@
 require File.dirname(__FILE__) + '/../lib/nanite'
 require File.dirname(__FILE__) + '/../lib/nanite/mapper'
 
-Nanite.identity = 'mapper'
+Nanite.identity = Nanite.gen_token
 password = ARGV[0] || 'testing'
 
 
-Nanite::Runner.start  :user           =&gt; Nanite.identity,
+Nanite::Runner.start  :user           =&gt; 'mapper',
                       :pass           =&gt; password,
                       :vhost          =&gt; '/nanite',
                       :host           =&gt; '0.0.0.0',
-                      :ping_time      =&gt; 30
+                      :ping_time      =&gt; 15</diff>
      <filename>bin/mapper</filename>
    </modified>
    <modified>
      <diff>@@ -8,7 +8,7 @@ require File.dirname(__FILE__) + '/../lib/nanite/agent'
 
 
 options = {:root =&gt; Dir.pwd,
-           :resources =&gt; [] }
+           :services =&gt; [] }
 
 # Build a parser for the command line arguments
 opts = OptionParser.new do |opts|
@@ -42,10 +42,6 @@ opts = OptionParser.new do |opts|
     options[:identity] = ident
   end
 
-  opts.on(&quot;-a&quot;, &quot;--address RETURN_TOKEN&quot;, &quot;This flag is for setting the return address token.&quot;) do |ret|
-    options[:return_address] = ret
-  end
-
   opts.on(&quot;-p&quot;, &quot;--pass PASSWORD&quot;, &quot;This flag is for setting the rabbitmq password&quot;) do |pass|
     options[:pass] = pass
   end
@@ -54,8 +50,8 @@ opts = OptionParser.new do |opts|
     options[:vhost] = vhost
   end
 
-  opts.on(&quot;-r&quot;, &quot;--resources RESOURCES&quot;, &quot;This flag is for setting the nanite's global resources: -r /slice/42,/foo/3&quot;) do |res|
-    options[:resources] = res.split(/,/).map { |r| r.strip }
+  opts.on(&quot;-s&quot;, &quot;--services SERVIVES&quot;, &quot;This flag is for setting the nanite's global services: -s /slice/42,/foo/3&quot;) do |res|
+    options[:services] = res.split(/,/).map { |r| r.strip }
   end
 
 end</diff>
      <filename>bin/nanite</filename>
    </modified>
    <modified>
      <diff>@@ -3,6 +3,3 @@
 :vhost: /nanite
 :user: ezra
 :identity: d3d0a65dc2e6025214647d67233804d3
-:resources: 
-- /slice/1
-- /sqlshard/42
\ No newline at end of file</diff>
      <filename>config/agent.yml</filename>
    </modified>
    <modified>
      <diff>@@ -3,5 +3,4 @@
 :vhost: /nanite
 :user: tom
 :identity: 12d0a65dc2e121214647d67233804d3
-:return_address: fb09b121291ad044d2737d4f68ab191
 #:host: 75.101.149.28</diff>
      <filename>examples/foo/config.yml</filename>
    </modified>
    <modified>
      <diff>@@ -3,4 +3,3 @@
 :vhost: /nanite
 :user: tom
 :identity: d3d0a65dc2e121214127d67233804d3
-:return_address: fb09b851291ad124d2737d4f68ab191
\ No newline at end of file</diff>
      <filename>examples/foo2/config.yml</filename>
    </modified>
    <modified>
      <diff>@@ -8,3 +8,5 @@ class Mock &lt; Nanite::Actor
 end
 
 Nanite::Dispatcher.register(Mock.new)
+
+Nanite.subscribe_to_files('foobar')
\ No newline at end of file</diff>
      <filename>examples/myagent/actors/mock.rb</filename>
    </modified>
    <modified>
      <diff>@@ -3,5 +3,4 @@
 :vhost: /nanite
 :user: yehuda
 :identity: d3d0a123dc2e6025214647d67233804d3
-:return_address: 1209b85d4191ad044d2737d4f68ab191
 #:host: 75.101.149.28</diff>
      <filename>examples/myagent/config.yml</filename>
    </modified>
    <modified>
      <diff>@@ -6,6 +6,7 @@ require 'nanite/packets'
 require 'nanite/reducer'
 require 'nanite/dispatcher'
 require 'nanite/actor'
+require 'nanite/streaming'
 require 'extlib'
 
 module Nanite
@@ -13,82 +14,20 @@ module Nanite
   VERSION = '0.1' unless defined?(Nanite::VERSION)
   
   class &lt;&lt; self
-    attr_accessor :identity, :user, :pass, :root, :vhost, :file_root, :files, :host
+    attr_accessor :identity, :status_proc, :results, :root, :vhost, :file_root, :files, :host
     
-    attr_accessor :default_resources, :last_ping, :ping_time, :return_address
-        
-    def request(type, payload=&quot;&quot;, &amp;blk)
-      token = Nanite.gen_token
-      op = Nanite::Request.new(type, payload)
-      op.reply_to = Nanite.return_address
-      Nanite.mapper.route(op) do |answer|
-        Nanite.callbacks[token] = blk if blk
-        Nanite.reducer.watch_for(answer)
-        Nanite.pending[token] = answer.token
-      end
-      token
-    end
-      
-    def broadcast_file(filename, dest, domain='global')
-      begin
-        file_push = FileStart.new(filename, dest)
-        Nanite.amq.topic('file broadcast').publish(Marshal.dump(file_push), :key =&gt; &quot;nanite.filepeer.#{domain}&quot;)
-        file = File.open(file_push.filename, 'rb')
-        res = Nanite::FileChunk.new(file_push.token)
-        while chunk = file.read(65536)
-          res.chunk = chunk
-          Nanite.amq.topic('file broadcast').publish(Marshal.dump(res), :key =&gt; &quot;nanite.filepeer.#{domain}&quot;)
-        end
-        fend = FileEnd.new(file_push.token)
-        Nanite.amq.topic('file broadcast').publish(Marshal.dump(fend), :key =&gt; &quot;nanite.filepeer.#{domain}&quot;)
-      ensure
-        file.close
-      end
-    end
-    
-    class FileState
-      
-      def initialize(token, dest)
-        @token = token
-        @dest = File.open(File.join(Nanite.file_root,dest), 'wb')
-      end
-      
-      def handle_packet(packet)
-        case packet
-        when FileChunk
-          @dest.write(packet.chunk)
-        when FileEnd
-          puts &quot;file written: #{@dest}&quot;
-          @dest.close
-          Nanite.files.delete(packet.token)
-        end  
-      end
-      
-    end  
-    
-    def subscribe_to_files(domain='global')
-      puts &quot;subscribing to file broadcasts for #{domain}&quot;
-      @files ||= {}
-      Nanite.amq.queue(&quot;files#{Nanite.return_address}&quot;).bind(Nanite.amq.topic('file broadcast'), :key =&gt; &quot;nanite.filepeer.#{domain}&quot;).subscribe{ |packet|
-        case msg = Marshal.load(packet)
-        when FileStart
-          @files[msg.token] = FileState.new(msg.token, msg.dest)
-        when FileChunk, FileEnd
-          if file = @files[msg.token]
-            file.handle_packet(msg)
-          end            
-        end
-      }
-    end
+    attr_accessor :default_services, :last_ping, :ping_time
     
+    include FileStreaming
+            
     def send_ping
-      ping = Nanite::Ping.new(Nanite.user, Nanite.identity)
+      ping = Nanite::Ping.new(Nanite.identity, Nanite.status_proc.call)
       Nanite.amq.topic('heartbeat').publish(Marshal.dump(ping), :key =&gt; 'nanite.pings')
     end
     
-    def advertise_resources
-      p &quot;advertise_resources&quot;,Nanite::Dispatcher.all_resources
-      reg = Nanite::Register.new(Nanite.user, Nanite.identity, Nanite::Dispatcher.all_resources)
+    def advertise_services
+      p &quot;advertise_services&quot;,Nanite::Dispatcher.all_services
+      reg = Nanite::Register.new(Nanite.identity, Nanite::Dispatcher.all_services, Nanite.status_proc.call)
       Nanite.amq.topic('registration').publish(Marshal.dump(reg), :key =&gt; 'nanite.register')
     end
     
@@ -117,36 +56,27 @@ module Nanite
       opts = config.merge(opts)
       Nanite.root              = opts[:root]
       Nanite.identity          = opts[:identity] || Nanite.gen_token
-      Nanite.user              = opts[:user]
-      Nanite.pass              = opts[:pass]
       Nanite.host              = opts[:host] || '0.0.0.0'
       Nanite.vhost             = opts[:vhost]
-      Nanite.return_address    = opts[:return_address] || Nanite.gen_token
       Nanite.file_root         = opts[:file_root] || Dir.pwd
-      Nanite.default_resources = opts[:resources] || []
+      Nanite.default_services  = opts[:services] || []
 
-      AMQP.start :user  =&gt; Nanite.user,
-                 :pass  =&gt; Nanite.pass,
+      AMQP.start :user  =&gt; opts[:user],
+                 :pass  =&gt; opts[:pass],
                  :vhost =&gt; Nanite.vhost,
                  :host  =&gt; Nanite.host,
                  :port  =&gt; (opts[:port] || ::AMQP::PORT).to_i
       
       load_actors
-      advertise_resources
+      advertise_services
                               
-      EM.add_periodic_timer(30) do
+      EM.add_periodic_timer(15) do
         send_ping
       end
       
       Nanite.amq.queue(Nanite.identity, :exclusive =&gt; true).subscribe{ |msg|
         Nanite::Dispatcher.handle(Marshal.load(msg))
-      }
-      
-      Nanite.amq.queue(Nanite.return_address, :exclusive =&gt; true).subscribe{ |msg|
-        msg = Marshal.load(msg)
-        Nanite.reducer.handle_result(msg)
-      }
-      
+      }      
       start_console if opts[:console]
     end  
     
@@ -154,10 +84,17 @@ module Nanite
       @reducer ||= Nanite::Reducer.new
     end
     
-    def mapper
-      Thread.current[:mapper] ||= MQ.new.rpc('mapper')
-    end  
+    def status_proc
+      @status_proc ||= lambda{ parse_uptime(`uptime`) rescue &quot;no status&quot;}
+    end
     
+    def parse_uptime(up)
+      if up =~ /load averages?: (.*)/
+        a,b,c = $1.split(/\s+|,\s+/)
+        (a.to_f + b.to_f + c.to_f) / 3
+      end
+    end
+      
     def amq
       Thread.current[:mq] ||= MQ.new
     end</diff>
      <filename>lib/nanite.rb</filename>
    </modified>
    <modified>
      <diff>@@ -5,14 +5,10 @@ module Nanite
         (@actors ||= []) &lt;&lt; actor_instance
       end
       
-      def all_resources
+      def all_services
         (@actors||[]).map {|a| a.provides }.flatten.uniq
       end
-  
-      def candidates(resources)
-        (@actors||[]).select {|actor| match?(actor.provides,resources)}
-      end
-  
+    
       def dispatch_request(op)
         _, actor, meth = op.type.split('/')
         begin
@@ -21,13 +17,13 @@ module Nanite
         rescue Exception =&gt; e
           res = &quot;Dispatch Error: #{e.message}&quot;
         end
-        Nanite::Result.new(op.token, op.reply_to, Nanite.user, res)
+        Nanite::Result.new(op.token, op.reply_to, res)
       end    
       
       def dispatch_getfile(getfile)
         begin
           file = File.new(getfile.filename, 'rb')
-          res = Nanite::Result.new(getfile.token, getfile.reply_to, Nanite.user, '')
+          res = Nanite::Result.new(getfile.token, getfile.reply_to, '')
           while chunk = file.read(getfile.chunksize)
             res.results = chunk
             Nanite.amq.queue(getfile.reply_to).publish(Marshal.dump(res))
@@ -45,18 +41,20 @@ module Nanite
           Nanite.last_ping = Time.now
         when Nanite::Advertise
           Nanite.last_ping = Time.now
-          Nanite.advertise_resources
+          Nanite.advertise_services
         when Nanite::Request
           result = dispatch_request(packet)
           Nanite.amq.queue(packet.reply_to).publish(Marshal.dump(result))
         when Nanite::GetFile
           dispatch_getfile(packet)
+        when Nanite::Result
+          Nanite.reducer.handle_result(packet)
         end
       end
       
-      def match?(required_resource, provided_resources)
-        provided_resources.any? do |r|
-          r == required_resource
+      def match?(required_service, provided_services)
+        provided_services.any? do |r|
+          r == required_service
         end
       end
     end    </diff>
      <filename>lib/nanite/dispatcher.rb</filename>
    </modified>
    <modified>
      <diff>@@ -1,20 +1,31 @@
+require 'nanite'
 require 'nanite/reducer'
 require 'nanite/dispatcher'
 
 module Nanite
+  class &lt;&lt; self
+    
+    attr_accessor :mapper    
+    
+    def request(type, payload=&quot;&quot;, &amp;blk)
+      Nanite.mapper.request(type, payload, &amp;blk)
+    end
+  end
+  
   class Runner
     
     def self.start(opts={})
       EM.run{
-        ping_time = opts.delete(:ping_time) || 30
+        ping_time = opts.delete(:ping_time) || 15
         AMQP.start opts
-        Mapper.new(ping_time)
+        Nanite.mapper = Mapper.new(ping_time)
+        Nanite.start_console #if opts[:start_console]
       }  
     end
   end  
   
   class Mapper
-    
+    attr_accessor :nanites
     def log *args
       p args
     end
@@ -25,10 +36,7 @@ module Nanite
       @nanites = {}
       @amq = MQ.new
       setup_queues
-      EM.add_timer(@ping_time * 1.2) do
-        log &quot;starting mapper with nanites(#{@nanites.keys.size}):&quot;, @nanites.keys
-        MQ.new.rpc('mapper', self) 
-      end
+      log &quot;starting mapper with nanites(#{@nanites.keys.size}):&quot;, @nanites.keys
       EM.add_periodic_timer(@ping_time) { check_pings }
     end
     
@@ -40,11 +48,17 @@ module Nanite
       @amq.queue(&quot;mapper#{@identity}&quot;,:exclusive =&gt; true).bind(@amq.topic('registration'), :key =&gt; 'nanite.register').subscribe{ |msg|
         register(Marshal.load(msg))
       }
+      @amq.queue(Nanite.identity, :exclusive =&gt; true).subscribe{ |msg|
+        msg = Marshal.load(msg)
+        p msg
+        Nanite.reducer.handle_result(msg)
+      }
     end        
     
     def handle_ping(ping)
       if nanite = @nanites[ping.from]
         nanite[:timestamp] = Time.now
+        nanite[:status] = ping.status
         @amq.queue(ping.identity).publish(Marshal.dump(Nanite::Pong.new(ping)))
       else
         @amq.queue(ping.identity).publish(Marshal.dump(Nanite::Advertise.new(ping)))
@@ -62,47 +76,68 @@ module Nanite
     end
     
     def register(reg)
-      @nanites[reg.name] = {:timestamp =&gt; Time.now,
-                            :resources =&gt; reg.resources,
-                            :identity  =&gt; reg.identity}
-      log &quot;registered:&quot;, reg.name, reg.identity, reg.resources
+      @nanites[reg.identity] = {:timestamp =&gt; Time.now,
+                                :services =&gt; reg.services,
+                                :status    =&gt; reg.status}
+      log &quot;registered:&quot;, reg.identity, reg.services
     end
-        
-    def discover(resource)
+
+    def select_nanites
       names = []
-      @nanites.each do |name, content|      
-        names &lt;&lt; [name, content[:identity]] if match?(resource, content[:resources])
-      end  
+      @nanites.each do |name, content|
+        names &lt;&lt; [name, content] if yield(name, content)
+      end
       names
     end
     
-    def match?(resource, resources)
-      resources.any? {|r| r == resource }
+    def least_loaded(res)
+      log &quot;least_loaded: #{res}&quot;
+      candidates = select_nanites { |n,r| r[:services].include?(res) }
+      res = candidates.min { |a,b|  a[1][:status] &lt;=&gt; b[1][:status] }
+      p res
+      res
+    end
+    
+    def request(type, payload=&quot;&quot;, &amp;blk)
+      req = Nanite::Request.new(type, payload)
+      req.token = Nanite.gen_token
+      req.reply_to = Nanite.identity
+      answer = route(req)
+      p &quot;answer: #{answer}&quot;
+      if answer
+        Nanite.callbacks[answer.token] = blk if blk
+        Nanite.reducer.watch_for(answer)
+        answer.token
+      else
+        puts &quot;failed&quot;
+      end    
     end
     
     def route(req)
       log &quot;route(req) from:#{req.from}&quot; 
-      targets = discover(req.type)
+      targets = least_loaded(req.type)
+      p &quot;targets&quot;, targets
+      #return nil if targets.emtpy?
       token = Nanite.gen_token
       answer = Answer.new(token)
       req.token = token
-      
-      targets.reject! { |target| ! allowed?(req.from, target.first) }
-      
-      workers = targets.map{|t| t.first }  
-      answer.workers = Hash[*workers.zip(Array.new(workers.size, :waiting)).flatten]
+            
+      answer.workers = Hash[*targets.zip(Array.new(targets.size, :waiting)).flatten]
     
+      p answer
+      
       EM.next_tick {
         targets.each do |target|
-          send_request(req, target.last)
+          send_request(req, target)
         end
       }
+      p answer
       answer
     end
     
     def file(getfile)
       log &quot;file(getfile) from:#{getfile.from}&quot; 
-      target = discover(getfile.resources).first
+      target = discover(getfile.services).first
       token = Nanite.gen_token
       file_transfer = FileTransfer.new(token)
       getfile.token = token
@@ -122,9 +157,5 @@ module Nanite
       @amq.queue(target).publish(Marshal.dump(req))
     end
         
-    def allowed?(from, to)
-      true
-    end    
-        
   end  
 end
\ No newline at end of file</diff>
      <filename>lib/nanite/mapper.rb</filename>
    </modified>
    <modified>
      <diff>@@ -11,13 +11,8 @@ module Nanite
       @results[res.from] = res.results
       @workers.delete(res.from)
       if @workers.empty?
-        Nanite.pending.each do |k,v|
-          if @token == v
-            Nanite.callbacks[k].call(@results) if Nanite.callbacks[k]
-            Nanite.pending.delete(k) 
-            Nanite.callbacks.delete(k) 
-          end    
-        end
+        cback = Nanite.callbacks.delete(@token) 
+        cback.call(@results) if cback
       end
     end
     
@@ -74,42 +69,43 @@ module Nanite
     attr_accessor :from, :payload, :type, :token, :reply_to
     def initialize(type, payload)
       @type, @payload = type, payload
-      @from = Nanite.user
+      @from = Nanite.identity
     end
   end
   
   class GetFile
-    attr_accessor :from, :filename, :token, :resources, :reply_to, :chunksize
-    def initialize(file, *resources)
-      @filename, @resources = file, resources
-      @from = Nanite.user
+    attr_accessor :from, :filename, :token, :services, :reply_to, :chunksize
+    def initialize(file, service)
+      @filename, @services = file, service
+      @from = Nanite.identity
       @chunksize = 65536
     end
   end
     
   class Result
-    attr_accessor :token, :results, :from, :to
-    def initialize(token, to, from, results)
+    attr_accessor :token, :results, :to, :from
+    def initialize(token, to, results)
       @token = token
       @to = to
-      @from = from
+      @from = Nanite.identity
       @results = results
     end
   end
     
   class Register
-    attr_accessor :name, :identity, :resources
-    def initialize(name, identity, resources)
-      @name = name
+    attr_accessor :identity, :services, :status
+    def initialize(identity, services, status)
+      @status = status
       @identity = identity
-      @resources = resources
+      @services = services
     end
   end  
     
   class Ping
-    attr_accessor :from, :identity
-    def initialize(from, identity)
-      @from = from
+    attr_accessor :identity, :status, :from
+    def initialize(identity, status)
+      @status = status
+      @from = Nanite.identity
       @identity = identity
     end
   end</diff>
      <filename>lib/nanite/packets.rb</filename>
    </modified>
  </modified>
  <removed type="array">
    <removed>
      <filename>lib/nanite/agent.rb</filename>
    </removed>
    <removed>
      <filename>lib/nanite/client.rb</filename>
    </removed>
    <removed>
      <filename>lib/nanite/persistence.rb</filename>
    </removed>
  </removed>
  <parents type="array">
    <parent>
      <id>c6946b59d219b502930e5602f738d1269109cd67</id>
    </parent>
  </parents>
  <author>
    <name>Ezra Zygmuntowicz</name>
    <email>ez@engineyard.com</email>
  </author>
  <url>http://github.com/bmizerany/nanite/commit/08016429dfdc2605d2c04d519ab94bc59f4bb698</url>
  <id>08016429dfdc2605d2c04d519ab94bc59f4bb698</id>
  <committed-date>2008-10-06T22:14:39-07:00</committed-date>
  <authored-date>2008-10-06T22:14:39-07:00</authored-date>
  <message>excuse the major refactoring here. almost ready..</message>
  <tree>03c7e8b9b1deceac09b482b3078459b73d0af103</tree>
  <committer>
    <name>Ezra Zygmuntowicz</name>
    <email>ez@engineyard.com</email>
  </committer>
</commit>
