In [1]:
import numpy as np 
import random
import matplotlib.pyplot as plt
import scipy.stats as st 
from scipy.stats import norm
from scipy.stats import burr12
from scipy.fft import fft, ifft
import scipy.linalg as linalg
from scipy.optimize import minimize
import collections 
import time
from itertools import chain
import timeit
import os
import pickle

np.random.seed(42)

# Part 0: definition of the kernels and some functions

In [70]:
def kernel_k1(x, y):
    return 1 if x > 0 and y > 0 else 0

def kernel_k2(x, y):
    return x * y

def kernel_k3(x, y):
    return (x + y) if x > 0 and y > 0 else 0

def kernel_k4(x, y):
    return 1/4*(x**(1/3) + y**(1/3))**3 if x > 0 and y > 0 else 0

def kernel_ka05(x,y):
    return (x*y)**0.5

def kernel_ka2(x,y):
    return (x*y)**0.8

def kernel_ka3(x,y):
    return (x*y)**0.9
    
kernel_functions = {
    'kernel_k1': kernel_k1,
    'kernel_k2': kernel_k2,
    'kernel_k3': kernel_k3,
    'kernel_k4': kernel_k4,
    'kernel_ka05': kernel_ka1,
    'kernel_ka2': kernel_ka2,
    'kernel_ka3': kernel_ka3,
}

In [3]:
def kernel_sum(particles, kernel):
    '''
    old inefficient function to compute K(x,y) the entries of our kernels and 
    the denominator to use in the index distribution
    '''
    start = timeit.default_timer()
    n = len(particles)
    weights = np.zeros((n, n))
    
    kernel = kernel_functions[kernel]
    
    for i in range(n):
        for j in range(n):
            if i != j:
                weights[i, j] = kernel(particles[i], particles[j])
                
    total_weight = np.sum(weights)
    stop = timeit.default_timer()
    return weights, total_weight

In [4]:

def kernel_sum_triangular(particles, kernel):
    '''
    more optimal function to compute K(x,y) the entries of our kernels and 
    the denominator to use in the index distribution
    '''
    
    start = timeit.default_timer()
    n = len(particles)
    weights = np.zeros((n, n))
    
    kernel = kernel_functions[kernel]
    
    for i in range(n):
        for j in range(i+1, n):
                weights[i, j] = kernel(particles[i], particles[j])
    
    weights += np.triu(weights, 1).T  
    
    total_weight = np.sum(weights)
    stop = timeit.default_timer()
    #print("Tri",stop - start)
    return weights, total_weight

In [5]:

def calculate_P_index(particles, kernel):
    '''
    function that calculates the index distribution matrix
    '''
    
    weights, total_weight = kernel_sum_triangular(particles, kernel)
    
    P_index = weights / total_weight

    return P_index

In [6]:
def majorant_kernel(particles,kernel):
    '''
    function to calculate the majorant kernel as the maximum value of the collision kernel
    '''
    kernel_func = kernel_functions[kernel]
    weights, total_weight = kernel_sum_triangular(particles, kernel)
    return np.argmax(weights)

In [7]:
def AR_sampling(particles, kernel, recursion_counter=0):
    '''
    Acceptance-Rejection algorithm sampling function, 
    as majorant kernel we choose the maximum value of the kernel
    '''
    start = timeit.default_timer()

    kernel_func = kernel_functions[kernel]
    Max = majorant_kernel(particles, kernel)

    U = random.uniform(0, 1)
    row_index = random.randint(0, len(particles) - 1)
    col_index = random.randint(0, len(particles) - 1)
    while col_index == row_index:
        col_index = random.randint(0, len(particles) - 1)
        row_index = random.randint(0, len(particles) - 1)

    if U <= kernel_func(row_index, col_index) / Max:
        i, j = row_index, col_index

        stop = timeit.default_timer()
        print("AR time:", stop - start, "iterations:", recursion_counter)

        return i, j

    else:
        recursion_counter += 1
        return AR_sampling(particles, kernel, recursion_counter)


In [9]:
def inverse_sampling(particles, kernel):
    '''
    function to sample from the index distribution in a simple way: 
    '''
    start = timeit.default_timer()
    
    U = random.uniform(0, 1)
    
    P_idx = calculate_P_index(particles, kernel)
    
    flat_index = np.where(np.cumsum(P_idx) >= U)[0][0]
    
    #get the 2D index from the flat one
    i, j = np.unravel_index(flat_index, P_idx.shape)
    
    stop = timeit.default_timer() 
    return i,j

In [11]:
def transition_matrix(i,j, N):
    '''
    function to compute the transition matrix for index i,j 
    for the Markov process described in (3) from the project description
    '''
    M = np.eye(N)
    M[i][j]=1
    M[j][j]=0
    
    return M
    

# Part 1: Simulate the coagulation process

To simulate the dynamics we start by generating a starting population. We wait an exponentially distributed time step computed from the kernel, we sample the collision indices from the index distribution, find the new state of the system with the help of the transition matrix. 

In [12]:
def simulate_coagulation(N, T, X0, kernel):
    '''
    To simulate the dynamics we start by generating a starting population.
    We wait an exponentially distributed time step computed from the kernel,
    we sample the collision indices from the index distribution, 
    find the new state of the system with the help of the transition matrix.
    '''
    
    kernel_func = kernel_functions[kernel] 
    
    current_time = 0
    jump_count = 0

    Xt = [X0]
    t = [current_time]
    
    while current_time < T:
        
        # Calculate lambda 
        start1 = timeit.default_timer()
        weights, total_weight = kernel_sum_triangular(X0, kernel)
        stop1 = timeit.default_timer()
        
        Lambda = total_weight/(2*N)
        
        # Generate waiting time Sn
        Sn= st.expon.rvs(scale=1/Lambda)
        
        # Increment current time by the waiting time
        current_time += Sn 
        jump_count +=1
        
        #sample index for collision
        start2 = timeit.default_timer()
        #i,j = AR_sampling(X0, kernel = kernel)
        i,j = inverse_sampling(X0, kernel=kernel)
        stop2 = timeit.default_timer()
        
        #find new vector of sizes based on transition matrix
        M = transition_matrix(i,j,N)
        X0 = M.dot(X0)
        Xt.append(X0.copy()) 
        t.append(current_time)
        
        #end if we reached the maximum jump number
        if jump_count == N-1:
            break
            
   
    return Xt,t

# Part 2 & 3: Monte Carlo simulation of concentration, moment, and gelation 

## First we define the functions and the main monte carlo algorithm

In [34]:
# define the 3 average parameters to be studied: average concentration, moment 1.5, gelation G

def average_conc(X,k, N):
    counts = []
    
    for arr in X:
        count = np.count_nonzero(arr == k)
        counts.append(count/N)
        
    return counts

def moments(X,p):
    X = np.array(X)
    mom_p = np.mean(X**p, axis= 1)
    return mom_p


def gelation(X, N):
    X = np.array(X)
    G =  np.max(X, axis=1)/N
    return G

In [35]:
def MC_coagulation(N, T, R, k, X0,p, kernel, Z):
    '''
    here is the main algorithm for generating R iid replicas of the variables previously defined
    (concentration, moments, gelation)
    '''
    
    Xi = []
    Zi = []
    Ji = []
    
    start1 = timeit.default_timer()
    
    #repeat the coagulation simulation R times
    for r in range(R):
        
        #simulate coagulation to generate the vectors in time
        Xt, t = simulate_coagulation(N=N, T=T, X0=X0, kernel=kernel)
        
        #calculate the average parameters and store them along with the jumping times
        if Z == "average_conc":
            z = average_conc(Xt, k, N)
            #print("Xt and average computed for conc")
        if Z == "moments":
            z = moments(Xt,p)
            #print("Xt and average computed for mom")
        if Z == "gelation":
            z = gelation(Xt,N)
            #print("Xt and average computed for gel")
        
        Zi.append(z)
        Ji.append(t)
    Zi = np.array(Zi)   
    return Zi, Ji

In [36]:
def average_MC(Zi,Ji, N):
    '''
    function for Monte Carlo method that calculates the sample mean estimator
    and asymptotic confidence intervals
    '''
    #confidence level for intervals
    alpha = 0.05
    
    #here we create a unique list of sorted jumping times 
    J_no_first = [sublist[1:] for sublist in Ji] 
    J_no_first[0].append(0)
    J_sorted = sorted(list(chain.from_iterable(J_no_first)))
    J_sorted=np.array(J_sorted)

    Ji = np.array(Ji) 
    
    means = []
    Z_mean = []
    sigma = []
    
    #for each jumping time we find the maximum value lower than that in each row of times and 
    #find the corresponding indices in the rows of Zi
    #with this we can generate the means of our parameters
    for j_time in J_sorted:
        Z_mean = []
        for i, time_seq in enumerate(Ji):
            # Find the index of the last time less than or equal to j_time
            valid_indices = np.where(time_seq <= j_time)[0]
            last_index = valid_indices[-1]
            Z_mean.append(Zi[i][last_index])

                
        Z_mean = np.array(Z_mean)
        means.append(np.nanmean(Z_mean))
        sigma.append(np.nanstd(Z_mean, ddof=1))
 
    
    means = np.array(means)
    sigma = np.array(sigma)

    #calculate confidence intervals
    c_ok = st.norm.ppf(1-alpha/2)
    lower_bound = means - c_ok*sigma/np.sqrt(R)
    upper_bound = means + c_ok*sigma/np.sqrt(R) 
    
    stop2 = timeit.default_timer()
    
    return means, sigma, J_sorted, lower_bound, upper_bound
    

## Let's produce and save simulation files

In [75]:
# Define simulation parameters
N_values = [100]
k_values = [1,2,3,4,5] # put this to one element if not concentration!!
p = 1.5
#kernels = ["kernel_k4","kernel_k3"]
kernels = ["kernel_k2",]
Z = "average_conc"
#Z = "moments"
#Z = "gelation"
R = 10

# Directory to store results. create subfolders for each Z otherwise they will overwrite possibly
results_dir = "simulation_results/concentration"
os.makedirs(results_dir, exist_ok=True)

In [76]:
# Loop over parameters and run simulations
for N in N_values:
    X0 = np.ones(N)
    for kernel in kernels:
        for k in k_values:
            # Run simulation
            Zi, Ji = MC_coagulation(N, T=10, R=R, k=k, X0=X0, p=p, kernel=kernel, Z=Z)

            # Compute averages
            means, sigma, J_sorted, lower_bound, upper_bound = average_MC(Zi, Ji, N)

            # Prepare result dictionary
            result = {
                "means": means,
                "sigma": sigma,
                "J_sorted": J_sorted,
                "lower_bound": lower_bound,
                "upper_bound": upper_bound
            }

            # Save result to file
            filename = f"results_N{N}_k{k}_{kernel}_R{R}.pkl"
            filepath = os.path.join(results_dir, filename)
            with open(filepath, "wb") as f:
                pickle.dump(result, f)

            print(f"Saved results to {filepath}")

Saved results to simulation_results/xEmilio/results_N100_k1_kernel_k2_R10.pkl
Saved results to simulation_results/xEmilio/results_N100_k2_kernel_k2_R10.pkl
Saved results to simulation_results/xEmilio/results_N100_k3_kernel_k2_R10.pkl
Saved results to simulation_results/xEmilio/results_N100_k4_kernel_k2_R10.pkl
Saved results to simulation_results/xEmilio/results_N100_k5_kernel_k2_R10.pkl
