# Modified Code from public API:
https://fitter.readthedocs.io/en/latest/_modules/fitter/fitter.html

In [1]:
import sys
import math
import threading
from datetime import datetime

import scipy.stats
import numpy as np
import pylab
import pandas as pd
from scipy.stats import entropy as kl_div
from scipy import optimize

from scipy.optimize import minimize
import time

In [2]:
__all__ = ['get_common_distributions', 'get_distributions', 'Fitter']

def get_distributions():
    distributions = []
    for this in dir(scipy.stats):
        if "fit" in eval("dir(scipy.stats." + this + ")"):
            distributions.append(this)
    return distributions


def get_common_distributions():
    return ['gamma', 'expon', 'exponpow', 'lognorm']
        
def loglikelihood_neg(param, dist):
    loglike_neg = -np.sum(dist[0].logpdf(dist[1], *param))
    #print(loglike_neg)
    return loglike_neg

In [1]:
class Fitter(object):
    def __init__(self, data, xmin=None, xmax=None, bins=100,
                 distributions=None, verbose=True, timeout=5000,
                 density=True):
        self.timeout = timeout
        # USER input
        self._data = None

        # Issue https://github.com/cokelaer/fitter/issues/22 asked for setting
        # the density to False in the fitting and plotting. I first tought it
        # would be possible, but the fitting is performed using the PDF of scipy
        # so one would still need to normalise the data so that it is
        # comparable. Therefore I do not see anyway to do it without using
        # density set to True for now.
        self._density = True

        #: list of distributions to test
        self.distributions = distributions
        if self.distributions == None:
            self._load_all_distributions()
        elif self.distributions == "common":
            self.distributions = get_common_distributions()
        elif isinstance(distributions, str):
            self.distributions = [distributions]

        self.bins = bins
        self.verbose = verbose

        self._alldata = np.array(data)
        if xmin == None:
            self._xmin = self._alldata.min()
        else:
            self._xmin = xmin
        if xmax == None:
            self._xmax = self._alldata.max()
        else:
            self._xmax = xmax

        self._trim_data()
        self._update_data_pdf()

        # Other attributes
        self._init()
        
    def _init(self):
        self.fitted_param = {}
        self.fitted_pdf = {}
        self._fitted_errors = {}
        self._log_lik = {}
        self._max_lik = {}
        self._aic = {}
        self._bic = {}
        self._kldiv = {}
        
    def _update_data_pdf(self):
        # histogram retuns X with N+1 values. So, we rearrange the X output into only N
        self.y, self.x = np.histogram(
            self._data, bins=self.bins, density=self._density)
        self.x = [(this + self.x[i + 1]) / 2. for i,
                  this in enumerate(self.x[0:-1])]

    def _trim_data(self):
        self._data = self._alldata[np.logical_and(
            self._alldata >= self._xmin, self._alldata <= self._xmax)]

    def _get_xmin(self):
        return self._xmin

    def _set_xmin(self, value):
        if value == None:
            value = self._alldata.min()
        elif value < self._alldata.min():
            value = self._alldata.min()
        self._xmin = value
        self._trim_data()
        self._update_data_pdf()
        #xmin = property(_get_xmin, _set_xmin, doc="consider only data above xmin. reset if None")

    def _load_all_distributions(self):
        """Replace the :attr:`distributions` attribute with all scipy distributions"""
        self.distributions = get_distributions()
        
    def hist(self):
        """Draw normed histogram of the data using :attr:`bins`


        .. plot::

            >>> from scipy import stats
            >>> data = stats.gamma.rvs(2, loc=1.5, scale=2, size=20000)
            >>> # We then create the Fitter object
            >>> import fitter
            >>> fitter.Fitter(data).hist()
            , label = "test"
            range = (x.min(), x.max())
            color = None
        """
        _ = pylab.hist(self._data, bins=self.bins, density=self._density)
        pylab.grid(False)

    def fit(self, amp=1):
        r"""Loop over distributions and find best parameter to fit the data for each

        When a distribution is fitted onto the data, we populate a set of
        dataframes:

            - :attr:`df_errors`  :sum of the square errors between the data and the fitted
              distribution i.e., :math:`\sum_i \left( Y_i - pdf(X_i) \right)^2`
            - :attr:`fitted_param` : the parameters that best fit the data
            - :attr:`fitted_pdf` : the PDF generated with the parameters that best fit the data

        Indices of the dataframes contains the name of the distribution.

        """
        for distribution in self.distributions:
            try:
                # need a subprocess to check time it takes. If too long, skip it
                dist = eval("scipy.stats." + distribution)

                # TODO here, dist.fit may take a while or just hang forever
                # with some distributions. So, I thought to use signal module
                # to catch the error when signal takes too long. It did not work
                # presumably because another try/exception is inside the
                # fit function, so I used threading with a recipe from stackoverflow
                # See timed_run function above
                param = self._timed_run(
                    dist.fit, distribution, args=self._data)

                # with signal, does not work. maybe because another expection is caught
                # hoping the order returned by fit is the same as in pdf
                pdf_fitted = dist.pdf(self.x, *param)

                self.fitted_param[distribution] = param[:]
                self.fitted_pdf[distribution] = pdf_fitted

                # calculate error
                sq_error = pylab.sum((self.fitted_pdf[distribution] - self.y)**2)
                
                # calculate information criteria
                log_lik = -np.sum(dist.logpdf(self.x, *param))  # joint probabilit
                k = len(param[:])     # num parameters in model
                n = len(self._data)   # num examples in data set
                aic = 2 * k - 2 * log_lik
                # online: (2 * k - 2 * log_lik)/n
                bic = n * np.log(sq_error / n) + k * np.log(n)

                kullback_leibler = kl_div(self.fitted_pdf[distribution], self.y)         
            
                max_lik = minimize(loglikelihood_neg, x0=[1]*k, args=[dist, self.x], method='nelder-mead').fun
                #print('---------------------')
                
                if self.verbose:
                    print("Fitted {} distribution with error={})".format(
                          distribution, sq_error))

                # compute some errors now
                self._fitted_errors[distribution] = sq_error
                self._log_lik[distribution] = log_lik
                self._max_lik[distribution] = max_lik
                self._aic[distribution] = aic
                self._bic[distribution] = bic
                self._kldiv[distribution] = kullback_leibler
            except Exception as err:
                if self.verbose:
                    print("SKIPPED {} distribution (taking more than {} seconds)".format(distribution,
                                                                                         self.timeout))
                # if we cannot compute the error, set it to large values
                self._fitted_errors[distribution] = np.inf
                self._log_lik[distribution] = np.inf
                self._max_lik[distribution] = np.inf
                self._aic[distribution] = np.inf
                self._bic[distribution] = np.inf
                self._kldiv[distribution] = np.inf

            self.df_errors = pd.DataFrame({'sumsquare_error': self._fitted_errors,
                                       'log_lik': self._log_lik,
                                       'max_lik': self._max_lik,
                                       'aic': self._aic,
                                       'bic': self._bic,
                                       'kl_div': self._kldiv})

    def plot_pdf(self, seg_type = "", names=None, Nbest=5, lw=2, method="sumsquare_error"):
        """Plots Probability density functions of the distributions

        :param str,list names: names can be a single distribution name, or a list
            of distribution names, or kept as None, in which case, the first Nbest
            distribution will be taken (default to best 5)

        """
        assert Nbest > 0
        if Nbest > len(self.distributions):
            Nbest = len(self.distributions)

        if isinstance(names, list):
            for name in names:
                pylab.plot(self.x, self.fitted_pdf[name], lw=lw, label=name)
        elif names:
            pylab.plot(self.x, self.fitted_pdf[names], lw=lw, label=names)
        else:
            try:
                names = self.df_errors.sort_values(by=method).index[0:Nbest]
            except Exception:
                names = self.df_errors.sort(method).index[0:Nbest]
            for name in names:
                if name in self.fitted_pdf.keys():
                    #if math.isinf(self.fitted_pdf[name]) or math.isinf(-1*self.fitted_pdf[name]):
                    pylab.plot(self.x, self.fitted_pdf[name], lw=lw, label=name)
                else:
                    print("%s was not fitted. no parameters available" % name)
                    
        title_map = {"sumsquare_error": "Sum Squared Error", "log_lik": "Log Liklihood", 
                     "aic": "Akaike Information Criterion", "bic": "Bayesian Information Criterion", 
                    "kl_div": "Kullback-Leibler Divergence", "max_lik": "Maximum Likelihood"}
        
        pylab.xlabel(seg_type + " Segment Length")
        pylab.ylabel('Frequency')
        pylab.title(title_map[method])
        pylab.grid(False)
        pylab.legend()
        
    def get_best(self, method='sumsquare_error'):
        """Return best fitted distribution and its parameters

        a dictionary with one key (the distribution name) and its parameters

        """
        # self.df should be sorted, so then us take the first one as the best
        name = self.df_errors.sort_values(method).iloc[0].name
        params = self.fitted_param[name]
        return {name: params}
    
    def summary(self, Nbest=5, seg_type = "", names = None, lw=2, plot=True, method="sumsquare_error"):
        """Plots the distribution of the data and Nbest distribution

        """
        if plot:
            pylab.clf()
            self.hist()
            self.plot_pdf(Nbest=Nbest, seg_type = seg_type, names = names, lw=lw, method=method)
            pylab.grid(False)

        Nbest = min(Nbest, len(self.distributions))
        try:
            names = self.df_errors.sort_values(by=method).index[0:Nbest]
        except:
            names = self.df_errors.sort(method).index[0:Nbest]
        return self.df_errors.loc[names]

    def _timed_run(self, func, distribution, args=(), kwargs={},  default=None):
        """This function will spawn a thread and run the given function
        using the args, kwargs and return the given default value if the
        timeout is exceeded.

        http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call
        """
        class InterruptableThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = default
                self.exc_info = (None, None, None)

            def run(self):
                try:
                    self.result = func(args, **kwargs)
                except Exception as err:
                    self.exc_info = sys.exc_info()

            def suicide(self):
                raise RuntimeError('Stop has been called')

        it = InterruptableThread()
        it.start()
        started_at = datetime.now()
        it.join(self.timeout)
        ended_at = datetime.now()
        diff = ended_at - started_at

        if it.exc_info[0] is not None:  # if there were any exceptions
            a, b, c = it.exc_info
            raise Exception(a, b, c)  # communicate that to caller

        if it.isAlive():
            it.suicide()
            raise RuntimeError
        else:
            return it.result