diff --git a/.github/workflows/snuba-integration-test.yml b/.github/workflows/snuba-integration-test.yml
index 53990eb4303aff..4901716f2386b8 100644
--- a/.github/workflows/snuba-integration-test.yml
+++ b/.github/workflows/snuba-integration-test.yml
@@ -18,7 +18,7 @@ jobs:
     env:
       USE_SNUBA: 1
       MIGRATIONS_TEST_MIGRATE: 1
-      USE_REDIS_INDEXER: 1
+      USE_INDEXER: 1
 
     steps:
       - uses: actions/checkout@v2
diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt
index 931d040f1175f2..c8a8e9e6107c83 100644
--- a/migrations_lockfile.txt
+++ b/migrations_lockfile.txt
@@ -6,5 +6,5 @@ To resolve this, rebase against latest master and regenerate your migration. This file
 will then be regenerated, and you should be able to merge without conflicts.
 
 nodestore: 0002_nodestore_no_dictfield
-sentry: 0234_grouphistory
+sentry: 0235_add_metricskeyindexer_table
 social_auth: 0001_initial
diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index d0319e11068007..bd2da679ed04f0 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -334,6 +334,7 @@ def env(key, default="", type=None):
     "sentry.analytics.events",
     "sentry.nodestore",
     "sentry.search",
+    "sentry.sentry_metrics.indexer",
     "sentry.snuba",
     "sentry.lang.java.apps.Config",
     "sentry.lang.javascript.apps.Config",
diff --git a/src/sentry/migrations/0235_add_metricskeyindexer_table.py b/src/sentry/migrations/0235_add_metricskeyindexer_table.py
new file mode 100644
index 00000000000000..86436c1d9ef734
--- /dev/null
+++ b/src/sentry/migrations/0235_add_metricskeyindexer_table.py
@@ -0,0 +1,55 @@
+# Generated by Django 2.2.24 on 2021-10-04 18:19
+
+import django.utils.timezone
+from django.db import migrations, models
+
+import sentry.db.models.fields.bounded
+
+
+class Migration(migrations.Migration):
+    # This flag is used to mark that a migration shouldn't be automatically run in
+    # production. We set this to True for operations that we think are risky and want
+    # someone from ops to run manually and monitor.
+    # General advice is that if in doubt, mark your migration as `is_dangerous`.
+    # Some things you should always mark as dangerous:
+    # - Large data migrations. Typically we want these to be run manually by ops so that
+    #   they can be monitored. Since data migrations will now hold a transaction open
+    #   this is even more important.
+    # - Adding columns to highly active tables, even ones that are NULL.
+    is_dangerous = False
+
+    # This flag is used to decide whether to run this migration in a transaction or not.
+    # By default we prefer to run in a transaction, but for migrations where you want
+    # to `CREATE INDEX CONCURRENTLY` this needs to be set to False. Typically you'll
+    # want to create an index concurrently when adding one to an existing table.
+    # You'll also usually want to set this to `False` if you're writing a data
+    # migration, since we don't want the entire migration to run in one long-running
+    # transaction.
+    atomic = True
+
+    dependencies = [
+        ("sentry", "0234_grouphistory"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="MetricsKeyIndexer",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                ("string", models.CharField(max_length=200)),
+                ("date_added", models.DateTimeField(default=django.utils.timezone.now)),
+            ],
+            options={
+                "db_table": "sentry_metricskeyindexer",
+            },
+        ),
+        migrations.AddConstraint(
+            model_name="metricskeyindexer",
+            constraint=models.UniqueConstraint(fields=("string",), name="unique_string"),
+        ),
+    ]
diff --git a/src/sentry/new_migrations/monkey/executor.py b/src/sentry/new_migrations/monkey/executor.py
index 03fe6d43154018..d0dd1bf9df8853 100644
--- a/src/sentry/new_migrations/monkey/executor.py
+++ b/src/sentry/new_migrations/monkey/executor.py
@@ -5,7 +5,7 @@
 from django.db.migrations.executor import MigrationExecutor
 from django.db.migrations.operations import SeparateDatabaseAndState
 from django.db.migrations.operations.fields import FieldOperation
-from django.db.migrations.operations.models import ModelOperation
+from django.db.migrations.operations.models import IndexOperation, ModelOperation
 
 logger = logging.getLogger(__name__)
 
@@ -37,7 +37,9 @@ def _check_db_routing(migration):
 def _check_operations(operations):
     failed_ops = []
     for operation in operations:
-        if isinstance(operation, (FieldOperation, ModelOperation, RenameContentType)):
+        if isinstance(
+            operation, (FieldOperation, ModelOperation, RenameContentType, IndexOperation)
+        ):
             continue
         elif isinstance(operation, SeparateDatabaseAndState):
             failed_ops.extend(_check_operations(operation.database_operations))
diff --git a/src/sentry/release_health/metrics.py b/src/sentry/release_health/metrics.py
index 46fcd094377ab4..5ffe868e919f1b 100644
--- a/src/sentry/release_health/metrics.py
+++ b/src/sentry/release_health/metrics.py
@@ -40,32 +40,32 @@ def get_tag_values_list(org_id: int, values: Sequence[str]) -> Sequence[int]:
 
 
 def metric_id(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
 def tag_key(org_id: int, name: str) -> str:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return f"tags[{index}]"
 
 
 def tag_value(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
 def try_get_string_index(org_id: int, name: str) -> Optional[int]:
-    return indexer.resolve(org_id, name)  # type: ignore
+    return indexer.resolve(name)  # type: ignore
 
 
 def reverse_tag_value(org_id: int, index: int) -> str:
-    str_value = indexer.reverse_resolve(org_id, index)  # type: ignore
+    str_value = indexer.reverse_resolve(index)  # type: ignore
     # If the value can't be reversed it's very likely a real programming bug
     # instead of something to be caught down: We probably got back a value from
     # Snuba that's not in the indexer => partial data loss
@@ -338,7 +338,7 @@ def _count_users(total: bool, referrer: str) -> Dict[Any, int]:
 
     rv = {}
     for project_id, release in project_releases:
-        release_tag_value = indexer.resolve(org_id, release)  # type: ignore
+        release_tag_value = indexer.resolve(release)  # type: ignore
         if release_tag_value is None:
            # Don't emit empty releases -- for exact compatibility with
            # sessions table backend.
diff --git a/src/sentry/sentry_metrics/indexer/base.py b/src/sentry/sentry_metrics/indexer/base.py
index c14b72d458905a..93951fc6cc021f 100644
--- a/src/sentry/sentry_metrics/indexer/base.py
+++ b/src/sentry/sentry_metrics/indexer/base.py
@@ -11,10 +11,10 @@ class StringIndexer(Service):  # type: ignore
 
     __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")
 
-    def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]:
+    def bulk_record(self, strings: List[str]) -> Dict[str, int]:
         raise NotImplementedError()
 
-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
         """Store a string and return the integer ID generated for it
 
         With every call to this method, the lifetime of the entry will be
@@ -22,7 +22,7 @@ def record(self, org_id: int, string: str) -> int:
         """
         raise NotImplementedError()
 
-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
         """Lookup the integer ID for a string.
 
         Does not affect the lifetime of the entry.
@@ -31,7 +31,7 @@ def resolve(self, org_id: int, string: str) -> Optional[int]:
         """
         raise NotImplementedError()
 
-    def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
+    def reverse_resolve(self, id: int) -> Optional[str]:
         """Lookup the stored string for a given integer ID.
 
         Returns None if the entry cannot be found.
diff --git a/src/sentry/sentry_metrics/indexer/indexer_consumer.py b/src/sentry/sentry_metrics/indexer/indexer_consumer.py
index 7831aec320087f..37a28e77f65a63 100644
--- a/src/sentry/sentry_metrics/indexer/indexer_consumer.py
+++ b/src/sentry/sentry_metrics/indexer/indexer_consumer.py
@@ -36,7 +36,6 @@ def __init__(self, producer: Producer) -> None:
 
     def process_message(self, message: Any) -> MutableMapping[str, Any]:
         parsed_message: MutableMapping[str, Any] = json.loads(message.value(), use_rapid_json=True)
-        org_id = parsed_message["org_id"]
         metric_name = parsed_message["name"]
         tags = parsed_message["tags"]
 
@@ -46,7 +45,7 @@ def process_message(self, message: Any) -> MutableMapping[str, Any]:
             *tags.values(),
         }
 
-        mapping = indexer.bulk_record(org_id, list(strings))  # type: ignore
+        mapping = indexer.bulk_record(list(strings))  # type: ignore
 
         new_tags = {mapping[k]: mapping[v] for k, v in tags.items()}
 
diff --git a/src/sentry/sentry_metrics/indexer/mock.py b/src/sentry/sentry_metrics/indexer/mock.py
index 7ed5f5d533ccbe..8c7ac64e4ed8b9 100644
--- a/src/sentry/sentry_metrics/indexer/mock.py
+++ b/src/sentry/sentry_metrics/indexer/mock.py
@@ -31,13 +31,13 @@ def __init__(self) -> None:
         self._strings: DefaultDict[str, int] = defaultdict(self._counter.__next__)
         self._reverse: Dict[int, str] = {}
 
-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
         return self._record(string)
 
-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
         return self._strings.get(string)
 
-    def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
+    def reverse_resolve(self, id: int) -> Optional[str]:
         return self._reverse.get(id)
 
     def _record(self, string: str) -> int:
diff --git a/src/sentry/sentry_metrics/indexer/models.py b/src/sentry/sentry_metrics/indexer/models.py
new file mode 100644
index 00000000000000..0c91ab93539211
--- /dev/null
+++ b/src/sentry/sentry_metrics/indexer/models.py
@@ -0,0 +1,30 @@
+from typing import Any
+
+from django.db import connections, models, router
+from django.utils import timezone
+
+from sentry.db.models import Model
+
+
+class MetricsKeyIndexer(Model):  # type: ignore
+    __include_in_export__ = False
+
+    string = models.CharField(max_length=200)
+    date_added = models.DateTimeField(default=timezone.now)
+
+    class Meta:
+        db_table = "sentry_metricskeyindexer"
+        app_label = "sentry"
+        constraints = [
+            models.UniqueConstraint(fields=["string"], name="unique_string"),
+        ]
+
+    @classmethod
+    def get_next_values(cls, num: int) -> Any:
+        using = router.db_for_write(cls)
+        connection = connections[using].cursor()
+
+        connection.execute(
+            "SELECT nextval('sentry_metricskeyindexer_id_seq') from generate_series(1,%s)", [num]
+        )
+        return connection.fetchall()
diff --git a/src/sentry/sentry_metrics/indexer/postgres.py b/src/sentry/sentry_metrics/indexer/postgres.py
new file mode 100644
index 00000000000000..fcab5bc4f0908a
--- /dev/null
+++ b/src/sentry/sentry_metrics/indexer/postgres.py
@@ -0,0 +1,74 @@
+from collections import defaultdict
+from typing import Any, Dict, List, Optional, Set
+
+from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer
+from sentry.utils.services import Service
+
+
+class PGStringIndexer(Service):  # type: ignore
+    """
+    Provides integer IDs for metric names, tag keys and tag values
+    and the corresponding reverse lookup.
+    """
+
+    __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")
+
+    def _bulk_record(self, unmapped_strings: Set[str]) -> Any:
+        records = [MetricsKeyIndexer(string=string) for string in unmapped_strings]
+        # We use `ignore_conflicts=True` here to avoid race conditions where metric indexer
+        # records might have been created between when we queried in `bulk_record` and the
+        # attempt to create the rows down below.
+        MetricsKeyIndexer.objects.bulk_create(records, ignore_conflicts=True)
+        # Using `ignore_conflicts=True` prevents the pk from being set on the model
+        # instances. Re-query the database to fetch the rows; they should all exist at this
+        # point.
+        return MetricsKeyIndexer.objects.filter(string__in=unmapped_strings)
+
+    def bulk_record(self, strings: List[str]) -> Dict[str, int]:
+        # first look up to see if we have any of the values
+        records = MetricsKeyIndexer.objects.filter(string__in=strings)
+        result = defaultdict(int)
+
+        for record in records:
+            result[record.string] = record.id
+
+        unmapped = set(strings).difference(result.keys())
+        new_mapped = self._bulk_record(unmapped)
+
+        for new in new_mapped:
+            result[new.string] = new.id
+
+        return result
+
+    def record(self, string: str) -> int:
+        """Store a string and return the integer ID generated for it"""
+        result = self.bulk_record(strings=[string])
+        return result[string]
+
+    def resolve(self, string: str) -> Optional[int]:
+        """Lookup the integer ID for a string.
+
+        Returns None if the entry cannot be found.
+        """
+        try:
+            id: int = MetricsKeyIndexer.objects.filter(string=string).values_list("id", flat=True)[
+                0
+            ]
+        except IndexError:
+            return None
+
+        return id
+
+    def reverse_resolve(self, id: int) -> Optional[str]:
+        """Lookup the stored string for a given integer ID.
+
+        Returns None if the entry cannot be found.
+ """ + try: + string: str = MetricsKeyIndexer.objects.filter(id=id).values_list("string", flat=True)[ + 0 + ] + except IndexError: + return None + + return string diff --git a/src/sentry/sentry_metrics/indexer/redis_mock.py b/src/sentry/sentry_metrics/indexer/redis_mock.py index adc3f406f20f6f..a018a5d9149e7c 100644 --- a/src/sentry/sentry_metrics/indexer/redis_mock.py +++ b/src/sentry/sentry_metrics/indexer/redis_mock.py @@ -23,15 +23,15 @@ class RedisMockIndexer(StringIndexer): Temporary mock string indexer that uses Redis to store data. """ - def _get_key(self, org_id: int, instance: Union[str, int]) -> str: + def _get_key(self, instance: Union[str, int]) -> str: if isinstance(instance, str): - return f"temp-metrics-indexer:{org_id}:1:str:{instance}" + return f"temp-metrics-indexer:1:str:{instance}" elif isinstance(instance, int): - return f"temp-metrics-indexer:{org_id}:1:int:{instance}" + return f"temp-metrics-indexer:1:int:{instance}" else: raise Exception("Invalid: must be string or int") - def _bulk_record(self, org_id: int, unmapped: Dict[str, None]) -> Dict[str, int]: + def _bulk_record(self, unmapped: Dict[str, None]) -> Dict[str, int]: """ Take a mapping of strings {"metric_id`": None} and populate the ints for the corresponding strings. @@ -50,15 +50,15 @@ def _bulk_record(self, org_id: int, unmapped: Dict[str, None]) -> Dict[str, int] int_value = get_int(string) mapped_ints[string] = int_value - int_key = self._get_key(org_id, int_value) - string_key = self._get_key(org_id, string) + int_key = self._get_key(int_value) + string_key = self._get_key(string) client.set(string_key, int_value, ex=INDEXER_TTL) client.set(int_key, string, ex=INDEXER_TTL) return mapped_ints - def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]: + def bulk_record(self, strings: List[str]) -> Dict[str, int]: """ Takes a list of strings that could be a metric names, tag keys or values and returns a string -> int mapping. @@ -94,7 +94,7 @@ def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]: """ client = get_client() - string_keys = [self._get_key(org_id, s) for s in strings] + string_keys = [self._get_key(s) for s in strings] results = client.mget(string_keys) resolved: Dict[str, int] = {} @@ -108,37 +108,37 @@ def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]: if len(unresolved.keys()) == 0: return resolved - newly_resolved = self._bulk_record(org_id, unresolved) + newly_resolved = self._bulk_record(unresolved) resolved.update(newly_resolved) return resolved - def record(self, org_id: int, string: str) -> int: + def record(self, string: str) -> int: """ If key already exists, grab that value, otherwise record both the string to int and int to string relationships. 
""" client = get_client() - string_key = f"temp-metrics-indexer:{org_id}:1:str:{string}" + string_key = f"temp-metrics-indexer:1:str:{string}" value: Any = client.get(string_key) if value is None: value = get_int(string) client.set(string_key, value) # reverse record (int to string) - int_key = f"temp-metrics-indexer:{org_id}:1:int:{value}" + int_key = f"temp-metrics-indexer:1:int:{value}" client.set(int_key, string) return int(value) - def resolve(self, org_id: int, string: str) -> Optional[int]: + def resolve(self, string: str) -> Optional[int]: try: - return int(get_client().get(f"temp-metrics-indexer:{org_id}:1:str:{string}")) + return int(get_client().get(f"temp-metrics-indexer:1:str:{string}")) except TypeError: return None - def reverse_resolve(self, org_id: int, id: int) -> Optional[str]: - result: Optional[str] = get_client().get(f"temp-metrics-indexer:{org_id}:1:int:{id}") + def reverse_resolve(self, id: int) -> Optional[str]: + result: Optional[str] = get_client().get(f"temp-metrics-indexer:1:int:{id}") return result def delete_records(self) -> None: diff --git a/src/sentry/snuba/metrics.py b/src/sentry/snuba/metrics.py index 22a7d2f3ab1fff..7563e566ad82bd 100644 --- a/src/sentry/snuba/metrics.py +++ b/src/sentry/snuba/metrics.py @@ -424,7 +424,7 @@ def _build_filter(self, query_definition: QueryDefinition) -> Optional[BooleanCo def to_int(string): try: - return indexer.resolve(self._project.organization_id, string) + return indexer.resolve(string) except KeyError: return None @@ -455,10 +455,7 @@ def _build_where( Condition( Column("metric_id"), Op.IN, - [ - indexer.resolve(self._project.organization_id, name) - for _, name in query_definition.fields.values() - ], + [indexer.resolve(name) for _, name in query_definition.fields.values()], ), Condition(Column(TS_COL_QUERY), Op.GTE, query_definition.start), Condition(Column(TS_COL_QUERY), Op.LT, query_definition.end), @@ -471,8 +468,7 @@ def _build_where( def _build_groupby(self, query_definition: QueryDefinition) -> List[SelectableExpression]: return [Column("metric_id")] + [ - Column(f"tags[{indexer.resolve(self._project.organization_id, field)}]") - for field in query_definition.groupby + Column(f"tags[{indexer.resolve(field)}]") for field in query_definition.groupby ] def _build_queries(self, query_definition): @@ -566,12 +562,12 @@ def __init__( def _parse_tag(self, tag_string: str) -> str: tag_key = int(tag_string.replace("tags[", "").replace("]", "")) - return indexer.reverse_resolve(self._organization_id, tag_key) + return indexer.reverse_resolve(tag_key) def _extract_data(self, entity, data, groups): tags = tuple((key, data[key]) for key in sorted(data.keys()) if key.startswith("tags[")) - metric_name = indexer.reverse_resolve(self._organization_id, data["metric_id"]) + metric_name = indexer.reverse_resolve(data["metric_id"]) ops = self._ops_by_metric[metric_name] tag_data = groups.setdefault( @@ -615,14 +611,9 @@ def translate_results(self): for data in series: self._extract_data(entity, data, groups) - org_id = self._organization_id - groups = [ dict( - by={ - self._parse_tag(key): indexer.reverse_resolve(org_id, value) - for key, value in tags - }, + by={self._parse_tag(key): indexer.reverse_resolve(value) for key, value in tags}, **data, ) for tags, data in groups.items() diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index 7dfc770a3c45c4..25090c2340df91 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -983,17 +983,17 @@ def bulk_store_sessions(self, 
sessions): @classmethod def _push_metric(cls, session, type, name, tags, value): def metric_id(name): - res = indexer.record(session["org_id"], name) + res = indexer.record(name) assert res is not None, name return res def tag_key(name): - res = indexer.record(session["org_id"], name) + res = indexer.record(name) assert res is not None, name return res def tag_value(name): - res = indexer.record(session["org_id"], name) + res = indexer.record(name) assert res is not None, name return res diff --git a/src/sentry/utils/pytest/sentry.py b/src/sentry/utils/pytest/sentry.py index e7d79f856fb443..527dc82fa8ca90 100644 --- a/src/sentry/utils/pytest/sentry.py +++ b/src/sentry/utils/pytest/sentry.py @@ -120,10 +120,8 @@ def pytest_configure(config): settings.SENTRY_TSDB = "sentry.tsdb.redissnuba.RedisSnubaTSDB" settings.SENTRY_EVENTSTREAM = "sentry.eventstream.snuba.SnubaEventStream" - if os.environ.get("USE_REDIS_INDEXER", False): - settings.SENTRY_METRICS_INDEXER = ( - "sentry.sentry_metrics.indexer.redis_mock.RedisMockIndexer" - ) + if os.environ.get("USE_INDEXER", False): + settings.SENTRY_METRICS_INDEXER = "sentry.sentry_metrics.indexer.postgres.PGStringIndexer" if os.environ.get("DISABLE_TEST_SDK", False): settings.SENTRY_SDK_CONFIG = {} diff --git a/tests/sentry/sentry_metrics/test_indexer.py b/tests/sentry/sentry_metrics/test_indexer.py index 3ae6b74bae1e93..7cfcd6c33be62b 100644 --- a/tests/sentry/sentry_metrics/test_indexer.py +++ b/tests/sentry/sentry_metrics/test_indexer.py @@ -1,16 +1,13 @@ -from sentry.models import Organization from sentry.sentry_metrics.indexer.mock import MockIndexer INDEXER = MockIndexer() def test_resolve(): - mock_org_id = Organization().id - assert INDEXER.resolve(mock_org_id, "what") is None - assert INDEXER.resolve(mock_org_id, "user") == 11 + assert INDEXER.resolve("what") is None + assert INDEXER.resolve("user") == 11 def test_reverse_resolve(): - mock_org_id = Organization().id - assert INDEXER.reverse_resolve(mock_org_id, 666) is None - assert INDEXER.reverse_resolve(mock_org_id, 11) == "user" + assert INDEXER.reverse_resolve(666) is None + assert INDEXER.reverse_resolve(11) == "user" diff --git a/tests/sentry/sentry_metrics/test_postgres_indexer.py b/tests/sentry/sentry_metrics/test_postgres_indexer.py new file mode 100644 index 00000000000000..cba25504171707 --- /dev/null +++ b/tests/sentry/sentry_metrics/test_postgres_indexer.py @@ -0,0 +1,21 @@ +from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer +from sentry.sentry_metrics.indexer.postgres import PGStringIndexer +from sentry.testutils.cases import TestCase + + +class PostgresIndexerTest(TestCase): + def setUp(self) -> None: + self.indexer = PGStringIndexer() + + def test_indexer(self): + results = PGStringIndexer().bulk_record(strings=["hello", "hey", "hi"]) + assert list(results.values()) == [1, 2, 3] + + # test resolve and reverse_resolve + obj = MetricsKeyIndexer.objects.get(string="hello") + assert PGStringIndexer().resolve("hello") == obj.id + assert PGStringIndexer().reverse_resolve(obj.id) == obj.string + + # test record on a string that already exists + PGStringIndexer().record("hello") + assert PGStringIndexer().resolve("hello") == obj.id diff --git a/tests/sentry/sentry_metrics/test_redis_indexer.py b/tests/sentry/sentry_metrics/test_redis_indexer.py index 4fc0c194dc23a4..2c190b4812c90e 100644 --- a/tests/sentry/sentry_metrics/test_redis_indexer.py +++ b/tests/sentry/sentry_metrics/test_redis_indexer.py @@ -7,8 +7,6 @@ class RedisMockIndexerTest(TestCase): def setUp(self) -> 
None: - self.org_id = self.create_organization().id - self.key_base = f"temp-metrics-indexer:{self.org_id}:1:" self.indexer = RedisMockIndexer() def tearDown(self) -> None: @@ -16,17 +14,17 @@ def tearDown(self) -> None: def test_bulk_record(self) -> None: strings = ["test-metric", "test-tag-key", "test-tag-value"] - results = self.indexer.bulk_record(self.org_id, strings) + results = self.indexer.bulk_record(strings) assert results == {s: get_int(s) for s in strings} def test_resolve(self) -> None: strings = ["test-metric"] - self.indexer.bulk_record(self.org_id, strings) - assert self.indexer.resolve(self.org_id, "test-metric") == get_int("test-metric") - assert self.indexer.resolve(self.org_id, "bad-value") is None + self.indexer.bulk_record(strings) + assert self.indexer.resolve("test-metric") == get_int("test-metric") + assert self.indexer.resolve("bad-value") is None def test_reverse_resolve(self) -> None: strings = ["test-metric"] - self.indexer.bulk_record(self.org_id, strings) - assert self.indexer.reverse_resolve(self.org_id, get_int("test-metric")) == "test-metric" - assert self.indexer.reverse_resolve(self.org_id, 55555) is None + self.indexer.bulk_record(strings) + assert self.indexer.reverse_resolve(get_int("test-metric")) == "test-metric" + assert self.indexer.reverse_resolve(55555) is None diff --git a/tests/snuba/snuba/test_indexer_consumer.py b/tests/snuba/snuba/test_indexer_consumer.py index 55c48bf8c4db7c..5080c27efc1f24 100644 --- a/tests/snuba/snuba/test_indexer_consumer.py +++ b/tests/snuba/snuba/test_indexer_consumer.py @@ -13,7 +13,7 @@ MetricsIndexerWorker, get_metrics_consumer, ) -from sentry.sentry_metrics.indexer.redis_mock import get_int +from sentry.sentry_metrics.indexer.postgres import PGStringIndexer from sentry.testutils.cases import TestCase from sentry.utils import json, kafka_config from sentry.utils.batching_kafka_consumer import wait_for_topics @@ -34,38 +34,52 @@ "org_id": 1, "project_id": 3, } -tests = [ - pytest.param(payload, 0, False, id="success"), - pytest.param(payload, 1, True, id="missing callback"), -] -@patch("confluent_kafka.Producer") -@pytest.mark.parametrize("metrics_payload, flush_return_value, with_exception", tests) -def test_metrics_indexer_worker(producer, metrics_payload, flush_return_value, with_exception): - producer.produce = MagicMock() - producer.flush = MagicMock(return_value=flush_return_value) +class MetricsIndexerWorkerTest(TestCase): + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def test_without_exception(self): + self.assert_metrics_indexer_worker() - metrics_worker = MetricsIndexerWorker(producer=producer) + def test_with_exception(self): + self.assert_metrics_indexer_worker(flush_return_value=1, with_exception=True) - mock_message = Mock() - mock_message.value = MagicMock(return_value=json.dumps(metrics_payload)) + @pytest.mark.django_db + @patch("confluent_kafka.Producer") + def assert_metrics_indexer_worker( + self, producer, metrics_payload=payload, flush_return_value=0, with_exception=False + ): + producer.produce = MagicMock() + producer.flush = MagicMock(return_value=flush_return_value) - parsed = metrics_worker.process_message(mock_message) - assert parsed["tags"] == {get_int(k): get_int(v) for k, v in metrics_payload["tags"].items()} - assert parsed["metric_id"] == get_int(metrics_payload["name"]) + metrics_worker = MetricsIndexerWorker(producer=producer) - if with_exception: - with pytest.raises(Exception, match="didn't get all the callbacks: 1 left"): + mock_message 
= Mock() + mock_message.value = MagicMock(return_value=json.dumps(metrics_payload)) + + parsed = metrics_worker.process_message(mock_message) + assert parsed["tags"] == { + PGStringIndexer().resolve(string=k): PGStringIndexer().resolve(string=str(v)) + for k, v in payload["tags"].items() + } + assert parsed["metric_id"] == PGStringIndexer().resolve(string=payload["name"]) + + if with_exception: + with pytest.raises(Exception, match="didn't get all the callbacks: 1 left"): + metrics_worker.flush_batch([parsed]) + else: metrics_worker.flush_batch([parsed]) - else: - metrics_worker.flush_batch([parsed]) - producer.produce.assert_called_with( - topic="snuba-metrics", - key=None, - value=json.dumps(parsed).encode(), - on_delivery=metrics_worker.callback, - ) + producer.produce.assert_called_with( + topic="snuba-metrics", + key=None, + value=json.dumps(parsed).encode(), + on_delivery=metrics_worker.callback, + ) class MetricsIndexerConsumerTest(TestCase): @@ -103,6 +117,7 @@ def tearDown(self): self.override_settings_cm.__exit__(None, None, None) self.admin_client.delete_topics([self.ingest_topic, self.snuba_topic]) + @pytest.mark.django_db def test_metrics_consumer(self): ingest_producer = self._get_producer(self.ingest_topic) message = json.dumps(payload).encode() @@ -153,5 +168,8 @@ def test_metrics_consumer(self): # finally test the payload of the translated message parsed = json.loads(translated_msg.value(), use_rapid_json=True) - assert parsed["tags"] == {str(get_int(k)): get_int(v) for k, v in payload["tags"].items()} - assert parsed["metric_id"] == get_int(payload["name"]) + assert parsed["tags"] == { + str(PGStringIndexer().resolve(string=k)): PGStringIndexer().resolve(string=str(v)) + for k, v in payload["tags"].items() + } + assert parsed["metric_id"] == PGStringIndexer().resolve(string=payload["name"])
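
Taken together, the change swaps the temporary Redis-backed mock for a Postgres-backed `PGStringIndexer` and drops `org_id` from the indexer interface, so strings are now indexed globally rather than per organization. A minimal usage sketch of the resulting API, assuming a test database where migration 0235 has been applied (the example strings are illustrative):

```python
from sentry.sentry_metrics.indexer.postgres import PGStringIndexer

indexer = PGStringIndexer()

# bulk_record resolves existing rows and creates any missing ones,
# returning a string -> id mapping for every input.
mapping = indexer.bulk_record(["session", "environment", "production"])

# record() is bulk_record() for a single string, so repeated calls are
# idempotent and return the same id.
session_id = indexer.record("session")
assert mapping["session"] == session_id

# resolve/reverse_resolve are pure lookups and return None on a miss.
assert indexer.resolve("session") == session_id
assert indexer.reverse_resolve(session_id) == "session"
assert indexer.resolve("never-recorded") is None
```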
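The subtle part of `PGStringIndexer` is the write path: two consumers can race to create the same string, which is why `_bulk_record` inserts with `ignore_conflicts=True` and then re-selects. A sketch of the failure mode this guards against, under the `unique_string` constraint added by migration 0235:

```python
from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer

# Timeline of two workers racing on a brand-new string:
#   worker A: SELECT ... WHERE string = 'session'  -> miss
#   worker B: SELECT ... WHERE string = 'session'  -> miss
#   worker A: INSERT 'session'                     -> succeeds, id assigned
#   worker B: INSERT 'session'                     -> unique_string conflict
#
# With ignore_conflicts=True the losing INSERT silently no-ops instead of
# raising IntegrityError. The trade-off is that Django cannot report pks
# for rows created this way, so both workers re-select to learn the id:
MetricsKeyIndexer.objects.bulk_create(
    [MetricsKeyIndexer(string="session")], ignore_conflicts=True
)
row = MetricsKeyIndexer.objects.get(string="session")  # same id for A and B
```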
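`MetricsKeyIndexer.get_next_values` is not called anywhere else in this diff; it reserves a block of ids from the table's sequence up front, the kind of primitive a later batch-write path could use. A sketch of what it executes and returns, assuming the sequence name hard-coded in the model (`sentry_metricskeyindexer_id_seq`, Django's default for the pk):

```python
from django.db import connections, router

from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer

using = router.db_for_write(MetricsKeyIndexer)
with connections[using].cursor() as cursor:
    # generate_series(1, n) produces n rows and nextval() is evaluated once
    # per row, so this reserves n ids in a single round trip. Sequence values
    # are never handed out twice, even if the surrounding transaction aborts.
    cursor.execute(
        "SELECT nextval('sentry_metricskeyindexer_id_seq') from generate_series(1,%s)",
        [3],
    )
    rows = cursor.fetchall()  # e.g. [(42,), (43,), (44,)] -- one tuple per id
ids = [row[0] for row in rows]
```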
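The executor change exists because of the migration: the `UniqueConstraint` is added through `migrations.AddConstraint`, which Django implements as a subclass of `IndexOperation`, and Sentry's patched executor rejects any operation class it does not recognize. A quick check of that relationship (Django 2.2, per the migration header):

```python
from django.db.migrations.operations.models import (
    AddConstraint,
    AddIndex,
    IndexOperation,
)

# AddConstraint (used by 0235) and AddIndex both descend from IndexOperation,
# so allowlisting the base class covers constraint and index operations alike.
assert issubclass(AddConstraint, IndexOperation)
assert issubclass(AddIndex, IndexOperation)
```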
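On the consumer side, `process_message` now indexes every string in one `bulk_record` call and rewrites the payload. A condensed sketch of that translation, using the same payload shape as the tests; the reassignment of `metric_id` back into the message is inferred from the test assertions, since the hunk does not show it:

```python
from sentry.sentry_metrics.indexer.postgres import PGStringIndexer

payload = {
    "name": "session",
    "tags": {"environment": "production", "release": "1.0.0"},
    "org_id": 1,
    "project_id": 3,
}

indexer = PGStringIndexer()

# One round trip covers the metric name plus all tag keys and values.
strings = {payload["name"], *payload["tags"].keys(), *payload["tags"].values()}
mapping = indexer.bulk_record(list(strings))

translated = dict(payload)
translated["metric_id"] = mapping[payload["name"]]
translated["tags"] = {mapping[k]: mapping[v] for k, v in payload["tags"].items()}
```

The integer tag keys only survive until the message is JSON-encoded, because object keys become strings on serialization; that is why the end-to-end consumer test compares against `str(PGStringIndexer().resolve(string=k))` while the worker-level test does not.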