# Detecting Independence

In the paper "Neural Conditional Probability for Uncertainty Quantification" by Kostic et al., it is claimed that the conditional expectation operator can be used to detect the independence of two random variables by checking whether it is zero. Here, we demonstrate this equivalence in practice.

## Dataset

In [1]:
import torch
from torch.utils.data import TensorDataset, DataLoader, random_split


def make_dataset(n_samples: int = 200, t: float = 0.0):
    """Sample (X, Y) pairs from Y = t*X + (1-t)*X_ and split them into train/val subsets.

    X and X_ are drawn independently from a standard normal distribution, so ``t``
    interpolates between Y = X_ (``t=0``, independent of X) and Y = X (``t=1``).

    Args:
        n_samples (int, optional): Total number of (X, Y) pairs to draw. Defaults to 200.
        t (float, optional): Weight of X in the mixture defining Y. Defaults to 0.0.

    Returns:
        The train and validation subsets produced by ``random_split`` with an 85% / 15% split.
    """
    shape = (n_samples, 1)
    X = torch.normal(mean=0, std=1, size=shape)
    noise = torch.normal(mean=0, std=1, size=shape)  # Plays the role of X_: drawn separately from X.
    Y = t * X + (1 - t) * noise

    # Random 85/15 partition of the paired samples.
    train_ds, val_ds = random_split(TensorDataset(X, Y), [0.85, 0.15])
    return train_ds, val_ds

## NCP Architecture

In [2]:
from torch import Tensor
from torch.nn import Module

from linear_operator_learning.nn import MLP


class NCP(Module):
    """Neural Conditional Probability in PyTorch.

    NOTE: this class does not define ``forward`` yet, and ``make_matrix_layer`` does not
    build a layer yet, so ``self.S`` is ``None`` after construction.

    Args:
        embedding_x (Module): Neural embedding of x.
        embedding_y (Module): Neural embedding of y.
        matrix_form (str, optional): Either 'dense' or 'sparse'. Defaults to 'dense'.
        learnable_matrix (bool, optional): Whether the matrix layer is learnable. Defaults to True.

    Raises:
        NotImplementedError: If ``matrix_form`` is not 'dense' or ``learnable_matrix`` is False.
    """

    def __init__(
        self,
        embedding_x: Module,
        embedding_y: Module,
        matrix_form: str = "dense",
        learnable_matrix: bool = True,
    ) -> None:
        super().__init__()

        self.U = embedding_x
        self.V = embedding_y
        self.S = NCP.make_matrix_layer(matrix_form, learnable_matrix)

        # Register buffers for the statistics of the latent variables u and v.
        self._register_stats_buffers()

    @staticmethod
    def make_matrix_layer(matrix_form: str, learnable_matrix: bool) -> Module:
        """Creates a module for the truncated operator's matrix form. See the class docs for the arguments.

        Raises:
            NotImplementedError: For any combination other than a dense, learnable matrix.
        """
        if matrix_form != "dense" or not learnable_matrix:
            raise NotImplementedError(
                "This NCP implementation only supports dense learnable matrix layer."
            )
        # TODO: build and return the dense learnable layer. Until then the supported
        # configuration falls through and implicitly returns None.

    def _register_stats_buffers(self) -> None:
        """Placeholder for registering the latent-statistics buffers; currently a no-op."""
        # `self` is required: __init__ calls this as a bound method, and without it
        # every NCP construction fails with a TypeError.
        pass


## Training NCP

In [None]:
import lightning as L
from torch.optim import Adam, Optimizer
from lightning.pytorch.loggers import CSVLogger
from linear_operator_learning.nn import L2ContrastiveLoss


class NCPTrainingModule(L.LightningModule):
    """Lightning module that trains an NCP model with a configurable loss and optimizer.

    Args:
        results (dict): Shared container intended to collect per-run results. It is stored
            on the module; nothing in this class writes to it yet.
        run_id (tuple): Identifier of the current run. Stored alongside ``results``.
        ncp (NCP): Model to train. It is called as ``ncp(*batch)`` and its output is
            unpacked into the loss as ``loss(*out)``.
        loss (Module, optional): Loss *class* (not an instance); it is instantiated as
            ``loss(**loss_kwargs)``. Defaults to L2ContrastiveLoss.
        loss_kwargs (dict, optional): Keyword arguments for the loss constructor.
            Defaults to ``{"gamma": 1e-3}``.
        optimizer (Optimizer, optional): Optimizer *class*; it is instantiated in
            ``configure_optimizers``. Defaults to Adam.
        optimizer_kwargs (dict, optional): Keyword arguments for the optimizer constructor.
            Defaults to ``{"lr": 5e-4}``.
    """

    def __init__(
        self,
        # Hack to store the results of different runs without heavy machinery.
        results: dict,
        run_id: tuple,
        # NCP training interface begins here:
        ncp: NCP,
        loss: Module = L2ContrastiveLoss,
        loss_kwargs: dict = {"gamma": 1e-3},
        optimizer: Optimizer = Adam,
        optimizer_kwargs: dict = {"lr": 5e-4},
    ):
        super().__init__()
        # Keep the shared results container and the run identifier so they are not
        # silently dropped.
        self.results = results
        self.run_id = run_id

        self.ncp = ncp
        self.loss = loss(**loss_kwargs)
        self._optimizer = optimizer
        # Copy so the (shared) default dict or a caller's dict is never aliased.
        self._optimizer_kwargs = dict(optimizer_kwargs)

    def configure_optimizers(self):
        """Instantiate the optimizer over all parameters of this module."""
        # `parameters` must be called: the optimizer needs the iterable of parameters,
        # not the bound method itself.
        return self._optimizer(self.parameters(), **self._optimizer_kwargs)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        out = self.ncp(*batch)
        loss = self.loss(*out)
        self.log("loss/train", loss, prog_bar=False)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss for one batch."""
        out = self.ncp(*batch)
        loss = self.loss(*out)
        self.log("loss/val", loss, prog_bar=False)
        return loss

## Detection happens here

In [None]:
from pathlib import Path

from lightning import seed_everything
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint

# Directory where the CSV logs of every run are written.
RUN_PATH = Path("runs")
RUN_PATH.mkdir(exist_ok=True)

SEED = 1
REPEATS = 1
BATCH_SIZE = 2048
N_SAMPLES = 10 * BATCH_SIZE
# Keyword arguments shared by both embedding MLPs (x and y).
NCP_PARAMS = dict(
    output_shape=2,
    n_hidden=2,
    layer_size=32,
    activation=torch.nn.ELU,
    bias=False,
    iterative_whitening=False,
)

# Sweep the mixing weight t over [0, 1] in 11 steps and train REPEATS models per value.
results = dict()
for t in torch.linspace(start=0, end=1, steps=11):
    for r in range(REPEATS):
        run_id = (round(t.item(), 2), r)
        print(f"run_id = {run_id}")

        # Load data_________________________________________________________________________________
        # Offset the seed by the repeat index: reseeding with the same value would make
        # every repeat an exact duplicate. For r = 0 this is still SEED.
        seed_everything(seed=SEED + r)
        train_ds, val_ds = make_dataset(n_samples=N_SAMPLES, t=t.item())

        train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=False)
        val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False)

        # Build NCP_________________________________________________________________________________
        ncp = NCP(
            embedding_x=MLP(input_shape=1, **NCP_PARAMS),
            embedding_y=MLP(input_shape=1, **NCP_PARAMS),
        )

        # Train NCP_________________________________________________________________________________
        # Training module for lightning
        model = NCPTrainingModule(results=results, run_id=run_id, ncp=ncp)

        # Create logger
        logger = CSVLogger(save_dir=RUN_PATH, name="detecting_independence", version=run_id)

        # Create callbacks
        # TODO: Add ModelCheckpoint and EarlyStopping
        # ckpt_call = ModelCheckpoint()
        # early_call = EarlyStopping()

        trainer = L.Trainer(
            accelerator="cpu",
            precision="bf16",
            logger=logger,
            # callbacks=[ckpt_call, early_call],
            max_epochs=100,
            check_val_every_n_epoch=25,
            enable_model_summary=False,
            enable_progress_bar=False,
        )

        # Fit on the batched DataLoaders, not on the raw dataset subsets.
        trainer.fit(model, train_dl, val_dl)

run_id = (0.0, 0)
run_id = (0.1, 0)
run_id = (0.2, 0)
run_id = (0.3, 0)
run_id = (0.4, 0)
run_id = (0.5, 0)
run_id = (0.6, 0)
run_id = (0.7, 0)
run_id = (0.8, 0)
run_id = (0.9, 0)
run_id = (1.0, 0)


## Plots

In [None]:
import seaborn as sns
import pandas as pd

# Flatten `results` (keyed by the (t, r) run identifier) into one row per run;
# the dict values fill the "norm" column.
rows = [(t, r, norm) for (t, r), norm in results.items()]
results_df = pd.DataFrame(data=rows, columns=["t", "r", "norm"])

# Point plot of the "norm" column against the mixing weight t.
sns.pointplot(results_df, x="t", y="norm")