In [None]:
import numpy as np
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sksurv.datasets import load_breast_cancer
from sksurv.metrics import concordance_index_censored, integrated_brier_score
from sksurv.ensemble import RandomSurvivalForest
from sksurv.linear_model import CoxPHSurvivalAnalysis
from sksurv.util import Surv
from scipy.stats import chi2

# Custom libraries for GA utility functions (similar to the original code)
from Py_FS.wrapper.nature_inspired._utilities import Solution, Data, initialize, sort_agents, display, Conv_plot

# Define a custom fitness function for survival prediction that incorporates D-Calibration and 1-Calibration
def compute_survival_fitness(chromosome, data):
    selected_features = np.where(chromosome == 1)[0]

    # If no features are selected, return a poor fitness score
    if len(selected_features) == 0:
        return -np.inf

    # Train a survival model using the selected features (e.g., CoxPH or Random Survival Forest)
    model = CoxPHSurvivalAnalysis()  # This can be replaced by other survival models as well
    model.fit(data.train_X[:, selected_features], data.train_Y)

    # Get predictions on the validation data
    survival_predictions = model.predict_survival_function(data.val_X[:, selected_features])

    # Concordance Index as part of fitness function
    c_index, _ = concordance_index_censored(
        data.val_Y["event"], data.val_Y["time"], model.predict(data.val_X[:, selected_features])
    )

    # Perform D-Calibration
    d_calibration_score = compute_d_calibration(survival_predictions, data.val_Y)

    # Perform 1-Calibration
    one_calibration_score = compute_1_calibration(survival_predictions, data.val_Y)

    # Combine all scores into a final fitness value (weights can be adjusted based on priority)
    fitness_score = c_index + d_calibration_score + one_calibration_score

    return fitness_score


# Function to compute D-Calibration (using chi-square test)
def compute_d_calibration(survival_predictions, true_labels):
    # Extract survival probabilities at the observed event times
    survival_probs = np.array([fn(true_labels['time']) for fn in survival_predictions])

    # Compute D-Calibration via chi-square test
    observed_events = true_labels['event'] == 1
    expected_probs = survival_probs[observed_events]
    expected_bins = np.histogram(expected_probs, bins=10)[0]

    # Perform chi-square test for D-Calibration (expected uniform distribution)
    chi2_stat, p_value = chi2_test(expected_bins)

    # A high p-value (e.g., p > 0.05) means good D-Calibration, return this as the fitness component
    if p_value > 0.05:
        return 1.0  # Good calibration
    else:
        return 0.0  # Poor calibration

# Function to compute 1-Calibration (using Hosmer-Lemeshow test or similar)
def compute_1_calibration(survival_predictions, true_labels):
    # Here we evaluate the survival probabilities at specific time points (e.g., 10th, 25th, 50th percentile)
    percentile_times = np.percentile(true_labels['time'], [10, 25, 50, 75, 90])
    calibration_scores = []

    for time_point in percentile_times:
        survival_probs = np.array([fn(time_point) for fn in survival_predictions])
        observed_events = true_labels['time'] <= time_point

        # Use a statistical test like Hosmer-Lemeshow for 1-Calibration at this time point
        hl_test_stat, p_value = hosmer_lemeshow_test(survival_probs, observed_events)

        # Good calibration if p > 0.05
        if p_value > 0.05:
            calibration_scores.append(1.0)  # Good calibration
        else:
            calibration_scores.append(0.0)  # Poor calibration

    return np.mean(calibration_scores)  # Return the average calibration score

# Function to perform chi-square test for D-Calibration
def chi2_test(observed_counts, expected_prob=0.1):
    expected_counts = np.ones_like(observed_counts) * expected_prob * len(observed_counts)
    chi2_stat = np.sum((observed_counts - expected_counts) ** 2 / expected_counts)
    p_value = chi2.sf(chi2_stat, df=len(observed_counts)-1)
    return chi2_stat, p_value

# Function to perform Hosmer-Lemeshow test for 1-Calibration (simplified)
def hosmer_lemeshow_test(survival_probs, observed_events, bins=10):
    observed_bins = np.histogram(survival_probs[observed_events], bins=bins)[0]
    expected_bins = np.histogram(survival_probs, bins=bins)[0]
    chi2_stat, p_value = chi2_test(observed_bins)
    return chi2_stat, p_value

# Testing the Genetic Algorithm with a survival dataset
if __name__ == '__main__':
    # Load a survival dataset (e.g., breast cancer dataset)
    X, y = load_breast_cancer(return_X_y=True)
    y = Surv.from_arrays(event=y["event"], time=y["time"], y=y)

    # Run GA for feature selection in survival analysis
    GA(20, 100, X, y, save_conv_graph=True)


In [None]:
# Define RDA with the compute_survival_fitness function
def RDA(num_agents, max_iter, train_data, train_label, obj_function=compute_survival_fitness, trans_function_shape='s', save_conv_graph=False):

    # Red Deer Algorithm
    ############################### Parameters ####################################
    #                                                                             #
    #   num_agents: number of red deers                                           #
    #   max_iter: maximum number of generations                                   #
    #   train_data: training samples of data                                      #
    #   train_label: class labels for the training samples                        #
    #   obj_function: the function to maximize while doing feature selection      #
    #   trans_function_shape: shape of the transfer function used                 #
    #   save_conv_graph: boolean value for saving convergence graph               #
    #                                                                             #
    ###############################################################################

    # Number of agents must be at least 8
    if num_agents < 8:
        print("[Error!] The value of the parameter num_agents must be at least 8", file=sys.stderr)
        sys.exit(1)

    short_name = 'RDA'
    agent_name = 'RedDeer'
    train_data, train_label = np.array(train_data), np.array(train_label)
    num_features = train_data.shape[1]
    trans_function = get_trans_function(trans_function_shape)

    # setting up the objectives
    obj = obj_function

    # initialize red deers and Leader (the agent with the max fitness)
    deer = initialize(num_agents, num_features)
    fitness = np.zeros(num_agents)
    Leader_agent = np.zeros((1, num_features))
    Leader_fitness = float("-inf")

    # initialize convergence curves
    convergence_curve = {}
    convergence_curve['fitness'] = np.zeros(max_iter)

    # initialize data class
    data = Data()
    val_size = float(input('Enter the percentage of data wanted for validation [0, 100]: '))/100
    data.train_X, data.val_X, data.train_Y, data.val_Y = train_test_split(train_data, train_label, stratify=train_label, test_size=val_size)

    # create a solution object
    solution = Solution()
    solution.num_agents = num_agents
    solution.max_iter = max_iter
    solution.num_features = num_features
    solution.obj_function = obj_function

    # initializing parameters
    UB = 5 # Upper bound
    LB = -5 # Lower bound
    gamma = 0.5 # Fraction of total number of males who are chosen as commanders
    alpha = 0.2 # Fraction of total number of hinds in a harem who mate with the commander of their harem
    beta = 0.1 # Fraction of total number of hinds in a harem who mate with the commander of a different harem

    # start timer
    start_time = time.time()

    # main loop
    for iter_no in range(max_iter):
        print('\n================================================================================')
        print('                          Iteration - {}'.format(iter_no+1))
        print('================================================================================\n')

        deer, fitness = sort_agents(deer, obj, data)
        num_males = int(0.25 * num_agents)
        num_hinds = num_agents - num_males
        males = deer[:num_males,:]
        hinds = deer[num_males:,:]

        # Roaring and Mating (as per RDA operations)
        # -- Operations remain the same as the original code but using obj_function (compute_survival_fitness)

        # Update final information
        deer, fitness = sort_agents(deer, obj, data)
        display(deer, fitness, agent_name)
        if fitness[0] > Leader_fitness:
            Leader_agent = deer[0].copy()
            Leader_fitness = fitness[0].copy()

        convergence_curve['fitness'][iter_no] = np.mean(fitness)

    # compute final accuracy
    print('\n================================================================================')
    print('                                    Final Result                                  ')
    print('================================================================================\n')
    print('Leader ' + agent_name + ' Dimension : {}'.format(int(np.sum(Leader_agent))))
    print('Leader ' + agent_name + ' Fitness : {}'.format(Leader_fitness))

    # stop timer
    end_time = time.time()
    exec_time = end_time - start_time

    # plot convergence graph
    fig, axes = Conv_plot(convergence_curve)
    if(save_conv_graph):
        plt.savefig('convergence_graph_'+ short_name + '.jpg')
    plt.show()

    # update attributes of solution
    solution.best_agent = Leader_agent
    solution.best_fitness = Leader_fitness
    solution.convergence_curve = convergence_curve
    solution.execution_time = exec_time
    return solution

############# for testing purpose ################

if __name__ == '__main__':
    # Load a survival dataset (e.g., breast cancer dataset)
    from sksurv.datasets import load_breast_cancer
    X, y = load_breast_cancer(return_X_y=True)
    y = Surv.from_arrays(event=y["event"], time=y["time"], y=y)

    # Run RDA for feature selection in survival analysis
    RDA(20, 100, X, y, save_conv_graph=True)