Skip to content

Commit

Permalink
Remove cache on index update (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlee committed Oct 14, 2020
1 parent ff4a976 commit 7204f26
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 35 deletions.
9 changes: 3 additions & 6 deletions discovery-provider/src/queries/get_unpopulated_playlists.py
Expand Up @@ -5,13 +5,10 @@
from src.utils import redis_connection
from src.models import Playlist
from src.utils import helpers
from src.utils.redis_cache import get_playlist_id_cache_key

ttl_sec = 60


def get_playlist_id_cache_key(id):
    """Return the redis key under which the playlist with this id is cached."""
    return f"playlist:id:{id}"

# Cache unpopulated playlists for 5 min
ttl_sec = 5*60

def get_cached_playlists(playlist_ids):
redis_playlist_id_keys = map(get_playlist_id_cache_key, playlist_ids)
Expand Down
8 changes: 3 additions & 5 deletions discovery-provider/src/queries/get_unpopulated_tracks.py
Expand Up @@ -6,14 +6,12 @@
from src.utils import redis_connection
from src.models import Track
from src.utils import helpers
from src.utils.redis_cache import get_track_id_cache_key

logger = logging.getLogger(__name__)

ttl_sec = 60

def get_track_id_cache_key(id):
    """Return the redis key under which the track with this id is cached."""
    return f"track:id:{id}"

# Cache unpopulated tracks for 5 min
ttl_sec = 5*60

def get_cached_tracks(track_ids):
redis_track_id_keys = map(get_track_id_cache_key, track_ids)
Expand Down
8 changes: 3 additions & 5 deletions discovery-provider/src/queries/get_unpopulated_users.py
Expand Up @@ -5,13 +5,11 @@
from src.utils import redis_connection
from src.models import User
from src.utils import helpers
from src.utils.redis_cache import get_user_id_cache_key

ttl_sec = 60


def get_user_id_cache_key(id):
    """Return the redis key under which the user with this id is cached."""
    return f"user:id:{id}"

# Cache unpopulated users for 5 min
ttl_sec = 5*60

def get_cached_users(user_ids):
redis_user_id_keys = map(get_user_id_cache_key, user_ids)
Expand Down
31 changes: 18 additions & 13 deletions discovery-provider/src/tasks/index.py
Expand Up @@ -10,7 +10,10 @@
from src.tasks.user_library import user_library_state_update
from src.utils.helpers import get_ipfs_info_from_cnode_endpoint
from src.utils.redis_constants import latest_block_redis_key, \
latest_block_hash_redis_key, most_recent_indexed_block_hash_redis_key, most_recent_indexed_block_redis_key
latest_block_hash_redis_key, most_recent_indexed_block_hash_redis_key, \
most_recent_indexed_block_redis_key
from src.utils.redis_cache import remove_cached_user_ids, \
remove_cached_track_ids, remove_cached_playlist_ids

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -194,19 +197,14 @@ def index_blocks(self, db, blocks_list):
user_library_factory_txs.append(tx_receipt)

# bulk process operations once all tx's for block have been parsed
user_state_changed = (
user_state_update(
self, update_task, session, user_factory_txs, block_number, block_timestamp
)
> 0
)
total_user_changes, user_ids = user_state_update(
self, update_task, session, user_factory_txs, block_number, block_timestamp)
user_state_changed = total_user_changes > 0

track_state_changed = (
track_state_update(
self, update_task, session, track_factory_txs, block_number, block_timestamp
)
> 0
total_track_changes, track_ids = track_state_update(
self, update_task, session, track_factory_txs, block_number, block_timestamp
)
track_state_changed = total_track_changes > 0

social_feature_state_changed = ( # pylint: disable=W0612
social_feature_state_update(
Expand All @@ -216,9 +214,10 @@ def index_blocks(self, db, blocks_list):
)

# Playlist state operations processed in bulk
playlist_state_changed = playlist_state_update(
total_playlist_changes, playlist_ids = playlist_state_update(
self, update_task, session, playlist_factory_txs, block_number, block_timestamp
)
playlist_state_changed = total_playlist_changes > 0

user_library_state_changed = user_library_state_update( # pylint: disable=W0612
self, update_task, session, user_library_factory_txs, block_number, block_timestamp
Expand All @@ -232,11 +231,17 @@ def index_blocks(self, db, blocks_list):
session.flush()
if user_state_changed:
session.execute("REFRESH MATERIALIZED VIEW user_lexeme_dict")
if user_ids:
remove_cached_user_ids(redis, user_ids)
if track_lexeme_state_changed:
if track_ids:
remove_cached_track_ids(redis, track_ids)
session.execute("REFRESH MATERIALIZED VIEW track_lexeme_dict")
if playlist_state_changed:
session.execute("REFRESH MATERIALIZED VIEW playlist_lexeme_dict")
session.execute("REFRESH MATERIALIZED VIEW album_lexeme_dict")
if playlist_ids:
remove_cached_playlist_ids(redis, playlist_ids)

# add the block number of the most recently processed block to redis
redis.set(most_recent_indexed_block_redis_key, block.number)
Expand Down
8 changes: 6 additions & 2 deletions discovery-provider/src/tasks/playlists.py
Expand Up @@ -15,8 +15,11 @@ def playlist_state_update(
):
"""Return int representing number of Playlist model state changes found in transaction."""
num_total_changes = 0
# This stores the playlist_ids created or updated in the set of transactions
playlist_ids = set()

if not playlist_factory_txs:
return num_total_changes
return num_total_changes, playlist_ids

playlist_abi = update_task.abi_values["PlaylistFactory"]["abi"]
playlist_contract = update_task.web3.eth.contract(
Expand All @@ -32,6 +35,7 @@ def playlist_state_update(
processedEntries = 0 # if record does not get added, do not count towards num_total_changes
for entry in playlist_events_tx:
playlist_id = entry["args"]._playlistId
playlist_ids.add(playlist_id)

if playlist_id not in playlist_events_lookup:
existing_playlist_entry = lookup_playlist_record(
Expand Down Expand Up @@ -67,7 +71,7 @@ def playlist_state_update(
invalidate_old_playlist(session, playlist_id)
session.add(value_obj["playlist"])

return num_total_changes
return num_total_changes, playlist_ids


def lookup_playlist_record(update_task, session, entry, block_number):
Expand Down
9 changes: 7 additions & 2 deletions discovery-provider/src/tasks/tracks.py
Expand Up @@ -25,8 +25,12 @@
def track_state_update(self, update_task, session, track_factory_txs, block_number, block_timestamp):
"""Return int representing number of Track model state changes found in transaction."""
num_total_changes = 0

# This stores the track_ids created or updated in the set of transactions
track_ids = set()

if not track_factory_txs:
return num_total_changes
return num_total_changes, track_ids

track_abi = update_task.abi_values["TrackFactory"]["abi"]
track_contract = update_task.web3.eth.contract(
Expand All @@ -40,6 +44,7 @@ def track_state_update(self, update_task, session, track_factory_txs, block_numb
for entry in track_events_tx:
event_args = entry["args"]
track_id = event_args._trackId if '_trackId' in event_args else event_args._id
track_ids.add(track_id)
blockhash = update_task.web3.toHex(entry.blockHash)

if track_id not in track_events:
Expand Down Expand Up @@ -77,7 +82,7 @@ def track_state_update(self, update_task, session, track_factory_txs, block_numb
invalidate_old_track(session, track_id)
session.add(value_obj["track"])

return num_total_changes
return num_total_changes, track_ids


def lookup_track_record(update_task, session, entry, event_track_id, block_number, block_hash):
Expand Down
6 changes: 4 additions & 2 deletions discovery-provider/src/tasks/users.py
Expand Up @@ -15,8 +15,9 @@ def user_state_update(self, update_task, session, user_factory_txs, block_number
"""Return int representing number of User model state changes found in transaction."""

num_total_changes = 0
user_ids = set()
if not user_factory_txs:
return num_total_changes
return num_total_changes, user_ids

user_abi = update_task.abi_values["UserFactory"]["abi"]
user_contract = update_task.web3.eth.contract(
Expand All @@ -38,6 +39,7 @@ def user_state_update(self, update_task, session, user_factory_txs, block_number
processedEntries = 0 # if record does not get added, do not count towards num_total_changes
for entry in user_events_tx:
user_id = entry["args"]._userId
user_ids.add(user_id)

# if the user id is not in the lookup object, it hasn't been initialized yet
# first, get the user object from the db(if exists or create a new one)
Expand Down Expand Up @@ -78,7 +80,7 @@ def user_state_update(self, update_task, session, user_factory_txs, block_number
invalidate_old_user(session, user_id)
session.add(value_obj["user"])

return num_total_changes
return num_total_changes, user_ids


def lookup_user_record(update_task, session, entry, block_number, block_timestamp):
Expand Down
38 changes: 38 additions & 0 deletions discovery-provider/src/utils/redis_cache.py
Expand Up @@ -98,3 +98,41 @@ def inner_wrap(*args, **kwargs):
return transform(response)
return inner_wrap
return outer_wrap


def get_user_id_cache_key(id):
    """Build the redis cache key for a user id."""
    return f"user:id:{id}"


def get_track_id_cache_key(id):
    """Build the redis cache key for a track id."""
    return f"track:id:{id}"


def get_playlist_id_cache_key(id):
    """Build the redis cache key for a playlist id."""
    return f"playlist:id:{id}"


def remove_cached_user_ids(redis, user_ids):
    """Evict the cached entries for the given user ids from redis.

    Args:
        redis: a redis client exposing `delete(*keys)`.
        user_ids: iterable of user ids whose cache entries should be removed.

    Best-effort: any redis failure is logged and swallowed so indexing
    is never blocked by a cache-eviction error.
    """
    # Guard the empty batch: redis-py's delete() raises when called with
    # zero keys, which would log a spurious error on every empty update.
    if not user_ids:
        return
    try:
        user_keys = list(map(get_user_id_cache_key, user_ids))
        redis.delete(*user_keys)
    except Exception as e:
        logger.error(
            "Unable to remove cached users: %s", e, exc_info=True)


def remove_cached_track_ids(redis, track_ids):
    """Evict the cached entries for the given track ids from redis.

    Args:
        redis: a redis client exposing `delete(*keys)`.
        track_ids: iterable of track ids whose cache entries should be removed.

    Best-effort: any redis failure is logged and swallowed so indexing
    is never blocked by a cache-eviction error.
    """
    # Guard the empty batch: redis-py's delete() raises when called with
    # zero keys, which would log a spurious error on every empty update.
    if not track_ids:
        return
    try:
        track_keys = list(map(get_track_id_cache_key, track_ids))
        redis.delete(*track_keys)
    except Exception as e:
        logger.error(
            "Unable to remove cached tracks: %s", e, exc_info=True)

def remove_cached_playlist_ids(redis, playlist_ids):
    """Evict the cached entries for the given playlist ids from redis.

    Args:
        redis: a redis client exposing `delete(*keys)`.
        playlist_ids: iterable of playlist ids whose cache entries should
            be removed.

    Best-effort: any redis failure is logged and swallowed so indexing
    is never blocked by a cache-eviction error.
    """
    # Guard the empty batch: redis-py's delete() raises when called with
    # zero keys, which would log a spurious error on every empty update.
    if not playlist_ids:
        return
    try:
        playlist_keys = list(map(get_playlist_id_cache_key, playlist_ids))
        redis.delete(*playlist_keys)
    except Exception as e:
        logger.error(
            "Unable to remove cached playlists: %s", e, exc_info=True)

0 comments on commit 7204f26

Please sign in to comment.