-
-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
huginn_scheduler.rb
212 lines (175 loc) · 5.15 KB
/
huginn_scheduler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
require 'rufus/scheduler'
class Rufus::Scheduler
SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name
class Job
# Store an ID of SchedulerAgent in this job.
def scheduler_agent_id=(id)
self[:scheduler_agent_id] = id
end
# Extract an ID of SchedulerAgent if any.
def scheduler_agent_id
self[:scheduler_agent_id]
end
# Return a SchedulerAgent tied to this job. Return nil if it is
# not found or disabled.
def scheduler_agent
agent_id = scheduler_agent_id or return nil
Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id)
end
end
# Get all jobs tied to any SchedulerAgent
def scheduler_agent_jobs
jobs(tag: SCHEDULER_AGENT_TAG)
end
# Get a job tied to a given SchedulerAgent
def scheduler_agent_job(agent)
scheduler_agent_jobs.find { |job|
job.scheduler_agent_id == agent.id
}
end
# Schedule or reschedule a job for a given SchedulerAgent and return
# the running job. Return nil if unscheduled.
def schedule_scheduler_agent(agent)
job = scheduler_agent_job(agent)
if agent.unavailable?
if job
puts "Unscheduling SchedulerAgent##{agent.id} (disabled)"
job.unschedule
end
nil
else
if job
return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i
puts "Rescheduling SchedulerAgent##{agent.id}"
job.unschedule
else
puts "Scheduling SchedulerAgent##{agent.id}"
end
agent_id = agent.id
job = schedule_cron agent.options['schedule'], tag: SCHEDULER_AGENT_TAG do |job|
job.scheduler_agent_id = agent_id
if scheduler_agent = job.scheduler_agent
scheduler_agent.control!
else
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (disabled or deleted)"
job.unschedule
end
end
# Make sure the job is associated with a SchedulerAgent before
# it is triggered.
job.scheduler_agent_id = agent_id
agent.memory['scheduled_at'] = job.scheduled_at.to_i
agent.save
job
end
end
# Schedule or reschedule jobs for all SchedulerAgents and unschedule
# orphaned jobs if any.
def schedule_scheduler_agents
scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent|
scheduler_agent.with_lock do
schedule_scheduler_agent(scheduler_agent)
end
}.compact
(scheduler_agent_jobs - scheduled_jobs).each { |job|
puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)"
job.unschedule
}
end
end
class HuginnScheduler < LongRunnable::Worker
include LongRunnable
FAILED_JOBS_TO_KEEP = 100
SCHEDULE_TO_CRON = {
'1m' => '*/1 * * * *',
'2m' => '*/2 * * * *',
'5m' => '*/5 * * * *',
'10m' => '*/10 * * * *',
'30m' => '*/30 * * * *',
'1h' => '0 * * * *',
'2h' => '0 */2 * * *',
'5h' => '0 */5 * * *',
'12h' => '0 */12 * * *',
'1d' => '0 0 * * *',
'2d' => '0 0 */2 * *',
'7d' => '0 0 * * 1',
}
def setup
tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].presence || "Pacific Time (US & Canada)"]
# Schedule event propagation.
every '1m' do
propagate!
end
# Schedule event cleanup.
every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
cleanup_expired_events!
end
# Schedule failed job cleanup.
every '1h' do
cleanup_failed_jobs!
end
# Schedule repeating events.
SCHEDULE_TO_CRON.keys.each do |schedule|
cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do
run_schedule "every_#{schedule}"
end
end
# Schedule events for specific times.
24.times do |hour|
cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
run_schedule hour_to_schedule_name(hour)
end
end
# Schedule Scheduler Agents
every '1m' do
@scheduler.schedule_scheduler_agents
end
end
def run
@scheduler.join
end
def self.setup_worker
[new(id: self.to_s)]
end
private
def run_schedule(time)
with_mutex do
puts "Queuing schedule for #{time}"
AgentRunScheduleJob.perform_later(time)
end
end
def propagate!
with_mutex do
return unless AgentPropagateJob.can_enqueue?
puts "Queuing event propagation"
AgentPropagateJob.perform_later
end
end
def cleanup_expired_events!
with_mutex do
puts "Running event cleanup"
AgentCleanupExpiredJob.perform_later
end
end
def cleanup_failed_jobs!
num_to_keep = (ENV['FAILED_JOBS_TO_KEEP'].presence || FAILED_JOBS_TO_KEEP).to_i
first_to_delete = Delayed::Job.where.not(failed_at: nil).order("failed_at DESC").offset(num_to_keep).limit(1).pluck(:failed_at).first
Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present?
end
def hour_to_schedule_name(hour)
if hour == 0
"midnight"
elsif hour < 12
"#{hour}am"
elsif hour == 12
"noon"
else
"#{hour - 12}pm"
end
end
def with_mutex(&block)
mutex.synchronize do
ActiveRecord::Base.connection_pool.with_connection(&block)
end
end
end