In [70]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm, chi2, truncnorm
from tqdm import trange
from sklearn import linear_model
from sklearn.linear_model import LassoCV
from sklearn.linear_model import Lasso

import matplotlib.pyplot as plt

# Functions

In [None]:
import numpy as np
from sklearn.linear_model import LogisticRegression

def density_ratio_estimate_prob(D_nu, D_de):
    l_nu = np.ones(len(D_nu))
    l_de = np.zeros(len(D_de))
    
    l = np.concatenate((l_nu, l_de))
    D = np.concatenate((D_nu, D_de))
    
    #fit losgistic model
    C = 0.1
    model = LogisticRegression(penalty= 'l2', C= C)
    model.fit(D, l)
    
    # get density ratios for all samples
    density_ratios = (model.predict_proba(D_de)[:, 1] / model.predict_proba(D_nu)[:, 0])*(len(D_de)/len(D_nu))
    
    return density_ratios


def Covariate_Shift_Weight(x, z, v = 0):
    return np.exp(((x - z @ s)**2 - (x - z @ t)**2)/2)

def Model_X(z, v):
    return z @ u + np.random.normal(0, 1, 1)

def T_statistic(y, x, z, v = 0):
    d_y = reg.predict(z.reshape(1,20))
    d_x = z @ u
    return np.abs((y-d_y)*(x-d_x))

def Conterfeits(y, x, z, v = 0, L = 5, K = 10):
    M = L * K - 1
    cnt = 0 
    
    for i in range(M):
        x_ = Model_X(z, v)
        if T_statistic(y, x, z, v) > T_statistic(y, x_, z, v):
            cnt=cnt+1
            
    return cnt // K

def PCRtest(Y, X, Z, V = 0, L = 5, K = 20, covariate_shift = True):
    n = Y.size
    W = np.array([0.0]*L)
    
    for j in range(n):
        y, x, z, v = Y[j], X[j], Z[j], 0
        if covariate_shift == True:
            W[Conterfeits(y, x, z, v, L, K)] += sample_density_ratio[j]
        if covariate_shift == False:
            W[Conterfeits(y, x, z, v, L, K)] += 1
            
    return W, L/n * np.dot(W - n/L, W - n/L)



In [124]:
import scipy.stats as stats

def chi_squared_p_value(chi_squared_statistic, df):
    """
    Calculate the p-value from a chi-squared distribution.

    Parameters:
    - chi_squared_statistic (float): The observed chi-squared test statistic.
    - df (int): The degrees of freedom.

    Returns:
    - p_value (float): The calculated p-value.
    """
    p_value = 1 - stats.chi2.cdf(chi_squared_statistic, df)
    return p_value

def monte_carlo_p_value(n_samples, covariance_matrix, L, quantile):
    """
    Calculate the probability corresponding to a given quantile using the Monte Carlo method.

    Parameters:
    - n_samples (int): The number of Monte Carlo samples to generate.
    - covariance_matrix (ndarray): The covariance matrix of the random vector X.
    - L (int): The number of components to sum.
    - quantile (float): The quantile value.

    Returns:
    - probability (float): The estimated probability corresponding to the quantile.
    """
    count = 0
    for _ in range(n_samples):
        sample = np.random.multivariate_normal(np.zeros(L), covariance_matrix)
        squared_sum = np.sum(sample**2)
        if squared_sum <= quantile:
            count += 1

    probability = count / n_samples
    return 1-probability



def generate_covariance_matrix(L):
    """
    Generate a covariance matrix with the specified properties.

    Parameters:
    - L (int): The size of the covariance matrix.

    Returns:
    - covariance_matrix (ndarray): The generated covariance matrix.
    """
    covariance_matrix = np.full((L, L), 1/L)  # Fill all entries with 1/L
    np.fill_diagonal(covariance_matrix, 1 - 1/(L**2))  # Set diagonal entries to 1 - 1/L^2
    return covariance_matrix

# 生成数据

In [95]:
n, p=1000, 20
s = np.random.normal(0, 1, p)
t = s + np.random.normal(0, 0.1, p)
u = np.random.normal(0, 1, p)

In [96]:
def generate(n, p, s, t, u, Alpha = 0):
    Z_source, Z_target = np.zeros((n, p)), np.zeros((n, p))
    V_source, V_target = 0, 0
    for i in range(n):
        Z_source[i] = np.random.normal(0, 1, p)
        Z_target[i] = np.random.normal(0.1, 1, p)
        
    X_source = Z_source @ u + np.random.normal(0, 1, n)
    X_target = Z_target @ u + np.random.normal(0, 1, n)
    
    V_source = Z_source @ s + X_source + np.random.normal(0, 1, n)
    V_target = Z_target @ t - X_target + np.random.normal(0, 1, n)
    
    Y_source, Y_target = np.zeros(n), np.zeros(n)
    for i in range(n):
        Y_source[i] = np.sin(Z_source[i].sum() + X_source[i] + V_source[i]) + np.random.normal(0, 1, 1) + Alpha * X_source[i]
        Y_target[i] = np.sin(Z_target[i].sum() + X_target[i] + V_target[i]) + np.random.normal(0, 1, 1) + Alpha * X_target[i]
    return Y_source.reshape(-1,1), X_source.reshape(-1,1), V_source.reshape(-1,1), Z_source,\
           Y_target.reshape(-1,1), X_target.reshape(-1,1), V_target.reshape(-1,1), Z_target

In [97]:
Y_source, X_source, V_source, Z_source, Y_target, X_target, V_target, Z_target = generate(n, p, s, t, u, 0)

In [98]:
reg = LassoCV().fit(Z_source,Y_source)
Y_source[3]

  y = column_or_1d(y, warn=True)


array([0.90754252])

In [101]:
D_s = np.concatenate((X_source, Z_source, V_source), axis = 1)
D_t = np.concatenate((X_target, Z_target, V_target), axis = 1)
sample_density_ratio = density_ratio_estimate_prob(D_t, D_s)

In [117]:
chi_squared_p_value(5.32, 4)

0.25601049158543854

# Test procedure

*CompQuadForm package 

In [162]:
PCRtest(Y_source, X_source, Z_source, L = 5, K = 20, covariate_shift = True)

(array([208.90359482, 219.87989041, 220.74199501, 211.18297793,
        205.98518229]),
 5.32797901428065)

In [146]:
cov1 = generate_covariance_matrix(10)

In [163]:
monte_carlo_p_value(100000, cov1, 10, 4.7)

0.90033

In [161]:
PCRtest(Y_target, X_target, Z_target, L = 10, K = 20, covariate_shift = False)

(array([110., 101., 108., 102., 106., 102.,  76., 108.,  92.,  95.]), 9.38)

In [None]:
PCRtest(Y_source, X_source, Z_source, L = 10, K = 20, covariate_shift = False)

In [None]:
cov2 = generate_covariance_matrix(10)

In [150]:
chi_squared_p_value(9.34,4)

0.0531407682181243

In [13]:
def int_(x):
    if x >= 80: return 79
    return int(x)

def Many_Tests(m, Alpha):
    X = []
    n, p=500, 20
    for i in trange(m):
        s = np.random.normal(0, 1, p)
        t = s + np.random.normal(0, 0.1, p)
        u = np.random.normal(0, 1, p)
        Y_source, X_source, V_source, Z_source, Y_target, X_target, V_target, Z_target = generate(n, p, s, t, u, Alpha)
        u, v = PCRtest(Y_source, X_source, Z_source, L = 5, K = 20, covariate_shift = False)
        X.append(v)
    return X

In [None]:
*write the function

In [None]:
def multiple_test():

In [14]:
H_0 = Mutiple_Tests(10, 1)
H_1 = Mutiple_Tests(10, 0)

100%|██████████| 10/10 [00:33<00:00,  3.38s/it]
100%|██████████| 10/10 [00:32<00:00,  3.26s/it]


In [15]:
H_1.sort()
H_0.sort()

In [16]:
H_0

[1264.94,
 1304.98,
 1354.48,
 1370.3,
 1370.8,
 1411.74,
 1428.7,
 1428.7,
 1429.3,
 1445.44]

In [17]:
def Density_Variance(n):
    a, b=0, 0
    for i in range(n):
        z = np.random.normal(0, 1, p)
        x = z @ s + np.random.normal(0, 1, 1)
        a += Covariate_Shift_Weight(x, z)**2
        b += np.exp((z @ t - z @ s)**2)
    return a/n, b/n

In [18]:
Density_Variance(100000)

ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 1 is different from 20)

In [None]:
a=[]
for i in range(50):
    b=PCRtest(Y_source, X_source, Z_source, L = 5, K = 20, covariate_shift = False)
    a.append(b)

In [None]:
a.sort
a

# Power simulation