Skip to content

Commit

Permalink
aloc_sink: avoid exception when _worker_name is None
Browse files Browse the repository at this point in the history
  • Loading branch information
slfritchie committed Feb 6, 2020
1 parent d0f99ec commit 35d2038
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions testing/correctness/tests/aloc_sink/aloc_sink_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ def _handle_frame(self, frame):
# credits and will rely solely on TCP backpressure for its
# own backpressure signalling.

self._worker_name = msg.instance_name
global active_workers
with active_workers as active_workers_l:
if self._worker_name in active_workers_l:
Expand Down Expand Up @@ -278,6 +277,7 @@ def _handle_frame(self, frame):
logging.info(txt)
ok = cwm.Ok(500)
self.write(ok)
self._worker_name = msg.instance_name
active_workers_l[self._worker_name] = True
logging.info("Add {} to active_workers".format(self._worker_name))

Expand Down Expand Up @@ -620,7 +620,7 @@ def handle_message_streamx(self, msg):
.format(regexp, bs))

key = (self._worker_name, msg.stream_id)
if msg.message_id != None:
if msg.message_id is not None:
logging.debug('msg.message_id = {} worker {}'.format(msg.message_id, self._worker_name))
if self._output_offset == msg.message_id:
(ret1, new_offset) = self._twopc_out.append_output(bs, self._output_offset)
Expand Down Expand Up @@ -714,14 +714,16 @@ def close(self, delete_from_active_workers = True):
## may not have been initialized yet. No harm.
None

if delete_from_active_workers:
if self._worker_name is None:
logging.info("close: _worker_name is None")
elif delete_from_active_workers:
global active_workers
with active_workers as active_workers_l:
if not (self._worker_name in active_workers_l):
raise Exception("Lock not active for {}"
.format(self._worker_name))
del active_workers_l[self._worker_name]
logging.info("Remove {} from active_workers".format(self._worker_name))
if self._worker_name in active_workers_l:
del active_workers_l[self._worker_name]
logging.info("Remove {} from active_workers".format(self._worker_name))
else:
logging.info("Lock not active for {}".format(self._worker_name))

def log_it(self, log):
log.insert(0, time.time())
Expand Down

0 comments on commit 35d2038

Please sign in to comment.