Skip to content

Commit

Permalink
Add Playlist Trending Endpoint (#1288)
Browse files Browse the repository at this point in the history
* Trending playlists

* Fix lint

* PR tweaks

* Fix followers edge case

* Adding tracks WIP

* Add track fixes

* Trending celery task

* Fix non-full trending playlists endpoint

* Fix documentation

* Fix lint

* PR revisions
  • Loading branch information
piazzatron committed Mar 12, 2021
1 parent 1dd8e19 commit 764ad08
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 136 deletions.
10 changes: 7 additions & 3 deletions discovery-provider/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import alembic.config # pylint: disable=E0611

from src import exceptions
from src.queries import queries, search, search_queries, health_check, trending, notifications
from src.queries import queries, search, search_queries, health_check, notifications
from src.api.v1 import api as api_v1
from src.utils import helpers, config
from src.utils.session_manager import SessionManager
Expand Down Expand Up @@ -276,7 +276,6 @@ def default(self, o):

exceptions.register_exception_handlers(app)
app.register_blueprint(queries.bp)
app.register_blueprint(trending.bp)
app.register_blueprint(search.bp)
app.register_blueprint(search_queries.bp)
app.register_blueprint(notifications.bp)
Expand Down Expand Up @@ -309,7 +308,8 @@ def configure_celery(flask_app, celery, test_config=None):
"src.tasks.index_plays", "src.tasks.index_metrics",
"src.tasks.index_materialized_views",
"src.tasks.index_network_peers", "src.tasks.index_trending",
"src.tasks.cache_user_balance", "src.monitors.monitoring_queue"
"src.tasks.cache_user_balance", "src.monitors.monitoring_queue",
"src.tasks.cache_trending_playlists"
],
beat_schedule={
"update_discovery_provider": {
Expand Down Expand Up @@ -355,6 +355,10 @@ def configure_celery(flask_app, celery, test_config=None):
"monitoring_queue": {
"task": "monitoring_queue",
"schedule": timedelta(seconds=60)
},
"cache_trending_playlists": {
"task": "cache_trending_playlists",
"schedule": timedelta(minutes=30)
}
},
task_serializer="json",
Expand Down
121 changes: 115 additions & 6 deletions discovery-provider/src/api/v1/playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
from src.queries.get_playlist_tracks import get_playlist_tracks
from src.api.v1.helpers import abort_not_found, decode_with_abort, extend_playlist, extend_track, make_full_response,\
make_response, success_response, search_parser, abort_bad_request_param, decode_string_id, \
extend_user, get_default_max, get_current_user_id
extend_user, get_default_max, get_current_user_id, to_dict, format_offset, format_limit
from .models.tracks import track
from src.queries.search_queries import SearchKind, search
from src.utils.redis_cache import cache
from src.utils.redis_cache import cache, extract_key, use_redis_cache
from src.utils.redis_metrics import record_metrics
from src.queries.get_reposters_for_playlist import get_reposters_for_playlist
from src.queries.get_savers_for_playlist import get_savers_for_playlist
from src.queries.get_trending_playlists import get_trending_playlists, TRENDING_LIMIT, TRENDING_TTL_SEC
from flask.globals import request
from src.utils.db_session import get_db_read_replica

logger = logging.getLogger(__name__)

Expand All @@ -36,10 +39,17 @@ def get_playlist(playlist_id, current_user_id):
return None

def get_tracks_for_playlist(playlist_id, current_user_id=None):
    """Return the extended (API-shaped) tracks for a single playlist.

    Args:
        playlist_id: decoded playlist id to fetch tracks for
        current_user_id: optional decoded user id used to populate
            user-specific track metadata (saves/reposts)

    Returns: list of extended track dicts (empty if the playlist has
    no current tracks).
    """
    # NOTE: this span previously contained both the pre- and post-refactor
    # bodies (a scrape artifact); only the session-based implementation
    # that matches the new get_playlist_tracks(session, args) API is kept.
    db = get_db_read_replica()
    with db.scoped_session() as session:
        args = {
            "playlist_ids": [playlist_id],
            "populate_tracks": True,
            "current_user_id": current_user_id
        }
        playlist_tracks_map = get_playlist_tracks(session, args)
        # Use .get with a default: get_playlist_tracks omits playlists
        # that have no current tracks, so a bare [playlist_id] lookup
        # would raise KeyError for an empty playlist.
        playlist_tracks = playlist_tracks_map.get(playlist_id, [])
        tracks = list(map(extend_track, playlist_tracks))
        return tracks

PLAYLIST_ROUTE = "/<string:playlist_id>"
@ns.route(PLAYLIST_ROUTE)
Expand Down Expand Up @@ -255,3 +265,102 @@ def get(self, playlist_id):
users = get_reposters_for_playlist(args)
users = list(map(extend_user, users))
return success_response(users)

# Response marshaller + query parser for the non-full /playlists/trending route.
trending_response = make_response("trending_playlists_response", ns, fields.List(fields.Nested(playlist_model)))
trending_parser = reqparse.RequestParser()
# 'time' selects the trending window; the handler coerces anything outside
# {"week", "month", "year"} back to "week".
trending_parser.add_argument('time', required=False)
@ns.route("/trending")
class TrendingPlaylists(Resource):
    @record_metrics
    @ns.doc(
        id="""Trending Playlists""",
        params={
            'time': 'time range to query'
        },
        responses={
            200: 'Success',
            400: 'Bad request',
            500: 'Server error'
        }
    )
    @ns.expect(trending_parser)
    @ns.marshal_with(trending_response)
    @cache(ttl_sec=TRENDING_TTL_SEC)
    def get(self):
        """Gets top trending playlists for time period on Audius"""
        parsed = trending_parser.parse_args()

        # Coerce any unrecognized window back to the default of "week".
        window = parsed.get("time")
        if window not in ["week", "month", "year"]:
            window = "week"

        query_args = {
            "time": window,
            "with_tracks": False
        }

        # Fetch, trim to the trending limit, and extend for the API response.
        results = get_trending_playlists(query_args)
        top_playlists = [extend_playlist(p) for p in results[:TRENDING_LIMIT]]

        return success_response(top_playlists)

# Response marshaller + query parser for the full /playlists/trending route.
full_trending_playlists_response = make_full_response("full_trending_playlists_response", full_ns, fields.List(fields.Nested(full_playlist_model)))

# copy() carries over the 'time' argument already declared on trending_parser,
# so only the pagination/user arguments are added here (the previous
# add_argument('time') re-declaration was redundant and has been removed).
full_trending_parser = trending_parser.copy()
full_trending_parser.add_argument('limit', required=False)
full_trending_parser.add_argument('offset', required=False)
full_trending_parser.add_argument('user_id', required=False)

@full_ns.route("/trending")
class FullTrendingPlaylists(Resource):
    def get_cache_key(self):
        """Build a redis cache key from the request path + query args.

        'limit' and 'offset' are stripped so that differently-paginated
        requests share a single cached trending computation.
        """
        request_items = to_dict(request.args)
        request_items.pop('limit', None)
        request_items.pop('offset', None)
        key = extract_key(request.path, request_items.items())
        return key

    @record_metrics
    # Bug fix: @full_ns.expect / @full_ns.doc previously decorated the
    # get_cache_key helper above, so the swagger documentation and expected
    # parameters never attached to the actual GET handler. They belong here.
    @full_ns.expect(full_trending_parser)
    @full_ns.doc(
        id="""Returns trending playlists for a time period""",
        params={
            'user_id': 'A User ID',
            'limit': 'Limit',
            'offset': 'Offset',
            'time': 'week / month / year'
        },
        responses={
            200: 'Success',
            400: 'Bad request',
            500: 'Server error'
        }
    )
    @full_ns.marshal_with(full_trending_playlists_response)
    def get(self):
        """Get trending playlists"""
        # Parse args
        args = full_trending_parser.parse_args()
        offset, limit = format_offset(args), format_limit(args, TRENDING_LIMIT)
        current_user_id, time = args.get("user_id"), args.get("time", "week")
        time = "week" if time not in ["week", "month", "year"] else time
        args = {
            'time': time,
            'with_tracks': True,
            'limit': limit,
            'offset': offset
        }

        # If we have a user_id, we call into `get_trending_playlists`
        # which fetches the cached unpopulated tracks and then
        # populates user-specific metadata. Otherwise, just
        # retrieve the last cached value.
        if current_user_id:
            decoded = decode_string_id(current_user_id)
            args["current_user_id"] = decoded
            playlists = get_trending_playlists(args)
        else:
            key = self.get_cache_key()
            playlists = use_redis_cache(key, TRENDING_TTL_SEC, lambda: get_trending_playlists(args))

        return success_response(playlists)
6 changes: 3 additions & 3 deletions discovery-provider/src/api/v1/tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,15 @@ def get(self):
trending_ids_route_parser.add_argument('limit', required=False, type=int, default=10)
trending_ids_route_parser.add_argument('genre', required=False, type=str)

track_id = ns.model('track_id', { "id": fields.String(required=True) })
trending_times_ids = ns.model('trending_times_ids', {
track_id = full_ns.model('track_id', { "id": fields.String(required=True) })
trending_times_ids = full_ns.model('trending_times_ids', {
"week": fields.List(fields.Nested(track_id)),
"month": fields.List(fields.Nested(track_id)),
"year": fields.List(fields.Nested(track_id))
})
trending_ids_response = make_response(
"trending_ids_response",
ns,
full_ns,
fields.Nested(trending_times_ids)
)

Expand Down
16 changes: 9 additions & 7 deletions discovery-provider/src/queries/get_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from src.utils.redis_constants import latest_block_redis_key, \
latest_block_hash_redis_key, most_recent_indexed_block_hash_redis_key, most_recent_indexed_block_redis_key, \
most_recent_indexed_ipld_block_redis_key, most_recent_indexed_ipld_block_hash_redis_key, \
trending_tracks_last_completion_redis_key
trending_tracks_last_completion_redis_key, trending_playlists_last_completion_redis_key


logger = logging.getLogger(__name__)
Expand All @@ -22,6 +22,11 @@
default_indexing_interval_seconds = int(
shared_config["discprov"]["block_processing_interval_sec"])

def get_elapsed_time_redis(redis, redis_key):
    """Return whole seconds elapsed since the unix timestamp stored at
    `redis_key`, or None if the key is unset/empty."""
    last_seen = redis.get(redis_key)
    if not last_seen:
        return None
    return int(time.time()) - int(last_seen)

# Returns DB block state & diff
def _get_db_block_state():
db = db_session.get_db_read_replica()
Expand Down Expand Up @@ -150,12 +155,8 @@ def get_health(args, use_redis_cache=True):
latest_indexed_block_num = db_block_state["number"] or 0
latest_indexed_block_hash = db_block_state["blockhash"]

trending_tracks_last_completion = redis.get(
trending_tracks_last_completion_redis_key
)
trending_tracks_age_sec = \
(int(time.time()) - int(trending_tracks_last_completion)) \
if trending_tracks_last_completion else None
trending_tracks_age_sec = get_elapsed_time_redis(redis, trending_tracks_last_completion_redis_key)
trending_playlists_age_sec = get_elapsed_time_redis(redis, trending_playlists_last_completion_redis_key)

# Get system information monitor values
sys_info = monitors.get_monitors([
Expand All @@ -181,6 +182,7 @@ def get_health(args, use_redis_cache=True):
},
"git": os.getenv("GIT_SHA"),
"trending_tracks_age_sec": trending_tracks_age_sec,
"trending_playlists_age_sec": trending_playlists_age_sec,
**sys_info
}

Expand Down
105 changes: 65 additions & 40 deletions discovery-provider/src/queries/get_playlist_tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,88 @@

from src.models import Playlist, Track
from src.utils import helpers
from src.utils.db_session import get_db_read_replica
from src.queries.query_helpers import populate_track_metadata, add_users_to_tracks

logger = logging.getLogger(__name__)

def get_playlist_tracks(session, args):
    """Fetch the current tracks belonging to one or more playlists.

    Accepts args:
    {
        # optionally pass in full playlists to avoid having to fetch
        "playlists": Playlist[]

        # not needed if playlists are passed
        "playlist_ids": string[]
        "current_user_id": int
        "populate_tracks": boolean # whether to add users & metadata to tracks
    }
    Returns: {
        playlist_id: [track]
    }
    NOTE(review): playlists with no current tracks are omitted from the
    returned map, and track order within a playlist follows the DB query
    result, not the playlist_contents ordering — callers should not rely
    on ordering here.
    """
    # This span previously interleaved the old single-playlist body with the
    # new batch body (scrape artifact); only the batch implementation is kept.
    try:
        playlists = args.get("playlists")

        if not playlists:
            # No pre-fetched playlists supplied — look them up by id.
            playlist_ids = args.get("playlist_ids")
            playlists = (
                session
                .query(Playlist)
                .filter(
                    Playlist.is_current == True,
                    Playlist.playlist_id.in_(playlist_ids)
                )
            )
            playlists = list(map(helpers.model_to_dictionary, playlists))

        if not playlists:
            return {}

        # Collect every referenced track id and remember which playlist(s)
        # reference it: track_id -> [playlist_id]
        track_ids_set = set()
        track_id_map = {}
        for playlist in playlists:
            playlist_id = playlist['playlist_id']
            for track_id_dict in playlist['playlist_contents']['track_ids']:
                track_id = track_id_dict['track']
                track_ids_set.add(track_id)
                track_id_map.setdefault(track_id, []).append(playlist_id)

        # Single batched query for all current tracks across all playlists.
        playlist_tracks = (
            session
            .query(Track)
            .filter(
                Track.is_current == True,
                Track.track_id.in_(list(track_ids_set))
            )
            .all()
        )

        tracks = helpers.query_result_to_list(playlist_tracks)

        if args.get("populate_tracks"):
            current_user_id = args.get("current_user_id")
            tracks = populate_track_metadata(
                session, list(track_ids_set), tracks, current_user_id)

            add_users_to_tracks(session, tracks, current_user_id)

        # Regroup the fetched tracks under their parent playlist(s):
        # { playlist_id => [track] }
        playlists_map = {}
        for track in tracks:
            for playlist_id in track_id_map[track["track_id"]]:
                playlists_map.setdefault(playlist_id, []).append(track)

        return playlists_map

    except sqlalchemy.orm.exc.NoResultFound:
        return {}

0 comments on commit 764ad08

Please sign in to comment.