Skip to content

Commit 9a51a62

Browse files
committed
Make sure we're releasing all the locks, even if there's an exception
-autopull
1 parent f0ef24a commit 9a51a62

File tree

1 file changed

+71
-89
lines changed

1 file changed

+71
-89
lines changed

bodyfetcher.py

Lines changed: 71 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -60,31 +60,30 @@ def add_to_queue(self, post, should_check_site=False):
6060
post_id = d["id"]
6161
if (post_id == 3122 or post_id == 51812) and site_base == "meta.stackexchange.com":
6262
return # don't check meta sandbox, it's full of weird posts
63-
self.queue_modify_lock.acquire()
64-
if site_base not in self.queue:
65-
self.queue[site_base] = {}
66-
67-
# Something about how the queue is being filled is storing Post IDs in a list.
68-
# So, if we get here we need to make sure that the correct types are passed.
69-
#
70-
# If the item in self.queue[site_base] is a dict, do nothing.
71-
# If the item in self.queue[site_base] is not a dict but is a list or a tuple, then convert to dict and
72-
# then replace the list or tuple with the dict.
73-
# If the item in self.queue[site_base] is neither a dict nor a list, then explode.
74-
if type(self.queue[site_base]) is dict:
75-
pass
76-
elif type(self.queue[site_base]) is not dict and type(self.queue[site_base]) in [list, tuple]:
77-
post_list_dict = {}
78-
for post_list_id in self.queue[site_base]:
79-
post_list_dict[post_list_id] = None
80-
self.queue[site_base] = post_list_dict
81-
else:
82-
raise TypeError("A non-iterable is in the queue item for a given site, this will cause errors!")
63+
with self.queue_modify_lock:
64+
if site_base not in self.queue:
65+
self.queue[site_base] = {}
66+
67+
# Something about how the queue is being filled is storing Post IDs in a list.
68+
# So, if we get here we need to make sure that the correct types are passed.
69+
#
70+
# If the item in self.queue[site_base] is a dict, do nothing.
71+
# If the item in self.queue[site_base] is not a dict but is a list or a tuple, then convert to dict and
72+
# then replace the list or tuple with the dict.
73+
# If the item in self.queue[site_base] is neither a dict nor a list, then explode.
74+
if type(self.queue[site_base]) is dict:
75+
pass
76+
elif type(self.queue[site_base]) is not dict and type(self.queue[site_base]) in [list, tuple]:
77+
post_list_dict = {}
78+
for post_list_id in self.queue[site_base]:
79+
post_list_dict[post_list_id] = None
80+
self.queue[site_base] = post_list_dict
81+
else:
82+
raise TypeError("A non-iterable is in the queue item for a given site, this will cause errors!")
8383

84-
# This line only works if we are using a dict in the self.queue[site_base] object, which we should be with
85-
# the previous conversion code.
86-
self.queue[site_base][str(post_id)] = datetime.utcnow()
87-
self.queue_modify_lock.release()
84+
# This line only works if we are using a dict in the self.queue[site_base] object, which we should be with
85+
# the previous conversion code.
86+
self.queue[site_base][str(post_id)] = datetime.utcnow()
8887

8988
if GlobalVars.flovis is not None:
9089
GlobalVars.flovis.stage('bodyfetcher/enqueued', site_base, post_id,
@@ -114,9 +113,8 @@ def check_queue(self):
114113
return
115114

116115
# We're not making an API request, so explicitly store the queue
117-
self.queue_modify_lock.acquire()
118-
store_bodyfetcher_queue()
119-
self.queue_modify_lock.release()
116+
with self.queue_modify_lock:
117+
store_bodyfetcher_queue()
120118

121119
def print_queue(self):
122120
return '\n'.join(["{0}: {1}".format(key, str(len(values))) for (key, values) in self.queue.items()])
@@ -125,10 +123,9 @@ def make_api_call_for_site(self, site):
125123
if site not in self.queue:
126124
return
127125

128-
self.queue_modify_lock.acquire()
129-
new_posts = self.queue.pop(site)
130-
store_bodyfetcher_queue()
131-
self.queue_modify_lock.release()
126+
with self.queue_modify_lock:
127+
new_posts = self.queue.pop(site)
128+
store_bodyfetcher_queue()
132129

133130
new_post_ids = [int(k) for k in new_posts.keys()]
134131

@@ -137,51 +134,47 @@ def make_api_call_for_site(self, site):
137134
GlobalVars.flovis.stage('bodyfetcher/api_request', site, post_id,
138135
{'site': site, 'posts': list(new_posts.keys())})
139136

140-
self.queue_timing_modify_lock.acquire()
141-
post_add_times = [v for k, v in new_posts.items()]
142-
pop_time = datetime.utcnow()
143-
144-
for add_time in post_add_times:
145-
try:
146-
seconds_in_queue = (pop_time - add_time).total_seconds()
147-
if site in self.queue_timings:
148-
self.queue_timings[site].append(seconds_in_queue)
149-
else:
150-
self.queue_timings[site] = [seconds_in_queue]
151-
except KeyError: # XXX: Any other possible exception?
152-
continue # Skip to next item if we've got invalid data or missing values.
137+
with self.queue_timing_modify_lock:
138+
post_add_times = [v for k, v in new_posts.items()]
139+
pop_time = datetime.utcnow()
153140

154-
store_queue_timings()
141+
for add_time in post_add_times:
142+
try:
143+
seconds_in_queue = (pop_time - add_time).total_seconds()
144+
if site in self.queue_timings:
145+
self.queue_timings[site].append(seconds_in_queue)
146+
else:
147+
self.queue_timings[site] = [seconds_in_queue]
148+
except KeyError: # XXX: Any other possible exception?
149+
continue # Skip to next item if we've got invalid data or missing values.
155150

156-
self.queue_timing_modify_lock.release()
157-
self.max_ids_modify_lock.acquire()
151+
store_queue_timings()
158152

159-
if site in self.previous_max_ids and max(new_post_ids) > self.previous_max_ids[site]:
160-
previous_max_id = self.previous_max_ids[site]
161-
intermediate_posts = range(previous_max_id + 1, max(new_post_ids))
153+
with self.max_ids_modify_lock:
154+
if site in self.previous_max_ids and max(new_post_ids) > self.previous_max_ids[site]:
155+
previous_max_id = self.previous_max_ids[site]
156+
intermediate_posts = range(previous_max_id + 1, max(new_post_ids))
162157

163-
# We don't want to go over the 100-post API cutoff, so take the last
164-
# (100-len(new_post_ids)) from intermediate_posts
158+
# We don't want to go over the 100-post API cutoff, so take the last
159+
# (100-len(new_post_ids)) from intermediate_posts
165160

166-
intermediate_posts = intermediate_posts[(100 - len(new_post_ids)):]
161+
intermediate_posts = intermediate_posts[(100 - len(new_post_ids)):]
167162

168-
# new_post_ids could contain edited posts, so merge it back in
169-
combined = chain(intermediate_posts, new_post_ids)
163+
# new_post_ids could contain edited posts, so merge it back in
164+
combined = chain(intermediate_posts, new_post_ids)
170165

171-
# Could be duplicates, so uniquify
172-
posts = list(set(combined))
173-
else:
174-
posts = new_post_ids
166+
# Could be duplicates, so uniquify
167+
posts = list(set(combined))
168+
else:
169+
posts = new_post_ids
175170

176-
try:
177-
if max(new_post_ids) > self.previous_max_ids[site]:
171+
try:
172+
if max(new_post_ids) > self.previous_max_ids[site]:
173+
self.previous_max_ids[site] = max(new_post_ids)
174+
store_bodyfetcher_max_ids()
175+
except KeyError:
178176
self.previous_max_ids[site] = max(new_post_ids)
179177
store_bodyfetcher_max_ids()
180-
except KeyError:
181-
self.previous_max_ids[site] = max(new_post_ids)
182-
store_bodyfetcher_max_ids()
183-
184-
self.max_ids_modify_lock.release()
185178

186179
log('debug', "New IDs / Hybrid Intermediate IDs for {}:".format(site))
187180
if len(new_post_ids) > 30:
@@ -224,8 +217,7 @@ def make_api_call_for_site(self, site):
224217
# wait to make sure API has/updates post data
225218
time.sleep(3)
226219

227-
GlobalVars.api_request_lock.acquire()
228-
try:
220+
with GlobalVars.api_request_lock:
229221
# Respect backoff, if we were given one
230222
if GlobalVars.api_backoff_time > time.time():
231223
time.sleep(GlobalVars.api_backoff_time - time.time() + 2)
@@ -235,22 +227,18 @@ def make_api_call_for_site(self, site):
235227
except (requests.exceptions.Timeout, requests.ConnectionError, Exception):
236228
# Any failure in the request being made (timeout or otherwise) should be added back to
237229
# the queue.
238-
self.queue_modify_lock.acquire()
239-
if site in self.queue:
240-
self.queue[site].update(new_posts)
241-
else:
242-
self.queue[site] = new_posts
243-
self.queue_modify_lock.release()
244-
GlobalVars.api_request_lock.release()
230+
with self.queue_modify_lock:
231+
if site in self.queue:
232+
self.queue[site].update(new_posts)
233+
else:
234+
self.queue[site] = new_posts
245235
return
246236

247-
self.api_data_lock.acquire()
248-
add_or_update_api_data(site)
249-
self.api_data_lock.release()
237+
with self.api_data_lock:
238+
add_or_update_api_data(site)
250239

251240
message_hq = ""
252-
GlobalVars.apiquota_rw_lock.acquire()
253-
try:
241+
with GlobalVars.apiquota_rw_lock:
254242
if "quota_remaining" in response:
255243
if response["quota_remaining"] - GlobalVars.apiquota >= 5000 and GlobalVars.apiquota >= 0:
256244
tell_rooms_with("debug", "API quota rolled over with {0} requests remaining. "
@@ -276,8 +264,6 @@ def make_api_call_for_site(self, site):
276264
GlobalVars.apiquota = response["quota_remaining"]
277265
else:
278266
message_hq = "The quota_remaining property was not in the API response."
279-
finally:
280-
GlobalVars.apiquota_rw_lock.release()
281267

282268
if "error_message" in response:
283269
message_hq += " Error: {} at {} UTC.".format(response["error_message"], time_request_made)
@@ -291,9 +277,6 @@ def make_api_call_for_site(self, site):
291277
if GlobalVars.api_backoff_time < time.time() + response["backoff"]:
292278
GlobalVars.api_backoff_time = time.time() + response["backoff"]
293279

294-
finally:
295-
GlobalVars.api_request_lock.release()
296-
297280
if len(message_hq) > 0 and "site is required" not in message_hq:
298281
tell_rooms_with("debug", message_hq.strip())
299282

@@ -390,8 +373,7 @@ def make_api_call_for_site(self, site):
390373
log('error', "Exception handling answers:", e)
391374

392375
end_time = time.time()
393-
GlobalVars.posts_scan_stats_lock.acquire()
394-
GlobalVars.num_posts_scanned += num_scanned
395-
GlobalVars.post_scan_time += end_time - start_time
396-
GlobalVars.posts_scan_stats_lock.release()
376+
with GlobalVars.posts_scan_stats_lock:
377+
GlobalVars.num_posts_scanned += num_scanned
378+
GlobalVars.post_scan_time += end_time - start_time
397379
return

0 commit comments

Comments
 (0)