In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

In [2]:
class QPSO:
    def __init__(self, n_particles, n_dimensions, max_iterations, g=1.5):
        """
        Initialize QPSO algorithm.
        
        Parameters:
        - n_particles: Number of particles in the swarm
        - n_dimensions: Number of features in the dataset
        - max_iterations: Maximum number of iterations
        - g: Parameter controlling the characteristic length (g > 1)
        """
        self.n_particles = n_particles
        self.n_dimensions = n_dimensions
        self.max_iterations = max_iterations
        self.g = g
        
        # Initialize particles with random positions in [0, 1]
        self.positions = np.random.random((n_particles, n_dimensions))
        
        # Initialize personal best positions and values
        self.pbest_positions = self.positions.copy()
        self.pbest_values = np.full(n_particles, np.inf)
        
        # Initialize global best position and value
        self.gbest_position = np.zeros(n_dimensions)
        self.gbest_value = np.inf
        
        # Initialize convergence curve
        self.convergence_curve = np.zeros(max_iterations)
    
    def update(self, fitness_function, callback=None, interval=10):
        """
        Update the positions of all particles based on QPSO algorithm.
        """
        for iter_idx in range(self.max_iterations):
            # Evaluate fitness for each particle
            for i in range(self.n_particles):
                fitness = fitness_function(self.positions[i])
                
                # Update personal best if the current position is better
                if fitness < self.pbest_values[i]:
                    self.pbest_values[i] = fitness
                    self.pbest_positions[i] = self.positions[i].copy()
                
                # Update global best if this particle's best is better
                if fitness < self.gbest_value:
                    self.gbest_value = fitness
                    self.gbest_position = self.positions[i].copy()
            
            # Calculate mean best position (mbest)
            mbest = np.mean(self.pbest_positions, axis=0)
            
            # Update positions using QPSO (delta potential well)
            for i in range(self.n_particles):
                # Calculate local attractor (p)
                phi = np.random.random(self.n_dimensions)
                p = phi * self.pbest_positions[i] + (1 - phi) * self.gbest_position
                
                # Calculate characteristic length (L)
                L = self.g * np.abs(mbest - self.positions[i])
                
                # Generate u (uniformly distributed random number)
                u = np.random.random(self.n_dimensions)
                
                # Update position based on delta potential well
                if np.random.random() > 0.5:
                    self.positions[i] = p + L * (-np.log(u))
                else:
                    self.positions[i] = p - L * (-np.log(u))
                
                # Clip positions to [0, 1]
                self.positions[i] = np.clip(self.positions[i], 0, 1)
            
            # Update convergence curve
            self.convergence_curve[iter_idx] = self.gbest_value
            
            # Call callback if provided and it's time
            if callback is not None and (iter_idx + 1) % interval == 0:
                callback(self, iter_idx + 1)
        
        return self.gbest_position, self.gbest_value, self.convergence_curve

In [8]:
def qpso_feature_selection(X, y, n_particles=30, max_iterations=100, g=1.5, alpha=0.99):
    """
    Perform feature selection using QPSO algorithm.
    
    Parameters:
    - X: Feature matrix
    - y: Target vector
    - n_particles: Number of particles in the swarm
    - max_iterations: Maximum number of iterations
    - g: Parameter controlling the characteristic length (g > 1)
    - alpha: Weighting factor for fitness function (0 <= alpha <= 1)
    """
    n_features = X.shape[1]
    
    # Initialize QPSO
    qpso = QPSO(n_particles, n_features, max_iterations, g)
    
    # Define fitness function
    def fitness_function(position):
        # Convert continuous position to binary (threshold = 0.5)
        binary_position = (position > 0.5).astype(int)
        
        # If no features are selected, return a high fitness value
        if np.sum(binary_position) == 0:
            return 1.0
        
        # Select the subset of features
        X_subset = X[:, binary_position == 1]
        
        # Evaluate the performance using SVM with 3-fold cross-validation
        classifier = SVC(C = 19,kernel='rbf', decision_function_shape='ovr')
        scores = cross_val_score(classifier, X_subset, y, cv=3)
        accuracy = np.mean(scores)
        
        # Calculate the fitness value (lower is better)
        # Balance between accuracy and number of features
        num_selected_features = np.sum(binary_position)
        fitness = alpha * (1.0 - accuracy) + (1.0 - alpha) * (num_selected_features / n_features)
        
        return fitness
    
    # Progress callback function
    def callback(s, iteration):
        best_value_avg = np.mean([s.pbest_values[i] for i in range(s.n_particles)])
        print(f"Iteration {iteration}/{max_iterations}: Best = {s.gbest_value:.6f}, Avg = {best_value_avg:.6f}")
    
    # Run QPSO algorithm
    best_position, best_fitness, convergence_curve = qpso.update(fitness_function, callback)
    
    # Convert the best position to binary (threshold = 0.5)
    selected_features = (best_position > 0.5).astype(bool)
    
    return selected_features, convergence_curve


In [9]:
def get_pca(x_train, x_test, n_components):
    pca = PCA(n_components=n_components, svd_solver='full')
    pca.fit(x_train)
    x_train = pca.transform(x_train)
    x_test = pca.transform(x_test)
    print(x_train.shape, x_test.shape)
    return x_train, x_test

In [10]:
def Opt_Features(csv_file, target_column):
    """
    Run feature selection on data from a CSV file.
    
    Parameters:
    - csv_file: Path to the CSV file
    - target_column: Name of the target column in the CSV
    """
    # Load data from CSV
    print(f"Loading data from {csv_file}...")
    data = pd.read_csv(csv_file)
    data = data.drop('id_code', axis=1)
    
    # Separate features and target
    X = data.drop(columns=[target_column]).values
    y = data[target_column].values
    
    # Scale features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    
    
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    temp_train_data = X_train
    temp_test_data = X_test
    temp_train_data, temp_test_data = get_pca(temp_train_data, temp_test_data, 0.98)
    
    # Get feature names
    feature_names = data.drop(columns=[target_column]).columns.tolist()
    
    print(f"Total number of features: {len(feature_names)}")
    print("Running QPSO for feature selection...")
    
    # Run QPSO for feature selection
    selected_features, convergence_curve = qpso_feature_selection(temp_train_data, y_train)
    
    # Get names of selected features
    selected_feature_names = [feature_names[i] for i in range(len(feature_names)) if selected_features[i]]
    
    print(f"Number of selected features: {np.sum(selected_features)}")
    print("Selected features:")
    for i, feature in enumerate(selected_feature_names):
        print(f"{i+1}. {feature}")
    
    # Evaluate performance with selected features
    X_train_selected = temp_train_data[:, selected_features]
    X_test_selected = temp_test_data[:, selected_features]
    
    # Train a classifier with selected features
    clf=SVC(C = 19,kernel='rbf', decision_function_shape='ovr')
    clf.fit(X_train_selected, y_train)
    
    # Evaluate on test set
    y_pred = clf.predict(X_test_selected)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy with selected features: {accuracy:.4f}")
    
    # Compare with performance using all features
    #clf_all = SVC()
    #clf_all.fit(X_train, y_train)
    #y_pred_all = clf_all.predict(X_test)
    #accuracy_all = accuracy_score(y_test, y_pred_all)
    #print(f"Accuracy with all features: {accuracy_all:.4f}")
    
    # Plot convergence curve
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(convergence_curve) + 1), convergence_curve)
    plt.xlabel('Iteration')
    plt.ylabel('Fitness Value')
    plt.title('QPSO Convergence Curve')
    plt.grid(True)
    plt.show()
    
    # Save selected features to a file
    with open('selected_features.txt', 'w') as f:
        for feature in selected_feature_names:
            f.write(f"{feature}\n")
    
    print("Feature selection completed successfully.")



In [None]:

Opt_Features("C:/Users/hciii/Dhruba/D_Personal/Feature_Selection/featureoptimization/featureoptimization/efficientnetb3newfeat.csv","label")


Loading data from C:/Users/hciii/Dhruba/D_Personal/Feature_Selection/featureoptimization/featureoptimization/efficientnetb3newfeat.csv...
(13540, 83) (3385, 83)
Total number of features: 1536
Running QPSO for feature selection...
