<a href="https://colab.research.google.com/github/BehrangEbrahimi13/Repo_Paper_01/blob/%2303-Implementation-Feature-Screening/Paper_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Functions

## Generate synthetic data

In [76]:
import numpy as np

def generate_synthetic_data(num_samples, num_class_dependent_features, num_class_independent_features, noise, low, high, round, seed):
  np.random.seed(seed)

  # Generate random class values according to a desired distribution
  class_data = np.random.uniform(low, high, size=num_samples).round(round)

  # Initialize an empty array to hold the feature data
  class_dependent_feature_data = np.zeros((num_samples, num_class_dependent_features))

  # Generate random values for each feature independently
  for i in range(num_class_dependent_features):
      class_dependent_feature_data[:, i] = np.random.uniform(low, high, size=num_samples).round(round)

  # Modify the feature values based on their relationship with the class
  for i in range(num_class_dependent_features):
      class_dependent_feature_data[:, i] += class_data * (i + 1) # You can multiply the class_data by a scaling factor to control the relationship strength

  # Create a linear combination of the last two features and add to the rest of the features
  dependent_column = np.zeros((num_class_dependent_features, 1))
  dependent_column[num_class_dependent_features-2:, 0] = 1
  new_dependent_feature = np.dot(class_dependent_feature_data, dependent_column)
  feature_data = np.column_stack((class_dependent_feature_data, new_dependent_feature))

  # Generate random independent features
  class_independent_features = np.random.rand(num_samples, num_class_independent_features)

  # Optional: Add some noise to the features to make them more diverse
  class_independent_features = class_independent_features + np.random.normal(0, noise, class_independent_features.shape)

  # Merge the independent features from class and feature data into a single feature_data
  feature_data = np.column_stack((feature_data, class_independent_features))

  # Merge the class and feature data into a single dataset
  # dataset = np.column_stack((class_data, feature_data))

  return class_data, feature_data


## Random Null Generation for a Dataset

In [None]:
import random
import numpy as np
import pandas as pd

def generate_random_array(shape, low, high, round, seed=None, as_dataframe=False):
    np.random.seed(seed)
    random_array = np.random.uniform(low, high, size=(shape)).round(round)
    if as_dataframe:
        return pd.DataFrame(random_array)
    return random_array

def generate_random_nulls(dataset, percentage, seed=None, as_dataframe=False):
    temp = dataset.copy()
    np.random.seed(seed)
    null_mask_indices = np.random.choice(range(temp.size), size=int(temp.size * percentage), replace=False)
    if as_dataframe:
        df_null_mask = pd.DataFrame(False, index=temp.index, columns=temp.columns)
        df_null_mask.values.flat[null_mask_indices] = True
        df_masked = temp.where(~df_null_mask)
        return df_masked
    temp.ravel()[null_mask_indices] = np.nan
    return temp

# Module 1 : PMIC
Generally, feature selection does not work on missing data, so imputation is needed beforehand. However, considering irrelevant features severely affect imputation, we use MIC to select features on missing data by ‘‘partial sample strategy’’ (PSS), which is called **PMIC**. ‘‘Partial sample strategy’’ means using the available values of all feature variables and class variable to calculate MIC

In [19]:
!pip install minepy

Collecting minepy
  Downloading minepy-1.2.6.tar.gz (496 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/497.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/497.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m497.0/497.0 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: minepy
  Building wheel for minepy (setup.py) ... [?25l[?25hdone
  Created wheel for minepy: filename=minepy-1.2.6-cp310-cp310-linux_x86_64.whl size=187016 sha256=0f67c43bb0870bd7b0801801e1ee37d5384cc0d26f83dd37078ef3720bfdb4b7
  Stored in directory: /root/.cache/pip/wheels/69/38/a6/825bb9b9ed81e6af43a0ef80c7cfe4cafcfdbc2f5cde2959d9
Successfully built minepy
Installing collected packages: minepy
Successfully installed minepy-1.2.6


In [20]:
import numpy as np
from minepy import MINE

mine = MINE()
def pmic_feature_selection(F, C, m):
    num_features = F.shape[1]
    pmic_scores = np.zeros(num_features)

    for i in range(num_features):
        musk = ~np.isnan(F[:, i])
        # Select column i without null values
        feature_without_null = F[musk, i]

        # Filter y based on non-null values in column i
        class_without_null = C[musk]

        # Calculate the MIC (Maximal Information Coefficient) score for the current feature and class
        mine.compute_score(feature_without_null, class_without_null)
        pmic_scores[i] = mine.mic()

    # Sort the indices of pmic_scores in descending order and select the top m indices
    top_m_features_idx = np.argsort(pmic_scores)[::-1][:m]
    return top_m_features_idx

### Example with synthetic data

In [None]:
num_samples = 1000
num_class_dependent_features = 6
num_class_independent_features = 3
low = 0
high = 1
round = 2
noise = 0.1
seed = 42

class_data, feature_data = generate_synthetic_data(num_samples, num_class_dependent_features, num_class_independent_features, noise, low, high, round, seed)

selected_idx = pmic_feature_selection(feature_data, class_data, num_class_dependent_features)
selected_features = feature_data[:, selected_idx]

# print("\nfeature_data: \n", feature_data)
print(f"\nSelected {num_class_dependent_features} indices of features :\n", selected_idx)
print(f"\nSelected {num_class_dependent_features} features :\n", selected_features)

## Example Module 1 with Functions

In [None]:
seed = 48
percentage = 0.5
select_top = 3
row = 5

# Generate a random 5x4 array with one-digit random values (0-9)
X = generate_random_array(shape=(row, 4), low=0, high=10, round=0, seed=seed, as_dataframe=False)
print("complete X : \n", X)

# Generate a random 5x1 array with one-digit random values (0-9)
Y = generate_random_array((row,), 0, 10, 0, seed=48, as_dataframe=False)
print("\ny: \n", Y)

X_with_null = generate_random_nulls(dataset=X, percentage=percentage, seed=seed, as_dataframe=False)
selected_idx = pmic_feature_selection(X_with_null, Y, select_top)
selected_features = X_with_null[:, selected_idx]

print("\nX_with_null: \n", X_with_null)
print(f"\nSelected {select_top} indices of features ({percentage * 100} percent null):\n", selected_idx)
print(f"\nSelected {select_top} features ({percentage * 100} percent null):\n", selected_features)

# Module 2 : Imputation for the missing data

## Non-negative Latent Factor

Description:
*   R: Incomplete data matrix of shape (n, m).
*   d: Rank of the non-negative latent factors.
*   lambda1, lambda2: Regularization parameters.
*   max_iter: Maximum number of iterations.








In [None]:
import numpy as np

def Imputes_the_missing_values_By_non_negative_latent_factor(R, d, lambda1, lambda2, max_iter):
    # Initialize non-negative matrix P randomly
    n, m = R.shape
    R_copy = np.copy(R)
    P = np.random.rand(n, d)

    # Initialize non-negative matrix Q randomly
    Q = np.random.rand(d, m)

    # Initialize I according to (17)
    I = np.ones((n, m))
    R_nan_mask = np.isnan(R_copy)
    I[R_nan_mask] = 0

    # Set zero for Null
    R_copy[R_nan_mask] = 0

    # Initialize iteration counter
    iter = 0

    # Convergence criterion
    converge = False

    while not converge and iter < max_iter:
        # Update P according to (22)
        P_new = P * ((I * R_copy) @ Q.T) / ((I * (P @ Q)) @ Q.T + lambda1 * P)

        # Update Q according to (23)
        Q_new = Q * (P_new.T @ (I * R_copy)) / (P_new.T @ (I * (P_new @ Q)) + lambda2 * Q)

        # Check convergence
        if np.allclose(P, P_new) and np.allclose(Q, Q_new):
            converge = True

        # Update P and Q
        P = P_new
        Q = Q_new

        # Increment iteration counter
        iter += 1

    # Impute R by (11) and obtain R_cpl
    PQ = np.round( P @ Q, decimals=3)
    R_cpl = np.where(R_nan_mask, PQ, R)

    return R_cpl


## Hyperimpute

In [None]:
!pip install hyperimpute

In [None]:
import pandas as pd
import numpy as np
from hyperimpute.plugins.imputers import Imputers
imputers = Imputers()

X = pd.DataFrame([[1, 4, 7, 10], [4, 7, np.nan, np.nan], [3, 6, 9, 12], [8, 11, 14, 17]])

method = "gain"

plugin = Imputers().get(method)
out = plugin.fit_transform(X.copy()).round(2)

print(method, out)

gain      0     1     2      3
0  1.0   4.0   7.0  10.00
1  4.0   7.0  10.3  13.45
2  3.0   6.0   9.0  12.00
3  8.0  11.0  14.0  17.00


## Example Module 2 Non-negative Latent Factor and GAIN with Functions

In [None]:
seed = 48
round = 2
percentage = 0.3
row = 5

# Generate a random 5x4 array with one-digit random values (0-9)
X = generate_random_array(shape=(row, 4), low=0, high=10, round=round, seed=seed, as_dataframe=False)
print("complete X : \n", X)

X_with_null = generate_random_nulls(dataset=X, percentage=percentage, seed=seed, as_dataframe=False)
print("\nX_with_null : \n", X_with_null)

R = np.copy(X_with_null)
d = 2
lambda1 = 0.1
lambda2 = 0.2
max_iter = 100

# Code execution
R_cpl = Imputes_the_missing_values_By_non_negative_latent_factor(R, d, lambda1, lambda2, max_iter)
print("\nComplete data after imputation by non_negative_latent_factor: \n", R_cpl)


method = "gain"
plugin = Imputers().get(method)
out = plugin.fit_transform(R.copy()).round(round)

print(f'\nComplete data after imputation by {method}: \n', out)

complete X : 
 [[0.17 8.92 2.85 2.99]
 [7.92 3.24 8.65 4.48]
 [5.48 3.57 1.12 1.42]
 [4.45 7.32 4.6  5.93]
 [3.37 4.54 1.87 4.09]]

X_with_null : 
 [[0.17  nan 2.85 2.99]
 [7.92  nan 8.65  nan]
 [ nan  nan 1.12 1.42]
 [ nan 7.32 4.6  5.93]
 [3.37 4.54 1.87 4.09]]

Complete data after imputation by non_negative_latent_factor: 
 [[ 0.17   0.235  2.85   2.99 ]
 [ 7.92   9.428  8.65  10.738]
 [ 0.906  1.078  1.12   1.42 ]
 [ 5.593  7.32   4.6    5.93 ]
 [ 3.37   4.54   1.87   4.09 ]]

Complete data after imputation by gain: 
       0     1     2     3
0  0.17  4.57  2.85  2.99
1  7.92  5.84  8.65  4.05
2  1.29  4.57  1.12  1.42
3  3.75  7.32  4.60  5.93
4  3.37  4.54  1.87  4.09


# Module 3 : Select Featurs

## Step 1: Dimension Reduction and Feature Extraction


In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd

# Encoder Layer Class
class EncoderLayer(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderLayer, self).__init__()
        self.fc = nn.Linear(input_size, hidden_size)
        self.activation = nn.ReLU()

    def forward(self, x):
        out = self.fc(x)
        out = self.activation(out)
        return out

# Autoencoder Class
class Autoencoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes=None):
        super(Autoencoder, self).__init__()
        self.encoder = EncoderLayer(input_size, hidden_size)
        self.decoder = nn.Sequential(
            nn.Linear(hidden_size, input_size),
            nn.Sigmoid(),
        )
        self.num_classes = num_classes
        if num_classes:
            self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        if self.num_classes:
            logits = self.classifier(encoded)
            return decoded, logits
        else:
            return decoded

# Function to generate random array data
def generate_random_array(shape, low, high, round_val, seed=None, as_dataframe=False):
    np.random.seed(seed)
    random_array = np.random.uniform(low, high, size=shape).round(round_val)
    if as_dataframe:
        return pd.DataFrame(random_array)
    return random_array

# Function to generate random null values in the dataset
def generate_random_nulls(dataset, percentage, seed=None, as_dataframe=False):
    temp = dataset.copy()
    np.random.seed(seed)
    null_mask_indices = np.random.choice(range(temp.size), size=int(temp.size * percentage), replace=False)
    if as_dataframe:
        df_null_mask = pd.DataFrame(False, index=temp.index, columns=temp.columns)
        df_null_mask.values.flat[null_mask_indices] = True
        df_masked = temp.where(~df_null_mask)
        return df_masked
    temp.ravel()[null_mask_indices] = np.nan
    return temp


### Scenario 1: Unsupervised Learning

In [None]:
# Scenario 1: Unsupervised Learning

# Generate sample data
data = generate_random_array((100, 10), 0, 100, 2, seed=1)

# Create an instance of the Autoencoder model for Scenario 1
autoencoder_unsupervised = Autoencoder(input_size=10, hidden_size=5)

# Train the Autoencoder model
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder_unsupervised.parameters(), lr=0.001, weight_decay=1e-5)

for epoch in range(100):
    inputs = torch.Tensor(data)
    outputs = autoencoder_unsupervised(inputs)
    loss = criterion(outputs, inputs)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Encode the entire dataset
encoded_data = autoencoder_unsupervised.encoder(torch.Tensor(data)).detach().numpy()

print("Encoded Data:")
print(encoded_data)

# Normalize the encoded data between 0 and 1 for each column
normalized_data = np.zeros_like(encoded_data)

for i in range(encoded_data.shape[1]):
    column = encoded_data[:, i]
    min_val = np.min(column)
    max_val = np.max(column)
    if max_val - min_val == 0:
        normalized_column = np.zeros_like(column)
    else:
        normalized_column = (column - min_val) / (max_val - min_val)
    normalized_data[:, i] = normalized_column

print("Normalized Encoded Data:")
print(normalized_data)

### Scenario 2: Supervised Learning with 1 Class

In [None]:
# Scenario 2: Supervised Learning with 1 Class

# Generate sample data
data = generate_random_array((100, 10), 0, 100, 2, seed=2)
labels = np.random.randint(0, 2, size=(100,))

# Create an instance of the Autoencoder with Classification model for Scenario 2
autoencoder_supervised_1class = Autoencoder(input_size=10, hidden_size=5, num_classes=1)

# Train the Autoencoder with Classification model
criterion = nn.MSELoss()
classification_criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(autoencoder_supervised_1class.parameters(), lr=0.001, weight_decay=1e-5)

for epoch in range(100):
    inputs = torch.Tensor(data)
    labels_tensor = torch.Tensor(labels)

    # Forward pass
    outputs, logits = autoencoder_supervised_1class(inputs)
    reconstruction_loss = criterion(outputs, inputs)
    classification_loss = classification_criterion(logits.squeeze(), labels_tensor)

    # Total loss
    loss = reconstruction_loss + classification_loss

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Encode the entire dataset
encoded_data = autoencoder_supervised_1class.encoder(torch.Tensor(data)).detach().numpy()

print("Encoded Data:")
print(encoded_data)

# Normalize the encoded data between 0 and 1 for each column
normalized_data = np.zeros_like(encoded_data)

for i in range(encoded_data.shape[1]):
    column = encoded_data[:, i]
    min_val = np.min(column)
    max_val = np.max(column)
    if max_val - min_val == 0:
        normalized_column = np.zeros_like(column)
    else:
        normalized_column = (column - min_val) / (max_val - min_val)
    normalized_column = np.round(normalized_column, decimals=4)  # Round to 4 decimal places
    normalized_data[:, i] = normalized_column

print("Normalized Encoded Data:")
print(normalized_data)

### Scenario 3: Supervised Learning with Multiple Classes (3)

In [None]:
# Scenario 3: Supervised Learning with Multiple Classes (3)

# Generate sample data
data = generate_random_array((100, 10), 0, 100, 2, seed=3)
labels = np.random.randint(0, 3, size=(100,))

# Create an instance of the Autoencoder with Classification model for Scenario 3
autoencoder_supervised_multiclass = Autoencoder(input_size=10, hidden_size=5, num_classes=3)

# Train the Autoencoder with Classification model
criterion = nn.MSELoss()
classification_criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(autoencoder_supervised_multiclass.parameters(), lr=0.001, weight_decay=1e-5)

for epoch in range(100):
    inputs = torch.Tensor(data)
    labels_tensor = torch.Tensor(labels).long()

    # Forward pass
    outputs, logits = autoencoder_supervised_multiclass(inputs)
    reconstruction_loss = criterion(outputs, inputs)
    classification_loss = classification_criterion(logits.squeeze(), labels_tensor)

    # Total loss
    loss = reconstruction_loss + classification_loss

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Encode the entire dataset
encoded_data = autoencoder_supervised_multiclass.encoder(torch.Tensor(data)).detach().numpy()

print("Encoded Data:")
print(encoded_data)

# Normalize the encoded data between 0 and 1 for each column
normalized_data = np.zeros_like(encoded_data)

for i in range(encoded_data.shape[1]):
    column = encoded_data[:, i]
    min_val = np.min(column)
    max_val = np.max(column)
    if max_val - min_val == 0:
        normalized_column = np.zeros_like(column)
    else:
        normalized_column = (column - min_val) / (max_val - min_val)
    normalized_column = np.round(normalized_column, decimals=4)  # Round to 4 decimal places
    normalized_data[:, i] = normalized_column

print("Normalized Encoded Data:")
print(normalized_data)

## Step 2: Feature Screening

Screen all the features via multivariate rank distance correlation learning to select the relevant ones.

The reason for using feature screening instead of neural network is the small amount of training samples.

When training samples are limited, neural network can easily result in overfitting; however, if we only consider one feature at a time, the dependence between the feature and xencoded can still be well estimated even when the sample size is small.

In [None]:
import numpy as np
from scipy.spatial.distance import pdist, squareform

def multivariate_rank_distance(X, Y=None):
    """
    Compute the multivariate rank distance correlation (MrDc) between two sets of variables X and Y.
    If Y is not provided, it computes the unsupervised MrDc.
    Args:
        X (ndarray): Predictor variables, shape (n, p).
        Y (ndarray): Response variables, shape (n, m).
    Returns:
        float: The MrDc value.
    """
    n, p = X.shape

    if Y is None:  # Unsupervised MrDc
        Y = X

    # Compute pairwise distance matrices
    X_dist = squareform(pdist(X, 'euclidean'))
    Y_dist = squareform(pdist(Y, 'euclidean'))

    # Compute ranks
    X_ranks = np.argsort(X_dist, axis=0).argsort(axis=0)
    Y_ranks = np.argsort(Y_dist, axis=0).argsort(axis=0)

    # Compute rank distance correlation
    R = np.sum((X_ranks - Y_ranks) ** 2) / (n * (n - 1))
    return np.sqrt(R)

def mrdc_sis(X, k, y=None):
    """
    Perform feature screening using the MrDc-SIS method.
    Args:
        X (ndarray): Predictor variables, shape (n, p).
        k (int): Number of top-ranked variables to select.
        y (ndarray): Response variable (optional), shape (n,).
    Returns:
        ndarray: Indices of the selected variables.
    """
    p = X.shape[1]
    scores = np.zeros(p)

    # Compute MrDc scores for each predictor variable
    for i in range(p):
        if y is not None:  # Supervised MrDc
            scores[i] = multivariate_rank_distance(X[:, i:i+1], y.reshape(-1, 1))
        else:  # Unsupervised MrDc
            scores[i] = multivariate_rank_distance(X[:, i:i+1])

    # Rank the variables based on their scores
    ranks = np.argsort(scores)
    selected_indices = ranks[:k]

    return selected_indices

# Example usage for both supervised and unsupervised feature screening
# Generate random data
np.random.seed(0)
n = 100  # Sample size
p = 10   # Number of predictor variables
m = 3    # Number of response variables
X = np.random.rand(n, p)
y = np.random.rand(n)

# Perform supervised feature screening with MrDc-SIS
k = 3  # Number of top-ranked variables to select
selected_indices_supervised = mrdc_sis(X, k, y)
selected_variables_supervised = X[:, selected_indices_supervised]

print("Selected variables indices (supervised):", selected_indices_supervised)
print("Selected variables (supervised):")
print(selected_variables_supervised)

# Perform unsupervised feature screening with MrDc-SIS
selected_indices_unsupervised = mrdc_sis(X, k)
selected_variables_unsupervised = X[:, selected_indices_unsupervised]

print("Selected variables indices (unsupervised):", selected_indices_unsupervised)
print("Selected variables (unsupervised):")
print(selected_variables_unsupervised)

In [None]:
import numpy as np
from scipy.spatial.distance import pdist

# Generate a random dataset
np.random.seed(0)
X = np.random.rand(5, 3)

# Compute pairwise distances using Euclidean distance metric
distances = pdist(X, metric='euclidean')

print("X : ", X)

print("Pairwise distances:")
print(distances)

X :  [[0.5488135  0.71518937 0.60276338]
 [0.54488318 0.4236548  0.64589411]
 [0.43758721 0.891773   0.96366276]
 [0.38344152 0.79172504 0.52889492]
 [0.56804456 0.92559664 0.07103606]]
Pairwise distances:
[0.29473397 0.41689499 0.19662693 0.57216693 0.57586803 0.41860234
 0.76350759 0.44940452 0.90274337 0.51150232]
