Skip to content

Commit

Permalink
FIx aggregate metrics persistence and synchronization (#1320)
Browse files Browse the repository at this point in the history
* Fix type and clear metrics locks

* Fix app metrics query

* Fix historical metrics dictionary key

Co-authored-by: Saliou Diallo <saliou@audius.co>
  • Loading branch information
sddioulde and Saliou Diallo committed Mar 18, 2021
1 parent 59d220c commit bddb9ea
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
2 changes: 2 additions & 0 deletions discovery-provider/src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ def configure_celery(flask_app, celery, test_config=None):
redis_inst.delete("update_play_count_lock")
redis_inst.delete("ipld_blacklist_lock")
redis_inst.delete("update_discovery_lock")
redis_inst.delete("aggregate_metrics_lock")
redis_inst.delete("synchronize_metrics_lock")
logger.info('Redis instance initialized!')

# Initialize custom task context with database object
Expand Down
4 changes: 2 additions & 2 deletions discovery-provider/src/queries/get_trailing_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def _get_aggregate_route_metrics_trailing_month(session):
logger.info(f"trailing month total count: {total_count}")

return {
"unique_count": unique_count[0],
"total_count": total_count[0]
'unique_count': unique_count[0],
'total_count': total_count[0]
}

def get_monthly_trailing_route_metrics():
Expand Down
16 changes: 8 additions & 8 deletions discovery-provider/src/queries/update_historical_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def update_historical_daily_route_metrics(db, metrics):
.first()
)
if day_unique_record:
day_unique_record.count = values['unique']
day_unique_record.count = values['unique_count']
else:
day_unique_record = AggregateDailyUniqueUsersMetrics(
timestamp=day,
count=values['unique']
count=values['unique_count']
)
session.add(day_unique_record)

Expand All @@ -28,11 +28,11 @@ def update_historical_daily_route_metrics(db, metrics):
.first()
)
if day_total_record:
day_total_record.count = values['total']
day_total_record.count = values['total_count']
else:
day_total_record = AggregateDailyTotalUsersMetrics(
timestamp=day,
count=values['total']
count=values['total_count']
)
session.add(day_total_record)

Expand All @@ -45,11 +45,11 @@ def update_historical_monthly_route_metrics(db, metrics):
.first()
)
if month_unique_record:
month_unique_record.count = values['unique']
month_unique_record.count = values['unique_count']
else:
month_unique_record = AggregateMonthlyUniqueUsersMetrics(
timestamp=month,
count=values['unique']
count=values['unique_count']
)
session.add(month_unique_record)

Expand All @@ -59,11 +59,11 @@ def update_historical_monthly_route_metrics(db, metrics):
.first()
)
if month_total_record:
month_total_record.count = values['total']
month_total_record.count = values['total_count']
else:
month_total_record = AggregateMonthlyTotalUsersMetrics(
timestamp=month,
count=values['total']
count=values['total_count']
)
session.add(month_total_record)

Expand Down
7 changes: 4 additions & 3 deletions discovery-provider/src/tasks/index_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def consolidate_metrics_from_other_nodes(self, db, redis):
merge_route_metrics(new_personal_route_metrics, end_time, db)
merge_app_metrics(new_personal_app_metrics, end_time, db)

# Merge & persiste metrics for other nodes
# Merge & persist metrics for other nodes
for node in all_other_nodes:
start_time_str = visited_node_timestamps[node] if node in visited_node_timestamps else one_iteration_ago_str
start_time_obj = datetime.strptime(start_time_str, datetime_format_secondary)
Expand Down Expand Up @@ -292,8 +292,8 @@ def update_route_metrics_count(my_metrics, other_metrics):
for timestamp, values in other_metrics.items():
if timestamp in my_metrics:
my_metrics[timestamp] = {
'unique': max(values['unique'], my_metrics[timestamp]['unique']),
'total': max(values['total'], my_metrics[timestamp]['total'])
'unique_count': max(values['unique_count'], my_metrics[timestamp]['unique_count']),
'total_count': max(values['total_count'], my_metrics[timestamp]['total_count'])
}
else:
my_metrics[timestamp] = values
Expand Down Expand Up @@ -321,6 +321,7 @@ def synchronize_all_node_metrics(self, db):
monthly_app_metrics = {}
for node in get_all_other_nodes():
historical_metrics = get_historical_metrics(node)
logger.info(f"got historical metrics from {node}: {historical_metrics}")
if historical_metrics:
update_route_metrics_count(daily_route_metrics, historical_metrics['routes']['daily'])
update_route_metrics_count(monthly_route_metrics, historical_metrics['routes']['monthly'])
Expand Down
8 changes: 4 additions & 4 deletions discovery-provider/src/utils/redis_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ def persist_app_metrics(db, day, month, app_count):
for application_name, count in app_count.items():
day_record = (
session.query(AggregateDailyAppNameMetrics)
.filter(AggregateDailyAppNameMetrics.timestamp == day and \
AggregateDailyAppNameMetrics.application_name == application_name)
.filter(AggregateDailyAppNameMetrics.timestamp == day)
.filter(AggregateDailyAppNameMetrics.application_name == application_name)
.first()
)
if day_record:
Expand All @@ -184,8 +184,8 @@ def persist_app_metrics(db, day, month, app_count):

month_record = (
session.query(AggregateMonthlyAppNameMetrics)
.filter(AggregateMonthlyAppNameMetrics.timestamp == month and \
AggregateMonthlyAppNameMetrics.application_name == application_name)
.filter(AggregateMonthlyAppNameMetrics.timestamp == month)
.filter(AggregateMonthlyAppNameMetrics.application_name == application_name)
.first()
)
if month_record:
Expand Down

0 comments on commit bddb9ea

Please sign in to comment.