@@ -64,30 +64,46 @@ async def run(self):
64
64
token_postfix = 0
65
65
66
66
while not self .closed :
67
- while not self .waiting and len (self .processing ) < self .opts .get ("concurrency" ) and not self .closing and not self .limitUntil :
67
+ num_total = len (self .processing )
68
+ while not self .waiting and num_total < self .opts .get ("concurrency" ) and not self .closing and (not self .limitUntil or num_total == 0 ):
68
69
token_postfix += 1
69
70
token = f'{ self .id } :{ token_postfix } '
70
71
waiting_job = asyncio .ensure_future (self .getNextJob (token ))
71
72
self .processing .add (waiting_job )
72
73
73
- try :
74
- jobs , pending = await getCompleted (self .processing )
75
-
76
- jobs_to_process = [self .processJob (job , job .token ) for job in jobs ]
77
- processing_jobs = [asyncio .ensure_future (
78
- j ) for j in jobs_to_process ]
79
- pending .update (processing_jobs )
80
- self .processing = pending
74
+ num_total = len (self .processing )
81
75
82
- if ( len ( jobs ) == 0 or len ( self .processing ) == 0 ) and self . closing :
83
- # We are done processing so we can close the queue
76
+ if self .waiting and num_total > 1 :
77
+ # we have a job waiting but we have others that we could start processing already
84
78
break
85
79
86
- except Exception as e :
87
- # This should never happen or we will have an endless loop
88
- print ("ERROR:" , e )
89
- traceback .print_exc ()
90
- return
80
+ try :
81
+ jobs , pending = await getCompleted (self .processing )
82
+
83
+ jobs_to_process = [self .processJob (job , job .token ) for job in jobs ]
84
+ processing_jobs = [asyncio .ensure_future (
85
+ j ) for j in jobs_to_process ]
86
+ pending .update (processing_jobs )
87
+ self .processing = pending
88
+
89
+ # no more jobs waiting but we have others that could start processing already
90
+ if (len (jobs )== 0 and num_total > 1 ):
91
+ break
92
+
93
+ # if there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
94
+ # for processing this job.
95
+ if self .blockUntil :
96
+ break
97
+
98
+ except Exception as e :
99
+ # This should never happen or we will have an endless loop
100
+ print ("ERROR:" , e )
101
+ traceback .print_exc ()
102
+ return
103
+
104
+ if (len (jobs ) == 0 or len (self .processing ) == 0 ) and self .closing :
105
+ # We are done processing so we can close the queue
106
+ break
91
107
92
108
self .running = False
93
109
self .timer .stop ()
@@ -119,6 +135,11 @@ async def moveToActive(self, token: str, job_id: str = None):
119
135
if job_id and job_id .startswith ('0:' ):
120
136
self .blockUntil = int (job_id .split (':' )[1 ]) or 0
121
137
138
+ # remove marker from active list
139
+ await self .client .lrem (self .scripts .toKey ('active' ), 1 , job_id )
140
+ if self .blockUntil > 0 :
141
+ return
142
+
122
143
result = await self .scripts .moveToActive (token , self .opts , job_id )
123
144
job_data = None
124
145
id = None
@@ -127,21 +148,15 @@ async def moveToActive(self, token: str, job_id: str = None):
127
148
128
149
if result :
129
150
job_data , id , limit_until , delay_until = result
151
+ self .updateDelays (limit_until , delay_until )
152
+ return self .nextJobFromJobData (job_data , id , token )
130
153
131
- return self .nextJobFromJobData (job_data , id , limit_until , delay_until , token )
132
-
133
- def nextJobFromJobData (self , job_data = None , job_id : str = None , limit_until : int = 0 ,
134
- delay_until : int = 0 , token : str = None ):
135
- self .limitUntil = max (limit_until , 0 ) or 0
136
-
154
+ def nextJobFromJobData (self , job_data = None , job_id : str = None , token : str = None ):
137
155
if not job_data :
138
156
if not self .drained :
139
157
self .drained = True
140
158
self .blockUntil = 0
141
159
142
- if delay_until :
143
- self .blockUntil = max (delay_until , 0 ) or 0
144
-
145
160
if job_data :
146
161
self .drained = False
147
162
job_instance = Job .fromJSON (self , job_data , job_id )
@@ -159,12 +174,17 @@ async def waitForJob(self):
159
174
job_id = await self .bclient .brpoplpush (self .scripts .keys ["wait" ], self .scripts .keys ["active" ], timeout )
160
175
161
176
return job_id
177
+
178
+ def updateDelays (self , limit_until = 0 , delay_until = 0 ):
179
+ self .limitUntil = max (limit_until , 0 ) or 0
180
+ self .blockUntil = max (delay_until , 0 ) or 0
162
181
163
182
async def processJob (self , job : Job , token : str ):
164
183
try :
165
184
self .jobs .add ((job , token ))
166
185
result = await self .processor (job , token )
167
186
if not self .forceClosing :
187
+ # TODO get next job from this script
168
188
await self .scripts .moveToCompleted (job , result , job .opts .get ("removeOnComplete" , False ), token , self .opts , fetchNext = not self .closing )
169
189
job .returnvalue = result
170
190
self .emit ("completed" , job , result )
0 commit comments