Implementação de uma solução não genérica, pois a quantidade de loops de pares (i,j) depende do tamanho do vetor B:

In [None]:
import numpy as np


def local_cost_func(a, b):
    return abs(a - b)

def global_cost_func(a, b, c):
    return a + b + c


A = np.array(range(5))
B = A[:3] + 0.2

print('A:', A)
print('B:', B)

# Generate all global combinations
combinations = []
costs = []
k = 0
for i1 in range(len(A)):
    for j1 in range(len(B)):

        for i2 in range(len(A)):
            for j2 in range(len(B)):

                # Restriction for unique association
                if (i1 != i2) and (j1 != j2):
                    for i3 in range(len(A)):
                        for j3 in range(len(B)):

                            # Restriction for unique association
                            if (i2 != i3) and (j2 != j3) and (i3 != i1) and (j3 != j1):

                                # Compute the local costs
                                c1 = local_cost_func(A[i1], B[j1])
                                c2 = local_cost_func(A[i2], B[j2])
                                c3 = local_cost_func(A[i3], B[j3])

                                # Compute the global cost
                                gc = global_cost_func(c1, c2, c3)
                                
                                # Save the results
                                costs += [[c1, c2, c3, gc]]
                                combinations += [[A[i1], B[j1], A[i2], B[j2], A[i3], B[j3]]]
                                k += 1

combinations = np.array(combinations)
costs = np.array(costs)
global_opt_index = costs[:,-1].argmin()
global_opt = costs[global_opt_index, -1]
global_solution = combinations[global_opt_index].reshape(3, 2)
print(f"Found the global optimal of {global_opt} after computing {k} combinations.")
for p,pair in enumerate(global_solution):
    print('\t-', pair[1], 'was mapped to', pair[0], 'cost:', costs[global_opt_index, p])

Implementação de uma maneira mais genérica, que independe do tamanho do vetor B:

In [None]:
import numpy as np


def local_cost_func(a):
    return abs(a[0] - a[1])

def global_cost_func(a):
    return sum(a)

def drop_non_unique_rows(table):
    def unique_in_row(row):
        return len(np.unique(row)) == len(row)
    unique_rows_mask = np.apply_along_axis(unique_in_row, 1, table)
    return table[unique_rows_mask]
    
def generate_combination_matrix(A, B):
    len_b = len(B)
    pre_table = [A, B] * len_b
    return np.array(np.meshgrid(*pre_table)).T.reshape(-1, 2 * len_b)

def compute_cost(C):
    local_costs = np.apply_along_axis(local_cost_func, 1, C.reshape(len(C)//2, 2))

    global_costs = global_cost_func(local_costs)

    costs = np.array([*local_costs, global_costs])
    return costs


A = np.array(range(5))
B = A[:4] + 0.2

print('A:', A)
print('B:', B)

combinations = generate_combination_matrix(A, B)
combinations = drop_non_unique_rows(combinations)

costs = np.apply_along_axis(compute_cost, 1, combinations)

global_opt_index = costs[:,-1].argmin()
global_opt = costs[global_opt_index, -1]
global_solution = combinations[global_opt_index].reshape(len(B), 2)
print(f"Found the global optimal of {global_opt} after computing {len(combinations)} combinations.")
for p,pair in enumerate(global_solution):
    print('\t-', pair[1], 'was mapped to', pair[0], 'cost:', costs[global_opt_index, p])

Usando itertools para gerar as combinações o processo melhora muito:

In [None]:
import numpy as np
import itertools


def local_cost_func(a):
    return abs(a[0] - a[1])

def global_cost_func(a):
    return a.sum()

def compute_cost(combinations):
    local_costs = np.apply_along_axis(local_cost_func, 2, combinations)
    
    global_costs = np.apply_along_axis(global_cost_func, 1, local_costs).reshape(len(local_costs),1)
    
    costs = np.hstack((local_costs, global_costs))
    return costs

A = np.array(range(5))
B = A[:4] + 0.2

print('A:', A)
print('B:', B)

combinations = np.array([list(zip(x, B)) for x in itertools.permutations(A, len(B))])

costs = compute_cost(combinations)

global_opt_index = costs[:,-1].argmin()
global_opt = costs[global_opt_index, -1]
global_solution = combinations[global_opt_index].reshape(len(B), 2)
print(f"Found the global optimal of {global_opt} after computing {combinations.size} combinations.")
for p,pair in enumerate(global_solution):
    print('\t-', pair[1], 'was mapped to', pair[0], 'cost:', costs[global_opt_index, p])

In [None]:
print(costs.shape)
print(combinations.shape)
print(len(combinations))

Usando numba pra acelerar:

In [None]:
import numpy as np
import itertools
import numba
import time


@numba.jit
def local_cost_func(a):
    return np.abs(a[0] - a[1])

@numba.jit(nopython=True)
def global_cost_func(a):
    return a.sum()

@numba.jit(nopython=True)
def compute_cost(combinations):
    costs_size, combinations_size = combinations.shape[:2]
    costs = np.empty((costs_size, combinations_size+1))
    for i in range(costs_size):
        for j in range(combinations_size):
            costs[i, j] = local_cost_func(combinations[i, j])
        costs[i,:combinations_size+1] = global_cost_func(costs[i, :combinations_size])
    
    return costs

@numba.jit(nopython=True, parallel=True)
def optimize(combinations):
    costs = compute_cost(combinations)

    global_opt_index = costs[:,-1].argmin()
    global_opt = costs[global_opt_index, -1]
    global_solution = combinations[global_opt_index].reshape(combinations.shape[1:])
    
    return global_solution, global_opt, global_opt_index, elapsed_time


A = np.array(range(20))
B = A[:5] + 0.2
print('A:', A); print('B:', B)

combinations = np.array([list(zip(x, B)) for x in itertools.permutations(A, len(B))])

elapsed_time = -time.time()
global_solution, global_opt, global_opt_index, elapsed_time = optimize(combinations)
elapsed_time += time.time()

print(f"Found the global optimal of {round(global_opt, 2)} after computing {combinations.size} combinations in {round(elapsed_time, 2)} seconds.")
for p,pair in enumerate(global_solution):
    print('\t-', pair[1], 'was mapped to', pair[0], 'cost:', round(costs[global_opt_index, p], 2))

In [None]:
A = np.array(range(10))
B = A[:5] + 0.2
print('A:', A); print('B:', B)


elapsed_time = -time.time()
permutation = itertools.permutations(A, len(B))
elapsed_time += time.time()
print(f'permutations: {elapsed_time} seconds')

elapsed_time = -time.time()
combinations = np.array([list(zip(x, B)) for x in permutation])
elapsed_time += time.time()
print(f'combinations: {elapsed_time} seconds')


Podemos não usar a estrutura de dados de `[list(zip(x, B)) for x in permutatitons]` e usar diretamente `np.fromiter(permutations)`, e aplicar `B` dentro de `compute_cost()`:

In [11]:
import numpy as np
import itertools
import time

A = np.array(range(20))
B = A[:5] + 0.2
print('A:', A); print('B:', B)

elapsed_time = -time.time()
permutation = np.array(list(list(zip(x, B)) for x in itertools.permutations(A, len(B))))
elapsed_time += time.time()
print(f'permutation: {elapsed_time} seconds')
print(permutation[:2], permutation.shape)

A = np.array(range(20))
B = A[:5] + 0.2
print('A:', A); print('B:', B)

elapsed_time = -time.time()
permutation = np.array(list(itertools.permutations(A, len(B))))
elapsed_time += time.time()
print(f'permutation: {elapsed_time} seconds')
print(permutation[:2], permutation.shape)

A = np.array(range(20))
B = A[:5] + 0.2
print('A:', A); print('B:', B)

elapsed_time = -time.time()
permutation = np.fromiter(itertools.chain(*itertools.permutations(A, len(B))),dtype=float).reshape(-1, len(B))
elapsed_time += time.time()
print(f'permutation: {elapsed_time} seconds')
print(permutation[:2], permutation.shape)

A: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]
B: [0.2 1.2 2.2 3.2 4.2]
permutation: 13.489964008331299 seconds
[[[0.  0.2]
  [1.  1.2]
  [2.  2.2]
  [3.  3.2]
  [4.  4.2]]

 [[0.  0.2]
  [1.  1.2]
  [2.  2.2]
  [3.  3.2]
  [5.  4.2]]] (1860480, 5, 2)
A: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]
B: [0.2 1.2 2.2 3.2 4.2]
permutation: 2.0084993839263916 seconds
[[0 1 2 3 4]
 [0 1 2 3 5]] (1860480, 5)
A: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]
B: [0.2 1.2 2.2 3.2 4.2]
permutation: 0.7534046173095703 seconds
[[0. 1. 2. 3. 4.]
 [0. 1. 2. 3. 5.]] (1860480, 5)


In [21]:
import numpy as np
import itertools
import numba
import time


@numba.njit()
def local_cost_func(a, b):
    return abs(a - b)

@numba.njit()
def global_cost_func(a):
    return a.sum()

@numba.njit()
def compute_cost(permutations, B):
    costs_size, permutations_size = permutations.shape
    costs = np.empty((costs_size, permutations_size+1))
    for i in range(costs_size):
        for j in range(permutations_size):
            costs[i, j] = local_cost_func(permutations[i][j], B[j])
        costs[i, -1] = global_cost_func(costs[i, :permutations_size])

    return costs

@numba.njit(parallel=True)
def optimize(permutations, B):
    costs = compute_cost(permutations, B)

    global_opt_index = costs[:,-1].argmin()
    global_opt = costs[global_opt_index, -1]
    
    return global_opt, global_opt_index, costs

A = np.array(range(10))
B = np.array(range(10)) + 0.2
print('A:', A); print('B:', B); print('-'*80)

elapsed_time = -time.time()
permutations = np.fromiter(itertools.chain(*itertools.permutations(A, len(B))),dtype=float).reshape(-1, len(B))
global_opt, global_opt_index, costs = optimize(permutations, B)
elapsed_time += time.time()

print(f"Found the global optimal of {round(global_opt, 2)} after computing {permutations.size} permutations in {round(elapsed_time, 2)} seconds.")
for i,_ in enumerate(B):
    print('\t-', B[i], 'was mapped to', permutations[global_opt_index, i], 'cost:', round(costs[global_opt_index, i], 2))

A: [0 1 2 3 4 5 6 7 8 9]
B: [0.2 1.2 2.2 3.2 4.2 5.2 6.2 7.2 8.2 9.2]
--------------------------------------------------------------------------------
Found the global optimal of 2.0 after computing 36288000 permutations in 3.41 seconds.
	- 0.2 was mapped to 0.0 cost: 0.2
	- 1.2 was mapped to 1.0 cost: 0.2
	- 2.2 was mapped to 2.0 cost: 0.2
	- 3.2 was mapped to 3.0 cost: 0.2
	- 4.2 was mapped to 4.0 cost: 0.2
	- 5.2 was mapped to 5.0 cost: 0.2
	- 6.2 was mapped to 6.0 cost: 0.2
	- 7.2 was mapped to 7.0 cost: 0.2
	- 8.2 was mapped to 8.0 cost: 0.2
	- 9.2 was mapped to 9.0 cost: 0.2


Para minimizar o uso de memória podemos usar um gerador:

In [1]:
import numpy as np
import itertools
import numba
import time


@numba.jit(nopython=True, nogil=True, fastmath=True)
def compute_cost(permutations, B):
    cost = 0
    for i in range(len(permutations)):
        cost += abs(permutations[i] - B[i])
    return cost

@numba.jit(forceobj=True, parallel=True, fastmath=True)
def optimize(A, B):
    global_opt_cost = 1000.0
    global_opt_index = 0
    global_opt_permutation = [0.0] * len(B)
    evaluations = int(0)
    for permutations in itertools.permutations(A, len(B)):
        _global_cost = compute_cost(permutations, B)
        
        # minimization rule
        if _global_cost < global_opt_cost:
            global_opt_cost = _global_cost
            global_opt_index = evaluations
            global_opt_permutation = permutations
        
        evaluations += 1

    global_solution = permutations[global_opt_index]

    return global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations

A = np.array(range(10))
B = np.array(range(10)) + 0.2
print('A:', A); print('B:', B); print('-'*80)

elapsed_time = -time.time()
global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations = optimize(A, B)
elapsed_time += time.time()

print(f"Found the global optimal of {round(global_opt_cost, 2)} after computing {evaluations} evaluations in {round(elapsed_time, 2)} seconds.")
# for i,_ in enumerate(B):
#     print('\t-', B[i], 'was mapped to', global_opt_permutation[i], 'cost:', round(global_opt_costs[i], 2))


A: [0 1 2 3 4 5 6 7 8 9]
B: [0.2 1.2 2.2 3.2 4.2 5.2 6.2 7.2 8.2 9.2]
--------------------------------------------------------------------------------
Found the global optimal of 2.0 after computing 3628800 evaluations in 5.68 seconds.


Testando multiprocess + itertools:

In [23]:
import numpy as np
import itertools
import multiprocessing
import numba
import time


@numba.jit(nopython=True, nogil=True, fastmath=True)
def compute_cost(permutations):
    cost = 0
    for i in range(len(permutations)):
        cost += abs(permutations[i] - B[i])
    return cost

@numba.jit(forceobj=True, parallel=False, fastmath=True)
def optimize(A, B):
    global_opt_cost = 1000.0
    global_opt_index = 0
    global_opt_permutation = [0.0] * len(B)
    evaluations = int(0)
    
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    permutations = itertools.permutations(A, len(B))
    
    for _global_cost in pool.imap(compute_cost, permutations):
        # minimization rule
        if _global_cost < global_opt_cost:
            global_opt_cost = _global_cost
            global_opt_index = evaluations
#             global_opt_permutation = permutations
        
        evaluations += 1
    
    pool.close()
    global_solution = 0#permutations[global_opt_index]

    return global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations

A = np.array(range(10))
B = np.array(range(5)) + 0.2
print('A:', A, '\nB:', B, '\n' + '-' * 80);

elapsed_time = -time.time()
global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations = optimize(A, B)
pool.close()

elapsed_time += time.time()

print(f"Found the global optimal of {round(global_opt_cost, 2)} after computing {evaluations} evaluations in {round(elapsed_time, 2)} seconds.")


A: [0 1 2 3 4 5 6 7 8 9] 
B: [0.2 1.2 2.2 3.2 4.2] 
--------------------------------------------------------------------------------


Compilation is falling back to object mode WITH looplifting enabled because Function "optimize" failed type inference due to: [1m[1mUnknown attribute 'Pool' of type Module(<module 'multiprocessing' from '/usr/lib/python3.8/multiprocessing/__init__.py'>)
[1m
File "<ipython-input-23-38ffd993fb0f>", line 22:[0m
[1mdef optimize(A, B):
    <source elided>
    
[1m    pool = multiprocessing.Pool(multiprocessing.cpu_count())
[0m    [1m^[0m[0m
[0m
[0m[1mDuring: typing of get attribute at <ipython-input-23-38ffd993fb0f> (22)[0m
[1m
File "<ipython-input-23-38ffd993fb0f>", line 22:[0m
[1mdef optimize(A, B):
    <source elided>
    
[1m    pool = multiprocessing.Pool(multiprocessing.cpu_count())
[0m    [1m^[0m[0m
[0m
  @numba.jit(fastmath=True)
Compilation is falling back to object mode WITHOUT looplifting enabled because Function "optimize" failed type inference due to: [1m[1mCannot determine Numba type of <class 'numba.core.dispatcher.LiftedLoop'>[0m
[1m
File "<ipython

Found the global optimal of 1.0 after computing 30240 evaluations in 6.71 seconds.


Estudo de caso chunking em paralelo

In [1]:
import numpy as np
import itertools
import multiprocessing
import time

def compute_cost(permutations):
    cost = 0
    for i in range(len(permutations)):
        cost += abs(permutations[i] - B[i])
    return cost

def chunker(iterator, size):
    while chunk := list(itertools.islice(iterator, size)):
        yield chunk

def optimize(A, B):
    global_opt_cost = 1000.0
    global_opt_index = 0
    global_opt_permutation = [0.0] * len(B)
    evaluations = int(0)
    permutations = itertools.permutations(A, len(B))
    _global_cost = []
    for chunk in chunker(permutations, chunksize):
        _global_cost = (pool.map_async(compute_cost, chunk).get())
        evaluations += 1

            
        # minimization rule
        if _global_cost < global_opt_cost:
            global_opt_cost = _global_cost
            global_opt_index = evaluations
            global_opt_permutation = chunk
        
    global_solution = permutations[global_opt_index]

    return global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations

A = np.array(range(10))
B = np.array(range(5)) + 0.2
print('A:', A); print('B:', B); print('-'*80)

elapsed_time = -time.time()
global_solution, global_opt_cost, global_opt_index, global_opt_permutation, evaluations = optimize(A, B)
elapsed_time += time.time()

print(f"Found the global optimal of {round(global_opt_cost, 2)} after computing {evaluations} evaluations in {round(elapsed_time, 2)} seconds.")
# for i,_ in enumerate(B):
#     print('\t-', B[i], 'was mapped to', global_opt_permutation[i], 'cost:', round(global_opt_costs[i], 2))

A: [0 1 2 3 4 5 6 7 8 9]
B: [0.2 1.2 2.2 3.2 4.2]
--------------------------------------------------------------------------------
Found the global optimal of 1.0 after computing 30240 evaluations in 0.69 seconds.
