# Equidad - Fairness - en el Aprendizaje Automático (ML) para problemas de clasificación


* En ML, un modelo es ***justo*** o ***tiene equidad*** si ***sus predicciones son independientes de un cierto conjunto de variables*** que consideramos ***sensibles*** (ejm-> genero, etnia, religión, edad, estado civil, orientación sexual, etc.)


* En problemas de clasificación, los modelos aprenden una función $h(X)$ para predecir una variable discreta $Y$, a partir de unas características conocidas $X$.


## Criterios de Equidad


* Se han definido 3 criterios de equidad (independencia, separación y suficiencia) para evaluar si un clasificador es justo; es decir, si sus predicciones no están influenciadas por alguna/s de las variables sensibles.


* Para evaluar estos 3 criterios consideraremos:

    + $X$: Conjunto de variables (características) que describen a un elemento.
    + $A$: Variable aleatoria protegida o sensible (genero, etnia, edad, etc.).
    + $h(X)$: Modelo de clasificación (función predictora).
    + $Y$: Predicción del clasificador (y_predict)
    + $T$: Target (y_true)
    

* Para ver un ejemplo de cálculo de estos criterios, usaremos el siguiente "dataset" de ejemplo donde:

    + $A -> genero$: toma los valores $\{'Hombre', 'Mujer'\}$
    + $T -> target$: es binario y toma los valores $\{0: negativo, 1: positivo\}$ 
    + $Y -> predicción$: es binario y toma los valores $\{0: negativo, 1: positivo\}$ 
    

|id|genero|T: target|Y: predicción|
|--|--|--|--|
|1|Hombre|1|1|
|2|Hombre|1|1|
|3|Mujer|0|0|
|4|Hombre|0|1|
|5|Mujer|1|0|
|6|Hombre|1|0|
|7|Hombre|1|1|
|8|Mujer|1|1|
|9|Hombre|0|0|
|10|Mujer|0|0|




### 1.- Independencia

* Decimos que las variables aleatorias $(Y,A)$ satisfacen la independencia si la variable sensible $A$ es estadísticamente independiente a la predicción $Y$.


$$P(Y=y \mid  A=a) = P(Y=y \mid  A=b)$$

#### ¿Cumple el criterio de Independencia?


* Evidentemente en cualquier caso real nunca van a ser las probabilidades iguales, por lo que hay que establecer un umbral $\epsilon$ en el que se considere que cumple o no el criterio de independencia.


* Por tanto el modelo cumple el criterio de independencia si:


$$\left | \  P(Y=y \mid  A=a) - P(Y=y \mid  A=b) \ \right | \leq  \epsilon$$


#### Ejemplo:


* *La probabilidad de ser clasificado por el algoritmo en cada uno de los grupos* $\{0: negativo, 1: positivo\}$ *es la misma para dos elementos (individuos) con características sensibles distinta*s $\{'Hombre', 'Mujer'\}$.


$$P(Y=1 \mid  A=Hombre) = P(Y=1 \mid  A=Mujer)$$


* $P(Y=1 \mid  A=Hombre) = \frac{4}{6} = \frac{4 \ predicción \ =1}{6 \ hombres} = 0.66$:

|id|genero|Y: predicción|
|--|--|--|--|
|1|Hombre|1|
|2|Hombre|1|
|4|Hombre|1|
|6|Hombre|0|
|7|Hombre|1|
|9|Hombre|0|

* $P(Y=1 \mid  A=Mujer) = \frac{1}{4} = \frac{1 \ predicción \ =1}{4 \ mujeres} = 0.25$:

|id|genero|Y: predicción|
|--|--|--|--|
|3|Mujer|0|
|5|Mujer|0|
|8|Mujer|1|
|10|Mujer|0|


* Por tanto:

$$ \frac{4}{6} \neq \frac{1}{4} \rightarrow 0.66 \neq 0.25$$



* De manera similar, podemos ver la independencia frente al target (y_true) para saber si la realidad esta sesgada:


$$P(T=t \mid  A=a) = P(T=t \mid  A=b)$$


$$P(T=1 \mid  A=Hombre) = P(T=1 \mid  A=Mujer)$$ 


$$\frac{4}{6} \neq \frac{2}{4} \rightarrow 0.66 \neq 0.5$$





### 2.- Separación


* Decimos que las variables aleatorias $(Y,A,T)$ satisfacen la separación si la variable sensible $A$ es estadisticamente independientes a la predicción $Y$ dado el valor objetivo $T$.


* "*La probabilidad de predecir un Verdadero Positivo y un Falso Positivo para cada grupo debe de ser la misma*".

$$P(Y=1 \mid T=1, A=a) = P(Y=1 \mid T=1, A=b)$$
$$P(Y=1 \mid T=0, A=a) = P(Y=1 \mid T=0, A=b)$$


* Una ligera "simplificación" de este criterio sería la de tomar solo la probablidad de Verdaderos Positivos, asumiendo que "elementos similares" deben de ser tratados por igual.

#### ¿Cumple el criterio de Separación?


* Una relajación del criterio de separación vendría dado por que la diferencia entre tasas no superase un determinado umbral $\epsilon$


$$\left | \  P(Y=1 \mid T=1, A=a) - P(Y=1 \mid T=1, A=b) \ \right | \leq  \epsilon$$



#### Ejemplo:


$$P(Y=1 \mid T=1, A=Hombre) = P(Y=1 \mid T=1, A=Mujer)$$


* $P(Y=1 \mid T=1, A=Hombre) = \frac{3}{4} = \frac{3 \ predicción \ = 1}{4 \ hombres \ target \ =1} = 0.75$:


|id|genero|T: target|Y: predicción|
|--|--|--|--|
|1|Hombre|1|1|
|2|Hombre|1|1|
|6|Hombre|1|0|
|7|Hombre|1|1|


* $P(Y=1 \mid T=1, A=Mujer) = \frac{1}{2} = \frac{1 \ predicción \ = 1}{2 \ mujeres \ target=1} = 0.5$:


|id|genero|T: target|Y: predicción|
|--|--|--|--|
|5|Mujer|1|0|
|8|Mujer|1|1|


* Por tanto:

$$ \frac{3}{4} \neq \frac{1}{2} \rightarrow 0.75 \neq 0.5$$


### 3.- Suficiencia


* Decimos que las variables aleatorias $(Y,A,T)$ satisfacen la suficiencia si la variable sensible $A$ es estadísticamente independiente al valor objetivo $T$ dada la predicción $Y$.


$$P(T=1 \mid Y=1, A=a) = P(T=1 \mid Y=1, A=b)$$


* Esto significa que la probabilidad de estar en realidad en cada uno de los grupos es la misma para dos individuos con características sensibles distintas dado que la predicción los englobe en el mismo grupo.


#### ¿Cumple el criterio de Suficiencia?


* Una relajación del criterio de suficiencia vendría dado por que la diferencia entre tasas no superase un determinado umbral $\epsilon$


$$\left | \  P(T=1 \mid Y=1, A=a) - P(T=1 \mid Y=1, A=b) \ \right | \leq  \epsilon$$


#### Ejemplo:


$$P(T=1 \mid Y=1, A=Hombre) = P(T=1 \mid Y=1, A=Mujer)$$



* $P(T=1 \mid Y=1, A=Hombre) = \frac{3}{4} = \frac{3 \ target = 1}{4 \ hombres \ prediccion \ =1} = 0.75$

|id|genero|T: target|Y: predicción|
|--|--|--|--|
|1|Hombre|1|1|
|2|Hombre|1|1|
|4|Hombre|0|1|
|7|Hombre|1|1|


* $P(T=1 \mid Y=1, A=Mujer) = \frac{1}{1} = \frac{1 \ target = 1}{1 \ Mujer \ prediccion \ =1} = 1.0$


|id|genero|T: target|Y: predicción|
|--|--|--|--|
|8|Mujer|1|1|


* Por tanto:

$$ \frac{3}{4} \neq \frac{1}{1} \rightarrow 0.75 \neq 1.0$$



<hr>

# IMPLEMENTACIÓN



In [1]:
import pandas as pd

# Definimos el Dataset de ejemplo
df_dataset = pd.DataFrame(
    {
        'Genero': ['Hombre', 'Hombre', 'Mujer', 'Hombre', 'Mujer', 'Hombre', 'Hombre', 'Mujer', 'Hombre', 'Mujer'],
        'Color':  ['AZUL',   'AZUL',   'VERDE', 'AZUL',   'AZUL',   'VERDE', 'ROSA',    'ROSA', 'ROSA',   'ROSA'],
        'y_true':    ['SI', 'SI', 'NO', 'NO', 'SI', 'SI', 'SI', 'SI', 'NO', 'NO'],
        'y_predict': ['SI', 'SI', 'NO', 'SI', 'NO', 'NO', 'SI', 'SI', 'NO', 'NO']}, 
    columns=['Genero', 'Color', 'y_true', 'y_predict'])

df_dataset

Unnamed: 0,Genero,Color,y_true,y_predict
0,Hombre,AZUL,SI,SI
1,Hombre,AZUL,SI,SI
2,Mujer,VERDE,NO,NO
3,Hombre,AZUL,NO,SI
4,Mujer,AZUL,SI,NO
5,Hombre,VERDE,SI,NO
6,Hombre,ROSA,SI,SI
7,Mujer,ROSA,SI,SI
8,Hombre,ROSA,NO,NO
9,Mujer,ROSA,NO,NO


In [2]:
# Definimos el Dataset de ejemplo
df_dataset2 = pd.DataFrame(
    {
        'Genero': ['ROJO', 'ROJO', 'ROJO', 'ROJO', 'AZUL', 'AZUL', 'AZUL', 'AZUL', 'AZUL', 'VERDE', 'VERDE', 'VERDE'],
        'y_true':    ['SI', 'SI', 'NO', 'XX', 'SI', 'SI', 'NO', 'NO', 'XX', 'SI', 'NO', 'XX'],
        'y_predict': ['SI', 'NO', 'XX', 'SI', 'SI', 'XX', 'SI', 'NO', 'XX', 'SI', 'SI', 'XX']}, 
    columns=['Genero', 'y_true', 'y_predict'])

df_dataset2

Unnamed: 0,Genero,y_true,y_predict
0,ROJO,SI,SI
1,ROJO,SI,NO
2,ROJO,NO,XX
3,ROJO,XX,SI
4,AZUL,SI,SI
5,AZUL,SI,XX
6,AZUL,NO,SI
7,AZUL,NO,NO
8,AZUL,XX,XX
9,VERDE,SI,SI


In [15]:
from typing import Dict, List, Tuple

from sklearn.metrics import confusion_matrix
import pandas as pd

from copy import deepcopy


# EXCEPCIONES: Tratar divisiones entre 0

class Fairness:
    
    __BINARY = 2
    __OTHER = 'OTHER'
    __FAIRNESS_SCORE = {'A+': 0.01, 'A': 0.03, 'B': 0.05, 'C': 0.1, 'D': 0.2, 'E': 1.0}

    def __init__(self, fairness_params: Dict):
        self.fairness_params = fairness_params
        self.target_values = None
        self.confusion_matrix = None
        self.correlation_matrix = None
        self.independence_info = list()
        self.separation_info = list()
        self.sufficience_info = list()
        self.independence_score = list()
        self.separation_score = list()
        self.sufficience_score = list()
        
    
            
    def pre_processing(self, df: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str) -> None:
        """Función que realiza los procesamientos de datos previos a los cálculos "core" de la clase
        """
        # Obtengo los distintos valores del target
        self.target_values = df[predict_col].unique()
        
        # En las filas se representa el target, en las columnas las predicciones
        self.confusion_matrix = pd.crosstab(df[target_col], 
                                            df[predict_col], 
                                            rownames=[target_col], 
                                            colnames=[predict_col])
    
    def in_processing(self, df: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str) -> None:
        """Función que realiza los procesamientos "cores" de la clase
        """
        # Para cada una de las variables sensibles
        for sensitive_col in sensitive_cols:
            
            # Obtengo los distintos valores de la variable sensible; para ver si es o no binaria
            sensitive_values = df[sensitive_col].unique()
            is_sensitive_col_binary = True if len(sensitive_values) == self.__BINARY else False
            
            # Para cada uno de los Targets
            for target_label in self.target_values:
            
                # Para cada uno de los valores sensibles
                for sensitive_value in sensitive_values:
                
                    # Binarizo las predicciones y target del Dataset
                    df_process = deepcopy(df)
                    df_process.loc[df_process[target_col] != target_label, target_col] = self.__OTHER
                    df_process.loc[df_process[predict_col] != target_label, predict_col] = self.__OTHER
                    
                    if is_sensitive_col_binary:
                        print("BINARY: {} - [{}] - {}".format(sensitive_col, " | ".join(sensitive_values), target_label))
                        self.process_sensitive_column(df=df_process,
                                                      sensitive_col=sensitive_col,
                                                      target_col=target_col,
                                                      predict_col=predict_col,
                                                      target_label=target_label,
                                                      sensitive_values=sensitive_values,
                                                      is_sensitive_col_binary=True)
                        break
                    else:
                        print("MULTIPLE: {} - {} - {}".format(sensitive_col, sensitive_value, target_label))
                        # Binarización de los valores sensibles para analizarlos de 1 en 1
                        df.loc[df[sensitive_col] != sensitive_value, sensitive_col] = self.__OTHER
                        self.process_sensitive_column(df=df_process,
                                                      sensitive_col=sensitive_col,
                                                      target_col=target_col,
                                                      predict_col=predict_col,
                                                      target_label=target_label,
                                                      sensitive_values=[sensitive_value, self.__OTHER],
                                                      is_sensitive_col_binary=False)

    
    def post_processing(self, df: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str) -> None:
        """Función que realiza los procesamientos de datos posteriores a los cálculos "core" de la clase
        """
        pass    
    
    def process_sensitive_column(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str, 
                                 target_label: str, sensitive_values: List[str], is_sensitive_col_binary: bool) -> None:
        """Función que tiene que escribir en los atributos de la clase correspondientes los diferentes valores de los
        criterios para cada par de valores "valorVariable-valorTarget"
        """
        # Obtengo los resultados
        independence, separation, sufficience = self.fit_metrics(df=df,
                                                                 sensitive_col=sensitive_col,
                                                                 target_col=target_col,
                                                                 predict_col=predict_col,
                                                                 target_label=target_label,
                                                                 sensitive_values=sensitive_values)
        print(independence)
        print(separation)
        print(sufficience)
                    
    def fit_independence(self, df: pd.DataFrame, sensitive_col: str, predict_col: str, target_label: str, 
                         sensitive_values: List[str]) -> float:
        """
        A-> Variable sensible
        Y-> Predicción
        P(Y=y∣A=a) == P(Y=y∣A=b)
        """
        try:
            prob_a = (((df[(df[sensitive_col]==sensitive_values[0]) & (df[predict_col]==target_label)].shape[0])) / 
                      (df[df[sensitive_col]==sensitive_values[0]].shape[0]))
        except ZeroDivisionError:
            prob_a = 0
            print('WARNING: Probability P(Y={}|A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, sensitive_values[0]))
            
        try:
            prob_b = (((df[(df[sensitive_col]==sensitive_values[1]) & (df[predict_col]==target_label)].shape[0])) / 
                      (df[df[sensitive_col]==sensitive_values[1]].shape[0]))
        except ZeroDivisionError:
            prob_b = 0
            print('WARNING: Probability P(Y={}|A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, sensitive_values[1]))
            
#         print('\tProb A = {}'.format(prob_a))
#         print('\tProb B = {}'.format(prob_b))
        return abs(prob_a - prob_b)
    
    def fit_separation(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str, target_label: str, 
                       sensitive_values: List[str]) -> float:
        """      
        A-> Variable sensible
        Y-> Predicción
        T-> Target
        P(Y=1∣T=1,A=a)=P(Y=1∣T=1,A=b)
        """
        try:
            prob_a = ((df[(df[sensitive_col]==sensitive_values[0]) & (df[target_col]==target_label) & 
                          (df[predict_col]==target_label)]).shape[0] / 
                      (df[(df[sensitive_col]==sensitive_values[0]) & (df[target_col]==target_label)]).shape[0])
        except ZeroDivisionError:
            prob_a = 0
            print('WARNING: Probability P(Y={}|T={}, A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, target_label, sensitive_values[0]))
            
        try:
            prob_b = ((df[(df[sensitive_col]==sensitive_values[1]) & (df[target_col]==target_label) &
                          (df[predict_col]==target_label)]).shape[0] / 
                      (df[(df[sensitive_col]==sensitive_values[1]) & (df[target_col]==target_label)]).shape[0])
        except ZeroDivisionError:
            prob_b = 0
            print('WARNING: Probability P(Y={}|T={}, A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, target_label, sensitive_values[1]))
        
#         print('\tProb A = {}'.format(prob_a))
#         print('\tProb B = {}'.format(prob_b))
        return abs(prob_a - prob_b)
    
    def fit_sufficiency(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str, target_label: str, 
                        sensitive_values: List[str]) -> float:
        """
        A-> Variable sensible
        Y-> Predicción
        T-> Target
        P(T=1∣Y=1,A=a)=P(T=1∣Y=1,A=b)
        """
        try:
            prob_a = ((df[(df[sensitive_col]==sensitive_values[0]) & (df[target_col]==target_label) &
                          (df[predict_col]==target_label)]).shape[0] / 
                      (df[(df[sensitive_col]==sensitive_values[0]) & (df[predict_col]==target_label)]).shape[0])
        except ZeroDivisionError:
            prob_a = 0
            print('WARNING: Probability P(T={}|Y={}, A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, target_label, sensitive_values[0]))
            
        try:    
            prob_b = ((df[(df[sensitive_col]==sensitive_values[1]) & (df[target_col]==target_label) &
                          (df[predict_col]==target_label)]).shape[0] / 
                      (df[(df[sensitive_col]==sensitive_values[1]) & (df[predict_col]==target_label)]).shape[0])
        except ZeroDivisionError:
            prob_b = 0
            print('WARNING: Probability P(T={}|Y={}, A={}) result is Zero, because ZeroDivisionError'
                  .format(target_label, target_label, sensitive_values[1]))
        
#         print('\tProb A = {}'.format(prob_a))
#         print('\tProb B = {}'.format(prob_b))
        return abs(prob_a - prob_b)


    def fit_metrics(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str, target_label: str, 
                    sensitive_values: List[str]) -> Tuple[float, float, float]:
        
        independence = self.fit_independence(df=df,
                                             sensitive_col=sensitive_col,
                                             predict_col=predict_col,
                                             target_label=target_label,
                                             sensitive_values=sensitive_values)
#         print("Independence: {}\n".format(independence))
        separation = self.fit_separation(df=df,
                                         sensitive_col=sensitive_col,
                                         target_col=target_col,
                                         predict_col=predict_col,
                                         target_label=target_label,
                                         sensitive_values=sensitive_values)
#         print("Separation: {}\n".format(separation))
        sufficience = self.fit_sufficiency(df=df,
                                           sensitive_col=sensitive_col,
                                           target_col=target_col,
                                           predict_col=predict_col,
                                           target_label=target_label,
                                           sensitive_values=sensitive_values)
#         print("Sufficience: {}\n\n".format(sufficience))
        
        return independence, separation, sufficience
    
    def score_weight(self, df: pd.DataFrame, sensitive_col: str, 
                     predict_col: str, ground_truth: str, groupby_cols: List[str]) -> float:
        """
        Función para calcular el porcentaje (peso) que supone el cálculo de algún criterio respecto se su
        predicción y variable sensible
        """
        dfp = df.groupby(groupby_cols)[sensitive_col].agg({'count' : 'count'}).reset_index()
        dfp['pct'] = dfp['count'] / dfp['count'].sum()
        return dfp[dfp[predict_col] == ground_truth]['pct'].iloc[0]


    def get_fairness_score(self, score: float, fairness_score: Dict) -> str:
            """Función de devuelve la categoria Fairness en función de us Score dado
            """
            # Ordenamos el diccionario con los valores de menor a mayor
            try:
                fairness_score = dict(sorted(fairness_score.items(), key=lambda item: item[1], reverse=False))
            except:
                print('Error con el diccionario de Fairness Scores')

            if fairness_score[(list(fairness_score.keys())[-1])] < score:
                print('ERROR: El score dado es superior al valor máximo. Se devuelve la categoría más alta por defecto')
                return (list(fairness_score.keys())[-1])
            else:   
                for k, v in fairness_score.items():
                    if score < v:
                        return k
                        break
            
            
    def fit(self, df: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str) -> None:
        
        self.pre_processing(df=df, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
        
        self.in_processing(df=df, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
        
        self.post_processing(df=df, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
                   

fairness = Fairness(fairness_params={})
dict_result = fairness.fit(df=df_dataset,
                           sensitive_cols=['Genero', 'Color'],
                           target_col='y_true', 
                           predict_col='y_predict')

df_result = pd.DataFrame.from_dict(dict_result)
df_result
dict_result


BINARY: Genero - [Hombre | Mujer] - SI
0.41666666666666663
0.25
0.25
BINARY: Genero - [Hombre | Mujer] - NO
0.4166666666666667
0.5
0.16666666666666663
MULTIPLE: Color - AZUL - SI
0.75
0.6666666666666666
0.6666666666666666
MULTIPLE: Color - VERDE - SI
0.3333333333333333
0.6666666666666666
1.0
MULTIPLE: Color - ROSA - SI
0.5
0.6666666666666666
0.8
MULTIPLE: Color - AZUL - NO
0.5
0.75
0.6
MULTIPLE: Color - VERDE - NO
0.5
0.75
0.6
MULTIPLE: Color - ROSA - NO
0.5
0.75
0.6


In [16]:
fairness.confusion_matrix

y_predict,NO,SI
y_true,Unnamed: 1_level_1,Unnamed: 2_level_1
NO,3,1
SI,2,4


In [None]:
from typing import Dict, List, Tuple

from sklearn.metrics import confusion_matrix
import pandas as pd

from copy import deepcopy


# EXCEPCIONES: Tratar divisiones entre 0

class Fairness:
    
    __BINARY = 2
    __OTHER = 'OTHER'
    __FAIRNESS_SCORE = {'A+': 0.01, 'A': 0.03, 'B': 0.05, 'C': 0.1, 'D': 0.2, 'E': 1.0}

    def __init__(self, fairness_params: Dict):
        self.fairness_params = fairness_params
        self.target_values = None
        self.confusion_matrix = None
        self.correlation_matrix = None
        self.independence_info = None
        self.separation_info = None
        self.sufficience_info = None
        self.independence_score = None
        self.separation_score = None
        self.sufficience_score = None
        
    
            
    def pre_processing(self, df_dataset: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str):
        """Función que realiza los procesamientos de datos previos a los cálculos "core" de la clase
        """
        # Obtengo los distintos valores del target
        self.target_values = df_dataset[predict_col].unique()
        
        # En las filas se representa el target, en las columnas las predicciones
        self.confusion_matrix = pd.crosstab(df_dataset[target_col], 
                                            df_dataset[predict_col], 
                                            rownames=[target_col], 
                                            colnames=[predict_col])
    
    def in_processing(self, df_dataset: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str):
        """Función que realiza los procesamientos "cores" de la clase
        """
        # Para cada una de las variables sensibles
        for sensitive_col in sensitive_cols:
            
            # Obtengo los distintos valores de la variable sensible; para ver si es o no binaria
            sensitive_values = df_dataset[sensitive_col].unique()
            is_sensitive_col_binary = True if len(sensitive_values) == self.__BINARY else False
            
            # Para cada uno de los valores sensibles
            for sensitive_value in sensitive_values:
                
                # Para cada uno de los Targets
                for target_value in self.target_values:
                    
                    # Binarizo las predicciones y target del Dataset
                    df = deepcopy(df_dataset)
                    df.loc[df[target_col] != target_value, target_col] = self.__OTHER
                    df.loc[df[predict_col] != target_value, predict_col] = self.__OTHER
                    
                    print("{} - {} - {}".format(sensitive_col, sensitive_value, target_value))
                    print(df)
    
    def post_processing(self, df_dataset: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str):
        """Función que realiza los procesamientos de datos posteriores a los cálculos "core" de la clase
        """
        pass
    
    
    def get_fairness_score(self, score: float, fairness_score: Dict):
        """Función de devuelve la categoria Fairness en función de us Score dado
        """
        # Ordenamos el diccionario con los valores de menor a mayor
        try:
            fairness_score = dict(sorted(fairness_score.items(), key=lambda item: item[1], reverse=False))
        except:
            print('Error con el diccionario de Fairness Scores')

        if fairness_score[(list(fairness_score.keys())[-1])] < score:
            print('ERROR: El score dado es superior al valor máximo. Se devuelve la categoría más alta por defecto')
            return (list(fairness_score.keys())[-1])
        else:   
            for k, v in fairness_score.items():
                if score < v:
                    return k
                    break
            
            
    def fit(self, df_dataset: pd.DataFrame, sensitive_cols: List[str], target_col: str, predict_col: str) -> List[Dict]:
        
        self.pre_processing(df_dataset=df_dataset, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
        
        self.in_processing(df_dataset=df_dataset, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
        
        self.post_processing(df_dataset=df_dataset, 
                            sensitive_cols=sensitive_cols, 
                            target_col=target_col, 
                            predict_col=predict_col)
         
        
        metrics = list()
        
        for ground_truth in self.ground_truth_values:
        
            # Para cada una de las variables sensibles
            for column in sensitive_cols:
            
                # Obtengo los distintos valores de la variable sensible; para ver si es o no binaria
                sensitive_values = df_dataset[column].unique()
                is_sensitive_col_binary = True if len(sensitive_values) == 2 else False
            

                # Para cada uno de los valores sensibles
                for sensitive_value in sensitive_values:
                    
                    # Binarizo las predicciones y target del Dataset
                    df_process = deepcopy(df_dataset)
                    df_process.loc[df_process[target_col] != ground_truth, target_col] = self._OTHER
                    df_process.loc[df_process[predict_col] != ground_truth, predict_col] = self._OTHER
                    
                    # Si sensitive feature es binaria -> "1 sensitive feature"
                    if is_sensitive_col_binary:
#                     if False:
                        print("\n{} - {} - {}".format(column, ", ".join(sensitive_values), ground_truth))
                        # Obtengo los resultados
                        independence, separation, sufficience = self.fit_metrics(df=df_process,
                                                                                 sensitive_col=column,
                                                                                 target_col=target_col,
                                                                                 predict_col=predict_col,
                                                                                 ground_truth=ground_truth,
                                                                                 sensitive_values=sensitive_values)
                        score_weight = self.score_weight(df=df_process, 
                                                         groupby_cols=[predict_col], 
                                                         sensitive_col=column, 
                                                         predict_col=predict_col,
                                                         ground_truth=ground_truth)
                        score_weight_sufficience = self.score_weight(df=df_process, 
                                                         groupby_cols=[target_col], 
                                                         sensitive_col=column, 
                                                         predict_col=target_col,
                                                         ground_truth=ground_truth)
                        metrics.append({'Sensitive_Feature': column,
                                        'is_Binary_Sensitive_feature': is_sensitive_col_binary,
                                        'Sensitive_Value': ", ".join(sensitive_values),
                                        'Ground_Truth': ground_truth, 
                                        'Independence_score': independence,
                                        'Separation_score': separation,
                                        'Sufficience_score': sufficience, 
                                        'Independence_Score_weight': score_weight,
                                        'Separation_Score_weight': score_weight,
                                        'Sufficience_Score_weight': score_weight_sufficience})
                        break
                    else:
                        print("\n{} - {} - {}".format(column, sensitive_value, ground_truth))
                        
                        # Binarizo los valores sensibles para analizarlos de 1 en 1
                        df_process.loc[df_process[column] != sensitive_value, column] = self._OTHER
                        # Obtengo los resultados
                        independence, separation, sufficience = self.fit_metrics(df=df_process,
                                                                                 sensitive_col=column,
                                                                                 target_col=target_col,
                                                                                 predict_col=predict_col,
                                                                                 ground_truth=ground_truth,
                                                                                 sensitive_values=[sensitive_value, self._OTHER])
                        score_weight = self.score_weight(df=df_process, 
                                                         groupby_cols=[column, predict_col], 
                                                         sensitive_col=column, 
                                                         predict_col=predict_col,
                                                         ground_truth=ground_truth)
                        score_weight_sufficience = self.score_weight(df=df_process, 
                                                         groupby_cols=[column, target_col], 
                                                         sensitive_col=column, 
                                                         predict_col=target_col,
                                                         ground_truth=ground_truth)
                        metrics.append({'Sensitive_Feature': column,
                                        'is_Binary_Sensitive_feature': is_sensitive_col_binary,
                                        'Sensitive_Value': sensitive_value,
                                        'Ground_Truth': ground_truth,
                                        'Independence_score': independence,
                                        'Separation_score': separation, 
                                        'Sufficience_score': sufficience, 
                                        'Independence_Score_weight': score_weight,
                                        'Separation_Score_weight': score_weight,
                                        'Sufficience_Score_weight': score_weight_sufficience})
            
        return metrics
    
    
    def fit_metrics(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str,
                       ground_truth: str, sensitive_values: List[str]) -> Tuple[float, float, float]:
        
        independence = self.fit_independence(df=df,
                                             sensitive_col=sensitive_col,
                                             predict_col=predict_col,
                                             ground_truth=ground_truth,
                                             sensitive_values=sensitive_values)
        print("Independence: {}\n".format(independence))
        separation = self.fit_separation(df=df,
                                         sensitive_col=sensitive_col,
                                         target_col=target_col,
                                         predict_col=predict_col,
                                         ground_truth=ground_truth,
                                         sensitive_values=sensitive_values)
        print("Separation: {}\n".format(separation))
        sufficience = self.fit_sufficiency(df=df,
                                           sensitive_col=sensitive_col,
                                           target_col=target_col,
                                           predict_col=predict_col,
                                           ground_truth=ground_truth,
                                           sensitive_values=sensitive_values)
        print("Sufficience: {}\n\n".format(sufficience))
        
        return independence, separation, sufficience
        
    
    def fit_independence(self, df: pd.DataFrame, sensitive_col: str, predict_col: str, 
                         ground_truth: str, sensitive_values: List[str]) -> float:
        """
        A-> Variable sensible
        Y-> Predicción
        P(Y=y∣A=a) == P(Y=y∣A=b)
        """
        prob_a = (((df[(df[sensitive_col]==sensitive_values[0]) & 
                       (df[predict_col]==ground_truth)].shape[0])) / 
                  (df[df[sensitive_col]==sensitive_values[0]].shape[0]))
        prob_b = (((df[(df[sensitive_col]==sensitive_values[1]) & 
                       (df[predict_col]==ground_truth)].shape[0])) / 
                  (df[df[sensitive_col]==sensitive_values[1]].shape[0]))
        
        print('\tProb A = {}'.format(prob_a))
        print('\tProb B = {}'.format(prob_b))
        return abs(prob_a-prob_b)
    
    def fit_separation(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str,
                       ground_truth: str, sensitive_values: List[str]) -> float:
        """      
        A-> Variable sensible
        Y-> Predicción
        T-> Target
        P(Y=1∣T=1,A=a)=P(Y=1∣T=1,A=b)
        """
        prob_a = ((df[(df[sensitive_col]==sensitive_values[0]) &
                      (df[target_col]==ground_truth) &
                      (df[predict_col]==ground_truth)]).shape[0] / 
                  (df[(df[sensitive_col]==sensitive_values[0]) &
                      (df[target_col]==ground_truth)]).shape[0])
        prob_b = ((df[(df[sensitive_col]==sensitive_values[1]) &
                      (df[target_col]==ground_truth) &
                      (df[predict_col]==ground_truth)]).shape[0] / 
                  (df[(df[sensitive_col]==sensitive_values[1]) &
                      (df[target_col]==ground_truth)]).shape[0])
        
        print('\tProb A = {}'.format(prob_a))
        print('\tProb B = {}'.format(prob_b))
        return abs(prob_a-prob_b)
    
    def fit_sufficiency(self, df: pd.DataFrame, sensitive_col: str, target_col: str, predict_col: str,
                       ground_truth: str, sensitive_values: List[str]) -> float:
        """
        A-> Variable sensible
        Y-> Predicción
        T-> Target
        P(T=1∣Y=1,A=a)=P(T=1∣Y=1,A=b)
        """
        prob_a = ((df[(df[sensitive_col]==sensitive_values[0]) &
                      (df[target_col]==ground_truth) &
                      (df[predict_col]==ground_truth)]).shape[0] / 
                  (df[(df[sensitive_col]==sensitive_values[0]) &
                      (df[predict_col]==ground_truth)]).shape[0])
        prob_b = ((df[(df[sensitive_col]==sensitive_values[1]) &
                      (df[target_col]==ground_truth) &
                      (df[predict_col]==ground_truth)]).shape[0] / 
                  (df[(df[sensitive_col]==sensitive_values[1]) &
                      (df[predict_col]==ground_truth)]).shape[0])
        
        print('\tProb A = {}'.format(prob_a))
        print('\tProb B = {}'.format(prob_b))
        return abs(prob_a-prob_b)
    
    
    def score_weight(self, df: pd.DataFrame, sensitive_col: str, 
                     predict_col: str, ground_truth: str, groupby_cols: List[str]) -> float:
        """
        Función para calcular el porcentaje (peso) que supone el cálculo de algún criterio respecto se su
        predicción y variable sensible
        """
        dfp = df.groupby(groupby_cols)[sensitive_col].agg({'count' : 'count'}).reset_index()
        dfp['pct'] = dfp['count'] / dfp['count'].sum()
        return dfp[dfp[predict_col] == ground_truth]['pct'].iloc[0]
            

fairness = Fairness(fairness_params={})
dict_result = fairness.fit(df_dataset=df_dataset,
                           sensitive_cols=['Genero'],
                           target_col='y_true', 
                           predict_col='y_predict')

df_result = pd.DataFrame.from_dict(dict_result)
df_result
dict_result

#### Función de agregación del Score

In [None]:
import numpy as np

# Definimos el Dataset de ejemplo
df_score = pd.DataFrame(
    {
        'Ground_Truth': ['SI', 'SI', 'NO', 'NO'],
        'Sensitive_Feature': ['Genero', 'Genero', 'Genero', 'Genero'],
        'Sensitive_Value': ['Hombre', 'Mujer', 'Hombre', 'Mujer'],
        'Independence_Score_weight': [0.4,      0.1,      0.2,      0.3],
        'Independence_score':        [0.416667, 0.416667, 0.416667, 0.416667],
        'Separation_Score_weight':   [0.4,      0.1,      0.2,      0.3],
        'Separation_score':          [0.25,     0.25,     0.5,      0.5],
        'Sufficience_Score_weight':  [0.4,      0.2,      0.2,      0.2],
        'Sufficience_score':         [0.25,     0.25,     0.16,     0.16]}, 
    columns=['Ground_Truth', 'Sensitive_Value', 'Sensitive_Feature', 'Independence_Score_weight', 
             'Independence_score', 'Separation_Score_weight', 'Separation_score', 'Sufficience_Score_weight', 
             'Sufficience_score'])

def global_score(df: pd.DataFrame, sensitive_cols: List[str]) -> Tuple[float, float, float]:
    global_scores = list()
    for sensitive_value in sensitive_cols:
        print('Processing {} Feature'.format(sensitive_value))
        df_filter = df[df['Sensitive_Feature'] == sensitive_value]
        independence_score = np.average(a = df_filter['Independence_score'], weights = df_filter['Independence_Score_weight'])
        separation_score = np.average(a = df_filter['Separation_score'], weights = df_filter['Separation_Score_weight'])
        sufficience_score = np.average(a = df_filter['Sufficience_score'], weights = df_filter['Sufficience_Score_weight'])
        global_scores.append({'Sensitive Value': sensitive_value,
                              'independence score': independence_score,
                              'separation score': separation_score,
                              'sufficience score': sufficience_score})
    return global_scores

print(global_score(df=df_score, sensitive_cols=['Genero']))

df_score

In [None]:
global_score(df=df_result, sensitive_cols=['Genero'])

In [None]:
def get_fairness_score(score: float, fairness_score: Dict):
    # Ordenamos el diccionario con los valores de menor a mayor
    try:
        fairness_score = dict(sorted(fairness_score.items(), key=lambda item: item[1], reverse=False))
    except:
        print('Error con el diccionario de Fairness Scores')
    
    if fairness_score[(list(fairness_score.keys())[-1])] < score:
        print('ERROR: El score dado es superior al valor máximo. Se devuelve la categoría más alta por defecto')
        return (list(fairness_score.keys())[-1])
    else:   
        for k, v in fairness_score.items():
            if score < v:
                return k
                break
    
get_fairness_score(score=0.01, fairness_score=FAIRNESS_SCORE)

In [None]:
list(FAIRNESS_SCORE.keys())[-1]

<hr>

# Ejemplo de aplicación real - Clasificación Binaria



## 1.- Lectura del Dataset

In [None]:
import pandas as pd

# Leemos el dataset
df = pd.read_csv('../../datasets/bodyPerformance.csv')
print('Tamaño del dataset {}'.format(df.shape))
df.sample(3)

In [None]:
df['rango_edad'] = df['age'].apply(lambda x: 10 if x < 20 
                                   else (20 if (x >= 20 and x < 30) 
                                         else (30 if (x >= 30 and x < 40) 
                                               else (40 if (x >= 40 and x < 50) 
                                                     else (50 if (x >= 50 and x < 60) 
                                                           else 60)))))
df.sample(5)

### Distribución del target por género

In [None]:
dfp = df.groupby(['gender', 'class'])['age'].agg({'count' : 'count'}).reset_index()
dfp['perc'] = dfp.groupby('gender')['count'].apply(lambda x: x*100/x.sum())
dfp

### Para clasificación binaria me quedo con las clases B y D

In [None]:
# df = df[df['class'].isin(['B', 'D'])].rename({'class': 'y_true'}, axis=1)
df = df[df['class'].isin(['A', 'B', 'C' , 'D'])].rename({'class': 'y_true'}, axis=1)
print('Tamaño del dataset {}'.format(df.shape))
df.sample(5)

<hr>

## 2.- Modelo & Predicción

In [None]:
from sklearn.preprocessing import LabelEncoder

# Codificamos las variables discretas
lb_gen = LabelEncoder()
lb_y = LabelEncoder()
df['gender'] = lb_gen.fit_transform(df['gender'])
df['y_true'] = lb_y.fit_transform(df['y_true'])
df.sample(5)

In [None]:
from sklearn.linear_model import LogisticRegression

# Creamos y entrenamos el modelo
model = LogisticRegression()
model.fit(df[df.columns[:-2]], df[df.columns[-2]])
print('Accuracy: {}'.format(model.score(df[df.columns[:-2]], df[df.columns[-2]])))

In [None]:
# Calculamos las predicciones
df['y_predict'] = model.predict(df[df.columns[:-2]])
df.sample(5)

In [None]:
# Deshacemos en LabelEncoder de la variable género, target y predicción
df['gender'] = lb_gen.inverse_transform(df['gender'])
df['y_true'] = lb_y.inverse_transform(df['y_true'])
df['y_predict'] = lb_y.inverse_transform(df['y_predict'])
df.sample(5)

## 3.- Criterio de Independencia (Binario)

In [None]:
fairness = Fairness(fairness_params={})
# dict_result = fairness.fit_fairness(df_dataset=df, sensitive_cols=['gender'], target_col='y_true', predict_col='y_predict')
# dict_result = fairness.fit_fairness(df_dataset=df, sensitive_cols=['rango_edad'], target_col='y_true', predict_col='y_predict')
dict_result = fairness.fit_fairness(df_dataset=df, sensitive_cols=['gender', 'rango_edad'], target_col='y_true', predict_col='y_predict')
df_result = pd.DataFrame.from_dict(dict_result)
df_result

In [None]:
global_score(df=df_result, sensitive_cols=['gender', 'rango_edad'])

In [None]:
fairness.confusion_matrix