## FireFly Algorithm for Hyper-Paramerter Tuning

In [53]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

import re
import matplotlib.pyplot as plt

import numpy as np
import os
import random

from dataclasses import dataclass
from typing import Union

In [130]:
@dataclass
class FireFlyConfig:

    pop_size: int
    alpha: float
    beta0: float
    gamma: float
    max_iters: int
    seed: int = None
    
    @staticmethod
    def get_defaults():
        return FireFlyConfig(pop_size=20, alpha=1.0, beta0=1.0, gamma=0.01, max_iters=100, seed=None)

    def to_dict(self,):
        return dict(pop_size=self.pop_size, alpha=self.alpha, beta0=self.beta0, gamma=self.gamma, max_iters=self.max_iters)

In [131]:
np.array(list(filter(lambda x : x%2, [1, 2, 3, 4, 5])))

array([1, 3, 5])

In [132]:
@dataclass
class FireFlyParameterBounder:
    bounds: list[tuple[Union[float, int]]]

    @staticmethod
    def get_defaults(dim=3):
        return FireFlyParameterBounder(bounds=[(-5, 5) for _ in range(dim)])

    def clip(self, value: Union[float, int] , lb, ub):
        if lb > value: return lb
        elif value > ub:
            return ub
        return value

    def apply(self, input: np.ndarray):
        lb = np.array(tuple(map(lambda item: item[0], self.bounds)))
        ub = np.array(tuple(map(lambda item: item[1], self.bounds)))

        return np.vectorize(self.clip)(input, lb, ub)

In [133]:
FireFlyParameterBounder.get_defaults()

FireFlyParameterBounder(bounds=[(-5, 5), (-5, 5), (-5, 5)])

In [134]:
FireFlyParameterBounder(bounds=[(10, 2), (3, 4), (5, 6)]).apply(np.array([-1, 2, 3]))

array([10,  3,  5])

In [135]:
class FireFlyBase:

    config: FireFlyConfig
    bounder: FireFlyParameterBounder
    
    def __init__(
        self,
        config: FireFlyConfig  = FireFlyConfig.get_defaults(),
        bounder: FireFlyParameterBounder = FireFlyParameterBounder.get_defaults()
    ) -> None:
        
        assert isinstance(config, FireFlyConfig), "the 'config' param must be an instance of 'FireFlyConfig'."
        assert isinstance(bounder, FireFlyParameterBounder), "the 'config' param must be an instance of 'FireFlyParameterBounder'."
        self.config = config
        self.bounder = bounder

    def gen_fireflies(self, dim: int = 3):
        return np.random.rand(self.config.pop_size, dim)

    def get_intensity(self, func, fireflies: np.ndarray):
        return np.apply_along_axis(func, 1, fireflies)

    def get_distance(self, fi, fj):
        return np.sum(np.square(fi - fj), axis=-1)

    def compute_beta(self, r: float):
        return self.config.beta0 * np.exp(-self.config.gamma * r)

    def update_ffi(self, fi, fj, beta, steps):
        return beta * (fj - fi) + steps

In [136]:
class FireFlyOptimizer(FireFlyBase):
    
    def __init__(
        self,
        config: FireFlyConfig  = FireFlyConfig.get_defaults(),
        bounder: FireFlyParameterBounder = FireFlyParameterBounder.get_defaults()
    ) -> None:
        super(FireFlyOptimizer, self).__init__(config=config, bounder=bounder)
    
    def run(self, func, dim):
        fireflies = self.gen_fireflies(dim=dim)
        intensity = self.get_intensity(func=func, fireflies=fireflies)
        
        self.best_intensity = np.min(intensity)
        self.best_pos = self.bounder.apply(fireflies[np.argmin(intensity)])
        
        iter = self.config.pop_size
        new_alpha = self.config.alpha

        diff = np.apply_along_axis(lambda item: item[1] - item[0],1, np.array([item for item in self.bounder.bounds]))
        
        for iter in range(self.config.max_iters):
            new_alpha *= 0.97
            
            for i in range(self.config.pop_size):
                
                for j in range(self.config.pop_size):
                
                    if intensity[i] > intensity[j] and not np.isnan(intensity[j]): 
                    
                        r = self.get_distance(fireflies[i], fireflies[j])
                        beta = self.compute_beta(r=r)
                        
                        steps = new_alpha * (np.random.rand(dim) - 0.5) * diff
                        
                        fireflies[i] += self.update_ffi(fireflies[j], fireflies[i], beta=beta, steps=steps)
                        fireflies[i] = self.bounder.apply(fireflies[i])
                        intensity[i] = func(fireflies[i])
                        
                        if not np.isnan(intensity[i]) and intensity[i] < self.best_intensity: 
                            self.best_pos = self.bounder.apply(fireflies[i].copy())
                            self.best_intensity = func(self.best_pos)

In [137]:
bounder = FireFlyParameterBounder(bounds=[(-0.2, 0.2), (-1., 2.)])
config = FireFlyConfig.get_defaults()
config.max_iters = 1000

In [138]:
FA = FireFlyOptimizer(config=config, bounder=bounder)

In [139]:
def f(x):
    return x[0]**2 + x[1]**2

In [140]:
FA.run(func=f, dim=2)

In [141]:
FA.best_pos

array([-0.05292036, -0.17681104])

In [142]:
FA.best_intensity

0.03406270865915567

In [143]:
sentences = [
    "The king is a man.",
    "The queen is a woman.",
    "The queen is the wife of the king.",
    "A man can be a king.",
    "A woman can be a queen.",
    "A king is not a queen, and a queen is not a king.",
    "The prince is the son of the king and queen.",
    "The princess is the daughter of the king and queen.",
    "A prince can become a king.",
    "A princess can become a queen.",
    "The castle is the home of the king and queen.",
    "The kingdom is ruled by the king.",
    "The throne is where the king sits.",
    "The crown is worn by the king and queen.",
    "The knight serves the king.",
    "The kingdom has many subjects.",
    "The king leads his army.",
    "The queen attends royal events.",
    "The prince trains to become a knight.",
    "The princess learns to rule the kingdom.",
    "The king and queen host a grand ball.",
    "The royal family lives in the palace.",
    "The subjects are loyal to the king and queen.",
    "The kingdom is prosperous under the king's rule.",
    "The king commands respect from everyone.",
    "The queen is known for her wisdom.",
    "The prince dreams of adventure.",
    "The princess is admired for her beauty.",
    "The king decrees a new law.",
    "The queen advises the king on important matters."
]

In [144]:
text = " ".join(sentences).lower()

In [145]:
vocab = {word: i for i, word in enumerate(set(text.split()))}
vocab_size = len(vocab)

In [146]:
vocab_size

83

In [147]:
def get_context_target(w: int):
    context = []
    target = []
    words = text.split()
    for i in range(1, len(words) - w):
        target.append(vocab[words[i]])
        context.append([vocab[words[i-w]], vocab[words[i+w]]])

    return context, target

In [148]:
context, target = get_context_target(5)

In [149]:
# class TextDataSet(Dataset):

#     def __init__(self, context, target):

#         self.X: torch.Tensor = torch.tensor(context)
#         self.y: torch.Tensor = torch.tensor(target)

#     def __len__(self):
#         return len(self.X)

#     def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
#         return self.X[index], self.y[index]

In [150]:
dataset = TextDataSet()

In [151]:
dataLoader = DataLoader(dataset, batch_size=1, shuffle=True)

In [152]:
class SumReshapeTransform(nn.Module):

    def __init__(self,) -> None:
        super(SumReshapeTransform, self).__init__()

    def forward(self, input: torch.Tensor):
        return input.sum(dim=1).reshape(1, -1)

In [183]:
def train(params):

    lr = params[0]
    
    emb_size = int(params[1])

    beta1 = params[2]
    
    beta2 = params[3]

    w = int(params[4])

    context, target = get_context_target(w)
    
    dataset = TensorDataset(torch.tensor(context), torch.tensor(target))

    data_loader = DataLoader(dataset, batch_size=1)

    model = nn.Sequential(
        nn.Embedding(vocab_size, emb_size),
        SumReshapeTransform(),
        nn.Linear(emb_size, vocab_size),
    )

    criterion = nn.CrossEntropyLoss()
    
    opt = optim.Adam(model.parameters(), lr=lr, betas=(beta1, beta2))

    total_loss = 0
    
    for context, target in data_loader:
    
        opt.zero_grad()
        
        pred = model(context)
    
        target_vec = torch.zeros(1, vocab_size)
    
        target_vec[0][int(target)] = 1
        
        loss = criterion(pred, target_vec)
    
        loss.backward()
            
        opt.step()
    
        total_loss += loss.item()
        
    return total_loss

In [206]:
bounder = FireFlyParameterBounder(bounds=[(0.0001, 0.1), (5, 20), (0.5, 0.999), (0.5, 0.999), (1, 5)])
config = FireFlyConfig.get_defaults()
config.max_iters = 5
config.pop_size = 10
config.gamma = 0.04

In [207]:
config

FireFlyConfig(pop_size=10, alpha=1.0, beta0=1.0, gamma=0.04, max_iters=5, seed=None)

In [208]:
FA = FireFlyOptimizer(config=config, bounder=bounder)

In [209]:
res = []

In [211]:
for _ in range(2):
    FA.run(train, dim=5)
    res.append(FA.best_pos)

In [212]:
FA.best_intensity

879.7484115064144

In [213]:
mean = np.array(res).mean(axis=0)

In [221]:
mean

array([ 0.02229864, 11.40763862,  0.54742701,  0.62933412,  1.73581711])

In [222]:
train(mean)

871.9510925412178

In [223]:
train([0.01, 5.55170906, 0.5, 0.99    , 10.13743302])

889.7145767211914

In [227]:
not np.nan == True

True