Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Sentry to record job performance metrics #54

Merged
merged 21 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d3b3d4d
test opentelemetry sentry sdk integration
Moggach Apr 23, 2024
af4c0ec
graphql metrics
janbaykara Apr 24, 2024
9e69b86
merge branch open-telemtry
Moggach Apr 24, 2024
2623ac0
regenerate lock file
Moggach Apr 24, 2024
cc9d72f
test open telemetry console logging
Moggach Apr 24, 2024
2cd4d3b
add some attributes to the import update job metrics open telemetry sp…
Moggach Apr 24, 2024
48a6a66
revert changes to schedule_refresh_all function
Moggach Apr 24, 2024
2728dfb
revert change to report model type
Moggach Apr 25, 2024
5a7901b
set up two metrics to get number of rows updated in import and update…
Moggach Apr 25, 2024
d00baa3
add metrics into relevant functions in models - logging to the consol…
Moggach Apr 25, 2024
10fa350
add a telemetry task function to record task status and log to consol…
Moggach Apr 25, 2024
d210ed8
add cpu usage per task
Moggach Apr 25, 2024
e124252
add time difference between request and function start for import all…
Moggach Apr 25, 2024
7ea05e1
fix issue with wrong data format passed to tracer in import_all task …
Moggach Apr 25, 2024
76ff844
add metrics for the duration of data imports and updates
Moggach Apr 25, 2024
ffe0627
Merge branch 'main' into install-job-metrics
joaquimds May 22, 2024
35fbc2a
feat: use sentry to record job performance metrics
joaquimds May 23, 2024
8071030
fix: correct sentry metrics usage
joaquimds May 24, 2024
9a5577a
Merge remote-tracking branch 'origin/main' into feat/sentry-metrics
janbaykara May 31, 2024
209490c
Merge branch 'main' into feat/sentry-metrics
joaquimds Jun 6, 2024
71d5b98
chore: lint
joaquimds Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ TEST_ACTIONNETWORK_MEMBERLIST_API_KEY="..."

SENTRY_DSN="..."
ENVIRONMENT="development"
ENCRYPTION_SECRET_KEY=""

CRYPTOGRAPHY_KEY=somemadeupcryptographickeywhichshouldbereplaced
CRYPTOGRAPHY_SALT=somesaltthatshouldbereplaced
ENCRYPTION_SECRET_KEY=somemadeupcryptographickeywhichshouldbereplaced
11 changes: 8 additions & 3 deletions hub/graphql/mutations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
import uuid
from typing import List, Optional
Expand Down Expand Up @@ -191,11 +192,15 @@ def get_or_create_organisation_for_user(info: Info, org=None):

@strawberry_django.mutation(extensions=[IsAuthenticated()])
async def import_all(external_data_source_id: str) -> ExternalDataSourceAction:
data_source = await models.ExternalDataSource.objects.aget(
id=external_data_source_id
data_source: models.ExternalDataSource = (
await models.ExternalDataSource.objects.aget(id=external_data_source_id)
)
request_id = str(uuid.uuid4())
await data_source.schedule_import_all(request_id=request_id)
requested_at = datetime.datetime.now(datetime.timezone.utc).isoformat()

await data_source.schedule_import_all(
requested_at=requested_at, request_id=request_id
)
return ExternalDataSourceAction(id=request_id, external_data_source=data_source)


Expand Down
22 changes: 19 additions & 3 deletions hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from pyairtable import Base as AirtableBase
from pyairtable import Table as AirtableTable
from pyairtable.models.schema import TableSchema as AirtableTableSchema
from sentry_sdk import metrics
from strawberry.dataloader import DataLoader
from wagtail.admin.panels import FieldPanel, ObjectList, TabbedInterface
from wagtail.images.models import AbstractImage, AbstractRendition, Image
Expand Down Expand Up @@ -1155,13 +1156,16 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob):
)
+ 1
)
remaining = total - done

time_started = (
ProcrastinateEvent.objects.filter(job_id=parent_job.id)
.order_by("at")
.first()
.at.replace(tzinfo=pytz.utc)
)

remaining = total - done

time_so_far = datetime.now(pytz.utc) - time_started
duration_per_record = time_so_far / done
time_remaining = duration_per_record * remaining
Expand Down Expand Up @@ -1827,9 +1831,12 @@ async def deferred_refresh_all(
)

members = await external_data_source.fetch_all()
member_count = 0
batches = batched(members, settings.IMPORT_UPDATE_ALL_BATCH_SIZE)
for batch in batches:
member_count += len(batch)
await external_data_source.schedule_refresh_many(batch, request_id)
metrics.distribution(key="update_rows_requested", value=member_count)

@classmethod
async def deferred_refresh_webhooks(cls, external_data_source_id: str):
Expand Down Expand Up @@ -1864,11 +1871,14 @@ async def deferred_import_all(
)

members = await external_data_source.fetch_all()
member_count = 0
batches = batched(members, settings.IMPORT_UPDATE_ALL_BATCH_SIZE)
for batch in batches:
member_count += len(batch)
await external_data_source.schedule_import_many(
batch, request_id=request_id
)
metrics.distribution(key="import_rows_requested", value=member_count)

async def schedule_refresh_one(self, member) -> int:
if not self.allow_updates:
Expand Down Expand Up @@ -1961,14 +1971,20 @@ async def schedule_import_many(self, members: list, request_id: str = None) -> i
except (UniqueViolation, IntegrityError):
pass

async def schedule_import_all(self, request_id: str = None) -> int:
async def schedule_import_all(
self, requested_at: str, request_id: str = None
) -> int:
try:
return await import_all.configure(
# Dedupe `import_all` jobs for the same config
# https://procrastinate.readthedocs.io/en/stable/howto/queueing_locks.html
queueing_lock=f"import_all_{str(self.id)}",
schedule_in={"seconds": settings.SCHEDULED_UPDATE_SECONDS_DELAY},
).defer_async(external_data_source_id=str(self.id), request_id=request_id)
).defer_async(
external_data_source_id=str(self.id),
requested_at=requested_at,
request_id=request_id,
)
except (UniqueViolation, IntegrityError):
pass

Expand Down
60 changes: 60 additions & 0 deletions hub/tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,68 @@
from __future__ import annotations

Check failure on line 1 in hub/tasks.py

View workflow job for this annotation

GitHub Actions / lint

Imports are incorrectly sorted and/or formatted.

import datetime
import functools
import os

from django.conf import settings
from django.db.models import Count, Q
from django.core import management

from procrastinate.contrib.django import app
from procrastinate.contrib.django.models import ProcrastinateJob
from sentry_sdk import metrics


def telemetry_task(func):
    """Decorate an async procrastinate task to record Sentry performance metrics.

    Each call of the wrapped task emits:
      * ``task.<name>.user_cpu_time`` / ``task.<name>.system_cpu_time``
        distributions -- CPU-time deltas taken from ``os.times()``.
      * ``task.<name>.elapsed_time`` distribution -- wall-clock duration in
        seconds.
      * ``task.<name>.percentage_failed`` gauge -- percentage of failed jobs
        across *all* ProcrastinateJob rows (the aggregate query is not
        filtered by task name), rounded to 2 decimal places.

    CPU and elapsed-time metrics are recorded only when the task succeeds;
    the failure gauge is recorded in ``finally`` so it fires even when the
    task raises.

    NOTE(review): ``os.times()`` reports process-wide CPU time, so concurrent
    tasks running in the same worker process will inflate each other's CPU
    metrics -- confirm this is acceptable.
    """
    task_name = func.__name__
    # Metric keys are derived once, at decoration time.
    user_cpu_time_metric = f"task.{task_name}.user_cpu_time"
    system_cpu_time_metric = f"task.{task_name}.system_cpu_time"
    elapsed_time_metric = f"task.{task_name}.elapsed_time"
    percentage_failed_metric = f"task.{task_name}.percentage_failed"

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Get times before task execution
        start_time = datetime.datetime.now(datetime.timezone.utc)
        cpu_start = os.times()

        try:
            result = await func(*args, **kwargs)

            # Get CPU time after task execution
            end_time = datetime.datetime.now(datetime.timezone.utc)
            cpu_end = os.times()

            # Calculate the CPU time used during the task
            user_cpu_time_used = cpu_end.user - cpu_start.user
            system_cpu_time_used = cpu_end.system - cpu_start.system
            elapsed_time = end_time - start_time

            # Distributions let Sentry aggregate per-task timing percentiles.
            # (No unit is passed for the CPU metrics; os.times() values are
            # in seconds.)
            metrics.distribution(key=user_cpu_time_metric, value=user_cpu_time_used)
            metrics.distribution(key=system_cpu_time_metric, value=system_cpu_time_used)
            metrics.distribution(
                key=elapsed_time_metric,
                value=elapsed_time.total_seconds(),
                unit="seconds",
            )

            return result
        finally:
            # Recompute the global failure rate after every run, success or
            # failure. Counts cover every ProcrastinateJob row, not only jobs
            # of this task.
            counts = await ProcrastinateJob.objects.aaggregate(
                total=Count("id"), failed=Count("id", filter=Q(status="failed"))
            )
            # Guard against division by zero when no jobs exist yet.
            percentage_failed = (
                counts["failed"] * 100 / counts["total"] if counts["total"] else 0
            )
            metrics.gauge(
                key=percentage_failed_metric, value=round(percentage_failed, 2)
            )

    return wrapper


@app.task(queue="external_data_sources")
@telemetry_task
async def refresh_one(external_data_source_id: str, member):
from hub.models import ExternalDataSource

Expand All @@ -16,6 +72,7 @@


@app.task(queue="external_data_sources", retry=settings.IMPORT_UPDATE_MANY_RETRY_COUNT)
@telemetry_task
async def refresh_many(
external_data_source_id: str, members: list, request_id: str = None
):
Expand All @@ -29,6 +86,7 @@


@app.task(queue="external_data_sources")
@telemetry_task
async def refresh_all(external_data_source_id: str, request_id: str = None):
from hub.models import ExternalDataSource

Expand All @@ -49,6 +107,7 @@


@app.task(queue="external_data_sources", retry=settings.IMPORT_UPDATE_MANY_RETRY_COUNT)
@telemetry_task
async def import_many(
external_data_source_id: str, members: list, request_id: str = None
):
Expand All @@ -62,6 +121,7 @@


@app.task(queue="external_data_sources")
@telemetry_task
async def import_all(external_data_source_id: str, request_id: str = None):
from hub.models import ExternalDataSource

Expand Down
13 changes: 8 additions & 5 deletions local_intelligence_hub/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@
POSTHOG_HOST=(str, False),
ENVIRONMENT=(str, "development"),
SENTRY_DSN=(str, False),
CRYPTOGRAPHY_KEY=(str, "somemadeupcryptographickeywhichshouldbereplaced"),
CRYPTOGRAPHY_SALT=(str, "somesaltthatshouldbereplaced"),
ENCRYPTION_SECRET_KEY=(str, "somemadeupcryptographickeywhichshouldbereplaced"),
SENTRY_TRACE_SAMPLE_RATE=(float, 1.0),
CRYPTOGRAPHY_KEY=(str, ""),
CRYPTOGRAPHY_SALT=(str, ""),
ENCRYPTION_SECRET_KEY=(str, ""),
ELECTORAL_COMMISSION_API_KEY=(str, ""),
MAILCHIMP_MYSOC_KEY=(str, ""),
MAILCHIMP_MYSOC_SERVER_PREFIX=(str, ""),
Expand All @@ -75,11 +76,14 @@
# Should be alphanumeric
CRYPTOGRAPHY_KEY = env("CRYPTOGRAPHY_KEY")
CRYPTOGRAPHY_SALT = env("CRYPTOGRAPHY_SALT")
ENCRYPTION_SECRET_KEY = env("ENCRYPTION_SECRET_KEY")

if CRYPTOGRAPHY_KEY is None:
raise ValueError("CRYPTOGRAPHY_KEY must be set")
if CRYPTOGRAPHY_SALT is None:
raise ValueError("CRYPTOGRAPHY_SALT must be set")
if ENCRYPTION_SECRET_KEY is None:
raise ValueError("ENCRYPTION_SECRET_KEY must be set")

ELECTORAL_COMMISSION_API_KEY = env("ELECTORAL_COMMISSION_API_KEY")
BASE_URL = env("BASE_URL")
Expand Down Expand Up @@ -419,6 +423,7 @@ def jwt_handler(token):
}

SCHEDULED_UPDATE_SECONDS_DELAY = env("SCHEDULED_UPDATE_SECONDS_DELAY")
SENTRY_TRACE_SAMPLE_RATE = env("SENTRY_TRACE_SAMPLE_RATE")

environment = env("ENVIRONMENT")

Expand Down Expand Up @@ -480,8 +485,6 @@ def jwt_handler(token):
},
}

ENCRYPTION_SECRET_KEY = env("ENCRYPTION_SECRET_KEY")

WAGTAIL_SITE_NAME = "Mapped hub page editor"
WAGTAILADMIN_BASE_URL = BASE_URL
WAGTAILDOCS_EXTENSIONS = []
Expand Down
Loading
Loading