In [1]:
from concepts import Context as BaseContext
import pandas as pd

In [None]:
class Context(BaseContext):
    def __init__(self, objetos, atributos, bools, confianza=0.85, soporte=32/712):
        super().__init__(objetos, atributos, bools)
        self.confianza = confianza
        self.soporte = soporte
        self.base_stem = None
        self.base_luxenburger = None
        self.base_completa = None

    @classmethod
    def desde_dataframe(cls, df, confianza=0.85, soporte=32/712):
        # Eliminar la primera columna del dataframe df que corresponde al id
        df = df.iloc[:, 1:]
        objetos = list(df.index.astype(str))
        atributos = list(df.columns.astype(str))
        bools = [tuple(bool(x) for x in fila) for fila in df.values]
        return cls(objetos, atributos, bools, confianza, soporte)


    def set_confianza(self, confianza):
        """Establece el valor del hiperparámetro confianza."""
        self.confianza = confianza

    def get_confianza(self):
        """Obtiene el valor del hiperparámetro confianza."""
        return self.confianza

    def set_soporte(self, soporte):
        """Establece el valor del hiperparámetro soporte."""
        self.soporte = soporte

    def get_soporte(self):
        """Obtiene el valor del hiperparámetro soporte."""
        return self.soporte
    
    def cerrar(self, conj, impls):
        """Cierra un conjunto de atributos usando la base de implicaciones de entrada."""
        cerrado = set(conj)
        cambios = True
        while cambios:
            cambios = False
            for impl,_,_ in impls:
                if impl[0].issubset(cerrado) and not impl[1].issubset(cerrado):
                    cerrado.update(impl[1])
                    cambios = True
        return cerrado

    def next_closure(self, conjunto, conj_impls):
        """Calcula el siguiente cierre de un conjunto de atributos."""
        rev_set_prop=sorted(self.properties, reverse=True)
        for atributo in rev_set_prop:
            if atributo in conjunto:
                conjunto=conjunto - {atributo}
            else:
                sig_conj_cerrado=self.cerrar(conjunto | {atributo}, conj_impls)
                if all(x >= atributo for x in sig_conj_cerrado - conjunto):
                    return sig_conj_cerrado
        return None
    
    def doble_derivada(self, conjunto):
        """Calcula la doble derivada de un conjunto de atributos."""
        conjunto2 = set(conjunto)
        return set(self.intension(self.extension(conjunto2)))

    def calcular_base_stem(self):
        """Calcula la base Stem de un contexto como lista de tuplas."""
        conj_atrib=set()
        base=[]
        atributos=set(self.properties)
        while conj_atrib!=atributos and conj_atrib!= None:
            conj_atrib_pp=self.doble_derivada(conj_atrib)
            if conj_atrib != conj_atrib_pp:
                implicacion = (conj_atrib, conj_atrib_pp - conj_atrib)
                base.append((implicacion, 1., self.soporte_implicacion(implicacion)))
            conj_atrib = self.next_closure(conj_atrib, base)
        self.base_stem = base
        return base

    def es_cerrado(self, conjunto):
        """Determina si un conjunto de atributos es cerrado en un contexto formal."""
        return conjunto == set(self.intension(self.extension(conjunto)))

    def soporte_atributos(self, conjunto):
        """Calcula el soporte de un conjunto de atributos en un contexto formal."""
        numerador = len(self.extension(conjunto))  # |A'|
        denominador = len(self.objects)  # |O|
        return numerador / denominador

    def soporte_concepto(self, concepto):
        """Calcula el soporte de un concepto en un contexto formal."""
        extension, _ = concepto
        numerador = len(extension)
        denominador = len(self.objects)
        return numerador / denominador

    def sigma_frecuente(self, concepto, sigma):
        """Determina si un concepto es frecuente en un contexto formal."""
        return self.soporte_concepto(concepto) >= sigma

    def es_inmediatamente_inferior(self, Y1, Y2):
        """Determina, por medio de sus conjuntos de atributos, si un concepto es inmediatamente inferior a otro en un contexto formal."""
        if Y1 == Y2:  # Si los conjuntos de atributos son iguales, no son inmediatamente inferiores
            return False
        if not Y1.issubset(Y2):  # Si Y1 no es subconjunto de Y2, no son inmediatamente inferiores
            return False
        if not self.es_cerrado(Y1) or not self.es_cerrado(Y2):  # Si alguno de los conjuntos de atributos no es cerrado, no son inmediatamente inferiores
            return False
        extension1 = self.extension(Y1)
        extension2 = self.extension(Y2)
        vecinos = [(set(extension), set(intension)) for extension, intension in self.neighbors(extension2)]
        conceptoY1 = (set(extension1), Y1)
        if conceptoY1 in vecinos:  # Si el concepto (A', Y1) está en los vecinos de (A'', Y2), entonces Y1 es inmediatamente inferior a Y2
            return True
        else:
            return False

    def soporte_implicacion(self, implicacion):
        """Calcula el soporte de una implicación en un contexto formal."""
        antecedente, consecuente = implicacion
        numerador = len(self.extension(antecedente | consecuente))  # |(Y1 ∪ Y2)'|
        denominador = len(self.objects)  # |O|
        return numerador / denominador

    def confianza_implicacion(self, implicacion):
        """Calcula la confianza de una implicación en un contexto formal."""
        antecedente, consecuente = implicacion
        numerador = len(self.extension(antecedente | consecuente))  # |(Y1 ∪ Y2)'|
        denominador = len(self.extension(antecedente))  # |Y1'|
        if denominador == 0:
            return 1.0
        return numerador / denominador

    def calcular_base_luxenburger(self):
        """Calcula la base de Luxenburger de un contexto formal como lista de tuplas de conjuntos de atributos."""
        base_luxenburger = []
        visitados = set()
        reticulo = self.lattice
        n = len(self.objects)
        def aux_base_luxenburger(concepto_actual):
            """Auxiliar para recorrer cada nodo del reticulo del contexto formal de arriba a abajo."""
            if concepto_actual in visitados: # Si el concepto actual ya fue visitado, no hacer nada
                return
            visitados.add(concepto_actual)
            for vecino in concepto_actual.lower_neighbors: # Recorrer los vecinos inferiores del concepto actual
                intencion_vecino = set(vecino.intent)
                intencion_actual = set(concepto_actual.intent)
                implicacion = (intencion_actual, intencion_vecino - intencion_actual) # Representa la implicacion Act -> Vec\Act
                num_objetos_actual = len(concepto_actual.extent)
                if num_objetos_actual != 0:
                    conf_Implicacion = len(vecino.extent)/num_objetos_actual
                else:
                    conf_Implicacion = 0.
                sop_Implicacion = len(vecino.extent)/n
                if sop_Implicacion >= self.soporte and conf_Implicacion >= self.confianza: # Si la implicacion es frecuente, agregarla a la base de Luxenburger
                    base_luxenburger.append((implicacion, conf_Implicacion, sop_Implicacion))
                elif sop_Implicacion < self.soporte: # Realizar la poda si el soporte es menor al umbral
                    return
                aux_base_luxenburger(vecino)
        aux_base_luxenburger(reticulo[len(reticulo)-1]) # Comenzar desde el concepto superior del reticulo
        self.base_luxenburger = base_luxenburger
        return base_luxenburger
    
    def calcular_base_completa(self):
        """Calcula la base completa como la union de la base Stem y de Luxenburger."""
        if self.base_stem is None:
            raise Exception("Primero debe calcular la base Stem.")
        if self.base_luxenburger is None:
            raise Exception("Primero debe calcular la base de Luxenburger.")

        base_completa = self.base_stem.copy()
        base_completa.extend(self.base_luxenburger)
        self.base_completa = base_completa
        return base_completa
    
    def cerrar_conjunto(self, conjunto):
        """Dado un conjunto de atributos y la base de implicaciones, devuelve el conjunto cerrado."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base completa.")
        cerrado = set(conjunto)
        implicaciones = []
        
        for implicacion,_,_ in self.base_completa:
            if implicacion[0].issubset(cerrado):
                cerrado.update(implicacion[1])
                implicaciones.append(implicacion)
        return cerrado, implicaciones
    
    def entrenar(self):
        """Calcula todas las bases de un contexto formal."""
        self.calcular_base_stem()
        self.calcular_base_luxenburger()
        self.calcular_base_completa()
    
    def predecir(self, datos):
        """Predice las etiquetas de un dataframe de Xs y vacios aplicando el metodo XAI de la base completa."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base completa.")
        etiquetas = []
        datos.drop(columns=["Survived"], inplace=True, errors='ignore')
        for i in range(len(datos)):
            instancia = datos.iloc[i]
            conjunto = set(datos.columns[1:][instancia.iloc[1:] == "X"])
            cerrado, implicaciones = self.cerrar_conjunto(conjunto)
            ''' ESTO SE DESCOMENTA Y SE COMENTA EL IF DE ABAJO PARA PROBARLO SIN EL ATRIBUTO NOTSURVIVED
            if "Survived" in cerrado:
                etiquetas.append("X")
            else:
                etiquetas.append("")
            '''
            
            if "Survived" in cerrado and "NotSurvived" in cerrado:
                maximo = max(implicaciones, key=lambda x: x[1])
                if "Survived" in maximo[1] and "NotSurvived" not in maximo[1]:
                    etiquetas.append("X")
                elif "NotSurvived" in maximo[1] and "Survived" not in maximo[1]:
                    etiquetas.append("")
                else:
                    etiquetas.append("X")
            elif "Survived" in cerrado:
                etiquetas.append("X")
            else:
                etiquetas.append("")
            
        return pd.Series(etiquetas, index=datos.index, name="Survived")
    
    def precision(self, datos, etiquetas):
        """Calcula la precisión de un dataframe de Xs y vacios con sus etiquetas aplicando el metodo XAI de la base completa."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base de Luxenburger.")
        predicciones = self.predecir(datos)
        coincidencias = predicciones == etiquetas
        return coincidencias.mean()
    
    def falsos_positivos(self, datos, etiquetas):
        """Calcula la cantidad de falsos positivos de un dataframe de datos con sus etiquetas aplicando el metodo XAI de la base completa."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base completa.")
        predicciones = self.predecir(datos)
        falsos_positivos = ((predicciones == "X") & (etiquetas == "")).sum()
        return falsos_positivos
    
    def falsos_negativos(self, datos, etiquetas):
        """Calcula la cantidad de falsos negativos de un dataframe de datos con sus etiquetas aplicando el metodo XAI de la base completa."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base completa.")
        predicciones = self.predecir(datos)
        falsos_negativos = ((predicciones == "") & (etiquetas == "X")).sum()
        return falsos_negativos
    
    def calcula_resultados(self, datos, etiquetas):
        """Calcula la precisión, falsos positivos y falsos negativos de un dataframe de datos con sus etiquetas aplicando el metodo XAI de la base completa."""
        if self.base_completa is None:
            raise Exception("Primero debe calcular la base completa.")
        predicciones = self.predecir(datos)
        coincidencias = predicciones == etiquetas
        falsos_positivos = ((predicciones == "X") & (etiquetas == "")).sum()
        falsos_negativos = ((predicciones == "") & (etiquetas == "X")).sum()
        return coincidencias.mean(), falsos_positivos, falsos_negativos

In [3]:
# Cargar el dataset del Titanic
data = pd.read_csv("train.csv")
print(data.head())

   PassengerId  Survived  Pclass  \
0            1         0       3   
1            2         1       1   
2            3         1       3   
3            4         1       1   
4            5         0       3   

                                                Name     Sex   Age  SibSp  \
0                            Braund, Mr. Owen Harris    male  22.0      1   
1  Cumings, Mrs. John Bradley (Florence Briggs Th...  female  38.0      1   
2                             Heikkinen, Miss. Laina  female  26.0      0   
3       Futrelle, Mrs. Jacques Heath (Lily May Peel)  female  35.0      1   
4                           Allen, Mr. William Henry    male  35.0      0   

   Parch            Ticket     Fare Cabin Embarked  
0      0         A/5 21171   7.2500   NaN        S  
1      0          PC 17599  71.2833   C85        C  
2      0  STON/O2. 3101282   7.9250   NaN        S  
3      0            113803  53.1000  C123        S  
4      0            373450   8.0500   NaN        S  


In [None]:
# Reemplazar valores nulos con el valor más frecuente
data.loc[:, 'Sex'] = data['Sex'].fillna(data['Sex'].mode()[0])
data.loc[:, 'Age'] = data['Age'].fillna(data['Age'].median())
data.loc[:, 'Embarked'] = data['Embarked'].fillna(data['Embarked'].mode()[0])
data.loc[:, 'Pclass'] = data['Pclass'].fillna(data['Pclass'].mode()[0])
data.loc[:, 'Fare'] = data['Fare'].fillna(data['Fare'].mean())
data.loc[:, 'SibSp'] = data['SibSp'].fillna(data['SibSp'].mode()[0])
data.loc[:, 'Parch'] = data['Parch'].fillna(data['Parch'].mode()[0])

separacion_age = 4
separacion_fare = 8    

# Definir intervalos y etiquetas de edad
age_bins = [i/separacion_age*81 for i in range(separacion_age+1)]
age_labels = [f"{int(age_bins[i])}-{int(age_bins[i+1])}" for i in range(len(age_bins)-1)]
# Crear una nueva columna con categorías de edad
data["AgeGroup"] = pd.cut(data["Age"], bins=age_bins, labels=age_labels, right=False)

# Definir intervalos y etiquetas de tarifa
fare_bins = [i/separacion_fare*513 for i in range(separacion_fare+1)]
fare_labels = [f"{int(fare_bins[i])}-{int(fare_bins[i+1])}" for i in range(len(fare_bins)-1)]

# Crear una nueva columna con categorías de tarifa
data["FareGroup"] = pd.cut(data["Fare"], bins=fare_bins, labels=fare_labels, right=False)

# Definir intervalos y etiquetas de SibSp
sibsp_bins = [0, 1, 3, 5, 7, float('inf')]
sibsp_labels = ["0-1", "1-3", "3-5", "5-7", "7+"]

# Crear una nueva columna con categorías de SibSp
data["SibSpGroup"] = pd.cut(data["SibSp"], bins=sibsp_bins, labels=sibsp_labels, right=False)

# Definir intervalos y etiquetas de Parch
parch_bins = [0, 1, 3, 5,float('inf')]
parch_labels = ["0-1", "1-3", "3-5", "5+"]

# Crear una nueva columna con categorías de Parch
data["ParchGroup"] = pd.cut(data["Parch"], bins=parch_bins, labels=parch_labels, right=False)

# Convertir a codificación one-hot (columnas 1/0)
data = pd.get_dummies(data, columns=["FareGroup", "AgeGroup", "Embarked", "Pclass", "SibSpGroup", "ParchGroup"], dtype=int, drop_first=False)

# Convertir columnas a 1/0
data['Male'] = data['Sex'].map({'male': 1, 'female': 0})
data['Female'] = data['Sex'].map({'male': 0, 'female': 1})

# Añadir una columna que sea la contraria de la columna 'Survived'
'''ESTO SE COMENTARIA SI NO SE VA A USAR EL ATRIBUTO NOTSURVIVED'''
data['NotSurvived'] = data['Survived'].map({0: 1, 1: 0})

# Eliminar las columnas inútiles
data.drop(['Age', 'Sex', 'Fare', 'Name', 'Ticket', 'Cabin', 'SibSp', 'Parch'], axis=1, inplace=True)

# Convertir selectivamente columnas a tipo string antes de reemplazar 1 con 'X'
columns_to_convert = data.columns[1:]
data[columns_to_convert] = data[columns_to_convert].astype(str).map(lambda x: 'X' if x == '1' else '')

# Mostrar los nombres de las columnas
#print("Nombres de las columnas:", data.columns)

# Guardar el resultado final en un archivo CSV
data.to_csv("trainDummy.csv", index=False)

In [5]:
test_sub = pd.read_csv("test.csv", index_col="PassengerId")
print(test_sub.head())

             Pclass                                          Name     Sex  \
PassengerId                                                                 
892               3                              Kelly, Mr. James    male   
893               3              Wilkes, Mrs. James (Ellen Needs)  female   
894               2                     Myles, Mr. Thomas Francis    male   
895               3                              Wirz, Mr. Albert    male   
896               3  Hirvonen, Mrs. Alexander (Helga E Lindqvist)  female   

              Age  SibSp  Parch   Ticket     Fare Cabin Embarked  
PassengerId                                                       
892          34.5      0      0   330911   7.8292   NaN        Q  
893          47.0      1      0   363272   7.0000   NaN        S  
894          62.0      0      0   240276   9.6875   NaN        Q  
895          27.0      0      0   315154   8.6625   NaN        S  
896          22.0      1      1  3101298  12.2875   NaN   

In [6]:
# Reemplazar valores nulos con el valor más frecuente
test_sub.loc[:, 'Sex'] = test_sub['Sex'].fillna(test_sub['Sex'].mode()[0])
test_sub.loc[:, 'Age'] = test_sub['Age'].fillna(test_sub['Age'].median())
test_sub.loc[:, 'Embarked'] = test_sub['Embarked'].fillna(test_sub['Embarked'].mode()[0])
test_sub.loc[:, 'Pclass'] = test_sub['Pclass'].fillna(test_sub['Pclass'].mode()[0])
test_sub.loc[:, 'Fare'] = test_sub['Fare'].fillna(test_sub['Fare'].mean())
test_sub.loc[:, 'SibSp'] = test_sub['SibSp'].fillna(test_sub['SibSp'].mode()[0])
test_sub.loc[:, 'Parch'] = test_sub['Parch'].fillna(test_sub['Parch'].mode()[0])

separacion_age = 4
separacion_fare = 8    

# Definir intervalos y etiquetas de edad
age_bins = [i/separacion_age*81 for i in range(separacion_age+1)]
age_labels = [f"{int(age_bins[i])}-{int(age_bins[i+1])}" for i in range(len(age_bins)-1)]
# Crear una nueva columna con categorías de edad
test_sub["AgeGroup"] = pd.cut(test_sub["Age"], bins=age_bins, labels=age_labels, right=False)

# Definir intervalos y etiquetas de tarifa
fare_bins = [i/separacion_fare*513 for i in range(separacion_fare+1)]
fare_labels = [f"{int(fare_bins[i])}-{int(fare_bins[i+1])}" for i in range(len(fare_bins)-1)]

# Crear una nueva columna con categorías de tarifa
test_sub["FareGroup"] = pd.cut(test_sub["Fare"], bins=fare_bins, labels=fare_labels, right=False)

# Definir intervalos y etiquetas de SibSp
sibsp_bins = [0, 1, 3, 5, 7, float('inf')]
sibsp_labels = ["0-1", "1-3", "3-5", "5-7", "7+"]

# Crear una nueva columna con categorías de SibSp
test_sub["SibSpGroup"] = pd.cut(test_sub["SibSp"], bins=sibsp_bins, labels=sibsp_labels, right=False)

# Definir intervalos y etiquetas de Parch
parch_bins = [0, 1, 3, 5,float('inf')]
parch_labels = ["0-1", "1-3", "3-5", "5+"]

# Crear una nueva columna con categorías de Parch
test_sub["ParchGroup"] = pd.cut(test_sub["Parch"], bins=parch_bins, labels=parch_labels, right=False)

# Convertir a codificación one-hot (columnas 1/0)
test_sub = pd.get_dummies(test_sub, columns=["FareGroup", "AgeGroup", "Embarked", "Pclass", "SibSpGroup", "ParchGroup"], dtype=int, drop_first=False)

# Convertir columnas a 1/0
test_sub['Male'] = test_sub['Sex'].map({'male': 1, 'female': 0})
test_sub['Female'] = test_sub['Sex'].map({'male': 0, 'female': 1})

# Añadir una columna que sea la contraria de la columna 'Survived'
#data['NotSurvived'] = data['Survived'].map({0: 1, 1: 0})

# Eliminar las columnas inútiles
test_sub.drop(['Age', 'Sex', 'Fare', 'Name', 'Ticket', 'Cabin', 'SibSp', 'Parch'], axis=1, inplace=True)

# Convertir selectivamente columnas a tipo string antes de reemplazar 1 con 'X'
columns_to_convert = test_sub.columns[1:]
test_sub[columns_to_convert] = test_sub[columns_to_convert].astype(str).map(lambda x: 'X' if x == '1' else '')

# Mostrar los nombres de las columnas
#print("Nombres de las columnas:", data.columns)

# Guardar el resultado final en un archivo CSV
test_sub.to_csv("testDummy.csv", index=False)

In [None]:
context3=Context.fromfile('trainDummy.csv', frmat='csv')
context3.set_confianza(0.80)
context3.set_soporte(16/712)
context3.calcular_base_stem()
context3.calcular_base_luxenburger()
context3.calcular_base_completa()
# Aplicamos filtrado del capitulo 5 y además hacemos las modificaciones tras añadir el atributo objetivo negado
context3.base_completa = [i for i in context3.base_completa if i[2] >= 16/712 and 'Survived' not in i[0][0] and 'NotSurvived' not in i[0][0] and ('Survived' in i[0][1] or 'NotSurvived' in i[0][1])]
'''LO ANTERIOR SE COMENTA Y LO SIGUIENTE SE DESCOMENTA PARA TRABAJAR CON EL ATRIBUTO NOTSURVIVED'''
#context3.base_completa = [i for i in context3.base_completa if i[2] >= 16/712 and 'Survived' not in i[0][0] and 'Survived' in i[0][1]]

In [8]:
predicciones = context3.predecir(test_sub)
predicciones = predicciones.apply(lambda x: 1 if str(x).strip().upper() == "X" else 0)
predicciones.to_csv('submission.csv')