/
persister.rb
154 lines (135 loc) · 4.97 KB
/
persister.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
require 'zlib'
require 'activerecord-import'
require 'active_support/inflector'
# ActiveSupport's default rules would likely singularize "slaves" as "slafe"
# (its /([^f])ves$/ -> '\1fe' rule), so register the correct singular for both
# capitalizations; class/association names derived from "slaves" rely on it.
ActiveSupport::Inflector.inflections do |inflect|
  inflect.singular('Slaves', 'Slave')
  inflect.singular('slaves', 'slave')
end

# Serialize models without a root key and treat database timestamps as UTC.
ActiveRecord::Base.tap do |base|
  base.include_root_in_json = false
  base.default_timezone = :utc
end
module RRRSpec
  module Server
    # Copies finished tasksets from the Redis-backed models into the
    # Persistence::* ActiveRecord models, optionally writes a JSON API cache
    # file, schedules the Redis copy to expire, and refreshes per-spec-file
    # time estimates.
    module Persister
      # Seconds to sleep between dequeuing a taskset and persisting it,
      # presumably so slaves can finish reporting first — TODO confirm.
      SLAVE_EXIT_WAIT_TIME = 15
      # TTL in seconds passed to Taskset#expire once the taskset is persisted.
      PERSISTED_RESIDUE_SEC = 60

      module_function

      # Runs #work forever.
      def work_loop
        loop { work }
      end

      # Dequeues one taskset from PersisterQueue and persists it.
      #
      # A taskset whose key already exists in the database is skipped. A
      # taskset without finished_at is not persisted: it is logged and neither
      # cached, expired nor used for estimates (previously this path crashed
      # on a nil record). Any StandardError is logged and swallowed so that
      # #work_loop keeps running.
      def work
        taskset = PersisterQueue.dequeue
        ActiveRecord::Base.connection_pool.with_connection do
          return if Persistence::Taskset.where(key: taskset.key).exists?
        end

        sleep SLAVE_EXIT_WAIT_TIME

        ActiveRecord::Base.connection_pool.with_connection do
          unless persist(taskset)
            RRRSpec.logger.warn("Persister: taskset #{taskset.key} has no finished_at; not persisted")
            return
          end

          cache_path = RRRSpec.configuration.json_cache_path
          create_api_cache(taskset, cache_path) if cache_path
          taskset.expire(PERSISTED_RESIDUE_SEC)
          update_estimate_sec(taskset)
        end
      rescue => e
        RRRSpec.logger.error(e)
      end

      # NOTE(review): `private` is superseded by the `module_function` on the
      # next line, so the methods below remain public singleton methods
      # (e.g. Persister.persist). Kept as-is to avoid breaking callers.
      private

      module_function

      # Copies a finished taskset and all of its children into the database.
      #
      # Everything is written in ONE transaction. #work skips any taskset whose
      # row already exists, so a failure after a committed Taskset row could
      # never be repaired by processing the taskset again.
      #
      # @param taskset the Redis-backed taskset
      # @return [Persistence::Taskset, nil] the persisted record, or nil when
      #   the taskset has no finished_at (nothing is written)
      # @raise [ActiveRecord::RecordInvalid] if the Taskset row fails validation
      def persist(taskset)
        taskset_finished_at = taskset.finished_at
        return nil if taskset_finished_at.blank?

        ActiveRecord::Base.transaction do
          p_taskset = persist_taskset(taskset)
          persist_slaves(taskset, p_taskset)
          persist_tasks(taskset, p_taskset)
          persist_trials(p_taskset, taskset_finished_at)
          persist_worker_logs(taskset, p_taskset)
          p_taskset
        end
      end

      # Inserts the Taskset row itself; children are written by other helpers.
      def persist_taskset(taskset)
        h = taskset.to_h
        h.delete('tasks')
        h.delete('slaves')
        h.delete('worker_logs')
        # create! (not create): a validation failure must abort the transaction
        # rather than leave children pointing at an unsaved record with no id.
        Persistence::Taskset.create!(h)
      end

      # Bulk-inserts the taskset's slaves.
      def persist_slaves(taskset, p_taskset)
        p_slaves = taskset.slaves.map do |slave|
          h = slave.to_h
          h.delete('trials')
          Persistence::Slave.new(h).tap { |p_slave| p_slave.taskset_id = p_taskset.id }
        end
        import_with_save_callbacks(Persistence::Slave, p_slaves)
      end

      # Bulk-inserts the taskset's tasks.
      def persist_tasks(taskset, p_taskset)
        p_tasks = taskset.tasks.map do |task|
          h = task.to_h
          h.delete('taskset')
          h.delete('trials')
          # Assign the id, not the record: an ActiveRecord object assigned to an
          # integer column depends on type-casting that recent Rails maps to nil.
          Persistence::Task.new(h).tap { |p_task| p_task.taskset_id = p_taskset.id }
        end
        import_with_save_callbacks(Persistence::Task, p_tasks)
      end

      # Bulk-inserts the trials of every persisted task, keeping only trials
      # that finished no later than the taskset itself.
      def persist_trials(p_taskset, taskset_finished_at)
        # Ids are read back through the associations rather than from the
        # in-memory models given to import, presumably because import does not
        # populate ids on every adapter — TODO confirm.
        slave_ids = {}
        p_taskset.slaves.each { |p_slave| slave_ids[p_slave.key] = p_slave.id }

        p_trials = []
        p_taskset.tasks.each do |p_task|
          Task.new(p_task.key).trials.each do |trial|
            h = trial.to_h
            next if h['finished_at'].blank? || h['finished_at'] > taskset_finished_at

            slave_key = h.delete('slave')['key']
            h.delete('task')
            p_trial = Persistence::Trial.new(h)
            p_trial.task_id = p_task.id
            # An unknown slave key leaves slave_id nil, as before.
            p_trial.slave_id = slave_ids[slave_key]
            p_trials << p_trial
          end
        end
        import_with_save_callbacks(Persistence::Trial, p_trials)
      end

      # Bulk-inserts the taskset's worker logs, replacing 'worker' by its key.
      def persist_worker_logs(taskset, p_taskset)
        p_worker_logs = taskset.worker_logs.map do |worker_log|
          h = worker_log.to_h
          h['worker_key'] = h.delete('worker')['key']
          h.delete('taskset')
          Persistence::WorkerLog.new(h).tap { |p_worker_log| p_worker_log.taskset_id = p_taskset.id }
        end
        import_with_save_callbacks(Persistence::WorkerLog, p_worker_logs)
      end

      # Bulk-inserts records via activerecord-import, then runs their :save
      # callbacks by hand because the bulk insert does not run them.
      def import_with_save_callbacks(model_class, records)
        model_class.import(records)
        records.each { |record| record.run_callbacks(:save) {} }
      end

      # Writes the full JSON of a persisted taskset, plain and gzipped, to
      # <path>/v1/tasksets/<key with ':' replaced by '-'>[.gz].
      # Does nothing if the taskset has not been persisted.
      def create_api_cache(taskset, path)
        p_obj = Persistence::Taskset.where(key: taskset.key).full.first
        return unless p_obj

        json = JSON.generate(p_obj.as_full_json.update('is_full' => true))
        dir = File.join(path, 'v1', 'tasksets')
        FileUtils.mkdir_p(dir)
        json_path = File.join(dir, taskset.key.tr(':', '-'))
        File.write(json_path, json)
        Zlib::GzipWriter.open("#{json_path}.gz") { |gz| gz.write(json) }
      end

      # SELECT list for the estimation query: spec file plus the average trial
      # duration in seconds. UNIX_TIMESTAMP is MySQL-specific.
      ESTIMATION_FIELDS = [
        "`spec_file`",
        "avg(UNIX_TIMESTAMP(`trials`.`finished_at`)-UNIX_TIMESTAMP(`trials`.`started_at`)) as `avg`",
      ].freeze

      # Recomputes the average duration of passed/pending trials per spec file
      # across all persisted tasksets of the same taskset_class and hands the
      # result to TasksetEstimation.update_estimate_secs (skipped when empty).
      # Does nothing if the taskset has not been persisted.
      def update_estimate_sec(taskset)
        p_obj = Persistence::Taskset.where(key: taskset.key).first
        return unless p_obj

        taskset_class = p_obj.taskset_class
        query = Persistence::Task.joins(:trials).joins(:taskset).
          select(ESTIMATION_FIELDS).
          where('tasksets.taskset_class' => taskset_class).
          where('trials.status' => %w[passed pending]).
          group('spec_file')

        estimation = {}
        query.each { |row| estimation[row.spec_file] = row.avg.to_i }
        TasksetEstimation.update_estimate_secs(taskset_class, estimation) unless estimation.empty?
      end
    end
  end
end