In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import random
from collections import Counter
from tqdm import tqdm
from sklearn import svm
from numba import njit
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier
from sklearn.neural_network import MLPClassifier

In [2]:
dfSM=pd.read_excel('Actual Data_BalanceData_SMOTE.xlsx')

In [3]:
X=dfSM.drop('Target', axis=1)
y=dfSM['Target']
from sklearn import preprocessing
Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size=0.25,
                                                    random_state=100)

In [4]:
clf = GradientBoostingClassifier()
clf.fit(Xtrain, ytrain)
clf.score(Xtest, ytest)

0.7241379310344828

In [5]:
from sklearn.metrics import confusion_matrix, accuracy_score
def update_eta(Xtrain, Xtest, ytrain, ytest, tabu):
    """更新启发式信息, eta = TPR / d
    Returns:
        eta: 从tabu最后一个节点出发到其余节点的启发式信息
    """
    n_dims = Xtrain.shape[1]
    eta = np.zeros(n_dims)
    flist = list(set(range(n_dims)) - set(tabu)) 

    for i in flist:
        clf = GradientBoostingClassifier()
        clf.fit(Xtrain.iloc[:, tabu+[i]], ytrain)
        pred = clf.predict(Xtest.iloc[:, tabu+[i]])
        cf_matrix = confusion_matrix(ytest, pred)
        FN, TP = cf_matrix[1][0], cf_matrix[1][1]
        eta[i] = TP / (TP + FN) / (len(tabu)+1)

    return eta


In [6]:
def select_route(prob):
    """按路径转移概率选择下一个特征
    """
    cs = np.cumsum(prob)
    p = np.random.rand()
    for i in range(len(cs)):
        if cs[i] > p:
            break
    
    return i


In [7]:
def fitness_func(Xtrain, Xtest, ytrain, ytest, selected, omega=0.7):
    """适应度函数，评估特征子集好坏
    Returns:
        result: 适应度
    """
    clf = GradientBoostingClassifier()
    clf.fit(Xtrain.iloc[:, selected], ytrain)
    pred = clf.predict(Xtest.iloc[:, selected])

    cf_matrix = confusion_matrix(ytest, pred)
    TN, FP = cf_matrix[0][0], cf_matrix[0][1]
    FPR = FP / (TN + FP)
    f_result = omega*FPR + (1-omega)*(len(selected)/Xtrain.shape[1])
    acc = accuracy_score(ytest, pred)
    return f_result, acc


In [8]:
def fitness_func(Xtrain, Xtest, ytrain, ytest, selected, omega=0.8):
    """适应度函数，评估特征子集好坏
    Returns:
        result: 适应度
    """
    clf = GradientBoostingClassifier()
    clf.fit(Xtrain.iloc[:, selected], ytrain)
    pred = clf.predict(Xtest.iloc[:, selected])

    cf_matrix = confusion_matrix(ytest, pred)
    TN, FP = cf_matrix[0][0], cf_matrix[0][1]
    FPR = FP / (TN + FP)

    acc = accuracy_score(ytest, pred)
    f_result = omega*(1-acc) + (1-omega)*np.exp(len(selected)/Xtrain.shape[1]-2)
    # print(FPR, (1-omega)*np.exp(len(selected)/train_data.shape[1]-10), f_result)
    return f_result, acc

In [9]:
def init_eta(Xtrain, Xtest, ytrain, ytest):
    n_dims = Xtrain.shape[1]
    eta = np.zeros([n_dims, n_dims])
    for i in tqdm(range(n_dims)):
        for j in range(n_dims):
            clf = GradientBoostingClassifier()
            clf.fit(Xtrain.iloc[:, [i,j]], ytrain)
            pred = clf.predict(Xtest.iloc[:, [i,j]])
            cf_matrix = confusion_matrix(ytest, pred)
            FN, TP = cf_matrix[1][0], cf_matrix[1][1]
            eta[i][j] = TP / (TP + FN) /2
    
    return eta

In [10]:
n_epochs = 100
n_dims = Xtrain.shape[1]
n_ants = 5
print("the Num of the total features:", n_dims)

alpha = 1
beta = 0.2
omega = 0.8
rho = 0.3
mu = 0.7
gamma = 0.7

tau = np.ones([n_dims, n_dims]) # tau 即信息素矩阵
eta = init_eta(Xtrain, Xtest, ytrain, ytest) # eta 即启发式信息
print("init finishing")
feat_list = list(range(n_dims)) # feature 总list
best_score = np.inf
score_list = []
acc_list = []

# p_matrix = np.zeros((n_epochs, n_dims, n_dims))
path_matrix = np.zeros((n_epochs, n_ants, n_dims))
nf_matrix = np.zeros((n_epochs, n_ants))
tau_matrix = np.zeros((n_epochs, n_dims, n_dims))
best_matrix = np.zeros((n_epochs,))

for epoch in range(n_epochs):

    # ============ apply once fs ============
    ant_path = np.zeros([n_ants, n_dims])           # 初始化每只蚂蚁在当前迭代中的路径
    ant_acc = np.zeros(n_ants)
    ant_score = np.zeros(n_ants)
    n_feats = np.random.randint(1, n_dims, size=n_ants) # 初始化每只蚂蚁需要选择的特征数
    # n_feats = 8 * np.ones([n_ants], dtype=np.int64)

    for i in range(n_ants):

        ant_path[i, 0] = np.random.randint(n_dims)  # 为每只蚂蚁选择起始节点（特征）
        # ant_path[i, 0] = i
        visited = []                                # 已选择的 feature list
        
        for d in range(n_feats[i]-1):               # 共选择 n_feats-1 次特征
            visited.append(ant_path[i, d])          # 更新 selected 表
            # eta = update_eta(train_data, train_label, test_data, test_label, visited)
                                                    # 更新启发式信息, eta = TPR / d, array(n_dims,)
            p = (tau[int(visited[-1])] ** alpha) * (eta[int(visited[-1])] ** beta)
            p[[int(i) for i in visited]] = 0
            prob = p / sum(p)                       # 计算路径转移矩阵
            route = select_route(prob)              # 寻找下一个特征
            ant_path[i, d+1] = route

    path_matrix[epoch] = ant_path.copy()
    nf_matrix[epoch] = n_feats.copy()

    # ==== evaluate each selected subset ====
    for j in range(n_ants):
        selected = list(ant_path[j, :n_feats[j]])
        f, acc = fitness_func(Xtrain, Xtest, ytrain, ytest, selected, omega)
                                                    # 计算适应度函数
        ant_score[j] = f
        ant_acc[j] = acc
        if f <= best_score:                          # 保存为全局的最优解
            best_path = ant_path[j]
            best_score = f
            best_path_acc = acc
    
    best_ant = np.argmin(ant_score)                 # 最优蚂蚁
    best_matrix[epoch] = best_ant
    near_ant = np.argmin(np.concatenate([ant_score[:best_ant], [0], ant_score[best_ant+1:]]))
                                                    # 第二优蚂蚁
    print("Epoch {} Best Score: {}, the Accuracy: {}, Num of Features: {}".format(\
        epoch, ant_score[best_ant], ant_acc[best_ant], n_feats[best_ant]))
    
    score_list.append(ant_score[best_ant])
    acc_list.append(ant_acc[best_ant])

    # ======== update the eta matrix ========
    
    # stage 1 updating
    deta_tau_k = np.zeros([n_ants, n_dims, n_dims])
    for k in range(n_ants):
        value = mu * ant_acc[k] + (1-mu) / n_feats[k] # 更新值
        for m in range(n_feats[k]-1):
            a, b = int(ant_path[k, m]), int(ant_path[k, m+1])
            deta_tau_k[int(k), a, b] = value

    deta_tau_1 = np.sum(deta_tau_k, 0)

    # stage 2 updating
    deta_tau_2 = np.zeros([n_dims, n_dims])
    for p in range(n_feats[best_ant]-1):
        a, b = int(ant_path[best_ant, p]), int(ant_path[best_ant, p+1])
        deta_tau_2[a, b] = gamma * deta_tau_1[a, b]
        
    for p in range(n_feats[near_ant]-1):
        a, b = int(ant_path[near_ant, p]), int(ant_path[near_ant, p+1])
        deta_tau_2[a, b] += (1-gamma) * deta_tau_1[a, b]
    
    # update
    tau = (1-rho) * tau + rho * deta_tau_1 + deta_tau_2
    tau_matrix[epoch] = tau.copy()

print("The Best Ant Path: ", best_path)
print("The Best Score: ", best_score)
print("The Accuracy use Best Path: ", best_path_acc)



the Num of the total features: 23


  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
  eta[i][j] = TP / (TP + FN) /2
 52%|██████████████████████▍                    | 12/23 [02:06<01:56, 10.55s/it]


KeyboardInterrupt: 

In [None]:
# PSO

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
import random
import numpy as np
from collections import Counter
from tqdm import tqdm
from sklearn import svm
from numba import njit
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier
from sklearn.neural_network import MLPClassifier
dfSM=pd.read_excel('Actual Data_BalanceData_SMOTE.xlsx')

In [2]:
dfSM.columns

Index(['HDL', 'Heart rate', 'Number of hospitalization days', 'LDL', 'CHOL',
       'Sex', 'History of heart disease', 'BPs', 'Smoke', 'Trop i', 'Diabetes',
       'Age', 'TG', 'PTT', 'Chest pain', 'History of shortness of breath',
       'Family history of heart disease', 'FBS',
       'History of high blood pressure', 'PT', 'BPd', 'CPKMB', 'Hb', 'Target'],
      dtype='object')

In [None]:
import numpy as np
import seaborn as sns
import pandas as pd
import random
import pyswarms as ps

%load_ext autoreload
%autoreload 2
%matplotlib inline
y = dfSM.iloc[:,-1]
y = y.values
X = dfSM.drop('Target',axis=1)
X = X.values
from sklearn.ensemble import RandomForestClassifier
classifier = MLPClassifier()

# Define objective function
def f_per_particle(m, alpha):
    """Computes for the objective function per particle

    Inputs
    ------
    m : numpy.ndarray
        Binary mask that can be obtained from BinaryPSO, will
        be used to mask features.
    alpha: float (default is 0.5)
        Constant weight for trading-off classifier performance
        and number of features

    Returns
    -------
    numpy.ndarray
        Computed objective function
    """
    total_features = 23
    # Get the subset of the features from the binary mask
    if np.count_nonzero(m) == 0:
        X_subset = X
    else:
        X_subset = X[:,m==1]
    # Perform classification and store performance in P
    classifier.fit(X_subset, y)
    P = (classifier.predict(X_subset) == y).mean()
    # Compute for the objective function
    j = (alpha * (1.0 - P) + (1.0 - alpha) * (1 - (X_subset.shape[1] / total_features)))

    return j

def getK(D):
    K = []
    for k in range(D):
        s = 1
        for i in range(k):
            s += (D-i)
        l = ((D-k)/s)
        K.append(l)
    K[0] = 0.95
    return K

k = getK(10)

np.random.choice(list(range(10)), 1, k).item(0)+1

def initialize(D, NoP):
    pk = getK(D)
    k = np.random.choice(list(range(D)), 1, pk).item(0)+1
    X=np.zeros((NoP, k))
    V=np.zeros((NoP, k))
    C=np.zeros((NoP, k))
    for i in range(NoP):
        for j in range(k):
            X[i][j] = random.random()
            V[i][j] = random.uniform(0,1)
    cor = [0]*NoP
    for i in range(NoP):
        for j in range(k):
            Xim = np.mean(X[i])
            Xjm = np.mean(X[j])
            num = (np.sum(X[i])-Xim*(k-1))*(np.sum(X[j])-Xjm*(k-1))
            den = ((np.sum(X[i])-Xim*(k-1))**2) * ((np.sum(X[j])-Xjm*(k-1))**2)
            C[i][j] = num/den
        #s = np.sum(C[i])
        #for j in range(k):
        #    C[i][j] /= s
        if(i != j):
            cor[i] = np.sum(C[i])/k-1
    s = np.sum(cor)
    cor /= s
    return cor

initialize(8, 6)

def f(x, alpha=0.88):
    """Higher-level method to do classification in the
    whole swarm.

    Inputs
    ------
    x: numpy.ndarray of shape (n_particles, dimensions)
        The swarm that will perform the search

    Returns
    -------
    numpy.ndarray of shape (n_particles, )
        The computed loss for each particle
    """
    n_particles = x.shape[0]
    j = [f_per_particle(x[i], alpha) for i in range(n_particles)]
    return np.array(j)


# Initialize swarm, arbitrary
options = {'c1': 0.5, 'c2': 0.5, 'w':0.9, 'k': 30, 'p':2}

# Call instance of PSO
dimensions = 23 # dimensions should be the number of features
optimizer = ps.discrete.BinaryPSO(n_particles=30, dimensions=dimensions, options=options)

# Perform optimization
cost, pos = optimizer.optimize(f, iters=100)



2021-08-03 19:38:33,264 - pyswarms.discrete.binary - INFO - Optimize for 100 iters with {'c1': 0.5, 'c2': 0.5, 'w': 0.9, 'k': 30, 'p': 2}
























































































































In [None]:
classifier = MLPClassifier()
X_selected_features = X[:,pos==1] 
classifier.fit(X_selected_features, y)
subset_performance = (classifier.predict(X_selected_features) == y).mean()
print('Subset performance: %.3f' % (subset_performance))