# Online Computation of Statistical Quantities

In many real-world scenarios, datasets can be exceedingly large, making it impractical or even impossible to load the entire dataset into memory at once. However, there are numerous situations where it's crucial to compute statistical quantities such as mean, sum, variance, and covariance from the dataset. This is particularly common in fields like machine learning, signal processing, and data analysis.

To address this challenge, we introduce a set of Python classes that implement online computation algorithms for various statistical quantities. Online computation, also known as incremental computation, allows us to calculate statistical measures efficiently by processing data in small batches or one data point at a time, without needing to store the entire dataset in memory simultaneously.

The classes provided in this notebook include:

- **IterMean**: Computes the mean of the dataset incrementally.
- **IterSum**: Calculates the sum of the dataset iteratively.
- **IterVar**: Computes the variance of the dataset in an online fashion.
- **IterCov**: Computes the covariance matrix of two datasets incrementally.

Each class is designed to update its internal state as new data becomes available, allowing for seamless integration into pipelines where data is streamed or loaded in chunks.

These online computation techniques not only enable the processing of large datasets that exceed memory constraints but also offer advantages in scenarios where data arrives sequentially or in batches, such as real-time data analysis or processing data from streaming sources.

In [1]:
import numpy as np

In [2]:
class IterFunc:
    """Base class for iterative functions."""

    def __init__(self):
        self.n_samples = 0

    def _process_batch(self, batch_data):
        """Process batch data and return valid data."""
        batch_data = np.atleast_2d(batch_data)
        ndim = batch_data.ndim
        axis = tuple(range(1, ndim))
        nan_mask = np.isnan(batch_data).any(axis=axis)
        valid_data = batch_data[~nan_mask]
        self.n_samples += len(valid_data)
        return valid_data

    def __repr__(self):
        return f"{self.__class__.__name__}()"


class IterMean(IterFunc):
    """Class to calculate the mean iteratively."""

    def __init__(self):
        super().__init__()
        self.mean = None

    def update_batch(self, batch_data):
        """Update the mean with batch data."""
        valid_data = self._process_batch(batch_data)
        n = len(valid_data)
        if n > 0:
            if self.mean is None:
                self.mean = np.mean(valid_data, axis=0)
            else:
                self.mean = ((self.n_samples - n) * self.mean + np.sum(valid_data, axis=0)) / self.n_samples
        return self

    def __call__(self) -> np.ndarray:
        """Return the mean."""
        if self.mean is None:
            raise ValueError("No valid data to compute the mean.")
        return self.mean.copy()


class IterSum(IterFunc):
    """Class to calculate the sum iteratively."""

    def __init__(self):
        super().__init__()
        self.sum = None

    def update_batch(self, batch_data):
        """Update the sum with batch data."""
        valid_data = self._process_batch(batch_data)
        if valid_data.shape[0] > 0:
            if self.sum is None:
                self.sum = np.sum(valid_data, axis=0)
            else:
                self.sum += np.sum(valid_data, axis=0)
        return self

    def __call__(self) -> np.ndarray:
        """Return the sum."""
        if self.sum is None:
            raise ValueError("No valid data to compute the sum.")
        return self.sum.copy()


class IterVar(IterFunc):
    """Class to calculate the variance iteratively."""

    def __init__(self, ddof=0):
        super().__init__()
        self.mean = IterMean()
        self.var = None
        self.ddof = ddof

    def update_batch(self, v):
        """Update the variance with batch data."""
        valid_data = self._process_batch(v)
        n = len(valid_data)
        if n > 0:
            if self.var is None:
                self.var = np.var(valid_data, axis=0, ddof=0)
                self.mean.update_batch(valid_data)
            else:
                previous_mean = self.mean()
                self.mean.update_batch(valid_data)
                updated_mean = self.mean()
                incremental_variance = np.einsum('ji,ji->i', valid_data - previous_mean, valid_data - updated_mean)
                self.var = ((self.n_samples - n) * self.var + incremental_variance) / self.n_samples
        return self

    def __call__(self) -> np.ndarray:
        """Return the variance."""
        if self.var is None:
            raise ValueError("No valid data to compute the variance.")
        return self.n_samples / (self.n_samples - self.ddof) * self.var


class IterCov(IterFunc):
    """Class to calculate the covariance iteratively."""

    def __init__(self, ddof=0):
        super().__init__()
        self.mean1 = IterMean()
        self.mean2 = IterMean()
        self.cov = None
        self.ddof = ddof

    def _process_batch(self, v1, v2):
        """Process two batches of data and return valid data."""
        w1, w2 = np.atleast_2d(v1), np.atleast_2d(v2)
        if len(w1) != len(w2):
            raise ValueError("v1 and v2 have different lengths!")
        mask = (~np.any(np.isnan(w1), axis=1)) & (~np.any(np.isnan(w2), axis=1))
        self.n_samples += mask.sum()
        return w1[mask], w2[mask]

    def update_batch(self, v1, v2):
        """Update the covariance with two batches of data."""
        w1, w2 = self._process_batch(v1, v2)
        n = len(w1)
        if n > 0:
            if self.cov is None:
                n1, n2 = w1.shape[1], w2.shape[1]
                self.mean1.update_batch(w1)
                self.mean2.update_batch(w2)
                if n == 1:
                    self.cov = np.zeros((n1, n2))
                else:
                    self.cov = (w1-self.mean1()).T@((w2-self.mean2())/n)
            else:
                # the order of the 6 following lines is important
                m1 = self.mean1()
                self.mean2.update_batch(w2)
                m2 = self.mean2()
                self.cov += (w1-m1).T@((w2-m2)/self.mean1.n_samples)
                self.cov *= (self.mean1.n_samples/self.mean2.n_samples)
                self.mean1.update_batch(w1)

    def __call__(self) -> np.ndarray:
        """Return the covariance."""
        if self.cov is None:
            raise ValueError("No valid data to compute the covariance.")
        return self.n_samples/(self.n_samples - self.ddof)*self.cov

We conduct tests to verify the correctness and functionality of the iterative computation classes: `IterSum`, `IterMean`, `IterVar`, and `IterCov`. Each test function takes input data and splits it into a specified number of batches. It then iteratively updates an instance of the corresponding class with each batch of data and verifies whether the computed statistical quantity matches the expected value. We utilize the `np.allclose` function to check for numerical closeness between the computed value and the ground truth value. Upon running each test, if the computed value matches the expected value within a reasonable tolerance, the function returns 'OK!', indicating that the respective implementation behaves as expected.

In [3]:
n_samples = 1_000_000
n_features = 50
n_batches = 31
data = np.random.randn(n_samples, n_features)

In [4]:
def test_IterSum(data, n_batches):
    itersum = IterSum()
    for batch_data in np.array_split(data, n_batches):
        itersum.update_batch(batch_data)
    assert np.allclose(itersum(), np.sum(data, axis=0))
    return 'OK!'


test_IterSum(data=data, n_batches=n_batches)

'OK!'

In [5]:
def test_IterMean(data, n_batches):
    itermean = IterMean()
    for batch_data in np.array_split(data, n_batches):
        itermean.update_batch(batch_data)
    assert np.allclose(itermean(), np.mean(data, axis=0))
    return 'OK!'


test_IterMean(data=data, n_batches=n_batches)

'OK!'

In [6]:
def test_IterVar(data, n_batches):
    itervar = IterVar(ddof=1)
    for batch_data in np.array_split(data, n_batches):
        itervar.update_batch(batch_data)
    assert np.allclose(itervar(), np.var(data, axis=0, ddof=1))
    return 'OK!'


test_IterVar(data=data, n_batches=n_batches)

'OK!'

In [7]:
def test_IterVar_IterCov(data, n_batches):
    itervar = IterVar()
    itercov = IterCov()

    for batch_data in np.array_split(data, n_batches):
        itervar.update_batch(batch_data)
        itercov.update_batch(batch_data, batch_data)

    cov = itercov()
    var = itervar()

    true_cov = np.cov(data.T, ddof=0)
    true_var = np.var(data, axis=0)

    assert np.allclose(cov, true_cov)
    assert np.allclose(var, true_var)
    assert np.allclose(var, np.diag(cov))
    return 'OK!'


test_IterVar_IterCov(data=data, n_batches=n_batches)

'OK!'