Skip to content

Commit

Permalink
Merge pull request #1 from Autoplectic/cumulative_residual_entropy
Browse files Browse the repository at this point in the history
Add the Cumulative Residual Entropy
  • Loading branch information
chebee7i committed Mar 7, 2015
2 parents b210644 + 30f0d75 commit 9c3dc44
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 3 deletions.
8 changes: 6 additions & 2 deletions dit/abc.py
Expand Up @@ -10,8 +10,10 @@
)

from dit.other import (extropy as J,
perplexity as P,
)
perplexity as P,
cumulative_residual_entropy as CRE,
generalized_cumulative_residual_entropy as GCRE,
)

from dit.multivariate import (binding_information as B,
coinformation as I,
Expand Down Expand Up @@ -54,6 +56,8 @@
_others = [
'P', # the perplexity
'J', # the extropy
'CRE', # the cumulative residual entropy
'GCRE', # the generalized cumulative residual entropy
]

__all__ = _dists + _shannon + _shannon_ext + _divergences + _others
1 change: 1 addition & 0 deletions dit/other/__init__.py
Expand Up @@ -5,3 +5,4 @@

from .extropy import extropy
from .perplexity import perplexity
from .cumulative_residual_entropy import *
201 changes: 201 additions & 0 deletions dit/other/cumulative_residual_entropy.py
@@ -0,0 +1,201 @@
"""
The (generalized) cumulative residual entropy and conditional (generalized)
cumulative residual entropy.
"""

from six.moves import range # pylint: disable=redefined-builtin,import-error

import numpy as np

from .. import Distribution as D, ScalarDistribution as SD
from ..algorithms.stats import _numerical_test
from ..utils import pairwise

__all__ = ['cumulative_residual_entropy',
'generalized_cumulative_residual_entropy',
'conditional_cumulative_residual_entropy',
'conditional_generalized_cumulative_residual_entropy',
]

def _cumulative_residual_entropy(dist, generalized=False):
"""
The cumulative residual entropy is an alternative to the Shannon
differential entropy with several advantagious properties.
Parameters
----------
dist : ScalarDistribution
The distribution to compute the cumulative residual entropy of.
generalized : bool
Wheither to integrate from zero over the CDF or to integrate from zero
over the CDF of the absolute value.
Returns
-------
CRE : float
The (generalized) cumulative residual entropy.
Examples
--------
"""
_numerical_test(dist)
eps = ((e if generalized else abs(e), p) for e, p in dist.zipped())
events, probs = zip(*sorted(eps))
cdf = dict((a, p) for a, p in zip(events, np.cumsum(probs)))
terms = []
for a, b in pairwise(events):
pgx = cdf[a]
term = (b-a)*pgx*np.log2(pgx)
terms.append(term)
return -np.nansum(terms)

def generalized_cumulative_residual_entropy(dist, extract=False):
"""
The generalized cumulative residual entropy is a generalized from of the
cumulative residual entropy. Rarther than integrating from 0 to infinty over
the absolute value of the CDF.
Parameters
----------
dist : Distribution
The distribution to compute the generalized cumulative residual entropy
of each index for.
extract : bool
If True and `dist.outcome_length()` is 1, return the single GCRE value
rather than a length-1 array.
Returns
-------
GCREs : ndarray
The generalized cumulative residual entropy for each index.
Examples
--------
>>> generalized_cumulative_residual_entropy(uniform(-2, 3))
1.6928786893420307
>>> generalized_cumulative_residual_entropy(uniform(0, 5))
1.6928786893420307
"""
if not dist.is_joint():
return _cumulative_residual_entropy(dist, generalized=True)
length = dist.outcome_length()
margs = [SD.from_distribution(dist.marginal([i])) for i in range(length)]
cres = np.array([_cumulative_residual_entropy(m, generalized=True) for m in margs])
if len(cres) == 1 and extract:
cres = cres[0]
return cres

def cumulative_residual_entropy(dist, extract=False):
"""
The cumulative residual entropy is an alternative to the Shannon
differential entropy with several desirable properties including
non-negativity.
Parameters
----------
dist : Distribution
The distribution to compute the cumulative residual entropy of each
index for.
extract : bool
If True and `dist.outcome_length()` is 1, return the single GCRE value
rather than a length-1 array.
Returns
-------
CREs : ndarray
The cumulative residual entropy for each index.
Examples
--------
>>> d1 = ScalarDistribution([1, 2, 3, 4, 5, 6], [1/6]*6)
>>> d2 = ScalarDistribution([1, 2, 3, 4, 5, 100], [1/6]*6)
>>> cumulative_residual_entropy(d1)
2.0683182557028439
>>> cumulative_residual_entropy(d2)
22.672680046016705
"""
if not dist.is_joint():
return _cumulative_residual_entropy(dist, generalized=False)
es, ps = zip(*[(tuple(abs(ei) for ei in e), p) for e, p in dist.zipped()])
abs_dist = D(es, ps)
return generalized_cumulative_residual_entropy(abs_dist, extract)

def conditional_cumulative_residual_entropy(dist, rv, crvs=None, rv_mode=None):
"""
Returns the conditional cumulative residual entropy.
Parameters
----------
dist : Distribution
rv : list, None
crvs : list, None
Returns
-------
CCRE : ScalarDistribution
The conditional cumulative residual entropy.
Examples
--------
>>> from itertools import product
>>> events = [ (a, b) for a, b, in product(range(5), range(5)) if a <= b ]
>>> probs = [ 1/(5-a)/5 for a, b in events ]
>>> d = Distribution(events, probs)
>>> print(conditional_cumulative_residual_entropy(d, 1, [0]))
Class: ScalarDistribution
Alphabet: (-0.0, 0.5, 0.91829583405448956, 1.3112781244591329, 1.6928786893420307)
Base: linear
x p(x)
-0.0 0.2
0.5 0.2
0.918295834054 0.2
1.31127812446 0.2
1.69287868934 0.2
"""
if crvs is None:
crvs = []
mdist, cdists = dist.condition_on(crvs=crvs, rvs=[rv], rv_mode=rv_mode)
cres = [cumulative_residual_entropy(cd, extract=True) for cd in cdists]
ccre = SD(cres, mdist.pmf)
return ccre

def conditional_generalized_cumulative_residual_entropy(dist, rv, crvs=None, rv_mode=None):
"""
Returns the conditional cumulative residual entropy.
Parameters
----------
dist : Distribution
rv : list, None
crvs : list, None
Returns
-------
CCRE : ScalarDistribution
The conditional cumulative residual entropy.
Examples
--------
>>> from itertools import product
>>> events = [ (a-2, b-2) for a, b, in product(range(5), range(5)) if a <= b ]
>>> probs = [ 1/(3-a)/5 for a, b in events ]
>>> d = Distribution(events, probs)
>>> print(conditional_generalized_cumulative_residual_entropy(d, 1, [0]))
Class: ScalarDistribution
Alphabet: (-0.0, 0.5, 0.91829583405448956, 1.3112781244591329, 1.6928786893420307)
Base: linear
x p(x)
-0.0 0.2
0.5 0.2
0.918295834054 0.2
1.31127812446 0.2
1.69287868934 0.2
"""
if crvs is None:
crvs = []
mdist, cdists = dist.condition_on(crvs=crvs, rvs=[rv], rv_mode=rv_mode)
cres = [generalized_cumulative_residual_entropy(cd, extract=True) for cd in cdists]
ccre = SD(cres, mdist.pmf)
return ccre
136 changes: 136 additions & 0 deletions dit/other/tests/test_cumulative_residual_entropy.py
@@ -0,0 +1,136 @@
"""
Tests for dit.others.cumulative_residual_entropy.
"""

from __future__ import division

from nose.tools import assert_almost_equal, assert_raises
from numpy.testing import assert_array_almost_equal

from six.moves import range # pylint: disable=redefined-builtin

from itertools import combinations, product

from dit import (ScalarDistribution as SD,
Distribution as D)
from dit.algorithms.stats import mean, standard_deviation
from dit.other import (cumulative_residual_entropy as CRE,
generalized_cumulative_residual_entropy as GCRE,
conditional_cumulative_residual_entropy as CCRE,
conditional_generalized_cumulative_residual_entropy as CGCRE)
from dit.example_dists import uniform, Xor

def miwin():
d3 = [1, 2, 5, 6, 7, 9]
d4 = [1, 3, 4, 5, 8, 9]
d5 = [ 2, 3, 4, 6, 7, 8 ]
d = D(list(product(d3, d4, d5)), [1/6**3]*6**3)
return d

def conditional_uniform1():
events = [ (a, b) for a, b, in product(range(5), range(5)) if a <= b ]
probs = [ 1/(5-a)/5 for a, _ in events ]
d = D(events, probs)
return d

def conditional_uniform2():
events = [ (a-2, b-2) for a, b, in product(range(5), range(5)) if a <= b ]
probs = [ 1/(3-a)/5 for a, _ in events ]
d = D(events, probs)
return d

def test_cre_1():
"""
Test the CRE against known values for several uniform distributions.
"""
dists = [ uniform(-n//2, n//2) for n in range(2, 23, 2) ]
results = [0.5, 0.81127812, 1.15002242, 1.49799845, 1.85028649, 2.20496373,
2.56111354, 2.91823997, 3.27604979, 3.6343579, 3.99304129]
for d, r in zip(dists, results):
yield assert_almost_equal, r, CRE(d)

def test_cre_2():
"""
Test the CRE of a multivariate distribution (CRE is of each marginal).
"""
d = miwin()
assert_array_almost_equal(CRE(d), [3.34415526, 3.27909534, 2.56831826])

def test_cre_3():
"""
Test that the CRE fails when the events are not numbers.
"""
dist = Xor()
assert_raises(TypeError, CRE, dist)

def test_gcre_1():
"""
Test the GCRE against known values for the uniform distribution.
"""
dists = [ uniform(-n//2, n//2) for n in range(2, 23, 2) ]
results = [0.5, 1.31127812, 2.06831826, 2.80927657, 3.54316518, 4.27328199,
5.00113503, 5.72751654, 6.45288453, 7.17752308, 7.90161817]
for d, r in zip(dists, results):
yield assert_almost_equal, r, GCRE(d)

def test_gcre_32():
"""
Test the GCRE of a multivariate distribution.
"""
d = miwin()
assert_array_almost_equal(GCRE(d), [3.34415526, 3.27909534, 2.56831826])

def test_gcre_3():
"""
Test that the GCRE fails on non-numeric events.
"""
dist = Xor()
assert_raises(TypeError, GCRE, dist)

def test_gcre_4():
"""
Test that equal-length uniform distributions all have the same GCRE.
"""
gcres = [ GCRE(uniform(i, i+5)) for i in range(-5, 1) ]
for gcre in gcres:
yield assert_almost_equal, gcre, gcres[0]

def test_ccre_1():
"""
Test that independent RVs have CCRE = CRE.
"""
d = miwin()
for crvs in combinations([0, 1, 2], 2):
rv = (set([0, 1, 2]) - set(crvs)).pop()
ccre1 = CCRE(d, rv, crvs)
ccre2 = CCRE(d, rv)
yield assert_almost_equal, CRE(d)[rv], mean(ccre1)
yield assert_almost_equal, CRE(d)[rv], mean(ccre2)
yield assert_almost_equal, standard_deviation(ccre1), 0

def test_ccre_2():
"""
Test a correlated distribution.
"""
d = conditional_uniform1()
ccre = CCRE(d, 1, [0])
uniforms = [ CRE(uniform(i)) for i in range(1, 6) ]
assert_array_almost_equal(ccre.outcomes, uniforms)

def test_ccre_3():
"""
Test a correlated distribution.
"""
d = conditional_uniform2()
ccre = CCRE(d, 1, [0])
print(ccre)
uniforms = sorted([ CRE(uniform(i-2, 3)) for i in range(5) ])
assert_array_almost_equal(ccre.outcomes, uniforms)

def test_cgcre_1():
"""
"""
d = conditional_uniform2()
cgcre = CGCRE(d, 1, [0])
uniforms = [ GCRE(uniform(i)) for i in range(1, 6) ]
assert_array_almost_equal(cgcre.outcomes, uniforms)
10 changes: 9 additions & 1 deletion dit/utils/misc.py
Expand Up @@ -8,13 +8,14 @@
from __future__ import absolute_import

from collections import Iterable
from itertools import tee
import os
import sys
import subprocess
import warnings

import six
from six.moves import range # pylint: disable=redefined-builtin
from six.moves import range, zip # pylint: disable=redefined-builtin

__all__ = (
'Property',
Expand All @@ -34,6 +35,7 @@
'require_keys',
'str_product',
'digits',
'pairwise',
)

######################################################
Expand Down Expand Up @@ -615,3 +617,9 @@ def digits(n, base, alphabet=None, pad=0, big_endian=True):
sequence = [alphabet[i] for i in sequence]

return sequence

def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)

0 comments on commit 9c3dc44

Please sign in to comment.