In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path

In [None]:
def extract_mean_vectors_from_crop(img: np.ndarray, margin_frac: float, grid_size: int):
    """
    Compute average spectral vectors from sub-patches of a cropped hyperspectral image.

    The cube is cropped by `margin_frac` on every side, the remaining region is
    divided into a `grid_size` x `grid_size` grid with `np.array_split` (patch
    sizes may differ by one pixel when the region does not divide evenly), every
    pixel spectrum is scaled to unit L2 norm, and the scaled spectra of each
    patch are averaged.

    Args:
        img (np.ndarray): Hyperspectral cube (H, W, Bands)
        margin_frac (float): Fraction of the image cropped from each border; must be in [0, 0.5)
        grid_size (int): Number of divisions along each dimension (e.g., 5 for 5×5 grid)
    Returns:
        list of np.ndarray: grid_size**2 vectors of length Bands, one per patch in
        row-major order; each is the mean of the L2-normalized pixel spectra.
    Raises:
        ValueError: If `img` is not 3-D, `grid_size` is below 1, `margin_frac` is
            outside [0, 0.5), or the cropped region has fewer rows/columns than
            `grid_size` (which would otherwise yield empty patches and NaN means).
    """
    if img.ndim != 3:
        raise ValueError(f"expected a 3-D cube (H, W, Bands), got shape {img.shape}")
    if grid_size < 1:
        raise ValueError(f"grid_size must be >= 1, got {grid_size}")
    # Also rejects NaN; a negative margin would turn into negative slice indices.
    if not 0 <= margin_frac < 0.5:
        raise ValueError(f"margin_frac must be in [0, 0.5), got {margin_frac}")

    H, W, n_bands = img.shape

    # Apply margin
    margin_y = int(H * margin_frac)
    margin_x = int(W * margin_frac)
    roi_img = img[margin_y:H - margin_y, margin_x:W - margin_x, :]

    roi_h, roi_w = roi_img.shape[:2]
    if roi_h < grid_size or roi_w < grid_size:
        raise ValueError(
            f"cropped region {roi_h}x{roi_w} is too small for a {grid_size}x{grid_size} grid"
        )

    results = []
    for row_strip in np.array_split(roi_img, grid_size, axis=0):
        for patch in np.array_split(row_strip, grid_size, axis=1):
            pixels = patch.reshape(-1, n_bands)
            # The epsilon keeps all-zero spectra from dividing by zero.
            pixels = pixels / (np.linalg.norm(pixels, axis=1, keepdims=True) + 1e-8)
            results.append(pixels.mean(axis=0))
    return results

#
# Cultivar-based splitting

In [None]:
def generate_apple_based_split(df, grid_size: int):
    """
    Extracts mean spectral vectors for the specified grid size,
    applies the apple-type-based splitting strategy,
    and saves train/val/test CSVs.

    Every apple type in the validation set goes to val, every type in the test
    set goes to test, and all remaining types go to train.

    This function reads two notebook-level names that it does not define itself:
    `margin_frac` (passed to `extract_mean_vectors_from_crop`) and `output_dir`
    (directory the CSVs are written to). Both must exist before it is called.

    Args:
        df (pd.DataFrame): DataFrame containing 'path', 'label', and 'apple_type' columns
        grid_size (int): number of subpatches per dimension (e.g., 5 for 5×5 grid)
    Raises:
        ValueError: If no spectra could be extracted from any record.
    """
    # === Split configuration ===
    val_apple_types = {"prince"}
    test_apple_types = {"granny"}

    mean_spectra = []
    skipped = 0

    # === Extract mean spectra ===
    for _, row in tqdm(df.iterrows(), total=len(df), desc=f"{grid_size}x{grid_size} extraction (apple-based)"):
        try:
            path = Path(row["path"])
            if not path.exists():
                skipped += 1
                continue

            # The archive returned by np.load holds an open file handle; the
            # context manager releases it as soon as the cube has been read.
            with np.load(path) as data:
                if "cube" not in data:
                    skipped += 1
                    continue
                cube = data["cube"]

            patch_vectors = extract_mean_vectors_from_crop(cube, margin_frac, grid_size)
            apple_type = row["apple_type"].lower()

            for vec in patch_vectors:
                mean_spectra.append({
                    **{f"band_{i}": val for i, val in enumerate(vec)},
                    "apple_content": row["label"],
                    "apple_type": apple_type
                })

        except Exception as e:
            # Best effort: report the failing record and continue with the rest.
            print(f"❌ Error processing {row['path']}: {e}")

    if skipped:
        print(f"⚠️ Skipped {skipped} record(s) with a missing file or no 'cube' array.")

    # Without this guard the column lookups below fail with an opaque KeyError.
    if not mean_spectra:
        raise ValueError(
            f"No spectra could be extracted for the {grid_size}x{grid_size} grid; "
            "check the 'path' column and the contents of the referenced files."
        )

    # === Build final dataframe and split ===
    df_final = pd.DataFrame(mean_spectra)

    is_val = df_final["apple_type"].isin(val_apple_types)
    is_test = df_final["apple_type"].isin(test_apple_types)

    train_df = df_final[~(is_val | is_test)]
    val_df   = df_final[is_val]
    test_df  = df_final[is_test]

    # === Save outputs ===
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)  # to_csv does not create missing directories
    train_df.to_csv(out_dir / f"{grid_size}x{grid_size}_train_apple.csv", index=False)
    val_df.to_csv(out_dir / f"{grid_size}x{grid_size}_val_apple.csv", index=False)
    test_df.to_csv(out_dir / f"{grid_size}x{grid_size}_test_apple.csv", index=False)

    print(f"✅ Done ({grid_size}×{grid_size}). Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")


In [None]:
# Load the image index table into `df`. `csv_path` is not assigned in this
# cell, so it must already be defined in the notebook namespace before this runs.
df = pd.read_csv(csv_path)
print(f"Loaded {len(df)} image records.")

In [None]:
# Export the apple-type-based train/val/test CSVs for every grid resolution
# from 1×1 through 5×5.
for grid_size in range(1, 6):
    generate_apple_based_split(df, grid_size=grid_size)

#
# Sugar concentration-based splitting

In [None]:
# Per apple type: the rounded sugar-concentration values held out for the
# test and validation splits. Anything not listed here ends up in train.
split_rules = {
    "aport": {"test": {25, 45}, "val": {65}},
    "gala": {"test": {30, 50}, "val": {70}},
    "golden": {"test": {35, 55}, "val": {75}},
    "granny": {"test": {60}, "val": {40, 50, 55}},
    "prince": {"test": {40}, "val": {45, 50, 60}},
    "idared": {"test": {45, 65}, "val": {25}},
    "semerenko": {"test": {50, 70}, "val": {30}},
    "starcrimson": {"test": {55, 75}, "val": {35}},
}


def get_split_rule_based(row):
    """
    Decide which split a sample belongs to from its apple type and sugar concentration.

    The concentration in `row["apple_content"]` is rounded to the nearest integer
    and looked up in `split_rules` for `row["apple_type"]`. Test membership is
    checked before validation; samples whose apple type has no rules, or whose
    value is not listed, are assigned to train.

    Returns:
        str: "test", "val", or "train"
    """
    apple_type = row["apple_type"]
    concentration = int(round(row["apple_content"]))

    held_out = split_rules.get(apple_type)
    if held_out is None:
        return "train"

    for split_name in ("test", "val"):
        if concentration in held_out[split_name]:
            return split_name
    return "train"


In [None]:
def generate_rule_based_split(df, grid_size: int):
    """
    Extracts mean spectral vectors for the specified grid size,
    applies the rule-based splitting strategy, and saves train/val/test CSVs.

    Each extracted row is assigned to a split by `get_split_rule_based`; the
    temporary 'split' column is dropped before the CSVs are written.

    This function reads two notebook-level names that it does not define itself:
    `margin_frac` (passed to `extract_mean_vectors_from_crop`) and `output_dir`
    (directory the CSVs are written to). Both must exist before it is called.

    Args:
        df (pd.DataFrame): DataFrame containing 'path', 'label', and 'apple_type' columns
        grid_size (int): number of subpatches per dimension (e.g., 5 for 5×5 grid)
    Raises:
        ValueError: If no spectra could be extracted from any record.
    """
    mean_spectra = []
    skipped = 0

    for _, row in tqdm(df.iterrows(), total=len(df), desc=f"{grid_size}x{grid_size} extraction"):
        try:
            path = Path(row["path"])
            if not path.exists():
                skipped += 1
                continue

            # The archive returned by np.load holds an open file handle; the
            # context manager releases it as soon as the cube has been read.
            with np.load(path) as data:
                if "cube" not in data:
                    skipped += 1
                    continue
                cube = data["cube"]

            patch_vectors = extract_mean_vectors_from_crop(cube, margin_frac, grid_size)
            apple_type = row["apple_type"].lower()

            for vec in patch_vectors:
                mean_spectra.append({
                    **{f"band_{i}": val for i, val in enumerate(vec)},
                    "apple_content": row["label"],
                    "apple_type": apple_type
                })
        except Exception as e:
            # Best effort: report the failing record and continue with the rest.
            print(f"❌ Error processing {row['path']}: {e}")

    if skipped:
        print(f"⚠️ Skipped {skipped} record(s) with a missing file or no 'cube' array.")

    # Without this guard the split assignment below fails on a column-less frame.
    if not mean_spectra:
        raise ValueError(
            f"No spectra could be extracted for the {grid_size}x{grid_size} grid; "
            "check the 'path' column and the contents of the referenced files."
        )

    df_final = pd.DataFrame(mean_spectra)
    df_final["split"] = df_final.apply(get_split_rule_based, axis=1)

    train_df = df_final[df_final["split"] == "train"].drop(columns=["split"])
    val_df   = df_final[df_final["split"] == "val"].drop(columns=["split"])
    test_df  = df_final[df_final["split"] == "test"].drop(columns=["split"])

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)  # to_csv does not create missing directories
    train_df.to_csv(out_dir / f"{grid_size}x{grid_size}_train_rule.csv", index=False)
    val_df.to_csv(out_dir / f"{grid_size}x{grid_size}_val_rule.csv", index=False)
    test_df.to_csv(out_dir / f"{grid_size}x{grid_size}_test_rule.csv", index=False)

    print(f"✅ Done ({grid_size}×{grid_size}). Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")


In [None]:
# Read the image index table into `df` again for this section. `csv_path` is
# not assigned in this cell and must already exist in the notebook namespace.
df = pd.read_csv(csv_path)
print(f"Loaded {len(df)} image records.")

In [None]:
# Export the rule-based train/val/test CSVs for every grid resolution
# from 1×1 through 5×5.
for grid_size in range(1, 6):
    generate_rule_based_split(df, grid_size=grid_size)