In [138]:
import numpy as np
from collections import deque

# Running Mean and Variance

In [None]:
class StreamingStats:
    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.M2 = 0.0

    def update(self, x: float):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        delta2 = x - self.mean
        self.M2 += delta * delta2

    def get_variance(self, ddof=1):
        if self.n < 2:
            return 0.0
        return self.M2 / (self.n - ddof)

    def get_mean(self):
        return self.mean

# Usage
stream = StreamingStats()
data = [10, 12, 23, 23, 16, 23, 21, 16]

for x in data:
    stream.update(x)
    print(f"Val: {x}, Mean: {stream.get_mean():.2f}, Var: {stream.get_variance():.2f}")

In [None]:
class RollingStats:
    def __init__(self, window=3):
        self.window = window
        self.values = deque(maxlen=window)

    def update(self, x: float):
        self.values.append(x) # deque automatically handles popping the oldest element when full

    def get_mean(self):
        if not self.values:
            return 0.0
        return sum(self.values) / len(self.values)

    def get_variance(self, ddof=1):
        if len(self.values) < 2:
            return 0.0
        mu = self.get_mean()
        # Standard variance calculation on the current window
        return sum((x - mu) ** 2 for x in self.values) / (len(self.values) - ddof)

# Usage
stream = RollingStats(window=3)
data = [10, 12, 23, 23, 16, 23, 21, 16]

print(f"{'Value':<10} {'Rolling Mean':<15} {'Rolling Var':<15}")
print("-" * 40)

for x in data:
    stream.update(x)
    print(f"{x:<10} {stream.get_mean():<15.2f} {stream.get_variance():<15.2f}")

Value      Rolling Mean    Rolling Var    
----------------------------------------
10         10.00           0.00           
12         11.00           2.00           
23         15.00           49.00          
23         19.33           40.33          
16         20.67           16.33          
23         20.67           16.33          
21         20.00           13.00          
16         20.00           13.00          


# Mean and Variance

1. Incremental Mean
$$\mu_n = \mu_{n-1} + \frac{x_n - \mu_{n-1}}{n}$$
2. Incremental Sum of Squared Differences ($M_2$)
$$M_{2,n} = \sum_{i=1}^n (x_i - \mu_n)^2$$
$$M_{2,n} = M_{2,n-1} + (x_n - \mu_{n-1})(x_n - \mu_n)$$

In [None]:
def average_welford(arr: list):
    if not arr:
        return None, None
    # if input is a nd.array, change it to: if arr.size == 0
    mean = 0.0
    for count, x in enumerate(arr, 1):
        delta = x - mean
        mean += delta / count
    return mean

def variance_welford(arr: list):
    mean = 0.0
    m2 = 0.0
    for count, x in enumerate(arr, 1):
        delta = x - mean
        mean += delta / count
        delta_new = x - mean
        m2 +=  delta_new * delta
    variance = m2 / (count - 1) if count > 1 else 0
    return variance

def sample_stats_unoptimized(arr):
    if not arr:
        return None, None
    n = len(arr) # this is O(1)
    mean = sum(arr) / n
    variance = 0
    for v in arr:
        variance += (v - mean)**2
    divisor = n - 1 if n > 1 else n
    variance /= divisor
    return mean, variance

In [112]:
# Test
np.random.seed(0)
arr = np.random.normal(loc=10, scale=1, size=(30,)).tolist()

assert np.allclose(average_welford(arr), np.mean(arr))
assert np.allclose(variance_welford(arr), np.var(arr, ddof=1))

# Rolling Mean and Variance

In [None]:
def moving_average_raw(arr: list, k: int):
    # k: window size
    n = len(arr)
    rolling_means = []
    if k > n:
        return rolling_means
    mean = average_welford(arr[:k])
    rolling_means.append(mean)
    for i in range(k, n):
        mean += (arr[i] - arr[i-k]) / k
        rolling_means.append(mean)
    return rolling_means

def moving_average(arr: list, k: int) -> list[float]:
    n = len(arr)
    if n < k:
        return []
    # rolling_means = []
    # for i in range(n-k+1):
    #     rolling_means.append(sum(arr[i:i+k])/k)

    # |----------|--------|
    # 0-------(n-k+1)--(n-1)
    return [sum(arr[i:i+k])/k for i in range(n-k+1)]

def moving_average_convolve(arr: list, k: int) -> list[float]:
    kernel = np.ones(k) / k
    return np.convolve(arr, kernel, mode="valid")

In [111]:
ma0 = moving_average_raw(arr, 20)
ma1 = moving_average(arr, 20)
ma2 = moving_average_convolve(arr,20)

assert np.allclose(ma0, ma1)
assert np.allclose(ma1, ma2)

# Combined Datasets

In [None]:
nA, muA, stdA = 20, 80, 10
nB, muB, stdB = 11, 50, 20
# Combine dataset A and B
n = nA + nB
mu = (nA * muA + nB * muB) / (nA + nB)

# Sum of squares (adjusted degree of freedom)
ss_A = (nA - 1) * stdA**2 + nA * muA**2 
ss_B = (nB - 1) * stdB**2 + nB * muB**2
ss = ss_A + ss_B # total sum of squares
 
total_sum = mu * (nA + nB)
var = (ss - (total_sum**2 / n)) / (n - 1)
sd = np.sqrt(var)

# Unadjusted:
# deltaA = muA - mu # split (xi-mu) to (xi-muA+muA-mu)
# deltaB = muB - mu
# s = (nA * (deltaA**2 + stdA**2) + nB * (deltaB**2 + stdB**2)) / (nA + nB)
# sd = np.sqrt(s)