Skip to content

Commit

Permalink
don't use connection_pool with sidekiq #446
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Fenner committed Apr 9, 2016
1 parent 4be90dd commit c77238e
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 91 deletions.
56 changes: 26 additions & 30 deletions app/jobs/agent_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,8 @@ class AgentJob < ActiveJob::Base
end

def perform(agent, options={})
ActiveRecord::Base.connection_pool.with_connection do
if options[:ids].present?
Array(options[:ids]).each do |id|
# check for failed queries and rate-limiting
agent.work_after_check
fail AgentInactiveError, "#{agent.title} is not in working state" unless agent.working?

# observe rate-limiting settings, put back in queue if wait time is more than 5 sec
wait_time = agent.wait_time
fail TooManyRequestsError, "Wait time too long (#{wait_time.to_i} sec) for #{agent.title}" if wait_time > 5

sleep wait_time

work = Work.where(id: id).first
fail ActiveRecord::RecordNotFound if work.nil?

# store API response result and duration in api_responses table
response = { work_id: id, agent_id: agent.id }
ActiveSupport::Notifications.instrument("api_response.get") do |payload|
response.merge!(agent.collect_data(options.merge(work_id: id)))
payload.merge!(response)
end
end
else
if options[:ids].present?
Array(options[:ids]).each do |id|
# check for failed queries and rate-limiting
agent.work_after_check
fail AgentInactiveError, "#{agent.title} is not in working state" unless agent.working?
Expand All @@ -56,20 +34,38 @@ def perform(agent, options={})

sleep wait_time

work = Work.where(id: id).first
fail ActiveRecord::RecordNotFound if work.nil?

# store API response result and duration in api_responses table
response = { agent_id: agent.id }
response = { work_id: id, agent_id: agent.id }
ActiveSupport::Notifications.instrument("api_response.get") do |payload|
response.merge!(agent.collect_data(options))
response.merge!(agent.collect_data(options.merge(work_id: id)))
payload.merge!(response)
end
end
else
# check for failed queries and rate-limiting
agent.work_after_check
fail AgentInactiveError, "#{agent.title} is not in working state" unless agent.working?

# observe rate-limiting settings, put back in queue if wait time is more than 5 sec
wait_time = agent.wait_time
fail TooManyRequestsError, "Wait time too long (#{wait_time.to_i} sec) for #{agent.title}" if wait_time > 5

sleep wait_time

# store API response result and duration in api_responses table
response = { agent_id: agent.id }
ActiveSupport::Notifications.instrument("api_response.get") do |payload|
response.merge!(agent.collect_data(options))
payload.merge!(response)
end
end
end

after_perform do |job|
ActiveRecord::Base.connection_pool.with_connection do
agent, _options = job.arguments
agent.wait_after_check
end
agent, _options = job.arguments
agent.wait_after_check
end
end
28 changes: 13 additions & 15 deletions app/jobs/api_snapshot_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ class ApiSnapshotJob < ActiveJob::Base
queue_as :high

def perform(options={})
ActiveRecord::Base.connection_pool.with_connection do
id = options[:id] || raise(ArgumentError, "Must supply :id")
upload_on_finished = options[:upload_on_finished]
id = options[:id] || raise(ArgumentError, "Must supply :id")
upload_on_finished = options[:upload_on_finished]

api_snapshot = ApiSnapshot.find_by_id!(options[:id])
api_snapshot.snapshot!
api_snapshot = ApiSnapshot.find_by_id!(options[:id])
api_snapshot.snapshot!

if !api_snapshot.finished?
api_snapshot.update_attributes!(
start_page: api_snapshot.pageno + 1,
mode: ApiSnapshot::APPEND_MODE
)
ApiSnapshotJob.perform_later(id: api_snapshot.id)
elsif upload_on_finished
ApiSnapshotUtility.zip(api_snapshot)
ApiSnapshotUtility.export_to_zenodo(api_snapshot)
end
if !api_snapshot.finished?
api_snapshot.update_attributes!(
start_page: api_snapshot.pageno + 1,
mode: ApiSnapshot::APPEND_MODE
)
ApiSnapshotJob.perform_later(id: api_snapshot.id)
elsif upload_on_finished
ApiSnapshotUtility.zip(api_snapshot)
ApiSnapshotUtility.export_to_zenodo(api_snapshot)
end
rescue Exception => ex
Notification.create(exception: ex, details: options.inspect)
Expand Down
16 changes: 6 additions & 10 deletions app/jobs/cache_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ class CacheJob < ActiveJob::Base

end

# rescue_from StandardError do |exception|
# ActiveRecord::Base.connection_pool.with_connection do
# Notification.where(message: exception.message).where(unresolved: true).first_or_create(
# exception: exception,
# class_name: exception.class.to_s)
# end
# end
rescue_from StandardError do |exception|
Notification.where(message: exception.message).where(unresolved: true).first_or_create(
exception: exception,
class_name: exception.class.to_s)
end

def perform(resource)
ActiveRecord::Base.connection_pool.with_connection do
resource.write_cache
end
resource.write_cache
end
end
6 changes: 2 additions & 4 deletions app/jobs/data_export_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ class DataExportJob < ActiveJob::Base
queue_as :high

def perform(options={})
ActiveRecord::Base.connection_pool.with_connection do
data_export = DataExport.find_by_id!(options[:id])
data_export.export!
end
data_export = DataExport.find_by_id!(options[:id])
data_export.export!
rescue Exception => ex
Notification.create(exception: ex)
end
Expand Down
6 changes: 2 additions & 4 deletions app/jobs/delete_canonical_url_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ class DeleteCanonicalUrlJob < ActiveJob::Base
queue_as :high

def perform(source)
ActiveRecord::Base.connection_pool.with_connection do
# reset all canonical urls
Work.update_all(canonical_url: nil)
end
# reset all canonical urls
Work.update_all(canonical_url: nil)
end
end
10 changes: 4 additions & 6 deletions app/jobs/delete_work_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ class DeleteWorkJob < ActiveJob::Base
queue_as :high

def perform(options = {})
ActiveRecord::Base.connection_pool.with_connection do
collection = Work
collection = collection.where(publisher_id: options[:publisher_id]) unless options[:publisher_id] == "all"
collection = collection.where(source_id: options[:source_id]) unless options[:source_id] == "all"
collection.destroy_all
end
collection = Work
collection = collection.where(publisher_id: options[:publisher_id]) unless options[:publisher_id] == "all"
collection = collection.where(source_id: options[:source_id]) unless options[:source_id] == "all"
collection.destroy_all
end
end
4 changes: 1 addition & 3 deletions app/jobs/deposit_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class DepositJob < ActiveJob::Base
end

def perform(deposit)
ActiveRecord::Base.connection_pool.with_connection do

This comment has been minimized.

Copy link
@afandian

afandian Apr 11, 2016

Contributor

Why did you remove the database pool for all of the Sidekiq jobs? Is it because of all the 'failed to get a connection' issues?

I think I found a bug, which was a failure to call ActiveRecord::Base.clear_active_connections! on the rescue_from StandardError handler, which might have been responsible for the problem.

This comment has been minimized.

Copy link
@mfenner

mfenner Apr 11, 2016

Member

I implemented this call (ActiveRecord::Base.connection_pool.with_connection) a few months ago trying to fix a different set of database connection problems, where the database was running out of connections. So far it seems that this functionality isn't really needed, as long as you have a reasonable number of database connections available in your pool (currently defaults to 100).

My current thinking is that we put too much stress on the database with the workers rather than a specific bug.

This comment has been minimized.

Copy link
@afandian

afandian Apr 11, 2016

Contributor

To be clear, is the pool still being used with the ActiveRecord::Base.connection_pool.with_connection removed?

This comment has been minimized.

Copy link
@mfenner

mfenner Apr 11, 2016

Member

I am still testing whether this change makes any difference. So far it seems that the number of open connection has gone up, but no reduction in database connection timeout errors. The pool is still being used.

Background: sidekiq/sidekiq#1047

deposit.process_data
end
deposit.process_data
end
end
4 changes: 1 addition & 3 deletions app/jobs/deposit_reprocess_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ class DepositReprocessJob < ActiveJob::Base
end

def perform(ids)
ActiveRecord::Base.connection_pool.with_connection do
Deposit.where(id: ids).find_each { |deposit| deposit.reset }
end
Deposit.where(id: ids).find_each { |deposit| deposit.reset }
end
end
12 changes: 4 additions & 8 deletions app/jobs/relation_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ class RelationJob < ActiveJob::Base
end

rescue_from StandardError do |exception|
ActiveRecord::Base.connection_pool.with_connection do
Notification.where(message: exception.message).where(unresolved: true).first_or_create(
exception: exception,
class_name: exception.class.to_s)
end
Notification.where(message: exception.message).where(unresolved: true).first_or_create(
exception: exception,
class_name: exception.class.to_s)
end

def perform
ActiveRecord::Base.connection_pool.with_connection do
Relation.set_month_id
end
Relation.set_month_id
end
end
12 changes: 4 additions & 8 deletions app/jobs/status_cache_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ class StatusCacheJob < ActiveJob::Base
end

rescue_from StandardError do |exception|
ActiveRecord::Base.connection_pool.with_connection do
Notification.where(message: exception.message).where(unresolved: true).first_or_create(
exception: exception,
class_name: exception.class.to_s)
end
Notification.where(message: exception.message).where(unresolved: true).first_or_create(
exception: exception,
class_name: exception.class.to_s)
end

def perform
ActiveRecord::Base.connection_pool.with_connection do
Status.create
end
Status.create
end
end

0 comments on commit c77238e

Please sign in to comment.