Probability Density Functions
========

 - Split normal (double-sided Gaussian)
 - 

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 15})

from datetime import datetime

In [None]:
from scipy.optimize import curve_fit

Basics: Fitting a distribution to a dataset (e.g. fitting data with a Gaussian)
--------

In [None]:
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt  # matplotlib must be installed to plot

rng = np.random.default_rng()
dist = stats.nbinom
shapes = (5, 0.5)
data = dist.rvs(*shapes, size=1000, random_state=rng)

In [None]:
bounds = [(0, 30), (0, 1)]
res = stats.fit(dist, data, bounds)

In [None]:
res.params

In [None]:
fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data,bins=np.linspace(-0.5,19.5,21),histtype = 'step',fill=None,linewidth=2,density=2)
rv = stats.nbinom(res.params.n, res.params.p)
xx = np.arange(20)
ax.plot(xx, stats.nbinom.pmf(xx, res.params.n, res.params.p), 'bo', ms=8, label='nbinom pmf')

Basics: Fitting a distribution to a dataset (e.g. fitting data with a Gaussian)
--------

In [None]:
rng = np.random.default_rng()
dist = stats.norm
shapes = (0, 1)
data = dist.rvs(*shapes, size=20, random_state=rng)

In [None]:
res = stats.fit(dist, data, [(-5, 5), (0, 10)])

In [None]:
res.params

In [None]:
print(sum(data)/len(data),np.std(data))

In [None]:
fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=np.linspace(-4,4,101),label='data');
xx = np.linspace(dist.ppf(0.001),dist.ppf(0.999),100)
#ax.plot(xx, dist.pdf(xx, res.params.loc, res.params.scale), label='fitted Gaus')
ax.plot(xx, dist.pdf(xx, 0, 1), label='Gaus(0,1)')
ax.legend()

Basics: Fitting a distribution to a dataset (e.g. fitting data with a Lognormal)
--------

In [None]:
rng = np.random.default_rng()
dist = stats.lognorm
shapes = (0.25, 1, 1) # s (width 1), loc( 0), second width (1)
data = np.maximum(stats.norm.rvs(100,10, size=20, random_state=rng),0)

In [None]:
res = stats.fit(dist, data, [(0, 5), (0, 200),(0,200)])

In [None]:
res.params

In [None]:
fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,
        bins=np.linspace(-sorted(data)[0]*0.9,sorted(data)[-1]*1.1,101),label='data');

xx = np.linspace(dist.ppf(0.001, *res.params),dist.ppf(0.999, *res.params),100)
ax.plot(xx, dist.pdf(xx, res.params.s, res.params.loc, res.params.scale), label='fitted logn')
#ax.plot(xx, dist.pdf(xx, *shapes), label='logn')
ax.legend()

Parallelize a fit - python multiprocessing (does not seem to work)
--------

In [None]:
#datasets = np.maximum(stats.norm.rvs(100,10, size=(3,20), random_state=rng),0)
#datasets

In [None]:
#%%timeit
#
#res = []
#for data in datasets :
#    res.append(stats.fit(stats.lognorm, data, [(0, 5), (0, 200),(0,200)]))

In [None]:
#from multiprocessing import Pool
#import multiprocessing
#
#from multiprocessing import set_start_method
#set_start_method("spawn")

In [None]:
#def f(x):
#    return x*x
#
#with Pool(5) as p:
#    print(p.map(f, [1, 2, 3]))

In [None]:
#def double(a):
#    return a * 2
#
#def driver_func():
#    PROCESSES = 4
#    with multiprocessing.Pool(PROCESSES) as pool:
#        params = [(1, ), (2, ), (3, ), (4, )]
#        results = [pool.apply_async(double, p) for p in params]
#
#        for r in results:
#            print('\t', r.get())

In [None]:
#driver_func()

In [None]:
#import multiprocessing as mp
#print("Number of processors: ", mp.cpu_count())

Split-normal (two-piece normal) (double-sided Gaussian) distribution
-----------

**Version 1: has two parameters for width of each side (not ideal)**

In [None]:
from scipy.stats import rv_continuous
import scipy.special as sc
from scipy.stats._distn_infrastructure import _ShapeInfo

In [None]:
class split_normal_gen(rv_continuous):
    r"""A split normal distribution.
    """
    #def _argcheck(self, *Params):
    #    return True
    
    def _get_support(self, a, b):
        # The lower and upper boundaries of x
        return -float('inf'), float('inf')

    def _shape_info(self):
        # name, integrality=False, domain=(-np.inf, np.inf), inclusive=(True, True)
        # Integrality: whether it is an integer or not.
        return [_ShapeInfo("a", False, (0, np.inf), (False, False))]

    def _pdf(self, x, a, b):
        
        C = np.sqrt(2/np.pi) * ( 1/(a + b) )        
        sigma  = (x < 0) * a
        sigma += (x >= 0) * b
        return C * np.exp( -x**2/(2.0*sigma**2) )

    def _cdf(self, x, a, b):
        # Reuse the cdf defined for the Normal distribution
        
        afrac = a/(a+b)
        bfrac = b/(a+b)
        
        ret  = (x < 0) * (afrac*2.0) * sc.ndtr( x/a)
        ret += (x > 0) * (1 - (bfrac*2.0) * sc.ndtr(-x/b))

        return ret

    def _ppf(self, q, a, b):
        # Reuse the ppf defined for the Normal distribution

        afrac = a/(a+b)
        bfrac = b/(a+b)
        ret  = (q < afrac) *   a  * sc.ndtri( np.minimum(   q /(2.0*afrac), 0.5) )
        ret += (q > afrac) * (-b) * sc.ndtri( np.minimum((1-q)/(2.0*bfrac), 0.5) )

        return ret

    #def _stats(self):
    #    return np.inf, np.inf, np.nan, np.nan

split_normal = split_normal_gen(a = 1, b = 1, name="split_normal")

In [None]:
fig,axes = plt.subplots(1,2,figsize=(16,6))
axes = axes.flatten()

a = 2
b = 1
loc=5
rng = np.random.default_rng()
data = split_normal.rvs(a=a,b=b,loc=loc, size=500000, random_state=rng)

xhist = np.linspace(sorted(data)[0],sorted(data)[-1],100)
xx = np.linspace(sorted(data)[0],sorted(data)[-1],1000)
x_ppf = np.linspace(0.001,0.999,1000)

ax = axes[0]
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, split_normal.pdf(xx,a,b,loc=loc), label='pdf')

ax = axes[1]
ax.plot(x_ppf, split_normal.ppf(x_ppf,a,b,loc=loc), label='ppf (inverse of cdf)')
ax.legend()

ax = axes[0].twinx()
ax.plot(xx, split_normal.cdf(xx,a,b,loc=loc), label='cdf')
ax.plot(xx, split_normal.sf(xx,a,b,loc=loc), label='sf',color='red')
ax.set_ylim(bottom=0)
ax.legend()
axes[0].legend(loc=2)

In [None]:
a = 1
b = 4
loc=4
rng = np.random.default_rng()
data = split_normal.rvs(a=a,b=b,loc=loc, size=20, random_state=rng)

res = split_normal.fit(data,fscale=1)
print(res)

buffer = 0.2*(sorted(data)[-1] - sorted(data)[0])
xlow = sorted(data)[0] - buffer
xhigh = sorted(data)[-1] + buffer

xhist = np.linspace(xlow,xhigh,100)
xx = np.linspace(xlow,xhigh,1000)

fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, split_normal.pdf(xx,a=res[0],b=res[1],loc=res[2]), label='ppf (inverse of cdf)')

Split-normal with relative sigmas
--------

**Version 2: defines a single variable that describes the scaling of the second width compared to the first width (more ideal, correct number of variables.)**

In [None]:
class split_norm_gen(rv_continuous):
    r"""A split normal distribution.
    """
    #def _argcheck(self, *Params):
    #    return True
    
    def _get_support(self, a):
        # The lower and upper boundaries of x
        return -float('inf'), float('inf')

    def _shape_info(self):
        # name, integrality=False, domain=(-np.inf, np.inf), inclusive=(True, True)
        # Integrality: whether it is an integer or not.
        return [_ShapeInfo("a", False, (0, np.inf), (False, False))]

    def _pdf(self, x, a):

        C = np.sqrt(2/np.pi) * ( 1/(a + 1.0) )        
        sigma  = (x < 0) * a
        sigma += (x >= 0) * 1.0
        return C * np.exp( -x**2/(2.0*sigma**2) )

    def _cdf(self, x, a):
        # Reuse the cdf defined for the Normal distribution
        
        afrac =   a/(a+1.0)
        bfrac = 1.0/(a+1.0)
        
        ret  = (x < 0) *        (afrac*2.0) * sc.ndtr( x/a  )
        ret += (x > 0) * (1.0 - (bfrac*2.0) * sc.ndtr(-x/1.0))

        return ret

    def _ppf(self, q, a):
        # Reuse the ppf defined for the Normal distribution

        afrac =   a/(a+1.0)
        bfrac = 1.0/(a+1.0)
        ret  = (q < afrac) *     a  * sc.ndtri( np.minimum(   q /(2.0*afrac), 0.5) )
        ret += (q > afrac) * (-1.0) * sc.ndtri( np.minimum((1-q)/(2.0*bfrac), 0.5) )

        return ret

    #def _stats(self):
    #    return np.inf, np.inf, np.nan, np.nan

split_norm = split_norm_gen(a = 1, name="split_norm")

In [None]:
fig,axes = plt.subplots(1,2,figsize=(16,6))
axes = axes.flatten()

a = 0.5
loc=5.0
rng = np.random.default_rng()
data = split_norm.rvs(a=a,loc=loc, size=500000, random_state=rng)

xhist = np.linspace(sorted(data)[0],sorted(data)[-1],100)
xx = np.linspace(sorted(data)[0],sorted(data)[-1],1000)
x_ppf = np.linspace(0.001,0.999,1000)

ax = axes[0]
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, split_norm.pdf(xx,a,loc=loc), label='pdf')

ax = axes[1]
ax.plot(x_ppf, split_norm.ppf(x_ppf,a,loc=loc), label='ppf (inverse of cdf)')
ax.legend()

ax = axes[0].twinx()
ax.plot(xx, split_norm.cdf(xx,a,loc=loc), label='cdf')
ax.plot(xx, split_norm.sf(xx,a,loc=loc), label='sf',color='red')
ax.set_ylim(bottom=0)
ax.legend()
axes[0].legend(loc=2)

In [None]:
a = 4.0
loc=-5.0
rng = np.random.default_rng()
data = split_norm.rvs(a=a,loc=loc, size=20, random_state=rng)

res = split_norm.fit(data)
print(res)

buffer = 0.2*(sorted(data)[-1] - sorted(data)[0])
xlow = sorted(data)[0] - buffer
xhigh = sorted(data)[-1] + buffer

xhist = np.linspace(xlow,xhigh,100)
xx = np.linspace(xlow,xhigh,1000)

fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, split_norm.pdf(xx,a=res[0],loc=res[1],scale=res[2]), label='fit (no bounds)')

bounds=[(0.2,5.0),(-100,100),(0,100)]
res = stats.fit(split_norm, data,bounds=bounds)
print(res)
ax.plot(xx, split_norm.pdf(xx,a=res.params.a,loc=res.params.loc,scale=res.params.scale), label='fit (bounds)')
ax.legend();

Parallelize it!
-----

In [None]:
import dask.dataframe as dd
#from dask.multiprocessing import get

In [None]:
data = []
for i in range(200) :
    a = rng.uniform(2,10)
    loc= rng.uniform(-20,50)
    if i < 5 :
        print(a,loc)
    data.append(split_norm.rvs(a=a,loc=loc, size=20, random_state=rng))

data = pd.DataFrame(data=data)

In [None]:
startTime = datetime.now()

for i,row in data.iterrows() :
    bounds=[(0.2,5.0),(-100,100),(0,100)] # a, loc, scale
    res = stats.fit(split_norm,row.tolist(),bounds=bounds)
    if not i % 100 :
        print(i)
    #print(res)

elapsed_time = str(datetime.now()-startTime).split('.')[0]
print(elapsed_time)

In [None]:
def fit_rowdata_splitnorm(row) :
    #print(row)
    srow = sorted(row.tolist())
    dist = srow[-1] - srow[0]

    # For some reason Dask processes the metadata rows as well, which are filled with ones.
    # Basically skip these - they will disappear anyway in the output.
    if dist == 0 :
        return 0.0,0.0,0.0

    bounds=[(0.2,5.0),(srow[0],srow[-1]),(0,dist)] # a, loc, scale
    res = stats.fit(split_norm, row, bounds)
    return res.params.a,res.params.loc,res.params.scale

In [None]:
startTime = datetime.now()

ddata = dd.from_pandas(data, npartitions=20)

# Too many problems
#res = ddata.map_partitions(lambda df: df.apply((lambda row: fit_rowdata_splitnorm(row)),
#                                               axis=1)).compute(scheduler='single-threaded')# 'processes'

meta = meta={0: float, 1: float, 2: float}
# single-threaded, processes, threads ...
res = ddata.apply(fit_rowdata_splitnorm, axis=1,
                  result_type='expand', meta=meta).compute(scheduler='processes')
res.columns = ['a', 'loc','scale']

elapsed_time = str(datetime.now()-startTime).split('.')[0]
print(elapsed_time)

In [None]:
res

Double-Gaussian distribution
---------

In [None]:
from scipy.stats._continuous_distns import _norm_pdf
from scipy.stats import norm

In [None]:
class double_gaussian_gen(rv_continuous):
    r"""A double-Gaussian distribution.
    loc2 and scale2 are defined *relative to loc and scale!*
    """
    #def _argcheck(self, *Params):
    #    return True
    
    def _get_support(self,loc2,scale2,frac):
        # The lower and upper boundaries of x
        return -float('inf'), float('inf')

    def _shape_info(self):
        # name, integrality=False, domain=(-np.inf, np.inf), inclusive=(True, True)
        # Integrality: whether it is an integer or not.
        return [_ShapeInfo("loc2", False, (0, np.inf), (True, True)), # loc2 (affected by scale)
                _ShapeInfo("scale2", False, (0, np.inf), (False, False)), # scale2, also affected by scale
                _ShapeInfo("frac", False, (0, 1), (False, False))] # frac

    def _pdf(self, x, loc2, scale2, frac):
        
        # a is "loc2", b is "scale2"
        return ((1-frac)*_norm_pdf(x) + (frac)*_norm_pdf((x-loc2)/scale2)) * (1/((1-frac)+frac*scale2))

double_gaussian = double_gaussian_gen(name="double_gaussian")

In [None]:
fig,axes = plt.subplots(1,2,figsize=(16,6))
axes = axes.flatten()

loc2 = 5
loc=5.0
scale=2
scale2=0.5
frac=0.5
rng = np.random.default_rng()
data = double_gaussian.rvs(loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc,size=20, random_state=rng) # a=1,b=2,c=0.5
sorted_data = sorted(data[:500])


xhist = np.linspace(sorted_data[0],sorted_data[-1],100)
xx = np.linspace(sorted_data[0],sorted_data[-1],1000)
#xx = np.linspace(-10,40,1000)
x_ppf = np.linspace(0.001,0.999,1000)

ax = axes[0]
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, double_gaussian.pdf(xx,loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc), label='pdf')
ax.plot(xx, norm.pdf(xx,scale=scale,loc=loc), label='pdf')
ax.set_ylim(bottom=0)

ax = axes[1]
ax.plot(x_ppf, double_gaussian.ppf(x_ppf,loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc), label='ppf (inverse of cdf)')
ax.legend()

ax = axes[0].twinx()
ax.plot(xx, double_gaussian.cdf(xx,loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc), label='cdf',color='purple')
ax.plot(xx, double_gaussian.sf(xx,loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc), label='sf',color='red')
ax.set_ylim(bottom=0)
ax.legend()
axes[0].legend(loc=2)

Fit with double-gaussian
---------

In [None]:
loc2 = 5.0
loc=0.0
frac=0.5
scale=1.0
scale2=1.0
rng = np.random.default_rng()
data = double_gaussian.rvs(loc2=loc2,scale2=scale2,frac=frac,scale=scale,loc=loc,size=50, random_state=rng) # a=1,b=2,c=0.5

#res = double_gaussian.fit(data)
bounds=[(0,100),(0,5),(0,1),(-10,10),(0,2)] # a (loc2), b (scale2), c (frac), loc, scale
res = stats.fit(double_gaussian, data,bounds=bounds)
print(res)

buffer = 0.2*(sorted(data)[-1] - sorted(data)[0])
xlow = sorted(data)[0] - buffer
xhigh = sorted(data)[-1] + buffer

xhist = np.linspace(xlow,xhigh,100)
xx = np.linspace(xlow,xhigh,1000)

fig,ax = plt.subplots(1,1,figsize=(8,6))
ax.hist(data, density=True, histtype='stepfilled', alpha=0.2,bins=xhist,label='data');
ax.plot(xx, double_gaussian.pdf(xx,loc2=res.params.loc2,scale2=res.params.scale2,frac=res.params.frac,
                                loc=res.params.loc,scale=res.params.scale), label='fit (no bounds)')

#bounds=[(0.2,5.0),(-100,100),(0,100)]
#res = stats.fit(split_norm, data,bounds=bounds)
#print(res)
#ax.plot(xx, split_norm.pdf(xx,a=res.params.a,loc=res.params.loc,scale=res.params.scale), label='fit (bounds)')
#ax.legend();