forked from rime/librime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deployer.cc
154 lines (137 loc) · 3.82 KB
/
deployer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//
// Copyright RIME Developers
// Distributed under the BSD License
//
// 2011-12-01 GONG Chen <chen.sst@gmail.com>
//
#include <chrono>
#include <exception>
#include <utility>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <filesystem>
#include <rime/deployer.h>
namespace rime {
// Constructs a deployer with default settings: the shared and user data
// directories are ".", the prebuilt data and staging directories are "build",
// the sync directory is "sync", and the user id is "unknown".
// NOTE(review): callers presumably overwrite these before deploying — confirm.
Deployer::Deployer()
: shared_data_dir("."),
user_data_dir("."),
prebuilt_data_dir("build"),
staging_dir("build"),
sync_dir("sync"),
user_id("unknown") {}
// Waits for any outstanding work started by StartWork() to finish before the
// deployer is destroyed.
Deployer::~Deployer() {
  this->JoinWorkThread();
}
bool Deployer::RunTask(const string& task_name, TaskInitializer arg) {
auto c = DeploymentTask::Require(task_name);
if (!c) {
LOG(ERROR) << "unknown deployment task: " << task_name;
return false;
}
the<DeploymentTask> t(c->Create(arg));
if (!t) {
LOG(ERROR) << "error creating deployment task: " << task_name;
return false;
}
return t->Run(this);
}
bool Deployer::ScheduleTask(const string& task_name, TaskInitializer arg) {
auto c = DeploymentTask::Require(task_name);
if (!c) {
LOG(ERROR) << "unknown deployment task: " << task_name;
return false;
}
an<DeploymentTask> t(c->Create(arg));
if (!t) {
LOG(ERROR) << "error creating deployment task: " << task_name;
return false;
}
ScheduleTask(t);
return true;
}
// Appends `task` to the pending queue while holding `mutex_`.
//
// `task` is taken by value, so it is moved into the queue rather than copied
// a second time.
void Deployer::ScheduleTask(an<DeploymentTask> task) {
  std::lock_guard<std::mutex> lock(mutex_);
  pending_tasks_.push(std::move(task));
}
// Removes and returns the task at the front of the pending queue, or returns
// nullptr when the queue is empty. The queue is accessed under `mutex_`.
an<DeploymentTask> Deployer::NextTask() {
  std::lock_guard<std::mutex> guard(mutex_);
  if (pending_tasks_.empty()) {
    // there is still chance that a task is added by another thread
    // right after this call... careful.
    return nullptr;
  }
  auto next = pending_tasks_.front();
  pending_tasks_.pop();
  return next;
}
// Reports whether at least one task is waiting in the pending queue.
// The queue is inspected under `mutex_`.
bool Deployer::HasPendingTasks() {
  std::lock_guard<std::mutex> guard(mutex_);
  const bool queue_is_empty = pending_tasks_.empty();
  return !queue_is_empty;
}
bool Deployer::Run() {
LOG(INFO) << "running deployment tasks:";
message_sink_("deploy", "start");
int success = 0;
int failure = 0;
do {
while (auto task = NextTask()) {
try {
if (task->Run(this))
++success;
else
++failure;
} catch (const std::exception& ex) {
++failure;
LOG(ERROR) << "Error deploying: " << ex.what();
}
// boost::this_thread::interruption_point();
}
LOG(INFO) << success + failure << " tasks ran: " << success << " success, "
<< failure << " failure.";
message_sink_("deploy", !failure ? "success" : "failure");
// new tasks could have been enqueued while we were sending the message.
// before quitting, double check if there is nothing left to do.
} while (HasPendingTasks());
return !failure;
}
bool Deployer::StartWork(bool maintenance_mode) {
if (IsWorking()) {
LOG(WARNING) << "a work thread is already running.";
return false;
}
maintenance_mode_ = maintenance_mode;
if (pending_tasks_.empty()) {
return false;
}
#ifdef RIME_NO_THREADING
LOG(INFO) << "running " << pending_tasks_.size() << " tasks in main thread.";
return Run();
#else
LOG(INFO) << "starting work thread for " << pending_tasks_.size()
<< " tasks.";
work_ = std::async(std::launch::async, [this] { Run(); });
return work_.valid();
#endif
}
// Starts processing the pending tasks with maintenance mode switched on.
// Returns whatever StartWork() returns.
bool Deployer::StartMaintenance() {
  const bool maintenance_mode = true;
  return StartWork(maintenance_mode);
}
// Reports whether previously started work is still in progress: `work_` must
// refer to a shared state, and a zero-length wait on it must not report that
// the result is ready. Does not block.
bool Deployer::IsWorking() {
  if (work_.valid()) {
    const auto poll_result = work_.wait_for(std::chrono::milliseconds(0));
    return poll_result != std::future_status::ready;
  }
  return false;
}
// Returns true only while work that was started in maintenance mode is still
// in progress.
bool Deployer::IsMaintenanceMode() {
  if (!maintenance_mode_) {
    return false;
  }
  return IsWorking();
}
// Blocks until previously started work has finished. Does nothing when
// `work_` holds no shared state. Calling get() releases the shared state, so
// a second call returns immediately.
void Deployer::JoinWorkThread() {
  if (!work_.valid()) {
    return;
  }
  work_.get();
}
void Deployer::JoinMaintenanceThread() {
JoinWorkThread();
}
// Returns the path formed by appending `user_id` to `sync_dir`, as a string.
string Deployer::user_data_sync_dir() const {
  const std::filesystem::path sync_root(sync_dir);
  const std::filesystem::path user_sync_path = sync_root / user_id;
  return user_sync_path.string();
}
} // namespace rime