Skip to content

Commit

Permalink
Merge 1330c0d into 5ac5ade
Browse files Browse the repository at this point in the history
  • Loading branch information
wshihadeh committed Oct 20, 2020
2 parents 5ac5ade + 1330c0d commit 7aa2f84
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 82 deletions.
15 changes: 15 additions & 0 deletions README.md
Expand Up @@ -77,6 +77,21 @@ config.active_job.queue_adapter = :delayed_job

See the [rails guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details.


Json Log Subscriber
==========
To switch to Json Log Subscriber, set the logging_format in config/application.rb

```ruby
config.delayed_job.logging_format = 'json'
```

Or use DELAYED_JOB_LOG_FORMAT Environment variable

```ruby
export DELAYED_JOB_LOG_FORMAT = 'json'
```

Rails 4.x
=========
If you are using the protected_attributes gem, it must appear before delayed_job in your gemfile. If your jobs are failing with:
Expand Down
18 changes: 18 additions & 0 deletions lib/delayed/json_formatter.rb
@@ -0,0 +1,18 @@
require 'json'

module Delayed
class JsonFormatter
def call(severity, timestamp, progname, msg)
json = {:level => severity, :timestamp => timestamp.to_s}
json = json.merge(prosses_message(msg))
json = json.merge(:progname => progname.to_s) unless progname.nil?

json.to_json + "\n"
end

def prosses_message(msg)
return msg if msg.is_a?(Hash)
{:message => msg.strip}
end
end
end
105 changes: 105 additions & 0 deletions lib/delayed/json_log_subscriber.rb
@@ -0,0 +1,105 @@
module Delayed
class JsonLogSubscriber < ActiveSupport::LogSubscriber
class << self
def logger
@logger ||= Delayed::Worker.logger
@logger.formatter = Delayed::JsonFormatter.new
@logger
end
end

def logger
JsonLogSubscriber.logger
end

def starting(event)
dynamic_log(say({:log_event => 'starting', :message => 'Starting job worker'}, event.payload))
end

def consecutive_failures(event)
text = "FAILED permanently because of #{event.payload[:consecutive_attempts]} consecutive failures"
error(job_say({:log_event => 'consecutive_failures', :message => text, :consecutive_attempts => event.payload[:consecutive_attempts]}, event.payload))
end

def running(event)
dynamic_log(job_say({:log_event => 'running', :message => 'RUNNING'}, event.payload))
end

def completed(event)
runtime = format('%.4f', event.payload[:runtime])
dynamic_log(job_say({:log_event => 'completed', :message => "COMPLETED after #{runtime}", :runtime => runtime}, event.payload))
end

def failed_permanently(event)
text = "FAILED permanently with #{event.payload[:error_name]}: #{event.payload[:error_message]}"
error(job_say({:log_event => 'failed_permanently', :message => text, :error_name => event.payload[:error_name], :error_message => event.payload[:error_message]}, event.payload))
end

def failed(event)
text = "FAILED (#{event.payload[:attempts]} prior attempts) with #{event.payload[:error_name]}: #{event.payload[:error_message]}"
error(job_say({:log_event => 'failed', :message => text, :attempts => event.payload[:attempts], :error_name => event.payload[:error_name], :error_message => event.payload[:error_message]}, event.payload))
end

def exiting(event)
dynamic_log(say({:log_event => 'exiting', :message => 'Exiting...'}, event.payload))
end

def no_jobs_available(event)
dynamic_log(say({:log_event => 'no_jobs_available', :message => 'No more jobs available. Exiting'}, event.payload))
end

def jobs_processed(event)
rate = format('%.4f', event.payload[:count] / event.payload[:realtime])
text = format("#{event.payload[:count]} jobs processed at #{rate} j/s, %d failed", event.payload[:faild])
dynamic_log(say({:log_event => 'jobs_processed', :message => text, :count => event.payload[:count], :rate => rate, :faild_no => event.payload[:faild]}, event.payload))
end

def failure_callback_error(event)
text = "Error when running failure callback: #{event.payload[:error]}"
error(say({:log_event => 'failure_callback_error', :message => text, :error_message => event.payload[:error]}, event.payload))
end

def error_backtrace(event)
error(say({:log_event => 'error_backtrace', :message => event.payload[:error_backtrace], :error_backtrace => event.payload[:error_backtrace]}, event.payload))
end

def reserving_error(event)
text = "Error while reserving job: #{event.payload[:error]}"
error(say({:log_event => 'reserving_error', :message => text, :error_message => event.payload[:error]}, event.payload))
end

private

def dynamic_log(log)
logger.send(log_level, log)
end

def log_level
level = Delayed::Worker.default_log_level
unless level.is_a?(String)
say 'Usage of Fixnum log levels is deprecated'
level = Delayed::Worker::DEFAULT_LOG_LEVEL
end
level
end

def job_say(log, payload = {})
log[:message] = "Job #{payload[:dj_name]} (id=#{payload[:dj_id]})#{say_queue(payload[:dj_queue])} #{log[:message]}"
log[:name] = payload[:dj_name]
log[:id] = payload[:dj_id]
log[:queue] = payload[:dj_queue]
say(log, payload)
end

def say(log, payload = {})
log[:message] = "#{payload[:dj_time]}: [Worker(#{payload[:dj_worker]})] #{log[:message]}"
log[:dj_worker] = payload[:dj_worker]
log[:dj_time] = payload[:dj_time]
log
end

def say_queue(queue)
" (queue=#{queue})" if queue
end
end
end
97 changes: 97 additions & 0 deletions lib/delayed/log_subscriber.rb
@@ -0,0 +1,97 @@
module Delayed
class LogSubscriber < ActiveSupport::LogSubscriber
class << self
def logger
@logger ||= Delayed::Worker.logger
end
end

def logger
LogSubscriber.logger
end

def starting(event)
dynamic_log(say('Starting job worker', event.payload))
end

def consecutive_failures(event)
text = "FAILED permanently because of #{event.payload[:consecutive_attempts]} consecutive failures"
error(job_say(text, event.payload))
end

def running(event)
dynamic_log(job_say('RUNNING', event.payload))
end

def completed(event)
dynamic_log(job_say(format('COMPLETED after %.4f', event.payload[:runtime]), event.payload))
end

def failed_permanently(event)
text = "FAILED permanently with #{event.payload[:error_name]}: #{event.payload[:error_message]}"
error(job_say(text, event.payload))
end

def failed(event)
text = "FAILED (#{event.payload[:attempts]} prior attempts) with #{event.payload[:error_name]}: #{event.payload[:error_message]}"
error(job_say(text, event.payload))
end

def exiting(event)
dynamic_log(say('Exiting...', event.payload))
end

def no_jobs_available(event)
dynamic_log(say('No more jobs available. Exiting', event.payload))
end

def jobs_processed(event)
text = format("#{event.payload[:count]} jobs processed at %.4f j/s, %d failed", event.payload[:count] / event.payload[:realtime], event.payload[:faild])
dynamic_log(say(text, event.payload))
end

def failure_callback_error(event)
text = "Error when running failure callback: #{event.payload[:error]}"
error(say(text, event.payload))
end

def error_backtrace(event)
error(say(event.payload[:error_backtrace], event.payload))
end

def reserving_error(event)
text = "Error while reserving job: #{event.payload[:error]}"
error(say(text, event.payload))
end

private

def dynamic_log(text)
logger.send(log_level, text)
end

def log_level
level = Delayed::Worker.default_log_level
unless level.is_a?(String)
say 'Usage of Fixnum log levels is deprecated'
level = Delayed::Worker::DEFAULT_LOG_LEVEL
end
level
end

def job_say(text, payload = {})
text = "Job #{payload[:dj_name]} (id=#{payload[:dj_id]})#{say_queue(payload[:dj_queue])} #{text}"
say(text, payload)
end

def say(text, payload = {})
text = "[Worker(#{payload[:dj_worker]})] #{text}"
puts text unless payload[:quiet]
"#{payload[:dj_time]}: #{text}"
end

def say_queue(queue)
" (queue=#{queue})" if queue
end
end
end
14 changes: 14 additions & 0 deletions lib/delayed/railtie.rb
Expand Up @@ -3,16 +3,30 @@

module Delayed
class Railtie < Rails::Railtie
config.delayed_job = ::ActiveSupport::OrderedOptions.new

initializer :after_initialize do
Delayed::Worker.logger ||= if defined?(Rails)
Rails.logger
elsif defined?(RAILS_DEFAULT_LOGGER)
RAILS_DEFAULT_LOGGER
end

config.after_initialize do
if json_logging?(config)
JsonLogSubscriber.attach_to(:delayed_job)
else
LogSubscriber.attach_to(:delayed_job)
end
end
end

rake_tasks do
load 'delayed/tasks.rb'
end

def json_logging?(config)
config.delayed_job.logging_format == 'json' || (ENV.fetch('DELAYED_JOB_LOG_FORMAT', 'text') == 'json')
end
end
end

0 comments on commit 7aa2f84

Please sign in to comment.