# Probability

> Core probability utilities for RBE - normalization, sampling, entropy, and divergence measures

In [None]:
#| default_exp rbe.probability

In [None]:
#| hide
from nbdev.showdoc import *

  import pkg_resources,importlib


## Imports and utils

In [None]:
#| export
import numpy as np
from typing import Optional, Union, List
from fastcore.all import *

In [None]:
from fastcore.test import test_eq, test_close

## Basic Operations

Core probability operations following fast.ai style - short names, clear purpose.

We write source code first, and then tests come after. The tests serve both as a means to confirm that the code works and as working examples.

The `normalize` function takes a list or array of numbers and converts them into proper probabilities that sum to 1.

For example, if you have raw scores like `[1, 2, 3]`, it converts them to `[1/6, 2/6, 3/6]` = `[0.167, 0.333, 0.5]`.

This is essential for probability calculations because:
- Probabilities must sum to 1 by definition
- Many algorithms (like sampling) require normalized distributions
- Raw scores from sensors or models often aren't normalized

The function also includes robust error handling for edge cases common in security applications - rejecting negative values, empty arrays, and all-zero inputs that could indicate data corruption or sensor failures.

In [None]:
#| export
def normalize(probs):
    """Normalize non-negative weights so they sum to 1.

    Args:
        probs: Array-like of non-negative, finite weights.

    Returns:
        A float64 ``np.ndarray`` with the input's shape whose entries sum to 1.

    Raises:
        ValueError: If the input is empty, contains a negative value, contains
            NaN or infinity, or sums to zero.
    """
    probs = np.asarray(probs, dtype=np.float64)  # Ensure float64 for precision
    if probs.size == 0: raise ValueError("Cannot normalize empty array")
    if np.any(probs < 0): raise ValueError("Probabilities must be non-negative")
    # NaN compares False against 0 and inf/inf is NaN, so without this check
    # non-finite inputs would pass validation and poison every output entry.
    if not np.all(np.isfinite(probs)): raise ValueError("Probabilities must be finite")
    with np.errstate(over="ignore"):
        s = np.sum(probs)
    if not np.isfinite(s):
        # The sum overflowed even though every entry is finite: rescale by the
        # largest entry first so the total stays representable.
        probs = probs / np.max(probs)
        s = np.sum(probs)
    if s == 0: raise ValueError("Cannot normalize zero probabilities")
    return probs / s

In [None]:
# Demo: raw scores in the ratio 1:2:3 become the probabilities 1/6, 2/6 and 3/6
normalize([1, 2, 3])

array([0.16666667, 0.33333333, 0.5       ])

In [None]:
# Test normalize function with comprehensive edge cases
# These checks double as usage examples for normalize().
# Basic normalization
probs = [1, 2, 3]
normed = normalize(probs)
test_close(np.sum(normed), 1.0)
test_close(normed, [1/6, 2/6, 3/6])

# Already normalized - should remain unchanged
test_close(normalize([0.2, 0.3, 0.5]), [0.2, 0.3, 0.5])

# Single element - critical for RBE edge cases
# (any positive value on its own maps to probability 1)
test_close(normalize([5]), [1.0])

# Uniform distribution
test_close(normalize([1, 1, 1, 1]), [0.25, 0.25, 0.25, 0.25])

# Very small numbers (numerical stability for anomaly scores)
tiny = [1e-10, 2e-10, 3e-10]
normed_tiny = normalize(tiny)
test_close(np.sum(normed_tiny), 1.0)
assert normed_tiny.dtype == np.float64, "Should maintain float64 precision"

# Large numbers: same 1:2:3 ratio as the basic case, so the result must match it
large = [1e100, 2e100, 3e100]
normed_large = normalize(large) 
test_close(np.sum(normed_large), 1.0)
test_close(normed_large, [1/6, 2/6, 3/6])

# Mixed scales (common in cyber security scores)
# Only the total is checked here, not the individual entries.
mixed = [0.001, 1000, 0.1]
normed_mixed = normalize(mixed)
test_close(np.sum(normed_mixed), 1.0)


In [None]:
# Test error conditions
# Empty array
# Each rejected input must raise ValueError carrying the expected message.
# The `assert False` line only runs when normalize() failed to raise, and its
# AssertionError is not caught by `except ValueError`, so the cell then fails.
try:
    normalize([])
    assert False, "Should raise ValueError for empty array"
except ValueError as e:
    assert "empty array" in str(e)

# All zeros
try:
    normalize([0, 0, 0])
    assert False, "Should raise ValueError for zero probabilities"
except ValueError as e:
    assert "zero probabilities" in str(e)

# Negative values (data corruption detection)
try:
    normalize([1, -2, 3])
    assert False, "Should raise ValueError for negative probabilities"
except ValueError as e:
    assert "non-negative" in str(e)

# NaN values (sensor failure detection)
# A bare `except:` around an `assert False` would swallow the AssertionError
# too, leaving a check that cannot fail. NaN must either be rejected with a
# ValueError or remain visible in the output - it must never come back looking
# like a valid distribution.
try:
    nan_result = normalize([1, np.nan, 3])
except ValueError:
    pass  # Rejected outright
else:
    assert np.isnan(nan_result).any(), "NaN input must not yield a clean-looking distribution"

The `sample` function randomly selects indices from a probability distribution. 

Given a list of probabilities (like `[0.1, 0.7, 0.2]`), it returns random indices (0, 1, or 2) where higher probability values are more likely to be chosen. For example, index 1 would be selected about 70% of the time.

Key features:
- Accepts any non-negative weights (they are automatically normalized to sum to 1)
- Returns a single index when `n=1`, and an array of `n` indices for any other `n` (an empty array for `n=0`)
- Uses a controllable random number generator for reproducible results
- Essential for Monte Carlo methods in Recursive Bayesian Estimators

In a cyber security context, this is useful for simulating network events based on their estimated probabilities or for sampling from threat likelihood distributions.

In [None]:
#| export
def sample(probs, # probability distribution
           n=1, # number of samples
           rng=None # random number generator, or an integer seed for one
           ):
    """Sample indices from probability distribution.

    `probs` is passed through `normalize`, so unnormalized non-negative weights
    are accepted and invalid input raises `ValueError` there.

    Returns a single index when `n == 1`; for any other `n` an array of `n`
    indices (empty when `n == 0`).

    `rng` may be `None` (fresh, unseeded generator), an integer seed, or an
    existing generator object exposing `choice`.
    """
    # Only None and integer seeds are converted; a generator object is used
    # as-is so its state keeps advancing across successive calls.
    if rng is None or isinstance(rng, (int, np.integer)):
        rng = np.random.default_rng(rng)
    probs = normalize(probs)  # This handles all validation
    if n == 1:
        return rng.choice(len(probs), p=probs)  # Return scalar
    else:
        return rng.choice(len(probs), size=n, p=probs)  # Return array

In [None]:
# Demo: ten draws with a seeded generator, so the output is reproducible
sample([0.1,0.7,0.2], n=10, rng=np.random.default_rng(42))

array([1, 1, 2, 1, 0, 2, 1, 1, 1, 1])

In [None]:
# Demo: with n=1 the result is a single index rather than an array
sample([0.1,0.7,0.2], n=1, rng=np.random.default_rng(42))

1

In [None]:
# Test sample function - critical for RBE Monte Carlo methods

# Basic sampling with fixed seed for reproducibility
rng = np.random.default_rng(42)
samples = sample([0.1, 0.7, 0.2], n=1000, rng=rng)
assert len(samples) == 1000
# Every draw must be a valid index into the 3-element distribution
assert np.all((samples >= 0) & (samples <= 2))

# Check distribution approximates expected probabilities
# minlength=3 keeps a slot for every index even if one was never drawn
counts = np.bincount(samples, minlength=3)
freqs = counts / 1000
test_close(freqs, [0.1, 0.7, 0.2], eps=0.05)  # Allow 5% tolerance

# Single sample returns scalar (not array)
rng = np.random.default_rng(123)
single = sample([0.3, 0.7], n=1, rng=rng)
assert isinstance(single, (int, np.integer)), f"Expected scalar, got {type(single)}"
assert 0 <= single <= 1

# Multiple samples return array
# (reuses the generator from the scalar check above, so its state has advanced)
multiple = sample([0.3, 0.7], n=5, rng=rng)
assert isinstance(multiple, np.ndarray), "Expected array for n>1"
assert len(multiple) == 5



In [None]:
# Test with unnormalized probabilities (common in cyber security)
# sample() normalizes internally, so these weights behave like [0.1, 0.7, 0.2]
unnorm = [10, 70, 20]  # Sums to 100, not 1
rng = np.random.default_rng(456)
samples = sample(unnorm, n=1000, rng=rng)
counts = np.bincount(samples, minlength=3)
freqs = counts / 1000
test_close(freqs, [0.1, 0.7, 0.2], eps=0.05)

# Edge case: single option (deterministic)
certain = sample([1], n=10, rng=rng)
assert np.all(certain == 0), "Single option should always return index 0"

# Extreme probabilities (rare events in anomaly detection)
rare = [0.999, 0.001]  # Very rare anomaly
samples = sample(rare, n=10000, rng=np.random.default_rng(789))
anomaly_count = np.sum(samples == 1)
# Should be around 10 anomalies, allow wide tolerance for randomness
# NOTE(review): the lower bound 0 can never fail for a count; only the upper
# bound of 50 constrains the result.
assert 0 <= anomaly_count <= 50, f"Got {anomaly_count} anomalies"



In [None]:
# Test error conditions for robust cyber security applications
# Invalid distributions are rejected by normalize(), which sample() calls, so
# the expected messages below are normalize()'s messages.

# Negative probabilities (corrupted threat scores)
try:
    sample([0.5, -0.3, 0.8], n=1)
    assert False, "Should reject negative probabilities"
except ValueError as e:
    assert "non-negative" in str(e)

# Empty probabilities
try:
    sample([], n=1)
    assert False, "Should reject empty probability array"
except ValueError as e:
    assert "empty array" in str(e)

# Zero sample count
# n=0 is not the scalar case (n == 1), so it goes through the array branch
zero_samples = sample([0.5, 0.5], n=0)
assert len(zero_samples) == 0, "n=0 should return empty array"

# Test reproducibility (critical for security audits)
# Two independent generators built from the same seed must give the same draws
rng1 = np.random.default_rng(999)
rng2 = np.random.default_rng(999)
s1 = sample([0.4, 0.6], n=100, rng=rng1)
s2 = sample([0.4, 0.6], n=100, rng=rng2)
assert np.array_equal(s1, s2), "Same seed should produce identical results"


## Information Measures

Entropy and divergence measures for quantifying uncertainty and comparing distributions.

In [None]:
#| export
def entropy(probs, base=2):
    """Calculate entropy of `probs` distribution in given `base`.

    `probs` is normalized first; zero-probability outcomes contribute nothing.
    `base` is the logarithm base: 2 (default), the string 'e' for the natural
    logarithm, or any positive number other than 1.

    Raises `ValueError` for an unusable numeric `base`, plus whatever
    `normalize` raises for an invalid `probs`.
    """
    # log(1) == 0 and logs of non-positive bases are undefined, so those bases
    # would otherwise surface as inf/NaN instead of an error.
    if base != 'e' and not (base > 0 and base != 1):
        raise ValueError("Entropy base must be 'e' or a positive number other than 1")
    probs = normalize(probs)
    probs = probs[probs > 0]  # Remove zeros to avoid log(0)
    if base == 2:
        h = -np.sum(probs * np.log2(probs))
    else:
        h = -np.sum(probs * np.log(probs))
        if base != 'e': h = h / np.log(base)  # change of base from nats
    # A one-hot distribution gives -0.0 after negation; adding 0.0 yields +0.0
    return h + 0.0

def kl_div(p, q, eps=1e-10):
    """KL divergence from `q` to `p`, i.e. sum(p * log(p / q)), using the natural log.

    Both inputs are normalized first. `eps` is added to numerator and
    denominator inside the log so zero entries cannot produce log(0) or a
    division by zero; this slightly smooths the result.

    Raises `ValueError` if the two distributions differ in shape, plus whatever
    `normalize` raises for invalid input.
    """
    p, q = normalize(p), normalize(q)
    # Without this check numpy broadcasting would silently compare e.g. a
    # one-element `p` against every entry of `q` and return a meaningless value.
    if p.shape != q.shape:
        raise ValueError(f"Distributions must have the same shape, got {p.shape} and {q.shape}")
    # Add epsilon to avoid log(0)
    return np.sum(p * np.log((p + eps) / (q + eps)))

def js_div(p, q):
    "Jensen-Shannon divergence of `p` and `q`: mean `kl_div` of each to their midpoint mixture"
    dist_p = normalize(p)
    dist_q = normalize(q)
    # Midpoint mixture that both distributions are compared against
    mixture = (dist_p + dist_q) * 0.5
    div_p = kl_div(dist_p, mixture)
    div_q = kl_div(dist_q, mixture)
    return 0.5 * div_p + 0.5 * div_q

In [None]:
# Test entropy
# The variable is `fair_coin`, not `uniform`, so the exported `uniform()` helper
# is never shadowed in the notebook namespace.
fair_coin = [0.5, 0.5]
certain = [1.0, 0.0]
assert entropy(fair_coin) > entropy(certain)
test_close(entropy(fair_coin), 1.0)  # Maximum entropy for 2 outcomes
test_close(entropy(certain), 0.0)  # No uncertainty at all
test_close(entropy(fair_coin, base='e'), np.log(2))  # Same quantity with the natural log

# Test KL divergence
p = [0.5, 0.5]
q = [0.5, 0.5]
test_close(kl_div(p, q), 0.0, eps=1e-10)  # Same distributions
r = [0.9, 0.1]
assert kl_div(p, r) > 0  # Different distributions diverge
assert abs(kl_div(p, r) - kl_div(r, p)) > 1e-3  # KL is not symmetric

# Test JS divergence (symmetric)
# Distinct distributions are used; with identical inputs symmetry holds trivially.
test_close(js_div(p, r), js_div(r, p))  # Should be symmetric
test_close(js_div(p, q), 0.0)  # Identical distributions
assert js_div(p, r) > 0

## Effective Sample Size

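A measure of particle filter health: how many particles are effectively contributing. For normalized weights $w_i$ it is $N_{\text{eff}} = 1 / \sum_i w_i^2$, ranging from 1 (a single particle carries all the weight) to $N$ (all $N$ particles weighted equally).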

In [None]:
#| export
def eff_size(weights):
    "Effective sample size of `weights` (normalized here first): 1 over the sum of squared weights"
    w = normalize(weights)
    sum_of_squares = np.sum(w * w)
    return 1.0 / sum_of_squares

In [None]:
# Test effective sample size
# Two extremes: equal weights on 100 particles vs. all weight on one particle
uniform_weights = np.ones(100) / 100
skewed_weights = np.zeros(100)
skewed_weights[0] = 1.0

test_close(eff_size(uniform_weights), 100.0)  # All particles contribute
test_close(eff_size(skewed_weights), 1.0)     # Only one particle

## Categorical Distribution Utilities

In [None]:
#| export
def categorical(probs, labels=None):
    """Create categorical distribution from `probs` with optional `labels`.

    `probs` is normalized first. Returns a dict mapping each label to its
    probability; without `labels` the keys are the indices 0..len(probs)-1.

    Raises `ValueError` if the number of labels differs from the number of
    probabilities, plus whatever `normalize` raises for invalid `probs`.
    """
    probs = normalize(probs)
    if labels is None:
        labels = list(range(len(probs)))
    else:
        labels = list(labels)  # materialize so any iterable can be length-checked
        # zip() would otherwise silently drop the surplus labels or probabilities
        if len(labels) != len(probs):
            raise ValueError(f"Got {len(labels)} labels for {len(probs)} probabilities")
    return dict(zip(labels, probs))

def uniform(n):
    """Create uniform distribution over `n` outcomes.

    Returns a float array of length `n` with every entry equal to 1/n.

    Raises `ValueError` if `n` is less than 1: zero outcomes would give an
    empty array, which is not a probability distribution.
    """
    if n < 1: raise ValueError("Uniform distribution needs at least one outcome")
    return np.ones(n) / n

def from_counts(counts):
    "Turn non-negative `counts` into a probability distribution via `normalize`"
    arr = np.asarray(counts)
    negative_mask = arr < 0
    # Checked here so the error message talks about counts, not probabilities
    if np.any(negative_mask): raise ValueError("Counts must be non-negative")
    return normalize(arr)

In [None]:
# Test categorical utilities
# Probabilities are computed floats, so compare with a tolerance (test_close)
# rather than exact equality.
cat_dist = categorical([1, 2, 3], ['A', 'B', 'C'])
test_close(cat_dist['A'], 1/6)
test_close(cat_dist['B'], 2/6)
test_close(cat_dist['C'], 3/6)

# Without labels the keys default to the indices
test_eq(list(categorical([1, 1])), [0, 1])

# Test uniform
u = uniform(4)
test_close(u, [0.25, 0.25, 0.25, 0.25])

# Test from_counts
probs = from_counts([10, 20, 30])
test_close(probs, [1/6, 2/6, 3/6])

## Export

In [None]:
#| export
# Public API: the names made available by a star-import of this module,
# grouped by the notebook section that defines them.
__all__ = [
    # Basic operations
    'normalize', 'sample',
    
    # Information measures
    'entropy', 'kl_div', 'js_div',
    
    # Effective sample size
    'eff_size',
    
    # Categorical utilities
    'categorical', 'uniform', 'from_counts'
]

In [None]:
#|hide
import nbdev; nbdev.nbdev_export()