Skip to content

Commit

Permalink
Update aggregate user repopulate to use the past two weeks (#2069)
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacsolo committed Nov 19, 2021
1 parent f268693 commit 5ebf1c8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
47 changes: 35 additions & 12 deletions discovery-provider/src/tasks/index_aggregate_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@
import time
import sqlalchemy as sa
from src.tasks.celery_app import celery
from src.utils.redis_constants import most_recent_indexed_aggregate_user_block_redis_key
from src.utils.redis_constants import (
most_recent_indexed_aggregate_user_block_redis_key,
index_aggregate_user_last_refresh_completion_redis_key
)
from src.tasks.calculate_trending_challenges import get_latest_blocknumber
from src.queries.get_health import get_elapsed_time_redis

logger = logging.getLogger(__name__)

# Names of the aggregate tables to update
AGGREGATE_USER = "aggregate_user"
DEFAULT_UPDATE_TIMEOUT = 60 * 30 # 30 minutes

# every ~1000 updates, refresh the entire table
# at 30 secs interval, this should happen ~twice a day
REFRESH_COUNTER = 1000
# two weeks expressed in blocks: 1,209,600 seconds in two weeks / 5 sec block_processing_interval_sec = 241,920
TWO_WEEKS_IN_BLOCKS = 241920

TWO_WEEKS_IN_SECONDS= 1209600

### UPDATE_AGGREGATE_USER_QUERY ###
# Get a lower bound blocknumber to check for new entity counts for a user
Expand Down Expand Up @@ -349,6 +354,7 @@ def update_aggregate_table(
# Attempt to acquire lock - do not block if unable to acquire
have_lock = update_lock.acquire(blocking=False)
if have_lock:
start_time = time.time()
most_recent_indexed_aggregate_block = redis.get(
most_recent_indexed_aggregate_block_key
)
Expand All @@ -360,16 +366,28 @@ def update_aggregate_table(
f"index_aggregate_user.py | most_recent_indexed_aggregate_block: {most_recent_indexed_aggregate_block}"
)

elapsed_time = get_elapsed_time_redis(redis,
index_aggregate_user_last_refresh_completion_redis_key)

is_refreshed = False
with db.scoped_session() as session:
latest_indexed_block_num = get_latest_blocknumber(session, redis)
start_time = time.time()

if (
not most_recent_indexed_aggregate_block
or latest_indexed_block_num % REFRESH_COUNTER == 0
):
# re-create entire table
# repopulate entire table
logger.info(f"index_aggregate_user.py | Repopulating {table_name}")
most_recent_indexed_aggregate_block = 0
session.execute("TRUNCATE TABLE {}".format(table_name))
is_refreshed = True

elif not elapsed_time or elapsed_time > TWO_WEEKS_IN_SECONDS:
# refresh the past two weeks for data accuracy
logger.info(f"index_aggregate_user.py | Refreshing {table_name} for the past two weeks")
most_recent_indexed_aggregate_block -= max(TWO_WEEKS_IN_BLOCKS, 0)
is_refreshed = True

logger.info(f"index_aggregate_user.py | Updating {table_name}")
upsert = sa.text(query)
session.execute(
Expand All @@ -379,13 +397,18 @@ def update_aggregate_table(
},
)

# set new block to be the lower bound for the next indexing
if is_refreshed:
redis.set(
most_recent_indexed_aggregate_block_key, latest_indexed_block_num
)
logger.info(
f"""index_aggregate_user.py | Finished updating {table_name} in: {time.time()-start_time} sec"""
index_aggregate_user_last_refresh_completion_redis_key, int(time.time())
)

# set new block to be the lower bound for the next indexing
redis.set(
most_recent_indexed_aggregate_block_key, latest_indexed_block_num
)
logger.info(
f"""index_aggregate_user.py | Finished updating {table_name} in: {time.time()-start_time} sec"""
)
else:
logger.info(
f"index_aggregate_user.py | Failed to acquire lock update_aggregate_table:{table_name}"
Expand Down
2 changes: 2 additions & 0 deletions discovery-provider/src/utils/redis_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"most_recent_indexed_ipld_block_hash_redis_key"
)
most_recent_indexed_aggregate_user_block_redis_key = "most_recent_indexed_aggregate_user_block"
index_aggregate_user_last_refresh_completion_redis_key = "index_aggregate_user:last-refresh-completion"

trending_tracks_last_completion_redis_key = "trending:tracks:last-completion"
trending_playlists_last_completion_redis_key = "trending-playlists:last-completion"
challenges_last_processed_event_redis_key = "challenges:last-processed-event"
Expand Down

0 comments on commit 5ebf1c8

Please sign in to comment.