Skip to content

Commit

Permalink
Jowlee metrics (#737)
Browse files Browse the repository at this point in the history
* Added redis metrics decorator

* Added metrics db model & migration

* Added metrics indexing redis to db

* Added metrics routes

* Updated comments and formats

* Use scan in place of keys redis search

* Clean up comments
  • Loading branch information
jowlee committed Aug 7, 2020
1 parent 5ef638c commit f126e2c
Show file tree
Hide file tree
Showing 21 changed files with 957 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Create Metrics Routes Table
Revision ID: f54b539b0527
Revises: 776ca72b16db
Create Date: 2020-07-31 09:41:14.132668
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'f54b539b0527'
down_revision = '776ca72b16db'
branch_labels = None
depends_on = None


def upgrade():
op.create_table('route_metrics',
sa.Column('route_path', sa.String(), nullable=False),
sa.Column('version', sa.String(), nullable=False),
sa.Column('query_string', sa.String(), nullable=False, default=''),
sa.Column('count', sa.Integer(), nullable=False),
sa.Column('timestamp', sa.DateTime, nullable=False, default=sa.func.now()),
sa.Column('created_at', sa.DateTime, nullable=False, default=sa.func.now()),
sa.Column('updated_at', sa.DateTime, nullable=False, onupdate=sa.func.now()),
sa.PrimaryKeyConstraint('route_path', 'query_string', 'timestamp')
)
op.create_table('app_name_metrics',
sa.Column('application_name', sa.String(), nullable=False),
sa.Column('count', sa.Integer(), nullable=False),
sa.Column('timestamp', sa.DateTime, nullable=False, default=sa.func.now()),
sa.Column('created_at', sa.DateTime, nullable=False, default=sa.func.now()),
sa.Column('updated_at', sa.DateTime, nullable=False, onupdate=sa.func.now()),
sa.PrimaryKeyConstraint('application_name', 'timestamp')
)

def downgrade():
op.drop_table('route_metrics')
op.drop_table('app_name_metrics')
20 changes: 13 additions & 7 deletions discovery-provider/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy_utils import database_exists, create_database
from sqlalchemy import exc
from celery import Task
from celery.schedules import timedelta
from celery.schedules import timedelta, crontab

import redis
from flask import Flask
Expand Down Expand Up @@ -46,7 +46,8 @@


def initContracts():
registry_address = web3.toChecksumAddress(shared_config["contracts"]["registry"])
registry_address = web3.toChecksumAddress(
shared_config["contracts"]["registry"])
registry_instance = web3.eth.contract(
address=registry_address, abi=abi_values["Registry"]["abi"]
)
Expand Down Expand Up @@ -115,8 +116,6 @@ def initContracts():
)




def create_app(test_config=None):
return create(test_config)

Expand Down Expand Up @@ -248,7 +247,8 @@ def default(self, o):

def configure_celery(flask_app, celery, test_config=None):
database_url = shared_config["db"]["url"]
engine_args_literal = ast.literal_eval(shared_config["db"]["engine_args_literal"])
engine_args_literal = ast.literal_eval(
shared_config["db"]["engine_args_literal"])
redis_url = shared_config["redis"]["url"]

if test_config is not None:
Expand All @@ -258,7 +258,8 @@ def configure_celery(flask_app, celery, test_config=None):

# Update celery configuration
celery.conf.update(
imports=["src.tasks.index", "src.tasks.index_blacklist", "src.tasks.index_cache", "src.tasks.index_plays"],
imports=["src.tasks.index", "src.tasks.index_blacklist",
"src.tasks.index_cache", "src.tasks.index_plays", "src.tasks.index_metrics"],
beat_schedule={
"update_discovery_provider": {
"task": "update_discovery_provider",
Expand All @@ -275,6 +276,10 @@ def configure_celery(flask_app, celery, test_config=None):
"update_play_count": {
"task": "update_play_count",
"schedule": timedelta(seconds=5)
},
"update_metrics": {
"task": "update_metrics",
"schedule": crontab(minute=0, hour="*")
}
},
task_serializer="json",
Expand All @@ -288,7 +293,8 @@ def configure_celery(flask_app, celery, test_config=None):

# Initialize IPFS client for celery task context
gateway_addrs = shared_config["ipfs"]["gateway_hosts"].split(',')
gateway_addrs.append(shared_config["discprov"]["user_metadata_service_url"])
gateway_addrs.append(
shared_config["discprov"]["user_metadata_service_url"])
logger.warning(f"__init__.py | {gateway_addrs}")
ipfs_client = IPFSClient(
shared_config["ipfs"]["host"], shared_config["ipfs"]["port"], gateway_addrs
Expand Down
2 changes: 2 additions & 0 deletions discovery-provider/src/api/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from src.api.v1.users import ns as users_ns
from src.api.v1.playlists import ns as playlists_ns
from src.api.v1.tracks import ns as tracks_ns
from src.api.v1.metrics import ns as metrics_ns
from src.api.v1.models.users import ns as models_ns

bp = Blueprint('api_v1', __name__, url_prefix="/v1")
Expand All @@ -11,3 +12,4 @@
api_v1.add_namespace(users_ns)
api_v1.add_namespace(playlists_ns)
api_v1.add_namespace(tracks_ns)
api_v1.add_namespace(metrics_ns)
4 changes: 4 additions & 0 deletions discovery-provider/src/api/v1/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def make_response(name, namespace, modelType):
"data": modelType,
})

def to_dict(multi_dict):
"""Converts a multi dict into a dict where only list entries are not flat"""
return {k: v if len(v) > 1 else v[0] for (k, v) in multi_dict.to_dict(flat=False).items()}


search_parser = reqparse.RequestParser()
search_parser.add_argument('query', required=True)
Expand Down
93 changes: 93 additions & 0 deletions discovery-provider/src/api/v1/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging # pylint: disable=C0302
from datetime import datetime
from flask import Flask, Blueprint
from flask_restx import Resource, Namespace, fields, reqparse
from src import api_helpers
from src.api.v1.helpers import make_response, success_response, to_dict
from .models.metrics import route_metric, app_name_metric, app_name
from src.queries.get_route_metrics import get_route_metrics
from src.queries.get_app_name_metrics import get_app_name_metrics
from src.queries.get_app_names import get_app_names

logger = logging.getLogger(__name__)

ns = Namespace('metrics', description='Metrics related operations')

route_metrics_response = make_response("metrics_reponse", ns, fields.List(fields.Nested(route_metric)))
app_name_response = make_response("app_name_response", ns, fields.List(fields.Nested(app_name)))
app_name_metrics_response = make_response("app_name_metrics_response", ns, fields.List(fields.Nested(app_name_metric)))

metrics_route_parser = reqparse.RequestParser()
metrics_route_parser.add_argument('path', required=True)
metrics_route_parser.add_argument('query_string', required=False)
metrics_route_parser.add_argument('start_time', required=True, type=int)
metrics_route_parser.add_argument('limit', required=False, type=int)
metrics_route_parser.add_argument('version', required=False, action='append')

@ns.route("/routes")
class RouteMetrics(Resource):
@ns.expect(metrics_route_parser)
@ns.marshal_with(route_metrics_response)
def get(self):
"""Get the route metrics"""
args = metrics_route_parser.parse_args()
if args.get('limit') is None:
args['limit'] = 48
else:
args['limit'] = min(args.get('limit'), 48)
try:
args['start_time'] = datetime.utcfromtimestamp(args['start_time'])
except:
return api_helpers.error_response('Poorly formated start_time parameter', 400)

route_metrics = get_route_metrics(args)
response = success_response(route_metrics)
return response


metrics_app_name_list_parser = reqparse.RequestParser()
metrics_app_name_list_parser.add_argument('limit', required=False, type=int)
metrics_app_name_list_parser.add_argument('offset', required=False, type=int)

@ns.route("/app_name")
class AppNameListMetrics(Resource):
@ns.expect(metrics_app_name_list_parser)
@ns.marshal_with(app_name_response)
def get(self):
"""List all the app names"""
args = metrics_app_name_list_parser.parse_args()
if args.get('limit') is None:
args['limit'] = 100
else:
args['limit'] = min(args.get('limit'), 100)
if args.get('offset') is None:
args['offset'] = 0

app_names = get_app_names(args)
response = success_response(app_names)
return response


metrics_app_name_parser = reqparse.RequestParser()
metrics_app_name_parser.add_argument('start_time', required=True, type=int)
metrics_app_name_parser.add_argument('limit', required=False, type=int)

@ns.route("/app_name/<string:app_name>")
class AppNameMetrics(Resource):
@ns.expect(metrics_app_name_parser)
@ns.marshal_with(app_name_metrics_response)
def get(self, app_name):
"""Get the app name metrics"""
args = metrics_app_name_parser.parse_args()
if args.get('limit') is None:
args['limit'] = 48
else:
args['limit'] = min(args.get('limit'), 48)
try:
args['start_time'] = datetime.utcfromtimestamp(args['start_time'])
except:
return api_helpers.error_response('Poorly formated start_time parameter', 400)
app_name_metrics = get_app_name_metrics(app_name, args)
response = success_response(app_name_metrics)
return response

17 changes: 17 additions & 0 deletions discovery-provider/src/api/v1/models/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from flask_restx import fields
from .common import ns

route_metric = ns.model('route_metric', {
"route": fields.String,
"timestamp": fields.String,
"count": fields.Integer
})

app_name_metric = ns.model('app_name_metric', {
"timestamp": fields.String,
"count": fields.Integer
})

app_name = ns.model('app_name', {
"name": fields.String
})
4 changes: 4 additions & 0 deletions discovery-provider/src/api/v1/playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .models.tracks import track
from src.queries.search_queries import SearchKind, search
from src.utils.redis_cache import cache
from src.utils.redis_metrics import record_metrics

logger = logging.getLogger(__name__)

Expand All @@ -18,6 +19,7 @@

@ns.route("/<string:playlist_id>")
class Playlist(Resource):
@record_metrics
@ns.marshal_with(playlists_response)
@cache(ttl_sec=5)
def get(self, playlist_id):
Expand All @@ -33,6 +35,7 @@ def get(self, playlist_id):

@ns.route("/<string:playlist_id>/tracks")
class PlaylistTracks(Resource):
@record_metrics
@ns.marshal_with(playlist_tracks_response)
@cache(ttl_sec=5)
def get(self, playlist_id):
Expand All @@ -49,6 +52,7 @@ def get(self, playlist_id):

@ns.route("/search")
class PlaylistSearchResult(Resource):
@record_metrics
@ns.marshal_with(playlist_search_result)
@ns.expect(search_parser)
@cache(ttl_sec=5)
Expand Down
5 changes: 5 additions & 0 deletions discovery-provider/src/api/v1/tracks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from src.utils.config import shared_config
from flask.json import dumps
from src.utils.redis_cache import cache
from src.utils.redis_metrics import record_metrics

logger = logging.getLogger(__name__)
ns = Namespace('tracks', description='Track related operations')
Expand All @@ -23,6 +24,7 @@

@ns.route('/<string:track_id>')
class Track(Resource):
@record_metrics
@ns.marshal_with(track_response)
@cache(ttl_sec=5)
def get(self, track_id):
Expand All @@ -38,6 +40,7 @@ def get(self, track_id):

@ns.route("/<string:track_id>/stream")
class TrackStream(Resource):
@record_metrics
@cache(ttl_sec=5)
def get(self, track_id):
"""Redirect to track mp3"""
Expand All @@ -61,6 +64,7 @@ def get(self, track_id):

@ns.route("/search")
class TrackSearchResult(Resource):
@record_metrics
@ns.marshal_with(track_search_result)
@ns.expect(search_parser)
@cache(ttl_sec=60)
Expand All @@ -82,6 +86,7 @@ def get(self):

@ns.route("/trending")
class Trending(Resource):
@record_metrics
@ns.marshal_with(tracks_response)
@cache(ttl_sec=30 * 60)
def get(self):
Expand Down
5 changes: 5 additions & 0 deletions discovery-provider/src/api/v1/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from src.api.v1.helpers import abort_not_found, decode_with_abort, extend_favorite, extend_track, extend_user, make_response, search_parser, success_response
from .models.tracks import track
from src.utils.redis_cache import cache
from src.utils.redis_metrics import record_metrics

logger = logging.getLogger(__name__)

Expand All @@ -18,6 +19,7 @@
user_response = make_response("user_response", ns, fields.Nested(user_model))
@ns.route("/<string:user_id>")
class User(Resource):
@record_metrics
@ns.marshal_with(user_response)
@cache(ttl_sec=5)
def get(self, user_id):
Expand All @@ -33,6 +35,7 @@ def get(self, user_id):
tracks_response = make_response("tracks_response", ns, fields.List(fields.Nested(track)))
@ns.route("/<string:user_id>/tracks")
class TrackList(Resource):
@record_metrics
@ns.marshal_with(tracks_response)
@cache(ttl_sec=5)
def get(self, user_id):
Expand All @@ -48,6 +51,7 @@ def get(self, user_id):
favorites_response = make_response("favorites_response", ns, fields.List(fields.Nested(favorite)))
@ns.route("/<string:user_id>/favorites")
class FavoritedTracks(Resource):
@record_metrics
@ns.marshal_with(favorites_response)
@cache(ttl_sec=5)
def get(self, user_id):
Expand All @@ -61,6 +65,7 @@ def get(self, user_id):

@ns.route("/search")
class UserSearchResult(Resource):
@record_metrics
@ns.marshal_with(user_search_result)
@ns.expect(search_parser)
def get(self):
Expand Down

0 comments on commit f126e2c

Please sign in to comment.