2 changes: 1 addition & 1 deletion .github/workflows/snuba-integration-test.yml
@@ -18,7 +18,7 @@ jobs:
env:
USE_SNUBA: 1
MIGRATIONS_TEST_MIGRATE: 1
-USE_REDIS_INDEXER: 1
+USE_INDEXER: 1

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion migrations_lockfile.txt
@@ -6,5 +6,5 @@ To resolve this, rebase against latest master and regenerate your migration. This
will then be regenerated, and you should be able to merge without conflicts.

nodestore: 0002_nodestore_no_dictfield
-sentry: 0234_grouphistory
+sentry: 0235_add_metricskeyindexer_table
social_auth: 0001_initial
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -334,6 +334,7 @@ def env(key, default="", type=None):
"sentry.analytics.events",
"sentry.nodestore",
"sentry.search",
"sentry.sentry_metrics.indexer",
"sentry.snuba",
"sentry.lang.java.apps.Config",
"sentry.lang.javascript.apps.Config",
55 changes: 55 additions & 0 deletions src/sentry/migrations/0235_add_metricskeyindexer_table.py
@@ -0,0 +1,55 @@
# Generated by Django 2.2.24 on 2021-10-04 18:19

import django.utils.timezone
from django.db import migrations, models

import sentry.db.models.fields.bounded


class Migration(migrations.Migration):
    # This flag is used to mark that a migration shouldn't be automatically run in
    # production. We set this to True for operations that we think are risky and want
    # someone from ops to run manually and monitor.
    # General advice is that if in doubt, mark your migration as `is_dangerous`.
    # Some things you should always mark as dangerous:
    # - Large data migrations. Typically we want these to be run manually by ops so that
    #   they can be monitored. Since data migrations will now hold a transaction open
    #   this is even more important.
    # - Adding columns to highly active tables, even ones that are NULL.
    is_dangerous = False

    # This flag is used to decide whether to run this migration in a transaction or not.
    # By default we prefer to run in a transaction, but for migrations where you want
    # to `CREATE INDEX CONCURRENTLY` this needs to be set to False. Typically you'll
    # want to create an index concurrently when adding one to an existing table.
    # You'll also usually want to set this to `False` if you're writing a data
    # migration, since we don't want the entire migration to run in one long-running
    # transaction.
    atomic = True

    dependencies = [
        ("sentry", "0234_grouphistory"),
    ]

    operations = [
        migrations.CreateModel(
            name="MetricsKeyIndexer",
            fields=[
                (
                    "id",
                    sentry.db.models.fields.bounded.BoundedBigAutoField(
                        primary_key=True, serialize=False
                    ),
                ),
                ("string", models.CharField(max_length=200)),
                ("date_added", models.DateTimeField(default=django.utils.timezone.now)),
            ],
            options={
                "db_table": "sentry_metricskeyindexer",
            },
        ),
        migrations.AddConstraint(
            model_name="metricskeyindexer",
            constraint=models.UniqueConstraint(fields=("string",), name="unique_string"),
        ),
    ]
6 changes: 4 additions & 2 deletions src/sentry/new_migrations/monkey/executor.py
@@ -5,7 +5,7 @@
from django.db.migrations.executor import MigrationExecutor
from django.db.migrations.operations import SeparateDatabaseAndState
from django.db.migrations.operations.fields import FieldOperation
-from django.db.migrations.operations.models import ModelOperation
+from django.db.migrations.operations.models import IndexOperation, ModelOperation

logger = logging.getLogger(__name__)

@@ -37,7 +37,9 @@ def _check_db_routing(migration):
def _check_operations(operations):
    failed_ops = []
    for operation in operations:
-        if isinstance(operation, (FieldOperation, ModelOperation, RenameContentType)):
+        if isinstance(
+            operation, (FieldOperation, ModelOperation, RenameContentType, IndexOperation)
+        ):
            continue
        elif isinstance(operation, SeparateDatabaseAndState):
            failed_ops.extend(_check_operations(operation.database_operations))

Member Author: I had to add IndexOperation here because otherwise I got a missing `hints={'tables':..}` argument error for AddIndex. Since the AddIndex operation is model-specific, it seemed to me that I could put this here. cc @wedamija

Member: That looks good to me.
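For context, a hypothetical `AddIndex` operation of the kind this check now lets through (illustrative only; this PR itself uses `AddConstraint` rather than `AddIndex`, and the index name below is made up):

```python
from django.db import migrations, models

# A model-specific index operation; isinstance(operation, IndexOperation) is True,
# so _check_operations now skips it instead of flagging it for db routing hints.
operation = migrations.AddIndex(
    model_name="metricskeyindexer",
    index=models.Index(fields=["string"], name="metricskeyindexer_str_idx"),
)
```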
12 changes: 6 additions & 6 deletions src/sentry/release_health/metrics.py
@@ -40,32 +40,32 @@ def get_tag_values_list(org_id: int, values: Sequence[str]) -> Sequence[int]:


def metric_id(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
    if index is None:
        raise MetricIndexNotFound(name)
    return index  # type: ignore


def tag_key(org_id: int, name: str) -> str:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
    if index is None:
        raise MetricIndexNotFound(name)
    return f"tags[{index}]"


def tag_value(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
    if index is None:
        raise MetricIndexNotFound(name)
    return index  # type: ignore


def try_get_string_index(org_id: int, name: str) -> Optional[int]:
-    return indexer.resolve(org_id, name)  # type: ignore
+    return indexer.resolve(name)  # type: ignore


def reverse_tag_value(org_id: int, index: int) -> str:
-    str_value = indexer.reverse_resolve(org_id, index)  # type: ignore
+    str_value = indexer.reverse_resolve(index)  # type: ignore
    # If the value can't be reversed it's very likely a real programming bug
    # instead of something to be caught down: We probably got back a value from
    # Snuba that's not in the indexer => partial data loss
@@ -338,7 +338,7 @@ def _count_users(total: bool, referrer: str) -> Dict[Any, int]:
    rv = {}

    for project_id, release in project_releases:
-        release_tag_value = indexer.resolve(org_id, release)  # type: ignore
+        release_tag_value = indexer.resolve(release)  # type: ignore
        if release_tag_value is None:
            # Don't emit empty releases -- for exact compatibility with
            # sessions table backend.
8 changes: 4 additions & 4 deletions src/sentry/sentry_metrics/indexer/base.py
@@ -11,18 +11,18 @@ class StringIndexer(Service): # type: ignore

    __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")

-    def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]:
+    def bulk_record(self, strings: List[str]) -> Dict[str, int]:
        raise NotImplementedError()

-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
        """Store a string and return the integer ID generated for it

        With every call to this method, the lifetime of the entry will be
        prolonged.
        """
        raise NotImplementedError()

-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
        """Lookup the integer ID for a string.

        Does not affect the lifetime of the entry.
@@ -31,7 +31,7 @@ def resolve(self, org_id: int, string: str) -> Optional[int]:
"""
raise NotImplementedError()

def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
def reverse_resolve(self, id: int) -> Optional[str]:
"""Lookup the stored string for a given integer ID.

Returns None if the entry cannot be found.
3 changes: 1 addition & 2 deletions src/sentry/sentry_metrics/indexer/indexer_consumer.py
@@ -36,7 +36,6 @@ def __init__(self, producer: Producer) -> None:
    def process_message(self, message: Any) -> MutableMapping[str, Any]:
        parsed_message: MutableMapping[str, Any] = json.loads(message.value(), use_rapid_json=True)

-        org_id = parsed_message["org_id"]
        metric_name = parsed_message["name"]
        tags = parsed_message["tags"]

@@ -46,7 +45,7 @@ def process_message(self, message: Any) -> MutableMapping[str, Any]:
            *tags.values(),
        }

-        mapping = indexer.bulk_record(org_id, list(strings))  # type: ignore
+        mapping = indexer.bulk_record(list(strings))  # type: ignore

        new_tags = {mapping[k]: mapping[v] for k, v in tags.items()}

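A worked sketch of the transformation above, with a made-up payload (the field values and mapping here are illustrative, not from this PR; real messages carry more fields):

```python
# Minimal illustrative message of the shape the consumer parses.
parsed_message = {"name": "session", "tags": {"environment": "production"}}

# Collect every string that needs an integer id: the metric name plus all
# tag keys and tag values.
strings = {
    parsed_message["name"],
    *parsed_message["tags"].keys(),
    *parsed_message["tags"].values(),
}  # == {"session", "environment", "production"}

# Suppose bulk_record hands back this mapping:
mapping = {"session": 1, "environment": 2, "production": 3}

# Tags are rewritten from strings to the indexed integers on both sides.
new_tags = {mapping[k]: mapping[v] for k, v in parsed_message["tags"].items()}
assert new_tags == {2: 3}
```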
6 changes: 3 additions & 3 deletions src/sentry/sentry_metrics/indexer/mock.py
@@ -31,13 +31,13 @@ def __init__(self) -> None:
        self._strings: DefaultDict[str, int] = defaultdict(self._counter.__next__)
        self._reverse: Dict[int, str] = {}

-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
        return self._record(string)

-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
        return self._strings.get(string)

-    def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
+    def reverse_resolve(self, id: int) -> Optional[str]:
        return self._reverse.get(id)

    def _record(self, string: str) -> int:
30 changes: 30 additions & 0 deletions src/sentry/sentry_metrics/indexer/models.py
@@ -0,0 +1,30 @@
from typing import Any

from django.db import connections, models, router
from django.utils import timezone

from sentry.db.models import Model


class MetricsKeyIndexer(Model):  # type: ignore
    __include_in_export__ = False

    string = models.CharField(max_length=200)
    date_added = models.DateTimeField(default=timezone.now)

    class Meta:
        db_table = "sentry_metricskeyindexer"
        app_label = "sentry"
        constraints = [
            models.UniqueConstraint(fields=["string"], name="unique_string"),
        ]

    @classmethod
    def get_next_values(cls, num: int) -> Any:
        using = router.db_for_write(cls)
        connection = connections[using].cursor()

        connection.execute(
            "SELECT nextval('sentry_metricskeyindexer_id_seq') from generate_series(1,%s)", [num]
        )
        return connection.fetchall()
Comment on lines +23 to +30
Member: I don't have a lot of context on this project. How will we use the values from this sequence?

It vaguely looks like you want to reserve a range of ids, and then use those ids later on to create new rows. Is that the general idea?

Member Author: @wedamija Sorry for not giving enough context in the PR description; I can go back and update it in a bit, but yeah:

> It vaguely looks like you want to reserve a range of ids, and then use those ids later on to create new rows. Is that the general idea?

That's basically it. Eventually we want to have Postgres off the critical path, but in order to do that we need to know the ids ahead of time. What I'm unsure about is what kind of ranges we're talking about: is it 100, 1000, 10000? Since this metrics indexer will be used for metric names, tag keys, and tag values, it could be a lot of writes for high-cardinality tags.
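To make the reserve-then-create idea concrete, here is a minimal sketch (not part of this PR) of how a reserved block could be attached to rows created later; the input strings are hypothetical:

```python
from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer

strings = ["session", "environment", "production"]  # hypothetical inputs

# get_next_values returns cursor rows like [(1,), (2,), (3,)], one per reserved id.
ids = [row[0] for row in MetricsKeyIndexer.get_next_values(len(strings))]

# Attach the pre-reserved ids explicitly instead of letting Postgres assign them,
# which is what would let Postgres move off the critical path.
records = [
    MetricsKeyIndexer(id=next_id, string=s) for next_id, s in zip(ids, strings)
]
MetricsKeyIndexer.objects.bulk_create(records, ignore_conflicts=True)
```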

Member (@wedamija, Oct 2, 2021): Looks good for now. Once we know how many ids we're allocating per second, we can decide whether we need to do something more complex here.

I'm not sure if there's a performance hit to calling nextval 10k times, as opposed to doing something like https://www.depesz.com/2008/03/20/getting-multiple-values-from-sequences/. Something we can possibly benchmark in the future.
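For comparison, a sketch of a single-round-trip variant in the spirit of that link, written as if it replaced the body of `get_next_values` (so `connection` is the cursor and `num` the count); the concurrency caveat in the comments is why this would need benchmarking and careful review first:

```python
# Sketch only: advance the sequence once by `num` instead of issuing `num`
# nextval() calls. setval() returns the value it sets, so the reserved block
# is [last_id - num + 1, last_id].
# Caveat: a concurrent nextval() racing between the inner nextval() and the
# setval() could be handed an id inside the reserved block.
connection.execute(
    "SELECT setval('sentry_metricskeyindexer_id_seq', "
    "nextval('sentry_metricskeyindexer_id_seq') + %s - 1)",
    [num],
)
last_id = connection.fetchone()[0]
reserved = range(last_id - num + 1, last_id + 1)
```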

Contributor: I bet it will be desirable to avoid calling nextval 10k times (that would be 10k writes, I believe). We may not strictly need a sequence at that point, just a counter; we can discuss the solutions then. At this point I am not sure it is very useful to have this method, so it is probably better to remove it for now, in case somebody decides to start depending on it for some reason.

74 changes: 74 additions & 0 deletions src/sentry/sentry_metrics/indexer/postgres.py
@@ -0,0 +1,74 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set

from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer
from sentry.utils.services import Service


class PGStringIndexer(Service):  # type: ignore
    """
    Provides integer IDs for metric names, tag keys and tag values
    and the corresponding reverse lookup.
    """

    __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")

    def _bulk_record(self, unmapped_strings: Set[str]) -> Any:
        records = [MetricsKeyIndexer(string=string) for string in unmapped_strings]
        # We use `ignore_conflicts=True` here to avoid race conditions where metric indexer
        # records might have been created between when we queried in `bulk_record` and the
        # attempt to create the rows down below.
        MetricsKeyIndexer.objects.bulk_create(records, ignore_conflicts=True)
        # Using `ignore_conflicts=True` prevents the pk from being set on the model
        # instances. Re-query the database to fetch the rows; they should all exist at this
        # point.
        return MetricsKeyIndexer.objects.filter(string__in=unmapped_strings)

    def bulk_record(self, strings: List[str]) -> Dict[str, int]:
        # First, look up which of the values we already have.
        records = MetricsKeyIndexer.objects.filter(string__in=strings)
        result = defaultdict(int)

        for record in records:
            result[record.string] = record.id

        unmapped = set(strings).difference(result.keys())
        new_mapped = self._bulk_record(unmapped)

        for new in new_mapped:
            result[new.string] = new.id

        return result

    def record(self, string: str) -> int:
        """Store a string and return the integer ID generated for it"""
        result = self.bulk_record(strings=[string])
        return result[string]

    def resolve(self, string: str) -> Optional[int]:
        """Lookup the integer ID for a string.

        Returns None if the entry cannot be found.
        """
        try:
            id: int = MetricsKeyIndexer.objects.filter(string=string).values_list("id", flat=True)[
                0
            ]
        except IndexError:
            return None

        return id

    def reverse_resolve(self, id: int) -> Optional[str]:
        """Lookup the stored string for a given integer ID.

        Returns None if the entry cannot be found.
        """
        try:
            string: str = MetricsKeyIndexer.objects.filter(id=id).values_list("string", flat=True)[
                0
            ]
        except IndexError:
            return None

        return string
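A quick usage sketch of the new backend (illustrative strings; assumes a configured Postgres database):

```python
indexer = PGStringIndexer()

# Record a batch of strings; previously unseen ones get rows created for them.
mapping = indexer.bulk_record(["session", "environment", "production"])

# Every recorded string now resolves to a stable integer id, and back again.
assert indexer.resolve("session") == mapping["session"]
assert indexer.reverse_resolve(mapping["session"]) == "session"
assert indexer.resolve("not-yet-recorded") is None
```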