Skip to content

Commit

Permalink
Finish the AAclust().filter_coverage() method
Browse files Browse the repository at this point in the history
  • Loading branch information
breimanntools committed Jul 1, 2024
1 parent 4813a0c commit 2ca949c
Show file tree
Hide file tree
Showing 5 changed files with 541 additions and 34 deletions.
113 changes: 108 additions & 5 deletions aaanalysis/feature_engineering/_aaclust.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
This is a script for the frontend of the AAclust class, a clustering wrapper object to obtain redundancy-reduced
scale subsets.
"""
from typing import Optional, Dict, List, Tuple, Type
from typing import Optional, Dict, List, Tuple, Type, Literal
import numpy as np
from sklearn.cluster import KMeans
from sklearn.base import ClusterMixin
Expand All @@ -13,6 +13,8 @@
from aaanalysis.template_classes import Wrapper
import aaanalysis.utils as ut

from ._backend.check_feature import check_df_cat

from ._backend.check_aaclust import check_metric
from ._backend.aaclust.aaclust_fit import estimate_lower_bound_n_clusters, optimize_n_clusters, merge_clusters
from ._backend.aaclust.aaclust_eval import evaluate_clustering
Expand Down Expand Up @@ -72,6 +74,29 @@ def post_check_n_clusters(n_clusters_actual=None, n_clusters=None):
f"during AAclust algorithm.", ConvergenceWarning)


# Matching functions for filter_coverage
def check_match_X_scale_ids(X=None, scale_ids=None, accept_none=True):
"""Verify that the number of samples in 'X' matches the length of 'names'."""
if accept_none and scale_ids is None:
return
n_samples, n_features = X.shape
if n_samples != len(scale_ids):
raise ValueError(f"n_samples does not match for 'X' ({len(X)}) and 'scale_ids' ({len(scale_ids)}).")


def check_match_scale_ids_names(scale_ids=None, names=None, df_cat=None, col_name=None):
"""Check scale elements corresponding to scale_ids are superset of names"""
all_scale_names = df_cat[df_cat[ut.COL_SCALE_ID].isin(scale_ids)][col_name].to_list()
ut.check_superset_subset(name_superset=f"df_cat ('{col_name}')",
superset=all_scale_names,
name_subset="names",
subset=names)
ut.check_superset_subset(name_subset=f"df_cat ('{col_name}')",
subset=all_scale_names,
name_superset="names",
superset=names)


# II Main Functions
class AAclust(Wrapper):
"""
Expand Down Expand Up @@ -487,8 +512,8 @@ def comp_correlation(X: ut.ArrayLike2D,
labels: ut.ArrayLike1D = None,
X_ref: Optional[ut.ArrayLike2D] = None,
labels_ref: Optional[ut.ArrayLike1D] = None,
names : Optional[List[str]] = None,
names_ref : Optional[List[str]] = None
names: Optional[List[str]] = None,
names_ref: Optional[List[str]] = None
) -> Tuple[pd.DataFrame, ut.ArrayLike1D]:
"""
Computes the Pearson correlation of given data with reference data.
Expand Down Expand Up @@ -580,7 +605,85 @@ def comp_coverage(names: List[str] = None,
ut.check_superset_subset(subset=names, name_subset="names",
superset=names_ref, name_superset="names_ref")
# Compute coverage
n_unique_names = len(set(names))
n_unique_intersect = len(set(names).intersection(set(names_ref)))
n_unique_ref = len(set(names_ref))
coverage = round(n_unique_names/n_unique_ref*100, 2)
coverage = round(n_unique_intersect/n_unique_ref*100, 2)
return coverage

def filter_coverage(self,
X: ut.ArrayLike2D,
scale_ids: List[str] = None,
names_ref: List[str] = None,
min_coverage: int = 100,
df_cat: pd.DataFrame = None,
col_name: Literal['category', 'subcategory', 'scale_name'] = "subcategory"
) -> List[str]:
"""
Select a redundancy-reduced set of numerical scales with defined subcategory coverage.
This method reduces the number of numerical scales in the feature matrix `X` by clustering them.
It ensures that the selected clusters cover a minimum percentage (`min_coverage`) of unique subcategories
in `names_ref`.
The process involves clustering the scales in `X` and selecting one scale per cluster. The initial number of
clusters is determined by the number of unique subcategories in `names_ref`. The number of clusters is increased
step-wise until the overlap (coverage) between the unique elements in `names_ref` and the subcategories of
the selected scales meets or exceeds the defined threshold (`min_coverage`).
Parameters
----------
X : array-like, shape (n_scales, n_features)
Feature matrix. `Rows` correspond to scales and `columns` to amino acids.
scale_ids : list of str
List of scale IDs corresponding to the rows in ``X``.
names_ref : list of str
List of reference sample names ('subcategories') representing the desired subcategories for coverage.
Must contain the same unique elements as the unique subcategories associated with ``scale_ids``
min_coverage : int, default=100
Minimum coverage percentage of unique subcategories to be achieved by the selected clusters.
df_cat : pd.DataFrame, optional
DataFrame containing the categorical information for each scale. Should include columns ``scale_ids`` and
the specified ``col_name``. Requiered columns are 'scale_id', 'category', 'subcategory', and 'scale_name'.
col_name : {'category', 'subcategory', 'scale_name'}, default='subcategory'
Column name in ``df_cat`` that contains the subcategory information (alternatively, category or scale name).
Returns
-------
selected_scale_ids : list of str
List of selected scale ids that meet the minimum coverage criteria.
See Also
--------
* :meth:`AAclust.fit`: The clustering function used in every round for scale selection.
* :meth:`AAclust.comp_coverage`: The function used to compute the subcategory coverage.
Examples
--------
.. include:: examples/aaclust_filter_coverage.rst
"""
# Check input
X = ut.check_X(X=X, min_n_samples=2)
scale_ids = ut.check_list_like(name="scale_ids", val=scale_ids, accept_none=False)
names_ref = ut.check_list_like(name="names_ref", val=names_ref, accept_none=False)
ut.check_number_range(name="min_coverage", val=min_coverage, just_int=True,
min_val=10, max_val=100, accept_none=False)
check_df_cat(df_cat=df_cat, accept_none=False)
ut.check_str_options(name="col_name", val=col_name,
list_str_options=[ut.COL_CAT, ut.COL_SUBCAT, ut.COL_SCALE_NAME])
check_match_X_scale_ids(X=X, scale_ids=scale_ids, accept_none=False)
check_match_scale_ids_names(scale_ids=scale_ids, names=names_ref, df_cat=df_cat, col_name=col_name)
# Set number of unique names as number of initial clusters
n = len(set(names_ref))
n_samples = len(scale_ids)
coverage = 0
selected_scale_ids = []
while coverage < min_coverage:
selected_scale_ids = self.fit(X, names=scale_ids, n_clusters=n).medoid_names_
selected_names = df_cat[df_cat[ut.COL_SCALE_ID].isin(selected_scale_ids)][col_name].to_list()
coverage = self.comp_coverage(names=selected_names, names_ref=names_ref)
# Heuristic step size to improve filtering speed
step_size = int((min_coverage - coverage)/10) + 1
n += step_size
if n > n_samples:
n = n_samples
return selected_scale_ids
2 changes: 1 addition & 1 deletion aaanalysis/feature_engineering/_backend/check_feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def check_df_cat(df_cat=None, accept_none=True):
"""Check if df_cat is valid"""
if df_cat is None and accept_none:
return # Skip check
cols_cat = [ut.COL_SCALE_ID, ut.COL_CAT, ut.COL_SUBCAT]
cols_cat = [ut.COL_SCALE_ID, ut.COL_CAT, ut.COL_SUBCAT, ut.COL_SCALE_NAME]
ut.check_df(name="df_cat", df=df_cat, cols_requiered=cols_cat, accept_none=accept_none)


Expand Down
30 changes: 2 additions & 28 deletions aaanalysis/feature_engineering/_numerical_feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,22 @@ def check_match_df_scales_letter_new(df_scales=None, letter_new=None):
raise ValueError(f"Letter '{letter_new}' already exists in alphabet of 'df_scales': {alphabet}")


# TODO (see AAclust.comp_coverage, AAclust.comp_correlation),
# TODO filter_correlation)
# TODO add filter_coverage
"""
def get_select_scales(list_subcat_ref=None):
""""""
print("top")
df_scales = aa.load_scales()
df_cat = aa.load_scales(name="scales_cat")
df_cat = df_cat[df_cat["subcategory"].isin(list_subcat_ref)]

df_scales = df_scales[df_cat["scale_id"].to_list()]
aac = aa.AAclust()
n = len(list_subcat_ref)
coverage = 0
scales = []
while coverage != 100:
X = np.array(df_scales).T
scales = aac.fit(X, names=list(df_scales), n_clusters=n).medoid_names_
list_subcat = df_cat[df_cat["scale_id"].isin(scales)]["subcategory"].to_list()
coverage = aac.comp_coverage(names=list_subcat, names_ref=list_subcat_ref)
#print(len(list_subcat), coverage, [x for x in list_subcat_ref if x not in list_subcat])
n += 1
df_scales = df_scales[scales]
return df_scales
"""
# II Main Functions
class NumericalFeature:
"""
Utility feature engineering class to process and filter numerical data structures,
such as amino acid scales or a feature matrix.
"""

"""
@staticmethod
def comp_correlation():

"""
@staticmethod
def filter_correlation():
"""


@staticmethod
def extend_alphabet(df_scales: pd.DataFrame = None,
new_letter: str = None,
Expand Down
Loading

0 comments on commit 2ca949c

Please sign in to comment.