In [1]:
from ppktstore.registry import configure_phenopacket_registry
from ppktstore.model import DefaultPhenopacketStore
from ppktstore.release.stats import PPKtStoreStats
import phenopackets as ppkt
import typing

# Import cohort via phenopacket-store-toolkit

## Method 1

In [None]:
from ppkt2synergy import CohortManager
phenopackets = CohortManager.from_ppkt_store(cohort_name='FBN1', ppkt_store_version="0.1.23")
print(phenopackets[4])

In [None]:
print(f"Number of patients by FBN1: {len(phenopackets)}")

In [None]:
from collections import Counter
import matplotlib.pyplot as plt

def diseases_count_table(phenopackets:typing.List[ppkt.Phenopacket]) -> typing.Dict[str, int]:
    """ 
    Returns:
        A dictionary with key=disease label and value=number of phenopackets in the cohort with the disease
    """
    diseases= []
    for phenopacket in phenopackets:
        if len(phenopacket.diseases) == 0:
            raise ValueError("Empty disease list")
        if len(phenopacket.diseases) != 1:
            print("Warning, number of diseases ", len(phenopacket.diseases))
        #if len(phenopacket.diseases) == 1:
        diseases.append(phenopacket.diseases[0].term.label)
    return Counter(diseases)

diseases_count = diseases_count_table(phenopackets)

labels = list(diseases_count.keys())
counts = list(diseases_count.values())

# plot graph to show diseases and count
plt.bar(labels, counts)
plt.xlabel('Disease')
plt.ylabel('Count')
plt.title('Disease Count by FBN1')
plt.xticks(rotation=45, ha="right") 
for i, count in enumerate(counts):
    plt.text(i, count + 0.1, str(count), ha='center', va='bottom') 
plt.show()



In [None]:
def HPO_count_table(phenopackets: list) -> int:
    HPOs= list()
    for phenopacket in phenopackets:
        if len(phenopacket.phenotypic_features) == 0:
            raise ValueError("Empty disease list")
        for phenotype in phenopacket.phenotypic_features:
            HPOs.append(phenotype.type.label)
        distribution = Counter(HPOs)
    return len(set(HPOs)),distribution

HPOs_count, distribution = HPO_count_table(phenopackets)
print(f'Number of distinct HPOs by FBN1: {HPOs_count}')
print(distribution)

## Method 2

In [None]:
def filter_cohorts(
    store: DefaultPhenopacketStore,
    cohort_names: typing.List[str]
) -> DefaultPhenopacketStore:
    """
    Filter cohorts from a DefaultPhenopacketStore by specified cohort names.

    :param store: Original DefaultPhenopacketStore instance.
    :param cohort_names: List of cohort names to include in the new store.
    :return: A new DefaultPhenopacketStore containing only the specified cohorts.
    """
    # Filter the cohorts based on the provided list of cohort names
    filtered_cohorts = [
        cohort for cohort in store.cohorts() if cohort.name in cohort_names
    ]

    # Return a new DefaultPhenopacketStore with the filtered cohorts
    return DefaultPhenopacketStore(
        name=f"{store.name}_filtered",  
        path=store.path, 
        cohorts=filtered_cohorts,  # only the filtered cohorts
    )


registry = configure_phenopacket_registry()
with registry.open_phenopacket_store(release="0.1.22") as ps:
    phenopackets = filter_cohorts(
    store=ps,  
    cohort_names=["FBN1"]  # Build a new phenopacketstore with cohort FBN1
    )
    stats = PPKtStoreStats(phenopackets)
    df = stats.get_summary_df()
    print(df.head())
    print(f'Number of patients by FBN1: {df.shape[0]}')
    diseases_count = stats.get_counts_per_disease_df()
    print(f'Number of Diseases by FBN1: {diseases_count}')
    HPOs_count = stats._get_total_and_unique_hpo_counts(phenopackets)
    print(f'Number of distinct HPOs by FBN1: {HPOs_count}')


# CohortHPO Correlation Test

In [9]:
import scipy.stats
from typing import Union
import numpy as np
from sklearn.metrics import matthews_corrcoef

def get_status_for_terms(patient, hpo_id_A: str, hpo_id_B: str) -> Union[tuple, None]:
    """
    Checks the statuses (observed or excluded) of two HPO term IDs in a patient's phenotype data.

    Args:
        phenotype: A PhenotypicFeature object containing the patient's phenotype data.
        hpo_id_A: The first HPO term ID to check.
        hpo_id_B: The second HPO term ID to check.

    Returns:
        tuple: A tuple (status_A, status_B) where each status is:
            - 1 if the phenotype is observed (not excluded).
            - 0 if the phenotype is excluded.
            - None if the HPO term is not present in the phenotype.
    """
    status_A = status_B = None

    for phenotype_feature in patient.phenotypic_features:
        if phenotype_feature.type.id == hpo_id_A:
            status_A = 0 if phenotype_feature.excluded else 1
        elif phenotype_feature.type.id == hpo_id_B:
            status_B = 0 if phenotype_feature.excluded else 1
    
        # Early stop if both statuses are found
        if status_A is not None and status_B is not None:
            return status_A, status_B    

    return None


def calculate_mcc_and_p(observed_status_A:list, observed_status_B:list)->tuple:
    """
    Calculate the Matthews Correlation Coefficient (MCC) and its associated p-value using a contingency table and chi-square test.

    Args:
        observed_status_A: List of 0/1 values for variable A.
        observed_status_B: List of 0/1 values for variable B.

    Returns:
        tuple: (mcc, p_value)
    """
    contingency_table = np.array([
        [sum((np.array(observed_status_A) == 1) & (np.array(observed_status_B) == 1)),
         sum((np.array(observed_status_A) == 1) & (np.array(observed_status_B) == 0))],
        [sum((np.array(observed_status_A) == 0) & (np.array(observed_status_B) == 1)),
         sum((np.array(observed_status_A) == 0) & (np.array(observed_status_B) == 0))]
    ])
    chi2, p_value, _, _ = scipy.stats.chi2_contingency(contingency_table)
    mcc = matthews_corrcoef(observed_status_A, observed_status_B)
    return mcc, p_value


def test_hpo_terms(cohort: str, hpo_id_A: str, hpo_id_B: str):
    """
    Perform correlation tests (Spearman and MCC) between two HPO terms in a cohort of patients.

    Args:
        cohort: The name of the patient cohort to analyze.
        hpo_id_A: The first HPO term ID to correlate.
        hpo_id_B: The second HPO term ID to correlate.
        url: The URL of the HPO ontology in JSON format.

    Returns:
        dict: A dictionary containing correlation coefficients and p-values for each test.
        If the data is insufficient (< 30 samples), an error message is returned.
    """
   
    observed_status_A = []
    observed_status_B = []
    
    # Import cohort
    patients = import_cohort(cohort)
    for patient in patients:
        result = get_status_for_terms(patient, hpo_id_A, hpo_id_B)
        if result is not None:
            status_A, status_B = result
            observed_status_A.append(status_A)
            
            observed_status_B.append(status_B)

    # If the number of observed data points is less than 30, return an error message. 
    # This threshold ensures sufficient statistical power for reliable correlation tests.
    if len(observed_status_A) < 30:
        return {"error": "Not enough data for correlation tests"}

    # Spearman test
    spearman_corr, spearman_p = scipy.stats.spearmanr(observed_status_A, observed_status_B)

    # MCC
    mcc, mcc_p = calculate_mcc_and_p(observed_status_A, observed_status_B)

    return {
        "Spearman": {"correlation": spearman_corr, "p_value": spearman_p},
        "MCC": {"correlation": mcc, "p_value": mcc_p}
    }

In [2]:
import phenopackets as ppkt 
import hpotk
from ppkt2synergy import get_status_for_terms

phenopkt = ppkt.Phenopacket()
#
shortStatureOc = ppkt.OntologyClass(id="HP:0004322", label="Short stature")
microcephalyOc = ppkt.OntologyClass(id="HP:0000252", label="Microcephaly")
deepPhiltrumOc = ppkt.OntologyClass(id="HP:0002002", label="Deep philtrum")
highPalateOc = ppkt.OntologyClass(id="HP:0000218", label="High palate")
shortStaturePf = ppkt.PhenotypicFeature(type=shortStatureOc)
microcephalyPf = ppkt.PhenotypicFeature(type=microcephalyOc)

phenopkt.phenotypic_features.append(shortStaturePf)
phenopkt.phenotypic_features.append(microcephalyPf)
#phenopkt.phenotypic_features.append(deepPhiltrum)
#phenopkt.phenotypic_features.append(highPalate)


get_status_for_terms(phenopkt, shortStatureOc.id, microcephalyOc.id)


(1, 1)

In [3]:
None == None

True

In [None]:
import hpotk
import pandas as pd

def generate_hpo_term_status_matrix(cohort: str) -> pd.DataFrame:
    """
    Generate a binary matrix of HPO term statuses for all patients in a cohort.

    Args:
        cohort: The patient cohort, a collection of patient objects with phenotypic features.

    Returns:
        A pandas DataFrame:
            - Rows represent patients (indexed by their IDs).
            - Columns represent unique HPO IDs (sorted alphabetically).
            - Values are binary (1: observed, 0: excluded).
    """

    patients = import_cohort(cohort)
    hpo_ids = set()
    status_data = {}

    # Single pass to collect HPO IDs and patient statuses
    for patient in patients:
        status_data[patient.id] = {}
        for feature in patient.phenotypic_features:
            hpo_id = feature.type.id
            hpo_ids.add(hpo_id)
            status_data[patient.id][hpo_id] = 0 if feature.excluded else 1

    # Create a sorted list of unique HPO IDs
    hpo_ids = sorted(hpo_ids)

    # Convert the status_data dictionary to a DataFrame
    status_matrix = pd.DataFrame.from_dict(status_data, orient='index', columns=hpo_ids)

    return status_matrix



def find_unrelated_hpo_terms(
        hpo_id: str, 
        cohort:str, 
        url= 'https://github.com/obophenotype/human-phenotype-ontology/releases/download/v2023-10-09/hp.json') -> dict:
    """
    Check if a given HPO ID  exists in the filtered terms_status DataFrame, where columns (HPO terms) with more than 50% missing values are removed.
    If it exists, return unrelated terms (terms not ancestors, descendants, or the term itself).
    If the term is not found, notify the user.

    Args:
        term: The HPO ID to check.
        cohort: The patient cohort to be analyzed.
        url: The URL or path to load the HPO ontology in JSON format.

    Returns:
        A dictionary with 'status' indicating success or failure, and 'data' containing the unrelated terms
        or a failure message.
    """
    # Load the HPO ontology
    hpo = hpotk.load_minimal_ontology(url)
    terms_status = generate_hpo_term_status_matrix(cohort)
    
    # Remove columns with None values exceeding 50%
    terms_status = terms_status.loc[:, terms_status.isna().mean() <= 0.5]

    # Check if the term exists in the filtered terms_status
    if hpo_id not in terms_status.columns:
        return {
            "error": f"The HPO term '{hpo_id}' was not found. It might not be relevant to this cohort or was excluded because it appears in less than 50% of patients. Please check the term and its applicability."

        }

    # Get ancestors and descendants of the term
    ancestors = hpo.graph.get_ancestors(hpo_id)
    descendants = hpo.graph.get_descendants(hpo_id)

    # Get unrelated terms
    related_terms = set(ancestors) | set(descendants) | {hpo_id}
    unrelated_terms = [col for col in terms_status.columns if col not in related_terms]

    return unrelated_terms

In [None]:
result = find_unrelated_hpo_terms("HP:0001634",'FBN1')
print(result)

In [None]:
results = test_hpo_terms('FBN1', 'HP:0001634', 'HP:0001382')
print(results)
results = test_hpo_terms('FBN1', 'HP:0001634', 'HP:0000098')
print(results)
results = test_hpo_terms('FBN1', 'HP:0001634', 'HP:0000218')
print(results)