In [2]:
import pandas as pd 
import numpy as np 
import random
import plotly.express as px
from math import comb, po
from sys import stdout

In [3]:
'''
Modelo de Ehrenfest

La cadena de ehrenfest fue desarrollado para modelar el intercambio de moleculas entre dos contenedores. 

En cada tiempo una molecula es seleccionada de manera aleatoria y se traspasa al otro contenedor.


Espacio de estados: {0, 1, ..., n}

'''


def move_ball():
    ''' 
    Mediante esta función calculamos el estado de nuestro modelo de ehrenfast en el siguiente tiempo.
    global:
        state
    '''
    global state
    threshold = state/n
    if random.random()>threshold:
        state += 1
    else:
        state -= 1

def zero_fill(distribution: dict):
    ''' Llena los valores faltantes con 0's'''
    for i in range(11):
        if not i in distribution:
            distribution[i] = 0
    return distribution

def value_convergence(distributions: dict):
    ''' Regresa un diccionario con la probabilidad simulada de cada estado.'''
    evolutions = {}
    for i in range(11):
        evolutions[i] = []
    for distribution in distributions:
        for i in distribution:
            evolutions[i].append(distribution[i])
    return evolutions
            
def plot_convergence(distributions:pd.Series):
    ''' Genera un plot con la convergencia de probabilidad de cada estado.'''
    if not isinstance(distributions, pd.Series):
        distributions = pd.Series(distributions)
    evolutions = value_convergence(distributions)
    px.line(evolutions)
    






### A continuación comparamos la distribución simulada de una cadena Ehrenfast y la comparamos con su distribución teoríca.

### Sabemos que la distribución estacionaria del modelo de ehrenfest con $n$ bolas es
### $f(x) = {n \choose x}(\frac{1}{2})^n, x \in S$

In [43]:
def theoric_distribution(n:int):
    ''' Calculamos la distribución estacionaria de una cadena de Ehrenfest de n bolas.'''
    idx = []
    probabilities = []
    for i in range(n + 1):
        idx.append(i)
        probabilities.append(comb(n, i)*pow(1/2, n))
    return pd.Series(index = idx, data = probabilities)

def simulation_error(theoric:pd.Series, simulated_distribution:pd.Series):
    ''' Calcula los errores en entre una simulación y la distribución teorica de una cadena de Ehrenfast'''
    global distributions
    if not isinstance(simulated_distribution, pd.Series):
        simulated_distribution = pd.Series(simulated_distribution)
    return abs(theoric - simulated_distribution)

    
def calculate_errors(theoric_distribution: pd.Series, distributions:list, metric:str = 'mae'):
    ''' Calcula el error medio de una simulación en cada iteración.'''
    if not metric.lower() in ['mae','mse']:
        raise ValueError(f'metric "{metric}" is not a valid metric.')
    errors = []
    for distribution in distributions:
        if metric == 'mae':
            errors.append(simulation_error(theoric_distribution, distribution).mean())
        else:
            errors.append((simulation_error(theoric_distribution, distribution)**2).mean())
    return pd.Series(errors)

def plot_errors(errors:pd.Series, metric:str):
    ''' Genera un plot de errores medios'''
    if not isinstance(errors, pd.Series):
        errors = pd.Series(errors)
    errors = errors[errors.index%100 == 0]
    return px.line(errors, title = f'{metric} error', labels = {'value':metric , 'index':'iterations'})


In [131]:

### Definimos parametros de experimentos:
states = []     # lista para almacenar resultados
n_experimentos = int(1e5)   # num. de experimentos
n = 10      # numero de bolas (estados posibles)
estado_inicial = random.randint(0, n)      # estado inicial
state = estado_inicial


###
# 1. Realizamos experimentos con parametros indicados.
states = np.zeros(n_experimentos + 1, dtype = int)
states[0] = estado_inicial 
distributions = []

for i in range(n_experimentos):
    if i%500 == 0:
        stdout.write(f'{(i/n_experimentos)*100:.1f}%')
        stdout.flush()
        if i!=0:
            stdout.write('\r')
    move_ball()
    states[i+1] = state
    
    vals, counts = np.unique(states[0:i+1], return_counts = True)
    counts = counts/counts.sum()
    
    t_distribution = {}
    for j in range(len(counts)):
        t_distribution[vals[j]] = counts[j]
    t_distribution = zero_fill(t_distribution)
    distributions.append(t_distribution)

# 2. Generamos histogramas y distribución teorica.
dists = {}
dists['Distribución estacionaria'] = theoric_distribution(n)
dists['Simulación'] = distributions[-1]

fig = px.line(dists)


# 3. Generar plot de convergencia de distintos valores.
plot_convergence(distributions)

# 4. Generar plot de errores.
plot_errors(calculate_errors(theoric_distribution(n), distributions, metric='mae'), metric = 'mae')
plot_errors(calculate_errors(theoric_distribution(n), distributions, metric='mse'), metric = 'mse')

# 5. Generar





In [128]:
plot_errors

In [129]:
fig

In [35]:
### __main__
# 4. Generar plots de errores.

0     0.001060
1     0.010420
2     0.045990
3     0.118559
4     0.202978
5     0.243158
6     0.204918
7     0.118339
8     0.044150
9     0.009530
10    0.000900
dtype: float64

array([5, 4, 5, ..., 7, 6, 5])

In [37]:
counts

0     0.001060
1     0.010420
2     0.045990
3     0.118559
4     0.202978
5     0.243158
6     0.204918
7     0.118339
8     0.044150
9     0.009530
10    0.000900
dtype: float64

0        0.137074
1000     0.005201
2000     0.005057
3000     0.005673
4000     0.006027
           ...   
95000    0.001242
96000    0.001173
97000    0.001151
98000    0.001109
99000    0.001045
Length: 100, dtype: float64