# Level and Power

In [1]:
from joblib import Parallel, delayed
from tqdm import tqdm

import numpy as np
import polars as pl
from scipy.linalg import sqrtm
from scipy.stats import chi2

import pickle
from pathlib import Path
from typing import Callable

from gpy_test import GPY

from data_generation.arma import (
    generate_arma_sample,
    generate_arma_covariance_matrix,
)

from data_generation.other import (
    factor_model_sample,
    common_random_dependence_sample,
)

In [2]:
# We reproduce here the settings used for LRV
fs = [lambda x: (x - 1) ** 2]

alpha = 0.66
N_range = np.linspace(1000, 8000, 8)
c = 1 / 2  # not really needed, but that's to reproduce the same M as for LRV test

# default values for the iid time series. We choose a small AR as that
# creates a very spiky spectral density if too large
time_ar, time_ma = 0.1, 0.5

# always repeat an experiment K times
n_repeats = 10**1

# threshold to detect
level = 0.1
chi2_values = chi2.ppf(np.array([level / 2, 1 - level / 2]), df=len(fs))

# generate real gaussian data
is_complex_gaussian = True

# various computation (complex contour integral) should result in a
# real value. We check that the imaginary part is smaller than this
tolerance = 1e-5

# path to the stored experiment results for easy reloading
data_storage_path = "./data/{experiment_type}/{experiment_id}.pickle"

In [3]:
# Define the functions to store and load experiment results
def run_batch(
    run: Callable, n_repeats: int, experiment_type: str, run_kwargs: dict
) -> None:
    experiment_id = "_".join(
        f"{key}={value}"
        for key, value in run_kwargs.items()
        if isinstance(value, int) or isinstance(value, float)
    )
    resolved_data_path = Path(
        data_storage_path.format(
            experiment_type=experiment_type, experiment_id=experiment_id
        )
    )
    if resolved_data_path.exists():
        return

    print(f"Running experiment {experiment_type} with id {experiment_id}")

    resolved_data_path.parent.mkdir(parents=True, exist_ok=True)

    results = [run(**run_kwargs) for _ in range(n_repeats)]
    with open(resolved_data_path, "wb") as handle:
        pickle.dump(results, handle)


def load_batch(experiment_type: str) -> pl.DataFrame:
    # load the samples
    all_results = []

    resolved_data_path = Path(
        data_storage_path.format(experiment_type=experiment_type, experiment_id="none")
    ).parent
    for file in resolved_data_path.iterdir():
        if file.stem == ".DS_Store":
            continue

        with open(file, "rb") as handle:
            results = pickle.load(handle)
        all_results += results

    return pl.DataFrame(all_results)

# Level check
# ARMA time series

## Constant ARMA coefs

In [4]:
def run(
    N,
    M,
    time_ar,
    time_ma,
    is_complex_gaussian,
):
    y = generate_arma_sample(
        N,
        M,
        time_ar,
        time_ma,
        is_complex_gaussian,
    )

    gpy_result = GPY(y, fs, is_complex_gaussian=is_complex_gaussian)

    return {
        "N": N,
        "M": M,
        "time_ar": time_ar,
        "time_ma": time_ma,
        "is_complex_gaussian": is_complex_gaussian,
        "test_statistic": gpy_result.test_statistic,
        "is_positive": gpy_result.is_positive(level),
    }

In [6]:
tasks = []
for N in N_range:
    M = int(N**alpha * c)

    task = (
        run,
        n_repeats,
        f"arma_no_dependence",
        {
            "N": int(N),
            "M": int(M),
            "time_ar": time_ar,
            "time_ma": time_ma,
            "is_complex_gaussian": is_complex_gaussian,
        },
    )

    tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))

df = load_batch(f"arma_no_dependence")
pivot = (
    df
    # df.with_columns(
    #     [
    #         # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
    #         (
    #             (pl.col("test_statistic") < chi2_values[0])
    #             | (pl.col("test_statistic") > chi2_values[1])
    #         )
    #         .alias("is_test_positive")
    #         .cast(pl.Float64),
    #     ]
    # )
    .group_by(["N"])
    .mean()
    .to_pandas()
)
pivot[["N", "is_positive"]].sort_values("N")

100%|██████████| 8/8 [00:00<00:00, 12237.21it/s]


Unnamed: 0,N,is_positive
2,1000,0.0
3,2000,0.2
4,3000,0.0
7,4000,0.1
6,5000,0.1
1,6000,0.0
5,7000,0.0
0,8000,0.1


## Random ARMA coefs

In [7]:
def run(
    N,
    M,
    is_complex_gaussian,
):
    ar, ma = np.random.uniform(-0.5, 0.5, M), np.random.uniform(-0.5, 0.5, M)

    y = generate_arma_sample(
        N,
        M,
        ar,
        ma,
        is_complex_gaussian,
    )

    gpy_result = GPY(y, fs, is_complex_gaussian=is_complex_gaussian)

    return {
        "N": N,
        "M": M,
        "is_complex_gaussian": is_complex_gaussian,
        "test_statistic": gpy_result.test_statistic,
        "is_positive": gpy_result.is_positive(level),
    }

In [8]:
tasks = []
for N in N_range:
    M = int(N**alpha * c)

    task = (
        run,
        n_repeats,
        f"arma_no_dependence_multidim",
        {
            "N": int(N),
            "M": int(M),
            "is_complex_gaussian": is_complex_gaussian,
        },
    )

    tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))


df = load_batch(f"arma_no_dependence_multidim")
pivot = (
    # df.with_columns(
    #     [
    #         # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
    #         (
    #             (pl.col("test_statistic") < chi2_values[0])
    #             | (pl.col("test_statistic") > chi2_values[1])
    #         )
    #         .alias("is_test_positive")
    #         .cast(pl.Float64),
    #     ]
    # )
    df.group_by(["N"]).mean().to_pandas()
)
pivot[["N", "is_positive"]].sort_values("N")

100%|██████████| 8/8 [00:00<00:00, 10512.04it/s]


Running experiment arma_no_dependence_multidim with id N=6000_M=155_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=7000_M=172_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=3000_M=98_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=4000_M=119_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=8000_M=188_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=5000_M=138_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=1000_M=47_is_complex_gaussian=True
Running experiment arma_no_dependence_multidim with id N=2000_M=75_is_complex_gaussian=True


Unnamed: 0,N,is_positive
6,1000,0.8
3,2000,0.7
0,3000,0.8
2,4000,0.7
4,5000,0.8
1,6000,0.8
5,7000,0.8
7,8000,1.0


# Power check

## ARMA covariance structure

In [None]:
def run(
    N,
    M,
    time_ar,
    time_ma,
    space_ar_1,
    space_ar_2,
    space_ma_1,
    space_ma_2,
    is_complex_gaussian,
):
    y = generate_arma_sample(
        N,
        M,
        time_ar,
        time_ma,
        is_complex_gaussian,
    )
    # Cov_1 = generate_arma_covariance_matrix(M // 2, space_ar_1, space_ma_1)
    # Cov_2 = generate_arma_covariance_matrix(M - M // 2, space_ar_2, space_ma_2)
    # Cov = np.block(
    #     [
    #         [Cov_1, np.zeros((M // 2, M - M // 2))],
    #         [np.zeros((M - M // 2, M // 2)), Cov_2],
    #     ]
    # )
    Cov = generate_arma_covariance_matrix(M, space_ar_1, space_ma_1)
    y = y @ sqrtm(Cov)

    gpy_result = GPY(y, fs, is_complex_gaussian=is_complex_gaussian)

    return {
        "N": N,
        "M": M,
        "time_ar": time_ar,
        "time_ma": time_ma,
        "space_ar_1": space_ar_1,
        "space_ma_1": space_ma_1,
        "space_ar_2": space_ar_2,
        "space_ma_2": space_ma_2,
        "is_complex_gaussian": is_complex_gaussian,
        "test_statistic": gpy_result.test_statistic,
        "p_value": gpy_result.p_value,
    }

In [9]:
# ARMA covariance structure
space_ar = 0.05

tasks = []
for N in N_range:
    B = int(N**alpha)
    if B % 2 == 0:
        B -= 1
    M = int(B * c)

    task = (
        run,
        n_repeats,
        f"ar_dependence",
        {
            "N": int(N),
            "M": int(M),
            "time_ar": time_ar,
            "time_ma": time_ma,
            "space_ar_1": space_ar,
            "space_ma_1": 0,
            "space_ar_2": space_ar,
            "space_ma_2": 0,
            "is_complex_gaussian": is_complex_gaussian,
        },
    )

    tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))

df = load_batch(f"ar_dependence").drop(["time_ar", "time_ma"])
pivot = (
    df.with_columns(
        [
            # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
            (
                (pl.col("test_statistic") < chi2_values[0])
                | (pl.col("test_statistic") > chi2_values[1])
            )
            .alias("is_test_positive")
            .cast(pl.Float64),
        ]
    )
    .group_by(["N"])
    .mean()
    .sort("N")
    .to_pandas()
)
pivot[["N", "is_test_positive"]].sort_values("N")

100%|██████████| 8/8 [00:00<00:00, 11413.07it/s]


Running experiment ar_dependence with id N=5000_M=137_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence with id N=4000_M=118_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence with id N=3000_M=98_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence with id N=7000_M=171_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence with id N=6000_M=155_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence with id N=2000_M=74_time_ar=0.1_time_ma=0.5_space_ar_1=0.05_space_ma_1=0_space_ar_2=0.05_space_ma_2=0_is_complex_gaussian=True
Running experiment ar_dependence wit

Unnamed: 0,N,is_test_positive
0,1000,0.0886
1,2000,0.095
2,3000,0.0871
3,4000,0.0861
4,5000,0.0801
5,6000,0.0844
6,7000,0.087
7,8000,0.0843


In [18]:
# ARMA covariance structure
space_ar = 0
space_ma = 0.5
n_repeats = 100

tasks = []
for N in N_range:
    B = int(N**alpha)
    if B % 2 == 0:
        B -= 1
    M = int(B * c)

    task = (
        run,
        n_repeats,
        f"ar_dependence",
        {
            "N": int(N),
            "M": int(M),
            "time_ar": time_ar,
            "time_ma": time_ma,
            "space_ar_1": space_ar,
            "space_ma_1": space_ma,
            "space_ar_2": space_ar,
            "space_ma_2": space_ma,
            "is_complex_gaussian": is_complex_gaussian,
        },
    )

    tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))

df = (
    load_batch(f"ar_dependence")
    .drop(["time_ar", "time_ma"])
    .filter(pl.col("space_ar_1") == 0)
    .filter(pl.col("space_ma_1") == 0.5)
)
pivot = (
    df.with_columns(
        [
            # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
            (
                (pl.col("test_statistic") < chi2_values[0])
                | (pl.col("test_statistic") > chi2_values[1])
            )
            .alias("is_test_positive")
            .cast(pl.Float64),
        ]
    )
    .group_by(["N"])
    .mean()
    .sort("N")
    .to_pandas()
)
pivot[["N", "is_test_positive"]].sort_values("N")

100%|██████████| 8/8 [00:00<00:00, 7779.84it/s]


Unnamed: 0,N,is_test_positive
0,1000,0.17
1,2000,0.12
2,3000,0.13
3,4000,0.14
4,5000,0.13
5,6000,0.11
6,7000,0.11
7,8000,0.15


## Common random dependence


In [10]:
def run(
    N,
    M,
    time_ar,
    time_ma,
    mixing_scale,
    is_complex_gaussian,
):
    y = common_random_dependence_sample(
        N, M, time_ar, time_ma, mixing_scale, is_complex_gaussian
    )

    gpy_result = GPY(y, fs, is_complex_gaussian=is_complex_gaussian)

    return {
        "N": N,
        "M": M,
        "time_ar": time_ar,
        "time_ma": time_ma,
        "mixing_scale": mixing_scale,
        "is_complex_gaussian": is_complex_gaussian,
        "test_statistic": gpy_result.test_statistic,
        "p_value": gpy_result.p_value,
    }

In [11]:
# ARMA covariance structure
mixing_scale = 1.0

tasks = []
for N in N_range:
    B = int(N**alpha)
    if B % 2 == 0:
        B -= 1
    M = int(B * c)

    task = (
        run,
        n_repeats,
        f"common_random_dependence",
        {
            "N": int(N),
            "M": int(M),
            "time_ar": time_ar,
            "time_ma": time_ma,
            "mixing_scale": mixing_scale,
            "is_complex_gaussian": is_complex_gaussian,
        },
    )

    tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))

df = load_batch(f"common_random_dependence").drop(["time_ar", "time_ma"])
pivot = (
    df.with_columns(
        [
            # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
            (
                (pl.col("test_statistic") < chi2_values[0])
                | (pl.col("test_statistic") > chi2_values[1])
            )
            .alias("is_test_positive")
            .cast(pl.Float64),
        ]
    )
    .group_by(["N"])
    .mean()
    .sort("N")
    .to_pandas()
)
pivot[["N", "is_test_positive"]].sort_values("N")

100%|██████████| 8/8 [00:00<00:00, 10652.20it/s]


Running experiment common_random_dependence with id N=1000_M=47_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=6000_M=155_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=4000_M=118_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=7000_M=171_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=2000_M=74_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=3000_M=98_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=8000_M=187_time_ar=0.1_time_ma=0.5_mixing_scale=1.0_is_complex_gaussian=True
Running experiment common_random_dependence with id N=5000_M=137_time_ar=0.1_time_ma=0

Unnamed: 0,N,is_test_positive
0,1000,0.1844
1,2000,0.1912
2,3000,0.1852
3,4000,0.1842
4,5000,0.1841
5,6000,0.1875
6,7000,0.1962
7,8000,0.1879


## Factor model 

In [19]:
def run(
    N,
    M,
    r,
    snr,
    time_ar,
    time_ma,
    is_complex_gaussian,
):
    y = factor_model_sample(N, M, r, time_ar, time_ma, snr, is_complex_gaussian)
    gpy_result = GPY(y, fs)
    return {
        "N": N,
        "M": M,
        "r": r,
        "snr": snr,
        "test_statistic": gpy_result.test_statistic,
        "p_value": gpy_result.p_value,
    }

In [None]:
snr_range = [0.05, 0.5]
r = 1

tasks = []
for snr in snr_range:
    for N in N_range:
        B = int(N**alpha)
        if B % 2 == 0:
            B -= 1
        M = int(B * c)

        task = (
            run,
            n_repeats,
            f"factor_model",
            {
                "N": int(N),
                "M": int(M),
                "r": r,
                "snr": snr,
                "time_ar": time_ar,
                "time_ma": time_ma,
                "is_complex_gaussian": is_complex_gaussian,
            },
        )

        tasks.append(task)

# shuffle the tasks to distribute the load over time
np.random.shuffle(tasks)
results = Parallel(n_jobs=-1)(delayed(run_batch)(*task) for task in tqdm(tasks))

df = load_batch(f"factor_model")
pivot = (
    df.with_columns(
        [
            # (pl.col("p_value") < level).alias("is_test_positive").cast(pl.Float64),
            (
                (pl.col("test_statistic") < chi2_values[0])
                | (pl.col("test_statistic") > chi2_values[1])
            )
            .alias("is_test_positive")
            .cast(pl.Float64),
        ]
    )
    .group_by(["N", "snr"])
    .mean()
    .to_pandas()
    .pivot(index="N", columns=["snr"], values="is_test_positive")
)
pivot

100%|██████████| 16/16 [00:00<00:00, 15827.56it/s]


Running experiment factor_model with id N=6000_M=155_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=3000_M=98_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=4000_M=118_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=1000_M=47_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=7000_M=171_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=8000_M=187_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=2000_M=74_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True
Running experiment factor_model with id N=5000_M=137_r=1_snr=0.5_time_ar=0.1_time_ma=0.5_is_complex_gaussian=True


snr,0.05,0.10,0.50,1.00,5.00,100.00,1000.00
N,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1000,0.0794,0.0732,0.2,0.19,0.13,0.19,0.1
2000,0.0826,0.0735,0.14,0.16,0.13,0.1,0.21
3000,0.0769,0.0771,0.16,0.15,0.12,0.19,0.13
4000,0.0752,0.0791,0.12,0.06,0.15,0.14,0.12
5000,0.0761,0.0752,0.12,0.13,0.08,0.12,0.17
6000,0.078,0.0787,0.11,0.12,0.14,0.15,0.09
7000,0.0775,0.0804,0.13,0.14,0.15,0.2,0.16
8000,0.0742,0.0861,0.11,0.14,0.13,0.12,0.15


In [26]:
10 * np.log10(snr_range)

array([-13.01029996,  -3.01029996])