In [4]:
from typing import Union
from functools import reduce
import random
import heapq
from itertools import combinations

# Data

In [5]:
def convert_list(data_list):
    return [convert_str_to_number(item) for item in data_list]


def convert_str_to_number(s):
    try:
        return int(s)
    except ValueError:
        try:
            return float(s)
        except ValueError:
            return s


def to_vertical_representation(transactions):
    vertical = {}
    for tid, txn in enumerate(transactions, start=0):
        for item in txn:
            if item not in vertical:
                vertical[item] = set()
            vertical[item].add(tid)
    return vertical


def to_horizontal_representation(transactions, utility_of_each_item):
    horizontal = {}

    for transaction_ID, (items, utility) in enumerate(
        zip(transactions, utility_of_each_item), start=0
    ):
        horizontal[transaction_ID] = {item: util for item, util in zip(items, utility)}
    return horizontal


def calculate_total_utility(vertical, horizontal):  # -> dict:
    utility_values_of_mono_item = {}
    for item_name, list_id_transactions in vertical.items():
        utility_values_of_mono_item[item_name] = sum(
            [
                horizontal[transaction_ID][item_name]
                for transaction_ID in list_id_transactions
            ]
        )
    return utility_values_of_mono_item


class DataWarehouse:
    def __init__(self, data: str):
        self.data = data
        self.items = []
        self.transaction_utility = []
        self.utility_of_each_item = []
        self.utility_values_of_mono_item = {}
        self.vertical = None
        self.horizontal = None
        self.total_utility = 0
        self._process_data()
        self._create_representation_data()

    def _process_data(self):
        lines = self.data.strip().split("\n")
        for line in lines:
            items, transaction_utility, utility_of_each_item = line.split(":")
            self.items.append(convert_list(items.split()))
            self.transaction_utility.append(convert_str_to_number(transaction_utility))
            self.utility_of_each_item.append(convert_list(utility_of_each_item.split()))

    def _create_representation_data(self):
        self.vertical = to_vertical_representation(self.items)
        self.horizontal = to_horizontal_representation(
            self.items, self.utility_of_each_item
        )
        self.utility_values_of_mono_item = calculate_total_utility(
            self.vertical, self.horizontal
        )
        self.total_utility = sum(self.utility_values_of_mono_item.values())

# Genetic algorithms

In [None]:
class Genetic:
    def __init__(
        self,
        number_of_population,
        m,
        quantity_of_elite,
        alpha=0.5,
        beta=0.5,
        number_of_generations=10,
        k_tournament=5,
        number_population_s=5,
        stop_criteria_loop=1000,
    ):
        self.number_of_population = number_of_population
        self.m = m
        self.quantity_of_elite = quantity_of_elite
        self.alpha = alpha
        self.beta = beta
        self.stop_criteria_loop = stop_criteria_loop
        self.number_population_s = number_population_s
        self.number_of_generations = number_of_generations
        self.k_tournament = k_tournament

    def get_initial_solutions(self, data) -> list:
        items = data.items
        horizontal = data.horizontal
        transaction_utility = data.transaction_utility

        tu_values = [(i, values) for i, values in enumerate(transaction_utility)]
        candidate_transactions = sorted(tu_values, key=lambda x: x[1], reverse=True)[
            : self.number_of_population
        ]

        initial_solutions = []
        for id_transaction, _ in candidate_transactions:
            items_utility_of_transaction = horizontal[id_transaction]
            sorted_items = sorted(
                items[id_transaction],
                key=lambda x: items_utility_of_transaction.get(x, 0),
                reverse=True,
            )
            solution = [item for item in sorted_items[: self.m]]
            initial_solutions.append(solution)
        return initial_solutions

    def evaluation(self, solution, data: DataWarehouse) -> Union[int, float]:
        if solution:
            pass
        else:
            return 0
        
        vertical = data.vertical
        set_id = [vertical[item] for item in solution]
        if len(solution) > 1:
            id_intersection = set.intersection(*set_id)
        else:
            print(solution)
            id_intersection = vertical[solution[0]]
            
        fitness = 0
        for tid in id_intersection:
            fitness += sum([data.horizontal[tid].get(item, 0) for item in solution])
        return fitness

    def wheel_selection(self, utility_values_of_mono_item, total_utility):
        items = list(utility_values_of_mono_item.keys())
        utilities = list(utility_values_of_mono_item.values())

        if len(items) != len(utilities):
            raise ValueError("Not same length of items and utilities !")

        cumulative_probabilities = []
        cumulative_sum = 0
        for utility in utilities:
            cumulative_sum += utility / total_utility
            cumulative_probabilities.append(cumulative_sum)

        random_value = random.random()

        for i, cumulative_probability in enumerate(cumulative_probabilities):
            if random_value <= cumulative_probability:
                return items[i]

    def genetic_operators(
        self, current_population, data: DataWarehouse, alpha, beta
    ) -> list:
        utility_values_of_mono_item = data.utility_values_of_mono_item
        new_population = []
        for first_solution, second_solution in combinations(current_population, 2):
            # Crossover
            if alpha > random.random():
                if self.evaluation(first_solution, data) > self.evaluation(
                    second_solution, data
                ):
                    x = min(
                        first_solution, key=lambda k: utility_values_of_mono_item[k]
                    )
                    y = max(
                        second_solution, key=lambda k: utility_values_of_mono_item[k]
                    )
                else:
                    x = max(
                        first_solution, key=lambda k: utility_values_of_mono_item[k]
                    )
                    y = min(
                        second_solution, key=lambda k: utility_values_of_mono_item[k]
                    )
                first_solution.remove(x)
                second_solution.append(x)
                first_solution.append(y)
                second_solution.remove(y)

            # Mutation
            for solution in [first_solution, second_solution]:
                if beta > random.random():
                    if 0.5 > random.random():
                        print(solution)
                        x = min(solution, key=lambda k: utility_values_of_mono_item[k], default=None)
                        if x is not None:
                            solution.remove(x)
                    else:
                        x = self.wheel_selection(
                            utility_values_of_mono_item, data.total_utility
                        )
                        solution.append(x)
                new_population.append(solution)
        return new_population

    def tournament_selector(self, data: DataWarehouse, population):
        S = []
        while len(S) < self.number_population_s:
            random_tour = random.sample(population, self.k_tournament)
            best_individual = max(random_tour, key=lambda x: self.evaluation(x, data))
            if best_individual not in S:
                S.append(best_individual)
        return S

    def calculate_total_utility_of_population(self, population, data: DataWarehouse):
        return sum([self.evaluation(solution, data) for solution in population])

    def get_new_elite_population(self, elite_population, new_population, data: DataWarehouse):# -> tuple[list, int | float]:
        population = elite_population + new_population
        # print(elite_population)
        # print(population)
        values = [(x, self.evaluation(x, data)) for x in population]
        sorted_values = sorted(values, key=lambda x: x[1], reverse=True)
        new_elite_population = [x[0] for x in sorted_values[:self.quantity_of_elite]]
        
        return new_elite_population, sum([x[1] for x in sorted_values[self.quantity_of_elite:]])
    
    def update_parameters(self, status: bool,):
        if status:
            self.alpha += 0.05
            self.beta -= 0.05
        else:
            self.alpha -= 0.05
            self.beta += 0.05
    
    def solve(self, data: DataWarehouse):
        elite_population = heapq.nlargest(
            self.quantity_of_elite,
            data.utility_values_of_mono_item.keys(),
            key=lambda x: data.utility_values_of_mono_item[x],
        )
        elite_population = [[x] for x in elite_population]
        population = self.get_initial_solutions(data)
        
        new_population = []
        stop_criteria = 0
        while True:
            s = self.tournament_selector(data, population)
            new_population = self.genetic_operators(s, data, self.alpha, self.beta)
            old_util_values = self.calculate_total_utility_of_population(population, data)
            elite_population, new_util_values =  self.get_new_elite_population(elite_population, new_population, data)
            
            if new_util_values > old_util_values:
                stop_criteria = 0
                self.update_parameters(True)
            else:
                stop_criteria += 1
                self.update_parameters(False)
            if stop_criteria == self.stop_criteria_loop:
                break
        return elite_population        

In [7]:
with open(file="data\\chess_utility.txt") as file:
    processor = DataWarehouse(file.read())
    
processor.utility_values_of_mono_item
Genetic(75, 10, 5, k_tournament=5, number_population_s=5,).solve(processor)

[69, 48, 45, 34, 23, 1, 40, 3, 60, 7]
[40, 69, 48, 1, 72, 50, 34, 45, 52, 56]
[40, 1, 48, 72, 52, 56, 3, 34, 29, 40, 1, 28]
[40, 1, 48, 72, 52, 56, 34, 29, 40, 1, 40]
[48, 45, 69, 1, 34, 72, 23, 56, 52, 72, 3]
[40]
[48]
[52]
[34]
[72]
[1, 48, 40, 72, 66, 71, 56, 64, 11, 7]
[40, 69, 1, 52, 34, 48, 60, 64, 56, 72, 19]
[40, 68, 52, 72, 23, 48, 3, 66, 1, 11, 68]
[40, 68, 52, 72, 48, 3, 66, 1, 68, 40]
[40]
[52, 72, 45, 48, 69, 34, 66, 23, 64, 23]
[69, 48, 50, 52, 11, 64, 66, 3, 7, 40]
[40, 68, 52, 72, 48, 66, 68, 40, 48]
[69, 48, 50, 52, 11, 64, 66, 3, 40, 62]
[1, 52, 34, 48, 60, 64, 56, 72, 64, 52, 15, 48, 68]
[40, 52, 72, 48, 40, 48, 40, 40]
[1, 52, 34, 48, 60, 56, 72, 64, 52, 48, 68, 48]
[68, 72, 11, 3, 56, 60, 64, 34, 60, 68, 64]
[40, 48, 1, 68, 23, 34, 52, 11, 56, 72, 66]
[40, 48, 1, 68, 34, 52, 11, 56, 72, 66, 64, 19]
[40, 48, 1, 68, 34, 52, 11, 56, 72, 66, 40]
[40, 48, 72, 52, 56, 34, 29, 40, 40, 48, 48]
[48, 68, 72, 50, 1, 34, 52, 3, 64, 44]
[48, 68, 72, 1, 34, 52, 3, 64, 34]
[40, 6

KeyboardInterrupt: 

[2, 1]


TypeError: '<=' not supported between instances of 'int' and 'list'

In [48]:
processor.utility_values_of_mono_item

{3: 13, 5: 15, 1: 20, 2: 22, 4: 14, 6: 5, 7: 7}

In [18]:
data = processor
number_of_population = 2
m = 2
items = data.items
horizontal = data.horizontal
transaction_utility = data.transaction_utility
utility_of_each_item = data.utility_of_each_item

tu_values = [(i, values) for i, values in enumerate(transaction_utility)]
candidate_transactions = sorted(tu_values, key=lambda x: x[1], reverse=True)[:number_of_population]

initial_solutions = []
for id_transaction, _ in candidate_transactions:
    items_utility_of_transaction = horizontal[id_transaction]
    sorted_items = sorted(
        items[id_transaction],
        key=lambda x: items_utility_of_transaction.get(x, 0),
        reverse=True,
    )
    solution = [item for item in sorted_items[:m]]
    initial_solutions.append(solution)
    
# for

In [22]:
processor.vertical

{3: {0, 1, 2, 3, 4},
 5: {0, 1, 3, 4},
 1: {0, 2, 3},
 2: {0, 1, 4},
 4: {0, 1, 2},
 6: {0},
 7: {3, 4}}

In [23]:
processor.horizontal

{0: {3: 1, 5: 3, 1: 5, 2: 10, 4: 6, 6: 5},
 1: {3: 3, 5: 3, 2: 8, 4: 6},
 2: {3: 1, 1: 5, 4: 2},
 3: {3: 6, 5: 6, 1: 10, 7: 5},
 4: {3: 2, 5: 3, 2: 4, 7: 2}}

In [34]:
processor.utility_values_of_mono_item


{3: 13, 5: 15, 1: 20, 2: 22, 4: 14, 6: 5, 7: 7}

In [None]:
import random
from typing import Union
from functools import reduce
from itertools import combinations

def evaluation(solution, data: DataWarehouse) -> Union[int, float]:
    vertical = data.vertical
    id_intersection = reduce(lambda x, y: x & y, [vertical[item] for item in solution])    
    
    fitness = 0
    for tid in id_intersection:
        fitness += sum([data.horizontal[tid].get(item, 0) for item in solution])
    return fitness
evaluation([3, 4], processor)       
    
    
def wheel_selection(items, utilities, total_utility):
    if len(items) != len(utilities):
        raise ValueError("Not same length of items and utilities !")

    cumulative_probabilities = []
    cumulative_sum = 0
    for utility in utilities:
        cumulative_sum += utility / total_utility
        cumulative_probabilities.append(cumulative_sum)

    random_value = random.random()

    for i, cumulative_probability in enumerate(cumulative_probabilities):
        if random_value <= cumulative_probability:
            return items[i]
            
def genetic_operators(current_population, data: DataWarehouse, alpha, beta) -> list:
    utility_values_of_mono_item = data.utility_values_of_mono_item
    
    new_population = []
    for first_solution, second_solution in combinations(current_population):
        # Crossover
        if alpha > random.random():
            if evaluation(first_solution, data) > evaluation(second_solution, data):
                x = min(first_solution, key=lambda k: utility_values_of_mono_item[k])
                y = max(second_solution, key=lambda k: utility_values_of_mono_item[k])
            else:
                x = max(first_solution, key=lambda k: utility_values_of_mono_item[k])
                y = min(second_solution, key=lambda k: utility_values_of_mono_item[k])
        first_solution.remove(x)
        second_solution.append(x)
        first_solution.append(y)
        second_solution.remove(y)
        
        # Mutation
        for solution in [first_solution, second_solution]:
            if beta > random.random():
                if 0.5 > random.random():
                    x = min(solution, key=lambda k: utility_values_of_mono_item[k])
                    solution.remove(x)
                else:
                    x = wheel_selection(data.items, utility_values_of_mono_item, data.total_utility)
                    solution.append(x)
            new_population.append(solution)
    return new_population

