Skip to content

Commit

Permalink
Put back sync queue time estimation, pulled from RabbitMQ management API
Browse files Browse the repository at this point in the history
  • Loading branch information
cpfair committed Jun 17, 2015
1 parent f58978f commit 8703b58
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
38 changes: 29 additions & 9 deletions stats_cron.py
@@ -1,5 +1,7 @@
from tapiriik.database import db, close_connections
from tapiriik.settings import RABBITMQ_USER_QUEUE_STATS_URL
from datetime import datetime, timedelta
import requests

# total distance synced
distanceSyncedAggr = list(db.sync_stats.aggregate([{"$group": {"_id": None, "total": {"$sum": "$Distance"}}}]))
Expand All @@ -21,13 +23,21 @@
lastHourDistanceSynced = lastHourDistanceSyncedAggr[0]["total"]
else:
lastHourDistanceSynced = 0
# sync wait time, to save making 1 query/sec-user-browser
queueHead = list(db.users.find({"QueuedAt": {"$lte": datetime.utcnow()}, "SynchronizationWorker": None, "SynchronizationHostRestriction": {"$exists": False}}, {"QueuedAt": 1}).sort("QueuedAt").limit(10))
queueHeadTime = timedelta(0)
if len(queueHead):
for queuedUser in queueHead:
queueHeadTime += datetime.utcnow() - queuedUser["QueuedAt"]
queueHeadTime /= len(queueHead)

# How long users are taking to get pushed into rabbitMQ
# Once called "queueHead" as, a very long time ago, this _was_ user queuing
enqueueHead = list(db.users.find({"QueuedAt": {"$lte": datetime.utcnow()}, "SynchronizationWorker": None, "SynchronizationHostRestriction": {"$exists": False}}, {"QueuedAt": 1}).sort("QueuedAt").limit(10))
enqueueTime = timedelta(0)
if len(enqueueHead):
for pendingEnqueueUser in enqueueHead:
enqueueTime += datetime.utcnow() - pendingEnqueueUser["QueuedAt"]
enqueueTime /= len(enqueueHead)

# Query rabbitMQ to get main queue throughput and length
rmq_user_queue_stats = requests.get(RABBITMQ_USER_QUEUE_STATS_URL).json()
rmq_user_queue_length = rmq_user_queue_stats["messages_ready_details"]["avg"]
rmq_user_queue_rate = rmq_user_queue_stats["message_stats"]["ack_details"]["avg_rate"]
rmq_user_queue_wait_time = rmq_user_queue_length / rmq_user_queue_rate

# sync time utilization
db.sync_worker_stats.remove({"Timestamp": {"$lt": datetime.utcnow() - timedelta(hours=1)}}) # clean up old records
Expand Down Expand Up @@ -86,10 +96,20 @@
"ErrorUsers": usersWithErrors,
"TotalErrors": totalErrors,
"SyncTimeUsed": timeUsed,
"SyncQueueHeadTime": queueHeadTime.total_seconds()
"SyncEnqueueTime": enqueueTime.total_seconds(),
"SyncQueueHeadTime": rmq_user_queue_wait_time
})

db.stats.update({}, {"$set": {"TotalDistanceSynced": distanceSynced, "LastDayDistanceSynced": lastDayDistanceSynced, "LastHourDistanceSynced": lastHourDistanceSynced, "TotalSyncTimeUsed": timeUsed, "AverageSyncDuration": avgSyncTime, "LastHourSynchronizationCount": totalSyncOps, "QueueHeadTime": queueHeadTime.total_seconds(), "Updated": datetime.utcnow()}}, upsert=True)
db.stats.update({}, {"$set": {
"TotalDistanceSynced": distanceSynced,
"LastDayDistanceSynced": lastDayDistanceSynced,
"LastHourDistanceSynced": lastHourDistanceSynced,
"TotalSyncTimeUsed": timeUsed,
"AverageSyncDuration": avgSyncTime,
"LastHourSynchronizationCount": totalSyncOps,
"EnqueueTime": enqueueTime.total_seconds(),
"QueueHeadTime": rmq_user_queue_wait_time,
"Updated": datetime.utcnow() }}, upsert=True)


def aggregateCommonErrors():
Expand Down
2 changes: 2 additions & 0 deletions tapiriik/settings.py
Expand Up @@ -312,6 +312,8 @@

RABBITMQ_BROKER_URL = "amqp://guest@localhost//"

RABBITMQ_USER_QUEUE_STATS_URL = "http://guest:guest@localhost:15672/api/queues/%2F/tapiriik-users?lengths_age=3600&lengths_incr=60&msg_rates_age=3600&msg_rates_incr=60"

GARMIN_CONNECT_USER_WATCH_ACCOUNTS = {}

from .local_settings import *
2 changes: 1 addition & 1 deletion tapiriik/web/views/sync.py
Expand Up @@ -45,7 +45,7 @@ def err_msg(err):
"SynchronizationWaitTime": None, # I wish.
"Hash": syncHash}

if stats and "QueueHeadTime" in stats and False: # Disabled till I fix users getting stuck in the queue
if stats and "QueueHeadTime" in stats:
sync_status_dict["SynchronizationWaitTime"] = (stats["QueueHeadTime"] - (datetime.utcnow() - req.user["NextSynchronization"]).total_seconds()) if "NextSynchronization" in req.user and req.user["NextSynchronization"] is not None else None

return HttpResponse(json.dumps(sync_status_dict), mimetype="application/json")
Expand Down

0 comments on commit 8703b58

Please sign in to comment.