ModuleNotFoundError: No module named 'open_clip_torch'

In [4]:
# --- Check 1: Row-sum == 1 (main positive + neighbors, row-normalized) ---

from pathlib import Path
import numpy as np
import pandas as pd

# === 配置 ===
ARTIFACTS_DIR = Path("/cwStorage/nodecw_group/jijh/yuanspace_data/artifacts")
NODES_PATH = ARTIFACTS_DIR / "nodes.parquet"
EDGES_PATH = ARTIFACTS_DIR / "edges.parquet"

# 抽样的 anchor 数量（建议 200~1000 视内存）
NUM_ANCHORS_TO_CHECK = 500

# “主正样本权重=1 + 邻居 alpha 后归一化”的默认策略
# 如果你在训练里使用了“锐化/温度”处理（例如 alpha^gamma 或主正样本权重 τ），可以在这里同步设置：
USE_GAMMA = False
GAMMA = 1.0      # 若 USE_GAMMA=True，则对 alpha 使用 alpha ** GAMMA
USE_TAU = False
TAU = 1.0        # 若 USE_TAU=True，则主正样本权重设为 TAU（替代 1）

print(f"Loading nodes: {NODES_PATH}")
nodes = pd.read_parquet(NODES_PATH, columns=["tile_id"])
tile_ids = nodes["tile_id"].to_numpy()

print(f"Loading edges: {EDGES_PATH}")
edges = pd.read_parquet(EDGES_PATH, columns=["src_tile_id", "nbr_tile_id", "alpha"])

# 为了加速随机抽样，这里只用存在出边的 src 集合
src_unique = edges["src_tile_id"].unique()
rng = np.random.default_rng(2025)
sampled_src = rng.choice(src_unique, size=min(NUM_ANCHORS_TO_CHECK, len(src_unique)), replace=False)

row_sums = []
bad_rows = []

# 将边按 src 分组，便于快速检索
grp = edges.groupby("src_tile_id", sort=False)

for src in sampled_src:
    if src not in grp.groups:
        # 该 src 没有邻居边（极少数情况），直接跳过
        continue
    sub = grp.get_group(src)

    # 邻居 alpha
    alpha = sub["alpha"].to_numpy(dtype=np.float64)

    # 可选：对 alpha 做幂次锐化
    if USE_GAMMA and GAMMA != 1.0:
        alpha = np.power(alpha, GAMMA)

    # 主正样本权重
    main_w = TAU if USE_TAU else 1.0

    total = main_w + alpha.sum()
    if total <= 0:
        # 不应出现；防御性处理
        rs = np.nan
    else:
        # 归一化后的行和应为 1
        rs = (main_w / total) + (alpha / total).sum()

    row_sums.append(rs)
    if not np.isfinite(rs) or abs(rs - 1.0) > 1e-6:
        bad_rows.append((src, rs))

row_sums = np.array(row_sums, dtype=np.float64)
print(f"[Check1] Checked {len(row_sums)} rows. mean={row_sums.mean():.8f}, "
      f"std={row_sums.std():.8f}, min={row_sums.min():.8f}, max={row_sums.max():.8f}")
if bad_rows:
    print(f"[Check1][WARN] Found {len(bad_rows)} rows with |sum-1|>1e-6. Show first 5:")
    for a,b in bad_rows[:5]:
        print(f"  src_tile_id={a}, row_sum={b}")
else:
    print("[Check1] ✅ All sampled rows sum to 1 within tolerance.")


Loading nodes: /cwStorage/nodecw_group/jijh/yuanspace_data/artifacts/nodes.parquet
Loading edges: /cwStorage/nodecw_group/jijh/yuanspace_data/artifacts/edges.parquet
[Check1] Checked 500 rows. mean=1.00000000, std=0.00000000, min=1.00000000, max=1.00000000
[Check1] ✅ All sampled rows sum to 1 within tolerance.


In [5]:
# --- Check 2: Mapping assertion for main positives (simulated) ---

from pathlib import Path
import numpy as np
import pandas as pd

ARTIFACTS_DIR = Path("/cwStorage/nodecw_group/jijh/yuanspace_data/artifacts")
NODES_PATH = ARTIFACTS_DIR / "nodes.parquet"

# === 模拟分布式设置 ===
WORLD_SIZE = 3
N_LOCAL = 16      # 每个 rank 的本地 batch 大小（用于模拟），可改小一点以便可视化
RANKS_TO_TEST = [0, 1, 2]  # 要测试的 rank 集合

nodes = pd.read_parquet(NODES_PATH, columns=["tile_id"])
tile_ids = nodes["tile_id"].to_numpy()
assert len(tile_ids) >= WORLD_SIZE * N_LOCAL, "数据量不足以构造模拟批次，请减小 N_LOCAL 或 WORLD_SIZE"

# 简单的“轮转分配”方式模拟 DistributedSampler（真实实现有 shuffle，但索引公式一致）
# 我们取前 WORLD_SIZE * N_LOCAL 个样本，切成 WORLD_SIZE 片，每片 N_LOCAL 大小
local_batches = []
for r in range(WORLD_SIZE):
    start = r * N_LOCAL
    end = (r + 1) * N_LOCAL
    local_img_ids = tile_ids[start:end].copy()  # 作为 image_tile_ids
    local_txt_ids = tile_ids[start:end].copy()  # 作为 text_tile_ids（对称）
    local_batches.append((local_img_ids, local_txt_ids))

# 模拟 all_gather 后的“全局列域”
global_tile_ids = np.concatenate([b[1] for b in local_batches], axis=0)  # 用 text 端决定列域
txt_id_to_idx = {int(tid): idx for idx, tid in enumerate(global_tile_ids)}

# 对每个 rank 做断言：txt_id_to_idx[image_tile_ids[i]] == rank*N_LOCAL + i
bad = []
for rank in RANKS_TO_TEST:
    image_tile_ids, _ = local_batches[rank]
    for i in range(N_LOCAL):
        tid = int(image_tile_ids[i])
        col = txt_id_to_idx.get(tid, None)
        expect = rank * N_LOCAL + i
        if col != expect:
            bad.append((rank, i, tid, col, expect))

if bad:
    print(f"[Check2][FAIL] Found {len(bad)} mismatches. Show first 10:")
    for r,i,tid,col,exp in bad[:10]:
        print(f"  rank={r}, i={i}, tile_id={tid}, mapped_col={col}, expected={exp}")
else:
    print("[Check2] ✅ Mapping assertion passed for all tested ranks.")


[Check2] ✅ Mapping assertion passed for all tested ranks.


In [6]:
# --- Check 3: Print pos_col and neighbor_cols (simulated batch domain) ---

from pathlib import Path
import numpy as np
import pandas as pd

ARTIFACTS_DIR = Path("/cwStorage/nodecw_group/jijh/yuanspace_data/artifacts")
NODES_PATH = ARTIFACTS_DIR / "nodes.parquet"
EDGES_PATH = ARTIFACTS_DIR / "edges.parquet"

WORLD_SIZE = 3
N_LOCAL = 16      # 与检查 2 相同，保持一致
RANK = 0          # 选择一个 rank 来展示
ANCHOR_INDEX_IN_LOCAL = 3  # 选择该 rank 的第几个样本作为 anchor

# 1) 读取数据
nodes = pd.read_parquet(NODES_PATH, columns=["tile_id"])
edges = pd.read_parquet(EDGES_PATH, columns=["src_tile_id", "nbr_tile_id", "distance", "alpha"])
tile_ids = nodes["tile_id"].to_numpy()
assert len(tile_ids) >= WORLD_SIZE * N_LOCAL

# 2) 构造模拟批次与列域（与检查 2 相同）
local_batches = []
for r in range(WORLD_SIZE):
    start = r * N_LOCAL
    end = (r + 1) * N_LOCAL
    local_img_ids = tile_ids[start:end].copy()
    local_txt_ids = tile_ids[start:end].copy()
    local_batches.append((local_img_ids, local_txt_ids))

global_tile_ids = np.concatenate([b[1] for b in local_batches], axis=0)  # 列域由 text 端确定
txt_id_to_idx = {int(tid): idx for idx, tid in enumerate(global_tile_ids)}

# 3) 选择 anchor（取 image 端的第 ANCHOR_INDEX_IN_LOCAL 个）
image_tile_ids, text_tile_ids = local_batches[RANK]
anchor_tile_id = int(image_tile_ids[ANCHOR_INDEX_IN_LOCAL])

# 4) 取该 anchor 的邻居（按距离从近到远）
nbr_df = edges[edges["src_tile_id"] == anchor_tile_id].sort_values("distance").copy()
# 只保留“本批次列域里存在”的邻居
in_batch_mask = nbr_df["nbr_tile_id"].isin(global_tile_ids)
nbr_in = nbr_df[in_batch_mask]

pos_col = txt_id_to_idx.get(anchor_tile_id, None)
neighbor_cols = [txt_id_to_idx.get(int(t), None) for t in nbr_in["nbr_tile_id"].tolist()]

print(f"[Check3] Anchor tile_id={anchor_tile_id}")
print(f"  pos_col (should be rank*N_LOCAL + local_index): {pos_col}")
print(f"  neighbors in-batch = {len(nbr_in)} / total neighbors = {len(nbr_df)}")

# 打印前若干邻居
MAX_SHOW = 12
for idx, row in nbr_in.head(MAX_SHOW).iterrows():
    t = int(row["nbr_tile_id"])
    col = txt_id_to_idx.get(t, None)
    dist = float(row["distance"])
    alpha = float(row["alpha"])
    print(f"    nbr_tile_id={t:>10d} | col={col:>5} | dist={dist:8.3f} | alpha={alpha:6.3f}")

# 5) 可选：计算“主正样本+邻居（仅批内）”的归一化行和，辅助人工核验
main_w = 1.0
alpha_vec = nbr_in["alpha"].to_numpy(dtype=float)
row_sum = (main_w + alpha_vec.sum())
print(f"  (Optional) main+in-batch-neighbors sum before normalization = {row_sum:.6f} "
      f"(应 < 1+Σ所有邻居；仅用于辅助理解)")


[Check3] Anchor tile_id=3
  pos_col (should be rank*N_LOCAL + local_index): 3
  neighbors in-batch = 0 / total neighbors = 6
  (Optional) main+in-batch-neighbors sum before normalization = 1.000000 (应 < 1+Σ所有邻居；仅用于辅助理解)


# 测试改正

In [7]:
# ==== 在 Notebook 里运行：为你的 Dataset 增补索引加速结构 ====
import numpy as np
from collections import defaultdict

def dataset_build_fast_indices(dataset, k_neighbors: int = 6):
    """
    为 SpatiallyAwareDataset 构建快速索引与向量化邻居矩阵。
    需要 dataset 拥有:
      - dataset.tile_ids: np.ndarray shape [N]
      - dataset.sample_ids: np.ndarray shape [N] 或等价的列表
      - dataset.edges_map: dict[int, list[int]]  # src_tile_id -> nbr_tile_id(长度<=k)
    生成:
      - dataset.id2idx: dict[tile_id] -> dataset index
      - dataset.sample_to_indices: dict[sample_id] -> np.ndarray of indices
      - dataset.nbr_index: np.ndarray [N, k_neighbors]，元素为索引，缺失填 -1
    """
    assert hasattr(dataset, "tile_ids") and hasattr(dataset, "sample_ids"), "Dataset 缺少 tile_ids 或 sample_ids"
    assert hasattr(dataset, "edges_map"), "Dataset 需要有 edges_map (src_tile_id -> [nbr_tile_id...])"
    tile_ids = np.asarray(dataset.tile_ids)
    sample_ids = np.asarray(dataset.sample_ids)

    N = tile_ids.shape[0]
    id2idx = {int(t): int(i) for i, t in enumerate(tile_ids)}
    dataset.id2idx = id2idx

    # 每个样本的索引列表（向量化）
    sample_to_indices = defaultdict(list)
    for i, sid in enumerate(sample_ids):
        sample_to_indices[sid].append(i)
    dataset.sample_to_indices = {sid: np.asarray(idxs, dtype=np.int64) for sid, idxs in sample_to_indices.items()}

    # 邻居“索引矩阵” [N, K]，向量化映射 tile_id->index
    K = int(k_neighbors)
    nbr_index = np.full((N, K), -1, dtype=np.int64)
    for i in range(N):
        src_tid = int(tile_ids[i])
        nbr_tids = dataset.edges_map.get(src_tid, [])
        # 只取前K个
        if len(nbr_tids) > K:
            nbr_tids = nbr_tids[:K]
        # 映射为索引，映射不到的置 -1
        mapped = [id2idx.get(int(t), -1) for t in nbr_tids]
        if mapped:
            nbr_index[i, :len(mapped)] = np.asarray(mapped, dtype=np.int64)
    dataset.nbr_index = nbr_index
    print(f"[build_fast_indices] N={N}, K={K}; sample buckets={len(dataset.sample_to_indices)}; done.")


In [8]:
# ==== 在 Notebook 里运行：邻域感知批采样器 ====
import math
import random
import torch
from torch.utils.data import Sampler

class SpatialBucketBatchSampler(Sampler):
    """
    邻域感知 BatchSampler：
    - 每个 batch 由若干“中心锚点” + 这些锚点的邻居（也作为锚点）组成；
    - 只产出索引列表（不改 DataLoader 的 __getitem__ 行为）→ IO 不增加；
    - DDP 下按 sample_id 分桶给 rank（hash 分配），各 rank 扫描不同样本集合；
    - 保证每个 rank 的 batch 数等长（循环补齐或丢弃多余）。

    依赖 dataset 成员：
      dataset.sample_to_indices: dict[sample_id] -> np.ndarray of indices
      dataset.nbr_index: np.ndarray[N, K] 邻居索引矩阵（-1 表示无效）
      dataset.sample_ids: np.ndarray[N]
    """
    def __init__(
        self,
        dataset,
        batch_size: int,
        world_size: int = 1,
        rank: int = 0,
        centers_per_batch: int = 16,
        max_neighbors_per_center: int = 4,
        same_sample_only: bool = True,
        drop_last: bool = True,
        seed: int = 2025
    ):
        self.dataset = dataset
        self.batch_size = int(batch_size)
        self.world_size = int(world_size)
        self.rank = int(rank)
        self.centers_per_batch = int(centers_per_batch)
        self.max_neighbors_per_center = int(max_neighbors_per_center)
        self.same_sample_only = bool(same_sample_only)
        self.drop_last = bool(drop_last)
        self.seed = int(seed)

        assert hasattr(dataset, "sample_to_indices") and hasattr(dataset, "nbr_index"), \
            "请先调用 dataset_build_fast_indices(dataset) 生成 sample_to_indices / nbr_index"
        self.N = len(dataset.sample_ids)
        self.sample_ids = np.asarray(dataset.sample_ids)

        # 1) 将样本桶按哈希分配给各 rank，避免 DDP 重叠
        all_samples = list(dataset.sample_to_indices.keys())
        all_samples.sort(key=lambda x: str(x))  # 稳定顺序
        self.assigned_samples = [s for s in all_samples if (hash(str(s)) % self.world_size) == self.rank]
        assert len(self.assigned_samples) > 0, "本 rank 被分配的样本桶为空，请检查数据或 world_size 设置。"

        # 2) 计算每个 epoch 的批次数，保证各 rank 步数一致
        #    用全局 N 估算：每个 rank 生成 floor(N / (B * world_size)) 个 batch
        self.batches_per_epoch = self._compute_batches_per_epoch()

        # 3) 为每个样本桶准备一个“游标指针”与乱序索引
        self._rng = random.Random(self.seed)
        self._epoch = 0
        self._reset_per_sample_state()

    def _compute_batches_per_epoch(self):
        # 按总样本量与 batch_size/world_size 估算一个“全局统一”的步数
        est = self.N // (self.batch_size * self.world_size)
        return max(1, int(est))

    def _reset_per_sample_state(self):
        self._per_sample_order = {}
        self._per_sample_ptr = {}
        for s in self.assigned_samples:
            idxs = self.dataset.sample_to_indices[s]
            # 打乱
            order = idxs.copy()
            self._rng.shuffle(order.tolist())  # 原地洗牌（list 更快）
            self._per_sample_order[s] = order
            self._per_sample_ptr[s] = 0

        # 给样本桶一个稳定但洗牌后的遍历顺序
        self._sample_cycle = self.assigned_samples.copy()
        self._rng.shuffle(self._sample_cycle)

    def set_epoch(self, epoch: int):
        self._epoch = int(epoch)
        # 以 epoch 为种子扰动，确保每个 epoch 的顺序不同但各 rank 一致可复现
        self._rng.seed(self.seed + self._epoch)
        self._reset_per_sample_state()

    def __len__(self):
        return self.batches_per_epoch

    def __iter__(self):
        rng = self._rng
        ds = self.dataset
        nbr_index = ds.nbr_index
        sample_ids = self.sample_ids

        # round-robin 扫描样本桶，产生固定数量的 batch
        num_yield = 0
        sample_cursor = 0
        while num_yield < self.batches_per_epoch:
            s = self._sample_cycle[sample_cursor]
            sample_cursor = (sample_cursor + 1) % len(self._sample_cycle)

            order = self._per_sample_order[s]
            ptr = self._per_sample_ptr[s]
            if ptr >= len(order):
                # 该桶耗尽，重置该桶（循环使用），以避免不同 rank 步数不一致
                self._rng.shuffle(order.tolist())
                self._per_sample_ptr[s] = 0
                ptr = 0

            batch = []
            used = set()

            # 1) 选中心锚点
            centers = []
            centers_take = min(self.centers_per_batch, len(order) - ptr)
            if centers_take <= 0:
                continue
            centers = order[ptr:ptr + centers_take]
            self._per_sample_ptr[s] += centers_take

            # 2) 将中心加入 batch
            for c in centers:
                if len(batch) >= self.batch_size:
                    break
                if c in used:
                    continue
                batch.append(int(c))
                used.add(int(c))

                # 3) 拉取该中心的邻居索引，限制同样本、去重，最多取 max_neighbors_per_center
                nbrs = nbr_index[int(c)]
                # 过滤无效
                nbrs = nbrs[nbrs >= 0]
                if self.same_sample_only:
                    nbrs = nbrs[sample_ids[nbrs] == s]
                # 去重自己/已选
                if len(nbrs) > 0:
                    # 打乱邻居次序，避免总是取前 K 个
                    nbrs = nbrs.copy()
                    rng.shuffle(nbrs.tolist())
                    for nb in nbrs:
                        if len(batch) >= self.batch_size:
                            break
                        nb = int(nb)
                        if nb in used:
                            continue
                        batch.append(nb)
                        used.add(nb)
                        if len(batch) >= self.batch_size or \
                           (len(batch) - len(centers)) >= self.max_neighbors_per_center * len(centers):
                            break  # 控制邻居预算

            # 4) 若 batch 未满，继续从同样本补齐随机样本（不保证是邻居）
            if len(batch) < self.batch_size:
                remain = self.batch_size - len(batch)
                # 从该桶剩余索引中补
                ptr2 = self._per_sample_ptr[s]
                if ptr2 + remain <= len(order):
                    fill = order[ptr2:ptr2 + remain]
                    self._per_sample_ptr[s] += remain
                    for f in fill:
                        fi = int(f)
                        if fi in used:
                            continue
                        batch.append(fi)
                        used.add(fi)
                else:
                    # 不够就循环补
                    take = 0
                    while len(batch) < self.batch_size:
                        if self._per_sample_ptr[s] >= len(order):
                            self._rng.shuffle(order.tolist())
                            self._per_sample_ptr[s] = 0
                        fi = int(order[self._per_sample_ptr[s]])
                        self._per_sample_ptr[s] += 1
                        if fi in used:
                            continue
                        batch.append(fi)
                        used.add(fi)

            # 5) 产出
            if self.drop_last and len(batch) < self.batch_size:
                continue
            num_yield += 1
            yield batch


In [15]:
# ==== 在 Notebook 里运行示例（或粘贴到你的 train 脚本里） ====
from torch.utils.data import DataLoader
import torch.distributed as dist
from open_clip_train.spatial_data import SpatiallyAwareDataset

# 1) 准备 dataset（你已有）
dataset = SpatiallyAwareDataset(artifacts_dir="/cwStorage/nodecw_group/jijh/yuanspace_data/artifacts", k_neighbors=6)
# 假定 dataset 已经有 tile_ids / sample_ids / edges_map
dataset_build_fast_indices(dataset, k_neighbors=6)

# 2) 构造 batch_sampler（DDP 环境下）
if dist.is_available() and dist.is_initialized():
    world_size = dist.get_world_size()
    rank = dist.get_rank()
else:
    world_size = 1
    rank = 0

batch_sampler = SpatialBucketBatchSampler(
    dataset=dataset,
    batch_size=1600,
    world_size=world_size,
    rank=rank,
    centers_per_batch=16,           # 可调：中心锚点个数
    max_neighbors_per_center=4,     # 可调：每个中心带的邻居上限
    same_sample_only=True,
    drop_last=True,
    seed=2025,
)

# 3) DataLoader（注意：不要再传 batch_size / shuffle / sampler）
loader = DataLoader(
    dataset,
    batch_sampler=batch_sampler,
    num_workers=16,
    pin_memory=True,
    persistent_workers=True,
    prefetch_factor=4,
    collate_fn=getattr(dataset, "collate_fn", None)  # 若你有自定义 collate_fn 就传入
)

# 4) 训练循环中，每个 epoch 开始前设置 epoch（确保可复现洗牌）
for epoch in range(num_epochs):
    if hasattr(batch_sampler, "set_epoch"):
        batch_sampler.set_epoch(epoch)
    # for step, batch in enumerate(loader):
    #     ... 你的训练逻辑 ...


TypeError: unsupported operand type(s) for /: 'str' and 'str'