In [1]:
# !python3 -m pip uninstall tensorflow tensorflow-probability nsc -y
# !python3 -m pip install tensorflow tensorflow-probability -q

In [2]:
# !python3 -m pip uninstall nsc -y -q
# !python3 -m pip install -i https://test.pypi.org/simple/ nsc -q

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
sns.set()
from collections import defaultdict
import math
from typing import List

# From nsc lib
import nsc
from nsc import distributions as nsd
from nsc.util import function as nsc_func
from nsc.util.function import coupled_logarithm, coupled_exponential
# nsd = nsc.distributions

import ipdb

Importing NSC lib v0.0.2.


### 1) CoupledNormalDistribution function version

In [4]:
def norm_CG(sigma, kappa):
    """Normalization constant of the 1-D coupled Gaussian (NormCG).

    Inputs
    ----------
    sigma : Scale of the distribution; the constant is proportional to it.
    kappa : Coupling parameter; its sign selects which closed form is used.

    Returns
    -------
    sqrt(2*pi) * sigma when kappa == 0, otherwise
    sqrt(pi) * sigma * gamma(a) / (sqrt(|kappa|) * gamma(b)) where the gamma
    arguments a and b depend on the sign of kappa.
    """
    # kappa == 0 is the ordinary Gaussian normalization.
    if kappa == 0:
        return math.sqrt(2*math.pi) * sigma

    # Pick the gamma-function arguments and the sqrt(|kappa|) factor by sign.
    if kappa < 0:
        gamma_top = math.gamma((-1+kappa) / (2*kappa))
        kappa_root = math.sqrt(-1*kappa)
        gamma_bottom = math.gamma(1 - (1 / (2*kappa)))
    else:
        gamma_top = math.gamma(1 / (2*kappa))
        kappa_root = math.sqrt(kappa)
        gamma_bottom = math.gamma((1+kappa)/(2*kappa))

    numerator = math.sqrt(math.pi) * sigma * gamma_top
    denominator = float(kappa_root * gamma_bottom)
    return numerator / denominator

In [5]:
def CoupledNormalDistribution(mean, sigma, kappa, alpha):
    """
    Evaluate the 1-D coupled normal density on a fixed grid around `mean`.

    Inputs
    ----------
    mean : Location of the distribution; the evaluation grid is centred on it.
    sigma : Scale of the distribution; must be greater than 0.
    kappa : Coupling parameter passed to the coupled exponential and to norm_CG.
    alpha : Must be 1 or 2. It is only validated here, not used in the computation.

    Returns
    -------
    The density values at the grid points (presumably an ndarray with one
    entry per grid point -- depends on nsc's coupled_exponential; TODO confirm).
    For kappa >= 0 the grid is np.arange(mean-20, mean+20, 40/(2**16+1));
    for kappa < 0 it spans mean -/+ sqrt(-sigma**2/kappa) with the same number
    of steps.
    """

    assert sigma > 0, "std must be greater than 0."
    assert alpha in [1, 2], "alpha must be set to either 1 or 2."

    n_steps = 2**16 + 1

    if kappa >= 0:
        lower, upper = mean - 20, mean + 20
        # The grid is always 40 wide. The previous spelling
        # `(20+mean - -20+mean)` parsed as 40 + 2*mean, which gave the wrong
        # step (and number of points) whenever mean != 0.
        step = 40 / n_steps
    else:
        # For negative coupling the grid is limited to mean +/- sqrt(-sigma^2/kappa).
        half_width = ((-1*sigma**2) / kappa)**0.5
        lower, upper = mean - half_width, mean + half_width
        step = (upper - lower) / n_steps

    grid = np.arange(lower, upper, step)

    normCGvalue = 1/float(norm_CG(sigma, kappa))

    coupledNormalDistributionResult = normCGvalue * (nsc_func.coupled_exponential((grid - mean)**2/sigma**2, kappa)) ** -0.5

    return coupledNormalDistributionResult

In [6]:
# Parameters for the function-version demo: coupling kappa and alpha
# (CoupledNormalDistribution asserts alpha is 1 or 2).
# NOTE(review): `dim` is not read by any code visible in this notebook.
kappa, alpha, dim = 0.5, 2, 1

In [7]:
# Mean and standard deviation used for the function-version demo.
mu, sigma = 0, 1
x = np.linspace(mu - 3*sigma, mu + 3*sigma, 100)
# Density values on the grid that CoupledNormalDistribution builds internally.
y = CoupledNormalDistribution(mu, sigma, kappa, alpha)

# Spacing of the kappa >= 0 evaluation grid (40 wide, 2**16+1 steps).
# The earlier spelling `(20+mu - -20+mu)` parses as 40 + 2*mu, which only
# equals 40 when mu == 0; the grid is also built once instead of twice.
y_grid = np.arange(mu - 20, mu + 20, 40 / (2**16 + 1))
dx = y_grid[1] - y_grid[0]

### 2) CoupledNormalDistribution class

In [8]:
class CoupledNormal:
    """Coupled Normal Distribution.

    This distribution has parameters: location `loc`, `scale`, coupling `kappa`,
    and `alpha`.

    `loc` and `scale` are either both scalars (a single distribution) or
    equally shaped lists/ndarrays (a batch of univariate distributions, one
    per element). `alpha` is validated and stored but is not used by `prob`
    or `_normalized_term`.
    """
    def __init__(self,
                 loc: [int, float, List, np.ndarray],
                 scale: [int, float, List, np.ndarray],
                 kappa: [int, float] = 0.,
                 alpha: int = 2,
                 validate_args: bool = True
                 ):
        """Store the parameters, converting list inputs to ndarrays.

        Parameters
        ----------
        loc : Location(s); int/float for a scalar, list/ndarray for a batch.
        scale : Scale(s); same type and shape as `loc`, all values >= 0.
        kappa : Coupling parameter (int or float), 0 by default.
        alpha : Either 1 or 2.
        validate_args : When True, the inputs are checked with assertions.

        Raises
        ------
        AssertionError : If `validate_args` is True and a check fails.
        """
        # Lists are converted before validation so only scalars and ndarrays remain.
        loc = np.asarray(loc) if isinstance(loc, list) else loc
        scale = np.asarray(scale) if isinstance(scale, list) else scale
        if validate_args:
            assert isinstance(loc, (int, float, np.ndarray)), "loc must be either an int/float type for scalar, or an list/ndarray type for multidimensional."
            assert isinstance(scale, (int, float, np.ndarray)), "scale must be either an int/float type for scalar, or an list/ndarray type for multidimensional."
            assert type(loc) == type(scale), "loc and scale must be the same type."
            if isinstance(loc, np.ndarray):
                assert loc.shape == scale.shape, "loc and scale must have the same dimensions (check respective .shape())."
                assert np.all((scale >= 0)), "All scale values must be greater or equal to 0."
            else:
                assert scale >= 0, "scale must be greater or equal to 0."
            assert isinstance(kappa, (int, float)), "kappa must be an int or float type."
            assert isinstance(alpha, int), "alpha must be an int that equals to either 1 or 2."
            assert alpha in [1, 2], "alpha must be equal to either 1 or 2."
        self.loc = loc
        self.scale = scale
        self.kappa = kappa
        self.alpha = alpha

    def n_dim(self):
        """Return the number of random variables: 1 when the event shape is empty."""
        return 1 if self._event_shape() == [] else self._event_shape()[0]

    def _batch_shape(self) -> List:
        """Return [] for a scalar `loc`, otherwise the shape of `loc` as a list."""
        if self._rank(self.loc) == 0:
            # return [] signifying single batch of a single distribution
            return []
        else:
            # return the batch shape in list format
            return list(self.loc.shape)

    def _event_shape(self) -> List:
        """Return the event shape, which is always [] for this univariate class."""
        # For univariate Coupled Normal distribution, event shape is always []
        # [] signifies single random variable dim (regardless of batch size)
        return []

    def _rank(self, value: [int, float, np.ndarray]) -> int:
        """Return 0 for an int/float, otherwise the number of axes of `value`."""
        # specify the rank of a given value, with rank=0 for a scalar and rank=ndim for an ndarray
        if isinstance(value, (int, float)):
            return 0
        else:
            return len(value.shape)

    def prob(self, X: [List, np.ndarray]):
        """Evaluate the density at the samples `X`.

        Parameters
        ----------
        X : List or ndarray of sample points. When the samples are themselves
            arrays, each must have the same shape as `loc`.

        Returns
        -------
        coupled_exponential(((X - loc) / scale)**2, kappa)**-0.5 divided by
        the normalization term.

        Raises
        ------
        AssertionError : If `X` is not a list/ndarray or the sample shape
            does not match `loc`.
        """
        # Check whether input X is valid
        X = np.asarray(X) if isinstance(X, list) else X
        assert isinstance(X, np.ndarray), "X must be a List or np.ndarray."
        if isinstance(X[0], np.ndarray):
            assert X[0].shape == self.loc.shape, "X samples must have the same dimensions as loc and scale (check respective .shape())."
        # Calculate PDF with input X
        X_norm = (X-self.loc)**2 / self.scale**2
        norm_term = self._normalized_term()
        p = (coupled_exponential(X_norm, self.kappa))**-0.5 / norm_term
        return p

    # Normalization of 1-D Coupled Gaussian (NormCG)
    def _normalized_term(self) -> [int, float, np.ndarray]:
        """Return the normalization constant, scaled element-wise by `scale`.

        kappa == 0 gives sqrt(2*pi) * scale; otherwise the constant is
        sqrt(pi) * scale * gamma(a) / (sqrt(|kappa|) * gamma(b)) with the
        gamma arguments chosen by the sign of kappa.
        """
        if self.kappa == 0:
            norm_term = math.sqrt(2*math.pi) * self.scale
        elif self.kappa < 0:
            # The whole ratio (kappa-1)/(2*kappa) is the gamma argument, as in
            # norm_CG. Dividing gamma(kappa-1) by 2*kappa instead gives a
            # wrong (negative) constant and hits gamma's poles for integer kappa.
            gamma_num = math.gamma((self.kappa-1) / (2*self.kappa))
            gamma_dem = math.gamma(1 - (1 / (2*self.kappa)))
            norm_term = (math.sqrt(math.pi)*self.scale*gamma_num) / float(math.sqrt(-1*self.kappa)*gamma_dem)
        else:
            gamma_num = math.gamma(1 / (2*self.kappa))
            gamma_dem = math.gamma((1+self.kappa)/(2*self.kappa))
            norm_term = (math.sqrt(math.pi)*self.scale*gamma_num) / float(math.sqrt(self.kappa)*gamma_dem)
        return norm_term

    def __repr__(self) -> str:
        """Return a summary showing the class name, batch shape and event shape."""
        return f"<nsc.distributions.{self.__class__.__name__} batch_shape={str(self._batch_shape())} event_shape={str(self._event_shape())}>"


In [9]:
class MultivariateCoupledNormal(CoupledNormal):
    """Multivariate Coupled Normal Distribution.

    This distribution has parameters: location `loc`, `scale`, coupling `kappa`,
    and `alpha`.

    The last axis of `loc` is treated as the event axis (one entry per random
    variable); any leading axes form the batch shape. `prob` and
    `_normalized_term` are not implemented yet and return None.
    """
    def __init__(self,
                 loc: [int, float, List, np.ndarray],
                 scale: [int, float, List, np.ndarray],
                 kappa: [int, float] = 0.,
                 alpha: int = 2,
                 validate_args: bool = True
                 ):
        """Validate that `loc`/`scale` are array-like, then defer to CoupledNormal.

        Raises
        ------
        AssertionError : If `validate_args` is True and `loc` or `scale` is
            not a list/ndarray, or a CoupledNormal check fails.
        """
        if validate_args:
            assert isinstance(loc, (list, np.ndarray)), "loc must be either a list or ndarray type. Otherwise use CoupledNormal."
            assert isinstance(scale, (list, np.ndarray)), "scale must be either a list or ndarray type. Otherwise use CoupledNormal."
        super().__init__(
            loc=loc,
            scale=scale,
            kappa=kappa,
            alpha=alpha,
            validate_args=validate_args
        )

    def _batch_shape(self) -> List:
        """Return every axis of `loc` except the last; [] when `loc` is a vector."""
        # For a rank-1 loc the slice is empty, which signifies a single
        # distribution; for higher ranks it is the batch size(s).
        return list(self.loc.shape[:-1])

    def _event_shape(self) -> List:
        """Return [n of random variables], i.e. the size of the last axis of `loc`."""
        return [self.loc.shape[-1]]

    def prob(self, X: [List, np.ndarray]):
        """Placeholder for the multivariate density; currently returns None."""
        # TODO: implement the multivariate PDF. The inherited univariate
        # version is deliberately overridden so it is not used by accident.
        return None

    # Normalization of the multivariate Coupled Gaussian (NormMultiCoupled)
    def _normalized_term(self) -> [int, float, np.ndarray]:
        """Placeholder for the multivariate normalization; currently returns None."""
        # TODO: implement the multivariate normalization term.
        return None


***Test***

In [10]:
# Parameters and evaluation points shared by the test cells.
# NOTE(review): kappa and alpha are not passed to the constructors in the
# visible cells below, so those instances use the defaults -- confirm intent.
mu, sigma, kappa, alpha = 0., 1., 0.5, 2
# Half-width of the evaluation interval: six standard deviations.
six_sigma = 6.*sigma
# 1000 evenly spaced points covering mu +/- 6*sigma.
X_input = np.linspace(mu-six_sigma, mu+six_sigma, 1000)

In [11]:
X_input.shape

(1000,)

Coupled Normal distribution

In [12]:
cn = CoupledNormal(loc=mu, scale=sigma)
cn

<nsc.distributions.CoupledNormal batch_shape=[] event_shape=[]>

In [13]:
print(cn.n_dim())
print(cn._normalized_term())
print(cn.prob(X_input))

1
2.5066282746310002
[6.07588285e-09 6.52947950e-09 7.01592715e-09 7.53752759e-09
 8.09673816e-09 8.69618183e-09 9.33865787e-09 1.00271533e-08
 1.07648549e-08 1.15551621e-08 1.24017005e-08 1.33083365e-08
 1.42791924e-08 1.53186627e-08 1.64314314e-08 1.76224902e-08
 1.88971579e-08 2.02611012e-08 2.17203558e-08 2.32813500e-08
 2.49509291e-08 2.67363809e-08 2.86454635e-08 3.06864341e-08
 3.28680797e-08 3.51997497e-08 3.76913900e-08 4.03535801e-08
 4.31975705e-08 4.62353246e-08 4.94795608e-08 5.29437985e-08
 5.66424063e-08 6.05906524e-08 6.48047589e-08 6.93019581e-08
 7.41005529e-08 7.92199799e-08 8.46808763e-08 9.05051506e-08
 9.67160572e-08 1.03338275e-07 1.10397991e-07 1.17922986e-07
 1.25942729e-07 1.34488475e-07 1.43593366e-07 1.53292539e-07
 1.63623243e-07 1.74624957e-07 1.86339516e-07 1.98811248e-07
 2.12087111e-07 2.26216843e-07 2.41253117e-07 2.57251708e-07
 2.74271661e-07 2.92375476e-07 3.11629300e-07 3.32103125e-07
 3.53871003e-07 3.77011266e-07 4.01606762e-07 4.27745097e-07
 4.

Coupled Normal multiple distributions with batch vector size of 2

In [14]:
cn = CoupledNormal(loc=[0., 1.], scale=[1., 2.])
cn

<nsc.distributions.CoupledNormal batch_shape=[2] event_shape=[]>

In [15]:
print(cn.n_dim())
print(cn._normalized_term())
# print(cn.prob(X_input))

1
[2.50662827 5.01325655]


Coupled Normal multiple distributions with batch matrix size of 3x2

In [16]:
cn = CoupledNormal(loc=[[0., 1.], [0., 1.], [0., 1.]], scale=[[1., 2.], [1., 2.], [1., 2.]])
cn

<nsc.distributions.CoupledNormal batch_shape=[3, 2] event_shape=[]>

Multivariate Coupled Normal distribution

In [17]:
# batch_shape is [] while event_shape is [2]
cn = MultivariateCoupledNormal(loc=[0., 1.], scale=[1., 2.])
cn

<nsc.distributions.MultivariateCoupledNormal batch_shape=[] event_shape=[2]>

In [18]:
print(cn.n_dim())
print(cn._normalized_term())
# print(cn.prob(X_input))

2
None


Multivariate Coupled Normal multiple distributions (i.e., batch size of 3)

In [19]:
cn = MultivariateCoupledNormal(loc=[[0., 1.], [1., 2.], [2., 3.]], \
                               scale=[[0., 1.], [2., 3.], [4., 5.]]
                               )
cn

<nsc.distributions.MultivariateCoupledNormal batch_shape=[3] event_shape=[2]>

In [20]:
print(cn.n_dim())
print(cn._normalized_term())
# print(cn.prob(X_input))

2
None


In [21]:
cn = CoupledNormal(loc=[[0., 1., 0., 1.], [1., 2., 1., 2.], [2., 3., 2., 3.]], \
                   scale=[[0., 1., 0., 1.], [2., 3., 2., 3.], [4., 5., 4., 5.]]
                   )
cn.n_dim()

1

In [22]:
print(cn.n_dim())
print(cn._normalized_term())
# print(cn.prob(X_input))

1
[[ 0.          2.50662827  0.          2.50662827]
 [ 5.01325655  7.51988482  5.01325655  7.51988482]
 [10.0265131  12.53314137 10.0265131  12.53314137]]
