diff --git a/lib/run_rabbit_run/amqp/logger.rb b/lib/run_rabbit_run/amqp/logger.rb index be398f5..798e29d 100644 --- a/lib/run_rabbit_run/amqp/logger.rb +++ b/lib/run_rabbit_run/amqp/logger.rb @@ -1,3 +1,5 @@ +require 'run_rabbit_run/utils/system' + module RRR module Amqp class Logger @@ -68,7 +70,7 @@ def headers headers: { created_at: Time.now.to_f, pid: Process.pid, - host: Socket.gethostname + ip: RRR::Utils::System.ip_address } } end diff --git a/lib/run_rabbit_run/amqp/queue.rb b/lib/run_rabbit_run/amqp/queue.rb index 2ac8e66..8035647 100644 --- a/lib/run_rabbit_run/amqp/queue.rb +++ b/lib/run_rabbit_run/amqp/queue.rb @@ -55,7 +55,8 @@ def headers routing_key: name, headers: { created_at: Time.now.to_f, - pid: Process.pid + pid: Process.pid, + ip: RRR::Utils::System.ip_address } } end diff --git a/lib/run_rabbit_run/amqp/system.rb b/lib/run_rabbit_run/amqp/system.rb index 87786c3..ebbef82 100644 --- a/lib/run_rabbit_run/amqp/system.rb +++ b/lib/run_rabbit_run/amqp/system.rb @@ -29,7 +29,7 @@ def headers name: @worker_name, created_at: Time.now.to_f, pid: Process.pid, - host: Socket.gethostname + ip: RRR::Utils::System.ip_address } end diff --git a/lib/run_rabbit_run/master.rb b/lib/run_rabbit_run/master.rb index 898dbc4..a82f0a8 100644 --- a/lib/run_rabbit_run/master.rb +++ b/lib/run_rabbit_run/master.rb @@ -58,7 +58,7 @@ def listen_to_worker_stop headers.ack else if headers.delivery_tag > 100 - RRR.logger.error "Worker stop failed [#{headers.headers['name']}][#{headers.headers['host']}][#{headers.headers['pid']}] with [#{payload.inspect}]" + RRR.logger.error "Worker stop failed [#{headers.headers['name']}][#{headers.headers['ip']}][#{headers.headers['pid']}] with [#{payload.inspect}]" headers.reject else headers.reject requeue: true @@ -86,7 +86,7 @@ def listen_to_worker_start def listen_to_workers queue = RRR::Amqp::Queue.new(@queue_name, auto_delete: true) queue.subscribe do | headers, payload | - RRR.logger.info "master got message from [#{headers.headers['name']}][#{headers.headers['host']}][#{headers.headers['pid']}] with [#{payload.inspect}]" + RRR.logger.info "master got message from [#{headers.headers['name']}][#{headers.headers['ip']}][#{headers.headers['pid']}] with [#{payload.inspect}]" case payload['message'].to_sym when :started diff --git a/lib/workers/loadbalancer.rb b/lib/workers/loadbalancer.rb index 89086fe..6ae6993 100644 --- a/lib/workers/loadbalancer.rb +++ b/lib/workers/loadbalancer.rb @@ -1,16 +1,16 @@ #TODO -# 1. Queues : use name parameter as rabbitmq queue name if it exists. -# 2. Write tests for the loadbalancer worker -# 3. Optimize tests to have standart helpers for the worker -# 4. Optimize rake tasks to be able to run whole system with one command +# * Queues : use name parameter as rabbitmq queue name if it exists. +# * Write tests for the loadbalancer worker +# * Optimize tests to have standart helpers for the worker +# * Optimize rake tasks to be able to run whole system with one command RRR::Worker.run 'system_loadbalancer' do - queue "#{RRR.config[:env]}.system.loadbalancer", durable: true - queue "#{RRR.config[:env]}.system.worker.start", durable: true + queue :loadbalancer, name: "#{RRR.config[:env]}.system.loadbalancer", durable: true + queue :worker_start, name: "#{RRR.config[:env]}.system.worker.start", durable: true processes max: 1, min: 1, desirable: 1 - subscribe "#{RRR.config[:env]}.system.loadbalancer" + subscribe :loadbalancer def call headers, payload if payload['action'] == 'deploy' @@ -18,7 +18,7 @@ def call headers, payload worker = eval(payload['code']) worker.processes[:min].times do | index | - queues["#{RRR.config[:env]}.system.worker.start"].notify code: payload['code'] + queues[:worker_start].notify code: payload['code'] end @workers ||= {} diff --git a/spec/support/helpers.rb b/spec/support/helpers.rb index f6a3b62..794c0d5 100644 --- a/spec/support/helpers.rb +++ b/spec/support/helpers.rb @@ -8,7 +8,7 @@ def master_default_headers options = {} name: "name", created_at: Time.local(2000).to_f, pid: Process.pid, - host: 'host' + ip: '1.1.1.1' } }.merge(options) end diff --git a/spec/unit/worker/queues_spec.rb b/spec/unit/worker/queues_spec.rb index a78d9b9..ffff28a 100644 --- a/spec/unit/worker/queues_spec.rb +++ b/spec/unit/worker/queues_spec.rb @@ -19,6 +19,7 @@ def call; end let(:queue) { stub(:queue) } before do + RRR::Utils::System.stub(:ip_address).and_return('1.1.1.1') RRR::Amqp.stub(:channel).and_return(channel) queue.stub(:bind) channel.should_receive(:prefetch) @@ -26,7 +27,8 @@ def call; end routing_key: :input, headers: { created_at: Time.local(2000).to_f, - pid: Process.pid + pid: Process.pid, + ip: '1.1.1.1' } }) end diff --git a/spec/unit/worker_runner_spec.rb b/spec/unit/worker_runner_spec.rb index b326dfa..7f4736b 100644 --- a/spec/unit/worker_runner_spec.rb +++ b/spec/unit/worker_runner_spec.rb @@ -13,7 +13,7 @@ def call; end channel = stub(:channel) exchange = stub(:exchange) - Socket.stub(:gethostname).and_return('host') + RRR::Utils::System.stub(:ip_address).and_return('1.1.1.1') RRR::Amqp.stub(:channel).and_return(channel)