feat: admin_data_tools - persistent flow schedule management #1017

Merged
Winzen merged 4 commits into main from feat/admin-data-tools on Apr 16, 2026

Winzen (Contributor) commented Apr 16, 2026

feat: admin_data_tools - persistent flow schedule management

Context

This PR creates the admin_data_tools app, moves the disable_unhealthy_flow_schedules management command out of the core app, and fixes a structural problem in the original command: Prefect 0.15 re-registers flows with a new UUID and a new created timestamp, which makes the created field, used as the time baseline for validation, unreliable.

The fix introduces the DisabledFlowSchedule model, which persists disabled flows in the database using name as the stable identifier. FlowService now runs a two-phase pipeline so that flows stay disabled even after re-registration, and flows reactivated by an admin are monitored correctly again.


Folder structure

backend/apps/admin_data_tools/
├── __init__.py
├── admin.py                         # DisabledFlowScheduleAdmin with Prefect integration
├── apps.py
├── migrations/
│   └── 0001_initial.py
├── models.py                        # DisabledFlowSchedule
└── management/
    └── commands/
        ├── disable_unhealthy_flow_schedules.py
        └── _disable_unhealthy_flow_schedules/
            ├── __init__.py
            ├── constants.py         # GraphQL queries and constants
            ├── datetime_utils.py    # Date parsing and calculation helpers
            ├── models.py            # Task, TaskRun, FlowRun, FlowDisable
            └── service.py           # MakeClient, FlowService and external integrations

Execution flow diagram

disable_unhealthy_flow_schedules.py (Django command)
        │
        ▼
FlowService.disable_unhealthy_flow_schedules()
        │
        ├── Phase 1: _enforce_disabled_flows()
        │       │
        │       │  Flows tracked in the database with is_schedule_active=False
        │       │  or reactivated_at set
        │       │
        │       ├── active_flows_by_names()  →  Prefect GraphQL
        │       │
        │       ├── _sync_flow_id()          →  updates the UUID if re-registered
        │       │
        │       ├── _re_disable_flow()       →  if is_schedule_active=False
        │       │
        │       └── _check_post_reactivation()  →  if reactivated_at is set
        │               │
        │               └── FlowDisable(valid_since=reactivated_at).validate()
        │
        └── Phase 2: _detect_and_disable_new_flows()
                │
                ├── _get_new_untracked_flows()
                │       │
                │       ├── flows_failed_last_week()  →  Prefect GraphQL
                │       └── filters out flows already in the database
                │
                ├── FlowDisable(valid_since=created).validate()
                │
                ├── _disable_and_register()
                │       ├── disable_flow_schedule()  →  Prefect (2x, bug workaround)
                │       └── DisabledFlowSchedule.objects.create()
                │
                └── _send_disable_notification()
                        └── _split_message()  →  2000-char chunks → Discord
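
The chunking step at the end of the diagram can be sketched as follows. This is a minimal illustration, not the PR's actual _split_message(): the function name split_message, the preference for breaking at line boundaries, and the hard split of oversized lines are all assumptions. Only the 2000-character limit comes from the PR.

```python
def split_message(message: str, limit: int = 2000) -> list[str]:
    """Split a message into chunks of at most `limit` characters.

    Hypothetical helper: breaks at line boundaries where possible and
    hard-splits any single line that is longer than the limit.
    """
    chunks: list[str] = []
    current = ""
    for line in message.splitlines(keepends=True):
        # A line that cannot fit in one chunk is cut into limit-sized pieces.
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        # Start a new chunk when adding this line would exceed the limit.
        if len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return chunks


report = "\n".join(f"flow_{i}: schedule disabled" for i in range(500))
for chunk in split_message(report):
    print(len(chunk))
```

Joining the chunks reproduces the original message, so nothing is lost between notifications.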

Code structure

1. DisabledFlowSchedule model

Tracks disabled flows in the database using flow_name as the stable identifier; flow_id (the UUID) is updated automatically when Prefect re-registers the flow.

class DisabledFlowSchedule(models.Model):
    flow_name          = models.CharField(max_length=255, unique=True)
    flow_id            = models.CharField(max_length=255)
    disabled_at        = models.DateTimeField(auto_now_add=True)
    is_schedule_active = models.BooleanField(default=False)
    reactivated_at     = models.DateTimeField(null=True, blank=True)
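
As an illustration of how the stored UUID might follow a re-registration, here is a minimal sketch that uses a plain dataclass in place of the Django model. TrackedFlow and sync_flow_id are hypothetical names; the PR's _sync_flow_id() is not shown and may differ.

```python
from dataclasses import dataclass


@dataclass
class TrackedFlow:
    """Hypothetical in-memory stand-in for a DisabledFlowSchedule row."""

    flow_name: str
    flow_id: str


def sync_flow_id(record: TrackedFlow, current_flow_id: str) -> bool:
    """Update the stored UUID if Prefect re-registered the flow.

    Returns True when the record changed. With the real model, the caller
    would then persist it, e.g. record.save(update_fields=["flow_id"]).
    """
    if record.flow_id == current_flow_id:
        return False
    record.flow_id = current_flow_id
    return True


record = TrackedFlow(flow_name="example_flow", flow_id="uuid-old")
changed = sync_flow_id(record, "uuid-new")
print(changed, record.flow_id)
```

Because lookups are keyed on flow_name, the record survives the UUID change and only the flow_id column needs updating.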

2. Django Admin: DisabledFlowScheduleAdmin

The is_schedule_active checkbox calls the Prefect mutation directly on save, with no manual intervention in Prefect required. On reactivation, it records reactivated_at to serve as the baseline for future checks.

def save_model(self, request, obj, form, change):
    if change and "is_schedule_active" in form.changed_data:
        service = FlowService()
        if obj.is_schedule_active:
            obj.reactivated_at = timezone.now()
            service.set_flow_schedule(flow_id=obj.flow_id, active=True)
        else:
            obj.reactivated_at = None
            service.disable_flow_schedule(flow_id=obj.flow_id)
    super().save_model(request, obj, form, change)

Access is controlled by Django's standard permission system: is_staff=True plus model permissions, with no need for superuser status.


3. FlowDisable: domain and business rules

The valid_since field replaces Prefect's created as the time baseline. For new flows it receives Prefect's created value; for flows reactivated by an admin it receives reactivated_at from the database.

class FlowDisable:
    def __init__(self, id: str, created: str | datetime, service: "FlowService", name: str = ""):
        self.id = id
        self.name = name
        self.valid_since = parse_datetime(created) if isinstance(created, str) else created
        self.service = service
        self.runs = self.get_runs()

Deactivation criteria: validate()

The flow is disabled if either of the conditions below is met after valid_since:

1. Critical task failure

  • The last run failed in the run_dbt task
  • The error message is not in the list of ignored messages (STATE_MESSAGE_IGNORE)

2. Consecutive failures

  • The last two runs are in the Failed state
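
The two criteria above could be expressed roughly as follows. This is a sketch under stated assumptions, not the PR's validate(): the Run dataclass, the should_disable name, the placeholder contents of STATE_MESSAGE_IGNORE, and the use of substring matching against ignored messages are all hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Hypothetical placeholder; the real STATE_MESSAGE_IGNORE list is not shown in the PR.
STATE_MESSAGE_IGNORE = ("example ignorable message",)


@dataclass
class Run:
    """Hypothetical simplified flow run."""

    start_time: datetime
    state: str
    failed_task: Optional[str] = None
    state_message: str = ""


def should_disable(runs: list[Run], valid_since: datetime) -> bool:
    # Only runs after the baseline count; order them newest first.
    recent = sorted(
        (r for r in runs if r.start_time > valid_since),
        key=lambda r: r.start_time,
        reverse=True,
    )
    if not recent:
        return False

    last = recent[0]
    # Criterion 1: the last run failed in run_dbt with a message that is not ignored
    # (substring matching is an assumption).
    critical_failure = (
        last.state == "Failed"
        and last.failed_task == "run_dbt"
        and not any(msg in last.state_message for msg in STATE_MESSAGE_IGNORE)
    )
    # Criterion 2: the two most recent runs both failed.
    consecutive_failures = len(recent) >= 2 and all(
        r.state == "Failed" for r in recent[:2]
    )
    return critical_failure or consecutive_failures


baseline = datetime(2026, 4, 1, tzinfo=timezone.utc)
runs = [
    Run(datetime(2026, 4, 9, tzinfo=timezone.utc), "Failed", failed_task="other_task"),
    Run(datetime(2026, 4, 10, tzinfo=timezone.utc), "Failed", failed_task="other_task"),
]
print(should_disable(runs, baseline))
```

Filtering on valid_since first is what lets the same check serve both new flows (baseline is Prefect's created) and reactivated flows (baseline is reactivated_at).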

4. FlowService: two-phase pipeline

Phase 1: Enforce

Iterates over the flows tracked in the database (is_schedule_active=False or reactivated_at set), queries Prefect for those that are currently active, and applies the appropriate action:

def _process_active_tracked_flow(self, record, current_flow_id):
    self._sync_flow_id(record, current_flow_id)

    if not record.is_schedule_active:
        self._re_disable_flow(record, current_flow_id)
    elif record.reactivated_at:
        self._check_post_reactivation(record, current_flow_id)
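
The branching above amounts to a small decision table. The sketch below isolates it as a pure function so it can be checked without Django or Prefect; Action and phase1_action are hypothetical names introduced only for illustration.

```python
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class Action(Enum):
    RE_DISABLE = "re_disable"
    CHECK_POST_REACTIVATION = "check_post_reactivation"
    NONE = "none"


def phase1_action(is_schedule_active: bool, reactivated_at: Optional[datetime]) -> Action:
    """Pick the Phase 1 action for a tracked flow that is active in Prefect."""
    if not is_schedule_active:
        # The database says "disabled" but Prefect says "active": disable it again.
        return Action.RE_DISABLE
    if reactivated_at is not None:
        # An admin reactivated the flow: look for new failures since that moment.
        return Action.CHECK_POST_REACTIVATION
    return Action.NONE


print(phase1_action(False, None))
print(phase1_action(True, datetime(2026, 4, 16, tzinfo=timezone.utc)))
```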

Phase 2: Detect

Queries flows with failures in the last week, filters out those already tracked, validates the rest, and disables those that meet the criteria:

def _detect_and_disable_new_flows(self):
    flows = self._get_new_untracked_flows()
    flows_to_disable = [flow for flow in flows if flow.validate()]

    if not flows_to_disable:
        return

    for flow in flows_to_disable:
        self._disable_and_register(flow)

    self._send_disable_notification(flows, flows_to_disable)
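
The "filter out tracked flows" step could look like the sketch below. The shape of the Prefect response (a list of dicts with name, id and created keys) and the filter_untracked name are assumptions; in the real code the tracked names would presumably come from the DisabledFlowSchedule table.

```python
def filter_untracked(failed_flows: list[dict], tracked_names: set[str]) -> list[dict]:
    """Keep only failing flows that are not yet tracked in the database.

    Matching is done on the flow name, since the UUID changes on re-registration.
    """
    return [flow for flow in failed_flows if flow["name"] not in tracked_names]


# Hypothetical response shape for the "flows failed last week" query.
failed_flows = [
    {"name": "flow_a", "id": "uuid-1", "created": "2026-04-01T00:00:00+00:00"},
    {"name": "flow_b", "id": "uuid-2", "created": "2026-04-02T00:00:00+00:00"},
]
# With Django this set could be built with
# set(DisabledFlowSchedule.objects.values_list("flow_name", flat=True)).
tracked_names = {"flow_a"}

untracked = filter_untracked(failed_flows, tracked_names)
print([flow["name"] for flow in untracked])
```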

Prefect bug workaround

def disable_flow_schedule(self, flow_id: str) -> None:
    # Prefect 0.15 requires two calls to actually disable the schedule
    for _ in range(2):
        self.set_flow_schedule(flow_id=flow_id, active=False)

How to test

  1. Apply the migrations:
poetry run python manage.py migrate admin_data_tools
  2. Run the management command:
poetry run python manage.py disable_unhealthy_flow_schedules
  3. Check in the Django Admin (/admin/admin_data_tools/disabledflowschedule/) that the disabled flows appear correctly.

  4. Check the is_schedule_active checkbox for a flow and save; confirm that the mutation was called in Prefect.

  5. Verify that a user with is_staff=True and the admin_data_tools | disabled flow schedule | Can view/change permissions can access the admin without being a superuser.

Winzen added 4 commits April 14, 2026 18:49
Create new Django app admin_data_tools and migrate the
disable_unhealthy_flow_schedules management command (and its
supporting package) from core to the new app.
Introduce DisabledFlowSchedule to persistently track disabled Prefect
flow schedules across re-registrations. The Django admin allows operators
to reactivate flows via a checkbox, which calls the Prefect mutation
directly and records reactivated_at for use in post-reactivation checks.
Refactor FlowService into two explicit phases:

Phase 1 (enforce): re-disables flows that Prefect reactivated after
re-registration, and detects new failures in flows previously reactivated
by an admin using reactivated_at as the baseline (valid_since) instead
of Prefect's unreliable created field.

Phase 2 (detect): queries only untracked failing flows, validates them,
disables and registers new ones, then sends a chunked Discord notification
to respect the 2000-character limit.

Also adds ACTIVE_FLOWS_BY_NAMES query, disable_flow_schedule workaround,
Google-style type hints and docstrings across all modules, and
requests-toolbelt as an explicit dependency.
Winzen self-assigned this Apr 16, 2026
Winzen added the prod label (indicates that the Pull Request's target (base) branch is main) Apr 16, 2026
Winzen linked an issue Apr 16, 2026 that may be closed by this pull request (7 tasks)
Winzen merged commit e2e066e into main Apr 16, 2026
8 checks passed