In [1]:
import scipy.stats as st
from scipy.stats import genextreme as gev
from scipy.stats import gumbel_l as gumbel
from scipy.stats import gamma as gamma
from scipy.stats import expon as exp
from scipy.stats import lognorm as logn
from scipy.stats import gumbel_r as gumbel_r
from scipy.stats import powerlaw as powerlaw

from statsmodels.distributions.empirical_distribution import ECDF


In [2]:
def AIC(data, distr, pars=None):
    """Clacular el criterio de información de Akaike
    
    Parámetros:
    -----------
    data:      array (n,). Datos asociados a la distribución
    distr:     string. Nombre de la distribución en scipy.stats
    pars:      array. Vector con los parámetros ajustados de la distribución 'distr'. Si es 'None', se ajustan los parámetros de la distribución
                      'distr' a las observaciones en 'data'
                      
    Salida:
    -------
    aic:       float. Criterio de información de Akaike
    """
    
    # definir la distribución en scipy.stats
    distr = getattr(st, distr)
    # parámetros ajustados
    if pars is None:
        fitted_pars = distr.fit(data)
    else:
        fitted_pars = pars
    # nº de parámetros
    k = len(fitted_pars)
    # log likelihood 
    logLik = np.sum(distr.logpdf(data, *fitted_pars))
    # Akaike information criterion
    aic = 2 * k - 2 * logLik
    
    if pars is None:
        return aic, fitted_pars
    else:
        return aic

In [3]:
# matplotlib.rcParams['figure.figsize'] = (16.0, 12.0)
# matplotlib.style.use('ggplot')

# Create models from data

def make_pdf(dist, params, size=10000):
    """Generate distributions's Probability Distribution Function """
    
    # Separate parts of parameters
    arg = params[:-2]
    loc = params[-2]
    scale = params[-1]

    # Get sane start and end points of distribution
    start = dist.ppf(0.01, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.01, loc=loc, scale=scale)
    end = dist.ppf(0.99, *arg, loc=loc, scale=scale) if arg else dist.ppf(0.99, loc=loc, scale=scale)

    # Build PDF and turn into pandas Series
    x = np.linspace(start, end, size)
    y = dist.pdf(x, loc=loc, scale=scale, *arg)
    pdf = pd.Series(y, x)

    return pdf

In [4]:
def plot_fitted_distribution(data, distr, params, pos=True, bins=None, **kwargs):
    """Genera cuatro gráficos para verificar la calidad del ajuste de una función de distribución.
    
    Parámetros:
    -----------
    data:      array. Datos observados
    distr:     string. Nombre de la distribución en 'scipy.stats'
    params:    array. Parámetros ajustados para 'distr'
    pos:       boolean. Si se han de eliminar todos los valores no estrictamente positivos, es decir, 0 ó negativos
    bins:      int. Número de columnas en el histograma. Si es 'None', 'seaborn' calcula el número más apropiado
    """
    
    # asegurar que los datos son un 'array' de una dimensión
    if isinstance(data, pd.Series):
        data = data.values.flatten().astype(float)
    elif isinstance(data, np.ndarray):
        data = data.flatten().astype(float)
    # eliminar datos faltantes
    data = data[~np.isnan(data)]
    # eliminar datos nulos o negativos
    if pos is True:
        data = data[data > 0]
    data = pd.Series(data)
    
    # definir distribución en scipy.stats
    distr = getattr(st, distr)
    
    # definir la figura
    if 'figsize' in kwargs:
        figsize = kwargs['figsize']
    else:
        figsize = (12, 8)
    fig, ax = plt.subplots(2, 2, figsize=figsize)
    
    #-----------------------------------------------------------------------------------
    # pdf of the  distribution 
    pdf = make_pdf(distr, params)
    pdf.plot(lw=2, label='PDF', ax=ax[0,0])
    # histogram of the data 
    sns.distplot(data, bins=bins, kde=False, norm_hist=True, color='dimgray', label='Data', ax=ax[0,0])
    # config
    param_names = (distr.shapes + ', loc, scale').split(', ') if distr.shapes else ['loc', 'scale']
    param_str = ', '.join(['{}={:0.2f}'.format(k,v) for k,v in zip(param_names, params)])
    dist_str = '{}({})'.format(distr.name, param_str)
    ax[0,0].legend()
    ax[0,0].set_title(u'Best probability density function \n' + dist_str)
    ax[0,0].set_xlabel(u'Data')
    ax[0,0].set_ylabel('Frequency')
    #ax[0,0].set_ylim(dataYLim)
    
    #----------------------------------------------------------------------------------- 
    # Quantile-quantile plot
    PQmax_1 = distr(*params).cdf(data)
    Qemp = np.sort(data)
    kk = np.arange(1, len(data) + 1)
    prob = kk / (len(data))
    Qlogn  = distr(*params).ppf(prob)
    # scatter plot of empiricam vs fitted distribution 
    ax[0,1].plot(Qemp, Qlogn, '.k')
    ax[0,1].plot(np.arange(min(data), max(data)), np.arange(min(data), max(data)), '-b')
    # config
    ax[0,1].set_title('Q-Q Plot \n' + dist_str)
    ax[0,1].set_xlabel(u'Empirical')
    ax[0,1].set_ylabel('Model')
    #-----------------------------------------------------------------------------------
    # CDF of the fitted distribution
    x = np.linspace(min(data), max(data), 200)
    y = distr(*params).cdf(x)
    ax[1,0].plot(x, y)
    # scatter plot of the empirical distribution
    ecdf = ECDF(data)
    ax[1,0].plot(data, ecdf(data), '.k')
    # config
    ax[1,0].set_title('Best Cumulative density function \n' + dist_str)
    ax[1,0].set_xlabel(u'Data')
    ax[1,0].set_ylabel('Prob')
    #-----------------------------------------------------------------------------------    
    # frequency law
    x_r = np.arange(1,1000)
    prob_t = 1 - 1 / x_r
    y_r = distr(*params).ppf(prob_t)
    ax[1,1].plot(x_r, y_r, '-b')
    # observations
    t_r = 1 / (1 - ecdf(data))
    ax[1,1].plot(t_r, data, '.k')
    # config
    ax[1,1].set_xscale("log")
    ax[1,1].set_xlabel(u'Return Periods')
    ax[1,1].set_ylabel('Return Values')
    ax[1,1].set_title('Return Values Plot \n' + dist_str)
    
    fig.tight_layout()

In [5]:
def fitter(data, distributions=None, crit='SSE', pos=True, plot=True, **kwargs):
    """Encuentra la función de distribución con el mejor ajuste a los datos de entrada
    
    Parámetros:
    -----------
    data:           series o array. Datos a ajustar
    distributions:  list. Nombres en scipy.stats de las distribuciones a ajustar. Si es 'None', se ajustan todas.
    crit:           string. Criterio en función del cual seleccionar el mejor ajuste: 'SSE', suma de error cuadrático;  'AIC', criterio de información de Akaike
    pos:            boolean. Si se han de eliminar todos los valores no estrictamente positivos, es decir, 0 ó negativos
    plot:           boolean. Si se muestran gráficos con el ajuste de la mejor distribución
    
    Salidas:
    --------
    Como métodos de la función
    best_crit:      float. Valor del criterio de selección para la mejor distribución
    best_distr:     string. Nombre en scipy.stats de la mejor distribución
    best_params:    array. Parámetros ajustados para la mejor distribución
    data:           array. Datos utilizados para el ajuste
    fit:            data frame. Tabla con el SSE (suma del error cuadrático) y AIC (criterio de información de Akaike) para cada distribución ajustada
    get_params:     dictionary. Parámetros ajustados para cada distribución
    
    Si 'plot' es 'True', gráficos con el ajuste de la mejor distribución.
    """
    
    # asegurar que los datos son un 'array' de una dimensión
    if isinstance(data, pd.Series):
        data = data.values.flatten().astype(float)
    elif isinstance(data, np.ndarray):
        data = data.flatten().astype(float)
    # eliminar datos faltantes
    data = data[~np.isnan(data)]
    # eliminar datos nulos o negativos
    if pos is True:
        data = data[data > 0]
    
    # empirical distribution function
    ecdf = ECDF(data)
    cdf_e = ecdf(data)
    # Distributions to check
    if distributions is None:
        distrs = [st.alpha,st.anglit,st.arcsine,st.beta,st.betaprime,st.bradford,st.burr,st.cauchy,st.chi,st.chi2,st.cosine,
                  st.dgamma,st.dweibull,st.erlang,st.expon,st.exponnorm,st.exponweib,st.exponpow,st.f,st.fatiguelife,st.fisk,
                  st.foldcauchy,st.foldnorm,st.frechet_r,st.frechet_l,st.genlogistic,st.genpareto,st.gennorm,st.genexpon,
                  st.genextreme,st.gausshyper,st.gamma,st.gengamma,st.genhalflogistic,st.gilbrat,st.gompertz,st.gumbel_r,
                  st.gumbel_l,st.halfcauchy,st.halflogistic,st.halfnorm,st.halfgennorm,st.hypsecant,st.invgamma,st.invgauss,
                  st.invweibull,st.johnsonsb,st.johnsonsu,st.ksone,st.kstwobign,st.laplace,st.levy,st.levy_l,st.levy_stable,
                  st.logistic,st.loggamma,st.loglaplace,st.lognorm,st.lomax,st.maxwell,st.mielke,st.nakagami,st.ncx2,st.ncf,
                  st.nct,st.norm,st.pareto,st.pearson3,st.powerlaw,st.powerlognorm,st.powernorm,st.rdist,st.reciprocal,
                  st.rayleigh,st.rice,st.recipinvgauss,st.semicircular,st.t,st.triang,st.truncexpon,st.truncnorm,st.tukeylambda,
                  st.uniform,st.vonmises,st.vonmises_line,st.wald,st.weibull_min,st.weibull_max,st.wrapcauchy]
    else:
        distrs = [getattr(st, distribution) for distribution in distributions]

    # Estimate distribution parameters from data
    fit = pd.DataFrame(index=[distr.name for distr in distrs], columns=['SSE', 'AIC'])
    get_params = {}
    for distr in distrs:
        distName = distr.name
        # Try to fit the distribution
        try:
            # fit dist to data
            params = distr.fit(data)
            get_params[distName] = params

            # Calculate fitted PDF and error with fit in distribution
            cdf = distr(*params).cdf(data)
            
            # performance
            sse = np.sum((cdf_e - cdf)**2)
            aic = AIC(data, distr=distName, pars=params)
            fit.loc[distName,:] = [sse, aic]
        except:
            print('ERROR. No se pudo ajustar {0}'.format(distName))
            continue
    
    # asegurar que 'crit' está en mayúsculas
    crit = crit.upper() 
    # seleccionar distribución con mejor ajuste (menor valor de 'crit')
    fit.sort_values(by=crit, inplace=True)
    best_distr = fit.index[0]
    # parámetros y ajuste de la mejor distribución
    best_params = get_params[best_distr]
    best_crit = fit.loc[best_distr, crit]
    
    if plot:
        plot_fitted_distribution(data, best_distr, best_params, kwargs)
    
    fitter.best_crit = best_crit
    fitter.best_distr = best_distr
    fitter.best_params = best_params
    fitter.data = data
    fitter.fit = fit
    fitter.get_params = get_params