Merge pull request #54 from commonknowledge/feat/sentry-metrics
Use Sentry to record job performance metrics
joaquimds committed Jun 6, 2024
2 parents de1e145 + 71d5b98 commit 3ada163
Showing 7 changed files with 199 additions and 112 deletions.
5 changes: 4 additions & 1 deletion .env-example
@@ -35,4 +35,7 @@ TEST_ACTIONNETWORK_MEMBERLIST_API_KEY="..."

SENTRY_DSN="..."
ENVIRONMENT="development"
ENCRYPTION_SECRET_KEY=""

CRYPTOGRAPHY_KEY=somemadeupcryptographickeywhichshouldbereplaced
CRYPTOGRAPHY_SALT=somesaltthatshouldbereplaced
ENCRYPTION_SECRET_KEY=somemadeupcryptographickeywhichshouldbereplaced
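
Note: the values above are placeholders that must be replaced in real deployments. A minimal sketch for generating replacements with the standard library (assuming the "should be alphanumeric" comment in settings.py applies to all three variables):

import secrets

print(secrets.token_hex(32))  # CRYPTOGRAPHY_KEY / ENCRYPTION_SECRET_KEY
print(secrets.token_hex(16))  # CRYPTOGRAPHY_SALT

token_hex output is hex-encoded, so it satisfies an alphanumeric requirement.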
11 changes: 8 additions & 3 deletions hub/graphql/mutations.py
@@ -1,3 +1,4 @@
+import datetime
import logging
import uuid
from typing import List, Optional
@@ -191,11 +192,15 @@ def get_or_create_organisation_for_user(info: Info, org=None):

@strawberry_django.mutation(extensions=[IsAuthenticated()])
async def import_all(external_data_source_id: str) -> ExternalDataSourceAction:
-    data_source = await models.ExternalDataSource.objects.aget(
-        id=external_data_source_id
+    data_source: models.ExternalDataSource = (
+        await models.ExternalDataSource.objects.aget(id=external_data_source_id)
    )
    request_id = str(uuid.uuid4())
-    await data_source.schedule_import_all(request_id=request_id)
+    requested_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
+
+    await data_source.schedule_import_all(
+        requested_at=requested_at, request_id=request_id
+    )
    return ExternalDataSourceAction(id=request_id, external_data_source=data_source)
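
Note: requested_at is serialized with isoformat() because Procrastinate stores job arguments as JSON, which has no native datetime type. A sketch of the round-trip (variable names here are illustrative):

import datetime

requested_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
# ...passed through the job's JSON-serialized arguments...
recovered = datetime.datetime.fromisoformat(requested_at)
assert recovered.tzinfo is not None  # the UTC offset survives the round-trip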


22 changes: 19 additions & 3 deletions hub/models.py
@@ -38,6 +38,7 @@
from pyairtable import Base as AirtableBase
from pyairtable import Table as AirtableTable
from pyairtable.models.schema import TableSchema as AirtableTableSchema
+from sentry_sdk import metrics
from strawberry.dataloader import DataLoader
from wagtail.admin.panels import FieldPanel, ObjectList, TabbedInterface
from wagtail.images.models import AbstractImage, AbstractRendition, Image
@@ -1155,13 +1156,16 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob):
            )
            + 1
        )
-        remaining = total - done

        time_started = (
            ProcrastinateEvent.objects.filter(job_id=parent_job.id)
            .order_by("at")
            .first()
            .at.replace(tzinfo=pytz.utc)
        )
+
+        remaining = total - done

        time_so_far = datetime.now(pytz.utc) - time_started
        duration_per_record = time_so_far / done
        time_remaining = duration_per_record * remaining
@@ -1827,9 +1831,12 @@ async def deferred_refresh_all(
        )

        members = await external_data_source.fetch_all()
+        member_count = 0
        batches = batched(members, settings.IMPORT_UPDATE_ALL_BATCH_SIZE)
        for batch in batches:
+            member_count += len(batch)
            await external_data_source.schedule_refresh_many(batch, request_id)
+        metrics.distribution(key="update_rows_requested", value=member_count)

    @classmethod
    async def deferred_refresh_webhooks(cls, external_data_source_id: str):
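
Note: both deferred_refresh_all and deferred_import_all (below) accumulate member_count across batches and report a single distribution value per run. The batched helper itself is not part of this diff; a sketch under the assumption that it follows itertools.batched-style semantics:

from itertools import islice

def batched(iterable, n):
    """Yield successive lists of at most n items from iterable."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch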
@@ -1864,11 +1871,14 @@ async def deferred_import_all(
        )

        members = await external_data_source.fetch_all()
+        member_count = 0
        batches = batched(members, settings.IMPORT_UPDATE_ALL_BATCH_SIZE)
        for batch in batches:
+            member_count += len(batch)
            await external_data_source.schedule_import_many(
                batch, request_id=request_id
            )
+        metrics.distribution(key="import_rows_requested", value=member_count)

    async def schedule_refresh_one(self, member) -> int:
        if not self.allow_updates:
@@ -1961,14 +1971,20 @@ async def schedule_import_many(self, members: list, request_id: str = None) -> int:
        except (UniqueViolation, IntegrityError):
            pass

-    async def schedule_import_all(self, request_id: str = None) -> int:
+    async def schedule_import_all(
+        self, requested_at: str, request_id: str = None
+    ) -> int:
        try:
            return await import_all.configure(
                # Dedupe `import_all` jobs for the same config
                # https://procrastinate.readthedocs.io/en/stable/howto/queueing_locks.html
                queueing_lock=f"import_all_{str(self.id)}",
                schedule_in={"seconds": settings.SCHEDULED_UPDATE_SECONDS_DELAY},
-            ).defer_async(external_data_source_id=str(self.id), request_id=request_id)
+            ).defer_async(
+                external_data_source_id=str(self.id),
+                requested_at=requested_at,
+                request_id=request_id,
+            )
        except (UniqueViolation, IntegrityError):
            pass
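
Note: the queueing lock means that while an import_all job for this data source is still waiting, a second defer_async call violates a unique constraint at the database level; the except clause turns that duplicate into a no-op. Illustrative only (some_id and now_iso are hypothetical):

source = await ExternalDataSource.objects.aget(id=some_id)
await source.schedule_import_all(requested_at=now_iso, request_id="a")
await source.schedule_import_all(requested_at=now_iso, request_id="b")  # deduped while "a" waits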

60 changes: 60 additions & 0 deletions hub/tasks.py
@@ -1,12 +1,68 @@
from __future__ import annotations

+import datetime
+import functools
+import os
+
from django.conf import settings
+from django.db.models import Count, Q
from django.core import management

from procrastinate.contrib.django import app
+from procrastinate.contrib.django.models import ProcrastinateJob
+from sentry_sdk import metrics
+
+
+def telemetry_task(func):
+    task_name = func.__name__
+    user_cpu_time_metric = f"task.{task_name}.user_cpu_time"
+    system_cpu_time_metric = f"task.{task_name}.system_cpu_time"
+    elapsed_time_metric = f"task.{task_name}.elapsed_time"
+    percentage_failed_metric = f"task.{task_name}.percentage_failed"
+
+    @functools.wraps(func)
+    async def wrapper(*args, **kwargs):
+        # Get times before task execution
+        start_time = datetime.datetime.now(datetime.timezone.utc)
+        cpu_start = os.times()
+
+        try:
+            result = await func(*args, **kwargs)
+
+            # Get CPU time after task execution
+            end_time = datetime.datetime.now(datetime.timezone.utc)
+            cpu_end = os.times()
+
+            # Calculate the CPU time used during the task
+            user_cpu_time_used = cpu_end.user - cpu_start.user
+            system_cpu_time_used = cpu_end.system - cpu_start.system
+            elapsed_time = end_time - start_time
+
+            metrics.distribution(key=user_cpu_time_metric, value=user_cpu_time_used)
+            metrics.distribution(key=system_cpu_time_metric, value=system_cpu_time_used)
+            metrics.distribution(
+                key=elapsed_time_metric,
+                value=elapsed_time.total_seconds(),
+                unit="seconds",
+            )
+
+            return result
+        finally:
+            counts = await ProcrastinateJob.objects.aaggregate(
+                total=Count("id"), failed=Count("id", filter=Q(status="failed"))
+            )
+            percentage_failed = (
+                counts["failed"] * 100 / counts["total"] if counts["total"] else 0
+            )
+            metrics.gauge(
+                key=percentage_failed_metric, value=round(percentage_failed, 2)
+            )
+
+    return wrapper


@app.task(queue="external_data_sources")
+@telemetry_task
async def refresh_one(external_data_source_id: str, member):
    from hub.models import ExternalDataSource
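
Note: decorator order matters here. @telemetry_task must sit below @app.task so that Procrastinate registers the already-wrapped coroutine; inverting the order would register the bare task and skip the metrics. Also, os.times() reports process-wide CPU time, so when a worker runs tasks concurrently the deltas can include CPU spent on other tasks. A minimal usage sketch (toy task, not from the repo):

@app.task(queue="external_data_sources")
@telemetry_task
async def noop_job(external_data_source_id: str):
    # Emits task.noop_job.user_cpu_time, .system_cpu_time and .elapsed_time
    # distributions plus a task.noop_job.percentage_failed gauge.
    return None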

@@ -16,6 +72,7 @@ async def refresh_one(external_data_source_id: str, member):


@app.task(queue="external_data_sources", retry=settings.IMPORT_UPDATE_MANY_RETRY_COUNT)
+@telemetry_task
async def refresh_many(
    external_data_source_id: str, members: list, request_id: str = None
):
@@ -29,6 +86,7 @@ async def refresh_many(


@app.task(queue="external_data_sources")
+@telemetry_task
async def refresh_all(external_data_source_id: str, request_id: str = None):
from hub.models import ExternalDataSource

@@ -49,6 +107,7 @@ async def refresh_webhooks(external_data_source_id: str, timestamp=None):


@app.task(queue="external_data_sources", retry=settings.IMPORT_UPDATE_MANY_RETRY_COUNT)
+@telemetry_task
async def import_many(
    external_data_source_id: str, members: list, request_id: str = None
):
@@ -62,6 +121,7 @@ async def import_many(


@app.task(queue="external_data_sources")
+@telemetry_task
async def import_all(external_data_source_id: str, request_id: str = None):
from hub.models import ExternalDataSource
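
Note: the mutation and schedule_import_all now pass requested_at, but the code that consumes it is not visible in this excerpt (the import_all signature above still omits it). A hedged sketch of one way it could feed a queue-latency metric:

import datetime

from sentry_sdk import metrics

def record_queue_latency(task_name: str, requested_at: str) -> None:
    # requested_at is the ISO-8601 string produced in mutations.py
    delay = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.datetime.fromisoformat(requested_at)
    metrics.distribution(
        key=f"task.{task_name}.queue_latency",
        value=delay.total_seconds(),
        unit="seconds",
    )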

13 changes: 8 additions & 5 deletions local_intelligence_hub/settings.py
@@ -56,9 +56,10 @@
    POSTHOG_HOST=(str, False),
    ENVIRONMENT=(str, "development"),
    SENTRY_DSN=(str, False),
-    CRYPTOGRAPHY_KEY=(str, "somemadeupcryptographickeywhichshouldbereplaced"),
-    CRYPTOGRAPHY_SALT=(str, "somesaltthatshouldbereplaced"),
-    ENCRYPTION_SECRET_KEY=(str, "somemadeupcryptographickeywhichshouldbereplaced"),
+    SENTRY_TRACE_SAMPLE_RATE=(float, 1.0),
+    CRYPTOGRAPHY_KEY=(str, ""),
+    CRYPTOGRAPHY_SALT=(str, ""),
+    ENCRYPTION_SECRET_KEY=(str, ""),
    ELECTORAL_COMMISSION_API_KEY=(str, ""),
    MAILCHIMP_MYSOC_KEY=(str, ""),
    MAILCHIMP_MYSOC_SERVER_PREFIX=(str, ""),
@@ -75,11 +76,14 @@
# Should be alphanumeric
CRYPTOGRAPHY_KEY = env("CRYPTOGRAPHY_KEY")
CRYPTOGRAPHY_SALT = env("CRYPTOGRAPHY_SALT")
+ENCRYPTION_SECRET_KEY = env("ENCRYPTION_SECRET_KEY")

if CRYPTOGRAPHY_KEY is None:
    raise ValueError("CRYPTOGRAPHY_KEY must be set")
if CRYPTOGRAPHY_SALT is None:
    raise ValueError("CRYPTOGRAPHY_SALT must be set")
+if ENCRYPTION_SECRET_KEY is None:
+    raise ValueError("ENCRYPTION_SECRET_KEY must be set")

ELECTORAL_COMMISSION_API_KEY = env("ELECTORAL_COMMISSION_API_KEY")
BASE_URL = env("BASE_URL")
@@ -419,6 +423,7 @@ def jwt_handler(token):
}

SCHEDULED_UPDATE_SECONDS_DELAY = env("SCHEDULED_UPDATE_SECONDS_DELAY")
+SENTRY_TRACE_SAMPLE_RATE = env("SENTRY_TRACE_SAMPLE_RATE")

environment = env("ENVIRONMENT")

@@ -480,8 +485,6 @@ def jwt_handler(token):
},
}

-ENCRYPTION_SECRET_KEY = env("ENCRYPTION_SECRET_KEY")
-
WAGTAIL_SITE_NAME = "Mapped hub page editor"
WAGTAILADMIN_BASE_URL = BASE_URL
WAGTAILDOCS_EXTENSIONS = []
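
Note: SENTRY_TRACE_SAMPLE_RATE is read into settings here, but the sentry_sdk.init call is outside this diff. A sketch of how it would typically be wired up (the _experiments flag for Sentry's metrics beta is an assumption, not shown in this PR):

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration

sentry_sdk.init(
    dsn=env("SENTRY_DSN"),
    environment=env("ENVIRONMENT"),
    traces_sample_rate=SENTRY_TRACE_SAMPLE_RATE,
    integrations=[DjangoIntegration()],
    # Assumption: metrics were behind this experiments flag in 2024-era SDKs.
    _experiments={"enable_metrics": True},
)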
[2 more changed files not rendered in this view]

