<a href="https://colab.research.google.com/github/JDM-1609/Statistical-Process-Control-in-Injection-Machines/blob/main/SPC_Analysis_and_Limit_OptimizerV2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## VISUALIZACIÓN DE DATOS Y GRÁFICAS DE CONTROL MEDIANTE SPC

El siguiente programa recibe los archivos generados apartir del sistema ALS, separando el DataFrame en diferentes secciones especificadas (Info. Contextual, estadísticos y datos crudos) creando un DataFrame para cada una, permitiendo extraer datos de interés para llevar seguimietos de la optimización de los límites de control de los planes de control de las inyectoras en la empresa SIMEX. Con lo anterior, se logra contruir las gráficas de control (I, X̄ y S) para la identificación de mediciones fuera de especificaciones.

In [None]:
# Carga de datos
from google.colab import files

uploaded = files.upload()
RUTA_IN = next(iter(uploaded.keys()))
print("Archivo cargado:", RUTA_IN)

In [None]:
import csv
from pathlib import Path
from io import StringIO
from ipywidgets import IntSlider, interact, Dropdown
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

# OPCIONAL: para visualización de los datos en el DF

pd.set_option("display.max_columns", 20) # Controla cuántas columnas se muestran sin omisiones
pd.set_option("display.max_row", None)
pd.set_option("display.float_format", lambda v: f"{v:.5g}") # Muestra valores con hasta 4 cifras significativas
ENCODING_ALS = "CP1250" # Encoding de los archivos del ALS

In [None]:
# Inicialización de los índices start & end de cada sección
                       # SECCIONES
idx_info = 0           # Información contextual
idx_stats_start = 2    # Start Estadísticos
idx_stats_end = 21     # End Estadísticos
idx_raw_start = 23     # Datos crudos

EJECUCIÓN DEL VISUALIZADOR

In [None]:
# EJECUCIÓN

# Leer todas las líneas del archivo
lines = leer_lineas(RUTA_IN, encoding=ENCODING_ALS)
print(f"NÚMERO TOTAL DE LÍNEAS EN EL ARCHIVO: {len(lines)}\n")

# Información contextual
df_info = parse_info_contextual(lines, idx_info=idx_info)
print("INFORMACIÓN CONTEXTUAL:")
display(df_info)

# Estadísticos
df_stats = parse_estadisticos(lines, idx_start=idx_stats_start, idx_end=idx_stats_end)
#print("\nTABLA ESTADÍSTICOS PRINCIPALES (Vista rápida):")
#display(df_stats.head())

# Datos crudos
df_raw = parse_datos_crudos(lines, idx_start=idx_raw_start)
#print("\nDATOS CRUDOS MEDIDOS POR EL ALS (Vista rápida):")
#display(df_raw.head())


In [None]:
# EJECUCIÓN
df_resumen = estadisticos_resumen(df_stats)
df_resumen

In [None]:
# EJECUCIÓN
df_limpio, df_xbar, df_s, xbar_prom, s_prom = procesar_datos_spc(df_raw)


In [None]:
# EJECUCIÓN: para graficar la carta I de la variable seleccionada en el desplegable
selector_carta_i(df_limpio, df_stats, window=20)


In [None]:
# EJECUCIÓN: para graficar la carta X de la variable seleccionada en el desplegable
selector_carta_x(df_xbar, xbar_prom, df_stats, window=30)


In [None]:
# EJECUCIÓN: para graficar la carta S de la variable seleccionada en el desplegable
selector_carta_s(df_s, s_prom, df_stats, window=30)


In [None]:
# EJECUCIÓN: para graficar la carta I de cada variable independiente
graficar_carta_i(df_limpio, df_stats, "t4012 [s]")
graficar_carta_i(df_limpio, df_stats, "t4018 [s]")
graficar_carta_i(df_limpio, df_stats, "t4015 [s]")
graficar_carta_i(df_limpio, df_stats, "V4062 [cmł]")
graficar_carta_i(df_limpio, df_stats, "p4072 [bar]")
graficar_carta_i(df_limpio, df_stats, "V4065 [cmł]")



In [None]:
# EJECUCIÓN: para graficar la carta X de cada variable independiente
graficar_carta_x(df_xbar, xbar_prom, df_stats, "t4012 [s]")
graficar_carta_x(df_xbar, xbar_prom, df_stats, "t4018 [s]")
graficar_carta_x(df_xbar, xbar_prom, df_stats, "t4015 [s]")
graficar_carta_x(df_xbar, xbar_prom, df_stats, "V4062 [cmł]")
graficar_carta_x(df_xbar, xbar_prom, df_stats, "p4072 [bar]")
graficar_carta_x(df_xbar, xbar_prom, df_stats, "V4065 [cmł]")

In [None]:
# EJECUCIÓN: para graficar la carta S de cada variable independiente
graficar_carta_s(df_s, s_prom, df_stats, "t4012 [s]")
graficar_carta_s(df_s, s_prom, df_stats, "t4012 [s]")
graficar_carta_s(df_s, s_prom, df_stats, "t4018 [s]")
graficar_carta_s(df_s, s_prom, df_stats, "t4015 [s]")
graficar_carta_s(df_s, s_prom, df_stats, "V4062 [cmł]")
graficar_carta_s(df_s, s_prom, df_stats, "p4072 [bar]")
graficar_carta_s(df_s, s_prom, df_stats, "V4065 [cmł]")


FUNCIONES DEL VISUALIZADOR

In [None]:
# FUNCIONES: División de las secciones en un DF diferente cada uno.

# Lee el CSV como strings y devuelve una lista donde cada elemento es una línea del CSV completo
def leer_lineas(ruta_csv: str, encoding: str = ENCODING_ALS):
    raw = Path(ruta_csv).read_text(encoding=encoding, errors="replace")
    lines = raw.splitlines()  # Este método siempre devuelve una lista
    return lines

# Devuelve la sección "Información Contextual" como un DF
def parse_info_contextual(lines, idx_info: int = 0) -> pd.DataFrame:
    line = lines[idx_info]  # Se toma únicamente la primera línea
    cells = next(csv.reader([line], delimiter=",", quotechar='"'))

    # Limpieza eliminando vacíos y el elemento "Valores reales"
    cleaned = []
    for c in cells:
        c = c.strip().strip('"')
        if not c:  # Si está vacío -> se omite
            continue
        if c.lower() == "valores reales":
            continue
        cleaned.append(c)

    claves  = cleaned[0::2]
    valores = cleaned[1::2]

    df_info = pd.DataFrame([valores], columns=claves)
    return df_info

# Devuelve la sección "Estadísticos" como un DF
def parse_estadisticos(lines, idx_start: int, idx_end: int) -> pd.DataFrame:

    # Se unen las líneas de la sección en un solo string tipo CSV
    block = "\n".join(lines[idx_start:idx_end])

    # Leemos este bloque como si fuera un CSV independiente
    df_stats = pd.read_csv(
        StringIO(block),
        sep=",",
        decimal=",",
        thousands=".",
        quotechar='"',
        encoding=ENCODING_ALS,
        engine="python",
        )
    # Los índices del df son los nombres de los estadísticos
    df_stats.index.name = "Estadísticos"

    # Ajuste de los encabezados de las columnas
    cols = list(df_stats.columns)
    if str(cols[0]).strip() == "":
      df_stats.columns = cols[1:] + cols[:1] # Se desplazan las columnas a la izq
      df_stats = df_stats.dropna(axis=1, how="all") # Elimina la columna vacía

    return df_stats

# Devuelve la sección "Datos Crudos" como un DF
def parse_datos_crudos(lines, idx_start: int) -> pd.DataFrame:

    # Se unen las líneas desde idx_start hasta final de los datos
    block = "\n".join(lines[idx_start:])

    df_raw = pd.read_csv(
        StringIO(block),
        sep=",",
        decimal=",",
        thousands=".",
        quotechar='"',
        encoding=ENCODING_ALS,
        engine="python",
    )
    df_raw.columns = [str(c).strip() for c in df_raw.columns]
    df_raw = df_raw.reset_index()

    cols = list(df_raw.columns)
    df_raw.columns = cols[1:] + cols[:1] # Se desplazan las columnas a la izq

    # Eliminar columnas completamente vacías (NaN en todas las filas)
    df_raw = df_raw.dropna(axis=1, how="all")

    return df_raw

"------------------------------------------------------------------------------"

# FUNCIONES: Extracción de datos estadísticos necesarios para los seguimientos

# Limitar a 3 cifras significativas
def sig(x, n=5):
    try:
        return float(f"{x:.{n}g}")
    except:
        return x

# Construcción de la tabla final para seguimientos
def estadisticos_resumen(df_stats):

    variables = df_stats.columns.tolist()
    filas = []

    for var in variables:
        try:
            # Extraer valores de df_stats
            valor_nominal = df_stats.loc["Valor nominal", var]
            xqq           = df_stats.loc["xqq", var]
            sigma         = df_stats.loc["Sigma", var]
            cp            = df_stats.loc["Cp", var]
            cpd           = df_stats.loc["Cpd", var]

            # Calcular Rango
            rango = df_stats.loc["Tolerancia superior", var] - valor_nominal

            # Calcular Desviación
            desviacion = ((cp - cpd) / (cp))*100 if cp != 0 else np.nan

            # Convertir a 3 cifras significativas cada valor de interés
            fila = [
                var,
                sig(rango),
                sig(valor_nominal),
                sig(xqq),
                sig(sigma),
                sig(cp),
                sig(cpd),
                sig(desviacion)
            ]
        # Si se encuentran valores faltantes devuelve toda la fila como "NaN"
        except KeyError:
            fila = [var] + [np.nan]*7

        filas.append(fila)

    # Se crea el DF final
    df_resumen = pd.DataFrame(
        filas,
        columns=[
            "Variables", "Rango", "Valor nominal", "Media",
            "Sigma_W", "Cp", "Cpk", "Desviación"
        ]
    )

    return df_resumen

"------------------------------------------------------------------------------"

# FUNCIONES: Clasificación de datos por subgrupos y cálculos SPC
# De datos crudos se clasifica para n=5 mediciones por subgrupo y se realizan cálculos SPC
def procesar_datos_spc(df_raw):

    df = df_raw.copy()

    # Se separa el subgrupo con la medición en dos columnas aparte
    df[["subgrupo", "pos"]] = df["Muestra aleatoria"].str.split("/", expand=True)
    df["subgrupo"] = df["subgrupo"].astype(int)
    df["pos"] = df["pos"].astype(int)

    # Toma solo las columnas con valores númericos (Medición de cada variable de control)
    columnas_no_variables = ["Muestra aleatoria", "Momento", "Cantidad piezas", "subgrupo", "pos"]
    variables = [c for c in df.columns if c not in columnas_no_variables]

    # Calcular X̄ y S por subgrupo
    df_xbar = df.groupby("subgrupo")[variables].mean()   # Media por subgrupo
    df_s = df.groupby("subgrupo")[variables].std(ddof=1) # Des Std por subgrupo

    xbar_prom_global = df_xbar.mean() # Cálculo de la media de todas las medias
    s_prom_global = df_s.mean()       # Cálculo de la media de las desv std

    return df, df_xbar, df_s, xbar_prom_global, s_prom_global

"------------------------------------------------------------------------------"

# GRÁFICAS

# FUNCIONES: Generación de la carta I por variable controlada
# Grafica con slider, identificación de puntos fuera de tolerancia
# Dropdown para seleccionar la variable de interés
def graficar_carta_i(df_ind, df_stats, variable, window=20, titulo=None):
    # Datos base
    vals_ind = df_ind[variable].astype(float).to_numpy()     # valores individuales
    n_obs = len(vals_ind)
    muestras = np.arange(1, n_obs + 1)                       # índice de muestra 1..N

    # Valor nominal desde df_stats
    valor_nominal = float(df_stats.loc["Valor nominal", variable])

    # Límites de especificación
    USL = float(df_stats.loc["Tolerancia superior", variable])
    LSL = float(df_stats.loc["Tolerancia inferior", variable])

    # Límites de control de individuales
    UCLi = float(df_stats.loc["LISx", variable])
    LCLi = float(df_stats.loc["LIIx", variable])

    # Escala eje Y automática
    all_vals = np.concatenate([vals_ind, [USL, LSL, UCLi, LCLi, valor_nominal]])
    y_min = all_vals.min()
    y_max = all_vals.max()
    margen = 0.05 * (y_max - y_min if y_max > y_min else 1.0)
    y_min -= margen
    y_max += margen

    # Ventana deslizante
    if window > n_obs:
        window = n_obs

    if titulo is None:
        titulo = (f"Carta de Individuales - {variable}   "
                  f"VN={valor_nominal:.3f}   USL={USL:.3f}   LSL={LSL:.3f}")

    def plot_window(start_idx: int):
        end_idx = min(start_idx + window, n_obs)
        x_win = muestras[start_idx:end_idx]
        y_win = vals_ind[start_idx:end_idx]

        plt.figure(figsize=(12, 5))
        ax = plt.gca()

        # Zonas fuera de especificación (relleno rojo translúcido)
        ax.axhspan(USL, y_max, facecolor="red", alpha=0.08)
        ax.axhspan(y_min, LSL, facecolor="red", alpha=0.08)

        # Serie de individuales
        ax.plot(
            x_win, y_win,
            marker="o",
            linestyle="-",
            color="blue",
            label="Medición"
        )

        # Puntos fuera de especificación
        mask_out = (y_win > USL) | (y_win < LSL)
        ax.scatter(
            x_win[mask_out],
            y_win[mask_out],
            s=50,
            color="red",
            edgecolors="black",
            zorder=5,
            label="Fuera de especificación" if mask_out.any() else None
        )

          # Valor nominal (VERDE)
        ax.axhline(
            y=valor_nominal,
            linestyle="-",
            linewidth=1.2,
            color="green",
            label="Valor nominal"
        )
         # Límites de especificación (rojo continuo)
        ax.axhline(
            y=USL,
            linestyle="-",
            linewidth=1.2,
            color="red",
            label="USL"
        )
        ax.axhline(
            y=LSL,
            linestyle="-",
            linewidth=1.2,
            color="red",
            label="LSL"
        )

        # Límites de control de individuales (naranja punteado)
        ax.axhline(
            y=UCLi,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="UCL"
        )
        ax.axhline(
            y=LCLi,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="LCL"
        )

        # Ejes y título
        ax.set_xlabel("Número de muestra")
        ax.set_ylabel(variable)
        ax.set_title(titulo)

        # Eje X acotado a la ventana actual
        ax.set_xlim(x_win.min() - 0.5, x_win.max() + 0.5)
        ax.set_xticks(x_win)

        # Escala vertical
        ax.set_ylim(y_min, y_max)

        # Grid
        ax.grid(True, linestyle="--", alpha=0.4)

        # Marco exterior negro
        for spine in ax.spines.values():
            spine.set_edgecolor("black")
            spine.set_linewidth(1.2)

        # Leyenda fuera de la gráfica
        ax.legend(
            loc="upper center",
            bbox_to_anchor=(0.5, -0.20),
            ncol=6,
            frameon=False,
            fontsize=9
        )

        plt.tight_layout()
        plt.show()

    # Slider sobre las muestras individuales
    max_start = max(n_obs - window, 0)
    slider = IntSlider(
        value=0,
        min=0,
        max=max_start,
        step=1,
        description="Inicio:",
        continuous_update=False,
        )

    interact(plot_window, start_idx=slider)
def selector_carta_i(df_ind, df_stats, window=200):

    variables = [c for c in df_ind.columns
                 if c not in ["Muestra aleatoria", "Momento", "Cantidad piezas", "subgrupo", "pos"]]

    def _plot(variable):
        graficar_carta_i(df_ind, df_stats, variable, window=window)

    interact(_plot, variable=Dropdown(options=variables, description="Variable:"))

"------------------------------------------------------------------------------"

# FUNCIONES: Generación de la carta X̄ por variable controlada

def graficar_carta_x(df_xbar, xbar_prom, df_stats, variable, window=30, titulo=None):

    # Datos base
    xbar_subgrupos = df_xbar[variable]                      # Serie con X̄ por subgrupo
    subgrupos = xbar_subgrupos.index.astype(int).to_numpy() # Índices de subgrupos como enteros

    xbar_global = float(xbar_prom[variable])

    # Límites de especificación (tolerancias)
    USLx = float(df_stats.loc["Tolerancia superior", variable])
    LSLx = float(df_stats.loc["Tolerancia inferior", variable])

    # Límites de control
    UCLx = float(df_stats.loc["LISx", variable])
    LCLx = float(df_stats.loc["LIIx", variable])

    # Escala eje Y automática
    all_vals = np.concatenate([
        xbar_subgrupos.values,
        [USLx, LSLx, UCLx, LCLx, xbar_global]
    ])
    y_min = all_vals.min()
    y_max = all_vals.max()
    margen = 0.05 * (y_max - y_min if y_max > y_min else 1.0)
    y_min -= margen
    y_max += margen

    # Ventana deslizante)
    n = len(subgrupos)
    if window > n:
        window = n

    if titulo is None:
        titulo = (f"Carta X̄ - {variable}   "
                  f"SW={xbar_global:.3f}   "
                  f"USL={USLx:.3f}   LSL={LSLx:.3f}")

# FUNCIÓN: Configuración gráfica
    def plot_window(start_idx: int):
        end_idx = min(start_idx + window, n)
        sg = subgrupos[start_idx:end_idx]
        vals = xbar_subgrupos.iloc[start_idx:end_idx]

        plt.figure(figsize=(12, 5))
        ax = plt.gca()

        # Zonas fuera de especificación (En rojo translúcido)
        ax.axhspan(USLx, y_max, facecolor="red", alpha=0.08)
        ax.axhspan(y_min, LSLx, facecolor="red", alpha=0.08)

        # Serie X̄
        ax.plot(
            sg, vals,
            marker="o",
            linestyle="-",
            color="blue",
            label="X̄ por subgrupo"
        )

        # Puntos fuera de especificación
        mask_out = (vals > USLx) | (vals < LSLx)
        ax.scatter(
            sg[mask_out],
            vals[mask_out],
            s=50,
            color="red",
            edgecolors="black",
            zorder=5,
            label="Fuera de especificación" if mask_out.any() else None
        )

        # Media global (verde, punteada)
        ax.axhline(
            y=xbar_global,
            linestyle="--",
            linewidth=0.8,
            color="green",
            label="X̄ global"
        )

        # Límites de especificación (rojo continuo)
        ax.axhline(
            y=USLx,
            linestyle="-",
            linewidth=1.2,
            color="red",
            label="USLx"
        )
        ax.axhline(
            y=LSLx,
            linestyle="-",
            linewidth=1.2,
            color="red",
            label="LSLx"
        )

        # Límites de control (naranja punteado)
        ax.axhline(
            y=UCLx,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="UCLx"
        )
        ax.axhline(
            y=LCLx,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="LCLx"
        )

        # Ejes y título
        ax.set_xlabel("Subgrupos")
        ax.set_ylabel(variable)
        ax.set_title(titulo)

        # Eje X acotado a la ventana actual
        ax.set_xlim(sg.min() - 0.5, sg.max() + 0.5)
        ax.set_xticks(sg)

        # Escala vertical
        ax.set_ylim(y_min, y_max)

        # Grid
        ax.grid(True, linestyle="--", alpha=0.4)

        # Marco exterior negro
        for spine in ax.spines.values():
            spine.set_edgecolor("black")
            spine.set_linewidth(1.2)

        # Leyenda fuera de la gráfica
        ax.legend(
            loc="upper center",
            bbox_to_anchor=(0.5, -0.20),
            ncol=6,
            frameon=False,
            fontsize=9
        )

        plt.tight_layout()
        plt.show()

    # Slider para elegir la ventana de subgrupos
    max_start = max(n - window, 0)
    slider = IntSlider(
        value=0,
        min=0,
        max=max_start,
        step=1,
        description="Inicio:",
        continuous_update=False,
    )

    interact(plot_window, start_idx=slider)

# FUNCIÓN: Dropdown para selección de la variable de interés
def selector_carta_x(df_xbar, xbar_prom, df_stats, window=30):
    # Lista de variables disponibles (columnas de df_xbar)
    variables = list(df_xbar.columns)

    def _plot(variable):
        graficar_carta_x(df_xbar, xbar_prom, df_stats, variable, window=window)

    interact(_plot, variable=Dropdown(options=variables, description="Variable:"))

"------------------------------------------------------------------------------"

# FUNCIONES: Generación de la carta S por variable controlada
def graficar_carta_s(df_s, s_prom, df_stats, variable, window=30, titulo=None):
    # Datos base
    s_subgrupos = df_s[variable]                          # Serie S por subgrupo
    subgrupos = s_subgrupos.index.astype(int).to_numpy()  # Índices de subgrupos como enteros

    s_global = float(s_prom[variable])

    # Límites de control de S
    UCLs = float(df_stats.loc["LISs", variable])   # Límite superior de control de S
    LCLs = float(df_stats.loc["LIIs", variable])   # Límite inferior de control de S

    # Escala eje Y automática
    all_vals = np.concatenate([
        s_subgrupos.values,
        [UCLs, LCLs, s_global]
    ])
    y_min = all_vals.min()
    y_max = all_vals.max()
    margen = 0.05 * (y_max - y_min if y_max > y_min else 1.0)
    y_min -= margen
    y_max += margen

    # Ventana deslizante
    n = len(subgrupos)
    if window > n:
        window = n

    if titulo is None:
        titulo = (f"Carta S - {variable}   "
                  f"S̄={s_global:.3f}   UCLs={UCLs:.3f}   LCLs={LCLs:.3f}")

    def plot_window(start_idx: int):
        end_idx = min(start_idx + window, n)
        sg = subgrupos[start_idx:end_idx]
        vals = s_subgrupos.iloc[start_idx:end_idx]

        plt.figure(figsize=(12, 5))
        ax = plt.gca()

        # Zonas fuera de control
        ax.axhspan(UCLs, y_max, facecolor="red", alpha=0.08)
        ax.axhspan(y_min, LCLs, facecolor="red", alpha=0.08)

        # Serie S
        ax.plot(
            sg, vals,
            marker="o",
            linestyle="-",
            color="blue",
            label="S por subgrupo"
        )

        # Puntos fuera de control (fuera de UCLs / LCLs)
        mask_out = (vals > UCLs) | (vals < LCLs)
        ax.scatter(
            sg[mask_out],
            vals[mask_out],
            s=50,
            color="red",
            edgecolors="black",
            zorder=5,
            label="Fuera de control" if mask_out.any() else None
        )

        # Media global de S (verde, punteada fina)
        ax.axhline(
            y=s_global,
            linestyle="--",
            linewidth=0.8,
            color="green",
            label="S̄ global"
        )

        # Límites de control (naranja punteado)
        ax.axhline(
            y=UCLs,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="UCLs"
        )
        ax.axhline(
            y=LCLs,
            linestyle="--",
            linewidth=0.8,
            color="orange",
            label="LCLs"
        )

        # Ejes y título
        ax.set_xlabel("Subgrupos")
        ax.set_ylabel(f"S de {variable}")
        ax.set_title(titulo)

        # Eje X acotado a la ventana actual
        ax.set_xlim(sg.min() - 0.5, sg.max() + 0.5)
        ax.set_xticks(sg)

        # Escala vertical
        ax.set_ylim(y_min, y_max)

        # Grid
        ax.grid(True, linestyle="--", alpha=0.4)

        # Marco exterior negro
        for spine in ax.spines.values():
            spine.set_edgecolor("black")
            spine.set_linewidth(1.2)

        # Leyenda fuera de la gráfica
        ax.legend(
            loc="upper center",
            bbox_to_anchor=(0.5, -0.20),
            ncol=4,
            frameon=False,
            fontsize=9
        )

        plt.tight_layout()
        plt.show()

    # Slider para elegir la ventana de subgrupos
    max_start = max(n - window, 0)
    slider = IntSlider(
        value=0,
        min=0,
        max=max_start,
        step=1,
        description="Inicio:",
        continuous_update=False,
    )

    interact(plot_window, start_idx=slider)


def selector_carta_s(df_s, s_prom, df_stats, window=30):
    variables = list(df_s.columns)

    def _plot(variable):
        graficar_carta_s(df_s, s_prom, df_stats, variable, window=window)

    interact(_plot, variable=Dropdown(options=variables, description="Variable:"))

OPTIMIZADOR DE TOLERANCIAS

Funciones auxiliares

In [None]:
# Configuración del optimizador de límites
OPT_CFG_DEFAULT = {
    "usar_subgrupos_estables": True,   # usar info de df_xbar/df_s + límites de control
    "metodo_outliers": "iqr",          # "iqr" o "percentiles"
    "iqr_factor": 1.5,                 # clásico 1.5·IQR
    "p_low": 0.5,                      # percentil inferior para recorte (si se usa percentiles)
    "p_high": 99.5,                    # percentil superior
    "cov_objetivo": 0.99,              # cubrir al menos 99% de los datos "limpios"
    "k_sigma": None,                   # si se quiere usar μ ± k·σ en vez de percentiles (None = usar percentiles)
    "min_frac_rango": 0.5,             # no reducir el rango a menos del 50% del rango actual
    "cpk_min": 1.0,                    # si hay Cpk<1.0, evitar apretar tolerancias
    "min_n_datos": 30,                 # mínimo de datos por variable para proponer algo
    "allow_gap": 1,
    "min_len_tramo": 25,
}


import numpy as np

def _filtrar_subgrupos_estables(
    df_xbar,
    df_s,
    df_stats,
    var: str,
    allow_gap: int = 1,
    min_len: int = 25,
):
    """
    Devuelve el TRAMO CONSECUTIVO más largo de subgrupos estables para UNA variable (var).

    Estable (por subgrupo) si:
      - X̄ está entre [LIIx, LISx]
      - S  está entre [LIIs, LISs]

    allow_gap:
      - 0  -> no perdona huecos
      - 1  -> perdona un False aislado entre True (recomendado)
      - 2  -> perdona hasta 2 huecos aislados (más permisivo)
    min_len:
      - Longitud mínima deseada del tramo. Si no existe, devuelve igual el tramo más largo.
    """

    # Validaciones mínimas
    if (var not in df_xbar.columns) or (var not in df_s.columns) or (var not in df_stats.columns):
        return np.array([], dtype=df_xbar.index.dtype)

    # Extraer límites de control desde df_stats
    try:
        UCLx = float(df_stats.loc["LISx", var])
        LCLx = float(df_stats.loc["LIIx", var])
        UCLs = float(df_stats.loc["LISs", var])
        LCLs = float(df_stats.loc["LIIs", var])
    except KeyError:
        return np.array([], dtype=df_xbar.index.dtype)

    x = df_xbar[var].to_numpy()
    s = df_s[var].to_numpy()
    subgrupos = df_xbar.index.to_numpy()

    # Máscara base de estabilidad por subgrupo
    mask = (x >= LCLx) & (x <= UCLx) & (s >= LCLs) & (s <= UCLs)

    if not mask.any():
        return np.array([], dtype=subgrupos.dtype)

    # --- Perdonar huecos aislados ---
    # Idea: si hay un False rodeado por True, lo convertimos en True.
    # Repetimos allow_gap veces por si hay patrones tipo True False False True con allow_gap=2.
    if allow_gap > 0 and len(mask) >= 3:
        mask2 = mask.copy()
        for _ in range(allow_gap):
            for i in range(1, len(mask2) - 1):
                if (not mask2[i]) and mask2[i - 1] and mask2[i + 1]:
                    mask2[i] = True
        mask = mask2

    # --- Buscar tramo consecutivo más largo de True ---
    best_start = 0
    best_len = 0
    cur_start = None
    cur_len = 0

    for i, ok in enumerate(mask):
        if ok:
            if cur_start is None:
                cur_start = i
                cur_len = 1
            else:
                cur_len += 1
        else:
            if cur_start is not None:
                if cur_len > best_len:
                    best_len = cur_len
                    best_start = cur_start
                cur_start = None
                cur_len = 0

    # cierre si termina en True
    if cur_start is not None and cur_len > best_len:
        best_len = cur_len
        best_start = cur_start

    tramo = subgrupos[best_start: best_start + best_len]

    # Si el tramo queda muy corto, igual lo devolvemos (pero tú lo reportas como warning)
    # (Esto es útil: evita que el optimizador “invente” estabilidad cuando no hay)
    return tramo



def _filtrar_outliers_serie(x: pd.Series, metodo="iqr", iqr_factor=1.5,
                            p_low=0.5, p_high=99.5):
    """
    Devuelve una serie filtrada eliminando outliers "burdos".
    - metodo="iqr": usa Q1-1.5·IQR y Q3+1.5·IQR
    - metodo="percentiles": usa [p_low, p_high]
    """
    x = x.dropna()
    if x.empty:
        return x

    if metodo == "iqr":
        q1 = x.quantile(0.25)
        q3 = x.quantile(0.75)
        iqr = q3 - q1
        low = q1 - iqr_factor * iqr
        high = q3 + iqr_factor * iqr
    else:  # percentiles
        low = x.quantile(p_low / 100.0)
        high = x.quantile(p_high / 100.0)

    return x[(x >= low) & (x <= high)]


Función principal

In [None]:
def optimizar_tolerancias(
    df_limpio: pd.DataFrame,
    df_stats: pd.DataFrame,
    df_xbar: pd.DataFrame = None,
    df_s: pd.DataFrame = None,
    variables: list[str] | None = None,
    config: dict | None = None,
) -> pd.DataFrame:

    # --- Configuración ---
    cfg = OPT_CFG_DEFAULT.copy()
    if config is not None:
        cfg.update(config)

    metodo_outliers = cfg["metodo_outliers"]
    iqr_factor = cfg["iqr_factor"]
    p_low_cfg = cfg["p_low"]
    p_high_cfg = cfg["p_high"]
    cov_obj = cfg["cov_objetivo"]
    k_sigma = cfg["k_sigma"]
    min_frac_rango = cfg["min_frac_rango"]
    cpk_min = cfg["cpk_min"]
    min_n_datos = cfg["min_n_datos"]

    # --- Variables a analizar ---
    if variables is None:
        # Heurística: todas las columnas numéricas que NO sean claramente contextuales
        cols_excluir = {"Muestra aleatoria", "Momento", "Cantidad piezas", "subgrupo", "pos"}
        variables = [
            c for c in df_limpio.columns
            if c not in cols_excluir and pd.api.types.is_numeric_dtype(df_limpio[c])
        ]


    filas_resultado = []

    for var in variables:
        comentario = []

        # 1) Extraer datos de la variable
        if var not in df_limpio.columns:
            comentario.append("Variable no encontrada en df_limpio.")
            filas_resultado.append({
                "Variable": var,
                "Nominal": np.nan,
                "LSL_actual": np.nan,
                "USL_actual": np.nan,
                "LSL_rec": np.nan,
                "USL_rec": np.nan,
                "Media": np.nan,
                "sigma": np.nan,
                "p_low": np.nan,
                "p_high": np.nan,
                "rango_actual": np.nan,
                "rango_rec": np.nan,
                "reduccion_rango_pct": np.nan,
                "cobertura_datos_pct": np.nan,
                "comentario": "; ".join(comentario),
            })
            continue

        # --- Tramo estable POR VARIABLE (si se desea y hay info) ---
        subgrupos_tramo = None
        if cfg["usar_subgrupos_estables"] and df_xbar is not None and df_s is not None:
            if "subgrupo" in df_limpio.columns:
                subgrupos_tramo = _filtrar_subgrupos_estables(
                    df_xbar=df_xbar,
                    df_s=df_s,
                    df_stats=df_stats,
                    var=var,              # <- clave: solo esta variable
                    allow_gap=cfg.get("allow_gap", 1),
                    min_len=cfg.get("min_len_tramo", 25),
                )

        serie = df_limpio[var].dropna()

        if subgrupos_tramo is not None and len(subgrupos_tramo) > 0:
            serie = serie[df_limpio["subgrupo"].isin(subgrupos_tramo)]
        else:
            comentario.append("No se detectó tramo estable para esta variable.")
            # Política recomendada: NO optimizar si no hay tramo estable
            # (si prefieres, puedes seguir sin filtrar, pero no es lo ideal)
            # continue

        if len(serie) < min_n_datos:
            comentario.append(f"No hay datos suficientes (n={len(serie)} < {min_n_datos}).")
            # Intentamos leer nominal y tolerancias actuales para reportar algo
            try:
                nominal = float(df_stats.loc["Valor nominal", var])
                LSL_act = float(df_stats.loc["Tolerancia inferior", var])
                USL_act = float(df_stats.loc["Tolerancia superior", var])
            except Exception:
                nominal = LSL_act = USL_act = np.nan

            filas_resultado.append({
                "Variable": var,
                "Nominal": nominal,
                "LSL_actual": LSL_act,
                "USL_actual": USL_act,
                "LSL_rec": np.nan,
                "USL_rec": np.nan,
                "Media": np.nan,
                "sigma": np.nan,
                "p_low": np.nan,
                "p_high": np.nan,
                "rango_actual": (USL_act - LSL_act) if np.isfinite(USL_act) and np.isfinite(LSL_act) else np.nan,
                "rango_rec": np.nan,
                "reduccion_rango_pct": np.nan,
                "cobertura_datos_pct": np.nan,
                "comentario": "; ".join(comentario),
            })
            continue

        # 3) Filtrar outliers
        serie_filtrada = _filtrar_outliers_serie(
            serie,
            metodo=metodo_outliers,
            iqr_factor=iqr_factor,
            p_low=p_low_cfg,
            p_high=p_high_cfg,
        )

        # Cálculo del porcentaje de datos retenidos y eliminados después de filtrar outliers
        n_total = len(serie)
        n_filtrados = len(serie_filtrada)

        porc_retenido = 100 * n_filtrados / n_total
        porc_eliminado = 100 - porc_retenido

        if len(serie_filtrada) < min_n_datos:
            comentario.append(
                f"Tras filtrar outliers, quedan pocos datos (n={len(serie_filtrada)})."
            )

        # Stats del proceso
        mu = float(serie_filtrada.mean())
        sigma = float(serie_filtrada.std(ddof=1)) if len(serie_filtrada) > 1 else 0.0

        # Percentiles para info
        p_low_val = float(serie_filtrada.quantile(p_low_cfg / 100.0))
        p_high_val = float(serie_filtrada.quantile(p_high_cfg / 100.0))

        # Percentiles objetivo según cobertura deseada
        q_low_obj = (1.0 - cov_obj) / 2.0
        q_high_obj = 1.0 - q_low_obj
        p_low_obj_val = float(serie_filtrada.quantile(q_low_obj))
        p_high_obj_val = float(serie_filtrada.quantile(q_high_obj))

        # 4) Leer nominal y tolerancias actuales
        try:
            nominal = float(df_stats.loc["Valor nominal", var])
        except Exception:
            nominal = mu  # fallback: si no hay nominal en df_stats, usamos mu

        try:
            LSL_act = float(df_stats.loc["Tolerancia inferior", var])
            USL_act = float(df_stats.loc["Tolerancia superior", var])
        except Exception:
            LSL_act = np.nan
            USL_act = np.nan

        rango_actual = (USL_act - LSL_act) if np.isfinite(USL_act) and np.isfinite(LSL_act) else np.nan

        # 5) Leer Cpk (si existe) para decidir si se puede apretar
        Cpk_act = None
        for nombre_fila in ["Cpk", "Cpd", "Cpkx"]:
            if nombre_fila in df_stats.index:
                try:
                    Cpk_act = float(df_stats.loc[nombre_fila, var])
                    break
                except Exception:
                    continue

        if Cpk_act is not None and Cpk_act < cpk_min:
            comentario.append(f"Cpk actual bajo ({Cpk_act:.2f} < {cpk_min}), "
                              "no se recomienda estrechar la tolerancia.")
            # En este caso, simplemente reportamos y no cambiamos nada
            filas_resultado.append({
                "Variable": var,
                "Nominal": nominal,
                "LSL_actual": LSL_act,
                "USL_actual": USL_act,
                "LSL_rec": LSL_act,
                "USL_rec": USL_act,
                "Media": mu,
                "sigma": sigma,
                "p_low": p_low_val,
                "p_high": p_high_val,
                "rango_actual": rango_actual,
                "rango_rec": rango_actual,
                "reduccion_rango_pct": 0.0,
                "cobertura_datos_pct": np.nan,
                "Pct_eliminado": porc_eliminado,
                "Pct_retenido": porc_retenido,
                "comentario": "; ".join(comentario),
            })
            continue

        # 6) Propuesta bruta de LSL/USL recomendados
        if k_sigma is not None and sigma > 0:
            # Basado en μ ± k·σ
            LSL_prop = mu - k_sigma * sigma
            USL_prop = mu + k_sigma * sigma
            comentario.append(f"Propuesta inicial basada en μ ± {k_sigma}·σ.")
        else:
            # Basado en percentiles para cov_objetivo (ej: 99%)
            LSL_prop = p_low_obj_val
            USL_prop = p_high_obj_val
            comentario.append(f"Propuesta inicial basada en percentiles que cubren ~{cov_obj*100:.1f}% de los datos.")

        rango_prop = USL_prop - LSL_prop

        # 7) Cobertura real de datos filtrados con la propuesta inicial
        dentro = ((serie_filtrada >= LSL_prop) & (serie_filtrada <= USL_prop)).mean()
        cobertura_pct = float(dentro * 100.0)

        # 8) Candado: no reducir demasiado el rango respecto al actual
        if np.isfinite(rango_actual) and rango_actual > 0:
            rango_min = min_frac_rango * rango_actual
            if rango_prop < rango_min:
                comentario.append(
                    f"Rango propuesto ({rango_prop:.4g}) < {min_frac_rango*100:.0f}% del rango actual "
                    f"({rango_actual:.4g}). Se limita el cierre."
                )
                # Recentramos rango mínimo sobre el nominal
                LSL_prop = nominal - rango_min / 2.0
                USL_prop = nominal + rango_min / 2.0
                rango_prop = rango_min

                # Recalcular cobertura con este rango ajustado
                dentro = ((serie_filtrada >= LSL_prop) & (serie_filtrada <= USL_prop)).mean()
                cobertura_pct = float(dentro * 100.0)

        # 9) Construir fila de resultado
        reduccion_pct = np.nan
        if np.isfinite(rango_actual) and rango_actual > 0:
            reduccion_pct = 100.0 * (1.0 - rango_prop / rango_actual)

        filas_resultado.append({
            "Variable": var,
            "Nominal": nominal,
            "Media": mu,
            "sigma": sigma,
            "p_low": p_low_val,
            "p_high": p_high_val,
            "LSL_actual": LSL_act,
            "USL_actual": USL_act,
            "LSL_rec": LSL_prop,
            "USL_rec": USL_prop,
            "rango_actual": rango_actual,
            "rango_rec": rango_prop,
            "reduccion_rango_pct": reduccion_pct,
            "cobertura_datos_pct": cobertura_pct,
            "Pct_eliminado": porc_eliminado,
            "Pct_retenido": porc_retenido,
            "Datos eliminados": n_total - n_filtrados,
            "Datos filtrados": n_filtrados,
            "Comentario": "; ".join(comentario),
        })

    df_res = pd.DataFrame(filas_resultado)
    return df_res


In [None]:
# Supongamos que ya hiciste:
# df_info, df_stats, df_raw = ...
# df_limpio, df_xbar, df_s, xbar_prom, s_prom = procesar_datos_spc(df_raw)

variables = ["t4012 [s]", "t4018 [s]", "t4015 [s]", "V4062 [cmł]", "p4072 [bar]", "V4065 [cmł]"]

df_tol_opt = optimizar_tolerancias(
    df_limpio=df_limpio,
    df_stats=df_stats,
    df_xbar=df_xbar,
    df_s=df_s,
    variables=variables,
    config={
        # aquí puedes sobreescribir lo que quieras
        "metodo_outliers": "percentiles",
        "p_low": 0.5,
        "p_high": 99.5,
        "cov_objetivo": 0.99,
        "min_frac_rango": 0.5,
        "cpk_min": 1.0,
    }
)

df_tol_opt


SIMULADOR

In [None]:
df_limpio.shape

In [None]:
df_stats

In [None]:
def calcular_sigma_w_por_subgrupos(df_limpio: pd.DataFrame, variables, col_subgrupo="subgrupo") -> pd.Series:
    """
    Sigma_W por variable = promedio de la desviación estándar por subgrupo (n=5).
    df_raw debe tener una columna 'subgrupo' (int) y columnas numéricas de variables.
    """
    sigmas = {}
    for var in variables:
        if var not in df_limpio.columns:
            sigmas[var] = np.nan
            continue

        g = df_limpio[[col_subgrupo, var]].dropna().groupby(col_subgrupo)[var]
        s_por_subgrupo = g.std(ddof=1)
        sigmas[var] = float(s_por_subgrupo.mean()) if len(s_por_subgrupo) else np.nan

    return pd.Series(sigmas, name="sigma_w")


def simular_plan_control(
    df_limpio: pd.DataFrame,
    df_stats: pd.DataFrame,
    rangos_total: dict,
    variables=None,
    col_subgrupo="subgrupo",
    idx_nominal="Valor nominal",
    decimales=3
) -> pd.DataFrame:
    """
    Simula nuevos límites a partir de un RANGO TOTAL por variable (USL-LSL) simétrico
    alrededor del nominal y recalcula Cp/Cpk.

    rangos_total: dict {variable: rango_total}, donde rango_total = USL - LSL.
    """

    if variables is None:
        variables = list(rangos_total.keys())

    # 1) Nominal desde df_stats
    nominales = {}
    for var in variables:
        try:
            nominales[var] = float(df_stats.loc[idx_nominal, var])
        except Exception:
            nominales[var] = np.nan
    nominales = pd.Series(nominales, name="nominal")

    # 2) Media desde datos crudos
    medias = {}
    for var in variables:
        medias[var] = float(df_limpio[var].dropna().mean()) if var in df_limpio.columns else np.nan
    medias = pd.Series(medias, name="media")

    # 3) Sigma_W por subgrupos
    sigma_w = calcular_sigma_w_por_subgrupos(df_limpio, variables, col_subgrupo=col_subgrupo)

    # 4) Tabla simulada
    filas = []
    for var in variables:
        rango = float(rangos_total.get(var, np.nan))     # <- ancho TOTAL
        half = rango / 2.0 if np.isfinite(rango) else np.nan

        nominal = float(nominales.get(var, np.nan))
        mu = float(medias.get(var, np.nan))
        sw = float(sigma_w.get(var, np.nan))

        # Límites simulados (simétricos)
        LSL = nominal - half
        USL = nominal + half

        # Cp / Cpk
        if np.isfinite(sw) and sw > 0 and np.isfinite(LSL) and np.isfinite(USL):
            cp = (USL - LSL) / (6.0 * sw)  # = rango / (6*sw)
            cpk = min((USL - mu) / (3.0 * sw), (mu - LSL) / (3.0 * sw))
            desviacion = (cp - cpk) / cp if cp != 0 else np.nan
        else:
            cp = np.nan
            cpk = np.nan
            desviacion = np.nan

        filas.append({
            "Variables": var,
            "Rango": rango,
            "Valor nominal": nominal,
            "Media": mu,
            "Sigma_W": sw,
            "Cp": cp,
            "Cpk": cpk,
            "Desviación": desviacion
        })

    df_out = pd.DataFrame(filas)

    # Formato
    for c in ["Rango", "Valor nominal", "Media", "Sigma_W", "Cp", "Cpk", "Desviación"]:
        df_out[c] = pd.to_numeric(df_out[c], errors="coerce").round(decimales)

    return df_out


In [None]:
rangos_nuevos = {
    "p4072 [bar]": 150,  # rango total (USL-LSL)
    "t4012 [s]": 2.0,
}

df_sim = simular_plan_control(df_limpio, df_stats, rangos_total=rangos_nuevos, decimales=3)
df_sim
