In [1]:
import numpy as np
import torch
import time
import os

from sklearn import (linear_model, model_selection, preprocessing,
                     pipeline)
from scipy.spatial.distance import pdist

from kymatio.torch import HarmonicScattering3D

from kymatio.scattering3d.backend.torch_backend \
    import TorchBackend3D

from kymatio.scattering3d.utils \
    import generate_weighted_sum_of_gaussians

from kymatio.datasets import fetch_qm7
from kymatio.caching import get_cache_dir

In [2]:
qm7_url = "https://qmml.org/Datasets/gdb7-12.zip"
def fetch_qm7(align=True, cache=True):
    """Fetches the GDB7-12 dataset"""

    cache_path = get_cache_dir("qm7")
    print(cache_path)
    if cache:
        cache_path = get_cache_dir("qm7")
        if align:
            aligned_filename = os.path.join(cache_path, "qm7_aligned.npz")
            if os.path.exists(aligned_filename):
                f = np.load(aligned_filename)
                return dict(**f)

        # load unaligned if existent, align if required
        unaligned_filename = os.path.join(cache_path, "qm7.npz")
        if os.path.exists(unaligned_filename):
            f = np.load(unaligned_filename)
            if align:
                _pca_align_positions(f['positions'], f['charges'], inplace=True)
                np.savez(aligned_filename, **f)
            return dict(**f)

    path = get_cache_dir("qm7")
    qm7_file = os.path.join(path, "gdb7-12/dsgdb7ae.xyz")
    print(qm7_file)
    if not os.path.exists(qm7_file):
        qm7_zipfile = os.path.join(path, "gdb7-12.zip")
        if not os.path.exists(qm7_zipfile):
            _download(qm7_url, qm7_zipfile)
            import zipfile
            with zipfile.ZipFile(qm7_zipfile, "r") as zipref:
                zipref.extractall(path)

    qm7 = read_xyz(qm7_file)
    if cache:
        np.savez(unaligned_filename, **qm7)

    if align:
        _pca_align_positions(qm7['positions'], qm7['charges'], inplace=True)
        if cache:
            np.savez(aligned_filename, **qm7)

    return qm7

In [31]:
# Changing code to fetch in smaller set

qm7 = fetch_qm7(align=True, cache = True)
pos_full = qm7['positions']
full_charges_complete = qm7['charges']

cache_path = get_cache_dir("qm7")
with open(cache_path + '/gdb7-12/dsgdb7ae_subset1k.txt', 'r') as file:
    # Create an empty list to store the lines
    lines = []

    # Iterate over the lines of the file
    for line in file:
        # Remove the newline character at the end of the line
        line = line.strip()

        # Append the line to the list
        lines.append(line)
        
#Take single string stored in above list and convert it into string of indexes
string_indexes = lines[0].split(',')
indexes = [len(string_indexes)]

#convert string indexes to int indexes
j = 0
for i in string_indexes:
    indexes.append(int(i.strip()))

    
length = len(indexes)
#create pos and full_charges as np arrays with length fitted to dataset 
pos = np.empty([length, 23, 3])
full_charges = np.empty([length, 23])

#fill pos and full_charges with data
j = 0
for i in indexes2:
    pos[j] = pos_full[i]
    full_charges[j] = full_charges_complete[j]
    j = j + 1

n_molecules = pos.shape[0]
print(n_molecules)

/Users/michaeljacob/kymatio_cache/qm7
/Users/michaeljacob/kymatio_cache/qm7
(7165, 23, 3)
(1001, 23, 3)
1001


In [22]:
# Starting here working over 1k subset data set and going to try full code but with the torch.cat implemented
# Additional time output added

mask = full_charges <= 2
valence_charges = full_charges * mask

mask = np.logical_and(full_charges > 2, full_charges <= 10)
valence_charges += (full_charges - 2) * mask

mask = np.logical_and(full_charges > 10, full_charges <= 18)
valence_charges += (full_charges - 10) * mask


overlapping_precision = 1e-1
sigma = 2.0
min_dist = np.inf

for i in range(n_molecules):
    n_atoms = np.sum(full_charges[i] != 0)
    pos_i = pos[i, :n_atoms, :]
    min_dist = min(min_dist, pdist(pos_i).min())

delta = sigma * np.sqrt(-8 * np.log(overlapping_precision))
pos = pos * delta / min_dist

M, N, O = 192, 128, 96

grid = np.mgrid[-M//2:-M//2+M, -N//2:-N//2+N, -O//2:-O//2+O]
grid = np.fft.ifftshift(grid)

J = 2
L = 3
integral_powers = [0.5, 1.0, 2.0, 3.0]

scattering = HarmonicScattering3D(J=J, shape=(M, N, O),
                                  L=L, sigma_0=sigma,
                                  integral_powers=integral_powers)

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
scattering.to(device)

batch_size = 8
n_batches = int(np.ceil(n_molecules / batch_size))

order_0, orders_1_and_2 = [], []
print('Computing solid harmonic scattering coefficients of '
      '{} molecules from the QM7 database on {}'.format(
        n_molecules,   "GPU" if use_cuda else "CPU"))
print('sigma: {}, L: {}, J: {}, integral powers: {}'.format(
        sigma, L, J, integral_powers))


  pos = pos * delta / min_dist


Computing solid harmonic scattering coefficients of 32 molecules from the QM7 database on CPU
sigma: 2.0, L: 3, J: 2, integral powers: [0.5, 1.0, 2.0, 3.0]


In [24]:

this_time = None
last_time = None

#Run regression for first two intervals to catch errors in appending/functions 
#before running entire transform across all batches
for i in range(2):
    this_time = time.time()
    if last_time is not None:
        dt = this_time - last_time
        print("Iteration {} ETA: [{:02}:{:02}:{:02}]".format(
                    i + 1, int(((n_batches - i - 1) * dt) // 3600),
                    int((((n_batches - i - 1) * dt) // 60) % 60),
                    int(((n_batches - i - 1) * dt) % 60)))
    else:
        print("Iteration {} ETA: {}".format(i + 1, '-'))
    last_time = this_time
    time.sleep(1)
    
    t_0 = time.time()

    # Extract the current batch.
    start = i * batch_size
    end = min(start + batch_size, n_molecules)

    pos_batch = pos[start:end]
    full_batch = full_charges[start:end]
    val_batch = valence_charges[start:end]

    # Calculate the density map for the nuclear charges and transfer
    # to PyTorch.
    full_density_batch = generate_weighted_sum_of_gaussians(grid,
            pos_batch, full_batch, sigma)
    full_density_batch = torch.from_numpy(full_density_batch)
    full_density_batch = full_density_batch.to(device).float()

    # Compute zeroth-order, first-order, and second-order scattering
    # coefficients of the nuclear charges.
    full_order_0 = TorchBackend3D.compute_integrals(full_density_batch,
                                     integral_powers)
    full_scattering = scattering(full_density_batch)

    # Compute the map for valence charges.
    val_density_batch = generate_weighted_sum_of_gaussians(grid,
            pos_batch, val_batch, sigma)
    val_density_batch = torch.from_numpy(val_density_batch)
    val_density_batch = val_density_batch.to(device).float()

    # Compute scattering coefficients for the valence charges.
    val_order_0 = TorchBackend3D.compute_integrals(val_density_batch,
                                    integral_powers)
    val_scattering = scattering(val_density_batch)

    # Take the difference between nuclear and valence charges, then
    # compute the corresponding scattering coefficients.
    core_density_batch = full_density_batch - val_density_batch

    core_order_0 = TorchBackend3D.compute_integrals(core_density_batch,
                                     integral_powers)
    core_scattering = scattering(core_density_batch)

    # Stack the nuclear, valence, and core coefficients into arrays
    # and append them to the output.
    batch_order_0 = torch.stack(
        (full_order_0, val_order_0, core_order_0), dim=-1)
    batch_orders_1_and_2 = torch.stack(
        (full_scattering, val_scattering, core_scattering), dim=-1)

    
    order_0 = torch.Tensor(order_0)
    print(type(order_0))
    order_0 = torch.cat((order_0, batch_order_0), 0)

    orders_1_and_2 = torch.Tensor(orders_1_and_2)
    orders_1_and_2 = torch.cat((orders_1_and_2, batch_orders_1_and_2), 0)
    # Changing to utilize torch cat instead of append since error thrown from append
    
    t_f = time.time()
    
    print("--- %s seconds ---" % (t_f - t_0))
    print("order 0 size is", order_0.size(), "orders 1 and 2 size is", orders_1_and_2.size())

Iteration 1 ETA: -
<class 'torch.Tensor'>
--- 404.09992599487305 seconds ---
order 0 size is torch.Size([8, 4, 3]) orders 1 and 2 size is torch.Size([8, 6, 4, 4, 3])
Iteration 2 ETA: [00:13:30]
<class 'torch.Tensor'>
--- 388.24966621398926 seconds ---
order 0 size is torch.Size([16, 4, 3]) orders 1 and 2 size is torch.Size([16, 6, 4, 4, 3])


In [25]:

#perform rest of batches
for i in range(2, n_batches):
    this_time = time.time()
    if last_time is not None:
        dt = this_time - last_time
        print("Iteration {} ETA: [{:02}:{:02}:{:02}]".format(
                    i + 1, int(((n_batches - i - 1) * dt) // 3600),
                    int((((n_batches - i - 1) * dt) // 60) % 60),
                    int(((n_batches - i - 1) * dt) % 60)))
    else:
        print("Iteration {} ETA: {}".format(i + 1, '-'))
    last_time = this_time
    time.sleep(1)
    
    t_0 = time.time()

    # Extract the current batch.
    start = i * batch_size
    end = min(start + batch_size, n_molecules)

    pos_batch = pos[start:end]
    full_batch = full_charges[start:end]
    val_batch = valence_charges[start:end]

    # Calculate the density map for the nuclear charges and transfer
    # to PyTorch.
    full_density_batch = generate_weighted_sum_of_gaussians(grid,
            pos_batch, full_batch, sigma)
    full_density_batch = torch.from_numpy(full_density_batch)
    full_density_batch = full_density_batch.to(device).float()

    # Compute zeroth-order, first-order, and second-order scattering
    # coefficients of the nuclear charges.
    full_order_0 = TorchBackend3D.compute_integrals(full_density_batch,
                                     integral_powers)
    full_scattering = scattering(full_density_batch)

    # Compute the map for valence charges.
    val_density_batch = generate_weighted_sum_of_gaussians(grid,
            pos_batch, val_batch, sigma)
    val_density_batch = torch.from_numpy(val_density_batch)
    val_density_batch = val_density_batch.to(device).float()

    # Compute scattering coefficients for the valence charges.
    val_order_0 = TorchBackend3D.compute_integrals(val_density_batch,
                                    integral_powers)
    val_scattering = scattering(val_density_batch)

    # Take the difference between nuclear and valence charges, then
    # compute the corresponding scattering coefficients.
    core_density_batch = full_density_batch - val_density_batch

    core_order_0 = TorchBackend3D.compute_integrals(core_density_batch,
                                     integral_powers)
    core_scattering = scattering(core_density_batch)

    # Stack the nuclear, valence, and core coefficients into arrays
    # and append them to the output.
    batch_order_0 = torch.stack(
        (full_order_0, val_order_0, core_order_0), dim=-1)
    batch_orders_1_and_2 = torch.stack(
        (full_scattering, val_scattering, core_scattering), dim=-1)

    
    order_0 = torch.Tensor(order_0)
    order_0 = torch.cat((order_0, batch_order_0), 0)

    orders_1_and_2 = torch.Tensor(orders_1_and_2)
    orders_1_and_2 = torch.cat((orders_1_and_2, batch_orders_1_and_2), 0)
    # Changing to utilize torch cat instead of append since error thrown from append
    
    t_f = time.time()
    
    print("--- %s seconds ---" % (t_f - t_0))
    print("order 0 size is", order_0.size(), "orders 1 and 2 size is", orders_1_and_2.size())

Iteration 3 ETA: [00:11:49]
--- 396.92416310310364 seconds ---
order 0 size is torch.Size([24, 4, 3]) orders 1 and 2 size is torch.Size([24, 6, 4, 4, 3])
Iteration 4 ETA: [00:00:00]
--- 398.6881170272827 seconds ---
order 0 size is torch.Size([32, 4, 3]) orders 1 and 2 size is torch.Size([32, 6, 4, 4, 3])


In [27]:
# NOTE: NEED TO EDIT TO FIT SUBSET AFTER ABOVE CODE SUCCEDES

#commented out code which alters the space in which the arrays are stored
#order_0 = torch.cat(order_0, dim=0)
#orders_1_and_2 = torch.cat(orders_1_and_2, dim=0)

#order_0 = (order_0).cpu().numpy()
#orders_1_and_2 = orders_1_and_2.cpu().numpy()

order_0 = order_0.reshape((n_molecules, -1))

orders_1_and_2 = orders_1_and_2.reshape((n_molecules, -1))

basename = 'qm7_L_{}_J_{}_sigma_{}_MNO_{}_powers_{}.npy'.format(
        L, J, sigma, (M, N, O), integral_powers)

cache_dir = get_cache_dir("qm7/experiments")

filename = os.path.join(cache_dir, 'order_0_' + basename)
np.save(filename, order_0)

filename = os.path.join(cache_dir, 'orders_1_and_2' + basename)
np.save(filename, orders_1_and_2)

scattering_coef = np.concatenate([order_0, orders_1_and_2], axis=1)

qm7 = fetch_qm7()
target_full = qm7['energies']
print(target_full.shape)
print(len(indexes))

#takes relevant indexes for energy levels
j = 0
target = np.empty([length])
for i in indexes2:
    target[j] = target_full[i]
    j = j + 1

#Number of molecules must be divisible by number of folds
n_folds = 7

P = np.random.permutation(n_molecules).reshape((n_folds, -1))

cross_val_folds = []

for i_fold in range(n_folds):
    fold = (np.concatenate(P[np.arange(n_folds) != i_fold], axis=0),
            P[i_fold])
    cross_val_folds.append(fold)
    
#removes NaN data for large datasets and replaces with 0 to preserve eneregy
scattering_coef[np.isnan(scattering_coef)] = 0

alphas = 10.0 ** (-np.arange(1, 10))
for i, alpha in enumerate(alphas):
    scaler = preprocessing.StandardScaler()
    ridge = linear_model.Ridge(alpha=alpha)

    regressor = pipeline.make_pipeline(scaler, ridge)
    
    target_prediction = model_selection.cross_val_predict(regressor,
            X=scattering_coef, y=target, cv=cross_val_folds)

    #prints MAE, RMSE
    #expected MAE to be <3.5 kcal/mol
    MAE = np.mean(np.abs(target_prediction - target))
    RMSE = np.sqrt(np.mean((target_prediction - target) ** 2))

    print('Ridge regression, alpha: {}, MAE: {}, RMSE: {}'.format(
        alpha, MAE, RMSE))

32
torch.Size([32, 12])
/Users/michaeljacob/kymatio_cache/qm7
(7165,)
1001
Ridge regression, alpha: 0.1, MAE: 125.01755332946777, RMSE: 175.72193052351165
Ridge regression, alpha: 0.01, MAE: 151.22376441955566, RMSE: 201.13080230635114
Ridge regression, alpha: 0.001, MAE: 167.951904296875, RMSE: 231.26390582397724
Ridge regression, alpha: 0.0001, MAE: 284.95350456237793, RMSE: 388.7481623801966
Ridge regression, alpha: 1e-05, MAE: 200.37871932983398, RMSE: 271.2891914429772


  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)
  dual_coef = linalg.solve(K, y, assume_a="pos", overwrite_a=False)


Ridge regression, alpha: 1e-06, MAE: 199.75620079040527, RMSE: 270.91078063038753
Ridge regression, alpha: 1e-07, MAE: 199.87989234924316, RMSE: 270.9607164138058
Ridge regression, alpha: 1e-08, MAE: 199.87989234924316, RMSE: 270.9607164138058
Ridge regression, alpha: 1e-09, MAE: 199.87989234924316, RMSE: 270.9607164138058




In [None]:
""" This script will test the submodules used by the scattering module"""
import os
import io
import numpy as np
import pytest
from kymatio import HarmonicScattering3D

backends = []

from kymatio.scattering3d.backend.numpy_backend import backend
backends.append(backend)


def relative_difference(a, b):
    return np.sum(np.abs(a - b)) / max(np.sum(np.abs(a)), np.sum(np.abs(b)))


@pytest.mark.parametrize("backend", backends)
def test_against_standard_computations(backend):
    file_path = os.path.abspath(os.path.dirname(__file__))
    with open(os.path.join(file_path, 'test_data_3d.npz'), 'rb') as f:
        buffer = io.BytesIO(f.read())
        data = np.load(buffer)
    x = data['x']
    scattering_ref = data['Sx']
    J = data['J']
    L = data['L']
    integral_powers = data['integral_powers']

    M = x.shape[1]

    batch_size = x.shape[0]

    N, O = M, M
    sigma = 1

    scattering = HarmonicScattering3D(J=J, shape=(M, N, O), L=L,
                                      sigma_0=sigma, method='integral',
                                      integral_powers=integral_powers,
                                      max_order=2, backend=backend,
                                      frontend='numpy')

    order_0 = backend.compute_integrals(x, integral_powers)
    scattering.max_order = 2
    scattering.method = 'integral'
    scattering.integral_powers = integral_powers
    orders_1_and_2 = scattering(x)

    order_0 = order_0.reshape((batch_size, -1))
    start = 0
    end = order_0.shape[1]
    order_0_ref = scattering_ref[:, start:end]

    orders_1_and_2 = orders_1_and_2.reshape((batch_size, -1))
    start = end
    end += orders_1_and_2.shape[1]
    orders_1_and_2_ref = scattering_ref[:, start:end]

    order_0_diff_cpu = relative_difference(order_0_ref, order_0)
    orders_1_and_2_diff_cpu = relative_difference(
        orders_1_and_2_ref, orders_1_and_2)

    assert order_0_diff_cpu < 1e-6, "CPU : order 0 do not match, diff={}".format(order_0_diff_cpu)
    assert orders_1_and_2_diff_cpu < 1e-6, "CPU : orders 1 and 2 do not match, diff={}".format(orders_1_and_2_diff_cpu)
    assert orders_1_and_2.dtype == np.dtype(np.float32)


@pytest.mark.parametrize("backend", backends)
def test_scattering_batch_shape_agnostic(backend):
    J = 2
    shape = (16, 16, 16)

    S = HarmonicScattering3D(J=J, shape=shape, frontend='numpy', backend=backend)
    for k in range(3):
        with pytest.raises(RuntimeError) as ve:
            S(np.zeros(shape[:k]))
        assert 'at least three' in ve.value.args[0]

    x = np.zeros(shape=shape)

    Sx = S(x)

    assert len(Sx.shape) == 3
    assert Sx.dtype == np.dtype(np.float32)

    coeffs_shape = Sx.shape[-3:]

    test_shapes = ((1,) + shape, (2,) + shape, (2, 2) + shape,
                   (2, 2, 2) + shape)

    for test_shape in test_shapes:
        x = np.zeros(shape=test_shape)
        x = x

        Sx = S(x)

        assert len(Sx.shape) == len(test_shape)
        assert Sx.shape[-3:] == coeffs_shape
        assert Sx.shape[:-3] == test_shape[:-3]
        assert Sx.dtype == np.dtype(np.float32)
        
print(backends)
print(test_scattering_batch_shape_agnostic())

In [None]:
""" This script will test the submodules used by the scattering module"""
import torch
import os
import io
import numpy as np
import pytest
from kymatio.torch import HarmonicScattering3D
from kymatio.scattering3d.utils import generate_weighted_sum_of_gaussians

backends = []

skcuda_available = False
try:
    if torch.cuda.is_available():
        from skcuda import cublas
        import cupy
        skcuda_available = True
except:
    Warning('torch_skcuda backend not available.')

if skcuda_available:
    from kymatio.scattering3d.backend.torch_skcuda_backend import backend
    backends.append(backend)


from kymatio.scattering3d.backend.torch_backend import backend
backends.append(backend)

if torch.cuda.is_available():
    devices = ['cuda', 'cpu']
else:
    devices = ['cpu']


def relative_difference(a, b):
    return np.sum(np.abs(a - b)) / max(np.sum(np.abs(a)), np.sum(np.abs(b)))


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_FFT3d_central_freq_batch(device, backend):
    # Checked the 0 frequency for the 3D FFT
    for device in devices:
        x = torch.zeros(1, 32, 32, 32, 1).float()
        if device == 'gpu':
            x = x.cuda()
        a = x.sum()
        y = backend.rfft(x)
        c = y[:, 0, 0, 0].sum()
        assert (c - a).abs().sum() < 1e-6


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_fft3d_error(backend, device):
    x = torch.randn(1, 4, 4, 4, 2)
    with pytest.raises(TypeError) as record:
        backend.rfft(x)
    assert 'real' in record.value.args[0]

    x = torch.randn(1, 4, 4, 4, 1)
    with pytest.raises(TypeError) as record:
        backend.ifft(x)
    assert 'complex' in record.value.args[0]

    x = torch.randn(4, 4, 4, 1)
    x = x.to(device)
    y = x[::2, ::2, ::2]

    with pytest.raises(RuntimeError) as record:
        backend.rfft(y)
    assert 'must be contiguous' in record.value.args[0]

    x = torch.randn(4, 4, 4, 2)
    x = x.to(device)
    y = x[::2, ::2, ::2]

    with pytest.raises(RuntimeError) as record:
        backend.ifft(y)
    assert 'must be contiguous' in record.value.args[0]


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_cdgmm3d(device, backend):
    if not backend.name.endswith('_skcuda') or device != 'cpu':
        x = torch.zeros(2, 3, 4, 2).to(device)
        x[..., 0] = 2
        x[..., 1] = 3

        y = torch.zeros_like(x)
        y[..., 0] = 4
        y[..., 1] = 5

        prod = torch.zeros_like(x)
        prod[..., 0] = x[..., 0] * y[..., 0] - x[..., 1] * y[..., 1]
        prod[..., 1] = x[..., 0] * y[..., 1] + x[..., 1] * y[..., 0]

        z = backend.cdgmm3d(x, y)

        assert (z - prod).norm().cpu().item() < 1e-7

        with pytest.raises(RuntimeError) as record:
            x = torch.randn((3, 4, 3, 2), device=device)
            x = x[:, 0:3, ...]
            y = torch.randn((3, 3, 3, 2), device=device)
            backend.cdgmm3d(x, y)
        assert "contiguous" in record.value.args[0]

        with pytest.raises(RuntimeError) as record:
            x = torch.randn((3, 3, 3, 2), device=device)
            y = torch.randn((3, 4, 3, 2), device=device)
            y = y[:, 0:3, ...]
            backend.cdgmm3d(x, y)
        assert "contiguous" in record.value.args[0]

        with pytest.raises(RuntimeError) as record:
            x = torch.randn((3, 3, 3, 2), device=device)
            y = torch.randn((4, 4, 4, 2), device=device)
            backend.cdgmm3d(x, y)
        assert "not compatible" in record.value.args[0]

        with pytest.raises(TypeError) as record:
            x = torch.randn(3, 3, 3, 2).double()
            y = torch.randn(3, 3, 3, 2)
            backend.cdgmm3d(x, y)
        assert " must be of the same dtype" in record.value.args[0]

    if backend.name.endswith('_skcuda'):
        x = torch.randn((3, 3, 3, 2), device=torch.device('cpu'))
        y = torch.randn((3, 3, 3, 2), device=torch.device('cpu'))
        with pytest.raises(TypeError) as record:
            backend.cdgmm3d(x, y)
        assert "must be CUDA" in record.value.args[0]


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_complex_modulus(backend, device):
    if backend.name == "torch_skcuda" and device == "cpu":
        pytest.skip("The skcuda backend does not support CPU tensors.")
    x = torch.randn(4, 3, 2).to(device)
    xm = torch.sqrt(x[..., 0] ** 2 + x[..., 1] ** 2)
    y = backend.modulus(x)
    y = y.reshape(y.shape[:-1])
    assert (y - xm).norm() < 1e-7


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_against_standard_computations(device, backend):
    if backend.name.endswith('_skcuda') and device == "cpu":
        pytest.skip("The skcuda backend does not support CPU tensors.")

    file_path = os.path.abspath(os.path.dirname(__file__))
    with open(os.path.join(file_path, 'test_data_3d.npz'), 'rb') as f:
        buffer = io.BytesIO(f.read())
        data = np.load(buffer)
    x = torch.from_numpy(data['x'])
    scattering_ref = torch.from_numpy(data['Sx'])
    J = data['J']
    L = data['L']
    integral_powers = data['integral_powers']

    M = x.shape[1]

    batch_size = x.shape[0]

    N, O = M, M
    sigma = 1

    scattering = HarmonicScattering3D(J=J, shape=(M, N, O), L=L,
            sigma_0=sigma, method='integral',
            integral_powers=integral_powers, max_order=2, backend=backend)

    scattering.to(device)
    x = x.to(device)

    order_0 = backend.compute_integrals(x, integral_powers)
    scattering.max_order = 2
    scattering.method = 'integral'
    scattering.integral_powers = integral_powers
    orders_1_and_2 = scattering(x)

    order_0 = order_0.cpu().numpy().reshape((batch_size, -1))
    start = 0
    end = order_0.shape[1]
    order_0_ref = scattering_ref[:, start:end].cpu().numpy()

    orders_1_and_2 = orders_1_and_2.cpu().numpy().reshape((batch_size, -1))
    start = end
    end += orders_1_and_2.shape[1]
    orders_1_and_2_ref = scattering_ref[:, start:end].cpu().numpy()

    order_0_diff_cpu = relative_difference(order_0_ref, order_0)
    orders_1_and_2_diff_cpu = relative_difference(
        orders_1_and_2_ref, orders_1_and_2)

    assert order_0_diff_cpu < 1e-6, "CPU : order 0 do not match, diff={}".format(order_0_diff_cpu)
    assert orders_1_and_2_diff_cpu < 1e-6, "CPU : orders 1 and 2 do not match, diff={}".format(orders_1_and_2_diff_cpu)
    assert orders_1_and_2.dtype == np.dtype(np.float32)


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_solid_harmonic_scattering(device, backend):
    if backend.name.endswith('_skcuda') and device == "cpu":
        pytest.skip("The skcuda backend does not support CPU tensors.")

    # Compare value to analytical formula in the case of a single Gaussian
    centers = np.zeros((1, 1, 3))
    weights = np.ones((1, 1))
    sigma_gaussian = 3.
    sigma_0_wavelet = 3.
    M, N, O, J, L = 128, 128, 128, 1, 3
    grid = np.mgrid[-M // 2:-M // 2 + M, -N // 2:-N // 2 + N, -O // 2:-O // 2 + O]
    grid = grid.astype('float32')
    grid = np.fft.ifftshift(grid, axes=(1, 2, 3))
    x = torch.from_numpy(generate_weighted_sum_of_gaussians(grid, centers,
        weights, sigma_gaussian)).to(device).float()
    scattering = HarmonicScattering3D(J=J, shape=(M, N, O), L=L,
            sigma_0=sigma_0_wavelet, max_order=1, method='integral',
            integral_powers=[1], backend=backend).to(device)

    scattering.max_order = 1
    scattering.method = 'integral'
    scattering.integral_powers = [1]

    s = scattering(x)

    for j in range(J+1):
        sigma_wavelet = sigma_0_wavelet*2**j
        k = sigma_wavelet / np.sqrt(sigma_wavelet**2 + sigma_gaussian**2)
        for l in range(1, L+1):
            err = torch.abs(s[0, j, l, 0] - k ** l).sum()/(1e-6+s[0, j, l, 0].abs().sum())
            assert err<1e-4


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_larger_scales(device, backend):
    if backend.name.endswith('_skcuda') and device == "cpu":
        pytest.skip("The skcuda backend does not support CPU tensors.")

    shape = (32, 32, 32)
    L = 3
    sigma_0 = 1

    x = torch.randn((1,) + shape).to(device)

    for J in range(3, 4+1):
        scattering = HarmonicScattering3D(J=J, shape=shape, L=L, sigma_0=sigma_0, backend=backend).to(device)
        scattering.method = 'integral'
        Sx = scattering(x)


@pytest.mark.parametrize("device", devices)
@pytest.mark.parametrize("backend", backends)
def test_scattering_batch_shape_agnostic(device, backend):
    if backend.name.endswith('_skcuda') and device == "cpu":
        pytest.skip("The skcuda backend does not support CPU tensors.")

    J = 2
    shape = (16, 16, 16)

    S = HarmonicScattering3D(J=J, shape=shape, backend=backend)

    for k in range(3):
        with pytest.raises(RuntimeError) as ve:
            S(torch.zeros(shape[:k]))
        assert 'at least three' in ve.value.args[0]

    x = torch.zeros(shape)

    x = x.to(device)
    S.to(device)

    Sx = S(x)

    assert len(Sx.shape) == 3

    coeffs_shape = Sx.shape[-3:]

    test_shapes = ((1,) + shape, (2,) + shape, (2, 2) + shape,
                   (2, 2, 2) + shape)

    for test_shape in test_shapes:
        x = torch.zeros(test_shape)

        x = x.to(device)

        Sx = S(x)

        assert len(Sx.shape) == len(test_shape)
        assert Sx.shape[-3:] == coeffs_shape
        assert Sx.shape[:-3] == test_shape[:-3]

In [None]:
from kymatio.scattering3d.utils import sqrt
import pytest
import numpy as np

def test_utils():
    # Simple test
    x = np.arange(8)
    y = sqrt(x**2)
    assert (y == x).all()

    # Test problematic case
    x = np.arange(8193, dtype='float32')
    y = sqrt(x**2)
    assert (y == x).all()

    # Make sure we still don't let in negatives...
    with pytest.warns(RuntimeWarning) as record:
        x = np.array([-1, 0, 1])
        y = sqrt(x)
    assert "Negative" in record[0].message.args[0]

    # ...unless they are complex numbers!
    with pytest.warns(None) as record:
        x = np.array([-1, 0, 1], dtype='complex64')
        y = sqrt(x)
    assert not record.list