Commit

Add profile backfill job (#4412)
* Add profile backfill job

* Fix lint

* Backwards indexing backfill from current

* Add test

* getenv

* cast to int

* Add more logs

* Add scoped queue

* Fix lint

Co-authored-by: isaac <isaac@audius.co>
Co-authored-by: Raymond Jacobson <ray@audius.co>
3 people committed Dec 1, 2022
1 parent 3cafe9d commit 6507e70
Showing 4 changed files with 293 additions and 0 deletions.
@@ -0,0 +1,110 @@
import logging
from unittest import mock

from integration_tests.utils import populate_mock_db
from src.challenges.challenge_event import ChallengeEvent
from src.models.indexing.indexing_checkpoints import IndexingCheckpoint
from src.tasks.index_profile_challenge_backfill import (
    enqueue_social_rewards_check,
    get_latest_backfill,
    index_profile_challenge_backfill_tablename,
)
from src.utils.db_session import get_db
from src.utils.update_indexing_checkpoints import save_indexed_checkpoint

logger = logging.getLogger(__name__)


@mock.patch(
    "src.tasks.index_profile_challenge_backfill.get_config_backfill", autospec=True
)
@mock.patch("src.challenges.challenge_event_bus.ChallengeEventBus", autospec=True)
def test_index_profile_challenge_backfill(
    bus_mock: mock.MagicMock, get_config_backfill: mock.MagicMock, app
):
    get_config_backfill.return_value = 0
    with app.app_context():
        db = get_db()

    entities = {
        "users": [{}] * 100,
        "reposts": [{"user_id": i, "blocknumber": i + 2} for i in range(1, 50)],
        "saves": [{"user_id": i + 1, "blocknumber": i + 3} for i in range(1, 20)],
        "follows": [
            {
                "follower_user_id": i + 4,
                "followee_user_id": i + 4,
                "blocknumber": i + 7,
            }
            for i in range(1, 60)
        ],
        "tracks": [{"owner_id": i} for i in range(1, 7)],
    }
    populate_mock_db(db, entities)

    enqueue_social_rewards_check(db, bus_mock)
    repost_calls = [
        mock.call.dispatch(ChallengeEvent.repost, i + 2, i) for i in range(1, 50)
    ]
    save_calls = [
        mock.call.dispatch(ChallengeEvent.favorite, i + 3, i + 1)
        for i in range(1, 20)
    ]
    follow_calls = [
        mock.call.dispatch(ChallengeEvent.follow, i + 7, i + 4)
        for i in range(1, 60)
    ]
    calls = repost_calls + save_calls + follow_calls
    bus_mock.assert_has_calls(calls, any_order=True)

    with db.scoped_session() as session:
        checkpoint = (
            session.query(IndexingCheckpoint)
            .filter(
                IndexingCheckpoint.tablename
                == index_profile_challenge_backfill_tablename
            )
            .first()
        )

        assert checkpoint.last_checkpoint == -901


@mock.patch(
    "src.tasks.index_profile_challenge_backfill.get_config_backfill", autospec=True
)
def test_index_profile_challenge_get_blocknumber(
    get_config_backfill: mock.MagicMock, app
):
    get_config_backfill.return_value = 10
    with app.app_context():
        db = get_db()

    # No checkpoint and no block
    with db.scoped_session() as session:
        cp_1 = get_latest_backfill(session, 10)
        assert cp_1 == None

    entities = {"users": [{}] * 100}
    populate_mock_db(db, entities)

    # No checkpoint - so return latest block
    with db.scoped_session() as session:
        cp_2 = get_latest_backfill(session, 10)
        assert cp_2 == 99

        save_indexed_checkpoint(
            session, index_profile_challenge_backfill_tablename, 80
        )

        # with checkpoint greater than backfill stopping blocknumber
        cp_3 = get_latest_backfill(session, 10)
        assert cp_3 == 80

        save_indexed_checkpoint(
            session, index_profile_challenge_backfill_tablename, 7
        )

        # with checkpoint less than backfill stopping blocknumber
        cp_3 = get_latest_backfill(session, 10)
        assert cp_3 == None
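
For context on the -901 assertion in the first test: populate_mock_db creates 100 users, so the latest block written by the fixture is 99 (the second test confirms this with cp_2 == 99), and a single backfill pass walks back BLOCK_INTERVAL (1000) blocks from there before saving its checkpoint. A minimal sketch of that arithmetic:

    latest_block = 100 - 1             # 100 mock users -> highest block number 99
    BLOCK_INTERVAL = 1000              # window size from index_profile_challenge_backfill.py
    expected_checkpoint = latest_block - BLOCK_INTERVAL
    assert expected_checkpoint == -901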
5 changes: 5 additions & 0 deletions discovery-provider/src/app.py
@@ -525,6 +525,10 @@ def configure_celery(celery, test_config=None):
        "update_track_is_available": {
            "task": "update_track_is_available",
            "schedule": timedelta(hours=3),
        },
        "index_profile_challenge_backfill": {
            "task": "index_profile_challenge_backfill",
            "schedule": timedelta(minutes=1),
        }
        # UNCOMMENT BELOW FOR MIGRATION DEV WORK
        # "index_solana_user_data": {
@@ -598,6 +602,7 @@ def configure_celery(celery, test_config=None):
    redis_inst.delete("prune_plays_lock")
    redis_inst.delete("update_aggregate_table:aggregate_user_tips")
    redis_inst.delete("spl_token_backfill_lock")
    redis_inst.delete("profile_challenge_backfill_lock")
    redis_inst.delete(INDEX_REACTIONS_LOCK)
    redis_inst.delete(UPDATE_TRACK_IS_AVAILABLE_LOCK)
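
One consequence of the one-minute beat entry above: each run processes at most one BLOCK_INTERVAL window (1,000 blocks, per the task module below), so the backwards backfill advances at a bounded pace. A rough back-of-envelope sketch, assuming every scheduled run acquires the lock and completes within the minute:

    BLOCK_INTERVAL = 1000                 # blocks scanned per run
    runs_per_hour = 60                    # "schedule": timedelta(minutes=1)
    max_blocks_per_hour = BLOCK_INTERVAL * runs_per_hour  # at most 60,000 blocks/hour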
15 changes: 15 additions & 0 deletions discovery-provider/src/queries/health_check.py
@@ -10,8 +10,13 @@
from src.queries.get_latest_play import get_latest_play
from src.queries.get_sol_plays import get_latest_sol_play_check_info
from src.queries.queries import parse_bool_param
from src.tasks.index_profile_challenge_backfill import (
    index_profile_challenge_backfill_tablename,
)
from src.utils import helpers, redis_connection
from src.utils.db_session import get_db_read_replica
from src.utils.elasticdsl import esclient
from src.utils.update_indexing_checkpoints import get_last_indexed_checkpoint

logger = logging.getLogger(__name__)

@@ -153,3 +158,13 @@ def db_seed_restore_check():
def location():
    location = get_location()
    return success_response(location, sign_response=False)


@bp.route("/backfill_profile_challenge", methods=["GET"])
def backfill_profile_challenge_check():
    db = get_db_read_replica()
    with db.scoped_session() as session:
        checkpoint = get_last_indexed_checkpoint(
            session, index_profile_challenge_backfill_tablename
        )
        return success_response(checkpoint, sign_response=False)
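
A hedged example of polling the new route, assuming the discovery provider is reachable at http://localhost:5000 (placeholder host and port) and the blueprint adds no extra URL prefix; the response envelope comes from success_response:

    import requests

    # Returns the last saved backfill checkpoint (0 before the first window completes,
    # per get_last_indexed_checkpoint).
    resp = requests.get("http://localhost:5000/backfill_profile_challenge")
    print(resp.json())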
163 changes: 163 additions & 0 deletions discovery-provider/src/tasks/index_profile_challenge_backfill.py
@@ -0,0 +1,163 @@
import logging
import os
from typing import Optional

from redis import Redis
from sqlalchemy.orm.session import Session
from src.challenges.challenge_event_bus import ChallengeEventBus
from src.models.indexing.block import Block
from src.models.social.follow import Follow
from src.models.social.repost import Repost
from src.models.social.save import Save
from src.tasks.celery_app import celery
from src.tasks.social_features import (
    dispatch_challenge_follow,
    dispatch_challenge_repost,
)
from src.tasks.user_library import dispatch_favorite
from src.utils.prometheus_metric import save_duration_metric
from src.utils.session_manager import SessionManager
from src.utils.update_indexing_checkpoints import (
    get_last_indexed_checkpoint,
    save_indexed_checkpoint,
)

logger = logging.getLogger(__name__)

index_profile_challenge_backfill_tablename = "index_profile_challenge_backfill"

# Number of blocks to scan through at a time
BLOCK_INTERVAL = 1000


def enqueue_social_rewards_check(db: SessionManager, challenge_bus: ChallengeEventBus):
    with db.scoped_session() as session:
        backfill_blocknumber = get_config_backfill()
        # check config for value
        if backfill_blocknumber == None:
            logger.info(
                "index_profile_challenge_backfill.py | backfill block number not set"
            )
            return None
        backfill_blocknumber = int(backfill_blocknumber)
        block_backfill = get_latest_backfill(session, backfill_blocknumber)
        if block_backfill is None:
            logger.info("index_profile_challenge_backfill.py | Backfill complete")
            return
        # Do it
        min_blocknumber = block_backfill - BLOCK_INTERVAL
        # reposts of tracks and playlists reposts
        reposts = (
            session.query(Repost)
            .filter(
                Repost.blocknumber <= block_backfill,
                Repost.blocknumber > min_blocknumber,
            )
            .all()
        )
        logger.info(
            f"index_profile_challenge_backfill.py | calculated {len(reposts)} reposts"
        )
        for repost in reposts:
            repost_blocknumber: int = repost.blocknumber
            dispatch_challenge_repost(challenge_bus, repost, repost_blocknumber)

        saves = (
            session.query(Save)
            .filter(
                Save.blocknumber <= block_backfill,
                Save.blocknumber > min_blocknumber,
            )
            .all()
        )
        logger.info(
            f"index_profile_challenge_backfill.py | calculated {len(saves)} saves"
        )
        for save in saves:
            save_blocknumber: int = save.blocknumber
            dispatch_favorite(challenge_bus, save, save_blocknumber)

        follows = (
            session.query(Follow)
            .filter(
                Follow.blocknumber <= block_backfill,
                Follow.blocknumber > min_blocknumber,
            )
            .all()
        )
        logger.info(
            f"index_profile_challenge_backfill.py | calculated {len(follows)} follows"
        )
        for follow in follows:
            follow_blocknumber: int = follow.blocknumber
            dispatch_challenge_follow(challenge_bus, follow, follow_blocknumber)

        save_indexed_checkpoint(
            session, index_profile_challenge_backfill_tablename, min_blocknumber
        )


def get_config_backfill():
    return os.getenv("audius_discprov_backfill_social_rewards_blocknumber")


def get_latest_backfill(session: Session, backfill_blocknumber: int) -> Optional[int]:
    try:
        checkpoint = get_last_indexed_checkpoint(
            session, index_profile_challenge_backfill_tablename
        )
        # If the checkpoints is not set, start from the current blocknumber
        if checkpoint == 0:
            block = session.query(Block).filter(Block.is_current == True).first()
            if not block:
                return None
            block_number = block.number
            return block_number

        if checkpoint <= backfill_blocknumber:
            logger.info("index_profile_challenge_backfill.py | backfill complete")
            return None

        return checkpoint

    except Exception as e:
        logger.error(
            "index_profile_challenge_backfill.py | Error during get_latest_backfill",
            exc_info=True,
        )
        raise e


# ####### CELERY TASKS ####### #
@celery.task(name="index_profile_challenge_backfill", bind=True)
@save_duration_metric(metric_group="celery_task")
def index_profile_challenge_backfill(self):
    redis: Redis = index_profile_challenge_backfill.redis
    db: SessionManager = index_profile_challenge_backfill.db
    challenge_bus: ChallengeEventBus = (
        index_profile_challenge_backfill.challenge_event_bus
    )

    # Define lock acquired boolean
    have_lock = False
    # Max duration of lock is 1 hr
    update_lock = redis.lock("profile_challenge_backfill_lock", timeout=3600)

    try:
        # Attempt to acquire lock - do not block if unable to acquire
        have_lock = update_lock.acquire(blocking=False)
        if have_lock:
            logger.info("index_profile_challenge_backfill.py | Acquired lock")
            with challenge_bus.use_scoped_dispatch_queue():
                enqueue_social_rewards_check(db, challenge_bus)
        else:
            logger.info("index_profile_challenge_backfill.py | Failed to acquire lock")
    except Exception as e:
        logger.error(
            "index_profile_challenge_backfill.py | Fatal error in main loop",
            exc_info=True,
        )
        raise e
    finally:
        if have_lock:
            update_lock.release()
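
For orientation, a minimal sketch (not the task itself) of how successive scheduled runs walk the checkpoint backwards from the current block toward the configured stop, assuming a current block of 5000 and audius_discprov_backfill_social_rewards_blocknumber set to 2500 (both values are placeholders):

    BLOCK_INTERVAL = 1000
    stop_blocknumber = 2500      # configured via the env var read in get_config_backfill
    checkpoint = 5000            # first run seeds from the current Block row

    while checkpoint > stop_blocknumber:
        window = (checkpoint - BLOCK_INTERVAL, checkpoint)  # (exclusive, inclusive]
        # dispatch repost / favorite / follow challenge events for this window ...
        checkpoint -= BLOCK_INTERVAL  # persisted via save_indexed_checkpoint

    # once checkpoint <= stop_blocknumber, get_latest_backfill returns None
    # and the task logs "Backfill complete"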
