In [51]:
from math import cos, pi, sin, sqrt
from pathlib import Path
from typing import Tuple, Type
import os
import time

import numpy as np
from numba import jit, njit
import numba as nb
from numba.typed import Dict, List
from sklearn.cluster import KMeans

from pyMSOO.utils.EA import AbstractTask, Individual
from pyMSOO.MFEA.benchmark.continous.funcs import AbstractFunc
from pyMSOO.MFEA.benchmark.continous.utils import Individual_func


class MultiArmBenchmark():
    def __init__(self, 
                 num_task = 10,
                 dim_map = 2, 
                 dim = 10,
                 samples = 30000, 
                 cvt_use_cache=True,
                 save_dir = './Data') -> None:
        """This class create a new set of tasks for the robot arm problem

        Args:
            num_task (int, optional): Number of task. Defaults to 10.
            dim_map (int, optional): The dimension of the problem. Defaults to 2.
            dim (int, optional): The dimension of each individual. Defaults to 10.
            samples (int, optional): Number of random samples. Defaults to 30000.
            cvt_use_cache (bool, optional): Generate new datasets or not. Defaults to True.
            save_dir (str, optional): Save directory. Defaults to './Data'.
        """

        self.num_task = num_task
        self.dim_map = dim_map
        self.dim = dim
        self.samples = samples
        self.save_dir = save_dir
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
            print("Directory created:", save_dir)
        else:
            print("Directory already exists:", save_dir)
            
        self.tasks = self.cvt()
        self.cvt_use_cache = cvt_use_cache

    def __centroids_filename(self, k, dim):
        return 'centroids_' + str(k) + '_' + str(dim) + '.dat'

    def __write_centroids(self, centroids):
        k = centroids.shape[0]
        dim = centroids.shape[1]
        filename = os.path.join(self.save_dir, self.__centroids_filename(k, dim))
        with open(filename, 'w') as f:
            for p in centroids:
                for item in p:
                    f.write(str(item) + ' ')
                f.write('\n')

    def cvt(self):
        # check if we have cached values
        cvt_use_cache = self.cvt_use_cache
        fname = os.path.join(self.save_dir, self.__centroids_filename(self.num_task, self.dim_map))
        
        if cvt_use_cache:
            if Path(fname).is_file():
                print("WARNING: using cached CVT:", fname)
                return np.loadtxt(fname)
        # otherwise, compute cvt
        print("Computing CVT (this can take a while...):", fname)

        x = np.random.rand(self.samples, self.dim_map)
        k_means = KMeans(init='k-means++', n_clusters=self.num_task,
                        n_init=1, verbose=1)#,algorithm="full")
        k_means.fit(x)
        self.__write_centroids(k_means.cluster_centers_)

        return k_means.cluster_centers_

    def getMultiTask(self)-> Tuple[List[AbstractFunc], Type[Individual_func]]:
        tasks = [
            Arm(self.dim, task[0], task[1], (0, 1)) for task in self.tasks
        ]
        return tasks, Individual_func


class Arm(AbstractTask):
    def __init__(self, dim: int, alpha_max: float, d: int, bound: tuple = None):
        """Robot arm problem

        Args:
            dim (int): The dimension of individual
            alpha_max (float): The sum of maximum angle
            d (int): The sum of arm length
            bound (tuple, optional): Bound of gene. Defaults to None.
        """

        self.dim = dim
        self.bound = bound
        self.angular_range = alpha_max/dim
        self.d = d
        self.lengths = np.ones(dim) * d / dim
        self.lengths = np.concatenate(([0], self.lengths))

    def fw_kinematics(self, p):
        # ONLY USE THIS FUNCTION FOR TESTING PURPOSE
        assert(len(p) == self.dim)
        p = np.append(p, 0)
        return self.__class__._func(self.dim, self.lengths, p)

    # @staticmethod
    @njit
    def decode(x, angular_range):
        x_decode = (x - 0.5) * angular_range * pi * 2
        return x_decode
    
    @staticmethod
    def _convert(x):
        if isinstance(x, np.ndarray):
            return x

        if isinstance(x, Individual):
            return x.genes

        raise ValueError(
            "Wrong value type for input argument, expected 'List', 'np.ndarray' or 'Individual' but got {}".format(type(x)))

    def __call__(self, x):
        x = self.__class__._convert(x)
        x = self.__class__.decode(x, self.angular_range)
        return self.func(x)
    
    def func(self, p: List):
        assert(len(p) == self.dim)
        p = np.append(p, 0)
        y = self.__class__._func(self.dim, self.lengths, p)
        target = 0.5 * np.ones(2)
        f = np.linalg.norm(y - target)
        return f
    
    # @staticmethod
    @njit
    def _func(dim, lengths, p):
        def dot_py(A,B):
            m, n = A.shape
            p = B.shape[1]

            C = np.zeros((m,p))

            for i in range(0,m):
                for j in range(0,p):
                    for k in range(0,n):
                        C[i,j] += A[i,k]*B[k,j] 
            return C
        
        mat = np.identity(4)
        for i in range(0, dim + 1):   
            m = np.array([cos(p[i]), -sin(p[i]), 0, lengths[i], \
                         sin(p[i]),  cos(p[i]), 0, 0, \
                         0, 0, 1, 0, 0, 0, 0, 1])
            m = m.reshape((4, 4))
            mat = dot_py(mat, m)
            v = dot_py(mat, np.array([0, 0, 0, 1]).reshape((4, 1)))
            # mat = np.dot(mat, m)
            # v = np.dot(mat, np.array([0.0, 0.0, 0.0, 1.0]).reshape((4, 1)))
        return [v[0][0], v[0][1]]

In [88]:
tasks, IndClass = MultiArmBenchmark(num_task=10, save_dir="Data", cvt_use_cache=False).getMultiTask()

Directory already exists: Data
Computing CVT (this can take a while...): Data\centroids_10_2.dat
Initialization complete
Iteration 0, inertia 629.2010650444147.
Iteration 1, inertia 548.7360350372272.
Iteration 2, inertia 535.5912964289892.
Iteration 3, inertia 529.6463825700004.
Iteration 4, inertia 525.6610337758802.
Iteration 5, inertia 522.7768368562486.
Iteration 6, inertia 520.4672127225815.
Iteration 7, inertia 518.6352043445901.
Iteration 8, inertia 517.1205801011884.
Iteration 9, inertia 515.8202525469528.
Iteration 10, inertia 514.8411256441241.
Iteration 11, inertia 514.0823479491811.
Iteration 12, inertia 513.5190761220612.
Iteration 13, inertia 513.110949993557.
Iteration 14, inertia 512.7600044161222.
Iteration 15, inertia 512.4912541282258.
Iteration 16, inertia 512.2489145799607.
Iteration 17, inertia 512.0342048690444.
Iteration 18, inertia 511.8585831820556.
Iteration 19, inertia 511.7030033943493.
Iteration 20, inertia 511.5302364140226.
Iteration 21, inertia 511.389

In [89]:
a = [0.4] * 10

In [90]:
tasks[0](np.array(a))

0.746217839345604

In [91]:
# 1-DOFs
a = Arm(dim = 1, alpha_max=1, d=1, bound=(0, 1))
v = a.fw_kinematics(np.array([0]))
np.testing.assert_almost_equal(v, [1, 0])
v = a.fw_kinematics(np.array([pi/2]))
np.testing.assert_almost_equal(v, [0, 1])

# 2-DOFs
a = Arm(dim = 2, alpha_max=1, d=2, bound=(0, 1))
v = a.fw_kinematics([0, 0])
np.testing.assert_almost_equal(v, [2, 0])
v = a.fw_kinematics([pi/2, 0])
np.testing.assert_almost_equal(v, [0, 2])
v = a.fw_kinematics([pi/2, pi/2])
np.testing.assert_almost_equal(v, [-1, 1])
v = a.fw_kinematics([pi/4, -pi/2])
np.testing.assert_almost_equal(v, [sqrt(2), 0])

# a 4-DOF square
a = Arm(dim = 4, alpha_max=1, d=4, bound=(0, 1))
v = a.fw_kinematics([3*pi/4, pi/2, pi/2, pi/2])
np.testing.assert_almost_equal(v, [0, 0])

In [1]:
from pyMSOO.MFEA.model import MFEA_base
from pyMSOO.utils.Crossover import *
from pyMSOO.utils.Mutation import *
from pyMSOO.utils.Selection import *
from pyMSOO.MFEA.benchmark.continous import *
from pyMSOO.utils.MultiRun.RunMultiTime import * 

In [4]:
from pyMSOO.MFEA.benchmark.continous import MultiArmBenchmark

tasks, IndClass = MultiArmBenchmark(num_task=10, save_dir="Data", cvt_use_cache=True).getMultiTask()

Directory already exists: Data


In [5]:
baseModel = MFEA_base.model()
baseModel.compile(
    IndClass= IndClass,
    tasks= tasks,
    # crossover = KL_SBXCrossover(nc= 2, k= 100, conf_thres= 1),
    crossover= SBX_Crossover(nc = 2),
    mutation= PolynomialMutation(nm = 5),
    selection= ElitismSelection()
)
solve = baseModel.fit(
    nb_generations = 1000, rmp = 0.3, nb_inds_each_task= 100, 
    bound_pop= [0, 1], evaluate_initial_skillFactor= True
)

END!


In [6]:
x = baseModel.history_cost[-1]

In [7]:
from pyMSOO.MFEA.model import SM_MFEA
from pyMSOO.utils.Crossover import *
from pyMSOO.utils.Mutation import *
from pyMSOO.utils.Selection import *
from pyMSOO.utils.Search.DifferentialEvolution import *
from pyMSOO.MFEA.benchmark.continous import *
from pyMSOO.utils.MultiRun.RunMultiTime import * 

baseModel = SM_MFEA.model()
baseModel.compile(
    IndClass= IndClass,
    tasks= tasks,
    crossover = DaS_SBX_Crossover(nc= 2, eta= 0.001, conf_thres= 1),
    # crossover = SBX_Crossover(nc = 2),
    mutation = PolynomialMutation(nm = 5, pm= 1),
    selection= ElitismSelection(random_percent= 0.1),
    search= L_SHADE(len_mem= 15),
    attr_tasks = ['crossover', 'mutation', 'search'],
)
solve = baseModel.fit(
    nb_generations= 1000, nb_inds_each_task= 100, nb_inds_min= 20,
    lr = 0.1, p_const_intra= 0., prob_search = 0., lc_nums = 200,
    nb_epochs_stop= 1000, swap_po= False,
    evaluate_initial_skillFactor= True
)

[0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]
[99996, 100088, 100334, 100142, 100138, 99294, 99740, 99876, 100594, 99828]
END!


In [8]:
x

[0.369271725246802,
 0.5715278997229594,
 0.12045006780925734,
 0.394175159150768,
 9.134755659935667e-08,
 0.3568361548740225,
 0.2358874699234273,
 0.07930842369600759,
 0.5714774062951145,
 0.5482566704999147]

In [9]:
baseModel.history_cost[-1]

[0.369271725246802,
 0.571527897017238,
 0.1204500299559136,
 0.394175159150768,
 0.0,
 0.3568361547139353,
 0.2358874374338936,
 0.07930842369600731,
 0.5714774062951145,
 0.548256647258801]

In [10]:
print(sum(np.array(baseModel.history_cost[-1]) < np.array(x)))
print(sum(np.array(baseModel.history_cost[-1]) == np.array(x)))

7
3


In [11]:
baseModel = MultiTimeModel(model= MFEA_base)
baseModel.compile(
    IndClass= IndClass,
    tasks= tasks,
    crossover= SBX_Crossover(nc = 2),
    mutation= PolynomialMutation(nm = 5),
    selection= ElitismSelection()
)
baseModel.fit(
    nb_generations = 1000, rmp = 0.3, nb_inds_each_task= 100, 
    bound_pop= [0, 1], evaluate_initial_skillFactor= True
)

In [12]:
baseModel.run(
    nb_run= 1,
    save_path= './RESULTS/MFEA_robot.mso'
)

END!
DONE!
Saved


In [13]:
from pyMSOO.utils.LoadSaveModel.load_utils import loadModel

baseModel = loadModel('./RESULTS/MFEA_robot.mso', ls_tasks= tasks, set_attribute= True)