# 1. Projeto: Associação Temporal de Paradas e OPs

Este notebook organiza o pipeline para:

1. Padronizar chaves (preencher `CD_PEDIDO` e `CD_ITEM` a partir de `CD_OP`).
2. Ordenar temporalmente os eventos por máquina, com diagnósticos de monotonicidade.
3. Associar paradas entre ajustes consecutivos.
4. Inferir OP faltante via merge temporal (`merge_asof`) com tolerância.
5. Marcar eventos próximos às trocas de turno.
6. Tratar produções sem **Ajuste** explícito quando o setup for idêntico (ferramental/cores).

---
## Citações do Cliente

> "Exatamente. Não existe uma associação direta entre evento de Parada e evento de Produção. O sistema basicamente conta chapas e registra eventos de parada."

> "Toda vez que um evento é classificado como Ajuste (cód. 1) o operador informa ao sistema qual será a próxima ordem e automaticamente o sistema encerra a ordem anterior; todas as paradas que estiverem dentro do período entre 2 eventos de Ajuste são automaticamente classificadas como pertencentes àquela OP anterior."

> "Acredito que sejam apontamentos indevidos, ou seja, eventos de paradas associados a ordens de produção onde o sistema não conseguiu associar relação, muito provavelmente porque foram classificados erroneamente. Verifique se estes eventos não estão com horários relacionados às trocas de turno (~ 05:00 ; 13:00 ; 21:00)."

## 2. Índice (Table of Contents)

1. Projeto / Citações do Cliente
2. Índice
3. Setup (imports, utilitários)
4. Dados de Entrada
5. Seleção e Transformação de Máquinas
6. Merge Base
7. Padronização de Chaves (OP → Pedido/Item)
8. Escopo e Filtros (remover código 0)
9. Preparação Temporal e Monotonicidade
10. Associação entre Ajustes (Paradas → OP anterior)
11. Inferência via Merge Temporal (asof)
12. Marcação de Troca de Turno
13. Diagnósticos Rápidos
14. Pipeline Integrado
15. Execução Final (opcional)
16. Produções Sem Ajuste Explícito (setup idêntico)
17. Auditorias e Sanity Checks
18. Persistência (salvar parquet/csv)
19. Amostras de Casos Suspeitos (perto de turno)
20. Anexo: Utilitários de Depuração

In [1]:
# 3. Setup (imports, utilitários)
import os
import numpy as np
import pandas as pd

pd.set_option("display.max_columns", None)

# Tokens que representam ausência/placeholder
MISSING_TOKENS = {"", "-1", "None", "nan"}

def is_missing_str(s: pd.Series) -> pd.Series:
    s = s.astype("string")
    return s.isna() | s.str.strip().isin(MISSING_TOKENS)

def to_datetime_safe(s: pd.Series) -> pd.Series:
    return pd.to_datetime(s, errors="coerce")

def normalize_datetimes(df: pd.DataFrame, cini: str = "DT_INICIO", cfim: str = "DT_FIM") -> pd.DataFrame:
    out = df.copy()
    out[cini] = to_datetime_safe(out[cini])
    out[cfim]  = to_datetime_safe(out[cfim])
    return out

def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

## 4. Dados de Entrada

Carrega paradas, facas, máquinas e tarefas conforme caminhos do projeto.

In [2]:
df_paradas   = pd.read_parquet("../../../data/raw/tb_paradas.parquet")
df_facas     = pd.read_parquet("../../../data/raw/tb_facas.parquet")
df_maquinas  = pd.read_parquet("../../../data/raw/tb_maquina.parquet")
df_tarefcon  = pd.read_parquet("./../../../data/raw/tb_tarefcon.parquet")

## 5. Seleção e Transformação de Máquinas

- Filtra `CD_TIPO` em {1, 3}.
- Mantém `ID_GRUPOMAQUINA` em {8, 18, 22, 23}.
- Renomeia `QT_NRDECORES` para `QT_NRDECORES_MAQUINA`.
- Mapeia `CD_TIPO` para texto descritivo.

In [3]:
df_maquinas_fil = df_maquinas[df_maquinas["CD_TIPO"].astype("string").isin(["1", "3"])].copy()
df_maquinas_fil = df_maquinas_fil[df_maquinas_fil["ID_GRUPOMAQUINA"].astype("string").isin(["8", "18", "22", "23"])].copy()
df_maquinas_fil = df_maquinas_fil.rename(columns={"QT_NRDECORES": "QT_NRDECORES_MAQUINA"})

status_descr = {1: "C/V", 3: "Flexo"}
df_maquinas_fil["TX_TIPO_MAQUINA"] = (
    df_maquinas_fil["CD_TIPO"].astype(int).map(status_descr)
)

## 6. Merge Base

- `df_tarefcon` × `df_maquinas_fil` por `CD_MAQUINA` (inner)
- Enriquecimento com `df_paradas` pelo código `CD_PARADAOUCONV` → `df_paradas.CD_PARADA` (left)
- Reorganização de colunas e normalização de datas

In [4]:
df_merge = df_tarefcon.merge(
    df_maquinas_fil[["CD_MAQUINA", "QT_NRDECORES_MAQUINA", "ID_GRUPOMAQUINA", "TX_TIPO_MAQUINA"]],
    on="CD_MAQUINA",
    how="inner",
).merge(
    df_paradas[["CD_PARADA", "TX_DESCRICAO", "FL_EXTERNA", "FL_USADACONVERSAO"]],
    left_on="CD_PARADAOUCONV",
    right_on="CD_PARADA",
    how="left",
)
df_merge = df_merge[sorted(df_merge.columns)].copy()
df_merge = normalize_datetimes(df_merge, cini="DT_INICIO", cfim="DT_FIM")

## 7. Padronização de Chaves (OP → Pedido/Item)

Em 96% dos registros que apresentam parada, não há valores para `ID_ITEM` e `ID_PEDIDO`. Como `CD_OP` é `PEDIDO/ITEM`, recuperamos ambos **apenas onde estiver faltando**.

In [5]:
def preencher_pedido_item_de_op(
    df: pd.DataFrame,
    op_col: str = "CD_OP",
    pedido_col: str = "CD_PEDIDO",
    item_col: str = "CD_ITEM",
) -> pd.DataFrame:
    out = df.copy()
    if pedido_col not in out.columns:
        out[pedido_col] = pd.NA
    if item_col not in out.columns:
        out[item_col] = pd.NA

    op_str = out.get(op_col, pd.Series(pd.NA, index=out.index)).astype("string").str.strip()
    op_valida = (~is_missing_str(op_str)) & op_str.str.contains("/", regex=False)
    partes = op_str.where(op_valida).str.split("/", n=1, expand=True)
    if isinstance(partes, pd.DataFrame) and partes.shape[1] == 2:
        ped_from_op  = partes[0].astype("string").str.strip()
        item_from_op = partes[1].astype("string").str.strip()
        mask_ped_missing  = is_missing_str(out[pedido_col].astype("string"))
        mask_item_missing = is_missing_str(out[item_col].astype("string"))
        out.loc[op_valida & mask_ped_missing, pedido_col]  = ped_from_op[op_valida & mask_ped_missing]
        out.loc[op_valida & mask_item_missing, item_col]   = item_from_op[op_valida & mask_item_missing]
    return out

# Aplica o preenchimento de chaves
df_merge = preencher_pedido_item_de_op(df_merge)

## 8. Escopo e Filtros (remover código 0)

Remove eventos com `CD_PARADAOUCONV == 0` (ex.: esteiras).

In [6]:
mask_zero = pd.to_numeric(df_merge["CD_PARADAOUCONV"], errors="coerce").fillna(0) == 0
df_merge = df_merge[~mask_zero].copy()

## 9. Preparação Temporal e Monotonicidade

Cria diagnósticos:
- `BAD_ORDER`: `DT_FIM < DT_INICIO`
- `MONO_BREAK`: quebras de monotonicidade de `DT_INICIO` por máquina

In [7]:
def preparar_tempo_monotonico(
    df: pd.DataFrame,
    col_maquina: str = "CD_MAQUINA",
    col_inicio: str = "DT_INICIO",
    col_fim: str = "DT_FIM",
    corrigir_fim_menor_inicio: bool = False,
) -> pd.DataFrame:
    out = normalize_datetimes(df, cini=col_inicio, cfim=col_fim)
    out[col_maquina] = out[col_maquina].astype("string")

    out = out[~(out[col_inicio].isna() & out[col_fim].isna())].copy()

    out["BAD_ORDER"] = (
        out[col_fim].notna() & out[col_inicio].notna() & (out[col_fim] < out[col_inicio])
    )

    out = out.sort_values([col_maquina, col_inicio, col_fim], kind="mergesort").reset_index(drop=True)

    out["MONO_BREAK"] = (
        out.groupby(col_maquina, sort=False)[col_inicio]
        .apply(lambda s: s.diff().dt.total_seconds().fillna(0) < 0)
        .reset_index(level=0, drop=True)
        .astype("int8")
    )

    if corrigir_fim_menor_inicio:
        mask_fix = out["BAD_ORDER"]
        out.loc[mask_fix, col_fim] = out.loc[mask_fix, col_inicio]
        out["BAD_ORDER"] = (
            out[col_fim].notna() & out[col_inicio].notna() & (out[col_fim] < out[col_inicio])
        )
    return out

# Aplica ordenação e diagnósticos
df_merge = preparar_tempo_monotonico(df_merge, corrigir_fim_menor_inicio=False)

## 10. Associação entre Ajustes (Paradas → OP anterior)

Regra: paradas entre dois ajustes pertencem à **OP anterior**. O ajuste é `CD_PARADAOUCONV == 1`.

In [8]:
def associar_paradas_entre_ajustes(
    df: pd.DataFrame,
    col_maquina: str = "CD_MAQUINA",
    col_evento: str = "CD_PARADAOUCONV",
    col_op: str = "CD_OP",
    ajuste_code: int = 1,
) -> pd.DataFrame:
    out = df.copy()
    out[col_maquina] = out[col_maquina].astype("string")

    op_atual_por_maq = {}
    for idx, row in out.iterrows():
        maq = row[col_maquina]
        cod = pd.to_numeric(row[col_evento], errors="coerce")
        row_op = row[col_op]

        # Ajuste define a nova OP
        if cod == ajuste_code and not is_missing_str(pd.Series([row_op])).iloc[0]:
            op_atual_por_maq[maq] = row_op
            continue

        # Parada (>=1) sem OP herda a OP atual da máquina
        if (cod >= 1) and is_missing_str(pd.Series([row_op])).iloc[0]:
            if maq in op_atual_por_maq:
                out.at[idx, col_op] = op_atual_por_maq[maq]

    return out

# Aplica associação entre ajustes
df_merge = df_merge.sort_values(["CD_MAQUINA", "DT_INICIO"], kind="mergesort").reset_index(drop=True)
df_merge = associar_paradas_entre_ajustes(df_merge)
df_merge = preencher_pedido_item_de_op(df_merge)

## 11. Inferência via Merge Temporal (asof)

Regra: paradas sem `CD_OP` → casar `DT_FIM` da parada com próximo `DT_INICIO` com `CD_OP` **na mesma máquina** dentro de tolerância (ex.: 30 min).

Para evitar `ValueError: left keys must be sorted`, o merge é feito **por máquina**, ordenando as chaves em cada grupo.

In [9]:
def inferir_op_asof(df: pd.DataFrame, tolerancia_min: int = 30) -> pd.DataFrame:
    out = normalize_datetimes(df)
    out["__IDX__"] = np.arange(len(out))

    cod = pd.to_numeric(out["CD_PARADAOUCONV"], errors="coerce").fillna(0)
    mask_parada = cod >= 1
    mask_sem_op = is_missing_str(out["CD_OP"])

    left_all  = out.loc[mask_parada & mask_sem_op & out["DT_FIM"].notna(), ["__IDX__", "CD_MAQUINA", "DT_FIM"]].copy()
    left_all  = left_all.rename(columns={"DT_FIM": "KEY_TIME"})
    right_all = out.loc[(~mask_sem_op) & out["DT_INICIO"].notna(), ["CD_MAQUINA", "DT_INICIO", "CD_OP"]].copy()

    # tipagem consistente
    left_all["CD_MAQUINA"]  = left_all["CD_MAQUINA"].astype("string")
    right_all["CD_MAQUINA"] = right_all["CD_MAQUINA"].astype("string")

    updates = []
    tol = pd.Timedelta(minutes=tolerancia_min)

    for maq, left in left_all.groupby("CD_MAQUINA", sort=False):
        right = right_all[right_all["CD_MAQUINA"] == maq]
        if left.empty or right.empty:
            continue

        left  = left.sort_values(["KEY_TIME"], kind="mergesort").reset_index(drop=True)
        right = right.sort_values(["DT_INICIO"], kind="mergesort").reset_index(drop=True)

        matched = pd.merge_asof(
            left,
            right,
            left_on="KEY_TIME",
            right_on="DT_INICIO",
            direction="forward",
            tolerance=tol
        )

        ok = matched[matched["CD_OP"].notna()]
        if not ok.empty:
            updates.extend(ok[["__IDX__", "CD_OP"]].itertuples(index=False, name=None))

    for idx, op in updates:
        out.loc[out["__IDX__"] == idx, "CD_OP"] = op

    out = out.drop(columns=["__IDX__"])
    out = preencher_pedido_item_de_op(out)
    return out

# Aplica merge temporal
df_merge = inferir_op_asof(df_merge, tolerancia_min=30)

## 12. Marcação de Troca de Turno (05:00, 13:00, 21:00)

Citação do cliente: verificar eventos próximos às trocas de turno. Cria colunas:
- `NEAR_SHIFT_INICIO`
- `NEAR_SHIFT_FIM`

In [10]:
def marcar_troca_turno(df: pd.DataFrame, janela_min: int = 30) -> pd.DataFrame:
    out = df.copy()
    out["DT_INICIO"] = to_datetime_safe(out["DT_INICIO"])
    out["DT_FIM"]    = to_datetime_safe(out["DT_FIM"])

    def near_shift(series: pd.Series) -> pd.Series:
        series = to_datetime_safe(series)
        mins = series.dt.hour * 60 + series.dt.minute
        anchors = [5*60, 13*60, 21*60]
        deltas = [
            (mins - a).abs().where((mins - a).abs() <= 720, 1440 - (mins - a).abs())
            for a in anchors
        ]
        dmin = pd.concat(deltas, axis=1).min(axis=1)
        return (dmin <= janela_min).astype("int8")

    out["NEAR_SHIFT_INICIO"] = near_shift(out["DT_INICIO"])
    out["NEAR_SHIFT_FIM"]    = near_shift(out["DT_FIM"])
    return out

# Marca trocas de turno
df_merge = marcar_troca_turno(df_merge, janela_min=30)

## 13. Diagnósticos Rápidos

- Amostra de paradas (>=1) mostrando OP e proximidade de turno.
- Estatísticas básicas de flags temporais.

In [11]:
amostra_paradas = df_merge[pd.to_numeric(df_merge["CD_PARADAOUCONV"], errors="coerce").fillna(0) >= 1][
    ["CD_MAQUINA", "DT_INICIO", "DT_FIM", "CD_PARADAOUCONV", "CD_OP", "CD_PEDIDO", "CD_ITEM", "NEAR_SHIFT_INICIO", "NEAR_SHIFT_FIM"]
].sample(20, random_state=42) if df_merge.shape[0] >= 20 else df_merge

stats_flags = {
    "total_linhas": int(df_merge.shape[0]),
    "bad_order": int(df_merge.get("BAD_ORDER", pd.Series([False]*len(df_merge))).sum()),
    "mono_break": int(df_merge.get("MONO_BREAK", pd.Series([0]*len(df_merge))).sum()),
    "near_shift_inicio": int(df_merge.get("NEAR_SHIFT_INICIO", pd.Series([0]*len(df_merge))).sum()),
    "near_shift_fim": int(df_merge.get("NEAR_SHIFT_FIM", pd.Series([0]*len(df_merge))).sum()),
}

## 14. Pipeline Integrado

Executa todas as etapas na ordem recomendada.

In [12]:
def pipeline_paradas_ops(df: pd.DataFrame, tolerancia_min: int = 30, janela_turno_min: int = 30) -> pd.DataFrame:
    out = df.copy()
    out = normalize_datetimes(out)

    # 1) OP → Pedido/Item (precoce, só preenche vazios)
    out = preencher_pedido_item_de_op(out)

    # 2) Remove CD_PARADAOUCONV == 0
    mask_zero_local = pd.to_numeric(out["CD_PARADAOUCONV"], errors="coerce").fillna(0) == 0
    out = out[~mask_zero_local].copy()

    # 3) Ordenação + diagnósticos
    out = preparar_tempo_monotonico(out, corrigir_fim_menor_inicio=False)

    # 4) Paradas entre ajustes
    out = out.sort_values(["CD_MAQUINA", "DT_INICIO"], kind="mergesort").reset_index(drop=True)
    out = associar_paradas_entre_ajustes(out)
    out = preencher_pedido_item_de_op(out)

    # 5) Merge temporal (asof) por máquina
    out = inferir_op_asof(out, tolerancia_min=tolerancia_min)

    # 6) Marca proximidade de troca de turno
    out = marcar_troca_turno(out, janela_min=janela_turno_min)

    return out

## 15. Execução Final (opcional)

Descomente para executar tudo de ponta a ponta, caso ainda não tenha rodado as etapas acima.

In [13]:
# df_merge = pipeline_paradas_ops(df_merge, tolerancia_min=30, janela_turno_min=30)
# df_merge.head()

## 16. Produções Sem Ajuste Explícito (setup idêntico)

Quando duas OPs consecutivas **não** possuem um Ajuste explícito (cód. 1) entre elas, **mas** o setup é idêntico, não devemos penalizar a ausência de ajuste.

**Critério de setup idêntico (ajustável):**
- Mesmo `CD_FACA` **e**
- Mesma `QT_NRDECORES_MAQUINA` (ou coluna equivalente de cores)

Resultado:
- Cria `SETUP_SAME` por linha (comparando com linha anterior da mesma máquina)
- Cria `AUSENCIA_AJUSTE_PERMITIDA` quando há troca de OP sem evento 1 entre os blocos, mas `SETUP_SAME == 1`

In [14]:
def marcar_setup_identico(
    df: pd.DataFrame,
    col_maquina: str = "CD_MAQUINA",
    col_op: str = "CD_OP",
    col_faca: str = "CD_FACA",
    col_cores: str = "QT_NRDECORES_MAQUINA",
) -> pd.DataFrame:
    out = df.copy()
    out[[col_faca, col_cores, col_maquina, col_op]] = out[[col_faca, col_cores, col_maquina, col_op]].astype("string")
    out = out.sort_values([col_maquina, "DT_INICIO"], kind="mergesort").reset_index(drop=True)
    same_faca  = out.groupby(col_maquina)[col_faca].apply(lambda s: s == s.shift(1)).reset_index(level=0, drop=True)
    same_cores = out.groupby(col_maquina)[col_cores].apply(lambda s: s == s.shift(1)).reset_index(level=0, drop=True)
    out["SETUP_SAME"] = (same_faca & same_cores).astype("int8")
    return out

def permitir_ausencia_ajuste_sem_penalizar(
    df: pd.DataFrame,
    col_maquina: str = "CD_MAQUINA",
    col_evento: str = "CD_PARADAOUCONV",
    col_op: str = "CD_OP",
    ajuste_code: int = 1,
) -> pd.DataFrame:
    out = df.copy()
    out[[col_maquina, col_op]] = out[[col_maquina, col_op]].astype("string")
    out = out.sort_values([col_maquina, "DT_INICIO"], kind="mergesort").reset_index(drop=True)

    # Marca se há um Ajuste explícito nesta linha
    cod = pd.to_numeric(out[col_evento], errors="coerce").fillna(0)
    out["IS_AJUSTE"] = (cod == ajuste_code).astype("int8")

    # Transições de OP por máquina
    prev_op = out.groupby(col_maquina)[col_op].shift(1)
    out["OP_CHANGED"] = (out[col_op] != prev_op).astype("int8")

    # Houve algum ajuste entre a OP anterior e esta?
    # Para cada máquina, olhamos se entre um índice de mudança e o anterior ocorreu IS_AJUSTE==1
    out["AUSENCIA_AJUSTE_PERMITIDA"] = 0
    for maq, g in out.groupby(col_maquina, sort=False):
        idxs = g.index.tolist()
        for i in range(1, len(idxs)):
            cur_idx = idxs[i]
            prev_idx = idxs[i-1]
            # Se trocou OP nesta linha
            if out.at[cur_idx, "OP_CHANGED"] == 1:
                # Verifica existência de ajuste explícito entre prev_idx (exclusivo) e cur_idx (exclusivo)
                janela = out.loc[prev_idx+1:cur_idx-1]
                houve_ajuste = int(janela["IS_AJUSTE"].sum()) > 0 if not janela.empty else False
                # Se não houve ajuste, mas setup idêntico, então permitimos ausência
                if not houve_ajuste and out.at[cur_idx, "SETUP_SAME"] == 1:
                    out.at[cur_idx, "AUSENCIA_AJUSTE_PERMITIDA"] = 1
    return out

# Aplica marcações
df_merge = marcar_setup_identico(df_merge, col_faca="CD_FACA", col_cores="QT_NRDECORES_MAQUINA")
df_merge = permitir_ausencia_ajuste_sem_penalizar(df_merge)

## 17. Auditorias e Sanity Checks

Funções utilitárias para checar condições típicas de erro ou inconsistência.

In [15]:
def audit_asof_sort(df: pd.DataFrame) -> dict:
    report = {"left_sorted": True, "right_sorted": True, "by_group": []}
    tmp = normalize_datetimes(df)
    cod = pd.to_numeric(tmp["CD_PARADAOUCONV"], errors="coerce").fillna(0)
    mask_parada = cod >= 1
    mask_sem_op = is_missing_str(tmp["CD_OP"])    
    left = tmp.loc[mask_parada & mask_sem_op & tmp["DT_FIM"].notna(), ["CD_MAQUINA", "DT_FIM"]].copy()
    left.rename(columns={"DT_FIM": "KEY_TIME"}, inplace=True)
    right = tmp.loc[(~mask_sem_op) & tmp["DT_INICIO"].notna(), ["CD_MAQUINA", "DT_INICIO", "CD_OP"]].copy()
    left["CD_MAQUINA"] = left["CD_MAQUINA"].astype("string")
    right["CD_MAQUINA"] = right["CD_MAQUINA"].astype("string")

    for maq, gleft in left.groupby("CD_MAQUINA", sort=False):
        gright = right[right["CD_MAQUINA"] == maq]
        ok_left  = gleft.sort_values(["KEY_TIME"], kind="mergesort").index.equals(gleft.sort_index().sort_values().index)
        ok_right = gright.sort_values(["DT_INICIO"], kind="mergesort").index.equals(gright.sort_index().sort_values().index)
        report["by_group"].append({"CD_MAQUINA": str(maq), "left_keys_sorted": bool(ok_left), "right_keys_sorted": bool(ok_right)})
        report["left_sorted"]  &= ok_left
        report["right_sorted"] &= ok_right
    return report

def audit_missing_ops(df: pd.DataFrame) -> dict:
    s = df["CD_OP"].astype("string")
    return {
        "total": int(df.shape[0]),
        "faltando_op": int(is_missing_str(s).sum())
    }

# Exemplo de uso (opcional):
# asof_sort_report = audit_asof_sort(df_merge)
# missing_ops_report = audit_missing_ops(df_merge)

## 18. Persistência (salvar parquet/csv)

Salva resultado final processado.

In [16]:
OUTPUT_DIR = "../../../data/processed"
ensure_dir(OUTPUT_DIR)
out_parquet = os.path.join(OUTPUT_DIR, "df_paradas_ops.parquet")
out_csv     = os.path.join(OUTPUT_DIR, "df_paradas_ops.csv")

# Descomente para salvar
# df_merge.to_parquet(out_parquet, index=False)
# df_merge.to_csv(out_csv, index=False, sep=';')

## 19. Amostras de Casos Suspeitos (perto de turno)

Exibe linhas de paradas sem OP original que foram inferidas e ocorrem **perto de trocas de turno**.

In [17]:
def amostras_perto_de_turno(df: pd.DataFrame, n: int = 20) -> pd.DataFrame:
    # Paradas (>=1) com OP válida agora e perto de turno
    cod = pd.to_numeric(df["CD_PARADAOUCONV"], errors="coerce").fillna(0)
    mask_parada = cod >= 1
    mask_op_val = ~is_missing_str(df["CD_OP"]) & df["CD_OP"].astype("string").str.contains("/")
    mask_turno  = (df["NEAR_SHIFT_INICIO"] == 1) | (df["NEAR_SHIFT_FIM"] == 1)
    sample = df.loc[mask_parada & mask_op_val & mask_turno, [
        "CD_MAQUINA", "DT_INICIO", "DT_FIM", "CD_PARADAOUCONV", "CD_OP", "CD_PEDIDO", "CD_ITEM",
        "NEAR_SHIFT_INICIO", "NEAR_SHIFT_FIM"
    ]]
    if sample.shape[0] > n:
        return sample.sample(n, random_state=42)
    return sample

# exemplos (opcional):
# amostras_turno = amostras_perto_de_turno(df_merge, n=20)
# amostras_turno.head()

## 20. Anexo: Utilitários de Depuração

Funções auxiliares para inspeção rápida de encadeamentos em uma máquina específica.

In [18]:
def trilha_maquina(df: pd.DataFrame, maquina: str, n: int = 50) -> pd.DataFrame:
    g = df[df["CD_MAQUINA"].astype("string") == str(maquina)].copy()
    g = g.sort_values(["DT_INICIO", "DT_FIM"], kind="mergesort").reset_index(drop=True)
    cols = [
        "CD_MAQUINA", "DT_INICIO", "DT_FIM", "CD_PARADAOUCONV", "IS_AJUSTE", "SETUP_SAME",
        "AUSENCIA_AJUSTE_PERMITIDA", "CD_OP", "CD_PEDIDO", "CD_ITEM",
        "NEAR_SHIFT_INICIO", "NEAR_SHIFT_FIM", "BAD_ORDER", "MONO_BREAK"
    ]
    cols = [c for c in cols if c in g.columns]
    if g.shape[0] > n:
        return g[cols].head(n)
    return g[cols]

# exemplo (opcional):
# trilha_maquina(df_merge, maquina="DRO3", n=100)