In [1]:
import sys, logging
from pathlib import Path
import numpy as np

# Agregar src al path (ajusta si el notebook está en otro directorio)
sys.path.insert(0, str(Path.cwd() / "src"))

from patches_tda import PatchSpacePipeline, PipelineConfig, WitnessTDA,PolynomialPatchGenerator
from patches_tda.ui import build_ui, PatchGeneratorAdapter


In [None]:
DATA_DIR = Path("vanhateren_imc")

PATCH_SIZE = 3
PATCHES_PER_IMAGE = 5000
TOP_DNORM_FRACTION = 0.20

TARGET_PATCH_SPACE_SIZE = 4_000_000

SUBSAMPLE_SIZE = 50_000

DENSITY_K = 300
DENSITY_TOP_FRACTION = 0.30
DENOISE_ITERATIONS = 2

SEED = 42
MAX_IMAGES = None  # None = todas

# --- helpers para notación científica ---
def sci(n: int) -> str:
    """Enteros en notación científica compacta: 4000000 -> 4e6"""
    return f"{n:.0e}".replace("+", "")

def frac(p: float) -> str:
    """Fracciones limpias: 0.10 -> 0.1"""
    return f"{p:g}"

# salida
OUTPUT_FILE = Path(
    f"X_M={sci(SUBSAMPLE_SIZE)}({DENSITY_K},{frac(DENSITY_TOP_FRACTION)}).npk"
)

In [None]:
logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(name)s | %(message)s")
logger = logging.getLogger("notebook")

In [None]:
if not DATA_DIR.exists():
    raise FileNotFoundError(f"No existe: {DATA_DIR}")

config = PipelineConfig(
    data_dir=DATA_DIR,
    patch_size=PATCH_SIZE,
    patches_per_image=PATCHES_PER_IMAGE,
    top_dnorm_fraction=TOP_DNORM_FRACTION,
    target_patch_space_size=TARGET_PATCH_SPACE_SIZE,
    subsample_size=SUBSAMPLE_SIZE,
    density_k=DENSITY_K,
    density_top_fraction=DENSITY_TOP_FRACTION,
    denoise_iterations=DENOISE_ITERATIONS,
    seed=SEED,
    max_images=MAX_IMAGES,
)

pipeline = PatchSpacePipeline(config)
X_pk = pipeline.run()
stats = pipeline.last_stats

X_pk.shape, X_pk.dtype

In [None]:
print("RESULTADOS PATCH SPACE")
print("  imágenes procesadas:", stats.images_processed)
print("  parches extraídos:", f"{stats.total_patches_extracted:,}")
print("  tras D-norm:", f"{stats.patches_after_dnorm:,}")
print("  patch space size:", f"{stats.patch_space_size:,}")
print("  subsample:", f"{stats.subsample_size:,}")
print("  X(p,k) final:", f"{stats.final_size:,}")
print("  shape:", X_pk.shape)
print("  memoria MB:", X_pk.nbytes/(1024*1024))

In [None]:
# Nota: np.save siempre añade .npy si no está, así que para .npk usa esto:

with open(OUTPUT_FILE, "wb") as f:
    np.save(f, X_pk)

print("Guardado en:", OUTPUT_FILE)

In [None]:
X = np.load(OUTPUT_FILE, allow_pickle=True)

# Si viene directamente como ndarray
print(type(X), X.shape)

In [None]:
import numpy as np
import numpy as np

def dct_change_of_basis(selected_patches: np.ndarray) -> np.ndarray:
    """
    Proyecta parches 3x3 vectorizados (R^9) a coordenadas en la base DCT no-constante (R^8).

    Parameters
    ----------
    selected_patches : np.ndarray
        Shape (N, 9) o (9,). Parches vectorizados en orden row-major.

    Returns
    -------
    V : np.ndarray
        Shape (N, 8). Coordenadas en la base DCT: v = Λ A^T y.
    """
    X = np.asarray(selected_patches, dtype=np.float64)
    if X.ndim == 1:
        if X.shape[0] != 9:
            raise ValueError(f"Si es vector 1D, debe ser shape (9,), pero recibí {X.shape}.")
        X = X.reshape(1, 9)
    if X.ndim != 2 or X.shape[1] != 9:
        raise ValueError(f"selected_patches debe ser shape (N,9). Recibí {X.shape}.")

    # 1) Construir A = [e1 ... e8] (9, 8)
    e1 = (1/np.sqrt(6))   * np.array([1, 0, -1, 1, 0, -1, 1, 0, -1], dtype=np.float64)
    e2 = (1/np.sqrt(6))   * np.array([1, 1,  1, 0, 0,  0,-1,-1, -1], dtype=np.float64)
    e3 = (1/np.sqrt(54))  * np.array([1,-2,  1, 1,-2,  1, 1,-2,  1], dtype=np.float64)
    e4 = (1/np.sqrt(54))  * np.array([1, 1,  1,-2,-2, -2, 1, 1,  1], dtype=np.float64)
    e5 = (1/np.sqrt(8))   * np.array([1, 0, -1, 0, 0,  0,-1, 0,  1], dtype=np.float64)
    e6 = (1/np.sqrt(48))  * np.array([1, 0, -1,-2, 0,  2, 1, 0, -1], dtype=np.float64)
    e7 = (1/np.sqrt(48))  * np.array([1,-2,  1, 0, 0,  0,-1, 2, -1], dtype=np.float64)
    e8 = (1/np.sqrt(216)) * np.array([1,-2,  1,-2, 4, -2, 1,-2,  1], dtype=np.float64)

    A = np.column_stack([e1, e2, e3, e4, e5, e6, e7, e8])  # (9, 8)

    # 2) Λ diagonal con 1/||e_i||^2 (norma euclídea en R^9)
    col_norm_sq = np.sum(A * A, axis=0)   # (8,)
    lam = 1.0 / col_norm_sq              # (8,)

    # 3) v = Λ A^T y  -> equivalente vectorizado: (X @ A) * lam
    V = (X @ A) * lam[None, :]           # (N, 8)
    return V


def generate_Q30_R8(
    gen,
    dct_change_of_basis,
    n_points=30,
    eps=1e-2,
    unit_sphere=True,
    seed=0
):
    """
    Genera exactamente |Q| = n_points puntos en R^8 correspondientes a
    gradientes puramente cuadráticos ±(ax+by)^2, excluyendo a=0 o b=0.

    Parameters
    ----------
    gen : PolynomialPatchGenerator
    dct_change_of_basis : callable
        Función (N,9)->(N,8)
    n_points : int
        Tamaño final de Q (por defecto 30)
    eps : float
        Umbral para excluir direcciones vertical/horizontal
    unit_sphere : bool
        Si True, normaliza cada punto en R^8 para que ||v||2 = 1
    seed : int
        Semilla RNG

    Returns
    -------
    Q8 : (n_points, 8) np.ndarray
    """
    rng = np.random.default_rng(seed)

    # 1) muestrear thetas válidos (a!=0 y b!=0)
    thetas = []
    while len(thetas) < n_points:
        th = rng.uniform(0.0, 2*np.pi)
        if abs(np.cos(th)) > eps and abs(np.sin(th)) > eps:
            thetas.append(th)
    thetas = np.array(thetas, dtype=float)

    # 2) elegir signo: phi=0 (positivo) o phi=pi (negativo)
    phis = rng.choice([0.0, np.pi], size=n_points)

    # 3) generar parches en R^9
    P9 = np.array(
        [gen.generate_patch(theta=float(th), phi=float(ph)).reshape(-1)
         for th, ph in zip(thetas, phis)],
        dtype=np.float64
    )  # (30,9)

    # 4) pasar a R^8
    Q8 = dct_change_of_basis(P9)  # (30,8)

    # 5) (opcional) forzar S^7
    if unit_sphere:
        norms = np.linalg.norm(Q8, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        Q8 = Q8 / norms

    return Q8

# ---- Ejecución (asegúrate de tener PolynomialPatchGenerator definido/importado) ----
gen = PolynomialPatchGenerator(patch_size=3, usar_norma_D=True)
Q = generate_Q30_R8(gen, dct_change_of_basis, n_points=30, seed=42)
print(Q.shape)  # (30, 8)


In [None]:
X = np.load(OUTPUT_FILE)   # <-- ESTA LÍNEA FALTABA

N = X.shape[0]
n = min(10_000, N)   # por si X tiene menos de 10k

rng = np.random.default_rng(42)  # semilla opcional
idx = rng.choice(N, size=n, replace=False)

X = X[idx]
print(X.shape)  # (10000, D)

In [None]:

#X_union_Q = np.vstack([X, Q])  
plt.figure(figsize=(6, 6))
plt.scatter(X[:, 0], X[:,1], s=1, alpha=0.6)
plt.xlabel("x")
plt.ylabel("y")
plt.gca().set_aspect("equal", adjustable="box")
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Cargar datos
X = np.load(OUTPUT_FILE)      # shape (N, 8)
X_union_Q = np.vstack([X, Q]) # shape (N+30, 8)

D = X_union_Q.shape[1]
assert D == 8, "Se espera que los datos estén en R^8"

fig, axes = plt.subplots(D, D, figsize=(16, 16))

for i in range(D):
    for j in range(D):
        ax = axes[i, j]

        if i == j:
            # Diagonal: histograma de la coordenada i
            ax.hist(X_union_Q[:, i], bins=50)
        else:
            # Proyección (i, j)
            ax.scatter(
                X_union_Q[:, j],
                X_union_Q[:, i],
                s=1,
                alpha=0.6
            )

        # Limpiar ticks
        ax.set_xticks([])
        ax.set_yticks([])

        # Etiquetas solo en bordes
        if i == D - 1:
            ax.set_xlabel(f"x{j+1}")
        if j == 0:
            ax.set_ylabel(f"x{i+1}")

plt.suptitle("Proyecciones 2D de $X \\cup Q \\subset \\mathbb{R}^8$", fontsize=16)
plt.tight_layout()
plt.show()


In [None]:

TDA_M_POINTS = 6000
TDA_N_WITNESSES = 1500
TDA_N_LANDMARKS = 110
TDA_MAX_ALPHA_SQUARE = 0.5
TDA_LIMIT_DIMENSION = 2
out_path = Path("X_union_Q.npy")
np.save(out_path, X_union_Q)
tda = (
    WitnessTDA(
        data_path=OUTPUT_FILE,
        m_points=TDA_M_POINTS,
        n_landmarks=TDA_N_LANDMARKS,
        n_witnesses=TDA_N_WITNESSES,
        max_alpha_square=TDA_MAX_ALPHA_SQUARE,
        limit_dimension=TDA_LIMIT_DIMENSION,
        seed=SEED
    )
    .load()
    .sample()
    .compute()
)

print("Puntos usados:", tda.points_.shape[0])
print("Landmarks:", tda.landmarks_.shape[0])
print("Witnesses:", tda.witnesses_.shape[0])
print("Intervalos:", len(tda.persistence_))

In [None]:
tda.plot_projection(method="coords", coords=(0,1))


# o si quieres coordenadas:
# tda.plot_projection(method="coords", dims=(0,1))

In [None]:
# Si tu WitnessTDA ya trae plot_persistence() interno:
tda.plot_persistence()

# o si usa función externa manual:
# tda.plot_persistence(plot_persistence_manual, title_prefix="Witness", max_dims=(0,1,2))


In [None]:
tda.plot_barcodes(max_dims=(1,))


In [None]:
gen = PolynomialPatchGenerator(patch_size=3, usar_norma_D =False)
patch = gen.generate_patch(theta=np.pi/4, phi=np.pi/3)
gen.plot_patch(patch, title="Mi parche")

# Grilla de variaciones
gen.plot_patch_grid(n_theta=4, n_phi=4)

In [2]:
%matplotlib widget

In [3]:

gen = PolynomialPatchGenerator(patch_size=3, usar_norma_D =False)
# Crear adapter (traduce generate_patch → generate)
adapter = PatchGeneratorAdapter(gen)
# Construir UI
ui = build_ui(adapter)
# Mostrar
display(ui)

VBox(children=(VBox(children=(ToggleButtons(description='Modo:', options=('point', 'trajectory'), value='point…

In [None]:
l = ["1","3","-1"]
r = ["80","-200","1"]
l+r