Skip to content

Commit

Permalink
Aggregate metrics from other discovery nodes (#1266)
Browse files Browse the repository at this point in the history
* Aggregate metrics from other discovery nodes

* Add comments and small changes

* Rename models, update comments, remove logs

* Add command to spin up discovery node and its dependencies

* Fix lint

* Describe timestamp attributes in new metrics tables

* Parallelize calls to get other discovery node endpoints

* Update command name to start up discovery provider with its dependencies

* Add tests

* Order aggregate route metrics by timestamp asc and fix test

* Merge and persist route and app metrics regardless of new metrics

* Fix month metrics returned

* Fix tests after db query updates

* Use date instead of datetime for db query comparison and add more logs

* Add more logs

* Persist own metrics

Co-authored-by: Saliou Diallo <saliou@audius.co>
Co-authored-by: Raymond Jacobson <ray@audius.co>
  • Loading branch information
3 people committed Mar 10, 2021
1 parent 33b4c6d commit ceaebcc
Show file tree
Hide file tree
Showing 22 changed files with 2,243 additions and 32 deletions.
3 changes: 3 additions & 0 deletions discovery-provider/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ cov_html/*
build/

.vscode/

# redis dumps
*_dump
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Add aggregate metrics tables
Revision ID: c967ae0fcaf6
Revises: 579360c7cbf3
Create Date: 2021-02-24 17:45:45.939947
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'c967ae0fcaf6'
down_revision = '5dd6a55bb738'
branch_labels = None
depends_on = None


def upgrade():
    """Create the six aggregate metrics tables.

    Four user-count tables (daily/monthly x unique/total) and two
    app-name tables (daily/monthly). All six share the same layout:
    id PK, count, timestamp (a Date bucket), created_at/updated_at;
    the app-name tables additionally carry an application_name column.
    """
    user_count_tables = [
        'aggregate_daily_unique_users_metrics',
        'aggregate_daily_total_users_metrics',
        'aggregate_monthly_unique_users_metrics',
        'aggregate_monthly_total_users_metrics',
    ]
    app_name_tables = [
        'aggregate_daily_app_name_metrics',
        'aggregate_monthly_app_name_metrics',
    ]
    for table_name in user_count_tables:
        _create_metrics_table(table_name, include_app_name=False)
    for table_name in app_name_tables:
        _create_metrics_table(table_name, include_app_name=True)


def _create_metrics_table(table_name, include_app_name):
    """Create one aggregate metrics table with the shared column layout.

    include_app_name inserts the application_name column between id and
    count, matching the original per-table DDL column order.
    """
    columns = [
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
    ]
    if include_app_name:
        columns.append(sa.Column('application_name', sa.String(), nullable=False))
    columns.extend([
        sa.Column('count', sa.Integer(), nullable=False),
        sa.Column('timestamp', sa.Date(), nullable=False),
        # created_at/updated_at are populated client-side by SQLAlchemy
        # (default/onupdate), not by a server-side DEFAULT — kept as in
        # the original DDL.
        sa.Column('created_at', sa.DateTime, nullable=False, default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, nullable=False, onupdate=sa.func.now()),
    ])
    op.create_table(table_name, *columns, sa.PrimaryKeyConstraint('id'))


def downgrade():
    """Drop every aggregate metrics table created by this revision."""
    for table_name in (
        'aggregate_daily_unique_users_metrics',
        'aggregate_daily_total_users_metrics',
        'aggregate_monthly_unique_users_metrics',
        'aggregate_monthly_total_users_metrics',
        'aggregate_daily_app_name_metrics',
        'aggregate_monthly_app_name_metrics',
    ):
        op.drop_table(table_name)
9 changes: 9 additions & 0 deletions discovery-provider/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from src.utils.config import config_files, shared_config, ConfigIni
from src.utils.ipfs_lib import IPFSClient
from src.tasks import celery_app
from src.utils.redis_metrics import METRICS_INTERVAL

# these global vars will be set in create_celery function
web3endpoint = None
Expand Down Expand Up @@ -325,6 +326,14 @@ def configure_celery(flask_app, celery, test_config=None):
"task": "update_metrics",
"schedule": crontab(minute=0, hour="*")
},
"aggregate_metrics": {
"task": "aggregate_metrics",
"schedule": timedelta(minutes=METRICS_INTERVAL)
},
"synchronize_metrics": {
"task": "synchronize_metrics",
"schedule": crontab(minute=0, hour=1)
},
"update_materialized_views": {
"task": "update_materialized_views",
"schedule": timedelta(seconds=60)
Expand Down
8 changes: 8 additions & 0 deletions discovery-provider/src/api/v1/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ def parse_unix_epoch_param(time, default=0):
return datetime.utcfromtimestamp(default)
return datetime.utcfromtimestamp(time)

def parse_unix_epoch_param_non_utc(time, default=0):
    """Convert a unix-epoch timestamp to a local-timezone datetime.

    Falls back to `default` (epoch seconds) when `time` is None. Unlike
    parse_unix_epoch_param this deliberately uses the local timezone
    (datetime.fromtimestamp) rather than utcfromtimestamp.
    """
    epoch_seconds = default if time is None else time
    return datetime.fromtimestamp(epoch_seconds)

def extend_track(track):
track_id = encode_int_id(track["track_id"])
owner_id = encode_int_id(track["owner_id"])
Expand Down Expand Up @@ -230,6 +235,9 @@ def extend_activity(item):
}
return None

def abort_bad_path_param(param, namespace):
    """Abort the request with HTTP 400 for an invalid path parameter."""
    message = "Oh no! Bad path parameter {}.".format(param)
    namespace.abort(400, message)

def abort_bad_request_param(param, namespace):
    """Abort the request with HTTP 400 for an invalid query/request parameter."""
    message = "Oh no! Bad request parameter {}.".format(param)
    namespace.abort(400, message)

Expand Down
157 changes: 151 additions & 6 deletions discovery-provider/src/api/v1/metrics.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import logging
from src.queries.get_genre_metrics import get_genre_metrics
from src.queries.get_plays_metrics import get_plays_metrics
from flask import Flask, Blueprint
from flask_restx import Resource, Namespace, fields, reqparse, inputs
from src.api.v1.helpers import make_response, success_response, to_dict, \
parse_bool_param, parse_unix_epoch_param, abort_bad_request_param
parse_bool_param, parse_unix_epoch_param, parse_unix_epoch_param_non_utc, \
abort_bad_request_param, abort_bad_path_param, format_limit
from .models.metrics import route_metric, app_name_metric, app_name, plays_metric, \
genre_metric, route_trailing_metric, app_name_trailing_metric
from src.queries.get_route_metrics import get_route_metrics
from src.queries.get_app_name_metrics import get_app_name_metrics
from src.queries.get_route_metrics import get_route_metrics, get_aggregate_route_metrics, \
get_historical_route_metrics
from src.queries.get_app_name_metrics import get_app_name_metrics, get_aggregate_app_metrics, \
get_historical_app_metrics
from src.queries.get_app_names import get_app_names
from src.queries.get_trailing_metrics import get_monthly_trailing_route_metrics, \
get_trailing_app_metrics
get_trailing_app_metrics, get_aggregate_route_metrics_trailing_month
from src.utils.redis_cache import cache

from src.utils.redis_metrics import get_redis_route_metrics, get_redis_app_metrics, \
get_aggregate_metrics_info
logger = logging.getLogger(__name__)


Expand All @@ -37,6 +40,148 @@
metrics_route_parser.add_argument('version', required=False, action='append')

# Date bucket granularities accepted by the route metrics endpoints.
valid_date_buckets = ['hour', 'day', 'week', 'month', 'quarter', 'year', 'decade', 'century']
# Allowed bucket_size values per aggregate-metrics time range; the first
# entry of each list is the default when no bucket_size is supplied, and
# the dict's keys double as the set of valid time_range path params.
valid_bucket_sizes = {
'week': ['day'],
'month': ['day', 'week'],
'all_time': ['month', 'week']
}

@ns.route("/routes/cached", doc=False)
class CachedRouteMetrics(Resource):
@ns.doc(
id="""Cached Route Metrics""",
params={
'start_time': 'Start Time in Unix Epoch'
},
responses={
200: 'Success',
400: 'Bad request',
500: 'Server error'
}
)
@cache(ttl_sec=5)
def get(self):
args = metrics_route_parser.parse_args()
logger.info(f"getting cached route metrics at {args.get('start_time')} before parsing")
start_time = parse_unix_epoch_param_non_utc(args.get("start_time"))
logger.info(f"getting cached route metrics at {start_time} UTC")
metrics = get_redis_route_metrics(start_time)
response = success_response(metrics)
return response

@ns.route("/apps/cached", doc=False)
class CachedAppMetrics(Resource):
@ns.doc(
id="""Cached App Metrics""",
params={
'start_time': 'Start Time in Unix Epoch'
},
responses={
200: 'Success',
400: 'Bad request',
500: 'Server error'
}
)
@cache(ttl_sec=5)
def get(self):
args = metrics_route_parser.parse_args()
logger.info(f"getting cached app metrics at {args.get('start_time')} before parsing")
start_time = parse_unix_epoch_param_non_utc(args.get("start_time"))
logger.info(f"getting cached app metrics at {start_time.now()} UTC")
metrics = get_redis_app_metrics(start_time)
response = success_response(metrics)
return response

@ns.route("/aggregates/info", doc=False)
class AggregateMetricsInfo(Resource):
@cache(ttl_sec=5)
def get(self):
"""Gets aggregate metrics information"""
metrics_info = get_aggregate_metrics_info()
response = success_response(metrics_info)
return response

@ns.route("/aggregates/historical", doc=False)
class AggregateHistoricalMetrics(Resource):
@cache(ttl_sec=30 * 60)
def get(self):
"""Gets historical aggregate metrics"""
historical_metrics = {
'routes': get_historical_route_metrics(),
'apps': get_historical_app_metrics()
}
response = success_response(historical_metrics)
return response

@ns.route("/aggregates/routes/trailing/month", doc=False)
class AggregateRouteMetricsTrailingMonth(Resource):
@cache(ttl_sec=30 * 60)
def get(self):
"""Gets aggregated route metrics for the last trailing 30 days"""
metrics = get_aggregate_route_metrics_trailing_month()
response = success_response(metrics)
return response

# Parser for the optional bucket_size query param on /aggregates/routes/<time_range>.
aggregate_route_metrics_parser = reqparse.RequestParser()
aggregate_route_metrics_parser.add_argument('bucket_size', required=False)

@ns.route("/aggregates/routes/<string:time_range>", doc=False)
class AggregateRouteMetrics(Resource):
@ns.doc(
id="""Aggregate Route Metrics""",
params={
'bucket_size': 'Grouping of route metrics (e.g. by day, week, or month) for given time range'
},
responses={
200: 'Success',
400: 'Bad request',
500: 'Server error'
}
)
@cache(ttl_sec=30 * 60)
def get(self, time_range):
"""Gets aggregated route metrics based on time range and bucket size"""
if time_range not in valid_bucket_sizes:
abort_bad_path_param('time_range', ns)

args = aggregate_route_metrics_parser.parse_args()
valid_buckets = valid_bucket_sizes[time_range]
bucket_size = args.get("bucket_size") or valid_buckets[0]

if bucket_size not in valid_buckets:
abort_bad_request_param('bucket_size', ns)

metrics = get_aggregate_route_metrics(time_range, bucket_size)
response = success_response(metrics)
return response

# Parser for the optional limit query param on /aggregates/apps/<time_range>.
aggregate_app_metrics_parser = reqparse.RequestParser()
aggregate_app_metrics_parser.add_argument('limit', required=False)

@ns.route("/aggregates/apps/<string:time_range>", doc=False)
class AggregateAppMetricsTrailing(Resource):
@ns.doc(
id="""Aggregate App Metrics""",
params={
'limit': 'Maximum number of apps to return'
},
responses={
200: 'Success',
400: 'Bad request',
500: 'Server error'
}
)
@cache(ttl_sec=30 * 60)
def get(self, time_range):
"""Gets aggregated app metrics based on time range and bucket size"""
if time_range not in valid_bucket_sizes:
abort_bad_path_param('time_range', ns)

args = aggregate_app_metrics_parser.parse_args()
limit = format_limit(args, max_limit=100)
metrics = get_aggregate_app_metrics(time_range, limit)
response = success_response(metrics)
return response

@ns.route("/routes", doc=False)
class RouteMetrics(Resource):
Expand Down

0 comments on commit ceaebcc

Please sign in to comment.