Merge branch 'master' into replays-processor
lynnagara committed Mar 25, 2024
2 parents 02bf926 + 050c343 commit 3ee893c
Showing 15 changed files with 281 additions and 20 deletions.
6 changes: 3 additions & 3 deletions gocd/templates/bash/s4s-clickhouse-queries.sh
@@ -15,15 +15,15 @@ then
ARGS="--gcs-bucket ${GCS_BUCKET}"
fi

SNUBA_SERVICE_NAME="query-${SNUBA_CMD_TYPE}-gocd"
SNUBA_COMPONENT_NAME="query-${SNUBA_CMD_TYPE}-gocd"
SNUBA_CMD="query-${SNUBA_CMD_TYPE} ${ARGS[@]}"

eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}")
/devinfra/scripts/k8s/k8stunnel

/devinfra/scripts/k8s/k8s-spawn-job.py \
--label-selector="service=snuba-${SNUBA_SERVICE_NAME}" \
--container-name="${SNUBA_SERVICE_NAME}" \
--label-selector="service=snuba,component=${SNUBA_COMPONENT_NAME}" \
--container-name="${SNUBA_COMPONENT_NAME}" \
--try-deployments-and-statefulsets \
"snuba-query-${SNUBA_CMD_TYPE}" \
"us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \
4 changes: 2 additions & 2 deletions gocd/templates/bash/s4s-replay-queries.sh
@@ -4,8 +4,8 @@ eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}"
/devinfra/scripts/k8s/k8stunnel

/devinfra/scripts/k8s/k8s-spawn-job.py \
--label-selector="service=snuba-${SNUBA_SERVICE_NAME}" \
--container-name="${SNUBA_SERVICE_NAME}" \
--label-selector="service=snuba,component=${SNUBA_COMPONENT_NAME}" \
--container-name="${SNUBA_COMPONENT_NAME}" \
"snuba-query-replayer" \
"us.gcr.io/sentryio/snuba:${GO_REVISION_SNUBA_REPO}" \
-- \
2 changes: 1 addition & 1 deletion gocd/templates/clickhouse-query-replayer.jsonnet
@@ -7,7 +7,7 @@ local generate_replay_job(component) =
{
environment_variables: {
SENTRY_REGION: 's4s',
SNUBA_SERVICE_NAME: 'query-replayer-gocd',
SNUBA_COMPONENT_NAME: 'query-replayer-gocd',
},
elastic_profile_id: pipeline_group,
tasks: [
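With SNUBA_COMPONENT_NAME set to query-replayer-gocd as in the jsonnet above, the renamed variable drives a two-part pod selector in both shell scripts. A small illustrative sketch (Python, not part of the commit) of how the k8s-spawn-job flags expand:

component = "query-replayer-gocd"  # value of SNUBA_COMPONENT_NAME above
label_selector = f"service=snuba,component={component}"
container_name = component
print(f'--label-selector="{label_selector}" --container-name="{container_name}"')
# --label-selector="service=snuba,component=query-replayer-gocd" --container-name="query-replayer-gocd"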
10 changes: 10 additions & 0 deletions rust_snuba/src/processors/generic_metrics.rs
@@ -172,6 +172,8 @@ struct CountersRawRow {
common_fields: CommonMetricFields,
#[serde(default)]
count_value: f64,
#[serde(skip_serializing_if = "Option::is_none")]
record_meta: Option<u8>,
}

/// Parse is the trait which should be implemented for all metric types.
@@ -210,6 +212,12 @@ impl Parse for CountersRawRow {
}
let retention_days = enforce_retention(Some(from.retention_days), &config.env_config);

let record_meta = match from.use_case_id.as_str() {
"escalating_issues" => Some(0),
"metric_stats" => Some(0),
_ => Some(1),
};

let common_fields = CommonMetricFields {
use_case_id: from.use_case_id,
org_id: from.org_id,
@@ -230,6 +238,7 @@ impl Parse for CountersRawRow {
Ok(Some(Self {
common_fields,
count_value,
record_meta,
}))
}
}
@@ -726,6 +735,7 @@ mod tests {
day_retention_days: None,
},
count_value: 1.0,
record_meta: Some(1),
};
assert_eq!(
result.unwrap(),
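For context on the processor change above, the routing rule restated in Python (illustrative only, not part of the commit): counters from the escalating_issues and metric_stats use cases carry record_meta = 0, every other use case carries 1, and only flagged rows feed the new counters meta tables.

def record_meta_flag(use_case_id: str) -> int:
    # Mirrors the match on use_case_id in CountersRawRow parsing.
    if use_case_id in ("escalating_issues", "metric_stats"):
        return 0
    return 1

assert record_meta_flag("escalating_issues") == 0
assert record_meta_flag("transactions") == 1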
1 change: 1 addition & 0 deletions (rust_snuba snapshot file)
@@ -17,6 +17,7 @@ expression: snapshot_payload
"min_retention_days": 90,
"org_id": 1,
"project_id": 3,
"record_meta": 1,
"retention_days": 90,
"tags.indexed_value": [
0,
1 change: 0 additions & 1 deletion snuba/admin/iam_policy/iam_policy.json
@@ -34,7 +34,6 @@
},
{
"members": [
"group:team-starfish@sentry.io",
"group:team-ingest@sentry.io",
"group:team-visibility@sentry.io",
"group:team-performance@sentry.io",
30 changes: 19 additions & 11 deletions snuba/admin/views.py
@@ -80,6 +80,8 @@
runner = Runner()
audit_log = AuditLog()

ORG_ID = 1


@application.errorhandler(UnauthorizedException)
def handle_invalid_json(exception: UnauthorizedException) -> Response:
@@ -237,9 +239,11 @@ def do_action() -> None:
if not dry_run:
audit_log.record(
user or "",
AuditLogAction.RAN_MIGRATION_STARTED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_STARTED,
(
AuditLogAction.RAN_MIGRATION_STARTED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_STARTED
),
{"migration": str(migration_key), "force": force, "fake": fake},
)
check_for_inactive_replicas(
@@ -256,19 +260,23 @@ def do_action() -> None:
if not dry_run:
audit_log.record(
user or "",
AuditLogAction.RAN_MIGRATION_COMPLETED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_COMPLETED,
(
AuditLogAction.RAN_MIGRATION_COMPLETED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_COMPLETED
),
{"migration": str(migration_key), "force": force, "fake": fake},
notify=True,
)

def notify_error() -> None:
audit_log.record(
user or "",
AuditLogAction.RAN_MIGRATION_FAILED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_FAILED,
(
AuditLogAction.RAN_MIGRATION_FAILED
if action == "run"
else AuditLogAction.REVERSED_MIGRATION_FAILED
),
{"migration": str(migration_key), "force": force, "fake": fake},
notify=True,
)
@@ -667,7 +675,7 @@ def config(config_key: str) -> Response:
400,
{"Content-Type": "application/json"},
)
except (state.MismatchedTypeException):
except state.MismatchedTypeException:
return Response(
json.dumps({"error": "Mismatched type"}),
400,
@@ -997,7 +1005,7 @@ def dlq_replay() -> Response:
@check_tool_perms(tools=[AdminTools.PRODUCTION_QUERIES])
def production_snql_query() -> Response:
body = json.loads(request.data)
body["tenant_ids"] = {"referrer": request.referrer}
body["tenant_ids"] = {"referrer": request.referrer, "organization_id": ORG_ID}
try:
ret = run_snql_query(body, g.user.email)
return ret
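A small illustration of the production query change above: every SnQL request now carries organization_id 1 in its tenant_ids next to the referrer (the referrer string below is hypothetical):

ORG_ID = 1
body = {"query": "MATCH (events) SELECT count()"}  # placeholder query body
body["tenant_ids"] = {"referrer": "snuba-admin-production-queries", "organization_id": ORG_ID}
print(body["tenant_ids"])
# {'referrer': 'snuba-admin-production-queries', 'organization_id': 1}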
2 changes: 1 addition & 1 deletion snuba/cli/query_fetcher.py
@@ -152,8 +152,8 @@ def get_queries_from_querylog(
window_hours_ago_ts = now - timedelta(hours=window_hours)
interval = timedelta(hours=1)

start_time = window_hours_ago_ts
for table in table_names:
start_time = window_hours_ago_ts
while start_time < now:
end_time = start_time + interval
logger.info(f"Fetching queries to run from {table}...")
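The query_fetcher fix above moves the start_time reset inside the per-table loop, so every table is replayed over the full window rather than only the first. A minimal sketch of the corrected loop shape (hypothetical helper, illustrative only):

from datetime import datetime, timedelta

def iter_fetch_windows(table_names, window_hours=24):
    # Each table re-scans the whole window in one-hour slices, mirroring the fix.
    now = datetime.utcnow()
    interval = timedelta(hours=1)
    for table in table_names:
        start_time = now - timedelta(hours=window_hours)  # reset per table
        while start_time < now:
            yield table, start_time, start_time + interval
            start_time += interval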
4 changes: 4 additions & 0 deletions snuba/migrations/group_loader.py
@@ -318,6 +318,9 @@ def get_migrations(self) -> Sequence[str]:
"0027_sets_add_raw_tags_column",
"0028_distributions_add_indexed_tags_column",
"0029_add_use_case_id_index",
"0030_add_record_meta_column",
"0031_counters_meta_table",
"0032_counters_meta_table_mv",
]


@@ -356,6 +359,7 @@ def get_migrations(self) -> Sequence[str]:
"0009_spans_add_measure_hashmap",
"0010_spans_add_compression",
"0011_spans_add_index_on_trace_id",
"0012_spans_add_index_on_transaction_name",
]


2 changes: 1 addition & 1 deletion snuba/settings/settings_test.py
@@ -44,7 +44,7 @@

OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 0

ADMIN_ALLOWED_PROD_PROJECTS = [1]
ADMIN_ALLOWED_PROD_PROJECTS = [1, 11276]

REDIS_CLUSTERS = {
key: {
47 changes: 47 additions & 0 deletions snuba/snuba_migrations/generic_metrics/0030_add_record_meta_column.py
@@ -0,0 +1,47 @@
from typing import Sequence

from snuba.clickhouse.columns import Column, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
blocking = False
local_table_name = "generic_metric_counters_raw_local"
dist_table_name = "generic_metric_counters_raw_dist"
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.AddColumn(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
column=Column("record_meta", UInt(8, Modifiers(default=str("0")))),
target=operations.OperationTarget.LOCAL,
after="materialization_version",
),
operations.AddColumn(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
column=Column("record_meta", UInt(8, Modifiers(default=str("0")))),
target=operations.OperationTarget.DISTRIBUTED,
after="materialization_version",
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropColumn(
column_name="record_meta",
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
target=operations.OperationTarget.DISTRIBUTED,
),
operations.DropColumn(
column_name="record_meta",
storage_set=self.storage_set_key,
table_name=self.local_table_name,
target=operations.OperationTarget.LOCAL,
),
]
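Roughly, the AddColumn operations above correspond to ClickHouse DDL of this shape (an approximation for orientation; the migration framework generates the exact statements):

# Assumed approximation; the dist table receives the same column clause.
ADD_RECORD_META_LOCAL = (
    "ALTER TABLE generic_metric_counters_raw_local "
    "ADD COLUMN IF NOT EXISTS record_meta UInt8 DEFAULT 0 "
    "AFTER materialization_version"
)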
68 changes: 68 additions & 0 deletions snuba/snuba_migrations/generic_metrics/0031_counters_meta_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
blocking = False
granularity = "2048"
local_table_name = "generic_metric_counters_meta_aggregated_local"
dist_table_name = "generic_metric_counters_meta_aggregated_dist"
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS
columns: Sequence[Column[Modifiers]] = [
Column("org_id", UInt(64)),
Column("project_id", UInt(64)),
Column("use_case_id", String(Modifiers(low_cardinality=True))),
Column("metric_id", UInt(64)),
Column("tag_key", String()),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
Column("retention_days", UInt(16)),
Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
Column("count", AggregateFunction("sum", [Float(64)])),
]

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
engine=table_engines.AggregatingMergeTree(
storage_set=self.storage_set_key,
order_by="(org_id, project_id, use_case_id, metric_id, tag_key, timestamp)",
primary_key="(org_id, project_id, use_case_id, metric_id, tag_key, timestamp)",
partition_by="(retention_days, toMonday(timestamp))",
settings={"index_granularity": self.granularity},
ttl="timestamp + toIntervalDay(retention_days)",
),
columns=self.columns,
target=OperationTarget.LOCAL,
),
operations.CreateTable(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
engine=table_engines.Distributed(
local_table_name=self.local_table_name, sharding_key=None
),
columns=self.columns,
target=OperationTarget.DISTRIBUTED,
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.dist_table_name,
target=OperationTarget.DISTRIBUTED,
),
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.local_table_name,
target=OperationTarget.LOCAL,
),
]
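Because tag_values and count are AggregateFunction columns, reads from the new meta table must finalize the states with the matching -Merge combinators. A sketch of such a query (assuming the clickhouse-driver package and a reachable node; not part of the commit):

from clickhouse_driver import Client  # assumption: clickhouse-driver is installed

client = Client(host="localhost")
rows = client.execute(
    """
    SELECT
        tag_key,
        groupUniqArrayMerge(tag_values) AS tag_values,
        sumMerge(count) AS total
    FROM generic_metric_counters_meta_aggregated_local
    WHERE org_id = %(org_id)s AND metric_id = %(metric_id)s
    GROUP BY tag_key
    """,
    {"org_id": 1, "metric_id": 123},
)
for tag_key, tag_values, total in rows:
    print(tag_key, tag_values, total)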
70 changes: 70 additions & 0 deletions snuba/snuba_migrations/generic_metrics/0032_counters_meta_table_mv.py
@@ -0,0 +1,70 @@
from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget
from snuba.utils.schemas import Float


class Migration(migration.ClickhouseNodeMigration):
blocking = False
view_name = "generic_metric_counters_meta_aggregation_mv"
dest_table_name = "generic_metric_counters_meta_aggregated_local"
dest_table_columns: Sequence[Column[Modifiers]] = [
Column("org_id", UInt(64)),
Column("project_id", UInt(64)),
Column("use_case_id", String(Modifiers(low_cardinality=True))),
Column("metric_id", UInt(64)),
Column("tag_key", String()),
Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
Column("retention_days", UInt(16)),
Column("tag_values", AggregateFunction("groupUniqArray", [String()])),
Column("value", AggregateFunction("sum", [Float(64)])),
]
storage_set_key = StorageSetKey.GENERIC_METRICS_COUNTERS

def forwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.CreateMaterializedView(
storage_set=self.storage_set_key,
view_name=self.view_name,
columns=self.dest_table_columns,
destination_table_name=self.dest_table_name,
target=OperationTarget.LOCAL,
query="""
SELECT
org_id,
project_id,
use_case_id,
metric_id,
tag_key,
toStartOfWeek(timestamp) as timestamp,
retention_days,
groupUniqArrayState(tag_value) as `tag_values`,
sumState(count_value) as count
FROM generic_metric_counters_raw_local
ARRAY JOIN
tags.key AS tag_key, tags.raw_value AS tag_value
WHERE record_meta = 1
GROUP BY
org_id,
project_id,
use_case_id,
metric_id,
tag_key,
timestamp,
retention_days
""",
),
]

def backwards_ops(self) -> Sequence[operations.SqlOperation]:
return [
operations.DropTable(
storage_set=self.storage_set_key,
table_name=self.view_name,
target=OperationTarget.LOCAL,
)
]
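The materialized view above only aggregates raw rows where record_meta = 1, so escalating_issues and metric_stats counters never reach the meta table. A quick sanity check against the raw table (same assumptions as the previous sketch):

from clickhouse_driver import Client

client = Client(host="localhost")
flagged, total = client.execute(
    "SELECT countIf(record_meta = 1), count() FROM generic_metric_counters_raw_local"
)[0]
print(f"{flagged}/{total} raw counter rows are eligible for the meta materialized view")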