In [1]:
import numpy as np
import pandas as pd
import csv
from IPython.display import clear_output

In [2]:
class SAC:
    # Конструктор класса
    ## Параметры:
    ## path - путь к файлу .csv с данными;
    ## numClasses - количество классов;
    ## sep - сепаратор, разделяющий оценки;
    ## enc - кодировка
    def __init__(self, path, numClasses, sep=",", enc="utf-8"):
        # Чтение файла
        with open(path, 'r', encoding=enc, newline='') as file:
            file = csv.reader(file, delimiter=sep, quotechar='"')
            self.alternatives = []
            for i, line in enumerate(file):
                # Первая строка файла - максимальный вектор
                # Остальные строки файла - векторы оценок
                vector = np.array(line[0].split(sep)).astype(int)
                if i > 0:
                    self.alternatives.append(vector.copy())
                else:
                    self.maxVector = vector.copy() # максимальный вектор оценок
        # Параметры класса
        self.path = path # путь к файлу
        self.sep = sep # сепаратор
        self.enc = enc # кодировка
        self.numClasses = numClasses # количество классов
        self.classes = [[] for _ in range(int(numClasses))] # двумерный список индексов альтернатив, разбитых по классам, list
        self.alternatives = np.array(self.alternatives) # массив альтернатив (векторов оценок), np.array
        self.numAlternatives = self.alternatives.shape[0] # количество альтернатив
        self.numCriteria = self.alternatives.shape[1] # количество критериев
        self.Yg = [i for i in range(self.numAlternatives)] # список неклассифицированных альтернатив, list
        self.setsG = [[i for i in range(numClasses)] for _ in range(self.numAlternatives)] # множества G альтернатив
        self.centers = np.empty((numClasses, self.numCriteria)) # массив центров классов, np.array
        # Начальная инициализация параметров класса
        ## Поиск лучшего вектора
        indBestVec = self.find_best_vector() # индекс лучшего вектора
        if indBestVec is not None:
            self.setsG[indBestVec] = [0]
            self.classes[0].append(indBestVec)
            self.centers[0] = self.alternatives[indBestVec].copy()
            self.Yg.remove(indBestVec)
        else:
            self.centers[0] = np.ones(self.numCriteria)
        ## Поиск худшего вектора
        indWorseVec = self.find_worse_vector() # индекс худшего вектора
        if indWorseVec is not None:
            self.setsG[indWorseVec] = [numClasses - 1]
            self.classes[numClasses - 1].append(indWorseVec)
            self.centers[numClasses - 1] = self.alternatives[indWorseVec].copy()
            self.Yg.remove(indWorseVec)
            self.maxVector = self.alternatives[indWorseVec].copy()
        else:
            self.centers[numClasses - 1] = self.maxVector.copy()
        self.recalc_params()
        self.recalc_centers()
    
    # Делает копию двумерного списка
    ## Возвращает 
    ### Копию двумерного списка, list
    def make_copy(self, arr):
        copyArr = []
        for i in range(len(arr)):
            copyArr.append(arr[i].copy())
        return copyArr
    
    # Поиск лучшего вектора
    ## Возвращает:
    ### Индекс лучшего вектора при условии, что он доминирует строго над всеми в подмножестве, int
    ### None, если такого вектора нет
    def find_best_vector(self):
        indBestVec = None
        for i in range(self.numAlternatives):
            count = 0
            for j in range(self.numAlternatives):
                vec1 = self.alternatives[i].copy()
                vec2 = self.alternatives[j].copy()
                if (i != j) and (self.is_dominating(vec1, vec2)):
                    count += 1
            if count == self.numAlternatives - 1:
                indBestVec = i
                break
        return indBestVec
    
    # Поиск худшего вектора
    ## Возвращает:
    ### Индекс худшего вектора при условии, что над ним доминируют строго все в подмножестве, int
    ### None, если такого вектора нет
    def find_worse_vector(self):
        indWorseVec = None
        for i in range(self.numAlternatives):
            count = 0
            for j in range(self.numAlternatives):
                vec1 = self.alternatives[i].copy()
                vec2 = self.alternatives[j].copy()
                if (i != j) and (self.is_dominating(vec2, vec1)):
                    count += 1
            if count == self.numAlternatives - 1:
                indWorseVec = i
                break
        return indWorseVec   
      
    # Проверка на доминирование
    ## Возвращает 
    ### True, если vec1 > vec2
    ### False в остальных случаях
    def is_dominating(self, vec1, vec2):
        vec3 = vec2 - vec1
        res = False if sum(vec3) == 0 else True
        if res:
            for elem in vec3:
                if elem < 0:
                    res = False
                    break
        return res
    
    # Поиск непустого класса снизу
    ## Возвращает
    ### Индекс первого непустого класса ниже текущего, int
    def find_index_below(self, index):
        indBelow = 0
        for i in range(index - 1, -1, -1):
            if len(self.classes[i]) != 0:
                indBelow = i
                break
        return indBelow
    
    # Поиск непустого класса сверху
    ## Возвращает
    ### Индекс первого непустого класса выше текущего, int
    def find_index_above(self, index):
        indAbove = self.numClasses - 1
        for i in range(index + 1, self.numClasses):
            if len(self.classes[i]) != 0:
                indAbove = i
                break
        return indAbove
     
    # Пересчёт параметров классов: setsG, classes, Yg
    def recalc_params(self):
        newG = self.make_copy(self.setsG)
        newYg = self.Yg.copy()
        # Проходим по всем векторам из Yg
        for indVec in self.Yg:
            vecCurrent = self.alternatives[indVec].copy() # текущий вектор
            # Проходим по всем доступным классам для текущего вектора
            for indClassCurr in self.setsG[indVec]:
                fl = False # флаг: если True, то indClassCurr из G удалить
                # Проходим по всем классам
                for indClass in range(self.numClasses):
                    # Проходим по всем векторам из класса classes[indClass]
                    for indVecClass in self.classes[indClass]:
                        vecClass = self.alternatives[indVecClass].copy() # вектор из класса classes[indClass]
                        if ((indClassCurr > indClass) and (self.is_dominating(vecCurrent, vecClass))) or \
                           ((indClassCurr < indClass) and (self.is_dominating(vecClass, vecCurrent))):
                            newG[indVec].remove(indClassCurr)
                            # Если для вектора остался один доступный класс,
                            # то вектор добавляется в этот класс и удаляется из Yg
                            if len(newG[indVec]) == 1:
                                self.classes[newG[indVec][0]].append(indVec)
                                newYg.remove(indVec)
                            fl = True
                            break
                    if fl:
                        break
        self.setsG = newG
        self.Yg = newYg
                
    # Пересчёт центров классов
    def recalc_centers(self):
        for indCl in range(self.numClasses):
            lenClass = len(self.classes[indCl])
            if lenClass != 0:
                meanVec = np.zeros(self.numCriteria)
                for indVec in self.classes[indCl]:
                    meanVec += self.alternatives[indVec].copy()
                meanVec /= lenClass
                self.centers[indCl] = meanVec.copy()
        for indCl in range(self.numClasses):
            lenClass = len(self.classes[indCl])
            if lenClass == 0:
                indBelow = self.find_index_below(indCl)
                indAbove = self.find_index_above(indCl)
                dispVector = (self.centers[indAbove] - self.centers[indBelow]) / (indAbove - indBelow) # вектор смещения
                for i in range(indBelow + 1, indAbove):
                    self.centers[i] = self.centers[i - 1].copy() + dispVector.copy()
                    indCl += 1
          
    # Расчёт p для всех векторов из множества Yg
    ## Возвращает
    ### Двумерный массив вероятностей, np.array
    def calc_p(self):
        dMax = sum(self.maxVector.copy() - np.ones(self.numCriteria)) # максимальное расстояние между векторами из Y
        p = np.zeros((len(self.Yg), self.numClasses))
        # Проходим по всем векторам из Yg
        for i in range(p.shape[0]):
            vecCurrent = self.alternatives[self.Yg[i]].copy() # текущий вектор
            d = [] # список расстояний до всех доступных центров классов для текущего вектора
            # Проходим по всем доступным классам для текущего вектора
            for indClass in self.setsG[self.Yg[i]]:
                # Расчёт расстояния от центра класса до текущего вектора
                d.append(sum(abs(vecCurrent - self.centers[indClass].copy())))       
            indCount = 0
            # Проходим по всем доступным классам для текущего вектора
            for indClass in self.setsG[self.Yg[i]]:
                # Расчёт вероятности принадлежности вектора классу
                p[i][indClass] = (dMax - d[indCount]) / (len(self.setsG[self.Yg[i]]) * dMax - sum(d))
                indCount += 1
        return p
    
    # Расчёт g для всех векторов из множества Yg
    ## Возвращает
    ### Двумерный массив g, np.array
    def calc_g(self):
        g = np.zeros((len(self.Yg), self.numClasses))
        # Проходим по всем индексам векторов из Yg
        for i in range(g.shape[0]):
            # Проходим по всем индексам доступных классов для текущего вектора
            for indClass in self.setsG[self.Yg[i]]:
                tempSetsG = self.make_copy(self.setsG)
                tempClasses = self.make_copy(self.classes)
                tempYg = self.Yg.copy()
                self.setsG[self.Yg[i]] = [indClass]
                self.classes[indClass].append(self.Yg[i])
                self.Yg.remove(self.Yg[i])
                self.recalc_params()
                g[i][indClass] = len(tempYg) - len(self.Yg) - 1
                self.setsG = tempSetsG
                self.classes = tempClasses
                self.Yg = tempYg
        return g
    
    # Расчёт коэффициента информативности для всех векторов из Yg
    ## Возвращает
    ### Массив коэффициентов информативностей для Yg, np.array
    def calc_coeff(self, v, p, g):
        coeff = np.empty(len(self.Yg))
        for i in range(len(self.Yg)):
            mx = 0 
            dx = 0
            for j in range(self.numClasses):
                mx += p[i][j] * g[i][j]
                dx += p[i][j] * g[i][j] * g[i][j]
            dx = dx - mx * mx
            coeff[i] = (mx / (1 + (v * np.sqrt(dx) / mx))) if mx != 0 else 0
        return coeff
    
    # Проверка согласованности векторов
    ## Возвращает
    ### True, если согласованны; False, если нет
    def check_consistency(self, VecsAndClasses):
        res = False
        for i in range(len(VecsAndClasses)):
            vec1 = self.alternatives[VecsAndClasses[i][0]].copy()
            class1 = VecsAndClasses[i][1]
            for j in range(i + 1, len(VecsAndClasses)):
                vec2 = self.alternatives[VecsAndClasses[j][0]].copy()
                class2 = VecsAndClasses[j][1]
                if (class1 > class2 and self.is_dominating(vec1, vec2)) or \
                (class1 < class2 and self.is_dominating(vec2, vec1)):
                    res = True
                    break
            if res:
                break
        return res
    
    # Изменение классов пользователем
    ## Возвращает
    ### Список пар исправленных векторов - индекс вектора и класса, list
    def change_classes(self, classifiedByDM, conflicts):
        while self.check_consistency(conflicts):
            resStr = "Conflicts:\n"
            for conf in conflicts:
                resStr += "\t{0}: class {1}\n".format(self.alternatives[conf[0]], conf[1] + 1)
            for i in range(len(conflicts)):
                resStr += "\nCurrent vector: {0}".format(self.alternatives[conflicts[i][0]])
                resStr += "\nCurrent class: {0}".format(conflicts[i][1] + 1)
                # clear_output(wait=True)
                print(resStr)
                inpClass = int(input("\nNew class: ")) - 1
                # print("\nNew class:", inpClass)
                conflicts[i][1] = inpClass
        for i in range(len(classifiedByDM)):
            for j in range(len(conflicts)):
                if classifiedByDM[i][0] == conflicts[j][0]:
                    classifiedByDM[i][1] = conflicts[j][1]
        return classifiedByDM
                
    # Поиск векторов, для которых нарушается правило непротиворечивости
    ## Возвращает
    ### Список пар для противоречивых векторов - индекс вектора и класса, list
    def find_conflicts(self, VecsAndClasses):
        conflicts = []
        for i in range(len(VecsAndClasses)):
            indVec1 = VecsAndClasses[i][0]
            vec1 = self.alternatives[indVec1].copy()
            class1 = VecsAndClasses[i][1]
            for j in range(i + 1, len(VecsAndClasses)):
                indVec2 = VecsAndClasses[j][0]
                vec2 = self.alternatives[indVec2].copy()
                class2 = VecsAndClasses[j][1]
                if (class1 > class2 and self.is_dominating(vec1, vec2)) or \
                (class1 < class2 and self.is_dominating(vec2, vec1)):
                    if [indVec1, class1] not in conflicts:
                        conflicts.append([indVec1, class1])
                    if [indVec2, class2] not in conflicts:
                        conflicts.append([indVec2, class2])
        return conflicts
    
    # Добавление векторов в классы вручную
    def add_manually(self, VecsAndClasses):
        for elem in VecsAndClasses:
            self.setsG[elem[0]] = [elem[1]]
            self.classes[elem[1]].append(elem[0])
            self.Yg.remove(elem[0])
        self.recalc_params()
        self.recalc_centers()
            
    # Классификация альтернатив
    def classificate(self, v):
        countQuest = 0
        classifiedByDM = [] # 
        while len(self.Yg) > 0:
            p = self.calc_p()
            g = self.calc_g()
            coeffs = self.calc_coeff(v, p, g)
            indMaxCoeff = np.where(coeffs == np.amax(coeffs))[0][0]
            indVector = self.Yg[indMaxCoeff]
            vector = self.alternatives[indVector]
            countQuest += 1
            resStr = "Classes:" 
            for indCl in range(self.numClasses):
                resStr += "\n\tClass {0}:\n\t\t".format(indCl + 1)
                for indVec in self.classes[indCl]:
                    resStr += "{0}  ".format(self.alternatives[indVec])
            resStr += "\n\nFrom subset classified: {0} of {1}".format(self.numAlternatives - len(self.Yg), self.numAlternatives)
            resStr += "\nQuestions to DM: {0}".format(countQuest)
            resStr += "\nVector: {0}".format(vector)
            resStr += "\nProbabilities:"
            for i in range(p[indMaxCoeff].size):
                resStr += " {0}({1})".format(int(g[indMaxCoeff][i]), round(p[indMaxCoeff][i], 3))
            resStr += "\nCoefficient: {0}".format(round(coeffs[indMaxCoeff], 3))
            resStr += "\nDispersion level: {0}\n\n".format(v)
            # clear_output(wait=True)
            print(resStr)
            inpClass = int(input()) - 1
            classifiedByDM.append([indVector, inpClass])
            if p[indMaxCoeff][inpClass] == 0:
                conflicts = self.find_conflicts(classifiedByDM)
                fixed = self.change_classes(classifiedByDM, conflicts)
                while self.check_consistency(fixed):
                    conflicts = self.find_conflicts(classifiedByDM)
                    fixed = self.change_classes(classifiedByDM, conflicts)
                classifiedByDM = self.make_copy(fixed)
                self.__init__(self.path, self.numClasses, self.sep, self.enc)
                self.add_manually(classifiedByDM)
            else:
                self.add_manually([[indVector, inpClass]])
        clear_output(wait=True)
        print("Questions to DM: {0}\n".format(countQuest))

In [3]:
obj = SAC("data.csv", 3)

obj.classificate(3)

clear_output(wait=True)
resStr = "Classes:"
for indClass in range(obj.numClasses):
    resStr += "\n\tClass {0}:\n\t\t".format(indClass + 1)
    for indVec in obj.classes[indClass]:
        resStr += "{0}  ".format(obj.alternatives[indVec])
resStr += "\n\nFrom subset classified: {0} of {1}".format(obj.numAlternatives - len(obj.Yg), obj.numAlternatives)
print(resStr)

Classes:
	Class 1:
		[1 1 1]  [1 2 3]  [1 1 2]  [1 1 3]  [1 2 1]  [1 2 2]  [1 3 2]  [1 3 1]  [1 3 3]  
	Class 2:
		[2 2 2]  [3 1 2]  [2 1 1]  [2 1 2]  [2 2 1]  [3 1 1]  [2 2 3]  [2 1 3]  [3 1 3]  
	Class 3:
		[3 3 3]  [2 3 1]  [2 3 2]  [2 3 3]  [3 3 1]  [3 3 2]  [3 2 2]  [3 2 3]  [3 2 1]  

From subset classified: 27 of 27
