In [8]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [16]:
import torch

from data_loaders import *

from missing_process.utils_generation import * 

In [12]:
def produce_NA(X, p_miss, mecha="OTselfmask", p_obs=0.2, q=0.25):
    """
    Generate missing values for specifics missing-data mechanism and proportion of missing values. 
    
    Parameters
    ----------
    X : torch.DoubleTensor or np.ndarray, shape (n, d)
        Data for which missing values will be simulated.
        If a numpy array is provided, it will be converted to a pytorch tensor.
    p_miss : float
        Proportion of missing values to generate for variables which will have missing values.
    mecha : str, 
            Indicates the missing-data mechanism to be used. "MCAR" by default, "MAR", "MNAR" or "MNARsmask"
    opt: str, 
         For mecha = "MNAR", it indicates how the missing-data mechanism is generated: using a logistic regression ("logistic"), quantile censorship ("quantile") or logistic regression for generating a self-masked MNAR mechanism ("selfmasked").
    p_obs : float
            If mecha = "MAR", or mecha = "MNAR" with opt = "logistic" or "quanti", proportion of variables with *no* missing values that will be used for the logistic masking model.
    q : float
        If mecha = "MNAR" and opt = "quanti", quantile level at which the cuts should occur.
    
    Returns
    ----------
    A dictionnary containing:
    'X_init': the initial data matrix.
    'X_incomp': the data with the generated missing values.
    'mask': a matrix indexing the generated missing values.s
    """
    
    to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
    if not to_torch:
        X = X.astype(np.float32)
        X = torch.from_numpy(X)
    
    if mecha == "MAR":
        print("MAR")
        mask = MAR_mask(X, p_miss, p_obs).double()

    elif mecha == "OTlogistic":
        print("OTlogistic")
        mask = MNAR_mask_logistic(X, p_miss, p_obs).double()

    elif mecha == "OTquantile":
        print("OTquantile")
        mask = MNAR_mask_quantiles(X, p_miss, q, 1-p_obs).double()

    elif mecha == "OTselfmask":
        print("OTselfmask")
        mask = MNAR_self_mask_logistic(X, p_miss).double()

    else:
        print("MCAR")
        mask = (torch.rand(X.shape) < p_miss).double()
    
    X_nas = X.clone()
    X_nas[mask.bool()] = np.nan
    
    return {'X_init': X.double(), 'X_incomp': X_nas.double(), 'mask': mask}

In [84]:
X=scale(dataset_loader("wine_quality_white")["data"])


to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
if not to_torch:
    X = X.astype(np.float32)
    X = torch.from_numpy(X)


4898 11


tensor([[False,  True, False,  ...,  True,  True, False],
        [False, False,  True,  ..., False, False,  True],
        [False, False, False,  ...,  True, False,  True],
        ...,
        [False, False, False,  ..., False,  True, False],
        [False, False, False,  ..., False, False, False],
        [False, False, False,  ..., False,  True, False]])

In [None]:

X_logisitc = MNAR_mask_logistic(X, 0.5)

In [107]:
def MNAR_mask_logistic(X, p, p_params =0.3, exclude_inputs=True):

    n, d = X.shape


    to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
    if not to_torch:
        X = torch.from_numpy(X)

    mask = torch.zeros(n, d).bool() if to_torch else np.zeros((n, d)).astype(bool)
    # control SPlit
    d_params = max(int(p_params * d), 1) if exclude_inputs else d ## number of variables used as inputs (at least 1)
    d_na = d - d_params if exclude_inputs else d ## number of variables masked with the logistic model
    idxs_params = np.random.choice(d, d_params, replace=False) if exclude_inputs else np.arange(d) # 任选三个parameter obs

    idxs_nas = np.array([i for i in range(d) if i not in idxs_params]) if exclude_inputs else np.arange(d) # 剩下的 feature miss

    coeffs = pick_coeffs(X, idxs_params, idxs_nas)
    intercepts = fit_intercepts(X[:, idxs_params], coeffs, p)


    ps = torch.sigmoid(X[:, idxs_params].mm(coeffs) + intercepts)
    
    ber = torch.rand(n, d_na)
    print(idxs_params,idxs_nas)
    print(ber.shape,ps.shape)
    mask[:, idxs_nas] = ber < ps

    print(mask.shape)

    if exclude_inputs:
        mask[:, idxs_params] = torch.rand(n, d_params) < p

    return mask
def pick_coeffs(X, idxs_obs=None, idxs_nas=None, self_mask=False):
    n, d = X.shape
    if self_mask:
        coeffs = torch.randn(d)
        Wx = X * coeffs
        coeffs /= torch.std(Wx, 0)
    else:
        d_obs = len(idxs_obs)
        d_na = len(idxs_nas)
        coeffs = torch.randn(d_obs, d_na) #dimension
        Wx = X[:, idxs_obs].mm(coeffs)
        coeffs /= torch.std(Wx, 0, keepdim=True)
    return coeffs

MNAR_mask_logistic(X, 0.5)

[2 4 1] [ 0  3  5  6  7  8  9 10]
torch.Size([4898, 8]) torch.Size([4898, 8])
torch.Size([4898, 11])


tensor([[ True, False,  True,  ...,  True, False,  True],
        [False, False,  True,  ...,  True,  True,  True],
        [ True, False,  True,  ..., False, False, False],
        ...,
        [ True, False, False,  ...,  True, False,  True],
        [False, False,  True,  ...,  True,  True,  True],
        [False, False,  True,  ...,  True, False,  True]])

In [33]:
def MNAR_self_mask_logistic(X, p):

    n, d = X.shape

    to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
    if not to_torch:
        X = torch.from_numpy(X)

    ### Variables will have NA proportions that depend on those observed variables, through a logistic model
    ### The parameters of this logistic model are random.

    ### Pick coefficients so that W^Tx has unit variance (avoids shrinking)
    coeffs = pick_coeffs(X, self_mask=True)
    ### Pick the intercepts to have a desired amount of missing values
    intercepts = fit_intercepts(X, coeffs, p, self_mask=True)

    ps = torch.sigmoid(X * coeffs + intercepts)

    ber = torch.rand(n, d) if to_torch else np.random.rand(n, d)
    mask = ber < ps if to_torch else ber < ps.numpy()

    return mask

In [40]:
coeffs = pick_coeffs(X, self_mask=True)

In [None]:
def fit_intercepts(X, coeffs, p, self_mask=False):
    if self_mask:
        d = len(coeffs)
        intercepts = torch.zeros(d)
        for j in range(d):
            def f(x):
                return torch.sigmoid(X * coeffs[j] + x).mean().item() - p
            intercepts[j] = optimize.bisect(f, -50, 50)
    else:
        d_obs, d_na = coeffs.shape
        intercepts = torch.zeros(d_na)
        for j in range(d_na):
            def f(x):
                return torch.sigmoid(X.mv(coeffs[:, j]) + x).mean().item() - p
            intercepts[j] = optimize.bisect(f, -50, 50)
    return intercepts


In [42]:

d = len(coeffs)
intercepts = torch.zeros(d)


In [48]:
intercepts

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [47]:
X

tensor([[ 7.4000,  0.7000,  0.0000,  ...,  3.5100,  0.5600,  9.4000],
        [ 7.8000,  0.8800,  0.0000,  ...,  3.2000,  0.6800,  9.8000],
        [ 7.8000,  0.7600,  0.0400,  ...,  3.2600,  0.6500,  9.8000],
        ...,
        [ 6.3000,  0.5100,  0.1300,  ...,  3.4200,  0.7500, 11.0000],
        [ 5.9000,  0.6450,  0.1200,  ...,  3.5700,  0.7100, 10.2000],
        [ 6.0000,  0.3100,  0.4700,  ...,  3.3900,  0.6600, 11.0000]])

In [38]:
intercepts = fit_intercepts(X, coeffs, 0.5, self_mask=True)

ValueError: f(a) and f(b) must have different signs