Skip to content

Commit

Permalink
Includes ip adress in all headers
Browse files Browse the repository at this point in the history
  • Loading branch information
fragallia committed May 28, 2013
1 parent e24f80f commit 2c4a4b8
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 16 deletions.
4 changes: 3 additions & 1 deletion lib/run_rabbit_run/amqp/logger.rb
@@ -1,3 +1,5 @@
require 'run_rabbit_run/utils/system'

module RRR
module Amqp
class Logger
Expand Down Expand Up @@ -68,7 +70,7 @@ def headers
headers: {
created_at: Time.now.to_f,
pid: Process.pid,
host: Socket.gethostname
ip: RRR::Utils::System.ip_address
}
}
end
Expand Down
3 changes: 2 additions & 1 deletion lib/run_rabbit_run/amqp/queue.rb
Expand Up @@ -55,7 +55,8 @@ def headers
routing_key: name,
headers: {
created_at: Time.now.to_f,
pid: Process.pid
pid: Process.pid,
ip: RRR::Utils::System.ip_address
}
}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/run_rabbit_run/amqp/system.rb
Expand Up @@ -29,7 +29,7 @@ def headers
name: @worker_name,
created_at: Time.now.to_f,
pid: Process.pid,
host: Socket.gethostname
ip: RRR::Utils::System.ip_address
}
end

Expand Down
4 changes: 2 additions & 2 deletions lib/run_rabbit_run/master.rb
Expand Up @@ -58,7 +58,7 @@ def listen_to_worker_stop
headers.ack
else
if headers.delivery_tag > 100
RRR.logger.error "Worker stop failed [#{headers.headers['name']}][#{headers.headers['host']}][#{headers.headers['pid']}] with [#{payload.inspect}]"
RRR.logger.error "Worker stop failed [#{headers.headers['name']}][#{headers.headers['ip']}][#{headers.headers['pid']}] with [#{payload.inspect}]"
headers.reject
else
headers.reject requeue: true
Expand Down Expand Up @@ -86,7 +86,7 @@ def listen_to_worker_start
def listen_to_workers
queue = RRR::Amqp::Queue.new(@queue_name, auto_delete: true)
queue.subscribe do | headers, payload |
RRR.logger.info "master got message from [#{headers.headers['name']}][#{headers.headers['host']}][#{headers.headers['pid']}] with [#{payload.inspect}]"
RRR.logger.info "master got message from [#{headers.headers['name']}][#{headers.headers['ip']}][#{headers.headers['pid']}] with [#{payload.inspect}]"

case payload['message'].to_sym
when :started
Expand Down
16 changes: 8 additions & 8 deletions lib/workers/loadbalancer.rb
@@ -1,24 +1,24 @@
#TODO
# 1. Queues : use name parameter as rabbitmq queue name if it exists.
# 2. Write tests for the loadbalancer worker
# 3. Optimize tests to have standart helpers for the worker
# 4. Optimize rake tasks to be able to run whole system with one command
# * Queues : use name parameter as rabbitmq queue name if it exists.
# * Write tests for the loadbalancer worker
# * Optimize tests to have standart helpers for the worker
# * Optimize rake tasks to be able to run whole system with one command

RRR::Worker.run 'system_loadbalancer' do
queue "#{RRR.config[:env]}.system.loadbalancer", durable: true
queue "#{RRR.config[:env]}.system.worker.start", durable: true
queue :loadbalancer, name: "#{RRR.config[:env]}.system.loadbalancer", durable: true
queue :worker_start, name: "#{RRR.config[:env]}.system.worker.start", durable: true

processes max: 1, min: 1, desirable: 1

subscribe "#{RRR.config[:env]}.system.loadbalancer"
subscribe :loadbalancer

def call headers, payload
if payload['action'] == 'deploy'
raise 'No code given' unless payload['code']

worker = eval(payload['code'])
worker.processes[:min].times do | index |
queues["#{RRR.config[:env]}.system.worker.start"].notify code: payload['code']
queues[:worker_start].notify code: payload['code']
end

@workers ||= {}
Expand Down
2 changes: 1 addition & 1 deletion spec/support/helpers.rb
Expand Up @@ -8,7 +8,7 @@ def master_default_headers options = {}
name: "name",
created_at: Time.local(2000).to_f,
pid: Process.pid,
host: 'host'
ip: '1.1.1.1'
}
}.merge(options)
end
Expand Down
4 changes: 3 additions & 1 deletion spec/unit/worker/queues_spec.rb
Expand Up @@ -19,14 +19,16 @@ def call; end
let(:queue) { stub(:queue) }

before do
RRR::Utils::System.stub(:ip_address).and_return('1.1.1.1')
RRR::Amqp.stub(:channel).and_return(channel)
queue.stub(:bind)
channel.should_receive(:prefetch)
exchange.should_receive(:publish).with("{\"some\":\"data\"}", {
routing_key: :input,
headers: {
created_at: Time.local(2000).to_f,
pid: Process.pid
pid: Process.pid,
ip: '1.1.1.1'
}
})
end
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/worker_runner_spec.rb
Expand Up @@ -13,7 +13,7 @@ def call; end
channel = stub(:channel)
exchange = stub(:exchange)

Socket.stub(:gethostname).and_return('host')
RRR::Utils::System.stub(:ip_address).and_return('1.1.1.1')

RRR::Amqp.stub(:channel).and_return(channel)

Expand Down

0 comments on commit 2c4a4b8

Please sign in to comment.