# Atlas DataFlow

In [3]:
# Imports: apenas stdlib + APIs públicas do Atlas DataFlow (core + Steps canônicos)
from __future__ import annotations

import os
import json
import uuid
from pathlib import Path
from datetime import datetime, timezone

from atlas_dataflow.core.pipeline.context import RunContext
from atlas_dataflow.core.pipeline.registry import StepRegistry
from atlas_dataflow.core.engine.engine import Engine

from atlas_dataflow.core.config.hashing import compute_config_hash
from atlas_dataflow.core.contract.hashing import compute_contract_hash
from atlas_dataflow.core.traceability.manifest import (
    create_manifest,
    add_event,
    step_started,
    step_finished,
    step_failed,
    save_manifest,
)

# Steps canônicos (DAG explícito)
from atlas_dataflow.steps.ingest.load import IngestLoadStep
from atlas_dataflow.steps.contract.load import ContractLoadStep
from atlas_dataflow.steps.contract.conformity_report import ContractConformityReportStep
from atlas_dataflow.steps.transform.cast_types_safe import CastTypesSafeStep
from atlas_dataflow.steps.audit.profile_baseline import AuditProfileBaselineStep
from atlas_dataflow.steps.audit.schema_types import AuditSchemaTypesStep
from atlas_dataflow.steps.audit.duplicates import AuditDuplicatesStep

# pip install -e .

## 1) Run Directory (estado explícito)

A execução do notebook deve ser **determinística** e **sem estado oculto**.

- `ATLAS_RUN_DIR` pode ser definido no ambiente para controlar o diretório da run.
- Caso não exista, este notebook cria um diretório local em `./runs/notebook_template_v1/<run_id>`.

Nada é escrito “solto” fora deste diretório.


In [5]:
# Run directory explícito (sem estado oculto)
run_id = uuid.uuid4().hex[:12]
base_run_dir = Path(os.environ.get("ATLAS_RUN_DIR", "./runs/notebook_template_v1")).expanduser()
run_dir = (base_run_dir / run_id)
run_dir.mkdir(parents=True, exist_ok=True)

run_dir


WindowsPath('runs/notebook_template_v1/6e028c9edb2a')

## 2) Configuração explícita (config)

Config é uma estrutura declarativa.  
Não contém lógica. Não contém inferência.

Este template cria um exemplo mínimo reprodutível:
- gera um CSV pequeno em `run_dir/input.csv`
- gera um contrato mínimo em `run_dir/contract.json`
- aponta `steps.ingest.load.path` e `contract.path` explicitamente

⚠️ Em uso real, substitua apenas os paths — **sem mudar o papel do notebook**.


In [6]:
# Input mínimo (exemplo) gerado com stdlib — NÃO é “transformação”, apenas fixture reprodutível do template.
input_csv = run_dir / "input.csv"
contract_path = run_dir / "contract.json"

# CSV simples (tudo como string na ingestão, como esperado em ingest.load v1)
csv_text = """age,plan,active,target
34,basic,true,0
52,premium,false,1
41,basic,true,0
"""
input_csv.write_text(csv_text, encoding="utf-8")

# Internal Contract v1 (mínimo válido para rodar os Steps do template)
contract = {
    "contract_version": "1.0",
    "problem": {"name": "template_demo", "type": "classification"},
    "target": {"name": "target", "dtype": "int", "allowed_null": False},
    "features": [
        {"name": "age", "role": "numerical", "dtype": "int", "required": True, "allowed_null": False},
        {"name": "plan", "role": "categorical", "dtype": "category", "required": True, "allowed_null": False},
        {"name": "active", "role": "boolean", "dtype": "bool", "required": True, "allowed_null": False},
    ],
    "defaults": {},
    "categories": {
        "plan": {"allowed": ["basic", "premium"], "normalization": {"type": "lower"}}
    },
    "imputation": {
        "age": {"strategy": "median", "mandatory": False},
        "plan": {"strategy": "most_frequent", "mandatory": False},
        "active": {"strategy": "most_frequent", "mandatory": False},
    },
}

contract_path.write_text(json.dumps(contract, ensure_ascii=False, indent=2), encoding="utf-8")

# Config explícita
config = {
    "engine": {"fail_fast": True},
    "contract": {"path": str(contract_path)},
    "steps": {
        "ingest.load": {"enabled": True, "path": str(input_csv)},
        # Os demais Steps são controlados por enable/disable se necessário
        "contract.load": {"enabled": True},
        "contract.conformity_report": {"enabled": True},
        "transform.cast_types_safe": {"enabled": True},
        "audit.profile_baseline": {"enabled": True},
        "audit.schema_types": {"enabled": True},
        "audit.duplicates": {"enabled": True},
    },
}

config


{'engine': {'fail_fast': True},
 'contract': {'path': 'runs\\notebook_template_v1\\6e028c9edb2a\\contract.json'},
 'steps': {'ingest.load': {'enabled': True,
   'path': 'runs\\notebook_template_v1\\6e028c9edb2a\\input.csv'},
  'contract.load': {'enabled': True},
  'contract.conformity_report': {'enabled': True},
  'transform.cast_types_safe': {'enabled': True},
  'audit.profile_baseline': {'enabled': True},
  'audit.schema_types': {'enabled': True},
  'audit.duplicates': {'enabled': True}}}

## 3) RunContext explícito

O notebook **não** mantém estado fora do `RunContext`.  
Ele apenas cria o contexto e o entrega ao Engine.


In [9]:
ctx = RunContext(
    run_id=run_id,
    created_at=datetime.now(timezone.utc),
    config=config,
    contract={},  # será injetado por contract.load
    meta={"run_dir": str(run_dir)},
)

ctx




## 4) DAG explícito (Steps + dependências visíveis)

Aqui está o DAG canônico do template v1.

- Steps são instanciados explicitamente.
- Dependências são visíveis (via `depends_on` em cada Step, quando aplicável).
- O `StepRegistry` valida estrutura (IDs únicos + ordem declarada).


In [12]:
# Instanciação explícita de Steps
steps = [
    IngestLoadStep(),
    ContractLoadStep(),
    ContractConformityReportStep(),
    CastTypesSafeStep(),
    AuditProfileBaselineStep(),
    AuditSchemaTypesStep(),
    AuditDuplicatesStep(),
]

# Registro explícito (validação estrutural)
registry = StepRegistry()
for s in steps:
    registry.add(s)

# Lista ordenada declarada (fonte de verdade do notebook)
[s.id for s in registry.list()]


['ingest.load',
 'contract.load',
 'contract.conformity_report',
 'transform.cast_types_safe',
 'audit.profile_baseline',
 'audit.schema_types',
 'audit.duplicates']

## 5) Execução explícita (Engine)

O notebook chama o Engine explicitamente.  
Nenhum Step é executado por “efeito colateral”.

Após a execução, o notebook materializa o **Manifest v1** (rastreabilidade forense),
registrando:
- `run_started`
- `step_started` / `step_finished` / `step_failed`
- `run_finished`


In [13]:
# Hashes semânticos de entrada (config + contract)
config_hash = compute_config_hash(config)
contract_hash = compute_contract_hash(contract)

# Versão do Atlas (best-effort; mantém determinismo se pacote não tiver versão instalada)
try:
    from importlib.metadata import version as _pkg_version
    atlas_version = _pkg_version("atlas-dataflow")
except Exception:
    atlas_version = "dev"

manifest = create_manifest(
    run_id=run_id,
    started_at=ctx.created_at,
    atlas_version=atlas_version,
    config_hash=config_hash,
    contract_hash=contract_hash,
)

add_event(manifest, event_type="run_started", ts=ctx.created_at, payload={"run_dir": str(run_dir)})

engine = Engine(steps=registry.list(), ctx=ctx)

# Execução
t0 = datetime.now(timezone.utc)
result = engine.run()
t1 = datetime.now(timezone.utc)

# Sincronização explícita dos resultados no Manifest (sem inferência)
for step_id, step_result in result.steps.items():
    kind = getattr(step_result, "kind", None)
    kind_value = kind.value if hasattr(kind, "value") else str(kind)
    step_started(manifest, step_id=step_id, kind=kind_value, ts=t0)
    if getattr(step_result, "status", None) is not None and getattr(step_result.status, "value", "") == "failed":
        step_failed(manifest, step_id=step_id, ts=t1, error=step_result.summary)
    else:
        step_finished(manifest, step_id=step_id, ts=t1, result=step_result)

add_event(manifest, event_type="run_finished", ts=t1, payload={"duration_ms": int((t1 - t0).total_seconds() * 1000)})

manifest_path = run_dir / "manifest.json"
save_manifest(manifest, manifest_path)

manifest_path


WindowsPath('runs/notebook_template_v1/6e028c9edb2a/manifest.json')

## 6) Resultados (visualização sem manipular dados)

O notebook pode **renderizar status** e **apontar artefatos**,
mas não deve inspecionar/alterar dados fora do core.


In [14]:
# Status por Step (somente leitura)
[(sid, sr.status.value if hasattr(sr.status, "value") else str(sr.status), sr.summary) for sid, sr in result.steps.items()]


[('contract.load', 'success', 'contract loaded and validated'),
 ('contract.conformity_report',
  'failed',
  'No tabular dataset found in RunContext artifacts (expected data.raw_rows or data.transformed_rows)'),
 ('ingest.load', 'success', 'dataset loaded'),
 ('audit.profile_baseline', 'success', 'baseline profile computed'),
 ('audit.duplicates', 'success', 'duplicates audit computed'),
 ('audit.schema_types', 'success', 'schema types audit computed'),
 ('transform.cast_types_safe', 'skipped', 'skipped due to failed dependency')]

## 7) Artefatos gerados

- `manifest.json` (rastreabilidade forense)
- Logs estruturados no `ctx.events` (observabilidade local do core)
- Artefatos internos via `ctx.get_artifact(...)` quando expostos por Steps

Este template só **aponta caminhos** — não “abre” conteúdos.


In [15]:
# Caminhos principais
{
    "run_dir": str(run_dir),
    "input_csv": str(input_csv),
    "contract": str(contract_path),
    "manifest": str(manifest_path),
}


{'run_dir': 'runs\\notebook_template_v1\\6e028c9edb2a',
 'input_csv': 'runs\\notebook_template_v1\\6e028c9edb2a\\input.csv',
 'contract': 'runs\\notebook_template_v1\\6e028c9edb2a\\contract.json',
 'manifest': 'runs\\notebook_template_v1\\6e028c9edb2a\\manifest.json'}