In [1]:
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from tqdm import tqdm

%matplotlib inline

In [2]:
def shermanMorrison(V:np.ndarray, x:np.ndarray):
    """
    ${V_t}^{-1} = V_{t-1}^{-1} - \frac{V_{t-1}^{-1}xx^T V_{t-1}^{-1}}{1 + x^T V_{t-1}^{-1} x}$
    V: inverse of old gram matrix, corresponding to $V_{t-1}$.
    x: a new observed context
    return: inverse of new gram matrix
    """
    numerator = np.einsum("ij, j, k, kl -> il", V, x, x, V)
    denominator = np.einsum("i, ij, j ->", x, V, x)
    return V - (numerator / denominator)


def l2norm(v:np.ndarray):
    v = v.flatten()
    return np.sqrt(np.sum(v ** 2))


def matrix_norm(m:np.ndarray, frobenius:bool):
    assert len(m.shape) == 2
    if frobenius:
        ## frobenius norm
        return np.linalg.norm(m)
    ## spectral norm
    return np.linalg.norm(m, 2)


def covariance_generator(d:int):
    rnd = np.random.uniform(low=-1, high=1, size=d*d).reshape(d, d)
    ## make a symmetric matrix
    sym = (rnd + rnd.T) / 2
    ## make positive semi-definite  
    return sym @ sym.T


def minmax(v:np.ndarray, bound:float=1.):
    min = np.min(v)
    max = np.max(v)
    return ((v - min) / (max - min)) * bound


def left_pseudo_inverse(A:np.ndarray, bound:float=None):
    d, k = A.shape
    u, A_sig, v_T = np.linalg.svd(A)
    
    B_sig = np.zeros((k, d))
    for i in range(k):
        B_sig[i, i] = 1 / A_sig[i]
    
    B = v_T.T @ B_sig @ u.T
    if bound:
        max_singular = matrix_norm(B, frobenius=False)
        B *= bound / max_singular
    
    return B


def rademacher(size:int):
    """
    Generate Rademacher random variables.

    Args:
    size (int): Number of random variables to generate.

    Returns:
    numpy.ndarray: An array of Rademacher random variables.
    """
    return 2 * np.random.randint(0, 2, size) - 1


def subgaussian_noise(distribution:str, size:int, var:float=None):
    """
    distribution (str): the distribution to sample a sub-Gaussian noise
    size (int): The number of total rounds (T)
    var (float): The variance proxy of the noise
    """
    if not var:
        assert distribution in ["gaussian", "uniform"]
    
    if distribution == "gaussian":
        if not var:
            var = 1
        noise = np.random.normal(loc=0, scale=var, size=size) 
    elif distribution == "uniform":
        if not var:
            low = -1
            high = 1
            var = ((high - low) ** 2) / 12
        else:
            low = -np.sqrt(3 * var)
            high = np.sqrt(3 * var)
        noise = np.random.uniform(low=low, high=high, size=size)
    else:
        var = 1
        noise = rademacher(size=size)
    return noise, var

In [3]:
def sampleGenerator(num_samples:int, obs_dim:int, latent_dim:int, noise_var:float=None, noise_dist:str="gaussian",
                    feature_bound:float=None, matrix_bound:float=None, disjoint:bool=True, seed:int=7777):
    """
    num_samples: the number of samples
    obs_dim: the dimension of an observable context
    latent_dim: the dimension of a latent context
    noise_var: variance of the noise added to the observable contexts
    feature_bound: the upper bound of the norm of the observable contexts
    matrix_bound: the upper bound of the spectral norm of the encoder matrix
    disjoint: represents if the latent features are disjoint are not
    """
    np.random.seed(seed)
    
    ## latent features
    if disjoint:
        scalar = np.random.uniform(low=0, high=2)
        Z_cov = scalar * np.identity(latent_dim)
    else:
        Z_cov = covariance_generator(latent_dim)
    Zs = [np.random.multivariate_normal(mean=np.ones(latent_dim), cov=Z_cov) for _ in range(num_samples)]
    Z = np.concatenate(Zs, axis=0).reshape(num_samples, latent_dim)
    
    ## mapping noise
    context_noise, var = subgaussian_noise(distribution=noise_dist, size=num_samples*obs_dim, var=noise_var)
    context_noise = context_noise.reshape(-1, d)
    
    ## latent mapping decoder and observable features
    A_cov = covariance_generator(obs_dim*latent_dim)
    A = np.random.multivariate_normal(mean=np.zeros(obs_dim*latent_dim), cov=A_cov).reshape(obs_dim, latent_dim)
    X = Z @ A.T + context_noise
    if feature_bound:
        for i in range(X.shape[0]):
            row = X[i, :]
            norm = l2norm(row)
            row *= (feature_bound / norm)
            X[i, :] = row
    
    ## encoder bounded by C_B
    B = left_pseudo_inverse(A, bound=matrix_bound)
    
    return B, Z, X, context_noise

In [4]:
def param_generator(dimension:int, distribution:str="gaussian", bound:float=1.):
    if distribution == "gaussian":
        cov = covariance_generator(dimension)
        mean = np.random.randint(-1, 2, size=dimension)
        param = np.random.multivariate_normal(mean=mean, cov=cov)
    else:
        # uniform
        param = np.random.random(dimension)
        
    if bound:
        norm = l2norm(param)
        param *= (bound / norm)
    
    return param

In [5]:
## hyper-parameters
N = 50000         # number of samples
d = 8             # observable dimension
k = 5             # latent dimension
T = 10000         # total time horizon
feature_bound = 1 
param_bound = 1
matrix_bound = 1  # bound of the encoder

# $\sigma_{\eta} = \frac{1}{\sqrt{T}}$

In [7]:
"""
sampleGenerator(num_samples:int, obs_dim:int, latent_dim:int, noise_var:float=None, noise_dist:str="gaussian",
                    feature_bound:float=None, matrix_bound:float=None, disjoint:bool=True, seed:int=7777)
"""
## not disjoint case
VAR = 1 / np.sqrt(T)
decoder, Z, X, context_noise = sampleGenerator(num_samples=N, obs_dim=d, latent_dim=k, noise_var=VAR,
                                              feature_bound=feature_bound, matrix_bound=matrix_bound,
                                              disjoint=False)
print(Z.shape, X.shape, context_noise.shape)

(50000, 5) (50000, 8) (50000, 8)
