Skip to content

Commit

Permalink
Update get play metrics query to use hourly_play_counts (#2195)
Browse files Browse the repository at this point in the history
* Update get play metrics query to use hourly_play_counts

* Add typed arg

* add typed session arg
  • Loading branch information
isaacsolo committed Jan 21, 2022
1 parent 5cd040c commit 72aa1b3
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 58 deletions.
161 changes: 123 additions & 38 deletions discovery-provider/integration_tests/queries/test_get_plays_metrics.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,158 @@
import time
from datetime import datetime, timedelta

from src.models import Play
from src.queries.get_plays_metrics import _get_plays_metrics
from integration_tests.utils import populate_mock_db
from src.queries.get_plays_metrics import GetPlayMetricsArgs, _get_plays_metrics
from src.tasks.index_hourly_play_counts import _index_hourly_play_counts
from src.utils.db_session import get_db

DAYS_IN_A_YEAR = 365

def populate_mock_db(db, date1, date2):
"""Helper function to populate thee mock DB with plays"""
test_plays = [
{"item_id": 1, "created_at": date1},
{"item_id": 1, "created_at": date1},
{"item_id": 1, "created_at": date2},
{"item_id": 2, "created_at": date1},
{"item_id": 2, "created_at": date2},
{"item_id": 3, "created_at": date2},
{"item_id": 3, "created_at": date2},
{"item_id": 3, "created_at": date2},
]

with db.scoped_session() as session:
for i, play_meta in enumerate(test_plays):
play = Play(
id=i,
play_item_id=play_meta.get("item_id"),
created_at=play_meta.get("created_at", datetime.now()),
)
session.add(play)
def format_date(date):
return int(time.mktime(date.timetuple()))


def test_get_plays_metrics(app):
"""Tests that plays metrics can be queried"""

date = datetime(2020, 10, 4).replace(minute=0, second=0, microsecond=0)
date1 = date + timedelta(hours=-1)
date2 = date + timedelta(days=-2)
before_date = date + timedelta(days=-3)

with app.app_context():
db = get_db()

populate_mock_db(db, date1, date2)

args = {"limit": 10, "start_time": before_date, "bucket_size": "hour"}
date = datetime(2020, 10, 4).replace(minute=0, second=0, microsecond=0)
test_entities = {
"tracks": [
{"track_id": 1, "title": "track 1"},
{"track_id": 2, "title": "track 2"},
{"track_id": 3, "title": "track 3"},
],
"plays": [
{"item_id": 1, "created_at": date + timedelta(hours=-1)},
{"item_id": 1, "created_at": date + timedelta(hours=-1)},
{"item_id": 1, "created_at": date + timedelta(days=-2)},
{"item_id": 2, "created_at": date + timedelta(hours=-1)},
{"item_id": 2, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
],
}

populate_mock_db(db, test_entities)

args = GetPlayMetricsArgs(
limit=10,
start_time=date + timedelta(days=-3),
bucket_size="hour",
)

with db.scoped_session() as session:
_index_hourly_play_counts(session)
metrics = _get_plays_metrics(session, args)

assert len(metrics) == 2
assert metrics[0]["timestamp"] == format_date(date + timedelta(hours=-1))
assert metrics[0]["count"] == 3
assert metrics[1]["timestamp"] == format_date(date + timedelta(days=-2))
assert metrics[1]["count"] == 5


def test_get_plays_metrics_with_weekly_buckets(app):
"""Tests that plays metrics can be queried with weekly buckets"""
# A Thursday
date = datetime(2020, 10, 1).replace(minute=0, second=0, microsecond=0)
date1 = date + timedelta(hours=-1)
date2 = date + timedelta(days=-2)
before_date = date + timedelta(days=-3)

with app.app_context():
db = get_db()

populate_mock_db(db, date1, date2)

args = {"limit": 10, "start_time": before_date, "bucket_size": "week"}
# A Thursday
date = datetime(2020, 10, 1).replace(minute=0, second=0, microsecond=0)
test_entities = {
"tracks": [
{"track_id": 1, "title": "track 1"},
{"track_id": 2, "title": "track 2"},
{"track_id": 3, "title": "track 3"},
],
"plays": [
{"item_id": 1, "created_at": date + timedelta(hours=-1)},
{"item_id": 1, "created_at": date + timedelta(hours=-1)},
{"item_id": 1, "created_at": date + timedelta(days=-2)},
{"item_id": 2, "created_at": date + timedelta(hours=-1)},
{"item_id": 2, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
{"item_id": 3, "created_at": date + timedelta(days=-2)},
],
}

populate_mock_db(db, test_entities)

start_time = date + timedelta(days=-3)
args = GetPlayMetricsArgs(
limit=10,
start_time=date + timedelta(days=-3),
bucket_size="week",
)

with db.scoped_session() as session:
_index_hourly_play_counts(session)
metrics = _get_plays_metrics(session, args)

assert len(metrics) == 1
assert metrics[0]["count"] == 8
assert metrics[0]["timestamp"] == format_date(start_time)


def test_get_plays_metrics_with_yearly_buckets(app):
"""Tests that plays metrics can be queried"""

with app.app_context():
db = get_db()

date = datetime(2020, 10, 4).replace(minute=0, second=0, microsecond=0)
test_entities = {
"tracks": [
{"track_id": 1, "title": "track 1"},
{"track_id": 2, "title": "track 2"},
{"track_id": 3, "title": "track 3"},
],
"plays": [
{"item_id": 1, "created_at": date + timedelta(days=-3 * DAYS_IN_A_YEAR)},
{"item_id": 1, "created_at": date + timedelta(days=-3 * DAYS_IN_A_YEAR)},
{"item_id": 3, "created_at": date + timedelta(days=-3 * DAYS_IN_A_YEAR)},
{"item_id": 1, "created_at": date + timedelta(days=-2 * DAYS_IN_A_YEAR)},
{"item_id": 2, "created_at": date + timedelta(days=-2 * DAYS_IN_A_YEAR)},
{"item_id": 3, "created_at": date + timedelta(days=-1 * DAYS_IN_A_YEAR)},
{"item_id": 3, "created_at": date + timedelta(days=-1)},
{"item_id": 3, "created_at": date + timedelta(weeks=-1)},
],
}

populate_mock_db(db, test_entities)

start_time = date + timedelta(
days=-3 * DAYS_IN_A_YEAR - 1
) # -1 extra day to be inclusive
args = GetPlayMetricsArgs(
limit=10,
start_time=start_time,
bucket_size="year",
)

with db.scoped_session() as session:
_index_hourly_play_counts(session)
metrics = _get_plays_metrics(session, args)

assert len(metrics) == 4
assert metrics[0]["count"] == 2
assert metrics[0]["timestamp"] == format_date(date.replace(day=1, month=1))
assert metrics[1]["count"] == 1
assert metrics[1]["timestamp"] == format_date(
(date + timedelta(days=-1 * DAYS_IN_A_YEAR)).replace(day=1, month=1)
)
assert metrics[2]["count"] == 2
assert metrics[2]["timestamp"] == format_date(
(date + timedelta(days=-2 * DAYS_IN_A_YEAR)).replace(day=1, month=1)
)
assert metrics[3]["count"] == 3
assert metrics[3]["timestamp"] == format_date(
(date + timedelta(days=-3 * DAYS_IN_A_YEAR)).replace(day=1, month=1)
)
10 changes: 5 additions & 5 deletions discovery-provider/src/api/v1/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def get(self):

try:
args["start_time"] = parse_unix_epoch_param(args.get("start_time"), 0)
except:
except Exception:
abort_bad_request_param("start_time", ns)

if args.get("exact") is not None:
Expand Down Expand Up @@ -327,7 +327,7 @@ def get(self):
args["offset"] = 0
try:
args["start_time"] = parse_unix_epoch_param(args.get("start_time"), 0)
except:
except Exception:
abort_bad_request_param("start_time", ns)

app_names = get_app_names(args)
Expand Down Expand Up @@ -386,7 +386,7 @@ def get(self, app_name):

try:
args["start_time"] = parse_unix_epoch_param(args.get("start_time"), 0)
except:
except Exception:
abort_bad_request_param("start_time", ns)

if args.get("bucket_size") is None:
Expand Down Expand Up @@ -429,7 +429,7 @@ def get(self):

try:
args["start_time"] = parse_unix_epoch_param(args.get("start_time"), 0)
except:
except Exception:
abort_bad_request_param("start_time", ns)

if args.get("bucket_size") is None:
Expand Down Expand Up @@ -473,7 +473,7 @@ def get(self):
args["offset"] = 0
try:
args["start_time"] = parse_unix_epoch_param(args.get("start_time"), 0)
except:
except Exception:
abort_bad_request_param("start_time", ns)

genre_metrics = get_genre_metrics(args)
Expand Down
42 changes: 27 additions & 15 deletions discovery-provider/src/queries/get_plays_metrics.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
import logging
import time
from typing import TypedDict

from sqlalchemy import desc, func
from src.models import Play
from sqlalchemy.orm.session import Session
from src.models import HourlyPlayCounts
from src.utils import db_session

logger = logging.getLogger(__name__)


def get_plays_metrics(args):
class GetPlayMetricsArgs(TypedDict):
# A date_trunc operation to aggregate timestamps by
bucket_size: int

# The max number of responses to return
start_time: int

# The max number of responses to return
limit: int


def get_plays_metrics(args: GetPlayMetricsArgs):
"""
Returns metrics for play counts
Args:
args: dict The parsed args from the request
args.start_time: date The start of the query
args.limit: number The max number of responses to return
args.bucket_size: string A date_trunc operation to aggregate timestamps by
args: GetPlayMetrics the parsed args from the request
Returns:
Array of dictionaries with the play counts and timestamp
Expand All @@ -26,24 +36,26 @@ def get_plays_metrics(args):
return _get_plays_metrics(session, args)


def _get_plays_metrics(session, args):
def _get_plays_metrics(session: Session, args: GetPlayMetricsArgs):
metrics_query = (
session.query(
func.date_trunc(args.get("bucket_size"), Play.created_at).label(
"timestamp"
),
func.count(Play.id).label("count"),
func.date_trunc(
args.get("bucket_size"), HourlyPlayCounts.hourly_timestamp
).label("timestamp"),
func.sum(HourlyPlayCounts.play_count).label("count"),
)
.filter(HourlyPlayCounts.hourly_timestamp > args.get("start_time"))
.group_by(
func.date_trunc(args.get("bucket_size"), HourlyPlayCounts.hourly_timestamp)
)
.filter(Play.created_at > args.get("start_time"))
.group_by(func.date_trunc(args.get("bucket_size"), Play.created_at))
.order_by(desc("timestamp"))
.limit(args.get("limit"))
)

metrics = metrics_query.all()

metrics = [
{"timestamp": int(time.mktime(m[0].timetuple())), "count": m[1]}
for m in metrics
{"timestamp": int(time.mktime(metric[0].timetuple())), "count": metric[1]}
for metric in metrics
]
return metrics

0 comments on commit 72aa1b3

Please sign in to comment.