# Marine data mGPS algorithm - 04/04/2025

In [55]:
# Import libraries
import ray
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import RFE
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import StratifiedKFold
import numpy as np
import time
from tqdm import tqdm
import os

  from .autonotebook import tqdm as notebook_tqdm
2025-04-04 15:09:32,193	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [56]:
# Import datasets
os.chdir("/home/inf-21-2024/binp37/")

def data_normalize(df):
    """Normalizes DataFrame rows by dividing each element by the row sum."""
    return df.div(df.sum(axis=1), axis=0)

# Read the metadata for the metasub data.
marine_taxa = pd.read_csv("./data/marine/marine_taxa.csv")
taxa = list(marine_taxa.iloc[:,1:2512].columns)
marine_taxa = marine_taxa.fillna(0)
marine_taxa['Sea'] = marine_taxa['Sea'].str.replace('[^A-Za-z0-9_]+', '_', regex=True).apply(lambda x:str(x).lower()).astype('category')

# Normalize data
cols_to_drop = []
for i in range(1, 2512):  # Python indexing starts from 0
    if marine_taxa.iloc[:, i].sum() == 0:
        cols_to_drop.append(marine_taxa.columns[i])

marine_taxa = marine_taxa.drop(columns=cols_to_drop)
taxa_cols_after_drop = marine_taxa.columns[2:-6] # Adjusting index after dropping columns
taxa = list(taxa_cols_after_drop)

marine_taxa_new = marine_taxa.copy() # Create a new dataframe to avoid SettingWithCopyWarning
marine_taxa_new[taxa] = data_normalize(marine_taxa[taxa])
marine_taxa_new = marine_taxa_new.drop(columns='Unnamed: 0',axis=1)
marine_taxa_new.head()

Unnamed: 0,Root,Root;k__Archaea;p__Crenarchaeota;c__MBGA;o__;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__MBGA;o__NRP-J;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__MBGB;o__;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__MCG;o__;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__MCG;o__B10;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__MCG;o__pGrfC26;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__THSCG;o__;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__Thaumarchaeota;o__AK31;f__;g__;s__,Root;k__Archaea;p__Crenarchaeota;c__Thaumarchaeota;o__Cenarchaeales;f__;g__;s__,...,Root;k__Bacteria;p__[Thermi];c__Deinococci;o__Deinococcales;f__Trueperaceae;g__B-42;s__,Root;k__Bacteria;p__[Thermi];c__Deinococci;o__Deinococcales;f__Trueperaceae;g__GBI-58;s__,Root;k__Bacteria;p__[Thermi];c__Deinococci;o__Deinococcales;f__Trueperaceae;g__Truepera;s__,Root;k__Bacteria;p__[Thermi];c__Deinococci;o__Thermales;f__Thermaceae;g__Thermus;s__,Sample,latitude,longitude,accession,geometry,Sea
0,4758,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,ERS477998,42.1735,17.7252,ERS477998,POINT (17.7252 42.1735),mediterranean_sea
1,5575,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,ERS477979,42.2038,17.715,ERS477979,POINT (17.715 42.2038),mediterranean_sea
2,8280,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,ERS478017,33.9179,32.898,ERS478017,POINT (32.898 33.9179),mediterranean_sea
3,690,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,ERS478040,33.9235,32.8118,ERS478040,POINT (32.8118 33.9235),mediterranean_sea
4,14886,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,ERS493431,35.4002,-127.7499,ERS493431,POINT (-127.7499 35.4002),north_pacific_ocean


## Get Geogrpahically Informative Taxa (GITs)

### Determine the best set of features to select, GITs

In [85]:
import ray
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import RFE
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import StratifiedKFold
import numpy as np
import time
from tqdm import tqdm
import os

def parallel_rfe_feature_selection(X: pd.DataFrame, y: pd.Series, n_jobs: int = 1, random_state: int = 123,
                                   cv: int = 10, subsets: list = None, remove_correlated: bool = True,
                                   correlation_threshold: float = 0.98, num_cpus: int = None):
    """
    Performs parallel Recursive Feature Elimination (RFE) with cross-validation to select the best feature subset.

    Args:
        X (pd.DataFrame): DataFrame of features.
        y (pd.Series): Series of the target variable.
        n_jobs (int): Number of jobs for the base estimator (RandomForestClassifier).
        random_state (int): Random state for reproducibility.
        cv (int): Number of cross-validation folds.
        subsets (list, optional): List of feature subset sizes to evaluate. If None, default subsets are used. Defaults to None.
        remove_correlated (bool, optional): Whether to remove highly correlated features before RFE. Defaults to True.
        correlation_threshold (float, optional): Threshold for identifying highly correlated features. Defaults to 0.98.
        num_cpus (int, optional): Number of CPUs to use for Ray. If None, Ray will auto-detect. Defaults to None.

    Returns:
        tuple: A tuple containing:
            - best_params (dict): Dictionary with the best RFE parameters.
            - best_accuracy (float): The best mean cross-validation accuracy achieved.
            - results_df (pd.DataFrame): DataFrame containing the mean accuracy for each feature subset size.
            - elapsed_time (float): Total time taken for the feature selection process.
            - all_supports (dict): Dictionary where keys are (n_features, fold) and values are boolean arrays indicating feature support.
    """
    if ray.is_initialized():
        ray.shutdown()
    ray.init(ignore_reinit_error=True, num_cpus=num_cpus)

    model = RandomForestClassifier(n_jobs=n_jobs, random_state=random_state)

    if remove_correlated:
        # Compute correlation matrix
        print("Calculating correlation matrix...")
        corr_matrix = X.corr()
        upper_tri = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))

        # Identify correlated features (above threshold)
        correlated_features = [column for column in upper_tri.columns if any(upper_tri[column] > correlation_threshold)]

        # Drop correlated features
        X = X.drop(columns=correlated_features)
        print(f"Correlated features removed: {len(correlated_features)}")

    # Determine default subset sizes if not provided
    num_features = X.shape[1]
    if subsets is None:
        subsets = [num_features // 2, num_features // 4, num_features // 8, num_features // 16, num_features // 32, num_features // 64]
        subsets = [s for s in subsets if s > 0]  # Remove non-positive values

    n_features_options = sorted(list(set(subsets))) # Ensure unique and sorted subset sizes
    total_iterations = len(n_features_options) * cv

    print(f"\nStarting RFE with subsets of features: {n_features_options}")

    # Define remote function for parallel execution
    @ray.remote
    def evaluate_rfe_remote(n_features, fold, X_remote, y_remote):
        """Performs RFE feature selection and evaluates performance for a given fold."""
        pipe = make_pipeline(RFE(estimator=model, n_features_to_select=n_features, step=10))

        # We use the stratified K fold to split the data into training and validation sets
        skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=fold)
        train_index, test_index = list(skf.split(X_remote, y_remote))[fold]

        # train_index and test_index contain the index values for extracting training and testing data
        X_train = X_remote.iloc[train_index, :]
        X_test = X_remote.iloc[test_index, :]
        y_train = y_remote.iloc[train_index]
        y_test = y_remote.iloc[test_index]

        # Fit the model using the training data and then evaluate the score based on the testing data
        pipe.fit(X_train, y_train)
        score = pipe.score(X_test, y_test)

        support = pipe[0].support_ # Get the boolean mask of selected features

        return n_features, fold, score, support

    start_time = time.time()
    X_ray = ray.put(X)
    y_ray = ray.put(y)
    tasks = [evaluate_rfe_remote.remote(n_features, fold, X_ray, y_ray)
             for n_features in n_features_options for fold in range(cv)]

    results = []
    all_supports = {}
    with tqdm(total=total_iterations, desc='Parallel RFE + Cross-validation') as pbar:
        while tasks:
            done, tasks = ray.wait(tasks, num_returns=1)
            result = ray.get(done[0])
            n_features_res, fold_res, score_res, support_res = result
            results.append((n_features_res, score_res))  # (n_features, score)
            all_supports[(n_features_res, fold_res)] = support_res
            pbar.update(1)

    # Aggregate mean accuracy for each feature subset
    results_df = pd.DataFrame(results, columns=["n_features", "accuracy"])
    results_df = results_df.groupby("n_features").mean().reset_index()

    # Find best feature subset
    best_row = results_df.loc[results_df["accuracy"].idxmax()]
    best_n_features = int(best_row["n_features"])
    best_accuracy = best_row["accuracy"]

    end_time = time.time()
    elapsed_time = end_time - start_time

    best_params = {"rfe__n_features_to_select": best_n_features}

    ray.shutdown()

    return best_params, best_accuracy, results_df, elapsed_time, all_supports, X

X = marine_taxa_new[taxa]
y= marine_taxa_new['Sea']

best_parameters, best_score, all_results, time_taken, all_supports, return_data = parallel_rfe_feature_selection(
        X=X,
        y=y,
        n_jobs=-1,  # Use all available cores for RandomForest within each Ray task
        random_state=123,
        cv=5,
        subsets=[100, 200, 300, 500,1000],
        remove_correlated=True,
        correlation_threshold=0.95,
        num_cpus=50  # Limit Ray to 4 CPUs for this example
    )

print(f'\nBest params: {best_parameters}')
print(f'Best accuracy: {best_score:.6f}')
print(f'Mean accuracy for all tested feature subsets:\n{all_results}')
print(f'Total time taken: {time_taken:.2f} seconds')

# Get the support_ for the best performing number of features (across all folds)
best_n_features_from_params = best_parameters['rfe__n_features_to_select']
best_supports = {k: v for k, v in all_supports.items() if k[0] == best_n_features_from_params}

# Store the support arrays for the best models in this variable
support_for_best_models = best_supports

print(f"\nSupport arrays for the best number of features ({best_n_features_from_params}):")
for (n_features, fold), support in support_for_best_models.items():
    print(f"  n_features: {n_features}, Fold: {fold}, Support: {support}")

2025-04-04 15:56:35,931	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


Calculating correlation matrix...
Correlated features removed: 489

Starting RFE with subsets of features: [100, 200, 300, 500, 1000]


Parallel RFE + Cross-validation: 100%|██████████| 25/25 [02:05<00:00,  5.03s/it]



Best params: {'rfe__n_features_to_select': 500}
Best accuracy: 0.678917
Mean accuracy for all tested feature subsets:
   n_features  accuracy
0         100  0.664387
1         200  0.656125
2         300  0.610826
3         500  0.678917
4        1000  0.641026
Total time taken: 125.86 seconds

Support arrays for the best number of features (500):
  n_features: 500, Fold: 2, Support: [False False False ... False False False]
  n_features: 500, Fold: 3, Support: [False False False ... False False False]
  n_features: 500, Fold: 4, Support: [False False False ... False False False]
  n_features: 500, Fold: 1, Support: [False False False ... False False False]
  n_features: 500, Fold: 0, Support: [False False False ... False False False]


In [92]:
nn_data = pd.concat([marine_taxa_new[return_data.columns[support_for_best_models[(500,0)]]],marine_taxa_new[['latitude','longitude','Sea']]],axis=1)
nn_data.shape

(131, 503)