In [2]:
import pandas as pd
import numpy as np
from sklearn.feature_selection import mutual_info_classif
from pathlib import Path
from frlearn.classifiers import FRNN
from sklearn.model_selection import train_test_split
from frlearn.base import probabilities_from_scores
from sklearn.metrics import roc_auc_score


index = ["bkblk","bknwy","bkon8","bkona","bkspr","bkxbq","bkxcr","bkxwp","blxwp","bxqsq","cntxt","dsopp","dwipd",
 "hdchk","katri","mulch","qxmsq","r2ar8","reskd","reskr","rimmx","rkxwp","rxmsq","simpl","skach","skewr",
 "skrxp","spcop","stlmt","thrsk","wkcti","wkna8","wknck","wkovl","wkpos","wtoeg","win"]


data = pd.read_csv('/home/hamid/hamash_amir/research/rough_set_cloned/kr-vs-kp_csv.csv')
data.rename(columns = {'class':"win"}, inplace=True)

ns = pd.get_dummies(data[data.columns[:-1]], prefix=data.columns[:-1],drop_first=True, dtype=int)

ns_y = data['win'].copy()
Y = pd.get_dummies(ns_y,prefix='win',drop_first=True, dtype=int)


class Colony:
    


    dataframe: object = ns
    pheromone = np.ones((ns.shape[1], ns.shape[1]))
    traversed_nodes = np.zeros((ns.shape[1],ns.shape[1]))
    ant_route: list[int] = []
    alpha: int = 0
    beta: int = 0
    feature_choices: list[str] = dataframe.columns.tolist()
    feature1: str
    log: list[str] = []
    fg: list[str] = feature_choices.copy()
    acc_criteria: float
    rho: int | float
    delta = np.zeros((ns.shape[0],ns.shape[1]))
    colony_number: int = 0
    overall_ant_route: dict = {}
    
    # def __init__(self):
    #     self.ant_route = ant_route
    #     self.feature1 = feature1
    #     self.fg = fg
    
    
    @classmethod
    def set_stopping_criteria(cls, criteria):
        cls.acc_criteria = criteria
    
    @classmethod
    def set_rate_decay(cls, rate_decay):
        cls.rho = rate_decay
    
    @classmethod
    def add_generation(cls):
        cls.colony_number += 1

    
    @classmethod
    def get_log(cls):
        print(cls.log)
    
    @classmethod
    def reset_colony(cls):
        
        Colony.pheromone = np.ones((ns.shape[1], ns.shape[1]))
        Colony.traversed_nodes = np.zeros((ns.shape[1], ns.shape[1]))

        Colony.overall_ant_route.clear()
        Colony.log = []
        Colony.fg = cls.feature_choices.copy()
        
    @classmethod
    def initialization_alpha_beta(cls, alpha, beta):
        
        cls.alpha = alpha
        cls.beta = beta

    @classmethod
    def reset_ant_route(cls):
        cls.ant_route = []

    @classmethod
    def initialize_feature1(cls):
        ant_route = cls.ant_route
        if not ant_route:
            cls.feature1 = np.random.choice(cls.feature_choices)
        else:
            cls.feature1 = cls.feature_choices[cls.ant_route[-1]]
    
    
    @classmethod
    def ant(cls):

        cls.initialization_alpha_beta(0.5, 0.5)
        cls.initialize_feature1()
        feature1 = cls.feature1
        features = cls.dataframe.columns.tolist()
        if any(feature1 in x for x in cls.fg):
            cls.fg.remove(feature1)
        i = features.index(feature1)
        if i not in cls.ant_route:
            cls.ant_route.append(i)

        feat1_dist_prob = {}
        for feature2 in cls.fg:
           
            pij = cls.__probability_transition_rule(cls.feature1, feature2)
      
            j = features.index(feature2)
            feat1_dist_prob[j] = pij

        
            cls.log.append(f'{features.index(cls.feature1)}' 
                           f'--{features.index(feature2)}--{float(pij)}')

    
        ant_next_target_index = [key for key, value in feat1_dist_prob.items()\
            if value == max(feat1_dist_prob.values())][0]
        cls.traversed_nodes[i,ant_next_target_index] = 1
        # mitting criterion
        cls.ant_route.append(ant_next_target_index)
        fg_index = [key for key,value in feat1_dist_prob.items()]

    
    @classmethod
    def ants(
        cls, make_initialize: bool | None=None,
            number_of_ants_first_gen: int | None=None,
            number_of_ants_next_gen: int | None=None, 
            criteria: int | str | None=None,
            rate_decay: float | None=None, 
            phero_rate_decay: float | None=None):
        
        if cls.colony_number == 0 or make_initialize == True:
            cls.initialize_colony(
                number_of_ants_first_generation=number_of_ants_first_gen,
                                init_criteria=criteria)
            cls.generate_next_ants(
                number_of_ants_next_generation=number_of_ants_next_gen,
                rate_decay=rate_decay, phero_rate_decay=phero_rate_decay)
        
    @classmethod
    def change_pheromone(cls, q, rho):
        """
        After each generation of several ants or after each generation 
        it update pheromone matrix according to following formula:
         
        phromone[i,j](t+1) = (1-rho)*phromone[i,j](t) + Delta(ij)(t)
        
        rho is pheromone decay coefficient along time
        
        Delta(ij)(t) = |_| = Delta(ij)(t) = 
                       |_|   q/sigma(landa_prime(ant_route)/len(ant_route)) ,if traversed_node[i,j] = 1
                       |_| = 0                                            ,if traversed_node[i,j] = 0
        
        landa_prime is rough_set_measure           
        """
        cls.set_rate_decay = rho
        minimum_ant_route_len = np.min([len(x) for x in cls.overall_ant_route.values])
        for i in range(cls.delta.shape[0]):
            for j in range(cls.delta.shape[0]):
                if cls.traversed_nodes[i,j] == 1:
                    cls.delta[i,j] = q/minimum_ant_route_len
                    cls.pheromone[i,j] = (1 - cls.rho)*cls.pheromone[i,j] + cls.delta[i,j]
                elif cls.traversed_nodes[i,j] == 0:
                    cls.delta[i,j] = 0
                    cls.pheromone[i,j] = (1 - cls.rho)*cls.pheromone[i,j] + cls.delta[i,j]
        

    @classmethod
    def positive_region(cls):
        # assuming df is your DataFrame
        # assuming the last column is the decision column
        # partition dataframe based on decision classes
        df = cls.dataframe
        partitions = [group for _, group in df.groupby(df.iloc[:,-1])]
    
        # find positive region for each partition
        positive_regions = [group[group.duplicated(df.columns[:-1], keep=False)] for group in partitions]
    
        # return union of all positive regions
        return pd.concat(positive_regions)


    
    @classmethod
    def __probability_transition_rule(cls, feature1, feature2):
        col_index = ns.columns.tolist()
        i = col_index.index(feature1)
        j = col_index.index(feature2)
        l = col_index.copy()
        

        feat1 = feature1
        feat2 = feature2

        mic_ij = mutual_info_classif(ns[[feat1, feat2]], y=Y['win_won'], random_state=0)[1]
        phrmn_ij = cls.pheromone[i,j]
        init = 0
        for k in range(0,len(l)):
            if k != l.index(feature2):
                phrmn_il = cls.pheromone[i,k]
                mic_il = cls.pheromone[i,k]
                init += (phrmn_il**cls.alpha) * (mic_il**cls.beta)
        pij = ((mic_ij**cls.beta) * (phrmn_ij**cls.alpha)) / init
        return pij

    @classmethod 
    def initialize_colony(cls, number_of_ants_first_generation, init_criteria):
        """
        first of all we run this method for initialize first generation of colony
        in this method we set manually number_of_ants_first_generation variable
        to a number As a CRITERIA just for initialize pheromone matrix.in next 
        generations we set rough set feature selection CRITERIA and when 
        selected features by each ant in a colony met this limit that ant stop
        exploration and next ant begins.
        CAUTIONS!: RUN THIS METHOD JUST ONE TIME IN EACH COLONY!
        """ 
        
        # cls.overal_ant_route = {}
        
        j = 0
        while j < number_of_ants_first_generation:
            i = 0
            while i < init_criteria-1: 
                cls.ant()
                i += 1
            cls.overall_ant_route[j] = cls.ant_route
            cls.reset_ant_route()
            j += 1
        cls.add_generation()

    @classmethod
    def is_rough_set_criteria_met(cls, selected_feature: list[int]) -> bool:
        
        data = ns.iloc[:, selected_feature]
        X_train, X_test, y_train, y_test = train_test_split(data.values, Y.values.squeeze(), test_size=0.33, random_state=42)
        
        clf = FRNN()
        model = clf(X_train, y_train)


        scores = model(X_test)

        probabilities = probabilities_from_scores(scores)
        auroc = roc_auc_score(y_test, probabilities[:,-1])

        if auroc >= cls.acc_criteria:
            return True
        else:
            return False 

        
    @classmethod 
    def generate_next_ants(cls, number_of_ants_next_generation: int, rate_decay, phero_rate_decay):
        
        try:
            if cls.colony_number > 0:
                j = 0
                while j < number_of_ants_next_generation:
                    i = 0
                    while not cls.is_rough_set_criteria_met(cls.ant_route): 
                    # for j in range(number_of_ants_first_generation):
                        cls.ant()
                        i += 1
                    j += 1
                    cls.reset_ant_route()
                cls.change_pheromone(q=rate_decay, rho=phero_rate_decay)
            else:
                error = Exception("Colony generation doesnt initialized first!")
                raise error
        except:
            print(error)


Unnamed: 0,win_won
0,1
1,1
2,1
3,1
4,1
...,...
3191,0
3192,0
3193,0
3194,0


In [None]:
ns.iloc[2398,:]

AUROC: 0.9863493471191769
Accuracy: 0.9488151658767773


In [12]:
from __future__ import annotations

from typing import Callable

import numpy as np

from frlearn.base import DataDescriptor, FeatureSelector, ClassSupervised
from frlearn.statistics.feature_preprocessors import Standardiser
from frlearn.array_functions import soft_min
from frlearn.t_norms import lukasiewicz_t_norm
from frlearn.uncategorised.quantifiers import QuadraticSigmoid
from frlearn.weights import QuantifierWeights
from frlearn.base import MultiClassClassifier
fr = MultiClassClassifier

In [13]:
index = ["bkblk","bknwy","bkon8","bkona","bkspr","bkxbq","bkxcr","bkxwp","blxwp","bxqsq","cntxt","dsopp","dwipd",
 "hdchk","katri","mulch","qxmsq","r2ar8","reskd","reskr","rimmx","rkxwp","rxmsq","simpl","skach","skewr",
 "skrxp","spcop","stlmt","thrsk","wkcti","wkna8","wknck","wkovl","wkpos","wtoeg","win"]


data = pd.read_csv('/home/hamid/hamash_amir/research/rough_set_cloned/kr-vs-kp_csv.csv')
data.rename(columns = {'class':"win"}, inplace=True)

ns = pd.get_dummies(data[data.columns[:-1]], prefix=data.columns[:-1],drop_first=True, dtype=int)

ns_y = data['win'].copy()
Y = pd.get_dummies(ns_y,prefix='win',drop_first=True, dtype=int)

In [14]:
X = ns.values
y = Y.values.squeeze()
# from frlearn.classifiers import FuzzyRoughEnsemble
# clf = FuzzyRoughEnsemble()
# clf.
# model = clf(X, y)
# model.
owa_weights = QuantifierWeights(QuadraticSigmoid(0.2, 1))
t_norm=lukasiewicz_t_norm
# X_scaled = Standardiser()(X)(X)
R_a = np.minimum(np.maximum(1 - np.abs(X[:, None, :] - X), 0), y[:, None, None] != y[:, None])
print(R_a)
R = t_norm(R_a, axis=-1)
pos_size = np.sum(soft_min(1 - R, owa_weights, k=None, axis=-1))

[[[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 1 1 ... 1 0 1]
  [0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 1 1 ... 1 0 1]
  [0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]]

 [[0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  ...
  [0 1 1 ... 1 0 1]
  [0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]]

 ...

 [[0 1 1 ... 1 0 1]
  [0 1 1 ... 1 0 1]
  [0 1 1 ... 1 0 1]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]

 [[0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]
  [0 1 0 ... 0 0 1]
  ...
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]
  [0 0 0 ... 0 0 0]]]


In [9]:
r

array([[[False],
        [False],
        [False],
        ...,
        [ True],
        [ True],
        [ True]],

       [[False],
        [False],
        [False],
        ...,
        [ True],
        [ True],
        [ True]],

       [[False],
        [False],
        [False],
        ...,
        [ True],
        [ True],
        [ True]],

       ...,

       [[ True],
        [ True],
        [ True],
        ...,
        [False],
        [False],
        [False]],

       [[ True],
        [ True],
        [ True],
        ...,
        [False],
        [False],
        [False]],

       [[ True],
        [ True],
        [ True],
        ...,
        [False],
        [False],
        [False]]])

In [5]:
X = ns.values
y = Y.values.squeeze()
max = np.maximum(1 - np.abs(X[:, None, :] - X), 0)

In [6]:
max.shape

(3196, 3196, 37)

In [7]:
r = y[:, None, None] != y[:, None]
r.shape

(3196, 3196, 1)

In [15]:
R.shape

(3196, 3196)

In [19]:
X = ns.values
y = Y.values.squeeze()
co_Cs = [X[np.where(y != c)] for c in [0,1]]

0

In [27]:
Colon.make_change()

In [28]:
Colon.pherom

5

In [29]:
ant = Ant()
ant.multiple()
Colon.pherom

10