Skip to content

Commit

Permalink
Add timeout to index plays lock (#912)
Browse files Browse the repository at this point in the history
* Add timeout to index plays lock

* Pass lock through to check if owned
  • Loading branch information
jowlee committed Oct 9, 2020
1 parent 155afd5 commit d8f5a52
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions discovery-provider/src/tasks/index_plays.py
Expand Up @@ -24,7 +24,7 @@ def get_time_diff(previous_time):
# NOTE: indexing the plays will eventually be a part of `index_blocks`


def get_track_plays(self, db):
def get_track_plays(self, db, lock):
start_time = time.time()
job_extra_info = {'job': JOB}
with db.scoped_session() as session:
Expand Down Expand Up @@ -186,11 +186,12 @@ def get_track_plays(self, db):
build_insert_query_time)

insert_refresh_time = time.time()

if plays:
has_lock = lock.owned()
if plays and has_lock:
session.bulk_save_objects(plays)
session.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY aggregate_plays")

job_extra_info['has_lock'] = has_lock
job_extra_info['number_rows_insert'] = len(plays)
job_extra_info['insert_refresh_time'] = get_time_diff(
insert_refresh_time)
Expand All @@ -210,13 +211,13 @@ def update_play_count(self):
# Define lock acquired boolean
have_lock = False

# Define redis lock object
update_lock = redis.lock("update_play_count_lock", blocking_timeout=25)
# Define redis lock object with a timeout of 10 minutes
update_lock = redis.lock("update_play_count_lock", timeout=10*60)
try:
# Attempt to acquire lock - do not block if unable to acquire
have_lock = update_lock.acquire(blocking=False)
if have_lock:
get_track_plays(self, db)
get_track_plays(self, db, update_lock)
else:
logger.error(
f"index_plays.py | update_play_count | {self.request.id} | Failed to acquire update_play_count_lock",
Expand Down

0 comments on commit d8f5a52

Please sign in to comment.