In [None]:
import ctypes
import sys
import time
from multiprocessing import Process, Queue

from IPython.display import display

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Test the type of distribution of data for any distribution
from scipy.stats import kstest
# Test for the Heteroscedasticity
from scipy.stats import levene
# Test with nonparametric test for comparing of two
from scipy.stats import wilcoxon
# Test with nonparametric test for comparing of three or more
from scipy.stats import friedmanchisquare
# probability plot
from scipy.stats import probplot

np.set_printoptions(linewidth=100000000, formatter={'all': lambda x: str(x)})
%matplotlib notebook

In [None]:
from run_cec import MinMB, fillprototype

CECv = 2017
cec_dll = ctypes.cdll.LoadLibrary('cec%d/cec%d.so' % (CECv, CECv))
prototype = ctypes.CFUNCTYPE(    
    ctypes.c_double,                
    ctypes.POINTER(ctypes.c_double),                
    ctypes.c_int,
    ctypes.c_int
)
run_fun = prototype(('runtest', cec_dll))

from NiaPy.algorithms import AlgorithmUtility
from NiaPy.task import StoppingTask, Utility
from NiaPy.util import reflectRepair

def testRunTest(a, d, bench, **args):
   task = StoppingTask(D=d, nFES=d * 1e4, benchmark=bench, **args)
   start_time = time.time()
   best = a(task)
   return time.time() - start_time, best

def testOne(x=0.55):
   for i in range(10 ** 6): 
      x = x + x
      x = x / 2
      x = x * x
      x = np.sqrt(x)
      x = np.log(x)
      x = np.exp(x)
      x = x / (x + 2)

def testTwo(d):
   for i in range(2 * 10 ** 5):
      x = np.random.uniform(-100, 100, d)
      run_fun(x.ctypes.data_as(ctypes.POINTER(ctypes.c_double)), ctypes.c_int(d), ctypes.c_int(18))

def testThreeCec(a, d, fnum=1): return testRunTest(a, d, MinMB(cec_dll.runtest, fnum=fnum))

def testThreeBasic(a, d, fnum=1):
   mapper = {
      1:'bentcigar',
      2:'',
      3:'zakharov',
      4:'rosenbrock',
      5:'rastrigin',
      6:'',
      10:'swefel'
   }
   return testRunTest(a, d, mapper[fnum])

def t_fun_cec(a, d, fnum, q, runs_no):
   for _ in range(runs_no): q.put(testThreeCec(a, d, fnum)[1][1])
	  
def t_fun_basic(a, d, fnum, q, runs_no):
   for _ in range(runs_no): q.put(testThreeBasic(a, d, fnum)[1][1])
   
def runThread(a, d, fnum=1, thread_fun=t_fun_cec, thread_no=24, runs_no=35, seed=1, **a_args):
   autil = AlgorithmUtility()
   ts, qs, r, runs, runs_sum = [], [], [], [0] * thread_no, 0
   i = 0
   while runs_sum < runs_no:
      runs[i] += 1
      runs_sum += 1
      i = i + 1 if i + 1 < len(runs) else 0
   for i in range(thread_no):
      if runs[i] == 0: continue
      q = Queue(runs[i])
      t = Process(target=thread_fun, args=(autil.get_algorithm(a)(seed=seed + i, **a_args), d, fnum, q, runs[i]))
      t.start()
      ts.append(t), qs.append(q)
   for t in ts: t.join()
   for q in qs: 
      while not q.empty(): r.append(q.get())
   return r

In [None]:
dims, algosNames, alpha = [10, 30, 50], ['BA', 'ABA', 'SABA', 'HBA', 'HSABA'], 0.05
seed = 1
algs = {
   'BA': {},
   'HBA': {},
   'SABA': {},
   'HSABA': {
      'NP': 100,
      'A': 0.5,
      'F': 0.5,
      'CR': 0.9,
	  'r': 0.01,
      'Qmax': 1,
      'Qmin': -1.2
   }
}

# CEC 2017 algorithm speed test
Example of time execution speed of algorithm

## Test one

In [None]:
start_time = time.time()
testOne()
t0 = (time.time() - start_time)
print(t0)

## Test two

In [None]:
t1 = []
for d in dims:
   start_time = time.time()
   testTwo(d)
   t1.append(time.time() - start_time)
print(t1)

## Test three

In [None]:
d = 10

### Run on basic function

In [None]:
autil = AlgorithmUtility()
res, f = {}, 1
for k, v in algs.items():
   a = autil.get_algorithm(k)(seed=seed + 1, **v)
   r = testThreeBasic(a, d, f)
   res[k] = [r[0], r[1][1]]
display('Func %d' % f)
display(pd.DataFrame.from_dict(res))

### Run on CEC function

In [None]:
autil = AlgorithmUtility()
for f in range(1):
   res = {}
   for k, v in algs.items():
      a = autil.get_algorithm(k)(seed=seed + 1, **v)
      r = testThreeCec(a, d, f + 1)
      res[k] = [r[0], r[1][1]]
   display('Func %d' % (f + 1))
   display(pd.DataFrame.from_dict(res))

## Running on multiple threads

### Run on basic function

In [None]:
res, d, f = {}, 10, 1
for k, v in algs.items(): res[k] = runThread(k, d, f, thread_fun=t_fun_basic, **v)
display('Func %d' % f)
display(pd.DataFrame.from_dict(res))

### Run on CEC function

In [None]:
res, d, f = {}, 10, 1
for k, v in algs.items(): res[k] = runThread(k, d, f, thread_fun=t_fun_cec, **v)
display('Func %d' % f)
display(pd.DataFrame.from_dict(res))

# CEC 2017 statistic
For getting the data need to perform statistic test we used: `for i in {1..30}; do python run_cec.py -c 17 -o T -rn 50 -a DE -D 10 -f $i -seed {1000..1050}; done`.
The example shows how to run DE algorithm on all benchmark functions that have problem dimensionality set to 10.
DE algorithm runs 50 times on each benchmark functions with seed in range from 1000 to 1050.
Every algorithm run has it's own seed.
We used `-o T` for generating the output.

## Example of multiple runs on one problem
* dim $\in \{10, 30, 50\}$
* fnum $\in \{1, \cdots , 30\}$
* algos $\in$ `{dynNpMsjDE, BBFWA, DE, jDE, SCA, ES(1+1), ES(m+1), ASO, BA, dynFWAG}`

## Load data
We will create a new variable `data`, that holds the optization results for 50 runs, 30 functions and specified algorithms.
Now first index presents algorithm, second index presents function and third index presents the value of optimization.

In [None]:
res, d = {}, 10
for k, v in algs.items():
   print('START algor run for %s' % k)
   for f in range(1, 31):
      print('START function %d run' % f)
      tmp = runThread(k, d, f, thread_fun=t_fun_cec, **v)
      if res.get(k, None) is None: res[k] = [tmp]
      else: res[k].append(tmp)
      print('END function run')
   print('END algor run')

## Normalize the data
Now we transform the presentation of data in our new array called `ndata` that has normalized data.
Now first index presents function, second index presents algorithm and third index presents the value of optimization.

In [None]:
ndata = np.asanyarray([[(data[j, i] - np.min(data[j, i])) / (np.max(data[j, i]) - np.min(data[j, i])) for j in range(len(algos))] for i in range(30)])
ndata[np.isnan(ndata)] = 0

## Test for normal distribution
[Kolmogorov–Smirnov](https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test) test: Tests whether a sample is drawn from a given distribution, or whether two samples are drawn from the same distribution.

If $\text{p-value} > \alpha$ then values belong to selected distribution.

In [None]:
for i, a in enumerate(algos):
   tmp = ''
   for j in range(30):
      s = kstest(ndata[j, i], 'norm')
      print('%10s on %2d.: %.3E %.3E' % (a, j + 1, s[0], s[1]))
      tmp += '%.3E & ' % s[1]
   print(tmp, '\n')

### Probability Plot for one algorithm

In [None]:
for i in range(30):
   fig = plt.figure()
   ax = fig.add_subplot(111)
   probplot(ndata[i, 0], plot=ax)
plt.show()

## Test for Heteroscedasticity
The Levene test tests the null hypothesis that all input samples are from populations with equal variances. Levene’s test is an alternative to Bartlett’s test bartlett in the case where there are significant deviations from normality.

In [None]:
tmp = ''
for j in range(30):
   s = levene(*ndata[j, :], center='mean')
   print('%2d.: %.3E %.3E' % (j + 1, s[0], s[1]))
   tmp += '%.3E & ' % s[1]
print(tmp)

## Run Wilcoxon test
[Wilcoxon signed-rank](https://en.wikipedia.org/wiki/Wilcoxon_signed-rank_test) test: tests whether matched pair samples are drawn from populations with different mean ranks

In [None]:
for j in range(30):
   tmp = ''
   print ('f%2d' % (j + 1))
   for i, a in enumerate(algos):
      s = wilcoxon(ndata[j, 0], ndata[j, i])
      print ('%10s vs. %10s on %2d. fun: %.3E %.3E' % (algos[0], a, j, s[0], s[1]))
      tmp += '%.3E & ' % s[1]
   print (tmp, '\n')

## Run Friedman test
[Friedman two-way analysis of variance by ranks](https://en.wikipedia.org/wiki/Friedman_test): tests whether k treatments in randomized block designs have identical effects

In [None]:
for j in range(30):
   print (algos, ' on ', j + 1, ' fun: ', friedmanchisquare(*ndata[j, :]))