Permalink
Browse files

prepend class name to all log messages

  • Loading branch information...
1 parent bfde386 commit aaaf307d5e0f7177af95cb45a4eb97938ecb2c34 Chuck Remes committed Dec 5, 2011
View
@@ -21,6 +21,8 @@
HEARTBEAT, update its intervals, and then wait another interval before
sending a message. By including the interval and retry information in
the READY message, we avoid all of these timing issues.
+
+ * Improved some logging.
== 0.6.0 / 2011-11-30
* Changed Broker to send a payload in the ClientReplyFailure message.
@@ -65,7 +65,7 @@ def process_client(message)
if available_workers?(message.service_name)
dispatch_client_work(message)
else
- @reactor.log(:info, "Broker, no workers to handle request; failed!")
+ @reactor.log(:info, "#{self.class}, No workers to handle request; failed!")
send_client_failure(message.envelope_msgs, message)
end
end
@@ -90,7 +90,7 @@ def purge_expired_workers
end
def send_client_failure(return_address, message)
- @reactor.log(:error, "Broker sending a client failure message.")
+ @reactor.log(:error, "#{self.class}, Broker sending a client failure message.")
@router.write(return_address + @client_reply_failure_msg_klass.from_request(message).to_msgs)
end
@@ -99,12 +99,12 @@ def available_workers?(service_name)
if service
# returns true if there are workers, false otherwise
- @reactor.log(:debug, "Broker, found service for [#{service_name}]")
+ @reactor.log(:debug, "#{self.class}, Found service for [#{service_name}]")
found = service.workers?
- @reactor.log(:debug, "Broker, workers for [#{service_name}] found? [#{found}]")
+ @reactor.log(:debug, "#{self.class}, Workers for [#{service_name}] found? [#{found}]")
found
else
- @reactor.log(:warn, "Broker, no service found for [#{service_name}]")
+ @reactor.log(:warn, "#{self.class}, No service found for [#{service_name}]")
false
end
end
@@ -113,17 +113,17 @@ def connect_worker(message)
worker_identity = message.envelope_identity
if worker = @services.find_worker(worker_identity)
# worker was already registered; got READY msg out of sequence
- @reactor.log(:warn, "Worker [#{worker_identity}] already exists; force disconnect.")
+ @reactor.log(:warn, "#{self.class}, Worker [#{worker_identity}] already exists; force disconnect.")
#disconnect_worker(worker)
#@services.deregister(worker)
else
@services.register(message.service_name, worker_identity, message.heartbeat_interval, message.heartbeat_retries, message.envelope.dup)
- @reactor.log(:info, "Activated worker [#{worker_identity}] for service [#{message.service_name}]")
+ @reactor.log(:info, "#{self.class}, Activated worker [#{worker_identity}] for service [#{message.service_name}]")
end
end
def disconnect_worker(worker)
- @reactor.log(:info, "Disconnecting a worker [#{worker.identity}] for service [#{worker.service_name}].")
+ @reactor.log(:info, "#{self.class}, Disconnecting a worker [#{worker.identity}] for service [#{worker.service_name}].")
@router.write(worker.return_address + @worker_disconnect_msg_klass.new(worker.service_name).to_msgs)
@services.deregister_worker(worker)
end
@@ -140,38 +140,38 @@ def process_worker_heartbeat(message)
# pass in the worker; uses worker to build hb message and get return address
def send_worker_heartbeat(worker)
- @reactor.log(:debug, "Broker, heartbeat for worker [#{worker.identity}]")
+ @reactor.log(:debug, "#{self.class}, Heartbeat for worker [#{worker.identity}]")
@router.write(worker.return_address + @worker_heartbeat_msg_klass.new.to_msgs)
end
def send_worker_request(worker, request)
- @reactor.log(:debug, "Sending request to worker [#{worker.identity}]")
+ @reactor.log(:debug, "#{self.class}, Sending request to worker [#{worker.identity}]")
@router.write(worker.return_address + @worker_request_msg_klass.from_client_request(request).to_msgs)
end
def send_client_reply_success(return_address, service_name, sequence_id, payload)
- @reactor.log(:debug, "Broker, sending a successful reply to client.")
+ @reactor.log(:debug, "#{self.class}, Sending a successful reply to client.")
@router.write(return_address + @client_reply_success_msg_klass.new(service_name, sequence_id, payload).to_msgs)
end
def send_client_reply_failure(return_address, service_name, sequence_id, payload)
- @reactor.log(:debug, "Broker, sending a failure reply to client.")
+ @reactor.log(:debug, "#{self.class}, Sending a failure reply to client.")
@router.write(return_address + @client_reply_failure_msg_klass.new(service_name, sequence_id, payload).to_msgs)
end
def dispatch_client_work(message)
- @reactor.log(:error, "Called #dispatch_client_work. Should be overridden by subclass!")
+ @reactor.log(:error, "#{self.class}, Called #dispatch_client_work. Should be overridden by subclass!")
end
def process_worker_reply(message)
- @reactor.log(:error, "Called #process_worker_reply. Should be overridden by subclass!")
+ @reactor.log(:error, "#{self.class}, Called #process_worker_reply. Should be overridden by subclass!")
end
private
def configure_messages_classes(config)
- @reactor.log(:error, "Called #configure_messages_classes. Should be overridden by subclass!")
+ @reactor.log(:error, "#{self.class}, Called #configure_messages_classes. Should be overridden by subclass!")
end
def valid_client?(message)
@@ -30,13 +30,13 @@ def each
def add(worker)
@workers[worker.identity] = worker
- @reactor.log(:debug, "Service [#{name}] adding worker, [#{worker_count}] total workers.")
+ @reactor.log(:debug, "#{self.class}, Service [#{name}] adding worker, [#{worker_count}] total workers.")
end
def delete(worker)
@workers.delete(worker.identity)
worker.die
- @reactor.log(:debug, "Service [#{name}] deleting worker, [#{worker_count}] remaining workers.")
+ @reactor.log(:debug, "#{self.class}, Service [#{name}] deleting worker, [#{worker_count}] remaining workers.")
end
def [](identity)
@@ -13,7 +13,7 @@ def initialize(reactor, handler, service_klass, worker_klass)
def register(service_name, identity, heartbeat_interval, heartbeat_retries, envelope)
unless @services.has_key?(service_name)
- @reactor.log(:info, "Services, creating service for [#{service_name}]")
+ @reactor.log(:info, "#{self.class}, Creating service for [#{service_name}]")
@services[service_name] = @service_klass.new(@reactor, service_name, @handler)
end
@@ -26,7 +26,7 @@ def return_address
end
def start_heartbeat
- @reactor.log(:debug, "Worker [#{@identity}] for service [#{@service_name}] starting heartbeat with interval [#{@heartbeat_interval}]")
+ @reactor.log(:debug, "#{self.class}, Worker [#{@identity}] for service [#{@service_name}] starting heartbeat with interval [#{@heartbeat_interval}]")
@heartbeat_timer = @reactor.periodical_timer(@heartbeat_interval) { beat }
end
@@ -37,14 +37,14 @@ def beat
# Called by handler whenever it receives a worker heartbeat.
#
def process_heartbeat(message = nil)
- @reactor.log(:debug, "On broker, worker [#{@identity}] received a HB message.")
+ @reactor.log(:debug, "#{self.class}, On broker, worker [#{@identity}] received a HB message.")
@hb_received_at = Time.now
end
# Called when the worker has sent a DISCONNECT or its heartbeats have timed out.
#
def die
- @reactor.log(:info, "Worker [#{@identity}] is exiting.")
+ @reactor.log(:info, "#{self.class}, Worker [#{@identity}] is exiting.")
@heartbeat_timer.cancel
end
@@ -56,7 +56,7 @@ def expired?
elapsed = ((Time.now - @hb_received_at) * 1_000)
if elapsed > (@heartbeat_interval * @heartbeat_retries)
- @reactor.log(:warn, "Broker Worker [#{@identity}] expiring, last received hb at #{@hb_received_at}")
+ @reactor.log(:warn, "#{self.class}, Broker Worker [#{@identity}] expiring, last received hb at #{@hb_received_at}")
true
else
false
@@ -38,14 +38,14 @@ def on_read(socket, messages, envelope)
def send_request(service_name, sequence_id, payload)
message = @client_request_msg_klass.new(service_name, sequence_id, payload)
- @reactor.log(:debug, "client, sending request #{message.inspect}")
+ @reactor.log(:debug, "#{self.class}, Sending request #{message.inspect}")
write(@base_msg_klass.delimiter + message.to_msgs)
end
def process_request(message, request_options = nil)
request_options ||= RequestOptions.new
message.sequence_id = get_sequence_id
- @reactor.log(:debug, "client, processing request #{message.inspect}")
+ @reactor.log(:debug, "#{self.class}, Processing request #{message.inspect}")
@requests.add(message, request_options)
end
@@ -77,7 +77,7 @@ def on_failure(request, message = nil)
# also regenerate our client ID before reconnecting.
#
def timeouts_exceeded
- @reactor.log(:warn, "Client exceeded allowable [#{@max_broker_timeouts}] timeout failures; reopening socket to Broker!")
+ @reactor.log(:warn, "#{self.class}, Client exceeded allowable [#{@max_broker_timeouts}] timeout failures; reopening socket to Broker!")
# active requests that haven't timed out & failed will still have the old
# client ID; we need to restart those requests with the new ID
@@ -33,7 +33,7 @@ def initialize(reactor, handler, service_name, sequence_id, payload, options)
def resend
@try_count += 1
- @reactor.log(:info, "Request id [#{@sequence_id.inspect}] sent [#{@try_count}] times.")
+ @reactor.log(:info, "#{self.class}, Request id [#{@sequence_id.inspect}] sent [#{@try_count}] times.")
@handler.send_request(@service_name, @sequence_id, @payload)
set_timer if timeout_desired?
end
@@ -48,9 +48,14 @@ def process_reply(reply)
cancel_timer
if reply.success_reply?
+ @reactor.log(:debug, "#{self.class}, Processing successful reply.")
@handler.on_success(self, reply)
elsif reply.failure_reply?
+ @reactor.log(:debug, "#{self.class}, Processing failed reply.")
@handler.on_failure(self, reply)
+ else
+ @reactor.log(:error, "#{self.class}, Processing UNKNOWN reply.")
+ @reactor.log(:error, reply.inspect)
end
end
@@ -61,10 +66,10 @@ def retries_exceeded?
def on_timeout
@failed_at = Time.now
@timer = nil
- @reactor.log(:warn, "Request id [#{@sequence_id.inspect}] timed out at [#{@failed_at}].")
+ @reactor.log(:warn, "#{self.class}, Request id [#{@sequence_id.inspect}] timed out at [#{@failed_at}].")
if retries_exceeded?
- @reactor.log(:warn, "Request id [#{@sequence_id.inspect}] retries exceeded; force fail.")
+ @reactor.log(:warn, "#{self.class}, Request id [#{@sequence_id.inspect}] retries exceeded; force fail.")
@handler.on_failure(self)
else
resend
@@ -24,19 +24,21 @@ def add(message, options)
def process_reply(reply)
if (request = find_active(reply.sequence_id))
- @reactor.log(:debug, "Client found a request to match reply id #{reply.sequence_id.inspect}")
+ @reactor.log(:debug, "#{self.class}, Client found a request to match reply id #{reply.sequence_id.inspect}")
request.process_reply(reply)
close(request)
else
- @reactor.log(:warn, "Client could not find a request to match reply id #{reply.sequence_id.inspect}")
- #@reactor.log(:debug, @requests.keys.inspect)
+ @reactor.log(:warn, "#{self.class}, Client could not find a request to match reply id #{reply.sequence_id.inspect}")
+ @reactor.log(:debug, @requests.keys.inspect)
end
end
# Replaces each active request with a copy of the original request
# with a new sequence_id.
#
def restart_all_with_client_id(client_id)
+ @reactor.log(:warn, "#{self.class}, Restarting all requests with new client id [#{client_id}]")
+
@requests.keys.each_with_index do |key, new_sequence_number|
# create a new request from the current one but with the revised client_id and sequence_number
request = @requests[key]
@@ -9,16 +9,16 @@ class Handler < RzmqBrokers::Broker::Handler
def dispatch_client_work(message)
service = @services.find_service_by_name(message.service_name)
if service && service.ready? && service.request_ok?(message)
- @reactor.log(:info, "MajordomoBroker, adding client request.")
+ @reactor.log(:info, "#{self.class}, Adding client request.")
# send request to workers
service.add_request(message)
else
# tell client the request failed
if !service.request_ok?(message)
- @reactor.log(:warn, "MajordomoBroker, request was rejected; failed!")
+ @reactor.log(:warn, "#{self.class}, Request was rejected; failed!")
else
- @reactor.log(:warn, "MajordomoBroker, no service to handle request; failed!")
+ @reactor.log(:warn, "#{self.class}, No service to handle request; failed!")
end
send_client_failure(message.envelope_msgs, message)
@@ -26,7 +26,7 @@ def initialize(configuration)
def on_read(socket, messages, envelope)
message = @base_msg_klass.create_from(messages, envelope)
- @reactor.log(:debug, "Worker, reading message #{message.inspect}")
+ @reactor.log(:debug, "#{self.class}, Reading message #{message.inspect}")
@hb_received_at = Time.now # receiving *any* message resets the heartbeat timer
@@ -41,15 +41,15 @@ def on_read(socket, messages, envelope)
def send_readiness_to_broker
message = @worker_ready_msg_klass.new(@service_name, @heartbeat_interval, @heartbeat_retries)
- @reactor.log(:info, "Worker, sending READY for service [#{@service_name}] with HB interval [#{@heartbeat_interval}] and retries [#{@heartbeat_retries}]")
+ @reactor.log(:info, "#{self.class}, Sending READY for service [#{@service_name}] with HB interval [#{@heartbeat_interval}] and retries [#{@heartbeat_retries}]")
write_messages(@base_msg_klass.delimiter + message.to_msgs)
start_heartbeat
start_broker_timer
end
def disconnect_from_broker
message = @worker_disconnect_msg_klass.new(@service_name)
- @reactor.log(:info, "Worker, sending DISCONNECT for [#{@service_name}]; canceling broker timer.")
+ @reactor.log(:info, "#{self.class}, Sending DISCONNECT for [#{@service_name}]; canceling broker timer.")
write_messages(@base_msg_klass.delimiter + message.to_msgs)
@broker_timer.cancel
end
@@ -58,22 +58,22 @@ def disconnect_from_broker
# determine Broker health.
#
def process_heartbeat(message)
- @reactor.log(:debug, "Worker received HB from broker at [#{@hb_received_at}].")
+ @reactor.log(:debug, "#{self.class}, Worker received HB from broker at [#{@hb_received_at}].")
end
def process_request(message)
- @reactor.log(:debug, "Worker received work request.")
+ @reactor.log(:debug, "#{self.class}, Worker received work request.")
@on_request.call(self, message)
end
def send_success_reply_to_broker(sequence_id, payload)
- @reactor.log(:debug, "Worker sending a successful reply to broker.")
+ @reactor.log(:debug, "#{self.class}, Worker sending a successful reply to broker.")
reply = @base_msg_klass.delimiter + @worker_reply_success_msg_klass.new(sequence_id, payload).to_msgs
write_messages(reply)
end
def send_failure_reply_to_broker(sequence_id, payload)
- @reactor.log(:debug, "Worker sending a failure reply to broker.")
+ @reactor.log(:debug, "#{self.class}, Worker sending a failure reply to broker.")
reply = @base_msg_klass.delimiter + @worker_reply_failure_msg_klass.new(sequence_id, payload).to_msgs
write_messages(reply)
end
@@ -83,16 +83,16 @@ def send_failure_reply_to_broker(sequence_id, payload)
# and open a new one.
def broker_check
unless ((Time.now - @hb_received_at) * 1_000) <= (@heartbeat_interval * @heartbeat_retries)
- @reactor.log(:warn, "Worker [#{@service_name}] is missing expected heartbeats from broker! Broker timeout!")
- @reactor.log(:warn, "Worker [#{@service_name}] last saw a heartbeat at [#{@hb_received_at}] and is now [#{Time.now}]")
+ @reactor.log(:warn, "#{self.class}, Worker [#{@service_name}] is missing expected heartbeats from broker! Broker timeout!")
+ @reactor.log(:warn, "#{self.class}, Worker [#{@service_name}] last saw a heartbeat at [#{@hb_received_at}] and is now [#{Time.now}]")
reconnect_broker
else
- @reactor.log(:info, "Worker [#{@service_name}] sees a healthy broker, time now [#{Time.now}]")
+ @reactor.log(:info, "#{self.class}, Worker [#{@service_name}] sees a healthy broker, time now [#{Time.now}]")
end
end
def succeeded(sequence_id, payload)
- @reactor.log(:debug, "Worker, sending a success reply.")
+ @reactor.log(:debug, "#{self.class}, Sending a success reply.")
send_success_reply_to_broker(sequence_id, payload)
end
@@ -106,7 +106,7 @@ def failed(sequence_id, payload)
def write_messages(messages)
@hb_sent_at = Time.now
- @reactor.log(:debug, "Worker, sending a message and updating its hb_sent_at to [#{@hb_sent_at}].")
+ @reactor.log(:debug, "#{self.class}, Sending a message and updating its hb_sent_at to [#{@hb_sent_at}].")
write(messages)
end
@@ -126,7 +126,7 @@ def send_heartbeat
# messages sent to the broker (READY, REPLY) are equivalent to heartbeats
if ((Time.now - @hb_sent_at) * 1_000) >= @heartbeat_interval
message = @worker_heartbeat_msg_klass.new
- @reactor.log(:debug, "Worker, sending HEARTBEAT")
+ @reactor.log(:debug, "#{self.class}, Sending HEARTBEAT")
write_messages(@base_msg_klass.delimiter + message.to_msgs)
end
end
@@ -136,7 +136,7 @@ def start_broker_timer
end
def reconnect_broker
- @reactor.log(:warn, "Worker exceeded broker tries; reconnecting!")
+ @reactor.log(:warn, "#{self.class}, Worker exceeded broker tries; reconnecting!")
@heartbeat_timer.cancel if @heartbeat_timer
@broker_timer.cancel if @broker_timer
reopen_socket

0 comments on commit aaaf307

Please sign in to comment.