In [None]:
import logging
import random
from collections.abc import Callable
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime
from json import dumps
from time import time
from typing import Final, Literal

import numpy as np
import structlog
from numpy import ndarray, where
from pandas import DataFrame, Series, concat, read_csv, set_option
from scipy.stats import loguniform, wilcoxon
from sklearn.metrics import (
	auc,
	average_precision_score,
	f1_score,
	precision_recall_curve,
)
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, cross_val_score
from sklearn.svm import OneClassSVM

# Do not truncate columns when a DataFrame is displayed.
set_option("display.max_columns", None)
# Route structlog through the stdlib logging machinery and render every event
# as a single JSON object (level, logger name, ISO timestamp, exception info).
structlog.configure(
	processors=[
		structlog.stdlib.filter_by_level,
		structlog.stdlib.add_logger_name,
		structlog.stdlib.add_log_level,
		structlog.stdlib.PositionalArgumentsFormatter(),
		structlog.processors.TimeStamper(fmt="iso"),
		structlog.processors.StackInfoRenderer(),
		structlog.processors.format_exc_info,
		structlog.processors.UnicodeDecoder(),
		structlog.processors.JSONRenderer(),
	],
	context_class=dict,
	logger_factory=structlog.stdlib.LoggerFactory(),
	wrapper_class=structlog.stdlib.BoundLogger,
	cache_logger_on_first_use=True,
)
# Alias for a search space: hyperparameter name -> tuple of candidate values.
# NOTE(review): get_param_grid also returns lists and scipy distributions under
# this alias, so it is looser in practice than the declared type.
type ParamGrid = dict[str, tuple[float | str, ...]]
# Budget used after the first full search: passed as n_iter for the follow-up
# rounds and used as the top-k / target grid size in update_params_grid.
NUM_TRIALS: Final[int] = 20


@dataclass
class SearchResult:
	"""Summary of one search strategy's best run, as built by ``eval_search_method``."""

	method: str  # search name with a "_search" suffix, e.g. "Grid_search"
	best_params: dict  # best_params_ of the best-scoring search object
	best_score: float  # best_score_ of that same search object
	cv_scores: list[float]  # its cv_results_["mean_test_score"], as a plain list
	fit_time: float  # wall-clock seconds (time() delta) for the whole evaluation loop
	n_evaluations: int  # number of entries in cv_scores

In [None]:
# Load the pre-split PAMAP2 feature and label tables.
X_train = read_csv("../data/PAMAP2/x_train_data.csv")
X_test = read_csv("../data/PAMAP2/x_test_data.csv")
y_train = read_csv("../data/PAMAP2/y_train_data.csv")
y_test = read_csv("../data/PAMAP2/y_test_data.csv")

# Attach the labels as an "activity" column so rows can be filtered per activity.
# NOTE(review): assumes each y_* file holds exactly one column - confirm.
X_train["activity"] = y_train
X_test["activity"] = y_test

# Module-level slots that eval_search_method fills (via `global`) and that
# score_function reads; declared here without a value.
testing_data: DataFrame
test_targets: Series

# RESULTS: dict[int, dict[str, float | int]] = {}
# MODELS: dict[int, dict] = {}
# Row count of the rarest training activity; used as the per-activity sample cap.
MIN_SAMPLES = X_train["activity"].value_counts().sort_values().iloc[0]
# NOTE(review): not read by any visible cell - eval_search_method assigns its
# own local variable of the same name.
MAXIMAZED = False

In [None]:
def configure_file_logger(filepath: str) -> structlog.BoundLogger:
	"""
	Point the "hyperparameter_search" logger at ``filepath`` and return it.

	Re-applies the global structlog configuration (JSON rendering through the
	stdlib ``logging`` module, logger caching disabled) and replaces every
	handler of the stdlib logger with a single append-mode ``FileHandler``.
	Handlers installed by a previous call are closed before being dropped so
	their log files do not stay open for the rest of the kernel session.

	Args:
		filepath (str): Log file to append to.

	Returns:
		structlog.BoundLogger: structlog logger bound to "hyperparameter_search".
	"""
	# Create a specific handler for this file
	file_handler = logging.FileHandler(filepath, mode="a")
	file_handler.setLevel(logging.INFO)

	# Configure structlog with file output
	structlog.configure(
		processors=[
			structlog.stdlib.filter_by_level,
			structlog.stdlib.add_logger_name,
			structlog.stdlib.add_log_level,
			structlog.stdlib.PositionalArgumentsFormatter(),
			structlog.processors.TimeStamper(fmt="iso"),
			structlog.processors.StackInfoRenderer(),
			structlog.processors.format_exc_info,
			structlog.processors.UnicodeDecoder(),
			structlog.processors.JSONRenderer(),
		],
		context_class=dict,
		logger_factory=structlog.stdlib.LoggerFactory(),
		wrapper_class=structlog.stdlib.BoundLogger,
		cache_logger_on_first_use=False,
	)
	# Get the underlying stdlib logger and swap its handlers
	stdlib_logger = logging.getLogger("hyperparameter_search")
	# Iterate over a copy: removeHandler mutates the list. Closing releases the
	# file descriptor that a bare handlers.clear() would leave open.
	for stale_handler in list(stdlib_logger.handlers):
		stdlib_logger.removeHandler(stale_handler)
		stale_handler.close()
	stdlib_logger.addHandler(file_handler)
	stdlib_logger.setLevel(logging.INFO)

	return structlog.get_logger("hyperparameter_search")


def score_function(model: OneClassSVM, Train: DataFrame, test: Series) -> float:
	"""
	Scorer with scikit-learn's ``(estimator, X, y)`` signature that returns the
	F1 score of ``model`` on the module-level novelty set.

	The data passed in by the caller is ignored: predictions are always made on
	the globals ``testing_data`` / ``test_targets``. Every call also writes one
	record (F1, average precision, PR-AUC, model params, timestamp) to the
	module-level ``logger``.

	Args:
		model (OneClassSVM): Fitted model to evaluate
		Train (DataFrame): unused, only for API compliance
		test (Series): unused, only for API compliance

	Returns:
		float: F1 score, where a prediction of -1 counts as the positive class
	"""
	# A prediction of -1 is treated as "novelty" (the positive label).
	flagged_as_novel = model.predict(testing_data) == -1
	f1 = f1_score(test_targets, flagged_as_novel)

	# decision_function is negated so that larger values mean "more anomalous",
	# which is the orientation the ranking metrics below are fed.
	anomaly_scores = -model.decision_function(testing_data)
	precision, recall, _ = precision_recall_curve(test_targets, anomaly_scores)

	record = {
		"target": f1,
		"avg_precision": average_precision_score(test_targets, anomaly_scores),
		"auc_pr": auc(recall, precision),
		"params": model.get_params(),
		"datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
	}
	logger.info(record)
	return float(f1)

In [None]:
class SimulatedAnnealingSearch:
	"""
	Custom Simulated Annealing implementation for hyperparameter optimization.

	Simulated Annealing Search:
	- Temperature-based acceptance: improving neighbours are always accepted,
	  others with probability exp((neighbor_score - current_score) / temperature)
	- Adaptive parameter perturbation: Different strategies for continuous (nu, gamma, tol) vs discrete parameters
	- Cooling schedule: Exponential cooling with configurable rate, floored at ``min_temp``
	- Neighbor generation: one randomly chosen parameter is changed per step

	After ``fit`` the instance exposes ``best_params_``, ``best_score_`` and
	``cv_results_["mean_test_score"]`` (one entry per evaluated neighbour; the
	initial point is not recorded there).
	"""

	# log10 clipping bounds for the log-space perturbation of these parameters
	_LOG10_BOUNDS = {"gamma": (-4, 1), "tol": (-6, -1)}  # 1e-4..10 and 1e-6..1e-1

	def __init__(
		self,
		param_space: dict,
		n_iter: int = 100,
		initial_temp: float = 1.0,
		cooling_rate: float = 0.95,
		min_temp: float = 0.01,
		random_state: int = 42,
	):
		"""
		Args:
			param_space (dict): name -> scipy distribution (anything with ``rvs``)
				or sequence of discrete candidates
			n_iter (int): number of neighbours to evaluate
			initial_temp (float): starting temperature
			cooling_rate (float): multiplicative cooling factor per iteration
			min_temp (float): lower bound for the temperature
			random_state (int): seed for all random draws
		"""
		self.param_space = param_space
		self.n_iter = n_iter
		self.initial_temp = initial_temp
		self.cooling_rate = cooling_rate
		self.min_temp = min_temp
		self.random_state = random_state
		self.best_params_ = None
		self.best_score_ = -np.inf
		self.cv_results_ = {"mean_test_score": []}
		# One generator shared by every draw. Passing the integer seed to rvs()
		# re-seeds on each call and returns the same quantile every time.
		self._rng = np.random.RandomState(random_state)

	def _sample_params(self) -> dict:
		"""Sample random parameters from the parameter space."""
		return {
			key: values.rvs(random_state=self._rng)
			if hasattr(values, "rvs")  # scipy distribution
			else random.choice(values)
			for key, values in self.param_space.items()
		}

	def _neighbor_params(self, current_params: dict) -> dict:
		"""Generate neighboring parameters by modifying one randomly chosen parameter."""
		neighbor = deepcopy(current_params)

		# Choose a random parameter to modify
		param_to_modify = random.choice(list(self.param_space.keys()))
		space = self.param_space[param_to_modify]

		if not hasattr(space, "rvs"):  # discrete parameter: redraw a candidate
			neighbor[param_to_modify] = random.choice(space)
			return neighbor

		current_val = neighbor[param_to_modify]
		if param_to_modify == "nu":
			# For nu, relative Gaussian step, kept within [0.001, 1.0]
			neighbor[param_to_modify] = np.clip(
				current_val + self._rng.normal(0, 0.05 * current_val), 0.001, 1.0
			)
		elif param_to_modify in self._LOG10_BOUNDS:
			# For gamma / tol, use log-space perturbation
			low, high = self._LOG10_BOUNDS[param_to_modify]
			neighbor[param_to_modify] = 10 ** np.clip(
				np.log10(current_val) + self._rng.normal(0, 0.1), low, high
			)
		else:
			# Any other continuous parameter: redraw from its distribution rather
			# than returning an unchanged copy and wasting a full evaluation.
			neighbor[param_to_modify] = space.rvs(random_state=self._rng)

		return neighbor

	@staticmethod
	def _acceptance_probability(
		neighbor_score: float, current_score: float, temperature: float
	) -> float:
		"""Metropolis acceptance probability for moving to the neighbour."""
		if temperature <= 0:
			return 0.0
		delta = neighbor_score - current_score
		if delta >= 0:  # not worse: certain acceptance, and avoids exp overflow
			return 1.0
		# The whole difference is divided by the temperature; dividing only
		# current_score breaks the acceptance rule as soon as temperature != 1.
		return float(np.exp(delta / temperature))

	def _evaluate_params(self, params: dict, X: DataFrame, y: Series) -> float:
		"""Evaluate parameter configuration using cross-validation."""
		return np.mean(
			cross_val_score(
				OneClassSVM(**params, kernel="rbf"), X, y, cv=4, scoring=score_function
			)
		)

	def fit(self, X: DataFrame, y: Series):
		"""
		Run the annealing loop and return ``self``.

		Seeds the global ``random`` / ``np.random`` generators and the private
		generator from ``random_state``, so repeated fits are reproducible.
		"""
		random.seed(self.random_state)
		np.random.seed(self.random_state)
		self._rng = np.random.RandomState(self.random_state)
		self.cv_results_ = {"mean_test_score": []}  # do not accumulate across refits

		# Initialize with random parameters
		current_params = self._sample_params()
		current_score = self._evaluate_params(current_params, X, y)

		self.best_params_ = deepcopy(current_params)
		self.best_score_ = current_score

		temperature = self.initial_temp

		for _ in range(self.n_iter):
			# Generate neighbor and store its score for cv_results
			neighbor_params = self._neighbor_params(current_params)
			neighbor_score = self._evaluate_params(neighbor_params, X, y)
			self.cv_results_["mean_test_score"].append(neighbor_score)

			# Better solutions are always accepted; worse ones with a
			# temperature-dependent probability.
			if neighbor_score > current_score or random.random() < (
				self._acceptance_probability(neighbor_score, current_score, temperature)
			):
				current_params = neighbor_params
				current_score = neighbor_score

			if current_score > self.best_score_:  # Update best solution
				self.best_params_ = deepcopy(current_params)
				self.best_score_ = current_score

			# Cool down
			temperature = max(temperature * self.cooling_rate, self.min_temp)

		return self

In [None]:
class GeneticAlgorithmSearch:
	"""
	Custom Genetic Algorithm implementation for hyperparameter optimization.

	Genetic Algorithm Search:
	- Population-based optimization: Maintains diverse parameter sets
	- Tournament selection: best of a small random subset becomes a parent
	- Uniform crossover: Parameter exchange between parents
	- Mutation: each parameter is redrawn with probability ``mutation_rate``
	- Elite preservation: the ``elite_size`` best individuals are copied unchanged

	After ``fit`` the instance exposes ``best_params_``, ``best_score_`` and
	``cv_results_["mean_test_score"]`` (one entry per evaluated individual).
	"""

	def __init__(
		self,
		param_space: dict,
		population_size: int = 20,
		n_generations: int = 10,
		mutation_rate: float = 0.1,
		crossover_rate: float = 0.8,
		elite_size: int = 2,
		random_state: int = 42,
	):
		"""
		Args:
			param_space (dict): name -> scipy distribution (anything with ``rvs``)
				or sequence of discrete candidates
			population_size (int): individuals per generation
			n_generations (int): number of generations to evaluate
			mutation_rate (float): per-parameter probability of a redraw
			crossover_rate (float): probability that a parent pair is crossed
			elite_size (int): individuals carried over unchanged (0 disables)
			random_state (int): seed for all random draws
		"""
		self.param_space = param_space
		self.population_size = population_size
		self.n_generations = n_generations
		self.mutation_rate = mutation_rate
		self.crossover_rate = crossover_rate
		self.elite_size = elite_size
		self.random_state = random_state
		self.best_params_ = None
		self.best_score_ = -np.inf
		self.cv_results_ = {"mean_test_score": []}
		# One generator shared by every draw. Passing the integer seed to rvs()
		# re-seeds on each call, which made every individual (and every
		# mutation) identical for continuous search spaces.
		self._rng = np.random.RandomState(random_state)

	def _draw(self, key: str):
		"""Draw one value for ``key`` from its distribution or candidate list."""
		values = self.param_space[key]
		if hasattr(values, "rvs"):  # scipy distribution
			return values.rvs(random_state=self._rng)
		return random.choice(values)  # discrete parameter

	def _create_individual(self) -> dict:
		"""Create a random individual (parameter set)."""
		return {key: self._draw(key) for key in self.param_space}

	def _crossover(self, parent1: dict, parent2: dict) -> tuple[dict, dict]:
		"""Create two offspring from two parents using uniform crossover."""
		child1, child2 = deepcopy(parent1), deepcopy(parent2)

		for key in parent1.keys():
			if random.random() < 0.5:  # Swap parameter values
				child1[key], child2[key] = child2[key], child1[key]

		return child1, child2

	def _mutate(self, individual: dict) -> dict:
		"""Mutate an individual by randomly redrawing some parameters."""
		mutated = deepcopy(individual)

		for key in individual.keys():
			if random.random() < self.mutation_rate:
				mutated[key] = self._draw(key)
		return mutated

	def _tournament_selection(
		self, population: list, fitness_scores: list, tournament_size: int = 3
	) -> dict:
		"""Select an individual using tournament selection."""
		tournament_indices = random.sample(
			range(len(population)), min(tournament_size, len(population))
		)
		return population[
			tournament_indices[
				np.argmax([fitness_scores[i] for i in tournament_indices])
			]
		]

	def _select_elite(self, population: list, fitness_scores: list) -> list:
		"""Return copies of the ``elite_size`` fittest individuals (ascending fitness)."""
		if self.elite_size <= 0:
			# `[-0:]` would slice the WHOLE population, not an empty list.
			return []
		return [
			deepcopy(population[idx])
			for idx in np.argsort(fitness_scores)[-self.elite_size :]
		]

	def _evaluate_params(self, params: dict, X: DataFrame, y: Series) -> float:
		"""Evaluate parameter configuration using cross-validation."""
		return np.mean(
			cross_val_score(
				OneClassSVM(**params, kernel="rbf"), X, y, cv=4, scoring=score_function
			)
		)

	def fit(self, X: DataFrame, y: Series):
		"""
		Evolve the population for ``n_generations`` and return ``self``.

		Seeds the global ``random`` / ``np.random`` generators and the private
		generator from ``random_state``, so repeated fits are reproducible.
		"""
		random.seed(self.random_state)
		np.random.seed(self.random_state)
		self._rng = np.random.RandomState(self.random_state)
		# Start from a clean slate so a refit does not mix in earlier results.
		self.best_params_ = None
		self.best_score_ = -np.inf
		self.cv_results_ = {"mean_test_score": []}

		population = [self._create_individual() for _ in range(self.population_size)]

		for generation in range(self.n_generations):  # Evaluate fitness
			print(f"Evaluating Generation {generation}")
			fitness_scores = []
			for individual in population:
				score = self._evaluate_params(individual, X, y)
				fitness_scores.append(score)
				self.cv_results_["mean_test_score"].append(score)

				if score > self.best_score_:
					self.best_params_ = deepcopy(individual)
					self.best_score_ = score

			# Create next generation | Elite selection - keep best individuals
			new_population = self._select_elite(population, fitness_scores)
			# Generate offspring
			while len(new_population) < self.population_size:
				# Selection
				parent1 = self._tournament_selection(population, fitness_scores)
				parent2 = self._tournament_selection(population, fitness_scores)
				# Crossover
				if random.random() < self.crossover_rate:
					child1, child2 = self._crossover(parent1, parent2)
				else:
					child1, child2 = deepcopy(parent1), deepcopy(parent2)
				# Mutation
				new_population.extend([self._mutate(child1), self._mutate(child2)])
			# Trim to exact population size (offspring are added in pairs)
			population = new_population[: self.population_size]

		return self

In [None]:
def get_param_grid(
	search_method: Literal[
		"Grid", "Random", "SimulatedAnnealing", "GeneticAlgorithm", "Bayesian"
	],
	use_log_dist: bool = False,
) -> dict[str, object]:
	"""
	Return the One-Class SVM search space (nu, gamma, tol) for a search strategy.

	Args:
		search_method: Strategy the space is meant for.
		use_log_dist (bool): Only used by "Random", "SimulatedAnnealing" and
			"GeneticAlgorithm": return log-uniform scipy distributions instead
			of discrete candidate lists.

	Returns:
		dict: parameter name -> list of candidates ("Grid" and the discrete
		variant), scipy ``loguniform`` distribution (log variant), or a
		``(low, high)`` bounds tuple ("Bayesian").

	Raises:
		ValueError: if ``search_method`` is not one of the supported names
			(previously this fell through and returned ``None``).
	"""
	if search_method == "Grid":
		return {
			"nu": [0.01, 0.05, 0.1, 0.25],
			"gamma": ["scale", "auto", 0.001, 0.01, 0.1],
			"tol": [1e-1, 1e-2, 1e-3, 1e-4, 1e-5],
		}
	if search_method in ("Random", "SimulatedAnnealing", "GeneticAlgorithm"):
		if use_log_dist:  # Log-uniform for better coverage
			return {
				"nu": loguniform(0.001, 0.3),
				"gamma": loguniform(1e-4, 10),
				"tol": loguniform(1e-6, 1e-1),
			}
		return {
			# 0.075 replaces a former 0.75, which sat between 0.05 and 0.1 in an
			# otherwise ascending list and exceeded every other nu bound used here.
			"nu": [0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.4, 0.5],
			"gamma": ["scale", "auto", 0.001, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
			"tol": [1e-5, 1e-4, 1e-3, 1e-2, 1e-1],
		}
	if search_method == "Bayesian":
		return {"nu": (0.01, 0.5), "gamma": (1e-4, 1), "tol": (1e-5, 1e-1)}
	raise ValueError(f"Unknown search method: {search_method!r}")


def update_train_vars(
	i: int, activities: ndarray
) -> tuple[DataFrame, Series, DataFrame, Series]:
	"""
	Build the training set and the novelty-evaluation set for step ``i``.

	Training data is made of the first ``MIN_SAMPLES`` rows of each of the
	first ``i`` activities from ``X_train``. The evaluation set is the first
	``MIN_SAMPLES`` rows of activity ``activities[i]`` from ``X_test`` (labelled
	novel) plus a seeded 15% sample of the training rows (labelled not novel).

	Args:
		i (int): Index of the activity treated as novel; activities before it
			are the known ones.
		activities (ndarray): Activity identifiers, in presentation order.

	Returns:
		tuple: (training features, training ``isNovelty`` targets - all False,
		evaluation features, evaluation ``isNovelty`` targets).

	NOTE(review): the "activity" label column is still part of both returned
	feature frames, so the model sees the class id as a feature. That looks
	like label leakage - confirm whether it should be dropped here.
	"""
	# .assign returns new frames, so nothing is written onto a slice of
	# X_train / X_test (the former `.loc[:, ...] =` on derived frames).
	training = (  # picks the first n samples of each class
		X_train[X_train["activity"].isin(activities[:i])]
		.groupby("activity")
		.head(MIN_SAMPLES)
		.assign(isNovelty=False)
	)
	testing = (
		X_test[X_test["activity"] == activities[i]]
		.head(MIN_SAMPLES)
		.assign(isNovelty=True)
	)
	# Novel rows first, followed by a reproducible sample of known rows.
	novelty = concat(
		[testing, training.sample(n=int(0.15 * len(training)), random_state=42)]
	)
	return (
		training.drop(columns=["isNovelty"]),
		training["isNovelty"],
		# current activity (as novelty) mixed with sampled known rows
		novelty.drop(columns=["isNovelty"]),
		novelty["isNovelty"],
	)


def train_search_method(
	training_data: DataFrame,
	train_targets: Series,
	search_type: Literal["Grid", "Random", "SimulatedAnnealing", "GeneticAlgorithm"],
	params: dict[str, list],
	scoring: Callable,
	n_iter: int | None = 100,
	cv: int = 4,
	verbose: int = 1,
	random_state: int = 42,
) -> (
	RandomizedSearchCV
	| GridSearchCV
	| SimulatedAnnealingSearch
	| GeneticAlgorithmSearch
):
	"""
	Build the requested search object, fit it on the training data and return it.

	Args:
		training_data (DataFrame): Features to fit on.
		train_targets (Series): Targets passed through to the search.
		search_type: Which strategy to run; anything other than "Random",
			"SimulatedAnnealing" or "GeneticAlgorithm" is treated as grid search.
		params (dict): Search space in the form the chosen strategy expects.
		scoring (Callable): Scorer for Grid/Random search. The two metaheuristic
			classes use their own hard-wired scorer and ignore this argument.
		n_iter (int | None): Evaluation budget. Falsy means 100 iterations for
			simulated annealing and 20 x 5 for the genetic algorithm; ignored
			by grid search.
		cv (int): Number of folds for Grid/Random search only.
		verbose (int): Verbosity for Grid/Random search only.
		random_state (int): Seed for every strategy except grid search.

	Returns:
		The fitted search object (exposes ``best_params_``, ``best_score_`` and
		``cv_results_``).
	"""
	if search_type == "SimulatedAnnealing":
		return SimulatedAnnealingSearch(
			param_space=params,
			n_iter=n_iter or 100,
			initial_temp=1.0,
			cooling_rate=0.95,
			random_state=random_state,
		).fit(training_data, train_targets)

	if search_type == "GeneticAlgorithm":
		# For GA, n_iter is the total budget ~ population_size * n_generations
		if n_iter:
			# max(1, ...) keeps a budget below 5 from producing an empty
			# population and a ZeroDivisionError on the next line.
			population_size = max(1, min(20, n_iter // 5))
			n_generations = max(1, n_iter // population_size)
		else:
			population_size, n_generations = 20, 5
		return GeneticAlgorithmSearch(
			param_space=params,
			population_size=population_size,
			n_generations=n_generations,
			mutation_rate=0.1,
			crossover_rate=0.8,
			random_state=random_state,
		).fit(training_data, train_targets)

	# Original implementation for Grid and Random search
	is_random = search_type == "Random"
	search_cls = RandomizedSearchCV if is_random else GridSearchCV
	search_kwargs = {
		# the two sklearn classes name their search-space argument differently
		"param_distributions" if is_random else "param_grid": params,
		"estimator": OneClassSVM(kernel="rbf"),
		"scoring": scoring,
		"cv": cv,
		"verbose": verbose,
		"error_score": "raise",
	}
	if is_random and n_iter:
		search_kwargs.update({"n_iter": n_iter, "random_state": random_state})
	return search_cls(**search_kwargs).fit(training_data, train_targets)

In [None]:
def compare_search_methods(
	grid_scores: list[float], random_scores: list[float]
) -> dict[str, float]:
	"""
	Statistical comparison of two search methods using the Wilcoxon signed-rank test.

	Despite the parameter names, any two paired score sequences can be compared;
	the "grid_*" / "random_*" result keys refer to the first and second argument.

	Args:
		grid_scores: Scores of the first method (list or array).
		random_scores: Scores of the second method, paired with the first.

	Returns:
		dict: ``wilcoxon_statistic`` and ``p_value`` of the two-sided test, the
		two means, and ``effect_size`` = difference of means divided by the
		standard deviation of all scores pooled together (0.0 when that
		deviation is zero). All values are plain Python floats.

	Raises:
		ValueError: if the two sequences differ in length.
	"""
	if len(grid_scores) != len(random_scores):
		raise ValueError("Score arrays must have equal length")

	first = np.asarray(grid_scores, dtype=float)
	second = np.asarray(random_scores, dtype=float)

	statistic, p_value = wilcoxon(first, second, alternative="two-sided")

	first_mean, second_mean = float(first.mean()), float(second.mean())
	# Pool explicitly: `a + b` concatenates lists but ADDS arrays element-wise.
	pooled_std = float(np.std(np.concatenate([first, second])))

	return {
		"wilcoxon_statistic": float(statistic),
		"p_value": float(p_value),
		"grid_mean": first_mean,
		"random_mean": second_mean,
		# zero spread means every score is identical, i.e. no effect
		"effect_size": (first_mean - second_mean) / pooled_std if pooled_std else 0.0,
	}


def update_params_grid(
	cv_results: dict[str, tuple], og_param_grid: ParamGrid
) -> ParamGrid:
	"""
	Derive the next grid (gamma, nu, tol) from the best rows of a finished search.

	Steps:
	1. Keep the ``NUM_TRIALS`` best-ranked parameter combinations.
	2. If exactly ``NUM_TRIALS`` rows were kept, swap one value per parameter
	   for a value of the original grid that does not appear in those rows.
	3. Take the unique values per parameter (first-appearance order) and drop
	   trailing values until the cartesian product is at most ``NUM_TRIALS``.

	Args:
		cv_results: Must provide "rank_test_score", "param_gamma", "param_nu"
			and "param_tol" sequences of equal length.
		og_param_grid: The original grid, used as the pool of replacement values.

	Returns:
		Mapping of "gamma", "nu" and "tol" to lists of candidate values.

	Raises:
		AssertionError: if the resulting cartesian product is not exactly
			``NUM_TRIALS``.
	"""
	params = ["gamma", "nu", "tol"]
	# Best-ranked combinations first, truncated to the NUM_TRIALS best.
	top_entries = (
		DataFrame(
			zip(
				cv_results["rank_test_score"],
				cv_results["param_gamma"],
				cv_results["param_nu"],
				cv_results["param_tol"],
			),
			columns=["rank_test_score", "gamma", "nu", "tol"],
		)
		.sort_values("rank_test_score")
		.head(NUM_TRIALS)
	)
	# Check if we're stuck with the same parameter space
	# NOTE(review): this is true whenever the search produced at least
	# NUM_TRIALS rows, so it does not by itself detect stagnation - confirm intent.
	if len(top_entries) == NUM_TRIALS:
		print("Detected potential parameter space stagnation, diversifying...")

		# Values currently present among the top rows, per parameter.
		current_params = {col: set(top_entries[col]) for col in params}
		# Original-grid values absent from the top rows, per parameter.
		# Built from a set difference, so the list order is not tied to the grid order.
		unused_params = {
			param: list(set(og_param_grid[param]) - current_params[param])
			for param in params
		}
		# Replace least effective values with unused ones
		for param in params:
			if unused_params[param]:  # If there are unused values available
				# NOTE(review): current_unique is built here but never read
				# afterwards; only the top_entries.loc assignment below has an effect.
				current_unique = list(dict.fromkeys(top_entries[param]))
				# Remove the least effective (last) value and add an unused one
				if len(current_unique) > 1 and unused_params[param]:
					current_unique = current_unique[:-1] + [unused_params[param][0]]
				elif unused_params[param]:
					# If only one value, replace it partially
					current_unique.append(unused_params[param][0])

				# Every row holding the last-appearing unique value of this
				# parameter gets the first unused value instead.
				top_entries.loc[  # Update the parameter list
					top_entries[param] == list(dict.fromkeys(top_entries[param]))[-1],
					param,
				] = unused_params[param][0]

	# Unique values per parameter, in order of first appearance (best rank first).
	exmp = {col: list(dict.fromkeys(top_entries[col])) for col in params}
	cartesian_size = len(exmp["gamma"]) * len(exmp["nu"]) * len(exmp["tol"])

	if cartesian_size > NUM_TRIALS:
		while cartesian_size > NUM_TRIALS:
			# Find which parameter to reduce (try removing last value from each)
			best_reduction = None
			best_param = None

			for param in params:
				if len(exmp[param]) > 1:  # Only reduce if more than 1 value remains
					# Calculate new cartesian size if we remove last value from this param
					temp_sizes = [
						len(exmp[p]) if p != param else len(exmp[p]) - 1 for p in params
					]
					new_size = temp_sizes[0] * temp_sizes[1] * temp_sizes[2]
					# Check if this gets us closer to NUM_TRIALS without going under
					if new_size >= NUM_TRIALS and (
						best_reduction is None or new_size < best_reduction
					):
						best_reduction = new_size
						best_param = param

			# If no good reduction found, just remove from the param with most values
			if best_param is None:
				param_lengths = [(param, len(exmp[param])) for param in params]
				param_lengths.sort(key=lambda x: x[1], reverse=True)
				best_param = param_lengths[0][0]

			# Remove the last (least effective) value from the chosen parameter
			if len(exmp[best_param]) > 1:
				exmp[best_param] = exmp[best_param][:-1]

			cartesian_size = len(exmp["gamma"]) * len(exmp["nu"]) * len(exmp["tol"])
			# Safety break to avoid infinite loop
			if all(len(exmp[param]) == 1 for param in params):
				break

	# NOTE(review): the loop above only guarantees cartesian_size <= NUM_TRIALS.
	# A 3 x 3 x 3 grid, for example, is reduced to 18 and trips this assert, as
	# does any grid that starts below NUM_TRIALS. The check also disappears
	# under `python -O`. Confirm whether an exact match is really required.
	assert cartesian_size == NUM_TRIALS, "reducing the params space failed"

	print(f"dict of len {cartesian_size} :", exmp)
	return exmp


def eval_search_method(
	activities: ndarray,
	search_name: Literal["Grid", "Random", "SimulatedAnnealing", "GeneticAlgorithm"],
	use_log_dist: bool = False,
) -> SearchResult:
	"""
	Run one search strategy incrementally over the activities and report its best run.

	For every ``i`` from 1 to ``len(activities) - 1`` the model is trained on the
	first ``i`` activities and scored with activity ``i`` as the novelty. The
	first round runs the search with its default budget; later rounds are
	limited to ``NUM_TRIALS`` evaluations (for "Grid", via a grid derived from
	the previous round's results). The round with the highest ``best_score_``
	is the one summarised in the result.

	Side effect: rebinds the module globals ``testing_data`` and
	``test_targets`` each round, because ``score_function`` reads them.

	Args:
		activities (ndarray): Activity identifiers, in presentation order.
		search_name: Strategy to evaluate.
		use_log_dist (bool): Forwarded to ``get_param_grid``.

	Returns:
		SearchResult: Best parameters and score, the per-candidate scores of the
		best round, and the wall-clock time of the whole loop.

	Raises:
		ValueError: if fewer than two activities are given (no round can run).
	"""
	global testing_data, test_targets

	if len(activities) < 2:
		# Without this the loop never runs and the summary below would fail
		# with an AttributeError on None.
		raise ValueError("eval_search_method needs at least two activities")

	dist = get_param_grid(search_name, use_log_dist)
	best_search = None  # search object with the highest best_score_ so far
	search_method = None  # search object of the most recent round
	start_time = time()

	for i in range(1, len(activities)):
		training_data, train_targets, testing_data, test_targets = update_train_vars(
			i, activities
		)
		print(f"Training for activities {activities[:i]}")

		# First round: original space and the default budget of train_search_method.
		round_kwargs = {"params": dist}
		if best_search is not None:
			print(f"Already maximized, suggesting new {NUM_TRIALS} points")
			if search_name == "Grid":
				# Grid search gets a reduced grid built from the previous round.
				round_kwargs = {
					"params": update_params_grid(search_method.cv_results_, dist),
					"n_iter": None,
				}
			else:
				# Random search and the metaheuristics keep the space, smaller budget.
				round_kwargs["n_iter"] = NUM_TRIALS

		search_method = train_search_method(
			training_data=training_data,
			train_targets=train_targets,
			search_type=search_name,
			scoring=score_function,
			**round_kwargs,
		)
		if best_search is None or search_method.best_score_ > best_search.best_score_:
			best_search = search_method

		print(f"{search_name} Search Best Params:", search_method.best_params_)

	mean_scores = best_search.cv_results_["mean_test_score"]
	return SearchResult(
		method=search_name + "_search",
		best_params=best_search.best_params_,
		best_score=best_search.best_score_,
		# sklearn stores an ndarray, the custom searches a plain list
		cv_scores=mean_scores.tolist() if hasattr(mean_scores, "tolist") else mean_scores,
		fit_time=time() - start_time,
		n_evaluations=len(mean_scores),
	)

In [None]:
# Activity identifiers in order of first appearance in the training data.
activities = X_train["activity"].unique()

# Each run first rebinds the global `logger` (read by score_function) so that
# its per-evaluation records go to a dedicated log file.
logger = configure_file_logger("../reports/logs_grid.log")
grid_result = eval_search_method(activities, "Grid")

logger = configure_file_logger("../reports/logs_rand.log")
rand_result = eval_search_method(activities, "Random")

# Third argument True = use_log_dist (log-uniform distributions).
logger = configure_file_logger("../reports/logs_rand_log.log")
rand_log_result = eval_search_method(activities, "Random", True)

# New metaheuristic searches
logger = configure_file_logger("../reports/logs_sian.log")
sa_result = eval_search_method(activities, "SimulatedAnnealing", True)

logger = configure_file_logger("../reports/logs_geal.log")
ga_result = eval_search_method(activities, "GeneticAlgorithm", True)

In [None]:
# Pairwise comparison of the search strategies, written as one JSON list with
# an entry per pair: the pair label plus the fields of compare_search_methods.
with open("../conf/test_results.json", "w") as file:
	file.write(
		dumps(
			[
				{
					"test": label,
					**compare_search_methods(first.cv_scores, second.cv_scores),
				}
				for label, first, second in (
					("Grid x Rand", grid_result, rand_result),
					("Grid x SA", grid_result, sa_result),
					("Grid x GA", grid_result, ga_result),
					("Rand x SA", rand_result, sa_result),
					("Rand x GA", rand_result, ga_result),
					("SA x GA", sa_result, ga_result),
				)
			]
		)
	)