In [1]:
import pandas as pd
import numpy as np
import itertools


import sys
import os

# Absolute path two directory levels above the current working directory.
# It is put at the front of sys.path (presumably so the project-local packages
# imported below resolve from there -- confirm against the repo layout).
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
sys.path.insert(0, BASE_DIR)

from mod.data_process.numpy import cal_r2
from dataset.bivariate.data_generator import DataGenerator
from giefstat.estimate import cal_assoc

from script.一致性测试.setting import NS, FUNC_NAMES, AMPS

In [2]:
def load_data(func: str, N: int):
    """Generate one (x, y) sample pair with a fresh ``DataGenerator``.

    Args:
        func: Name forwarded as the second argument of ``gen_data``
            (presumably the functional relationship to simulate).
        N: Value forwarded as the first argument of ``gen_data``
            (presumably the number of samples).

    Returns:
        Tuple ``(x, y)``: the first two of the four values returned by
        ``gen_data(N, func, normalize=False)``; the other two are discarded.
    """
    samples_x, samples_y, _, _ = DataGenerator().gen_data(N, func, normalize=False)
    return samples_x, samples_y

In [4]:
methods = [
    "PearsonCorr", "SpearmanCorr", "DistCorr", "MI-cut", "MI-qcut", "MI-Darbellay", "MI-KDE",
    "MI-GIEF", "MIC", "RMIC"]

# Method-specific extra (positional, keyword) arguments for cal_assoc.
# Every method not listed here is called as cal_assoc(x, y_noise, method).
METHOD_EXTRA_ARGS = {
    "MI-GIEF": (("c", "c"), {"k": 3}),
    "RMIC": ((), {"encode": False}),  # do not encode x
}

repeats = 100

# Create the output directory up front so a missing folder cannot make
# to_csv fail only after the whole computation has already been done.
output_dir = f"{BASE_DIR}/script/一致性测试/file"
os.makedirs(output_dir, exist_ok=True)

# For every (method, sample size) pair, write one CSV holding, per function,
# a "noise" column (1 - cal_r2(y, y_noise)) and a column named after the
# function (the cal_assoc value between x and the noisy y).
for method, N in itertools.product(methods, NS):
    extra_args, extra_kwargs = METHOD_EXTRA_ARGS.get(method, ((), {}))
    frames = []
    for i, func in enumerate(FUNC_NAMES):
        x, y = load_data(func, N)
        y_range = np.max(y) - np.min(y)
        print(i, func)
        rows = []
        for _ in range(repeats):
            # Add uniform noise whose amplitude is drawn at random from AMPS
            # and scaled by the range of y.
            amp = np.random.choice(AMPS)
            noise = np.random.uniform(-amp * y_range, amp * y_range, y.shape)
            y_noise = y + noise

            noise_level = 1 - cal_r2(y, y_noise)
            assoc = cal_assoc(x, y_noise, method, *extra_args, **extra_kwargs)
            rows.append([noise_level, assoc])
        # Build the (repeats, 2) array once instead of vstack-ing per repeat;
        # the reshape keeps it 2-D even when repeats is 0 or 1.
        arr = np.array(rows).reshape(-1, 2)
        frames.append(pd.DataFrame(arr, columns=["noise", f"{func}"]))

    # Single column-wise concat; note that the "noise" column name repeats
    # once per function in the resulting CSV, as before.
    results = pd.concat(frames, axis=1)
    results.to_csv(f"{output_dir}/info_measure_{method}_{N}.csv", index=False)

0 sin_high_freq
1 cos_high_freq
2 vary_freq_sin
3 vary_freq_cos
4 non_fourier_freq_sin
5 non_fourier_freq_cos
6 sin_low_freq
7 linear_periodic_med_freq
8 cubic
9 cubic_y_stretched
10 parabola
11 spike
12 lopsided_l_shaped
13 l_shaped
14 sigmoid
15 exp_base_10
16 exp_base_2
17 linear_periodic_high_freq_2
18 linear_periodic_high_freq
19 linear_periodic_low_freq
20 line
0 sin_high_freq
1 cos_high_freq
2 vary_freq_sin
3 vary_freq_cos
4 non_fourier_freq_sin
5 non_fourier_freq_cos
6 sin_low_freq
7 linear_periodic_med_freq
8 cubic
9 cubic_y_stretched
10 parabola
11 spike
12 lopsided_l_shaped
13 l_shaped
14 sigmoid
15 exp_base_10
16 exp_base_2
17 linear_periodic_high_freq_2
18 linear_periodic_high_freq
19 linear_periodic_low_freq
20 line
0 sin_high_freq
1 cos_high_freq
2 vary_freq_sin
3 vary_freq_cos
4 non_fourier_freq_sin
5 non_fourier_freq_cos
6 sin_low_freq
7 linear_periodic_med_freq
8 cubic
9 cubic_y_stretched
10 parabola
11 spike
12 lopsided_l_shaped
13 l_shaped
14 sigmoid
15 exp_base_10
