# Project 2 - UTA and AHP Methods

## Imports

In [1]:
!pip install pulp



In [2]:
import pulp
import pandas as pd

## Utils

## UTA

In [3]:
data = pd.read_csv("data.csv")
data

Unnamed: 0,name,price,critic_score,user_score,length,genres,num_of_achievements
0,Dark Souls: Remastered,150,84,83,44,11,41
1,Dark Souls III,200,89,90,49,11,43
2,Terraria,46,81,81,102,9,115
3,Baldur's Gate 3,250,96,89,107,8,54
4,Dave the Diver,92,90,83,32,7,43
5,Rust,153,69,65,37,10,92
6,Hollow Knight,68,90,91,42,10,63
7,Portal 2,46,95,89,14,7,51
8,Vampire Survivors,20,86,83,25,9,204
9,Hades,115,93,88,49,9,49


In [4]:
preferences = ["Payday 3 < Titan Souls",
               "Kao the Kangaroo < Hades",
               "Subnautica < Terraria",
               "Assassin's Creed Unity < Dark Souls: Remastered"]

criteria_types = {"price": "cost", "critic_score" : "gain", "user_score" : "gain", "length" : "gain", "genres" : "gain", "num_of_achievements" : "gain"}

In [39]:
def get_split_sign(preference):
    if "<" in preference:
        return "<"
    elif ">" in preference:
        return ">"
    elif "=" in preference:
        return "="
    else:
        raise ValueError("Unexpected or lack of sign indicating preference")

def get_condition(preference, split_sign):
    parts = preference.split(split_sign)
    parts = [part.strip() for part in parts]
    return parts[0], split_sign, parts[1]

def add_error_variables(var, compared_alternatives, positive_errors, negative_errors):
    if var not in compared_alternatives:
        compared_alternatives.add(var)
        positive_errors[var] = pulp.LpVariable(f'positive_error_{var}', lowBound=0)
        negative_errors[var] = pulp.LpVariable(f'negative_error_{var}', lowBound=0)

def get_alternative_constraints(alternatives):
    alternative_constraints = {}
    alt_constraint_names = {}
    for i, row in alternatives.iterrows():
        alternative_constraints[row["name"]] = {}
        for col in alternatives.drop(columns = ["name"]).columns:
            if f'{col}_{row[col]}' not in list(alt_constraint_names.keys()):
                alternative_constraints[row["name"]][col] = pulp.LpVariable(f'{col}_{row[col]}', lowBound=0)
                alt_constraint_names[f'{col}_{row[col]}'] = (row["name"], col)
            else:
                temp_name = alt_constraint_names[f'{col}_{row[col]}'][0]
                temp_col = alt_constraint_names[f'{col}_{row[col]}'][1]
                alternative_constraints[row["name"]][col] = alternative_constraints[temp_name][temp_col]
    return alternative_constraints

def get_comparison_constrains(alternative_constraints, var_names, positive_errors, negative_errors):
    constraints = []
    for alt_name_1, condition, alt_name_2 in var_names:
        expression1 = negative_errors[alt_name_1] - positive_errors[alt_name_1] + pulp.lpSum(alternative_constraints[alt_name_1][key] for key in alternative_constraints[alt_name_1].keys())
        expression2 = negative_errors[alt_name_2] - positive_errors[alt_name_2] + pulp.lpSum(alternative_constraints[alt_name_2][key] for key in alternative_constraints[alt_name_1].keys())
        if condition == ">":
            new_constraint = expression1 >= expression2
        elif condition == "=":
            new_constraint = expression1 == expression2
        elif condition == "<":
            new_constraint = expression1 <= expression2
        constraints.append(new_constraint)
    return constraints

def get_normalization_constrains(constraints, alternatives, criteria_types, alternative_constraints):
    best_row_positions = []
    for criteria in criteria_types.keys():
        if criteria_types[criteria] == "cost":
            idx = alternatives[criteria].idxmin()
        elif criteria_types[criteria] == "gain":
            idx = alternatives[criteria].idxmax()
        row = alternatives.loc[idx]
        best_row_positions.append([row["name"], criteria])
    constraints.append(pulp.lpSum(alternative_constraints[name][crit] for name, crit in best_row_positions) == 1.0)

    worst_row_positions = []
    for criteria in criteria_types.keys():
        if criteria_types[criteria] == "cost":
            idx = alternatives[criteria].idxmax()
        elif criteria_types[criteria] == "gain":
            idx = alternatives[criteria].idxmin()
        row = alternatives.loc[idx]
        worst_row_positions.append([row["name"], criteria])
    for name, crit in worst_row_positions:
        constraints.append(alternative_constraints[name][crit] == 0.0)
    return constraints

def get_monotonicity_constrains(constraints, alternatives, criteria_types, alternative_constraints):
    for col in alternatives.drop(columns = ["name"]).columns:
        if criteria_types[col] == "gain":
            alternatives.sort_values(by=col, inplace=True)
        if criteria_types[col] == "cost":
            alternatives.sort_values(by=col, inplace=True, ascending=False)
        prev_val = alternatives[col][0]
        for i in range(1, len(alternatives[col])):
            val = alternatives[col][i]
            if criteria_types[col] == "gain":
                if val>prev_val:
                    constraints.append(alternative_constraints[alternatives["name"][i-1]][col] <= alternative_constraints[alternatives["name"][i]][col])
                elif val<prev_val:
                    constraints.append(alternative_constraints[alternatives["name"][i]][col] <= alternative_constraints[alternatives["name"][i-1]][col])
            else:
                if val>prev_val:
                    constraints.append(alternative_constraints[alternatives["name"][i]][col] <= alternative_constraints[alternatives["name"][i-1]][col])
                elif val<prev_val:
                    constraints.append(alternative_constraints[alternatives["name"][i-1]][col] <= alternative_constraints[alternatives["name"][i]][col])
            prev_val = val
    return constraints

def get_threshold_constraints(constraints, alternative_constraints, upper_threshold, lower_threshold):
    for name in alternative_constraints.keys():
        for col in alternative_constraints[name].keys():
            constraints.append(alternative_constraints[name][col] >= lower_threshold)
            constraints.append(alternative_constraints[name][col] <= upper_threshold)
    return constraints

def define_LP_problem(alternatives, criteria_types, var_names, positive_errors, negative_errors, upper_threshold, lower_threshold):
    F = pulp.lpSum(positive_errors.values()) + pulp.lpSum(negative_errors.values())
    alternative_constraints = get_alternative_constraints(alternatives)
    constraints = get_comparison_constrains(alternative_constraints, var_names, positive_errors, negative_errors)
    constraints = get_normalization_constrains(constraints, alternatives, criteria_types, alternative_constraints)
    constraints = get_monotonicity_constrains(constraints, alternatives, criteria_types, alternative_constraints)
    constraints = get_threshold_constraints(constraints, alternative_constraints, upper_threshold, lower_threshold)
    problem = pulp.LpProblem('UTA Problem', pulp.LpMinimize)
    problem += F
    for constraint in constraints:
        problem += constraint
    return problem

def find_consistent_subset():
    pass

def plot_functions(alternatives, problem):
    pass

def calculate_results(alternatives, problem):
    var_values = {}
    result = {}
    for var in problem.variables():
        var_values[str(var.name)] = var.varValue
    for i, row in alternatives.iterrows():
        result[row["name"]] = 0.0
        for col in alternatives.drop(columns = ["name"]).columns:
            result[row["name"]] += var_values[f"{col}_{row[col]}"]
        result[row["name"]] = round(result[row["name"]], 5) #To avoid numerical errors
    sorted_results = sorted(result.items(), key=lambda x: x[1], reverse=True)
    return sorted_results

def solve_UTA(alternatives, criteria_types, preferences, verbose = True, plot_results = True, upper_threshold = 0.5, lower_threshold = 0.05):
    var_names = []
    compared_alternatives = set()
    positive_errors = {}
    negative_errors = {}
    for preference in preferences:
        split_sign = get_split_sign(preference)
        var1, condition, var2 = get_condition(preference, split_sign)
        var_names.append([var1, condition, var2])
        add_error_variables(var1, compared_alternatives, positive_errors, negative_errors)
        add_error_variables(var2, compared_alternatives, positive_errors, negative_errors)
    problem = define_LP_problem(alternatives, criteria_types, var_names, positive_errors, negative_errors, upper_threshold, lower_threshold)
    if verbose:
        print("\n___________________Problem Definition________________________\n")
        print(problem)
        print("\n")
    problem.solve()
    if verbose:
        print("\n_________________________Solution____________________________\n")
        print(f"Optimal Objective Function Value = {pulp.value(problem.objective)}")
        print("\n_________________________Variables___________________________\n")
        for var in problem.variables():
            print(f"Variable {var.name}: {var.varValue}")
    if plot_results:
        print("\n_________________________Plots________________________________\n")
        plot_functions(alternatives, problem)
    result = calculate_results(alternatives, problem)
    return result

In [41]:
results = solve_UTA(data, criteria_types, preferences)


___________________Problem Definition________________________

UTA_Problem:
MINIMIZE
1*negative_error_Assassin's_Creed_Unity + 1*negative_error_Dark_Souls:_Remastered + 1*negative_error_Hades + 1*negative_error_Kao_the_Kangaroo + 1*negative_error_Payday_3 + 1*negative_error_Subnautica + 1*negative_error_Terraria + 1*negative_error_Titan_Souls + 1*positive_error_Assassin's_Creed_Unity + 1*positive_error_Dark_Souls:_Remastered + 1*positive_error_Hades + 1*positive_error_Kao_the_Kangaroo + 1*positive_error_Payday_3 + 1*positive_error_Subnautica + 1*positive_error_Terraria + 1*positive_error_Titan_Souls + 0
SUBJECT TO
_C1: critic_score_66 - critic_score_74 + genres_5 - genres_8 + length_10
 - length_4 + negative_error_Payday_3 - negative_error_Titan_Souls
 + num_of_achievements_22 - num_of_achievements_27 - positive_error_Payday_3
 + positive_error_Titan_Souls + price_169 - price_68 + user_score_31
 - user_score_61 <= 0

_C2: critic_score_65 - critic_score_93 + genres_8 - genres_9 - lengt



Note: Warning due to names of games containing spaces

In [42]:
results

[('Skyrim', 1.0),
 ("Baldur's Gate 3", 0.75),
 ('Dark Souls: Remastered', 0.55),
 ('Dark Souls III', 0.55),
 ("Assassin's Creed Unity", 0.55),
 ('Inside', 0.3),
 ('Subnautica', 0.3),
 ('Payday 3', 0.3),
 ('Kao the Kangaroo', 0.3),
 ('Titan Souls', 0.3),
 ('Teardown', 0.3),
 ('Ori and the Will of the Wisps', 0.3),
 ('Dave the Diver', 0.3),
 ('The Forest', 0.3),
 ('Hades', 0.3),
 ('Trials Fusion', 0.3),
 ('Portal 2', 0.3),
 ('Enter the Gungeon', 0.3),
 ('Hollow Knight', 0.3),
 ('The Sims 3', 0.3),
 ('Dying Light', 0.3),
 ('Dishonored', 0.3),
 ('Rust', 0.3),
 ('Terraria', 0.3),
 ('Vampire Survivors', 0.3)]