Skip to content

Commit

Permalink
Celery task for refresh materialized view (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
hareeshnagaraj committed Oct 24, 2020
1 parent 374f850 commit a799433
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
7 changes: 6 additions & 1 deletion discovery-provider/src/__init__.py
Expand Up @@ -286,7 +286,8 @@ def configure_celery(flask_app, celery, test_config=None):
# Update celery configuration
celery.conf.update(
imports=["src.tasks.index", "src.tasks.index_blacklist",
"src.tasks.index_cache", "src.tasks.index_plays", "src.tasks.index_metrics"],
"src.tasks.index_cache", "src.tasks.index_plays",
"src.tasks.index_metrics", "src.tasks.index_materialized_views"],
beat_schedule={
"update_discovery_provider": {
"task": "update_discovery_provider",
Expand All @@ -307,6 +308,10 @@ def configure_celery(flask_app, celery, test_config=None):
"update_metrics": {
"task": "update_metrics",
"schedule": crontab(minute=0, hour="*")
},
"update_materialized_views": {
"task": "update_materialized_views",
"schedule": timedelta(seconds=60)
}
},
task_serializer="json",
Expand Down
17 changes: 0 additions & 17 deletions discovery-provider/src/tasks/index.py
Expand Up @@ -225,23 +225,15 @@ def index_blocks(self, db, blocks_list):
self, update_task, session, user_library_factory_txs, block_number, block_timestamp
)

# keep search materialized view in sync with db
# only refresh track_lexeme_dict when necessary
# social state changes are not factored in since they don't affect track_lexeme_dict
# write out all pending transactions to db before refreshing view
track_lexeme_state_changed = (user_state_changed or track_state_changed)
session.commit()
if user_state_changed:
session.execute("REFRESH MATERIALIZED VIEW user_lexeme_dict")
if user_ids:
remove_cached_user_ids(redis, user_ids)
if track_lexeme_state_changed:
if track_ids:
remove_cached_track_ids(redis, track_ids)
session.execute("REFRESH MATERIALIZED VIEW track_lexeme_dict")
if playlist_state_changed:
session.execute("REFRESH MATERIALIZED VIEW playlist_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW album_lexeme_dict")
if playlist_ids:
remove_cached_playlist_ids(redis, playlist_ids)

Expand Down Expand Up @@ -418,15 +410,6 @@ def revert_blocks(self, db, revert_blocks_list):
rebuild_playlist_index = rebuild_playlist_index or bool(revert_playlist_entries)
rebuild_track_index = rebuild_track_index or bool(revert_track_entries)
rebuild_user_index = rebuild_user_index or bool(revert_user_entries)

if rebuild_playlist_index:
session.execute("REFRESH MATERIALIZED VIEW playlist_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW album_lexeme_dict")
if rebuild_user_index or rebuild_track_index:
session.execute("REFRESH MATERIALIZED VIEW track_lexeme_dict")
if rebuild_user_index:
session.execute("REFRESH MATERIALIZED VIEW user_lexeme_dict")

# TODO - if we enable revert, need to set the most_recent_indexed_block_redis_key key in redis

# calls GET identityservice/registered_creator_nodes to retrieve creator nodes currently registered on chain
Expand Down
40 changes: 40 additions & 0 deletions discovery-provider/src/tasks/index_materialized_views.py
@@ -0,0 +1,40 @@
import logging
import json
from src.tasks.celery_app import celery

logger = logging.getLogger(__name__)

def update_views(self, db):
with db.scoped_session() as session:
logger.info('index_materialized_views.py | Updating materialized views')
session.execute("REFRESH MATERIALIZED VIEW user_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW track_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW playlist_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW album_lexeme_dict")
logger.info('index_materialized_views.py | Finished updating materialized views')

######## CELERY TASKS ########
@celery.task(name="update_materialized_views", bind=True)
def update_materialized_views(self):
# Cache custom task class properties
# Details regarding custom task context can be found in wiki
# Custom Task definition can be found in src/__init__.py
db = update_materialized_views.db
redis = update_materialized_views.redis
# Define lock acquired boolean
have_lock = False
# Define redis lock object
update_lock = redis.lock("update_materialized_views", timeout=7200)
try:
# Attempt to acquire lock - do not block if unable to acquire
have_lock = update_lock.acquire(blocking=False)
if have_lock:
update_views(self, db)
else:
logger.info("index_materialized_views.py | Failed to acquire update_materialized_views")
except Exception as e:
logger.error("index_materialized_views.py | Fatal error in main loop", exc_info=True)
raise e
finally:
if have_lock:
update_lock.release()

0 comments on commit a799433

Please sign in to comment.