# Import Libraries

In [None]:
from hmmlearn.hmm import CategoricalHMM
import numpy as np
import random
import pandas as pd
import time
from tqdm import tqdm
import math
import seaborn as sns
import matplotlib.pyplot as plt

# Model

In [None]:
# Model
n_comp = 50 #hidden states
n_features = 50 #observable states
start_prob = start_prob = np.array([1/n_comp] * n_comp)
trans_prob = np.random.dirichlet(np.ones(n_comp), size=n_comp) #transition between hidden states
emission_prob = np.random.dirichlet(np.ones(n_features), size=n_comp)#probability of emission based on a hidden state

#Defining model
model = CategoricalHMM(n_components=n_comp, startprob_prior=start_prob, transmat_prior=trans_prob, 
                       emissionprob_prior=emission_prob, algorithm='viterbi', init_params='')

#Probabilities equal predefined probabilities
model.startprob_ = start_prob
model.transmat_ = trans_prob
model.emissionprob_= emission_prob

print(model.startprob_)
print(model.transmat_)
print(model.emissionprob_)

# Observation Sequences

In [None]:
# Observation Sequence
obs_sequence = np.random.randint(0, n_features, size=(6,1)) #Ex. picked a random observation sequence of length 6
log_prob, correct_state_seq = model.decode(obs_sequence, algorithm='viterbi')

# Objective Score

In [None]:
def compute_objective_score(model, obs_sequence, attacked_sequence):
    #Check for original observation sequence
    if (obs_sequence != attacked_sequence).any():
        
        #Decode the original observation sequence
        log_prob, correct_state_seq = model.decode(obs_sequence, algorithm='viterbi')
    
        #Decode the attack observation sequence
        log_prob2, new_state_seq = model.decode(attacked_sequence, algorithm='viterbi')
    
        #Calculate the number of observations changed and the number of hidden states changed
        num_obs_changed = 0
        num_hiddenstates_changed = 0
        #Loop through each position of the observed sequence to compare it to attack sequence
        for idx in range(len(obs_sequence)):
            if attacked_sequence[idx] != obs_sequence[idx]:
                num_obs_changed += 1
            if new_state_seq[idx] != correct_state_seq[idx]:
                num_hiddenstates_changed += 1
    
        #Calculate the objective score (more important to change hidden states than observations)
        if num_hiddenstates_changed >= 1:
            max_score = (0.45 * len(obs_sequence) / len(obs_sequence)) - (0.55 * 1 / len(obs_sequence))
            objective_score = ((0.45 * num_hiddenstates_changed / len(obs_sequence)) - (0.55 * num_obs_changed / len(obs_sequence))) / max_score

            if objective_score < 0:
                objective_score = 0
        else:
            #Major penalty if the attack hidden state sequence does not differ from the original hidden state sequence
            objective_score = -1

    else:
        #Penalty if the random attack observation is the same as the observation sequence
        num_obs_changed = -1
        num_hiddenstates_changed = -1
        new_state_seq = -1
        objective_score = -1
        correct_state_seq = -1

    return num_obs_changed, num_hiddenstates_changed, correct_state_seq, new_state_seq, objective_score


# Most Likely Hidden State Sequence Given the Observations

In [None]:
# Find the most likely hidden state sequence from the observation sequence parameter
log_prob, correct_state_seq = model.decode(obs_sequence, algorithm='viterbi')

correct_state_seq

# GA

In [None]:
def genetic_algorithm(model, obs_sequence, population_size, number_generations, mutation_rate):
    
    #Initial population
    def first_population():
        attack_population = [np.random.randint(0, n_features, size=(len(obs_sequence), 1)) for i in range(population_size)]
        return attack_population

    #Fitness function
    def fitness_score(attack_seq):
        num_obs_changed, num_hiddenstates_changed, correct_state_seq, new_state_seq, objective_score = compute_objective_score(model, obs_sequence, attack_seq)
        return num_obs_changed, num_hiddenstates_changed, correct_state_seq, new_state_seq, objective_score

    #Selection function to get top half
    def selection(population, scores):
    
        # Ensure scores are numeric (using the first element of the tuple)
        clean_scores = []
        for score in scores:
            if isinstance(score, tuple) and isinstance(score[0], (int, float)):
                clean_scores.append(score[0])  # Use the first element (fitness score)
            else:
                print(f"Invalid score detected: {score}. Replacing with -inf.")
                clean_scores.append(-float('inf'))  # Default low fitness for invalid entries
        
        # Zip the cleaned scores (first element) with the population and proceed with sorting
        combined_pop_scores = list(zip(population, clean_scores))
        sorted_pop_scores = sorted(combined_pop_scores, key=lambda x: x[1], reverse=True)
        
        pop_only = [ind for ind, _ in sorted_pop_scores]
        return pop_only[:len(pop_only) // 2]  # Select top 50% of the population

    #Crossover function
    def crossover(parent1, parent2):
        idx = np.random.randint(1, len(parent1))
        child1 = np.concatenate([parent1[:idx], parent2[idx:]])
        child2 = np.concatenate([parent2[:idx], parent1[idx:]])
        return child1, child2

    #Mutation function
    def mutation(attack_seq):
        if np.random.rand() < mutation_rate:
            idx = np.random.randint(len(attack_seq))
            attack_seq[idx] = np.random.randint(n_features)
        return attack_seq

    #Using functions
    population = first_population()
    best_individual = None
    best_fitness = -10
    best_num_obs_changed = None
    best_num_hiddenstates_changed = None
    
    start_time = time.time()
    
    for generation in range(number_generations):
        fitnesses = [fitness_score(ind) for ind in population]
        num_obs_changes, num_hiddenstates_changes, correct_state_seqs, new_state_seqs, objective_scores = zip(*fitnesses)
        #scores, num_hidden_states_changes = zip(*fitnesses)
        
        if len(objective_scores) > 0:
            current_best_fitness = max(objective_scores)
            best_idx = np.argmax(objective_scores)

            if current_best_fitness > best_fitness:
                best_fitness = current_best_fitness
                best_individual = population[best_idx]  # Safe indexing after confirmation
                best_num_obs_changed = num_obs_changes[best_idx]
                best_num_hiddenstates_changed = num_hiddenstates_changes[best_idx]
        
        selected_population = selection(population, fitnesses)
        next_generation = []
        while len(next_generation) < population_size:
            parents = random.sample(selected_population, 2)
            offspring1, offspring2 = crossover(parents[0], parents[1])
            next_generation.append(mutation(offspring1))
            next_generation.append(mutation(offspring2))
        population = next_generation
    
    end_time = time.time()  # End the timer
    execution_time_ga = end_time - start_time  # Calculate the execution time


    return best_individual, best_fitness, execution_time_ga, best_num_obs_changed, best_num_hiddenstates_changed




In [None]:
# GA Parameters
pop = 20
gen = 20
mut_rate = 0.01

In [None]:
#Execute GA
best_individual, best_fitness, execution_time_ga, best_num_obs_changed, best_num_hiddenstates_changed = \
    genetic_algorithm(model, obs_sequence, pop, gen, mut_rate)

In [None]:
#GA
print("GA Method Best Attack")
print("    Attack Sequence:", best_individual.T, "Original:", obs_sequence.T)
print("    Objective Score:", best_fitness)
print("    Execution Time (seconds):", execution_time_ga)
print("    Num Hidden States Changed:", best_num_hiddenstates_changed)
print("    Num Observations Changed:", best_num_obs_changed)

# SA

In [None]:
#SA METHOD
def simulated_annealing(model, obs_sequence, initial_temp, cooling_rate, max_iter):
    
    def random_attack_start(obs_sequence):
        initial_solution = np.random.randint(0, n_features, size=(len(obs_sequence),1))
        return initial_solution

    def generate_neighbor(solution):
        neighbor = solution.copy()
        idx = np.random.randint(0, len(solution))
        neighbor[idx] = np.random.randint(0, n_features)
        return neighbor

    def acceptance_probability(current_score, new_score, temperature):
        if new_score > current_score:
            return 1.0
        else:
            return np.exp((new_score - current_score) / temperature)

    def annealing_schedule(t, initial_temp, cooling_rate):
        return initial_temp / (1 + cooling_rate * t)

    
    start_time_sa = time.time()  # Start timer
    current_solution = random_attack_start(obs_sequence)
    num_obs_changed, num_hiddenstates_changed, correct_state_seq, new_state_seq, current_score = compute_objective_score(model, obs_sequence, current_solution)
    best_solution = current_solution
    best_score = current_score
    temperature = initial_temp
    num_obs_changed = num_obs_changed
    num_hiddenstates_changed = num_hiddenstates_changed

    for t in range(max_iter):
        temperature = annealing_schedule(t, initial_temp, cooling_rate)
        new_solution = generate_neighbor(current_solution)
        new_num_obs_changed, new_num_hiddenstates_changed, correct_state_seq, new_state_seq, new_score = compute_objective_score(model, obs_sequence, new_solution)
        
        if acceptance_probability(current_score, new_score, temperature) > np.random.rand():
            current_solution = new_solution
            current_score = new_score
            
            if new_score > best_score:
                best_solution = new_solution
                best_score = new_score
                num_obs_changed = new_num_obs_changed
                num_hiddenstates_changed = new_num_hiddenstates_changed

        score_history.append(best_score)

    end_time_sa = time.time()  # End the timer
    execution_time_sa = end_time_sa - start_time_sa  # Calculate the execution time

    return best_solution, best_score, execution_time_sa, num_obs_changed, num_hiddenstates_changed



In [None]:
# SA Parameters
initial_temp = 10
cooling_rate = 0.009
max_iter_sa = 1000

In [None]:
# Execute SA
best_solution, best_score, execution_time_sa, num_obs_changed2, num_hiddenstates_changed2 = \
    simulated_annealing(model, obs_sequence, initial_temp, cooling_rate, max_iter_sa)

In [None]:
#SA
print("SA Method Best Attack")
print("    Attack Sequence:", best_solution.T, "Original:", obs_sequence.T)
print("    Objective Score:", best_score)
print("    Execution Time (seconds):", execution_time_sa)
print("    Num Hidden States Changed:", num_obs_changed2)
print("    Num Observations Changed:", num_hiddenstates_changed2)

# TS

In [None]:
#TS METHOD
def tabu_search(model, obs_sequence, tabu_list_size, max_iter):
    
    def random_attack_start(obs_sequence):
        initial_solution = np.random.randint(0, n_features, size=(len(obs_sequence), 1))
        return initial_solution

    def generate_neighborhood(solution):
        neighbors = []
        indices = list(range(len(solution)))

        # Calculate total possible neighbors and a quarter of them
        total_possible_neighbors = len(solution) * (n_features - 1)
        sample_size = math.ceil(total_possible_neighbors ** 0.45) #total_possible_neighbors // 10

        while len(neighbors) < sample_size:
            neighbor = solution.copy()
        
            # Randomly pick a position in the sequence to change
            i = random.choice(indices)
            
            # Randomly choose a new value different from the current one
            new_value = random.choice([j for j in range(n_features) if j != solution[i, 0]])
            
            # Modify the solution at the chosen position
            neighbor[i, 0] = new_value
            
            # Ensure no duplicate neighbors
            if neighbor.tolist() not in [n.tolist() for n in neighbors]:  # Convert to list for comparison
                neighbors.append(neighbor)
        
        return neighbors

    
    start_time = time.time()
    current_solution = random_attack_start(obs_sequence)
    num_obs_changed, num_hiddenstates_changed, correct_state_seq, new_state_seq, current_score = compute_objective_score(model, obs_sequence, current_solution)
    
    best_solution = current_solution
    best_score = current_score
    best_num_obs_changed = num_obs_changed
    best_num_hiddenstates_changed = num_hiddenstates_changed
    tabu_list = []


    for t in range(max_iter):
        neighborhood = generate_neighborhood(current_solution)
        best_neighbor = None
        best_neighbor_score = -np.inf
        best_neighbor_num_obs_changed = None
        best_neighbor_num_hiddenstates_changed = None


        #print(neighborhood)

        
        # Evaluate all neighbors
        for neighbor in neighborhood:
            new_num_obs_changed, new_num_hiddenstates_changed, correct_state_seq, new_state_seq, neighbor_score = compute_objective_score(model, obs_sequence, neighbor)
            
            if neighbor_score > best_neighbor_score and neighbor.tolist() not in tabu_list:
                best_neighbor = neighbor
                best_neighbor_score = neighbor_score
                best_neighbor_num_obs_changed = new_num_obs_changed
                best_neighbor_num_hiddenstates_changed = new_num_hiddenstates_changed
    
        # After evaluating all neighbors, update current solution if a best neighbor was found
        if best_neighbor is not None:
            current_solution = best_neighbor
            current_score = best_neighbor_score
            tabu_list.append(current_solution.tolist())
            
            if len(tabu_list) > tabu_list_size:
                tabu_list.pop(0)
    
            # Update best solution if the current score is better
            if current_score > best_score:
                best_solution = current_solution
                best_score = current_score
                best_num_obs_changed = best_neighbor_num_obs_changed
                best_num_hiddenstates_changed = best_neighbor_num_hiddenstates_changed

    end_time = time.time()
    execution_time = end_time - start_time

    return best_solution, best_score, execution_time, best_num_obs_changed, best_num_hiddenstates_changed



In [None]:
# TS Parameters
tabu_list_size = 10
max_iter_ts = 5

In [None]:
# Execute TS
best_solution1, best_score1, execution_time1, num_obs_changed3, num_hiddenstates_changed3 = \
    tabu_search(model, obs_sequence, tabu_list_size, max_iter_ts)

In [None]:
#TS
print("TS Method Best Attack")
print("    Attack Sequence:", best_solution1.T, "Original:", obs_sequence.T)
print("    Objective Score:", best_score1)
print("    Execution Time (seconds):", execution_time1)
print("    Num Hidden States Changed:", num_obs_changed3)
print("    Num Observations Changed:", num_hiddenstates_changed3)

# Run Experiment

In [None]:
# Initialize storage for results
results = []

# GA Parameters
pop_sizes = [10, 20, 30, 50]
gen_sizes = [20, 30, 35, 40]
mut_rates = [0.01, 0.05, 0.1]

# SA Parameters
initial_temps = [10, 20, 30]
cooling_rates = [0.001, 0.005, 0.01, 0.05]
max_iters_sa = [100, 250, 500]

# TS Parameters
tabu_list_sizes = [5, 10, 15, 20]
max_iters_ts = [10, 25, 50, 75]

# Define observation lengths
obs_lengths = [6, 10, 50, 100]

# Number of iterations for each algorithm
iterations = 20

# Total tasks to track for progress (each algorithm runs 30 times for each observation length)
total_tasks = len(obs_lengths) * iterations * (len(pop_sizes) * len(gen_sizes) * len(mut_rates) + 
                                               len(initial_temps) * len(cooling_rates) * len(max_iters_sa) +
                                               len(tabu_list_sizes) * len(max_iters_ts))

# Create a progress bar using tqdm
with tqdm(total=total_tasks, desc="Running Algorithms", unit="run") as pbar:
    
    # Loop over each observation length
    for obs_length in obs_lengths:
        
        # Generate observation sequence based on the current length
        obs_sequence = np.random.randint(0, n_features, size=(obs_length, 1)) 
        log_prob, correct_state_seq = model.decode(obs_sequence, algorithm='viterbi')
        
        # Test Genetic Algorithm
        for pop_size in pop_sizes:
            for gen_size in gen_sizes:
                for mut_rate in mut_rates:
                    for _ in range(iterations):
                        best_individual, best_fitness, execution_time_ga, best_num_obs_changed, best_num_hiddenstates_changed = genetic_algorithm(
                            model, obs_sequence, pop_size, gen_size, mut_rate
                        )
                        # Store the results
                        results.append({
                            'Algorithm': 'GA',
                            'Obs_Length': obs_length,
                            'Population_Size': pop_size,
                            'Generation_Size': gen_size,
                            'Mutation_Rate': mut_rate,
                            'Best_Fitness': best_fitness,
                            'Best_Individual': best_individual,
                            'Best_Num_Obs_Changed': best_num_obs_changed,
                            'Best_Num_Hidden_States_Changed': best_num_hiddenstates_changed,
                            'Execution_Time': execution_time_ga
                        })
                        pbar.update(1)

        # Test Simulated Annealing
        for initial_temp in initial_temps:
            for cooling_rate in cooling_rates:
                for max_iter in max_iters_sa:
                    for _ in range(iterations):
                        best_solution, best_score, execution_time_sa, num_obs_changed, num_hiddenstates_changed = simulated_annealing(
                            model, obs_sequence, initial_temp, cooling_rate, max_iter
                        )
                        # Store the results
                        results.append({
                            'Algorithm': 'SA',
                            'Obs_Length': obs_length,
                            'Initial_Temperature': initial_temp,
                            'Cooling_Rate': cooling_rate,
                            'Max_Iterations': max_iter,
                            'Best_Fitness': best_score,
                            'Best_Individual': best_solution,
                            'Best_Num_Obs_Changed': num_obs_changed,
                            'Best_Num_Hidden_States_Changed': num_hiddenstates_changed,
                            'Execution_Time': execution_time_sa
                        })
                        pbar.update(1)

        # Test Tabu Search
        for tabu_size in tabu_list_sizes:
            for max_iter in max_iters_ts:
                for _ in range(iterations):
                    best_solution, best_score, execution_time_ts, num_obs_changed, num_hiddenstates_changed = tabu_search(
                        model, obs_sequence, tabu_size, max_iter
                    )
                    # Store the results
                    results.append({
                        'Algorithm': 'TS',
                        'Obs_Length': obs_length,
                        'Tabu_List_Size': tabu_size,
                        'Max_Iterations': max_iter,
                        'Best_Fitness': best_score,
                        'Best_Individual': best_solution,
                        'Best_Num_Obs_Changed': num_obs_changed,
                        'Best_Num_Hidden_States_Changed': num_hiddenstates_changed,
                        'Execution_Time': execution_time_ts
                    })
                    pbar.update(1)

In [None]:
# Convert results to a DataFrame for easier analysis
results_df = pd.DataFrame(results)
results_df

# Filter the DataFrame where algorithm is 'GA' and obs_length is 6
filtered_results = results_df[(results_df['Algorithm'] == 'TS') & (results_df['Obs_Length'] == 50) & (results_df['Best_Fitness'] > 0)]

# Display the filtered results
filtered_results.head(20)


# Analysis

In [None]:
# Summary of Best Fitness by Algorithm and Obs_Length
summary_table = results_df.groupby(['Algorithm', 'Obs_Length']).agg(
    Best_Fitness=('Best_Fitness', 'mean'),
    Best_Num_Obs_Changed=('Best_Num_Obs_Changed', 'mean'),
    Best_Num_Hidden_States_Changed=('Best_Num_Hidden_States_Changed', 'mean'),
    Execution_Time=('Execution_Time', 'mean')
).reset_index()

(summary_table)

In [None]:
# Summary of Best Fitness by Algorithm and Obs_Length
summary_table = results_df.agg(
    Best_Fitness=('Best_Fitness', 'mean'),
    Best_Num_Obs_Changed=('Best_Num_Obs_Changed', 'mean'),
    Best_Num_Hidden_States_Changed=('Best_Num_Hidden_States_Changed', 'mean'),
    Execution_Time=('Execution_Time', 'mean')
).reset_index()

(summary_table)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Pivot for heatmap data for Tabu Search
ts_heatmap_data = results_df[results_df['Algorithm'] == 'TS'].pivot_table(
    values='Best_Fitness',
    index='Tabu_List_Size',
    columns='Max_Iterations'
)

# Create the heatmap with adjustments
plt.figure(figsize=(4.5, 3.5))  # Reduce the figure size
sns.heatmap(ts_heatmap_data, annot=True, cmap='viridis', annot_kws={"size": 8}, cbar_kws={'shrink': 0.8})
plt.title('Tabu Search Best Fitness by List Size and Iterations', fontsize=11)  # Smaller title
plt.xlabel('Max Iterations', fontsize=10)
plt.ylabel('Tabu List Size', fontsize=10)
plt.xticks(fontsize=8)  # Reduce tick label size
plt.yticks(fontsize=8)

# Tighten layout to remove excess whitespace
plt.tight_layout()

# Show the plot
plt.show()


In [None]:
import pandas as pd

# Assume results_df is your DataFrame with results
results_df = pd.DataFrame(results)

# Find average best parameters for GA
avg_ga_params = results_df[results_df['Algorithm'] == 'GA'].groupby(
    ['Population_Size', 'Generation_Size', 'Mutation_Rate']
).agg({'Best_Fitness': 'mean'}).reset_index()

# Sort to find the top 3 GA parameters by average fitness
top_ga_params = avg_ga_params.sort_values(by='Best_Fitness', ascending=False).head(3)

# Display the top performing GA parameters
print("Top 3 GA Parameters:")
print(top_ga_params)

# Find average best parameters for SA
avg_sa_params = results_df[results_df['Algorithm'] == 'SA'].groupby(
    ['Initial_Temperature', 'Cooling_Rate', 'Max_Iterations']
).agg({'Best_Fitness': 'mean'}).reset_index()

# Sort to find the top 3 SA parameters by average fitness
top_sa_params = avg_sa_params.sort_values(by='Best_Fitness', ascending=False).head(3)

# Display the top performing SA parameters
print("Top 3 SA Parameters:")
print(top_sa_params)
