In [None]:
!pip install fcapy[all]
!pip install frozendict
!pip install ipynb
!pip install sparselinear
!pip install bitsets
!pip install bitarray
import torch
!pip install torch-scatter -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install torch-cluster -f https://data.pyg.org/whl/torch-2.0.0+cuda118.html
!pip install git+https://github.com/pyg-team/pytorch_geometric.git

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, jaccard_score, recall_score, accuracy_score, classification_report

from fcapy.context import FormalContext
from fcapy.lattice import ConceptLattice

from fcapy.visualizer import LineVizNx
import matplotlib.pyplot as plt

plt.rcParams['figure.facecolor'] = (1,1,1,1)

import neural_lib as nl

from fcapy.utils.utils import powerset

from fcapy import LIB_INSTALLED
if LIB_INSTALLED['numpy']:
    import numpy as np

from sparselinear import SparseLinear

In [11]:
# SIMILARITY SMC

def sim_s(b1, b2, G, M):
  """Simple matching coefficient (SMC) of two attribute sets.

  An attribute of ``M`` counts as a match when it is in both ``b1`` and ``b2``
  or in neither of them; the result is the share of matching attributes.

  Args:
    b1, b2: attribute sets being compared.
    G: unused; kept so all similarity functions share one signature.
    M: set of all attributes.

  Returns:
    ``(|b1 & b2| + |M - (b1 | b2)|) / |M|``.

  Raises:
    ZeroDivisionError: if ``M`` is empty.
  """
  present_in_both = b1.intersection(b2)
  absent_from_both = M.difference(b1.union(b2))
  return (len(present_in_both) + len(absent_from_both)) / len(M)

# SIMILARITY J

def sim_J(b1, b2, G, M):
  """Jaccard similarity of two attribute sets.

  Args:
    b1, b2: attribute sets being compared.
    G, M: unused; kept so all similarity functions share one signature.

  Returns:
    ``|b1 & b2| / |b1 | b2|``. Two empty sets are treated as identical and
    give 1.0 (the same value ``sim_s`` yields for identical sets) instead of
    raising ``ZeroDivisionError``.
  """
  union_size = len(b1.union(b2))
  if union_size == 0:
    # both intents are empty: identical descriptions
    return 1.0
  return len(b1.intersection(b2)) / union_size

# COHERENCE ES

def coh_es(A, B, G, M, K):
  """Cohesion of an extent: mean pairwise SMC similarity of its objects.

  Args:
    A: collection of objects (the concept extent).
    B: concept intent; not used by the computation.
    G: passed through to ``sim_s``.
    M: set of all attributes, passed through to ``sim_s``.
    K: context object providing ``intention([obj])``.

  Returns:
    Average of ``sim_s`` over all pairs of distinct objects of ``A``,
    or 0 when ``A`` has fewer than two objects.
  """
  n_objects = len(A)
  n_pairs = n_objects * (n_objects - 1) / 2
  if n_pairs == 0:
    return 0
  # Look every object's intent up once instead of twice per ordered pair.
  described = [(obj, set(K.intention([obj]))) for obj in A]
  pair_sum = 0
  for obj1, intent1 in described:
    for obj2, intent2 in described:
      if obj1 != obj2:
        pair_sum += sim_s(intent1, intent2, G, M)
  # every unordered pair was visited twice, hence the factor 0.5
  return 0.5 * pair_sum / n_pairs

# COHERENCE EJ

def coh_eJ(A, B, G, M, K):
  """Cohesion of an extent: mean pairwise Jaccard similarity of its objects.

  Args:
    A: collection of objects (the concept extent).
    B: concept intent; not used by the computation.
    G, M: passed through to ``sim_J``.
    K: context object providing ``intention([obj])``.

  Returns:
    Average of ``sim_J`` over all pairs of distinct objects of ``A``,
    or 0 when ``A`` has fewer than two objects.
  """
  n_objects = len(A)
  n_pairs = n_objects * (n_objects - 1) / 2
  if n_pairs == 0:
    return 0
  # Look every object's intent up once instead of twice per ordered pair.
  described = [(obj, set(K.intention([obj]))) for obj in A]
  pair_sum = 0
  for obj1, intent1 in described:
    for obj2, intent2 in described:
      if obj1 != obj2:
        pair_sum += sim_J(intent1, intent2, G, M)
  # every unordered pair was visited twice, hence the factor 0.5
  return 0.5 * pair_sum / n_pairs

# 2EES

def alpha2_ees(i, conc, G, M, K, L):
  """Second basic-level factor (SMC cohesion): contrast with the parents.

  Args:
    i: index of the concept in ``L``.
    conc: the concept itself (provides ``extent`` and ``intent``).
    G, M, K: passed through to ``coh_es``.
    L: lattice providing ``parents(i)`` and indexing by concept index.

  Returns:
    ``1 - mean(coh(parent) / coh(conc))`` over the parents whose cohesion does
    not exceed the concept's own. Returns 0 when the concept's cohesion is 0,
    when it has no parents, or when no parent qualifies.
  """
  # The concept's own cohesion is loop-invariant: compute it once.
  own_coh = coh_es(set(conc.extent), set(conc.intent), G, M, K)
  if own_coh == 0:
    return 0
  parent_ids = L.parents(i)
  if len(parent_ids) == 0:
    return 0
  ratio_sum = 0
  n_counted = 0
  for k in parent_ids:
    parent = L[k]
    parent_coh = coh_es(set(parent.extent), set(parent.intent), G, M, K)
    if parent_coh > own_coh:
      # parents that are more cohesive than the concept are ignored
      continue
    n_counted += 1
    ratio_sum += parent_coh / own_coh
  if n_counted == 0:
    return 0
  return 1 - ratio_sum / n_counted

# 2EEJ

def alpha2_eeJ(i, conc, G, M, K, L):
  """Second basic-level factor (Jaccard cohesion): contrast with the parents.

  Args:
    i: index of the concept in ``L``.
    conc: the concept itself (provides ``extent`` and ``intent``).
    G, M, K: passed through to ``coh_eJ``.
    L: lattice providing ``parents(i)`` and indexing by concept index.

  Returns:
    ``1 - mean(coh(parent) / coh(conc))`` over the parents whose cohesion does
    not exceed the concept's own. Returns 0 when the concept's cohesion is 0,
    when it has no parents, or when no parent qualifies.
  """
  # The concept's own cohesion is loop-invariant: compute it once.
  own_coh = coh_eJ(set(conc.extent), set(conc.intent), G, M, K)
  if own_coh == 0:
    return 0
  parent_ids = L.parents(i)
  if len(parent_ids) == 0:
    return 0
  ratio_sum = 0
  n_counted = 0
  for k in parent_ids:
    parent = L[k]
    parent_coh = coh_eJ(set(parent.extent), set(parent.intent), G, M, K)
    if parent_coh > own_coh:
      # parents that are more cohesive than the concept are ignored
      continue
    n_counted += 1
    ratio_sum += parent_coh / own_coh
  if n_counted == 0:
    return 0
  return 1 - ratio_sum / n_counted

# 3EES
def alpha3_ees(i, conc, G, M, K, L):
  """Third basic-level factor (SMC cohesion): closeness to the children.

  Args:
    i: index of the concept in ``L``.
    conc: the concept itself (provides ``extent`` and ``intent``).
    G, M, K: passed through to ``coh_es``.
    L: lattice providing ``children(i)`` and indexing by concept index.

  Returns:
    ``mean(coh(conc) / coh(child))`` over the children whose cohesion is
    non-zero and not below the concept's own. Returns 0 when the concept has
    no children or no child qualifies.
  """
  child_ids = L.children(i)
  if len(child_ids) == 0:
    return 0
  # The concept's own cohesion is loop-invariant: compute it once.
  own_coh = coh_es(set(conc.extent), set(conc.intent), G, M, K)
  ratio_sum = 0
  n_counted = 0
  for k in child_ids:
    child = L[k]
    child_coh = coh_es(set(child.extent), set(child.intent), G, M, K)
    if child_coh < own_coh or child_coh == 0:
      # less cohesive children are ignored; zero cohesion cannot be a divisor
      continue
    n_counted += 1
    ratio_sum += own_coh / child_coh
  if n_counted == 0:
    return 0
  return ratio_sum / n_counted

# 3EEJ

def alpha3_eeJ(i, conc, G, M, K, L):
  """Third basic-level factor (Jaccard cohesion): closeness to the children.

  Args:
    i: index of the concept in ``L``.
    conc: the concept itself (provides ``extent`` and ``intent``).
    G, M, K: passed through to ``coh_eJ``.
    L: lattice providing ``children(i)`` and indexing by concept index.

  Returns:
    ``mean(coh(conc) / coh(child))`` over the children whose cohesion is
    non-zero and not below the concept's own. Returns 0 when the concept has
    no children or no child qualifies.
  """
  child_ids = L.children(i)
  if len(child_ids) == 0:
    return 0
  # The concept's own cohesion is loop-invariant: compute it once.
  own_coh = coh_eJ(set(conc.extent), set(conc.intent), G, M, K)
  ratio_sum = 0
  n_counted = 0
  for k in child_ids:
    child = L[k]
    child_coh = coh_eJ(set(child.extent), set(child.intent), G, M, K)
    if child_coh < own_coh or child_coh == 0:
      # less cohesive children are ignored; zero cohesion cannot be a divisor
      continue
    n_counted += 1
    ratio_sum += own_coh / child_coh
  if n_counted == 0:
    return 0
  return ratio_sum / n_counted

# BASIC LEVEL

# BL ees

def BL_ees(i, conc, G, M, K, L):
  """Basic-level index of a concept built on the SMC cohesion.

  Product of the concept's cohesion (``coh_es``) with its two contrast
  factors ``alpha2_ees`` (parents) and ``alpha3_ees`` (children).
  """
  cohesion = coh_es(set(conc.extent), set(conc.intent), G, M, K)
  parent_factor = alpha2_ees(i, conc, G, M, K, L)
  child_factor = alpha3_ees(i, conc, G, M, K, L)
  return cohesion * parent_factor * child_factor

# BL eeJ

def BL_eeJ(i, conc, G, M, K, L):
  """Basic-level index of a concept built on the Jaccard cohesion.

  Product of the concept's cohesion (``coh_eJ``) with its two contrast
  factors ``alpha2_eeJ`` (parents) and ``alpha3_eeJ`` (children).
  """
  cohesion = coh_eJ(set(conc.extent), set(conc.intent), G, M, K)
  parent_factor = alpha2_eeJ(i, conc, G, M, K, L)
  child_factor = alpha3_eeJ(i, conc, G, M, K, L)
  return cohesion * parent_factor * child_factor

# DELTA_STABILITY

import math

def _min_extent_drop(c_i, lattice: ConceptLattice) -> float:
    """Fewest objects lost when stepping from concept ``c_i`` to any of its children.

    Returns ``math.inf`` for a concept without children.
    """
    extent_i = set(lattice[c_i].extent_i)
    children_i = lattice.children(c_i)
    if not children_i:
        return math.inf
    return min(len(extent_i - set(lattice[child_i].extent_i)) for child_i in children_i)


def log_stability_lbound(c_i, lattice: ConceptLattice, n_bin_attrs: int) -> float:
    """Return the minimal extent drop of concept ``c_i`` minus ``log2(n_bin_attrs)``.

    Raises:
        ValueError: if ``n_bin_attrs`` is not positive (``math.log2`` domain error).
    """
    return _min_extent_drop(c_i, lattice) - math.log2(n_bin_attrs)


def delta_stability(c_i, lattice: ConceptLattice, n_bin_attrs: int) -> float:
    """Return the minimal extent drop of concept ``c_i`` (``math.inf`` without children).

    ``n_bin_attrs`` is accepted for backward compatibility only. The value used
    to be obtained as ``log_stability_lbound(...) + log2(n_bin_attrs)``, which
    cancels algebraically but added floating-point noise to an integer quantity
    and failed for ``n_bin_attrs == 0``.
    """
    return float(_min_extent_drop(c_i, lattice))


# TARGET ENTROPY (taken from FCApy)

def target_entropy(c_i, lattice: ConceptLattice, context: FormalContext):
    """Impurity of the target labels of the objects in the extent of concept ``c_i``.

    Despite the name, the value is the variance (``np.var``) of the labels,
    not their Shannon entropy.

    Returns 0.0 for an empty extent: ``np.var`` of an empty selection is
    ``nan``, and ``nan`` ends up first after ``argsort()[::-1]``.
    """
    extent_idx = list(lattice[c_i].extent_i)
    if not extent_idx:
        return 0.0
    return np.var(context.target[extent_idx])


# DATASET

In [10]:
# Load the heart-disease table. The absolute /content/ path is hard-coded,
# so the CSV must be present there before this cell runs.
# NOTE(review): the file name suggests the features are already binarised - confirm.
df= pd.read_csv('/content/heart_disease_bin_prep.csv')

In [None]:
# This cell contains code for iris dataset
# Alternative to the heart-disease cell: every feature value code becomes one
# boolean column named '<feature><code>'.
df_unp = pd.read_csv('/content/iris_dataset.csv')

# Value codes handled for each source column.
iris_codes = {
    'sepalLength': range(1, 6),
    'sepalWidth': range(6, 11),
    'petalLength': range(11, 14),
    # 'petalWidth14' used to be built from 'petalLength' (copy-paste slip)
    'petalWidth': range(14, 17),
}

df = pd.DataFrame()
for feature, codes in iris_codes.items():
    for code in codes:
        df[feature + str(code)] = df_unp[feature] == code
df['class'] = df_unp['class'] - 17

In [9]:
# Label the rows 'h0', 'h1', ... by position and use that label as the index.
df['sample_id'] = np.arange(0, df.shape[0], 1).astype(str)
df['id'] = 'h' + df['sample_id']
df = df.drop(columns=['sample_id']).set_index('id')

In [8]:
y_feat = 'target' # y_feat = 'class' for iris dataset

# 70 % of the rows go to train; the seed is fixed so the split is repeatable
df_train, df_test = train_test_split(df, train_size=0.7, random_state=0)

# separate the feature columns from the target column in both parts
x_train, x_test = (part.drop(columns=y_feat) for part in (df_train, df_test))
y_train, y_test = (part[y_feat] for part in (df_train, df_test))

In [7]:
# Creating Formal Context
%%time
K = FormalContext(data = x_train.values, target = y_train.values, attribute_names=x_train.columns)
K

In [6]:
# Computing Formal Concepts
%%time

L= ConceptLattice.from_context(K, algo='Sofia', is_monotone=True)
len(L)

# MEASURES

In [None]:
# The context K is built from x_train only, so the object and attribute sets
# must come from x_train too: `df` also holds the test rows and the target
# column, which inflated len(G) and len(M) in the measures below.
G = set(x_train.index)    # set of objects
M = set(x_train.columns)  # set of attributes

In [None]:
# calculating indexes for concepts
n_attributes = len(M)
for i in range(len(L)):
  concept = L[i]
  concept.measures['BL_ees'] = BL_ees(i, concept, G, M, K, L)
  concept.measures['BL_eeJ'] = BL_eeJ(i, concept, G, M, K, L)
  concept.measures['target_entropy'] = target_entropy(i, L, K)
  concept.measures['delta_stability'] = delta_stability(i, L, n_attributes)

In [5]:
# This cell contains code for computing lift interest indexes
#
# lift(B) = Pr(B) / prod_{b in B} Pr(b), where Pr(X) is the share of the
# objects of K that have every attribute of X.
#
# Fixes relative to the previous version:
#   * the per-concept accumulators are rebuilt on every iteration; they used
#     to be reset only when the support was non-zero, so one concept with an
#     empty support corrupted all the concepts after it;
#   * the ratio was inverted (prod Pr(b) / Pr(B));
#   * probabilities are taken over the objects of K (K.n_objects), the same
#     population K.extension() counts.
n_context_objects = K.n_objects
lifts = []  # [concept index, lift] for every concept with non-empty support
for i in range(len(L)):
  intent_attrs = list(L[i].intent)
  support = len(K.extension(intent_attrs))
  if support == 0:
    L[i].measures['lift'] = 0
    continue
  pr_intent = support / n_context_objects
  # probability of the intent if its attributes were independent
  pr_independent = 1
  for attr in intent_attrs:
    pr_independent *= len(K.extension([attr])) / n_context_objects
  lift_i = pr_intent / pr_independent
  lifts.append([i, lift_i])
  L[i].measures['lift'] = lift_i
print(lifts)

In [None]:
# Selecting the best concepts by sorting on each interestingness index

In [None]:
# Keep the n_concepts_s concepts with the highest BL_ees score.
n_concepts_s = 7
_order_s = L.measures['BL_ees'].argsort()[::-1]  # concept indices, highest score first
best_concepts_s = list(_order_s[:n_concepts_s])
# the selected extents together must contain every object of the context
assert len(set().union(*(c.extent_i for c in L[best_concepts_s]))) == K.n_objects, "Selected concepts do not cover all train objects"

In [None]:
# Keep the n_concepts_J concepts with the highest BL_eeJ score.
n_concepts_J = 7
_order_J = L.measures['BL_eeJ'].argsort()[::-1]  # concept indices, highest score first
best_concepts_J = list(_order_J[:n_concepts_J])
# the selected extents together must contain every object of the context
assert len({g_i for c in L[best_concepts_J] for g_i in c.extent_i}) == K.n_objects, "Selected concepts do not cover all train objects"

In [None]:
# Keep the n_concepts_t concepts with the highest target_entropy score.
n_concepts_t = 20
_order_t = L.measures['target_entropy'].argsort()[::-1]  # concept indices, highest score first
best_concepts_t = list(_order_t[:n_concepts_t])
# the selected extents together must contain every object of the context
assert len({g_i for c in L[best_concepts_t] for g_i in c.extent_i}) == K.n_objects, "Selected concepts do not cover all train objects"

In [None]:
# Keep the n_concepts_d concepts with the highest delta_stability score.
n_concepts_d = 7
_order_d = L.measures['delta_stability'].argsort()[::-1]  # concept indices, highest score first
best_concepts_d = list(_order_d[:n_concepts_d])
# the selected extents together must contain every object of the context
assert len({g_i for c in L[best_concepts_d] for g_i in c.extent_i}) == K.n_objects, "Selected concepts do not cover all train objects"

In [None]:
# Keep the n_concepts_l concepts with the highest lift score.
n_concepts_l = 7
_order_l = L.measures['lift'].argsort()[::-1]  # concept indices, highest score first
best_concepts_l = list(_order_l[:n_concepts_l])
# the selected extents together must contain every object of the context
assert len({g_i for c in L[best_concepts_l] for g_i in c.extent_i}) == K.n_objects, "Selected concepts do not cover all train objects"

In [None]:
# Placeholder value; `cn` is rebound to the concept network in the next cell.
cn = 0

In [None]:
# creating nn from concept lattice using best concepts
# (best_concepts_J = the concepts ranked by BL_eeJ; the outputs are the sorted distinct train labels)
class_labels = sorted(set(y_train))
cn = nl.ConceptNetwork.from_lattice(L, best_concepts_J, class_labels)

In [None]:
# Fit the concept network on the training split for 2000 epochs.
cn.fit(x_train, y_train,  n_epochs =2000) # nn fitting

In [1]:
# nn prediction on test data
y_pred = cn.predict(x_test).numpy()
y_proba = cn.predict_proba(x_test).detach().numpy()

# show the first ten rows of each result next to the true labels
for caption, values in (
    ('Class prediction', y_pred),
    ('Class prediction with probabilities', y_proba),
    ('True class', y_test.values),
):
    print(caption, values[:10])

In [2]:
# metrics evaluation on test data
y_true_test = y_test.values.astype('int')
# scikit-learn's default average='binary' raises for more than two classes
# (e.g. the iris target); fall back to micro-averaging in that case.
avg_test = 'binary' if y_train.nunique() == 2 else 'micro'
print('Recall score:', recall_score(y_true_test, y_pred, average=avg_test))
print('F1     score:', f1_score(y_true_test, y_pred, average=avg_test))
print('Accuracy score:', accuracy_score(y_true_test, y_pred))

In [3]:
# prediction on train data (double check)
# NOTE: this rebinds y_pred / y_proba, replacing the test-set predictions.
y_pred = cn.predict(x_train).numpy()
y_proba = cn.predict_proba(x_train).detach().numpy()

# show the first ten rows of each result next to the true labels
for caption, values in (
    ('Class prediction', y_pred),
    ('Class prediction with probabilities', y_proba),
    ('True class', y_train.values),
):
    print(caption, values[:10])

In [4]:
# metrics evaluation on train data (double check)
y_true_train = y_train.values.astype('int')
# For single-label targets, micro-averaged recall and F1 both equal accuracy,
# so the old average='micro' printed the same number three times and could not
# be compared with the test-set metrics. Use positive-class ('binary') scores
# for a two-class target and keep micro-averaging only for multi-class targets.
avg_train = 'binary' if y_train.nunique() == 2 else 'micro'
print('Recall score:', recall_score(y_true_train, y_pred, average=avg_train))
print('F1     score:', f1_score(y_true_train, y_pred, average=avg_train))
print('Accuracy score:', accuracy_score(y_true_train, y_pred))