<a href="https://colab.research.google.com/github/Witcape/PSO/blob/main/QPSO_LLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import random
import re
import matplotlib.pyplot as plt
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from typing import List, Tuple

# Define the Sphere Function
def sphere_function(x: np.ndarray) -> float:
    """Sphere function: f(x) = sum(x_i^2)"""
    return np.sum(x ** 2)

# Initialize the LLM
def initialize_llm(model_name_or_path: str, model_basename: str, n_threads: int = 2,
                  n_batch: int = 512, n_gpu_layers: int = 32) -> Llama:
    """
    Initialize the LLM using llama_cpp.

    :param model_name_or_path: Repository ID on Hugging Face Hub.
    :param model_basename: Model filename.
    :param n_threads: Number of CPU threads.
    :param n_batch: Batch size.
    :param n_gpu_layers: Number of GPU layers to offload.
    :return: Initialized Llama model.
    """
    model_path = hf_hub_download(repo_id=model_name_or_path, filename=model_basename)
    lcpp_llm = Llama(
        model_path=model_path,
        n_threads=n_threads,
        n_batch=n_batch,
        n_gpu_layers=n_gpu_layers
    )
    return lcpp_llm

# Function to Find New Weight Using LLM
def find_weight(llm: Llama, w_values: List[float], g_values: List[float]) -> float:
    """
    Use the LLM to suggest a new weight based on recent weights and cost values.

    :param llm: Initialized Llama model.
    :param w_values: List of recent weight values.
    :param g_values: List of recent cost values.
    :return: Suggested new weight.
    """
    prompt = (f"Update the weight to minimize the cost function. Consider the last five weights: {w_values[-5:]} "
              f"and the last five cost values: {g_values[-5:]}. Compute the next weight aiming to minimize "
              f"the cost function. Respond only with the updated weight in decimal form—no additional text or explanations.")

    w_finder_template = f'''SYSTEM: You are a helpful, respectful, and honest assistant. Always answer as helpfully.

USER: {prompt}

ASSISTANT:
    '''
    response = llm(prompt=w_finder_template, max_tokens=256, temperature=0.5, top_p=0.95,
                  repeat_penalty=1.2, top_k=150, echo=True)

    # Extract decimal weight from response text
    text_response = response['choices'][0]['text']
    value = re.findall(r'-?\d+\.\d+', text_response)

    # Fallback in case LLM does not return a number
    new_weight = float(value[-1]) if value else w_values[-1]
    print(f"[DEBUG] New weight from LLM: {new_weight}")
    return new_weight

# Define Particle Class
class Particle:
    def __init__(self, bounds: List[Tuple[float, float]]):
        self._x = np.zeros(len(bounds))
        for idx, (lo, hi) in enumerate(bounds):
            self._x[idx] = random.uniform(lo, hi)
        self._best = self._x.copy()
        self._best_value = np.NaN

    @property
    def best(self) -> np.ndarray:
        return self._best

    def set_best(self, x: np.ndarray):
        self._best[:] = x

    @property
    def best_value(self) -> float:
        return self._best_value

    def set_best_value(self, v: float):
        self._best_value = v

    def __getitem__(self, key):
        return self._x[key]

    def __setitem__(self, key, val):
        self._x[key] = val

# Define Swarm Class
class Swarm:
    def __init__(self, size: int, dim: int, bounds: List[Tuple[float, float]]):
        self._particles = [Particle(bounds) for _ in range(size)]
        self._dim = dim
        self._gbest_value = None
        self._gbest = None

    def size(self) -> int:
        return len(self._particles)

    def particles(self) -> List[Particle]:
        return self._particles

    def mean_best(self) -> np.ndarray:
        x = np.zeros(self._dim)
        for p in self._particles:
            x += p.best
        return x / self.size()

    @property
    def gbest(self) -> np.ndarray:
        return self._gbest

    @property
    def gbest_value(self) -> float:
        return self._gbest_value

    def update_gbest(self):
        pg = min(self._particles, key=lambda p: p.best_value)
        if self._gbest_value is None or pg.best_value < self._gbest_value:
            self._gbest = pg.best.copy()
            self._gbest_value = pg.best_value

# Define QPSO Class
class QPSO(Swarm):
    def __init__(self, cf, size: int, dim: int, bounds: List[Tuple[float, float]],
                 maxIters: int, llm: Llama):
        super(QPSO, self).__init__(size, dim, bounds)
        self._cf = cf  # Cost function
        self._maxIters = maxIters
        self._iters = 0
        self.llm = llm  # LLM for cost evaluations
        self.init_eval()

    def llm_cost_function(self, particle_position: np.ndarray) -> float:
        input_text = f"Evaluate cost for position: {particle_position.tolist()}"
        print("[DEBUG] Input to LLM:", input_text)  # Debugging: print the input text

        response = self.llm(prompt=input_text, max_tokens=256, temperature=0.5, top_p=0.95,
                           repeat_penalty=1.2, top_k=150, echo=True)  # Call the LLM

        # Print the raw response for debugging
        print("[DEBUG] LLM Response:", response)  # Debugging: print the raw response

        # Assuming response is a dictionary with a 'choices' key
        if isinstance(response, dict) and 'choices' in response:
            text_response = response['choices'][0]['text']  # Extract the text response
            value = re.findall(r'-?\d+\.\d+', text_response)  # Use regex to find floating-point numbers

            if value:
                return float(value[-1])  # Return the last found float value
            else:
                print("[WARNING] No valid cost value found in the LLM response. Using default cost.")
                return self._cf(particle_position)  # Fallback to actual cost function
        else:
            print("[WARNING] LLM response is not in the expected format. Using default cost.")
            return self._cf(particle_position)  # Fallback to actual cost function

    def init_eval(self):
        for p in self._particles:
            cost_value = self.llm_cost_function(p[:])
            p.set_best_value(cost_value)
        self.update_gbest()

    def update_best(self):
        for p in self._particles:
            cost_value = self.llm_cost_function(p[:])
            if np.isnan(p.best_value) or cost_value < p.best_value:
                p.set_best(p[:])
                p.set_best_value(cost_value)
        self.update_gbest()

    def kernel_update(self, **kwargs):
        pass  # Placeholder for the QPSO-specific kernel update logic

    def update(self, callback=None, interval=None):
        while self._iters < self._maxIters:
            self.kernel_update()
            self.update_best()
            if callback and (self._iters % interval == 0):
                callback(self)
            self._iters += 1

    @property
    def iters(self) -> int:
        return self._iters

    @property
    def maxIters(self) -> int:
        return self._maxIters

# Define QDPSO Class
class QDPSO(QPSO):
    def __init__(self, cf, size: int, dim: int, bounds: List[Tuple[float, float]],
                 maxIters: int, g: float, llm: Llama):
        super(QDPSO, self).__init__(cf, size, dim, bounds, maxIters, llm)
        self._g = g  # Quantum parameter
        self._weights = [0.5]  # Starting weight for demonstration
        self._costs = []

    def kernel_update(self, **kwargs):
        # Adjust weight using LLM-guided find_weight function if enough cost history is available
        if len(self._costs) >= 5 and len(self._weights) >= 5:
            new_weight = find_weight(self.llm, self._weights, self._costs)
            self._weights.append(new_weight)
        elif len(self._weights) < 5:
            new_weight = self._weights[-1]
        else:
            new_weight = self._weights[-1]

        # Print the current weight for monitoring
        print(f"[DEBUG] Iteration {self._iters + 1}: Current weight = {new_weight:.4f}")

        # Particle update logic using the new weight
        for p in self._particles:
            for i in range(self._dim):
                u1 = random.uniform(0., 1.)
                u2 = random.uniform(0., 1.)
                u3 = random.uniform(0., 1.)
                rand_sign = 1 if random.random() > 0.5 else -1
                c = (u1 * p.best[i] + u2 * self.gbest[i]) / (u1 + u2)
                L = (1 / self._g) * abs(p[i] - c)
                p[i] = c + rand_sign * L * np.log(1. / u3)

        # Record the current best cost for reference in weight adjustment
        self._costs.append(self.gbest_value)
        print(f"[INFO] Best solution cost at Iteration {self._iters + 1}: {self.gbest_value:.4f}\n")

    def update_best(self):
        for p in self._particles:
            cost_value = self.llm_cost_function(p[:])
            if np.isnan(p.best_value) or cost_value < p.best_value:
                p.set_best(p[:])
                p.set_best_value(cost_value)
        self.update_gbest()

    def update(self, callback=None, interval=None):
        while self._iters < self._maxIters:
            print(f"\n[INFO] Starting Iteration {self._iters + 1}...")
            self.kernel_update()  # Update particles based on kernel logic
            self.update_best()  # Re-evaluate particle bests and global best
            if callback and (self._iters % interval == 0):
                callback(self)
            self._iters += 1

# Function to Run QDPSO
def run_qdpso(cf, size: int, dim: int, bounds: List[Tuple[float, float]],
             maxIters: int, g: float, llm: Llama,
             callback=None, interval: int = 10):
    """
    Run the QDPSO algorithm.

    :param cf: Cost function.
    :param size: Number of particles in the swarm.
    :param dim: Dimensionality of the problem space.
    :param bounds: Bounds for each dimension (list of (low, high) tuples).
    :param maxIters: Maximum number of iterations.
    :param g: Quantum parameter for QDPSO.
    :param llm: Initialized LLM for weight adjustments.
    :param callback: Optional callback function for custom monitoring.
    :param interval: Interval for callback invocation.
    :return: Best solution and its cost found by QDPSO, along with weight and cost histories.
    """
    qdpso = QDPSO(cf, size, dim, bounds, maxIters, g, llm)
    qdpso.update(callback=callback, interval=interval)
    return qdpso.gbest, qdpso.gbest_value, qdpso._weights, qdpso._costs

In [None]:
# import os
# import numpy as np
# from scipy.io import loadmat
# import matplotlib.pyplot as plt
# from huggingface_hub import hf_hub_download
# from llama_cpp import Llama
# import numpy as np
# import re
# import matplotlib.pyplot as plt
# from pypop7.optimizers.core.optimizer import Optimizer
# from pypop7.optimizers.pso.pso import PSO
# from scipy.optimize import differential_evolution

In [None]:
# !CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python==0.1.78 numpy==1.23.4 --force-reinstall --upgrade --no-cache-dir --verbose
# !pip install huggingface_hub
# !pip install llama-cpp-python==0.1.78
# !pip install numpy==1.23.4

# model_name_or_path = "TheBloke/Llama-2-13B-chat-GGML"
# model_basename = "llama-2-13b-chat.ggmlv3.q5_1.bin"

In [None]:
# model_name_or_path = "TheBloke/Llama-2-13B-chat-GGML"  # Replace with your model repo
# model_basename = "llama-2-13b-chat.ggmlv3.q5_1.bin"    # Replace with your model filename
# llm = initialize_llm(model_name_or_path, model_basename, n_threads=2, n_batch=512, n_gpu_layers=32)

In [None]:
# size = 20  # Number of particles
# dim = 2    # Dimensions of the problem
# bounds = [(-5.0, 5.0) for _ in range(dim)]  # Bounds for each dimension
# maxIters = 20  # Maximum number of iterations
# g = 1.0         # Quantum parameter

In [None]:
# best_solution, best_cost, weight_history, cost_history = run_qdpso(
#         cf=sphere_function,
#         size=size,
#         dim=dim,
#         bounds=bounds,
#         maxIters=maxIters,
#         g=g,
#         llm=llm,
#         callback=None,
#         interval=10
#     )

# print(f"Best Solution: {best_solution}, Best Cost: {best_cost}")

In [None]:
# plt.figure(figsize=(12, 5))

#     # Plotting Cost History
#     plt.subplot(1, 2, 1)
#     plt.plot(cost_history, label="Cost", color='blue')
#     plt.title("Cost History")
#     plt.xlabel("Iterations")
#     plt.ylabel("Cost")
#     plt.legend()

#     # Plotting Weight History
#     plt.subplot(1, 2, 2)
#     plt.plot(weight_history, label="Weight (Quantum Parameter)", color='orange')
#     plt.title("Weight History")
#     plt.xlabel("Iterations")
#     plt.ylabel("Weight")
#     plt.legend()

#     # Show the plots
#     plt.tight_layout()
#     plt.show()