5
5
*/
6
6
final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
7
7
8
+ const PHASE_LEASED = 'leased ' ;
8
9
const PHASE_UNLEASED = 'unleased ' ;
9
10
const PHASE_EXPIRED = 'expired ' ;
10
11
11
12
private $ ids ;
12
13
private $ objectPHIDs ;
13
14
private $ limit ;
14
15
private $ skipLease ;
16
+ private $ leased = false ;
15
17
16
18
public static function getDefaultWaitBeforeRetry () {
17
19
return phutil_units ('5 minutes in seconds ' );
@@ -45,14 +47,44 @@ public function withObjectPHIDs(array $phids) {
45
47
return $ this ;
46
48
}
47
49
50
+ /**
51
+ * Select only leased tasks, only unleased tasks, or both types of task.
52
+ *
53
+ * By default, queries select only unleased tasks (equivalent to passing
54
+ * `false` to this method). You can pass `true` to select only leased tasks,
55
+ * or `null` to ignore the lease status of tasks.
56
+ *
57
+ * If your result set potentially includes leased tasks, you must disable
58
+ * leasing using @{method:setSkipLease}. These options are intended for use
59
+ * when displaying task status information.
60
+ *
61
+ * @param mixed `true` to select only leased tasks, `false` to select only
62
+ * unleased tasks (default), or `null` to select both.
63
+ * @return this
64
+ */
65
+ public function withLeasedTasks ($ leased ) {
66
+ $ this ->leased = $ leased ;
67
+ return $ this ;
68
+ }
69
+
48
70
public function setLimit ($ limit ) {
49
71
$ this ->limit = $ limit ;
50
72
return $ this ;
51
73
}
52
74
53
75
public function execute () {
54
76
if (!$ this ->limit ) {
55
- throw new Exception ('You must setLimit() when leasing tasks. ' );
77
+ throw new Exception (
78
+ pht ('You must setLimit() when leasing tasks. ' ));
79
+ }
80
+
81
+ if ($ this ->leased !== false ) {
82
+ if (!$ this ->skipLease ) {
83
+ throw new Exception (
84
+ pht (
85
+ 'If you potentially select leased tasks using withLeasedTasks(), ' .
86
+ 'you MUST disable lease acquisition by calling setSkipLease(). ' ));
87
+ }
56
88
}
57
89
58
90
$ task_table = new PhabricatorWorkerActiveTask ();
@@ -65,10 +97,16 @@ public function execute() {
65
97
// find enough tasks, try tasks with expired leases (i.e., tasks which have
66
98
// previously failed).
67
99
68
- $ phases = array (
69
- self ::PHASE_UNLEASED ,
70
- self ::PHASE_EXPIRED ,
71
- );
100
+ // If we're selecting leased tasks, look for them first.
101
+
102
+ $ phases = array ();
103
+ if ($ this ->leased !== false ) {
104
+ $ phases [] = self ::PHASE_LEASED ;
105
+ }
106
+ if ($ this ->leased !== true ) {
107
+ $ phases [] = self ::PHASE_UNLEASED ;
108
+ $ phases [] = self ::PHASE_EXPIRED ;
109
+ }
72
110
$ limit = $ this ->limit ;
73
111
74
112
$ leased = 0 ;
@@ -160,13 +198,22 @@ public function execute() {
160
198
$ tasks [$ row ['id ' ]]->setData ($ task_data );
161
199
}
162
200
201
+ if ($ this ->skipLease ) {
202
+ // Reorder rows into the original phase order if this is a status query.
203
+ $ tasks = array_select_keys ($ tasks , $ task_ids );
204
+ }
205
+
163
206
return $ tasks ;
164
207
}
165
208
166
209
private function buildWhereClause (AphrontDatabaseConnection $ conn_w , $ phase ) {
167
210
$ where = array ();
168
211
169
212
switch ($ phase ) {
213
+ case self ::PHASE_LEASED :
214
+ $ where [] = 'leaseOwner IS NOT NULL ' ;
215
+ $ where [] = 'leaseExpires >= UNIX_TIMESTAMP() ' ;
216
+ break ;
170
217
case self ::PHASE_UNLEASED :
171
218
$ where [] = 'leaseOwner IS NULL ' ;
172
219
break ;
@@ -177,7 +224,7 @@ private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
177
224
throw new Exception ("Unknown phase ' {$ phase }'! " );
178
225
}
179
226
180
- if ($ this ->ids ) {
227
+ if ($ this ->ids !== null ) {
181
228
$ where [] = qsprintf ($ conn_w , 'id IN (%Ld) ' , $ this ->ids );
182
229
}
183
230
@@ -199,6 +246,11 @@ private function buildUpdateWhereClause(
199
246
// `IN (NULL)` doesn't match NULL.
200
247
201
248
switch ($ phase ) {
249
+ case self ::PHASE_LEASED :
250
+ throw new Exception (
251
+ pht (
252
+ 'Trying to lease tasks selected in the leased phase! This is ' .
253
+ 'intended to be imposssible. ' ));
202
254
case self ::PHASE_UNLEASED :
203
255
$ where [] = qsprintf ($ conn_w , 'leaseOwner IS NULL ' );
204
256
$ where [] = qsprintf ($ conn_w , 'id IN (%Ld) ' , ipull ($ rows , 'id ' ));
@@ -223,6 +275,10 @@ private function buildUpdateWhereClause(
223
275
224
276
private function buildOrderClause (AphrontDatabaseConnection $ conn_w , $ phase ) {
225
277
switch ($ phase ) {
278
+ case self ::PHASE_LEASED :
279
+ // Ideally we'd probably order these by lease acquisition time, but
280
+ // we don't have that handy and this is a good approximation.
281
+ return qsprintf ($ conn_w , 'ORDER BY priority ASC, id ASC ' );
226
282
case self ::PHASE_UNLEASED :
227
283
// When selecting new tasks, we want to consume them in order of
228
284
// increasing priority (and then FIFO).
0 commit comments