In [1]:
import numpy as np

def sample_transition_matrix(concentration_params):
    """
    Samples a transition matrix from Dirichlet distributions based on concentration parameters.
    
    Parameters:
    - concentration_params: A list of lists, where each sublist contains the concentration parameters (alpha)
      for the Dirichlet distribution of each state's transition probabilities.
    
    Returns:
    A sampled transition matrix.
    """
    transition_matrix = np.array([np.random.dirichlet(alpha) for alpha in concentration_params])
    return transition_matrix

def markov_chain_simulation(initial_state, steps, concentration_params):
    """
    Simulates a Markov chain using a single transition matrix sampled from Dirichlet distributions.
    
    Parameters:
    - initial_state: The starting state of the Markov chain.
    - steps: The number of steps to simulate.
    - concentration_params: Concentration parameters for Dirichlet distributions of each state's transitions.
    
    Returns:
    A list representing the sequence of states.
    """
    # Sample one transition matrix for the entire simulation
    transition_matrix = sample_transition_matrix(concentration_params)
    
    current_state = initial_state
    state_sequence = [current_state]
    
    for _ in range(steps - 1):
        current_state = np.random.choice(range(len(transition_matrix)), p=transition_matrix[current_state])
        state_sequence.append(current_state)
    
    return state_sequence

# Example parameters
initial_state = 1
steps = 10
concentration_params = [
    [5, 2, 3],  # Stronger belief in staying in or transitioning from state 0 to states 0, 1, 2
    [2, 5, 3],  # State 1 has a higher probability of staying in 1 or moving to 2
    [3, 3, 4]   # State 2 has a slightly higher probability of moving to state 2
]

# Run the simulation
sequence = markov_chain_simulation(initial_state, steps, concentration_params)
print(f"State sequence: {sequence}")


State sequence: [1, 1, 1, 1, 2, 2, 2, 0, 0, 0]


In [2]:
def categorize_sequence(sequence):
    """
    Categorizes a sequence into one of three categories based on specific criteria.
    
    Parameters:
    - sequence: A list of integers representing the state sequence from a Markov chain.
    
    Returns:
    - An integer (2, 0, 1) representing the category of the sequence.
    """
    # Convert the sequence list to a string for easier subsequence search
    sequence_str = ''.join(map(str, sequence))
    
    # Check for the presence of subsequences
    if '2' in sequence_str:
        return 2 # Bipolar
    elif '00' in sequence_str:
        return 0 # MDD
    else:
        return 1 # Euthymic

# Example usage
sequence = [0, 1, 1, 1, 2, 0, 1, 0, 1, 1, 2, 1] 
category = categorize_sequence(sequence)
print(f"Sequence: {sequence}\nCategory: {category}")

Sequence: [0, 1, 1, 1, 2, 0, 1, 0, 1, 1, 2, 1]
Category: 2


In [3]:
def calculate_category_percentages_np(sequences):
    """
    Calculates the percentages of categories (0, 1, 2) in a collection of Markov chain sequences using a NumPy array.
    
    Parameters:
    - sequences: A list of lists, where each sublist is a sequence from a Markov chain.
    
    Returns:
    A NumPy array where indices 0, 1, and 2 correspond to the percentages of categories 0, 1, and 2, respectively.
    """
    # Initialize a NumPy array for category counts
    category_counts = np.zeros(3)
    
    # Categorize each sequence and update counts
    for sequence in sequences:
        category = categorize_sequence(sequence)
        category_counts[category] += 1
    # Calculate percentages
    category_percentages = (category_counts / len(sequences)) * 100
    
    return category_percentages

# Example usage
sequences = [
    [1, 1, 1, 2, 1],  # Category 1
    [0, 0, 0, 1, 2],  # Category 0
    [1, 2, 2, 1, 2],  # Category 2
    [0, 1, 1, 1, 0],  # Category 1
    [0, 0, 2, 2, 2]   # Category 2
]

percentages = calculate_category_percentages_np(sequences)
print(f"Category Percentages: {percentages}")


Category Percentages: [ 0. 20. 80.]


In [4]:
def generate_markov_sequence(transition_matrix, initial_state, sequence_length):
    """
    Generates a Markov chain sequence given a transition matrix.
    
    Parameters:
    - transition_matrix: A NumPy array representing the transition matrix for the Markov chain.
    - initial_state: The starting state of the sequence.
    - sequence_length: The length of the sequence to generate.
    
    Returns:
    A list representing the generated Markov chain sequence.
    """
    current_state = initial_state
    sequence = [current_state]
    
    for _ in range(sequence_length - 1):
        current_state = np.random.choice(a=len(transition_matrix), p=transition_matrix[current_state])
        sequence.append(current_state)
    
    return sequence

def kl_divergence(generated_percentages, target_percentages):
    """
    Calculates the Kullback-Leibler Divergence between two distributions.
    
    Parameters:
    - generated_percentages: The percentages of categories from generated sequences.
    - target_percentages: The target percentages for each category.
    
    Returns:
    - The KL Divergence.
    """
    epsilon = 1e-8  # To ensure numerical stability by avoiding division by zero and log(0)
    
    # Ensure that both generated_percentages and target_percentages are of float type
    generated_percentages = np.array(generated_percentages, dtype=np.float64)
    target_percentages = np.array(target_percentages, dtype=np.float64)
    
    # Add epsilon to avoid log(0) and ensure non-zero division
    generated_percentages += epsilon
    target_percentages += epsilon
    
    # Normalize to ensure they sum to 1
    generated_norm = generated_percentages / np.sum(generated_percentages)
    target_norm = target_percentages / np.sum(target_percentages)
    
    return np.sum(target_norm * np.log(target_norm / generated_norm))


def objective_function(concentration_params, n_sequences, sequence_length, target_percentages, initial_state=1, penalty=1e6):
    """
    Objective function using KL Divergence, with a penalty for cases where a valid transition matrix cannot be sampled.

    Parameters:
    - concentration_params: Concentration parameters for the Dirichlet distributions.
    - n_sequences: Number of sequences to generate.
    - sequence_length: Length of each sequence.
    - target_percentages: Target category percentages.
    - initial_state: Initial state for sequence generation.
    - penalty: Penalty value to apply when a valid matrix cannot be sampled.

    Returns:
    - The KL Divergence between generated and target percentages, or a penalty if a valid matrix cannot be sampled.
    """
    sequences = []
    valid_matrices = 0
    
    for _ in range(n_sequences):
        try:
            transition_matrix = sample_transition_matrix(concentration_params)
            if transition_matrix is None:  # Implementing the check to avoid processing with invalid matrices
                continue
            sequence = generate_markov_sequence(transition_matrix, initial_state, sequence_length)
            sequences.append(sequence)
            valid_matrices += 1
        except ValueError:
            # Skip sequence generation if a valid transition matrix cannot be sampled
            continue
    
    if valid_matrices == 0:
        # Apply the penalty if no valid matrices could be sampled for all sequences
        return penalty
    
    generated_percentages = calculate_category_percentages_np(sequences)
    
    # Ensure non-negative percentages for KL Divergence calculation
    generated_percentages = np.clip(generated_percentages, a_min=0, a_max=None)
    target_percentages = np.clip(target_percentages, a_min=0, a_max=None)
    
    return kl_divergence(generated_percentages, target_percentages)

In [5]:
# Example usage
concentration_params = [
    [5, 2, 3],  # For state 0 transitions
    [2, 5, 3],  # For state 1 transitions
    [3, 3, 4]   # For state 2 transitions
]

#target_percentages = np.array([10, 87, 3]) # MDD, normal, Bipolar
target_percentages = np.array([5., 94., 1.])
#target_percentages = np.array([5, 1])

n_sequences = 100
sequence_length = 52
# Compute the objective function value
objective_value = objective_function(concentration_params, n_sequences, sequence_length, target_percentages)
print(f"Objective Function Value (MSE): {objective_value}")


Objective Function Value (MSE): 22.541591222983886


In [6]:
%%time
from skopt.space import Real
from skopt import gp_minimize

# Define the search space for 9 concentration parameters (3 for each state transition)
lb = 1e-3
ub = 1.99
search_space = [
    Real(lb, ub, name='alpha_0_0'), Real(lb, ub, name='alpha_0_1'), Real(lb, ub, name='alpha_0_2'),
    Real(lb, ub, name='alpha_1_0'), Real(lb, ub, name='alpha_1_1'), Real(lb, ub, name='alpha_1_2'),
    Real(lb, ub, name='alpha_2_0'), Real(lb, ub, name='alpha_2_1'), Real(lb, ub, name='alpha_2_2'),
]
#Real(1e-6, 10.0, name='alpha_0_0')

from skopt.utils import use_named_args

@use_named_args(search_space)
def skopt_objective_function(**params):
    # Unpack parameters into the structure expected by the objective function
    concentration_params = [
        [params['alpha_0_0'], params['alpha_0_1'], params['alpha_0_2']],
        [params['alpha_1_0'], params['alpha_1_1'], params['alpha_1_2']],
        [params['alpha_2_0'], params['alpha_2_1'], params['alpha_2_2']],
    ]
    # Call the objective function with the structured parameters
    return objective_function(concentration_params, n_sequences=100, sequence_length=52, target_percentages=target_percentages)

# Perform Bayesian Optimization
result = gp_minimize(skopt_objective_function, search_space, n_calls=100, random_state=1, n_jobs=8, n_initial_points=20, verbose=False)

print(f"Optimal concentration parameters: {result.x}")
print(f"Minimum MSE: {result.fun}")

Optimal concentration parameters: [1.99, 1.99, 0.6185093533368146, 0.001, 1.99, 0.001, 1.99, 0.43471463127315113, 1.99]
Minimum MSE: 0.21596314090660873
CPU times: user 5min 16s, sys: 3min 38s, total: 8min 55s
Wall time: 49 s


In [7]:
result_x_np = np.array(result.x)

# Reshape the NumPy array to 3x3
optimized_concentration_params = result_x_np.reshape((3, 3))

In [8]:
# Number of sequences to generate for a robust sample
n_sequences = 1000
sequence_length = 52  # Specify the length of each Markov chain sequence
initial_state = 1  # Assuming we start each sequence from state 0 for simplicity

# Generate sequences using the optimized concentration parameters
sequences = [generate_markov_sequence(sample_transition_matrix(optimized_concentration_params), initial_state, sequence_length) for _ in range(n_sequences)]

# Calculate and compare the category percentages
calculated_percentages = calculate_category_percentages_np(sequences)
# Output the results
print(f"Calculated Category Percentages: {calculated_percentages}")
print(f"Target Percentages: {target_percentages}")
print(f"Difference: {np.abs(calculated_percentages - target_percentages)}")
#old, 20, 50, 30

Calculated Category Percentages: [ 0.1 99.8  0.1]
Target Percentages: [ 5. 94.  1.]
Difference: [4.9 5.8 0.9]


In [9]:
np.around(result_x_np.reshape((3,3)),3)
# Best: [0.1, 9.9]

array([[1.99e+00, 1.99e+00, 6.19e-01],
       [1.00e-03, 1.99e+00, 1.00e-03],
       [1.99e+00, 4.35e-01, 1.99e+00]])

In [10]:
# Close to optimal set of optimized parameters

# Calculated Category Percentages: [ 4.4 93.6  2. ]
# Target Percentages: [ 5 94  1]
# Difference: [0.6 0.4 1. ]
np.array([[2.88 , 9.9  , 0.01 ],
       [0.066, 9.367, 0.01 ],
       [9.52 , 0.01 , 5.35 ]])

np.array([[ 0.822,  7.699, 10.   ],
       [ 0.286,  7.457,  0.   ],
       [10.   ,  9.052,  0.452]])

#Calculated Category Percentages: [ 4.5 93.5  2. ]
#Target Percentages: [ 5 94  1]
#Difference: [0.5 0.5 1. ]
np.array([[9.9  , 8.421, 0.01 ],
       [0.029, 6.727, 0.01 ],
       [9.651, 3.378, 2.508]])

#Calculated Category Percentages: [ 5.7 93.3  1. ]
#Target Percentages: [ 5. 94.  1.]
#Difference: [0.7 0.7 0. ]
np.array([[5.810e-01, 9.900e+00, 5.000e-03],
       [3.660e-01, 9.900e+00, 5.000e-03],
       [3.811e+00, 1.900e-02, 1.902e+00]])

array([[5.810e-01, 9.900e+00, 5.000e-03],
       [3.660e-01, 9.900e+00, 5.000e-03],
       [3.811e+00, 1.900e-02, 1.902e+00]])

In [61]:
from cma import CMAEvolutionStrategy, CMAOptions
# Number of concentration parameters
n_params = len(target_percentages)*3  # 3 parameters for each state

# Initial guess for the concentration parameters
initial_params = np.ones(n_params)  

# Define the bounds for the concentration parameters (they should be positive)
lower_bounds = 1e-8 * np.ones(n_params)  # Avoid zero values for numerical stability
upper_bounds = None  # No upper bounds

# Set the options for the CMA-ES algorithm
options = {
    'bounds': [1e-3, 1.99],  # Lower and upper bounds for the parameters
    'maxiter': 100,  # Maximum number of iterations
    'verb_disp': 1,  # Print output every iteration
}

In [62]:
optimizer = CMAEvolutionStrategy(initial_params, 0.5, options)

(5_w,10)-aCMA-ES (mu_w=3.2,w_1=45%) in dimension 9 (seed=736862, Mon May 13 14:32:05 2024)


In [63]:
solutions = []
while not optimizer.stop():
    solutions = optimizer.ask()

    fitness_values = [objective_function(params.reshape((3,3)), n_sequences, sequence_length, target_percentages, initial_state)
                      for params in solutions]
    optimizer.tell(solutions, fitness_values)
    optimizer.disp()


# Get the best solution
best_params = optimizer.result.xbest

Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1     10 3.555197201565362e+00 1.0e+00 4.77e-01  5e-01  5e-01 0:03.1
    2     20 2.356747963108380e+00 1.2e+00 4.52e-01  4e-01  5e-01 0:06.1
    3     30 7.026953963041747e-01 1.2e+00 4.67e-01  4e-01  5e-01 0:09.1
    4     40 7.092428045696987e-01 1.3e+00 4.76e-01  4e-01  5e-01 0:12.1
    5     50 9.208538094052572e-01 1.4e+00 4.60e-01  4e-01  5e-01 0:15.0
    6     60 1.169785861040791e-01 1.4e+00 4.53e-01  4e-01  5e-01 0:18.0
    7     70 3.461681595348674e-02 1.5e+00 4.91e-01  4e-01  5e-01 0:20.9
    8     80 9.553862835347735e-01 1.6e+00 5.71e-01  5e-01  7e-01 0:24.0
    9     90 1.135109196464881e-01 1.8e+00 6.19e-01  5e-01  7e-01 0:27.0
   10    100 1.260824837982766e-01 1.8e+00 5.90e-01  4e-01  6e-01 0:30.0
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
   11    110 1.175112697423748e+00 1.9e+00 6.32e-01  4e-01  7e-01 0:33.0
   12    120 5.971139338102378e-01 2.2e+00 5.95e-01  

In [65]:
# Number of sequences to generate for a robust sample
n_sequences = 1000
sequence_length = 52  # Specify the length of each Markov chain sequence
initial_state = 1  # Assuming we start each sequence from state 0 for simplicity

# Generate sequences using the optimized concentration parameters
sequences = [generate_markov_sequence(sample_transition_matrix(best_params.reshape(3,3)), initial_state, sequence_length) for _ in range(n_sequences)]

# Calculate and compare the category percentages
calculated_percentages = calculate_category_percentages_np(sequences)
# Output the results
print(f"Calculated Category Percentages: {calculated_percentages}")
print(f"Target Percentages: {target_percentages}")
print(f"Difference: {np.abs(calculated_percentages - target_percentages)}")

Calculated Category Percentages: [ 4.5 94.7  0.8]
Target Percentages: [ 5. 94.  1.]
Difference: [0.5 0.7 0.2]
