Skip to content

Commit

Permalink
Fixed listener job parsing test
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy committed May 4, 2023
1 parent b09e224 commit 9418fe7
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions lib/exekutor/internal/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def initialize(worker_id:, provider:, pool:, queues: nil, wait_timeout: 60, set_
@thread_running = Concurrent::AtomicBoolean.new false
@listening = Concurrent::AtomicBoolean.new false
end

# rubocop:enable Metrics/ParameterLists

# Starts the listener
Expand Down Expand Up @@ -124,7 +125,11 @@ def wait_for_jobs(connection)
throw :shutdown unless running?
next unless channel == JOB_ENQUEUED_CHANNEL

job_info = JobParser.parse(payload)
job_info = begin
JobParser.parse(payload)
rescue StandardError => e
logger.error e.message
end
next unless job_info && listening_to_queue?(job_info["q"])

@provider.update_earliest_scheduled_at(job_info["t"].to_f)
Expand Down Expand Up @@ -175,17 +180,16 @@ class JobParser
JOB_INFO_KEYS = %w[id q t].freeze

def self.parse(payload)
job_info = payload.split(";").to_h { |el| el.split(":") }
if JOB_INFO_KEYS.all? { |n| job_info[n].present? }
job_info
else
missing_keys = JOB_INFO_KEYS.select { |n| job_info[n].blank? }.join(", ")
logger.error "[Listener] Notification payload is missing #{missing_keys}"
nil
job_info = begin
payload.split(";").to_h { |el| el.split(":") }
rescue StandardError
raise Error, "Invalid notification payload: #{payload}"
end
if (missing_keys = JOB_INFO_KEYS.select { |n| job_info[n].blank? }).present?
raise Error, "[Listener] Notification payload is missing #{missing_keys.join(", ")}"
end
rescue StandardError
logger.error "Invalid notification payload: #{payload}"
nil

job_info
end
end

Expand Down

0 comments on commit 9418fe7

Please sign in to comment.