In [1]:
import jittor as jt
from sde_lib import VPSDE, subVPSDE, VESDE  # 从你的模块导入
jt.flags.use_cuda = 1
import os
os.environ['CUDA_VISIBLE_DEVICES']='1'

# ------------------------- 通用测试工具 -------------------------
def test_sde_core(sde_class, N=1000):
    """测试 SDE 核心功能：初始化 + sde + marginal_prob + 先验采样/对数概率"""
    # 1. 初始化 SDE
    sde = sde_class(N=N)
    print(f"=== 测试 {sde_class.__name__} ===")

    # 2. 构造测试数据（简化维度：batch=2, 通道=1, 尺寸=4x4）
    batch_size = 2
    x = jt.randn(batch_size, 1, 4, 4)  # 随机输入
    t = jt.rand(batch_size)             # 随机时间步（0~1）

    # 3. 测试 sde()：漂移项(drift)和扩散项(diffusion)
    drift, diffusion = sde.sde(x, t)
    assert drift.shape == x.shape, "sde() 漂移项形状错误"
    assert diffusion.shape == (batch_size,), "sde() 扩散项形状错误"
    print("  sde() 测试通过 ✔️")

    # 4. 测试 marginal_prob()：均值(mean)和标准差(std)
    mean, std = sde.marginal_prob(x, t)
    assert mean.shape == x.shape, "marginal_prob() 均值形状错误"
    assert std.shape == (batch_size,), "marginal_prob() 标准差形状错误"
    print("  marginal_prob() 测试通过 ✔️")

    # 5. 测试先验采样 + 对数概率
    sample_shape = (batch_size, 1, 4, 4)
    sample = sde.prior_sampling(sample_shape)
    assert sample.shape == sample_shape, "prior_sampling() 形状错误"
    
    logp = sde.prior_logp(sample)
    assert logp.shape == (batch_size,), "prior_logp() 形状错误"
    print("  先验采样/对数概率 测试通过 ✔️")

    # 6. 测试离散化(discretize)
    f, G = sde.discretize(x, t)
    assert f.shape == x.shape, "discretize() 漂移项离散化错误"
    assert G.shape == (batch_size,), "discretize() 扩散项离散化错误"
    print("  discretize() 测试通过 ✔️\n")


# ------------------------- 执行测试 -------------------------
if __name__ == "__main__":
    # 测试所有 SDE 类
    test_sde_core(VPSDE)
    test_sde_core(subVPSDE)
    test_sde_core(VESDE, N=1000)  # VESDE 参数兼容

[38;5;2m[i 0721 16:24:37.768642 44 compiler.py:956] Jittor(1.3.9.14) src: /home/a516/anaconda3/envs/Jittor/lib/python3.8/site-packages/jittor[m
[38;5;2m[i 0721 16:24:37.774011 44 compiler.py:957] g++ at /usr/bin/g++(12.3.0)[m
[38;5;2m[i 0721 16:24:37.774549 44 compiler.py:958] cache_path: /home/a516/.cache/jittor/jt1.3.9/g++12.3.0/py3.8.20/Linux-6.6.87.2xef/13thGenIntelRCx37/4832/default[m
[38;5;2m[i 0721 16:24:37.778372 44 __init__.py:412] Found nvcc(12.8.61) at /usr/local/cuda-12.8/bin/nvcc.[m
[38;5;2m[i 0721 16:24:37.855370 44 __init__.py:412] Found addr2line(2.42) at /usr/bin/addr2line.[m
[38;5;2m[i 0721 16:24:37.916179 44 compiler.py:1013] cuda key:cu12.8.61[m
[38;5;2m[i 0721 16:24:38.215488 44 __init__.py:227] Total mem: 7.56GB, using 2 procs for compiling.[m
[38;5;2m[i 0721 16:24:38.277619 44 jit_compiler.cc:28] Load cc_path: /usr/bin/g++[m
[38;5;2m[i 0721 16:24:39.007779 44 init.cc:63] Found cuda archs: [89,][m
[38;5;3m[w 0721 16:24:39.198686 44 compile_exter

=== 测试 VPSDE ===
  sde() 测试通过 ✔️
  marginal_prob() 测试通过 ✔️
  先验采样/对数概率 测试通过 ✔️
  discretize() 测试通过 ✔️

=== 测试 subVPSDE ===
  sde() 测试通过 ✔️
  marginal_prob() 测试通过 ✔️
  先验采样/对数概率 测试通过 ✔️
  discretize() 测试通过 ✔️

=== 测试 VESDE ===
  sde() 测试通过 ✔️
  marginal_prob() 测试通过 ✔️
  先验采样/对数概率 测试通过 ✔️
  discretize() 测试通过 ✔️



In [3]:
x=jt.Var([1,2])
x.detach().cpu().numpy().reshape((-1,))

array([1, 2], dtype=int32)

In [10]:
import unittest
import sys
import os
# 添加项目根目录到 Python 搜索路径（确保能找到 models 文件夹）
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

import jittor as jt
import numpy as np
# 从 models.utils 导入需要测试的函数/类
from models.utils import (
    register_model, get_model, create_model,
    to_flattened_numpy, from_flattened_numpy,
    get_model_fn, get_score_fn
)
import sde_lib  # 假设 sde_lib 与 models 同级或已在搜索路径中


# 初始化Jittor
jt.init()


# 定义测试用的模型和SDE（与之前相同，用于模拟测试环境）
@register_model(name="test_model")
class TestModel(jt.nn.Module):
    """用于测试的简单模型"""
    def __init__(self, config):
        super().__init__()
        self.config = config

    def execute(self, x, labels):
        """简单返回输入x的副本（模拟模型输出）"""
        return x.clone()


class MockVPSDE(sde_lib.VPSDE):
    """模拟VPSDE用于测试"""
    def __init__(self):
        self.N = 10  # 时间步数量
        self.sqrt_1m_alphas_cumprod = jt.linspace(0.1, 1.0, self.N)  # 模拟参数

    def marginal_prob(self, x, t):
        """返回均值和标准差（模拟）"""
        return x, jt.ones_like(x) * 0.5  # 标准差固定为0.5


class MockVESDE(sde_lib.VESDE):
    """模拟VESDE用于测试"""
    def __init__(self):
        self.T = 1.0  # 总时间
        self.N = 10  # 离散时间步数量


class TestUtils(unittest.TestCase):
    """测试修改后的工具函数"""

    def test_register_and_get_model(self):
        """测试模型注册与获取"""
        # 检查是否能正确获取已注册的模型
        model_cls = get_model("test_model")
        self.assertEqual(model_cls.__name__, "TestModel")

        # 测试未注册模型的错误
        with self.assertRaises(KeyError):
            get_model("unregistered_model")

    def test_from_flattened_numpy(self):
        """测试从展平的numpy数组创建Jittor张量"""
        # 生成原始数据
        original = np.random.rand(2, 3, 4)  # 形状(2,3,4)
        flattened = original.flatten()  # 展平为(24,)

        # 转换为Jittor张量
        recovered = from_flattened_numpy(flattened, shape=(2, 3, 4))

        # 验证形状和数据
        self.assertEqual(recovered.shape, (2, 3, 4))
        np.testing.assert_allclose(recovered.numpy(), original, rtol=1e-6)

    def test_to_flattened_numpy(self):
        """测试将Jittor张量展平为numpy数组"""
        # 创建Jittor张量
        x = jt.array(np.random.rand(2, 3, 4))

        # 展平为numpy
        flattened = to_flattened_numpy(x)

        # 验证形状和数据
        self.assertEqual(flattened.shape, (24,))  # 2*3*4=24
        np.testing.assert_allclose(flattened, x.numpy().flatten(), rtol=1e-6)

    def test_create_model(self):
        """测试模型创建函数"""
        # 模拟配置
        class MockConfig:
            class model:
                name = "test_model"
        config = MockConfig()

        # 创建模型
        model = create_model(config)

        # 验证模型类型
        self.assertIsInstance(model, TestModel)

    def test_get_model_fn(self):
        """测试模型函数包装（训练/评估模式）"""
        # 创建模型和配置
        model = TestModel(config=None)
        train_fn = get_model_fn(model, train=True)
        eval_fn = get_model_fn(model, train=False)

        # 测试输入输出（模型简单返回x，这里验证调用是否正常）
        x = jt.array(np.random.rand(2, 3))
        labels = jt.array([1, 2])

        train_out = train_fn(x, labels)
        eval_out = eval_fn(x, labels)

        # 验证输出与输入一致（因为TestModel返回x）
        self.assertTrue(jt.allclose(train_out, x))
        self.assertTrue(jt.allclose(eval_out, x))

    def test_get_score_fn_vpsde(self):
        """测试VPSDE的分数函数包装"""
        # 模拟SDE和模型
        sde = MockVPSDE()
        model = TestModel(config=None)  # 模型返回x（这里用于测试计算逻辑）
        score_fn = get_score_fn(sde, model, continuous=False)

        # 测试输入
        x = jt.array(np.random.rand(2, 3, 3, 1))  # 假设是图像数据(批量, H, W, C)
        t = jt.array([0.1, 0.5])  # 时间步

        # 计算分数
        score = score_fn(x, t)

        # 验证逻辑：VPSDE的分数应为 -model_out / std
        # 模型返回x，因此score应约为 -x / std（std来自sde.sqrt_1m_alphas_cumprod）
        labels = t * (sde.N - 1)  # 离散时间标签
        std = sde.sqrt_1m_alphas_cumprod[labels.long()]
        expected_score = -x / std[:, None, None, None]  # 广播std到x的形状

        self.assertTrue(jt.allclose(score, expected_score, rtol=1e-6))

    def test_get_score_fn_vesde(self):
        """测试VESDE的分数函数包装"""
        # 模拟SDE和模型
        sde = MockVESDE()
        model = TestModel(config=None)  # 模型返回x
        score_fn = get_score_fn(sde, model, continuous=False)

        # 测试输入
        x = jt.array(np.random.rand(2, 3, 3, 1))
        t = jt.array([0.2, 0.8])  # 时间步

        # 计算分数
        score = score_fn(x, t)

        # 验证逻辑：VESDE的分数直接返回model_out（模型返回x，因此score应等于x）
        self.assertTrue(jt.allclose(score, x, rtol=1e-6))


if __name__ == "__main__":
    unittest.main()

NameError: name '__file__' is not defined

In [2]:
"""Abstract SDE classes, Reverse SDE, and VE/VP SDEs."""
import abc
import jittor as jt
import numpy as np


class SDE(abc.ABC):
  """SDE abstract class. Functions are designed for a mini-batch of inputs."""

  def __init__(self, N):
    """Construct an SDE.

    Args:
      N: number of discretization time steps.
    """
    super().__init__()
    self.N = N

  @property
  @abc.abstractmethod
  def T(self):
    """End time of the SDE."""
    pass

  @abc.abstractmethod
  def sde(self, x, t):
    pass

  @abc.abstractmethod
  def marginal_prob(self, x, t):
    """Parameters to determine the marginal distribution of the SDE, $p_t(x)$."""
    pass

  @abc.abstractmethod
  def prior_sampling(self, shape):
    """Generate one sample from the prior distribution, $p_T(x)$."""
    pass

  @abc.abstractmethod
  def prior_logp(self, z):
    """Compute log-density of the prior distribution.

    Useful for computing the log-likelihood via probability flow ODE.

    Args:
      z: latent code
    Returns:
      log probability density
    """
    pass

  def discretize(self, x, t):
    """Discretize the SDE in the form: x_{i+1} = x_i + f_i(x_i) + G_i z_i.

    Useful for reverse diffusion sampling and probabiliy flow sampling.
    Defaults to Euler-Maruyama discretization.

    Args:
      x: a torch tensor
      t: a torch float representing the time step (from 0 to `self.T`)

    Returns:
      f, G
    """
    dt = 1 / self.N
    drift, diffusion = self.sde(x, t)
    f = drift * dt
    G = diffusion * jt.sqrt(jt.array(dt))
    return f, G

  def reverse(self, score_fn, probability_flow=False):
    """Create the reverse-time SDE/ODE.

    Args:
      score_fn: A time-dependent score-based model that takes x and t and returns the score.
      probability_flow: If `True`, create the reverse-time ODE used for probability flow sampling.
    """
    N = self.N
    T = self.T
    sde_fn = self.sde
    discretize_fn = self.discretize

    # Build the class for reverse-time SDE.
    class RSDE(self.__class__):
      def __init__(self):
        self.N = N
        self.probability_flow = probability_flow

      @property
      def T(self):
        return T

      def sde(self, x, t):
        """Create the drift and diffusion functions for the reverse SDE/ODE."""
        drift, diffusion = sde_fn(x, t)
        score = score_fn(x, t)
        drift = drift - diffusion[:, None, None, None] ** 2 * score * (0.5 if self.probability_flow else 1.)
        # Set the diffusion function to zero for ODEs.
        diffusion = 0. if self.probability_flow else diffusion
        return drift, diffusion

      def discretize(self, x, t):
        """Create discretized iteration rules for the reverse diffusion sampler."""
        f, G = discretize_fn(x, t)
        rev_f = f - G[:, None, None, None] ** 2 * score_fn(x, t) * (0.5 if self.probability_flow else 1.)
        rev_G = jt.zeros_like(G) if self.probability_flow else G
        return rev_f, rev_G

    return RSDE()


class VPSDE(SDE):
  def __init__(self, beta_min=0.1, beta_max=20, N=1000):
    """Construct a Variance Preserving SDE.

    Args:
      beta_min: value of beta(0)
      beta_max: value of beta(1)
      N: number of discretization steps
    """
    super().__init__(N)
    self.beta_0 = beta_min
    self.beta_1 = beta_max
    self.N = N
    self.discrete_betas = jt.linspace(beta_min / N, beta_max / N, N)
    self.alphas = 1. - self.discrete_betas
    self.alphas_cumprod = jt.cumprod(self.alphas, dim=0)
    self.sqrt_alphas_cumprod = jt.sqrt(self.alphas_cumprod)
    self.sqrt_1m_alphas_cumprod = jt.sqrt(1. - self.alphas_cumprod)

  @property
  def T(self):
    return 1

  def sde(self, x, t):
    beta_t = self.beta_0 + t * (self.beta_1 - self.beta_0)
    drift = -0.5 * beta_t[:, None, None, None] * x
    diffusion = jt.sqrt(beta_t)
    return drift, diffusion

  def marginal_prob(self, x, t):
    log_mean_coeff = -0.25 * t ** 2 * (self.beta_1 - self.beta_0) - 0.5 * t * self.beta_0
    mean = jt.exp(log_mean_coeff[:, None, None, None]) * x
    std = jt.sqrt(1. - jt.exp(2. * log_mean_coeff))
    return mean, std

  def prior_sampling(self, shape):
    return jt.randn(*shape)

  def prior_logp(self, z):
    shape = z.shape
    N = np.prod(shape[1:])
    logps = -N / 2. * np.log(2 * np.pi) - jt.sum(z ** 2, (1, 2, 3)) / 2.
    return logps

  def discretize(self, x, t):
    """DDPM discretization."""
    timestep = (t * (self.N - 1) / self.T).long()
    # Fxxk u
    discrete_betas = self.discrete_betas
    alphas = self.alphas
    
    beta = discrete_betas[timestep]
    alpha = alphas[timestep]
    
    sqrt_beta = jt.sqrt(beta)
    f = jt.sqrt(alpha)[:, None, None, None] * x - x
    G = sqrt_beta
    return f, G


class subVPSDE(SDE):
  def __init__(self, beta_min=0.1, beta_max=20, N=1000):
    """Construct the sub-VP SDE that excels at likelihoods.

    Args:
      beta_min: value of beta(0)
      beta_max: value of beta(1)
      N: number of discretization steps
    """
    super().__init__(N)
    self.beta_0 = beta_min
    self.beta_1 = beta_max
    self.N = N

  @property
  def T(self):
    return 1

  def sde(self, x, t):
    beta_t = self.beta_0 + t * (self.beta_1 - self.beta_0)
    drift = -0.5 * beta_t[:, None, None, None] * x
    discount = 1. - jt.exp(-2 * self.beta_0 * t - (self.beta_1 - self.beta_0) * t ** 2)
    diffusion = jt.sqrt(beta_t * discount)
    return drift, diffusion

  def marginal_prob(self, x, t):
    log_mean_coeff = -0.25 * t ** 2 * (self.beta_1 - self.beta_0) - 0.5 * t * self.beta_0
    mean = jt.exp(log_mean_coeff)[:, None, None, None] * x
    std = 1 - jt.exp(2. * log_mean_coeff)
    return mean, std

  def prior_sampling(self, shape):
    return jt.randn(*shape)

  def prior_logp(self, z):
    shape = z.shape
    N = np.prod(shape[1:])
    return -N / 2. * np.log(2 * np.pi) - jt.sum(z ** 2, (1, 2, 3)) / 2.


class VESDE(SDE):
  def __init__(self, sigma_min=0.01, sigma_max=50, N=1000):
    """Construct a Variance Exploding SDE.

    Args:
      sigma_min: smallest sigma.
      sigma_max: largest sigma.
      N: number of discretization steps
    """
    super().__init__(N)
    self.sigma_min = sigma_min
    self.sigma_max = sigma_max
    self.discrete_sigmas = jt.exp(jt.linspace(np.log(self.sigma_min), np.log(self.sigma_max), N))
    self.N = N

  @property
  def T(self):
    return 1

  def sde(self, x, t):
    sigma = self.sigma_min * (self.sigma_max / self.sigma_min) ** t
    drift = jt.zeros_like(x)
    diffusion = sigma * jt.sqrt(jt.Var(2 * (np.log(self.sigma_max) - np.log(self.sigma_min))))
    return drift, diffusion

  def marginal_prob(self, x, t):
    std = self.sigma_min * (self.sigma_max / self.sigma_min) ** t
    mean = x
    return mean, std

  def prior_sampling(self, shape):
    return jt.randn(*shape) * self.sigma_max

  def prior_logp(self, z):
    shape = z.shape
    N = np.prod(shape[1:])
    return -N / 2. * np.log(2 * np.pi * self.sigma_max ** 2) - jt.sum(z ** 2, (1, 2, 3)) / (2 * self.sigma_max ** 2)

  def discretize(self, x, t):
    """SMLD(NCSN) discretization."""
    timestep = (t * (self.N - 1) / self.T).int64()
    sigma = self.discrete_sigmas[timestep]
    adjacent_sigma = jt.where(timestep == 0, jt.zeros_like(t),
                                 self.discrete_sigmas[timestep - 1])
    f = jt.zeros_like(x)
    G = jt.sqrt(sigma ** 2 - adjacent_sigma ** 2)
    return f, G

# 确保Jittor使用CPU（如需GPU可改为jt.flags.use_cuda = 1）
jt.flags.use_cuda = 0

def test_sde_base_methods(sde, batch_size=2, img_size=32, channels=3):
    """测试SDE的基础方法"""
    # 创建测试输入
    x = jt.randn(batch_size, channels, img_size, img_size)  # 随机图像数据
    t = jt.rand(batch_size)  # 随机时间步 [0,1)
    
    # 测试sde方法
    drift, diffusion = sde.sde(x, t)
    assert drift.shape == x.shape, f"Drift形状错误: {drift.shape} vs {x.shape}"
    assert diffusion.shape == (batch_size,), f"Diffusion形状错误: {diffusion.shape} vs ({batch_size},)"
    
    # 测试marginal_prob方法
    mean, std = sde.marginal_prob(x, t)
    assert mean.shape == x.shape, f"Mean形状错误: {mean.shape} vs {x.shape}"
    assert std.shape == (batch_size,), f"Std形状错误: {std.shape} vs ({batch_size},)"
    
    # 测试prior_sampling方法
    sample_shape = (batch_size, channels, img_size, img_size)
    prior_sample = sde.prior_sampling(sample_shape)
    assert prior_sample.shape == sample_shape, f"先验采样形状错误: {prior_sample.shape} vs {sample_shape}"
    
    # 测试prior_logp方法
    logp = sde.prior_logp(prior_sample)
    assert logp.shape == (batch_size,), f"先验logp形状错误: {logp.shape} vs ({batch_size},)"
    
    # 测试discretize方法
    f, G = sde.discretize(x, t)
    assert f.shape == x.shape, f"Discretize f形状错误: {f.shape} vs {x.shape}"
    assert G.shape == (batch_size,), f"Discretize G形状错误: {G.shape} vs ({batch_size},)"
    
    print(f"{sde.__class__.__name__}基础方法测试通过")

def test_reverse_sde(sde, batch_size=2, img_size=32, channels=3):
    """测试反向SDE"""
    # 定义简单的score函数（仅用于测试）
    def score_fn(x, t):
        return jt.zeros_like(x)  # 零得分函数
    
    # 创建反向SDE
    rsde = sde.reverse(score_fn)
    
    # 测试反向SDE的基础属性
    assert rsde.T == sde.T, "反向SDE的终止时间错误"
    
    # 测试反向SDE的sde方法
    x = jt.randn(batch_size, channels, img_size, img_size)
    t = jt.rand(batch_size)
    drift, diffusion = rsde.sde(x, t)
    assert drift.shape == x.shape, f"反向SDE Drift形状错误: {drift.shape} vs {x.shape}"
    assert diffusion.shape == (batch_size,), f"反向SDE Diffusion形状错误: {diffusion.shape} vs ({batch_size},)"
    
    # 测试反向SDE的discretize方法
    f, G = rsde.discretize(x, t)
    assert f.shape == x.shape, f"反向SDE discretize f形状错误: {f.shape} vs {x.shape}"
    assert G.shape == (batch_size,), f"反向SDE discretize G形状错误: {G.shape} vs ({batch_size},)"
    
    print(f"{sde.__class__.__name__}反向SDE测试通过")

def test_vpsde():
    """测试VPSDE类"""
    sde = VPSDE(beta_min=0.1, beta_max=20, N=1000)
    test_sde_base_methods(sde)
    test_reverse_sde(sde)

def test_subvpsde():
    """测试subVPSDE类"""
    sde = subVPSDE(beta_min=0.1, beta_max=20, N=1000)
    test_sde_base_methods(sde)
    test_reverse_sde(sde)

def test_vesde():
    """测试VESDE类"""
    sde = VESDE(sigma_min=0.01, sigma_max=50, N=1000)
    test_sde_base_methods(sde)
    test_reverse_sde(sde)

if __name__ == "__main__":
    # 运行所有测试
    test_vpsde()
    test_subvpsde()
    test_vesde()
    print("所有SDE测试均通过！")

VPSDE基础方法测试通过
VPSDE反向SDE测试通过
subVPSDE基础方法测试通过
subVPSDE反向SDE测试通过
VESDE基础方法测试通过
VESDE反向SDE测试通过
所有SDE测试均通过！


In [15]:
import jittor as jt
import numpy as np
import traceback
from sampling import (
    get_predictor, get_corrector, get_pc_sampler, get_ode_sampler,
    EulerMaruyamaPredictor, ReverseDiffusionPredictor, AncestralSamplingPredictor,
    LangevinCorrector, NonePredictor, NoneCorrector
)
from sde_lib import VPSDE, VESDE, subVPSDE


def setup_test_env():
    """初始化测试环境：固定种子、打印设备信息"""
    jt.set_seed(42)
    device_mode = "GPU" if jt.flags.use_cuda else "CPU"
    print(f"【测试环境】设备：{device_mode} | Jittor版本：{jt.__version__} | 随机种子：42\n")
    return device_mode


def simple_score_fn(x, t):
    """简化分数函数（返回零张量，避免依赖复杂模型）"""
    return jt.zeros_like(x)


def print_tensor_info(tensor, name, step):
    """打印张量的关键信息（类型、形状、设备），用于定位类型错误"""
    if hasattr(tensor, "shape"):
        shape_str = tensor.shape
    else:
        shape_str = "无shape属性"
    
    if jt.flags.use_cuda and hasattr(tensor, "device"):
        device_str = tensor.device
    else:
        device_str = "CPU" if not jt.flags.use_cuda else "GPU（Jittor自动管理）"
    
    print(f"  [{step}] {name}：")
    print(f"    - 类型：{type(tensor)}")
    print(f"    - 形状：{shape_str}")
    print(f"    - 设备：{device_str}")
    print(f"    - 是否为方法：{callable(tensor)}\n")  # 关键：判断是否是方法对象


def test_step_wrapper(test_func, test_name):
    """测试步骤包装器：捕获异常并打印详细堆栈"""
    print("=" * 60)
    print(f"【测试阶段】{test_name}")
    print("=" * 60)
    try:
        test_func()
        print(f"【测试结果】{test_name} ✅ 通过\n")
    except Exception as e:
        print(f"【测试结果】{test_name} ❌ 失败")
        print(f"【错误类型】{type(e).__name__}: {str(e)}")
        print(f"【错误堆栈】\n{traceback.format_exc()}\n")
        raise  # 终止测试，优先解决底层错误

In [2]:
import jittor as jt
import numpy as np
import traceback
from scipy import integrate  # ODE采样依赖
# 导入待测试的采样模块和SDE定义
from sampling import (
    get_predictor, get_corrector, get_pc_sampler, get_ode_sampler,
    EulerMaruyamaPredictor, ReverseDiffusionPredictor, AncestralSamplingPredictor,
    LangevinCorrector, AnnealedLangevinDynamics, NonePredictor, NoneCorrector
)
from sde_lib import VPSDE, VESDE, subVPSDE


# -------------------------- 全局测试工具函数 --------------------------
def setup_test_env():
    """初始化测试环境：固定种子、打印设备信息"""
    jt.set_seed(42)
    device_mode = "GPU" if jt.flags.use_cuda else "CPU"
    print(f"【测试环境】设备：{device_mode} | Jittor版本：{jt.__version__} | 随机种子：42\n")
    return device_mode


def simple_score_fn(x, t):
    """简化分数函数（返回零张量，避免依赖复杂模型）"""
    return jt.zeros_like(x)


def print_tensor_info(tensor, name, step):
    """打印张量的关键信息（类型、形状、设备），用于定位类型错误"""
    if hasattr(tensor, "shape"):
        shape_str = tensor.shape
    else:
        shape_str = "无shape属性"
    
    if jt.flags.use_cuda and hasattr(tensor, "device"):
        device_str = tensor.device
    else:
        device_str = "CPU" if not jt.flags.use_cuda else "GPU（Jittor自动管理）"
    
    print(f"  [{step}] {name}：")
    print(f"    - 类型：{type(tensor)}")
    print(f"    - 形状：{shape_str}")
    print(f"    - 设备：{device_str}")
    print(f"    - 是否为方法：{callable(tensor)}\n")  # 关键：判断是否是方法对象


def test_step_wrapper(test_func, test_name):
    """测试步骤包装器：捕获异常并打印详细堆栈"""
    print("=" * 60)
    print(f"【测试阶段】{test_name}")
    print("=" * 60)
    try:
        test_func()
        print(f"【测试结果】{test_name} ✅ 通过\n")
    except Exception as e:
        print(f"【测试结果】{test_name} ❌ 失败")
        print(f"【错误类型】{type(e).__name__}: {str(e)}")
        print(f"【错误堆栈】\n{traceback.format_exc()}\n")
        raise  # 终止测试，优先解决底层错误


# -------------------------- 阶段1：测试SDE属性合法性 --------------------------
def test_sde_attributes():
    """测试SDE类的关键属性是否为张量（非方法）"""
    # 初始化3类SDE（覆盖所有使用场景）
    sde_configs = [
        ("VPSDE", VPSDE(beta_min=0.1, beta_max=20, N=100)),
        ("VESDE", VESDE(sigma_min=0.01, sigma_max=50, N=100)),
        ("subVPSDE", subVPSDE(beta_min=0.1, beta_max=20, N=100))
    ]
    
    # 验证每个SDE的关键属性
    for sde_name, sde in sde_configs:
        print(f"【SDE类型】{sde_name}")
        # 待验证的属性（键：属性名，值：是否必须存在）
        required_attrs = {
            "discrete_betas": sde_name in ["VPSDE", "subVPSDE"],  # VESDE无此属性
            "discrete_sigmas": sde_name == "VESDE",               # 仅VESDE有此属性
            "alphas": sde_name in ["VPSDE", "subVPSDE"]           # VESDE无此属性
        }
        
        for attr_name, is_required in required_attrs.items():
            if is_required:
                # 检查属性是否存在
                assert hasattr(sde, attr_name), f"{sde_name} 缺少属性 {attr_name}"
                attr_val = getattr(sde, attr_name)
                
                # 打印属性详细信息
                print_tensor_info(attr_val, f"{sde_name}.{attr_name}", step=f"SDE-1")
                
                # 核心断言：属性必须是Jittor张量，且不是方法
                assert isinstance(attr_val, jt.Var), f"{sde_name}.{attr_name} 不是jt.Var（当前类型：{type(attr_val)}）"
                assert not callable(attr_val), f"{sde_name}.{attr_name} 是方法对象（不可索引）"
            else:
                # 验证非必需属性不存在
                assert not hasattr(sde, attr_name), f"{sde_name} 不应有属性 {attr_name}"
        print("-" * 40)


# -------------------------- 阶段2：测试预测器 --------------------------
def test_basic_predictors():
    """测试无错误的基础预测器，确保环境正常"""
    batch_size = 2
    img_shape = (batch_size, 3, 32, 32)  # (B, C, H, W)
    t = jt.ones(batch_size) * 0.5  # 时间步（统一为0.5）
    sde = VPSDE(N=100)  # 用VPSDE测试基础预测器
    x = jt.randn(*img_shape)  # 随机输入
    
    # 待测试的基础预测器
    basic_predictors = [
        ("EulerMaruyamaPredictor", EulerMaruyamaPredictor),
        ("ReverseDiffusionPredictor", ReverseDiffusionPredictor),
        ("NonePredictor", NonePredictor)
    ]
    
    for pred_name, pred_cls in basic_predictors:
        print(f"【测试预测器】{pred_name}")
        # 1. 初始化预测器
        pred = pred_cls(sde=sde, score_fn=simple_score_fn, probability_flow=False)
        print_tensor_info(pred, f"{pred_name}实例", step="Pred-1")
        
        # 2. 调用update_fn（核心逻辑）
        print(f"  [Pred-2] 输入x类型：{type(x)} | 输入t类型：{type(t)}")
        x_out, x_mean_out = pred.update_fn(x, t)
        
        # 3. 验证输出
        assert x_out.shape == img_shape, f"输出形状错误：{x_out.shape} vs {img_shape}"
        assert isinstance(x_out, jt.Var), f"输出不是jt.Var（类型：{type(x_out)}）"
        print(f"  [Pred-3] 输出验证通过：x_out形状={x_out.shape} | 类型={type(x_out)}\n")


def test_ancestral_sampling_predictor():
    """逐行测试AncestralSamplingPredictor，定位方法索引错误"""
    batch_size = 2
    img_shape = (batch_size, 3, 32, 32)
    t = jt.ones(batch_size) * 0.5  # 时间步
    x = jt.randn(*img_shape)        # 随机输入
    
    # 测试2类SDE（Ancestral仅支持VPSDE/VESDE）
    sde_list = [
        ("VPSDE", VPSDE(beta_min=0.1, beta_max=20, N=100)),
        ("VESDE", VESDE(sigma_min=0.01, sigma_max=50, N=100))
    ]
    
    for sde_name, sde in sde_list:
        print(f"【重点测试】AncestralSamplingPredictor + {sde_name}")
        # 1. 初始化Ancestral预测器
        pred = AncestralSamplingPredictor(
            sde=sde, score_fn=simple_score_fn, probability_flow=False
        )
        print(f"  [Anc-1] 预测器初始化完成：{type(pred)}")
        
        # 2. 拆解update_fn逻辑（分VESDE/VPSDE分支）
        if isinstance(sde, VESDE):
            print("  [Anc-2] 进入VESDE分支（vesde_update_fn）")
            # 逐行执行vesde_update_fn，打印关键对象
            timestep = (t * (sde.N - 1) / sde.T).long()
            print_tensor_info(timestep, "timestep（时间步索引）", step="Anc-2a")
            
            # 关键：验证sde.discrete_sigmas是张量，且索引操作合法
            print_tensor_info(sde.discrete_sigmas, "sde.discrete_sigmas", step="Anc-2b")
            assert isinstance(sde.discrete_sigmas, jt.Var), "discrete_sigmas不是张量"
            
            sigma = sde.discrete_sigmas[timestep]  # 可能出错的索引行
            print_tensor_info(sigma, "sigma（索引后）", step="Anc-2c")
            
            # 继续执行剩余逻辑
            adjacent_sigma = jt.where(timestep == 0, jt.zeros_like(t), sde.discrete_sigmas[timestep - 1])
            score = pred.score_fn(x, t)
            print_tensor_info(score, "score（分数函数输出）", step="Anc-2d")
            
            x_mean = x + score * (sigma **2 - adjacent_sigma** 2)[:, None, None, None]
            std = jt.sqrt((adjacent_sigma **2 * (sigma** 2 - adjacent_sigma **2)) / (sigma** 2))
            noise = jt.randn_like(x)
            x_out = x_mean + std[:, None, None, None] * noise
            
        else:  # VPSDE分支
            print("  [Anc-2] 进入VPSDE分支（vpsde_update_fn）")
            # 逐行执行vpsde_update_fn，打印关键对象
            timestep = (t * (sde.N - 1) / sde.T).long()
            print_tensor_info(timestep, "timestep（时间步索引）", step="Anc-3a")
            
            # 关键：验证sde.discrete_betas是张量，且无错误调用
            print_tensor_info(sde.discrete_betas, "sde.discrete_betas", step="Anc-3b")
            assert isinstance(sde.discrete_betas, jt.Var), "discrete_betas不是张量"
            
            # 重点排查：用户代码中可能存在的.to[timestep]错误
            try:
                # 修正：删除.to（Jittor无需设备转移），直接索引
                beta = sde.discrete_betas[timestep]
                print_tensor_info(beta, "beta（索引后）", step="Anc-3c")
            except Exception as e:
                print(f"  [Anc-3c 错误] 索引sde.discrete_betas时失败：{str(e)}")
                print(f"  [关键检查] sde.discrete_betas.to 是否存在？{hasattr(sde.discrete_betas, 'to')}")
                if hasattr(sde.discrete_betas, 'to'):
                    print(f"  [关键检查] sde.discrete_betas.to 类型：{type(sde.discrete_betas.to)}")
                raise
            
            # 继续执行剩余逻辑
            score = pred.score_fn(x, t)
            print_tensor_info(score, "score（分数函数输出）", step="Anc-3d")
            
            x_mean = (x + beta[:, None, None, None] * score) / jt.sqrt(1. - beta)[:, None, None, None]
            noise = jt.randn_like(x)
            x_out = x_mean + jt.sqrt(beta)[:, None, None, None] * noise
        
        # 3. 验证最终输出
        assert x_out.shape == img_shape, f"Ancestral输出形状错误：{x_out.shape} vs {img_shape}"
        print(f"  [Anc-4] {sde_name}分支测试通过：x_out形状={x_out.shape}\n")


# -------------------------- 阶段3：测试校正器 --------------------------
def test_correctors():
    """测试所有校正器，重点验证alpha的索引合法性"""
    batch_size = 2
    img_shape = (batch_size, 3, 32, 32)
    t = jt.ones(batch_size) * 0.5
    x = jt.randn(*img_shape)
    snr = 0.1  # 信号噪声比
    n_steps = 1  # 校正器迭代步数
    
    # 测试组合：3类SDE × 3类校正器
    sde_list = [VPSDE(N=100), VESDE(N=100), subVPSDE(N=100)]
    corrector_list = [
        ("LangevinCorrector", LangevinCorrector),
        ("AnnealedLangevinDynamics", AnnealedLangevinDynamics),
        ("NoneCorrector", NoneCorrector)
    ]
    
    for sde in sde_list:
        sde_name = sde.__class__.__name__
        print(f"【校正器测试】SDE类型：{sde_name}")
        
        for corr_name, corr_cls in corrector_list:
            print(f"  [Corr-1] 测试校正器：{corr_name}")
            try:
                # 1. 初始化校正器
                corr = corr_cls(sde=sde, score_fn=simple_score_fn, snr=snr, n_steps=n_steps)
                
                # 2. 调用update_fn（核心逻辑）
                x_out, x_mean_out = corr.update_fn(x, t)
                
                # 3. 验证输出
                assert x_out.shape == img_shape, f"校正器输出形状错误：{x_out.shape} vs {img_shape}"
                print(f"  [Corr-2] {corr_name} 测试通过：x_out类型={type(x_out)}\n")
                
            except NotImplementedError as e:
                # 正常：部分SDE不支持某些校正器
                print(f"  [Corr-2] 跳过：{corr_name} 不支持 {sde_name}（{str(e)}）\n")
            except Exception as e:
                # 异常：打印详细信息
                print(f"  [Corr-2 错误] {corr_name} 测试失败：{str(e)}")
                # 定位alpha索引问题（校正器常见错误点）
                if "alpha" in str(e) or "index" in str(e):
                    timestep = (t * (sde.N - 1) / sde.T).long()
                    print(f"  [错误定位] timestep类型：{type(timestep)} | 值：{timestep}")
                    if hasattr(sde, "alphas"):
                        print(f"  [错误定位] sde.alphas类型：{type(sde.alphas)} | 是否可索引：{hasattr(sde.alphas, '__getitem__')}")
                raise


# -------------------------- 阶段4：测试完整采样流程 --------------------------
def test_pc_sampler_full():
    """测试完整PC采样流程（预测器+校正器组合）"""
    # 测试配置
    batch_size = 2
    img_shape = (3, 32, 32)  # 单样本形状 (C, H, W)
    full_shape = (batch_size, *img_shape)  # 批量形状 (B, C, H, W)
    sde = VPSDE(N=50)  # 简化步数（50步）加快测试
    snr = 0.1
    n_steps = 1
    denoise = True
    
    # 简化逆归一化函数（无实际缩放，仅返回原张量）
    def inverse_scaler(x):
        return x
    
    print(f"【PC采样器测试】配置：VPSDE + EulerMaruyama + Langevin | 批量={batch_size} | 形状={full_shape}")
    
    # 1. 创建PC采样器
    pc_sampler = get_pc_sampler(
        sde=sde,
        shape=full_shape,
        predictor=EulerMaruyamaPredictor,
        corrector=LangevinCorrector,
        inverse_scaler=inverse_scaler,
        snr=snr,
        n_steps=n_steps,
        continuous=False,
        denoise=denoise,
        eps=1e-3
    )
    
    # 2. 调用采样器（传入dummy模型，仅用于生成score_fn）
    dummy_model = lambda x, t: simple_score_fn(x, t)
    print(f"  [PC-1] 开始采样（共{sde.N}步）...")
    samples, nfe = pc_sampler(model=dummy_model)
    
    # 3. 验证采样结果
    assert samples.shape == full_shape, f"PC采样器输出形状错误：{samples.shape} vs {full_shape}"
    assert isinstance(samples, jt.Var), f"PC采样器输出不是jt.Var（类型：{type(samples)}）"
    print(f"  [PC-2] 采样完成：样本形状={samples.shape} | 函数调用次数={nfe}")
    print(f"  [PC-3] PC采样器全流程测试通过\n")


def test_ode_sampler_full():
    """测试完整ODE采样流程（依赖scipy求解器）"""
    batch_size = 2
    img_shape = (3, 32, 32)
    full_shape = (batch_size, *img_shape)
    sde = VESDE(N=50)  # VESDE适合ODE测试
    denoise = True
    
    def inverse_scaler(x):
        return x
    
    print(f"【ODE采样器测试】配置：VESDE + RK45求解器 | 批量={batch_size} | 形状={full_shape}")
    
    try:
        # 1. 创建ODE采样器
        ode_sampler = get_ode_sampler(
            sde=sde,
            shape=full_shape,
            inverse_scaler=inverse_scaler,
            denoise=denoise,
            method="RK45",
            eps=1e-3
        )
        
        # 2. 调用采样器
        dummy_model = lambda x, t: simple_score_fn(x, t)
        print("  [ODE-1] 开始ODE采样（依赖scipy.integrate）...")
        samples, nfe = ode_sampler(model=dummy_model)
        
        # 3. 验证结果
        assert samples.shape == full_shape, f"ODE采样器输出形状错误：{samples.shape} vs {full_shape}"
        print(f"  [ODE-2] 采样完成：样本形状={samples.shape} | ODE求解器调用次数={nfe}")
        print(f"  [ODE-3] ODE采样器全流程测试通过\n")
        
    except ImportError:
        print(f"  [ODE-错误] 缺少scipy，请安装：pip install scipy\n")
    except Exception as e:
        print(f"  [ODE-错误] 采样失败：{str(e)}")
        # 定位ODE函数中的from_flattened_numpy/to_flattened_numpy（若用户代码中存在）
        if "from_flattened_numpy" in str(e) or "to_flattened_numpy" in str(e):
            print("  [错误定位] 检查models.utils中的from/to_flattened_numpy是否适配Jittor")
        raise


# -------------------------- 执行全流程测试 --------------------------
if __name__ == "__main__":
    try:
        # 1. 初始化测试环境
        setup_test_env()
        
        # 2. 按阶段执行测试（前一阶段失败则终止，避免连锁错误）
        test_step_wrapper(test_sde_attributes, "1. SDE属性合法性测试")
        test_step_wrapper(test_basic_predictors, "2. 基础预测器测试")
        test_step_wrapper(test_ancestral_sampling_predictor, "3. AncestralSamplingPredictor重点测试")
        test_step_wrapper(test_correctors, "4. 校正器测试")
        test_step_wrapper(test_pc_sampler_full, "5. PC采样器全流程测试")
        test_step_wrapper(test_ode_sampler_full, "6. ODE采样器全流程测试")
        
        # 3. 最终结果
        print("=" * 70)
        print("🎉 全流程测试通过！sampling.py（Jittor版）无类型错误")
        print("=" * 70)
        
    except Exception as e:
        print("\n" + "=" * 70)
        print(f"❌ 全流程测试失败：{type(e).__name__}: {str(e)}")
        print("💡 关键排查建议：")
        if "'method' object is not subscriptable" in str(e):
            print("  - 检查是否存在 'sde.discrete_betas.to[timestep]' 这类代码（.to是方法，应删除）")
            print("  - 确保所有索引操作的对象是jt.Var（通过阶段1的SDE属性测试验证）")
        print("=" * 70)


【测试环境】设备：GPU | Jittor版本：1.3.9.14 | 随机种子：42

【测试阶段】1. SDE属性合法性测试
【SDE类型】VPSDE
  [SDE-1] VPSDE.discrete_betas：
    - 类型：<class 'jittor.jittor_core.Var'>
    - 形状：[100,]
    - 设备：GPU（Jittor自动管理）
    - 是否为方法：False

  [SDE-1] VPSDE.alphas：
    - 类型：<class 'jittor.jittor_core.Var'>
    - 形状：[100,]
    - 设备：GPU（Jittor自动管理）
    - 是否为方法：False

----------------------------------------
【SDE类型】VESDE
  [SDE-1] VESDE.discrete_sigmas：
    - 类型：<class 'jittor.jittor_core.Var'>
    - 形状：[100,]
    - 设备：GPU（Jittor自动管理）
    - 是否为方法：False

----------------------------------------
【SDE类型】subVPSDE
  [SDE-1] subVPSDE.discrete_betas：
    - 类型：<class 'jittor.jittor_core.Var'>
    - 形状：[100,]
    - 设备：GPU（Jittor自动管理）
    - 是否为方法：False

  [SDE-1] subVPSDE.alphas：
    - 类型：<class 'jittor.jittor_core.Var'>
    - 形状：[100,]
    - 设备：GPU（Jittor自动管理）
    - 是否为方法：False

----------------------------------------
【测试结果】1. SDE属性合法性测试 ✅ 通过

【测试阶段】2. 基础预测器测试
【测试预测器】EulerMaruyamaPredictor
  [Pred-1] EulerMaruyamaPredictor实例：


In [1]:
import jittor as jt
import numpy as np
from op import upfirdn2d

# 确保使用CUDA
jt.flags.use_cuda = 1

def test_upfirdn2d_basic():
    # 设置随机种子，确保结果可复现
    jt.set_seed(42)
    
    # 测试用例1：基础功能测试
    print("测试基础功能...")
    batch, channel, height, width = 2, 3, 8, 8
    kernel_size = 3
    
    # 创建随机输入和kernel
    input = jt.randn(batch, channel, height, width)
    kernel = jt.randn(kernel_size, kernel_size)
    
    # 测试不同参数组合
    params_list = [
        (1, 1, (0, 0)),  # 无上下采样，无填充
        (2, 1, (1, 1)),  # 上采样x2，无下采样，填充1
        (1, 2, (1, 1)),  # 无上采样，下采样x2，填充1
        (2, 2, (2, 2))   # 上采样x2+下采样x2，填充2
    ]
    
    for up, down, pad in params_list:
        print(f"测试参数: up={up}, down={down}, pad={pad}")
        output = upfirdn2d(input, kernel, up, down, pad)
        
        # 计算预期输出形状
        kernel_h, kernel_w = kernel_size, kernel_size
        expected_h = (height * up + pad[0] + pad[1] - kernel_h) // down + 1
        expected_w = (width * up + pad[0] + pad[1] - kernel_w) // down + 1
        
        # 检查输出形状是否正确
        assert output.shape == (batch, channel, expected_h, expected_w), \
            f"形状不匹配! 预期: {(batch, channel, expected_h, expected_w)}, 实际: {output.shape}"
        print(f"形状检查通过: {output.shape}")
    
    print("基础功能测试通过!\n")

def test_upfirdn2d_backward():
    # 测试用例2：反向传播测试
    print("测试反向传播...")
    batch, channel, height, width = 1, 2, 4, 4
    kernel_size = 3
    
    # 创建需要计算梯度的输入
    input = jt.randn(batch, channel, height, width, requires_grad=True)
    kernel = jt.randn(kernel_size, kernel_size)
    
    # 前向计算
    output = upfirdn2d(input, kernel, up=2, down=2, pad=(1, 1))
    
    # 计算损失（简单求和）
    loss = output.sum()
    
    # 反向传播
    loss.backward()
    
    # 检查梯度是否存在且形状正确
    assert input.grad is not None, "输入梯度不存在!"
    assert input.grad.shape == input.shape, "梯度形状不匹配!"
    
    # 检查梯度是否为非零值（随机输入下梯度应为非零）
    assert not jt.allclose(input.grad, jt.zeros_like(input.grad)), "梯度值全为零!"
    
    print("反向传播测试通过!")

if __name__ == "__main__":
    test_upfirdn2d_basic()
    test_upfirdn2d_backward()
    print("所有测试通过!")

[38;5;2m[i 0831 17:20:36.021648 04 compiler.py:956] Jittor(1.3.9.14) src: /home/a516/anaconda3/envs/Jittor/lib/python3.8/site-packages/jittor[m
[38;5;2m[i 0831 17:20:36.026710 04 compiler.py:957] g++ at /usr/bin/g++(12.3.0)[m
[38;5;2m[i 0831 17:20:36.027513 04 compiler.py:958] cache_path: /home/a516/.cache/jittor/jt1.3.9/g++12.3.0/py3.8.20/Linux-6.6.87.2xef/13thGenIntelRCx37/4832/default[m
[38;5;2m[i 0831 17:20:36.037754 04 __init__.py:412] Found nvcc(12.8.61) at /usr/local/cuda-12.8/bin/nvcc.[m
[38;5;2m[i 0831 17:20:36.180715 04 __init__.py:412] Found addr2line(2.42) at /usr/bin/addr2line.[m
[38;5;2m[i 0831 17:20:36.245548 04 compiler.py:1013] cuda key:cu12.8.61[m
[38;5;2m[i 0831 17:20:36.712410 04 __init__.py:227] Total mem: 15.43GB, using 5 procs for compiling.[m
[38;5;2m[i 0831 17:20:36.840692 04 jit_compiler.cc:28] Load cc_path: /usr/bin/g++[m
[38;5;2m[i 0831 17:20:37.071908 04 init.cc:63] Found cuda archs: [89,][m
[38;5;3m[w 0831 17:20:37.409088 04 compile_exte

测试基础功能...
测试参数: up=1, down=1, pad=(0, 0)


RuntimeError: Wrong inputs arguments, Please refer to examples(help(jt.ops.code)).

Types of your inputs are:
 self	= module,
 args	= (list, NanoString, list, ),
 kwargs	= {cuda_src=str, extras=dict, },

The function declarations are:
 VarHolder* code(NanoVector shape,  NanoString dtype, vector<VarHolder*>&& inputs={},  string&& cpu_src="",  vector<string>&& cpu_grad_src={},  string&& cpu_header="",  string&& cuda_src="",  vector<string>&& cuda_grad_src={},  string&& cuda_header="",  DataMap&& data={})
 vector_to_tuple<VarHolder*> code_(vector<NanoVector>&& shapes,  vector<NanoString>&& dtypes, vector<VarHolder*>&& inputs={},  string&& cpu_src="",  vector<string>&& cpu_grad_src={},  string&& cpu_header="",  string&& cuda_src="",  vector<string>&& cuda_grad_src={},  string&& cuda_header="",  DataMap&& data={})
 vector_to_tuple<VarHolder*> code__(vector<VarHolder*>&& inputs, vector<VarHolder*>&& outputs,  string&& cpu_src="",  vector<string>&& cpu_grad_src={},  string&& cpu_header="",  string&& cuda_src="",  vector<string>&& cuda_grad_src={},  string&& cuda_header="",  DataMap&& data={})

Failed reason:[38;5;1m[f 0831 17:20:37.909518 04 pyjt_jit_op_maker.cc:17225] Not a valid keyword: extras[m