/
defer.rb
146 lines (126 loc) · 3.46 KB
/
defer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# frozen_string_literal: true
require "weakref"
module Scheduler
module Deferrable
# Default value for @timeout: the number of seconds a job may run in do_work
# before the warning timer logs an error. `||=` leaves the constant untouched
# if it is already defined (presumably to tolerate this file being loaded more
# than once).
DEFAULT_TIMEOUT ||= 90
# Size argument given to the LRU cache that stores per-description job stats.
STATS_CACHE_SIZE ||= 100
# Sets up the queue, locks and bookkeeping state used by the other methods.
#
# Pending work lives in nested WorkQueue containers: a FairQueue keyed by
# :site whose block builds a FairQueue keyed by :user, whose block in turn
# builds a BoundedQueue. The outer queue is wrapped in a ThreadSafeWrapper.
# The numeric arguments (500, 100, 50) are handed straight to those
# constructors; their exact meaning is defined by WorkQueue.
def initialize
  # When Rails is in the test environment jobs are run inline by #later.
  @async = !Rails.env.test?

  per_site_queue =
    WorkQueue::FairQueue.new(:site, 500) do
      WorkQueue::FairQueue.new(:user, 100) { WorkQueue::BoundedQueue.new(50) }
    end
  @queue = WorkQueue::ThreadSafeWrapper.new(per_site_queue)

  # @mutex guards thread start-up and @timeout writes; @stats_mutex guards @stats.
  @mutex = Mutex.new
  @stats_mutex = Mutex.new

  # No worker thread or timer exists until #later needs one.
  @paused = false
  @thread = nil
  @reactor = nil

  @timeout = DEFAULT_TIMEOUT
  @stats = LruRedux::ThreadSafeCache.new(STATS_CACHE_SIZE)
end
# Replaces the number of seconds used for the "still running" warning in
# do_work. The write happens while holding @mutex.
#
# t - the new timeout value; stored as given.
def timeout=(t)
  @mutex.synchronize do
    @timeout = t
  end
end
# Returns the current size of the work queue (delegates to @queue.size).
def length
@queue.size
end
# Returns the collected job statistics as an array, i.e. the result of calling
# to_a on the stats cache while holding the stats lock.
def stats
  @stats_mutex.synchronize do
    @stats.to_a
  end
end
# Stops the worker thread and timer via stop!, then sets the paused flag.
# While the flag is set, #later still pushes jobs onto the queue but does not
# start a worker thread; #resume clears the flag.
# NOTE(review): the flag is only set after stop! returns, so a concurrent
# #later call in between could presumably start a fresh worker — confirm
# whether callers can race here.
def pause
stop!
@paused = true
end
# Clears the paused flag. No thread is started here; the next asynchronous
# #later call starts one if no worker thread is alive.
def resume
@paused = false
end
# for test and sidekiq
# Switches #later between queueing jobs for the background worker (truthy val)
# and calling the job block inline (falsy val).
def async=(val)
@async = val
end
# Schedules a block to run later, or runs it immediately when async mode is off.
#
# desc         - label for the job; used as the key for its stats entry and in
#                log messages produced while the job is processed.
# db           - database/site the job belongs to; defaults to the current
#                RailsMultisite database at call time.
# force        - forwarded unchanged to @queue.push (semantics are defined by
#                the queue implementation).
# current_user - stored under :user in the queued entry.
# blk          - the job itself.
#
# Returns the block's result in inline mode, otherwise whatever @queue.push
# returns.
def later(
  desc = nil,
  db = RailsMultisite::ConnectionManagement.current_db,
  force: true,
  current_user: nil,
  &blk
)
  # Count the job as queued, creating its stats entry on first sight.
  @stats_mutex.synchronize do
    counters = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
    counters[:queued] += 1
  end

  # Inline mode: run on the caller's thread, nothing is queued.
  return blk.call unless @async

  # Start a worker only if we are not paused and none is alive.
  start_thread unless @paused || @thread&.alive?

  entry = { site: db, user: current_user, db: db, job: blk, desc: desc }
  @queue.push(entry, force: force)
end
# Kills the worker thread if it is alive and stops the timer if one exists,
# then forgets both so they are recreated on demand. The thread is killed
# without being joined. Always returns nil.
def stop!
  worker = @thread
  worker.kill if worker&.alive?
  @thread = nil

  timer = @reactor
  timer.stop unless timer.nil?
  @reactor = nil
end
# test only
# Returns true when there is no worker thread or the worker thread is not
# alive, false otherwise.
def stopped?
  worker = @thread
  worker.nil? || !worker.alive?
end
# Drains the queue on the calling thread by invoking do_work repeatedly until
# the queue reports empty. Each call passes true for do_work's non_block
# parameter, so the queue is not asked to block and no "still running" warning
# timer is scheduled. Returns nil.
def do_all_work
  # Positional `true` is do_work's non_block argument. The earlier
  # `non_block = true` spelling only assigned a throwaway local.
  do_work(true) until @queue.empty?
end
private
# Ensures the timer (@reactor) exists and that a live worker thread is running
# do_work in an endless loop. Both checks happen while holding @mutex, so
# concurrent callers do not create duplicates.
#
# Returns the newly created Thread, or nil when a live worker already existed.
def start_thread
  @mutex.synchronize do
    @reactor ||= MessageBus::TimerThread.new

    unless @thread&.alive?
      @thread =
        Thread.new do
          # Use Thread.current rather than @thread: the ivar is assigned only
          # after Thread.new returns, so the new thread could otherwise see nil
          # (or the previous, dead thread) if it starts running first.
          Thread.current.abort_on_exception = true if Rails.env.test?
          do_work while true
        end
    end
  end
end
# using non_block to match Ruby #deq
# Takes one entry off the queue and runs its job inside
# RailsMultisite::ConnectionManagement.with_connection for the entry's db.
#
# non_block - when true the queue is shifted with block: false and no
#             "still running" warning timer is scheduled.
#
# Errors raised by the job are counted in the entry's :errors stat and handed
# to Discourse.handle_job_exception; other StandardErrors raised while
# processing are reported there as well, so they do not propagate from here.
def do_work(non_block = false)
db, job, desc = @queue.shift(block: !non_block).values_at(:db, :job, :desc)
# Monotonic start time. It stays nil if the shift above raised, which the
# ensure section below checks before recording stats.
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Entries queued without a db run against the default connection.
db ||= RailsMultisite::ConnectionManagement::DEFAULT
RailsMultisite::ConnectionManagement.with_connection(db) do
begin
# In blocking mode only: ask the timer to log an error if the job is still
# running after @timeout seconds. The handle is cancelled in the ensure below.
warning_job =
@reactor.queue(@timeout) do
Rails.logger.error "'#{desc}' is still running after #{@timeout} seconds on db #{db}, this process may need to be restarted!"
end if !non_block
job.call
rescue => ex
# Count the failure (if the stats entry still exists) and report it.
@stats_mutex.synchronize do
stats = @stats[desc]
stats[:errors] += 1 if stats
end
Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'")
ensure
# warning_job is nil in non-blocking mode or if scheduling it raised.
warning_job&.cancel
end
end
rescue => ex
# Reached for errors raised outside the inner begin/rescue, e.g. by the queue
# shift or by with_connection itself.
Discourse.handle_job_exception(ex, message: "Processing deferred code queue")
ensure
ActiveRecord::Base.connection_handler.clear_active_connections!
# Record completion and elapsed time whenever an entry was actually taken
# from the queue; this runs even when the job raised.
if start
@stats_mutex.synchronize do
stats = @stats[desc]
if stats
stats[:finished] += 1
stats[:duration] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end
end
end
end
end
# Exposes the Deferrable methods as class-level methods of Scheduler::Defer.
class Defer
# extend (not include) adds Deferrable's methods to the Defer class object itself.
extend Deferrable
# Runs Deferrable#initialize with self being the Defer class, so the queue,
# mutexes and stats are set up on the class. Must come after the extend above.
initialize
end
end