22 changes: 20 additions & 2 deletions src/sentry/integrations/slack/webhooks/action.py
@@ -61,6 +61,7 @@
from sentry.shared_integrations.exceptions import ApiError
from sentry.users.models import User
from sentry.users.services.user import RpcUser
from sentry.utils import metrics
from sentry.utils.locking import UnableToAcquireLock

_logger = logging.getLogger(__name__)
@@ -577,24 +578,41 @@ def handle_seer_autofix_start(
group=group,
organization_id=group.project.organization_id,
)
stopping_point = entrypoint.autofix_stopping_point
is_continuation = entrypoint.autofix_run_id is not None
logging_ctx = {
"group_id": group.id,
"organization_id": group.project.organization_id,
"stopping_point": str(stopping_point),
"is_continuation": is_continuation,
"user_id": user.id,
}
_logger.info("seer.slack.trigger_autofix.start", extra=logging_ctx)
lock_key = SlackEntrypoint.get_autofix_lock_key(
group_id=group.id,
stopping_point=entrypoint.autofix_stopping_point,
stopping_point=stopping_point,
)
lock = locks.get(lock_key, duration=10, name="autofix_entrypoint_slack")
try:
with lock.acquire():
SeerOperator(entrypoint=entrypoint).trigger_autofix(
group=group,
user=user,
stopping_point=entrypoint.autofix_stopping_point,
stopping_point=stopping_point,
run_id=entrypoint.autofix_run_id,
)
except UnableToAcquireLock:
# Might be a double click, or Seer is taking its time confirming the run start.
# The entrypoint will handle removing the button once it starts the run anyway.
_logger.info("seer.slack.trigger_autofix.lock_contention", extra=logging_ctx)
return

_logger.info("seer.slack.trigger_autofix.complete", extra=logging_ctx)
metrics.incr(
"seer.slack.trigger_autofix",
tags={"stopping_point": str(stopping_point), "is_continuation": str(is_continuation)},
Member commented: Do you need to stringify a bool (is_continuation)? I wasn't sure so I looked up a random example and it seems ok

)
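
On the reviewer's question above: a minimal sketch of why the explicit str() is the predictable choice — coercing every tag value to a string up front means boolean tags emit as "True"/"False" instead of relying on whatever the metrics backend does with non-string values. The helper name normalize_tags is hypothetical, not part of the codebase:

def normalize_tags(tags: dict[str, object]) -> dict[str, str]:
    # Coerce every tag value to a string so booleans serialize as "True"/"False"
    # rather than depending on backend-specific handling of non-string values.
    return {key: str(value) for key, value in tags.items()}

# Usage sketch:
#   normalize_tags({"is_continuation": True})  ->  {"is_continuation": "True"}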

@classmethod
def get_action_option(cls, slack_request: SlackActionRequest) -> tuple[str | None, str | None]:
action_option, action_id = None, None
37 changes: 37 additions & 0 deletions src/sentry/seer/entrypoints/cache.py
@@ -1,7 +1,12 @@
import logging

from sentry.seer.entrypoints.registry import entrypoint_registry
from sentry.seer.entrypoints.types import SeerOperatorCacheResult
from sentry.utils import metrics
from sentry.utils.cache import cache

logger = logging.getLogger(__name__)

AUTOFIX_CACHE_TIMEOUT_SECONDS = 60 * 60 * 12 # 12 hours


@@ -58,6 +63,14 @@ def populate_pre_autofix_cache(
) -> SeerOperatorCacheResult:
cache_key = cls._get_pre_autofix_cache_key(entrypoint_key=entrypoint_key, group_id=group_id)
cache.set(cache_key, cache_payload, timeout=AUTOFIX_CACHE_TIMEOUT_SECONDS)
logger.info(
"seer.operator.cache.pre_autofix_populated",
extra={
"entrypoint_key": entrypoint_key,
"group_id": group_id,
"cache_key": cache_key,
},
)
return SeerOperatorCacheResult[CachePayloadT](
payload=cache_payload,
source="group_id",
@@ -70,6 +83,14 @@ def populate_post_autofix_cache(
) -> SeerOperatorCacheResult:
cache_key = cls._get_post_autofix_cache_key(entrypoint_key=entrypoint_key, run_id=run_id)
cache.set(cache_key, cache_payload, timeout=AUTOFIX_CACHE_TIMEOUT_SECONDS)
logger.info(
"seer.operator.cache.post_autofix_populated",
extra={
"entrypoint_key": entrypoint_key,
"run_id": run_id,
"cache_key": cache_key,
},
)
return SeerOperatorCacheResult[CachePayloadT](
payload=cache_payload,
source="run_id",
@@ -125,6 +146,11 @@ def migrate(
if one exists. If overwrite is True, any existing post-autofix cache will be overwritten.
"""
for entrypoint_key in entrypoint_registry.registrations.keys():
logging_ctx = {
"entrypoint_key": str(entrypoint_key),
"group_id": from_group_id,
"run_id": to_run_id,
}
pre_cache_result = cls._get_pre_autofix_cache(
entrypoint_key=entrypoint_key, group_id=from_group_id
)
@@ -133,9 +159,13 @@
)
# If we already have a post-autofix cache, and we're not overwriting, skip.
if not overwrite and post_cache_result:
logging_ctx["reason"] = "post_cache_exists"
logger.info("seer.operator.cache.migrate_skipped", extra=logging_ctx)
continue
# If we don't have a pre-autofix cache, nothing to migrate, skip.
if not pre_cache_result:
logging_ctx["reason"] = "no_pre_cache"
logger.info("seer.operator.cache.migrate_skipped", extra=logging_ctx)
continue
post_cache_key = cls._get_post_autofix_cache_key(
entrypoint_key=entrypoint_key, run_id=to_run_id
@@ -146,3 +176,10 @@
timeout=AUTOFIX_CACHE_TIMEOUT_SECONDS,
)
cache.delete(pre_cache_result["key"])
logging_ctx["from_key"] = pre_cache_result["key"]
logging_ctx["to_key"] = post_cache_key
logger.info("seer.operator.cache.migrated", extra=logging_ctx)
metrics.incr(
"seer.operator.cache.migrated",
tags={"entrypoint_key": str(entrypoint_key)},
)
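
For orientation while reading the hunks above, a self-contained sketch of the per-entrypoint migration step they implement: copy the group-keyed pre-autofix payload to a run-keyed post-autofix entry, then drop the old key. The function and key formats below are illustrative stand-ins rather than the module's real helpers, and a Django-style cache client is assumed:

from django.core.cache import cache

AUTOFIX_CACHE_TIMEOUT_SECONDS = 60 * 60 * 12  # mirrors the 12-hour timeout above


def migrate_one(entrypoint_key: str, from_group_id: int, to_run_id: int, overwrite: bool = False) -> bool:
    # Hypothetical key formats; the real module builds its keys via class helpers.
    pre_key = f"seer:autofix:pre:{entrypoint_key}:{from_group_id}"
    post_key = f"seer:autofix:post:{entrypoint_key}:{to_run_id}"

    payload = cache.get(pre_key)
    if payload is None:
        return False  # no pre-autofix cache, nothing to migrate
    if not overwrite and cache.get(post_key) is not None:
        return False  # post-autofix cache already exists; leave it alone

    # Re-key the payload from the group-scoped entry to the run-scoped one.
    cache.set(post_key, payload, timeout=AUTOFIX_CACHE_TIMEOUT_SECONDS)
    cache.delete(pre_key)
    return True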
82 changes: 51 additions & 31 deletions src/sentry/seer/entrypoints/integrations/slack.py
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import integrations_tasks
from sentry.taskworker.retry import Retry
from sentry.utils import metrics
from sentry.utils.locking import UnableToAcquireLock
from sentry.utils.registry import NoRegistrationExistsError

@@ -115,8 +116,8 @@ def get_autofix_stopping_point_from_action(
try:
stopping_point = AutofixStoppingPoint(action.value)
except ValueError:
logger.warning(
"entrypoint.invalid_autofix_stopping_point",
logger.exception(
"seer.entrypoint.slack.invalid_stopping_point",
extra={
"entrypoint_key": SeerEntrypointKey.SLACK.value,
"stopping_point": action.value,
@@ -242,7 +243,7 @@ def on_autofix_update(
)
case _:
logging_ctx["event_type"] = event_type
logger.info("entrypoint.unsupported_event_type", extra=logging_ctx)
logger.info("seer.entrypoint.slack.unsupported_event_type", extra=logging_ctx)
return

# Determine whether an automation has progressed beyond the current stopping point
@@ -269,13 +270,23 @@
has_coding_enabled = ENABLE_SEER_CODING_DEFAULT
data_kwargs["has_progressed"] = has_coding_enabled

data = SeerAutofixUpdate(**data_kwargs)
schedule_all_thread_updates(
threads=cache_payload["threads"],
integration_id=cache_payload["integration_id"],
organization_id=cache_payload["organization_id"],
data=SeerAutofixUpdate(**data_kwargs),
data=data,
)
logger.info("seer.entrypoint.slack.autofix_update_scheduled", extra=logging_ctx)
metrics.incr(
"seer.entrypoint.slack.autofix_update_scheduled",
tags={
"event_type": str(event_type),
"current_point": str(data.current_point),
"has_progressed": str(data.has_progressed),
"thread_count": len(cache_payload["threads"]),
},
)
logger.info("entrypoint.on_autofix_update_success", extra=logging_ctx)


def send_thread_update(
@@ -299,6 +310,10 @@ def send_thread_update(
renderable = NotificationService.render_template(
data=data, template=template_cls(), provider=provider
)
metric_tags = {
"data_source": str(data.source),
"is_ephemeral": str(bool(ephemeral_user_id)),
}
try:
if ephemeral_user_id:
install.send_threaded_ephemeral_message(
@@ -313,12 +328,15 @@
thread_ts=thread["thread_ts"],
renderable=renderable,
)
except ValueError as e:
logging_ctx["error"] = str(e)
logger.warning("entrypoint.send_thread_update.invalid_integration", extra=logging_ctx)
except ValueError:
logger.exception(
"seer.entrypoint.slack.thread_update.invalid_integration", extra=logging_ctx
)
metrics.incr("seer.entrypoint.slack.thread_update.invalid_integration", tags=metric_tags)
# No need to retry since these are configuration errors; retrying would just repeat the failure
return
logger.info("entrypoint.send_thread_update.success", extra=logging_ctx)
logger.info("seer.entrypoint.slack.thread_update.sent", extra=logging_ctx)
metrics.incr("seer.entrypoint.slack.thread_update.sent", tags=metric_tags)


@instrumented_task(
@@ -342,13 +360,16 @@ def process_thread_update(

from sentry.integrations.slack.integration import SlackIntegration

logging_ctx = {
"integration_id": integration_id,
"organization_id": organization_id,
"entrypoint_key": SeerEntrypointKey.SLACK.value,
}

try:
data_dto = NotificationDataDto.from_dict(serialized_data)
except (NotificationServiceError, NoRegistrationExistsError) as e:
logger.warning(
"entrypoint.process_thread_update.deserialize_error",
extra={"error": e, "data": serialized_data},
)
except (NotificationServiceError, NoRegistrationExistsError):
logger.exception("seer.entrypoint.slack.thread_update.deserialize_error", extra=logging_ctx)
return

integration = integration_service.get_integration(
@@ -358,14 +379,7 @@
status=ObjectStatus.ACTIVE,
)
if not integration:
logger.warning(
"entrypoint.process_thread_update.integration_not_found",
extra={
"integration_id": integration_id,
"organization_id": organization_id,
"entrypoint_key": SeerEntrypointKey.SLACK.value,
},
)
logger.error("seer.entrypoint.slack.thread_update.integration_not_found", extra=logging_ctx)
return

send_thread_update(
Expand Down Expand Up @@ -461,7 +475,7 @@ def remove_all_buttons_transformer(elem: dict[str, Any]) -> dict[str, Any] | Non
original_blocks = message_data["blocks"]
original_text = message_data["text"]
except (KeyError, TypeError):
logger.exception("entrypoint.update_message_invalid_payload", extra=logging_ctx)
logger.exception("seer.entrypoint.slack.message_update.invalid_payload", extra=logging_ctx)
return

blocks = _transform_block_actions(original_blocks, transformer)
@@ -486,7 +500,7 @@ def remove_all_buttons_transformer(elem: dict[str, Any]) -> dict[str, Any] | None
try:
install.update_message(channel_id=channel_id, message_ts=message_ts, renderable=renderable)
except IntegrationError:
logger.exception("entrypoint.update_message_failed", extra=logging_ctx)
logger.exception("seer.entrypoint.slack.message_update.failed", extra=logging_ctx)


def handle_prepare_autofix_update(
@@ -521,10 +535,9 @@ def handle_prepare_autofix_update(
automation_stopping_point = (
get_automation_stopping_point(group) if is_group_triggering_automation(group) else None
)
except Exception as e:
logger.warning(
"entrypoint.get_automation_stopping_point_error",
extra={"error": str(e), **logging_ctx},
except Exception:
logger.exception(
"seer.entrypoint.slack.prepare_autofix.get_stopping_point_error", extra=logging_ctx
)
automation_stopping_point = None

@@ -559,15 +572,22 @@
),
)
except UnableToAcquireLock:
logger.warning("entrypoint.handle_prepare_autofix_update.lock_failed", extra=logging_ctx)
logger.exception("seer.entrypoint.slack.prepare_autofix.lock_failed", extra=logging_ctx)
return

logger.info(
"entrypoint.handle_prepare_autofix_update",
"seer.entrypoint.slack.prepare_autofix.cache_populated",
extra={
"cache_key": cache_result["key"],
"cache_source": cache_result["source"],
"thread_count": len(threads),
"has_automation": bool(automation_stopping_point),
**logging_ctx,
},
)
metrics.incr(
"seer.entrypoint.slack.prepare_autofix.cache_populated",
tags={
"cache_source": cache_result["source"],
"has_automation": str(bool(automation_stopping_point)),
},
)