In [None]:
import os
from tqdm import tqdm
from pathlib import Path
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt

from scatspectra import *
from scatspectra.layers import TimeAverage, WindowSelector

ILLUSTATION_PATH = Path(os.getcwd()) / "illustration"


save = False


def save_figure(filename, dpi=None):
    if not ILLUSTATION_PATH.exists():
        ILLUSTATION_PATH.mkdir()
    if save:
        plt.savefig(
            ILLUSTATION_PATH / filename,
            dpi=300 if dpi is None else dpi,
            bbox_inches="tight",
        )


cuda_enabled = torch.cuda.is_available()

# Test of github repository

In [None]:
# CHECK ALL MODELS RUN
x = load_data(
    name="smrw",
    R=4,
    T=2**10,
    H=0.5,
    lam=0.3,
    gamma=1 / (2**6) / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
x = np.concatenate([x] * 4, axis=1)

for mtype in tqdm(
    [
        None,
        "scat_spectra",
        "inv_scat_spectra",
        "scat_marginal+scat_spectra",
        "scat_marginal",
    ]
):
    Rx = analyze(
        x.astype(np.float32),
        model_type=mtype,
        diago_n=True,
        J=6,
        normalize="each_ps",
        cuda=False,
        nchunks=2,
    )
    Rx = analyze(
        x.astype(np.float64),
        model_type=mtype,
        diago_n=True,
        J=6,
        normalize="each_ps",
        cuda=False,
    )
print("PASSED")

In [None]:
# SCATTERING r > 2
x = load_data(
    name="smrw",
    R=1,
    T=2**8,
    H=0.5,
    lam=0.3,
    gamma=1 / (2**6) / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
x = np.concatenate([x] * 15, axis=1)

Sx = analyze(x, model_type=None, J=8, normalize="each_ps", cuda=cuda_enabled, r=3)
Rx = analyze(
    x, model_type="scat_marginal", J=8, normalize="each_ps", cuda=cuda_enabled, r=3
)

test1 = torch.abs(Sx.query(r=3, n=[10, 11], j1=2, j2=4, j3=6).y).mean(-1, keepdims=True)
test2 = Rx.query(r=3, n=[10, 11], j1=2, j2=4, j3=6, q=1.0).y

print(torch.abs(test1 - test2).sum())

In [None]:
# SCATTERING TEST
T = 2**12

x = load_data(name="smrw", R=16, T=T, lam=0.1, H=0.5)
Sx1 = analyze(x, model_type=None, normalize="each_ps")
Sx2 = analyze(x, model_type=None, normalize="batch_ps")

Wx1 = Sx1.query(r=1).y
Wx2 = Sx2.query(r=1).y

print(torch.abs(torch.abs(Wx1).pow(2.0).mean(-1) - 1.0).sum())
print(torch.abs(torch.abs(Wx2).pow(2.0).mean(-1).mean(0) - 1.0).sum())

In [None]:
# DESCRIPTION TEST
x = load_data(
    name="smrw",
    R=32,
    T=2**12,
    H=0.5,
    lam=0.1,
    gamma=1 / (2**12) / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
Rx = analyze(x, J=8, model_type="scat_marginal+scat_spectra", normalize="each_ps")
Rx_real = format_to_real(Rx)

y1 = Rx.query(coeff_type="skewness", jr1=3, jl1=4).y
y2_c = Rx_real.query(coeff_type="skewness", jr1=3, jl1=4).y
y2 = y2_c[:, :1, :] + 1j * y2_c[:, 1:, :]

test = torch.abs(y1 - y2).sum()
print(test)

In [None]:
# AVERAGING OPERATOR TEST (requires gpu with memory > 20Gb)
x = load_data(name="fbm", R=1, T=2**12, H=0.65).astype(np.float32)


class Pooling(nn.Module):
    def __init__(self, kernel_size):
        super(Pooling, self).__init__()
        self.pool = nn.AvgPool1d(kernel_size)

    def forward(self, x):
        y = self.pool(x.view(x.shape[0], -1, x.shape[-1]))
        return y.view(x.shape[:-1] + (-1,))


Rx = analyze(
    x,
    J=[7, 6, 6],
    Q=[6, 2, 2],
    r=3,
    model_type="scat_marginal",
    estim_operator=Pooling(kernel_size=16),
    cuda=cuda_enabled,
)

print("PASSED")

In [None]:
# MARGINAL TEST
T = 2**12
J = 8

x = load_data(
    name="smrw",
    R=16,
    T=T,
    H=0.5,
    lam=0.1,
    gamma=1 / T / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
x = x.reshape(4, 4, -1)

Sx = analyze(x, model_type=None, J=J, cuda=False, nchunks=1)
Rx_mar = analyze(
    x,
    model_type="scat_marginal",
    J=J,
    normalize="each_ps",
    qs=[1.0, 2.0],
    cuda=False,
    nchunks=1,
)
Rx_covscat = analyze(
    x,
    model_type="scat_marginal+scat_spectra",
    J=J,
    normalize="each_ps",
    cuda=False,
    nchunks=1,
)

ps = torch.abs(Sx.query(n=2, r=1, is_low=False).y).pow(2.0).mean(-1)[0, :]
moments_mar = Rx_mar.query(n=2, q=1.0, r=2, j1=2).y[0, :, 0]
Wx = Sx.query(n=2, r=2, j1=2)
moments_scat = torch.abs(Wx.y).mean(-1)[0, :] / ps[2].pow(0.5)
moments_scat2 = Rx_covscat.query(coeff_type="scat_marginal", nl=2, jl1=2).y[0, :, 0]

print(torch.abs(moments_scat2 - moments_mar).sum())
print(torch.abs(moments_scat - moments_scat2).sum())

In [None]:
# ANALYSIS TEST
B = 128
T = 2**12
J = 8

x = load_data(
    name="smrw",
    R=B,
    T=T,
    H=0.5,
    lam=0.1,
    gamma=1 / T / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
Rx_scat = analyze(x, model_type="scat_marginal", J=J, cuda=False, nchunks=B)
Rx_cov1 = analyze(x, model_type="scat_spectra", J=J, cuda=False, nchunks=1)
Rx_cov2 = analyze(x, model_type="scat_spectra", J=J, cuda=False, nchunks=2 * B)
Rx_covstat = analyze(
    x,
    model_type="inv_scat_spectra",
    J=J,
    normalize="batch_ps",
    cuda=cuda_enabled,
    nchunks=B // 2,
)
print(torch.abs(Rx_cov1.y - Rx_cov2.y).sum())
print("DONE")
plot_dashboard(
    [Rx_scat, Rx_cov1, Rx_cov2, Rx_covstat],
    labels=["scat_marginal", "scat_spectra1", "scat_spectra2", "inv_scat_spectra"],
    fontsize=15,
)

In [None]:
# MULTI Q TEST
B = 1
T = 2**9

x = load_data(
    name="smrw",
    R=B,
    T=T,
    H=0.5,
    lam=0.1,
    gamma=1 / T / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
x = np.float32(x)
Rx_scat = analyze(
    x,
    model_type="scat_marginal",
    J=[5, 4],
    Q=[8, 16],
    wav_type=["battle_lemarie", "morlet"],
    cuda=cuda_enabled,
    nchunks=1,
)
Rx_cov = analyze(
    x,
    model_type="scat_spectra",
    J=[5, 4],
    Q=[2, 4],
    wav_type=["battle_lemarie", "morlet"],
    cuda=cuda_enabled,
    nchunks=1,
)
print("PASSED")

In [None]:
# DIFFERENT ESTIMATORS (NUMERIC)
N = 25
T = 2**8
J = 6

x = load_data(
    name="smrw",
    R=1,
    T=T,
    H=0.5,
    lam=0.3,
    gamma=1 / (2**6) / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
)
x = np.concatenate([x] * N, axis=1)


ave1 = TimeAverage()
ave2 = WindowSelector(np.arange(T // 4, 3 * T // 4))
ave3 = TimeAverage(np.arange(T // 4, 3 * T // 4))

for mtype in [
    None,
    "scat_spectra",
    "inv_scat_spectra",
    "scat_marginal+scat_spectra",
    "scat_marginal",
]:
    Rx = analyze(
        x, model_type=mtype, J=J, normalize="each_ps", estim_operator=ave1, cuda=False
    )
    Rx = analyze(
        x, model_type=mtype, J=J, normalize="each_ps", estim_operator=ave2, cuda=False
    )
    Rx = analyze(
        x, model_type=mtype, J=J, normalize="each_ps", estim_operator=ave3, cuda=False
    )

Sx = analyze(x, model_type=None, J=J, qs=[2.0], cuda=False)
Rx1 = analyze(
    x, model_type="scat_marginal", J=J, qs=[2.0], estim_operator=ave2, cuda=False
)
Rx2 = analyze(
    x, model_type="scat_marginal", J=J, qs=[2.0], estim_operator=ave3, cuda=False
)

test1 = torch.abs(Sx.query(n=4, r=1).y).pow(2.0)[..., T // 4 : 3 * T // 4].mean(-1)
test2 = Rx1.query(q=2, n=4, r=1).y.mean(-1)
test3 = Rx2.query(q=2, n=4, r=1).y[:, :, 0]

print(torch.abs(test1 - test2).sum())
print(torch.abs(test2 - test3).sum())

In [None]:
# SELF-SIMILARITY SCORE
x = np.random.randn(128, 2**12)
Rx = analyze(x, J=6, normalize="batch_ps")

score_ref1, score1 = self_simi_obstruction_score(
    x, Rx=None, J=6, nchunks=64, cuda=cuda_enabled
)
score_ref2, score2 = self_simi_obstruction_score(
    None, Rx=Rx.mean_batch(), J=6, nchunks=64, cuda=cuda_enabled
)
score_ref3, score3 = self_simi_obstruction_score(
    None, Rx=Rx, J=6, nchunks=64, cuda=False
)
print("PASSED")

In [None]:
# GENERATION TEST (VISUAL)
B = 1
T = 2**12
J = 9
cache_path = (
    None  # set a path, e.g. Path("/home/<user>/_cache_path/"), to cache data generation
)

x = load_data(
    name="smrw",
    R=B,
    T=T,
    H=0.5,
    lam=0.3,
    gamma=1 / T / 256,
    K0=0.03,
    alpha=0.23,
    beta=0.23,
    cache_path=cache_path,
)
x_gen = generate(
    x,
    J=J,
    S=1,
    max_iterations=1000,
    cuda=cuda_enabled,
    tol_optim=1e-2,
    cache_path=cache_path,
)
print("DONE")

Rx = analyze(x, model_type="scat_spectra", J=J, cuda=cuda_enabled, nchunks=1)
Rx_gen = analyze(x_gen, model_type="scat_spectra", J=J, cuda=cuda_enabled, nchunks=1)
plot_dashboard([Rx, Rx_gen], labels=["original", "generated"])

# VISUALIZATION
fig, axes = plt.subplots(2, 1, figsize=(10, 5))
axes[0].plot(np.diff(x)[0, 0, :], color="lightskyblue", linewidth=0.5)
axes[1].plot(np.diff(x_gen)[0, 0, :], color="coral", linewidth=0.5)
axes[0].set_ylim(-4.5, 4.5)
axes[1].set_ylim(-4.5, 4.5)

# Github README

In [None]:
save = False

from scatspectra import load_data, analyze

# DATA
x_brownian = load_data(name="fbm", R=256, T=6063, H=0.5)
x_mrw = load_data(name="mrw", R=256, T=6063, H=0.5, lam=0.2)
x_snp = load_data(name="snp")  # S&P500 daily prices from 2000 to 2024

# ANALYSIS
scat_brownian = analyze(x_brownian)
scat_mrw = analyze(x_mrw)
scat_snp = analyze(np.log10(x_snp))

# VISUALIZATION
plot_dashboard([scat_brownian, scat_mrw, scat_snp], labels=["Brownian", "mrw", "S&P"])

for x in [x_brownian, x_mrw, x_snp]:
    print((x**2).mean())

save_figure("dashboard_brown_mrw_snp.png")

## Self-similarity section

In [None]:
# not self-similar example
T = 4724
np.random.seed(8)

ts = np.linspace(0, 2 * np.pi, T)
ts = np.roll(ts, -3500)

r = 3
a = np.random.choice(T, r, replace=False)
b = np.random.randn(r)

eia = [np.exp(2 * np.pi * t / T * 1j) for t in a]
P = scipy.interpolate.lagrange(eia, b)


def x(t):
    #     return np.real(P(np.cos(t) + 1j * np.sin(t))) # general smooth function
    return np.cos(2 * t)  # fourier vector


# sees 3 dilated at once
ys = []
for j in range(3):
    y = x(ts[: T // (2**j)])
    ys.append(y)

fig, axes = plt.subplots(3, figsize=(10, 9))
for ax, y in zip(axes.ravel(), ys):
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    ax.set_facecolor(tuple(0.95 for i in range(3)))
    ax.grid(None)
    y -= y.mean()
    y /= y.std()
    ax.plot(np.arange(y.size), y, color="black")
    ax.set_ylim(-3.0, 3.0)

In [None]:
save = False
i = 2

plt.figure(figsize=(10, 3))
plt.plot(ys[i], color="black")
ax = plt.gca()
ax.set_xticklabels([])
ax.set_yticklabels([])
ax.set_facecolor((0.95,) * 3)
ax.grid(None)
ax.set_xlim(0, ys[i].size - 1)
save_figure(f"not_self_similar_{i}.png")

In [None]:
plt.figure(figsize=(10, 3))
plt.plot(np.arange(10))

In [None]:
x2 = np.stack([x(np.roll(ts, shift)) for shift in np.arange(T // 2)])[:, None, :]
print(x2.shape)
self_simi_obstruction_score(x2, J=8, nchunks=128, cuda=cuda_enabled)

## Generation

In [None]:
# need to restart kernel for this specific cell
save = True

from scatspectra import generate

# DATA
x = load_data(name="mrw", R=1, T=4096, H=0.5, lam=0.3)

# GENERATION
x_gen = generate(x, cuda=cuda_enabled, tol_optim=1e-2)

# VISUALIZATION
fig, axes = plt.subplots(2, 1, figsize=(10, 5))
axes[0].plot(np.diff(x)[0, 0, :], color="lightskyblue", linewidth=0.5)
axes[1].plot(np.diff(x_gen)[0, 0, :], color="coral", linewidth=0.5)
axes[0].set_ylim(-0.1, 0.1)
axes[1].set_ylim(-0.1, 0.1)

save_figure("generation.png")