In [14]:
GMT_PATH = "./data/c5.go.bp.v7.5.1.entrez.gmt"
from src.cluster_description import Cluster, ClustersDescription

def parse_gmt(path : str) -> dict[tuple[str, str]]:    
    # Create a dictionary to store the results
    results = {}
    
    # Open the GMT file
    with open(path, "r") as f:
        # For each line in the file
        for line in f:
            # Split the line into a list of words
            words = line.split("\t")

            # The motif name
            gene_set = words[0]
            link = words[1]
            genes = words[2:]
            # strip each gene
            genes = [gene.strip() for gene in genes]

            # Add the gene set to the dictionary
            results[gene_set] = [link, genes]
    
    # Return the results
    return results

In [49]:
clusters = ClustersDescription("../data/clusters/HMEC/chr16_spec_res.json", "chr16")
cluster = clusters["500kb_16_120_56500000_75000000"]

In [50]:
import pyranges as pr

In [51]:
features = pr.read_bed("../data/features/HMEC/CAGE/features.bed")
overlaps = cluster.find_overlaps("chr16", features)

In [53]:
FANTOM_PATH = "./data/FANTOM5_CAGE_peak_entrez_gene_tbl.tsv"

# read the table from the path
def parse_fantom_data(path : str) -> dict[str, list[str]]:
    # Create a dictionary to store the results
    results : dict[str, list[str]] = {}

    header_found = False

    # Open the file
    with open(path, "r") as f:
        # For each line in the file
        for line in f:
            # Go to the next line if it starts with a #
            if line.startswith("#"): continue

            # deal with the header
            if not header_found:
                header_found = True
                continue

            # Split the line into a list of words
            words = line.split("\t")
            cage_id = words[0].strip()
            entrez_id = words[1].strip()

            if entrez_id not in results:
                results[cage_id] = []
            
            results[cage_id].append(entrez_id)

    # Return the results
    return results

In [52]:
cage_to_entrez = parse_fantom_data(FANTOM_PATH)

In [55]:
# The names of the overlapping ranges
names = overlaps.Name.values.flatten()

# The entrez ids of the overlapping ranges
entrez_ids = [cage_to_entrez[name][0] for name in names if name in cage_to_entrez]

# Remove the NAs
entrez_ids = [entrez_id for entrez_id in entrez_ids if entrez_id != "NA"]

In [57]:
gmt = parse_gmt(GMT_PATH)

In [58]:
def get_ontology_group(entrez_id : str, gmt : dict[str, list[str]]) -> str:
    for gene_set, (link, genes) in gmt.items():
        if entrez_id in genes:
            return gene_set
    return None

sets = []
for entrez_id in entrez_ids:
    group = get_ontology_group(entrez_id[0], gmt)
    if group is not None:
        sets.append(group)

In [60]:
def histogram(sets : list[str]) -> dict[str, int]:
    results = {}
    for set_ in sets:
        if set_ not in results:
            results[set_] = 0
        results[set_] += 1
    return results

In [61]:
histogram(sets)

{'GOBP_XENOBIOTIC_METABOLIC_PROCESS': 37, 'GOBP_REPRODUCTION': 35}

In [64]:
class GeneSetEnrichment:
    gmt : dict[str, list[str]]
    cage_to_entrez : dict[str, list[str]]
    features : pr.PyRanges

    def __init__(self, gmt_path : str, cage_to_entrez_path : str, features_path : str):
        self.gmt = parse_gmt(gmt_path)
        self.cage_to_entrez = parse_fantom_data(cage_to_entrez_path)
        self.features = pr.read_bed(features_path)

    def get_gene_set(self, cluster : Cluster, chromosome : str) -> str:
        # The features overlapping with this cluster
        overlaps = cluster.find_overlaps(chromosome, self.features)

        # The names of the overlapping features
        names = overlaps.Name.values.flatten()

        # The entrez ids of the overlapping features
        entrez_ids = [self.cage_to_entrez[name][0] for name in names if name in self.cage_to_entrez]

        # Remove the NAs
        entrez_ids = [entrez_id for entrez_id in entrez_ids if entrez_id != "NA"]

        # The ontology groups of the overlapping genes
        sets = []
        for entrez_id in entrez_ids:
            group = get_ontology_group(entrez_id[0], self.gmt)
            if group is not None:
                sets.append(group)
        
        # Return the histogram of the ontology groups
        return histogram(sets)


In [67]:
geneset_enrichment = GeneSetEnrichment(GMT_PATH, FANTOM_PATH, "../data/features/HMEC/CAGE/features.bed")
chromo = "chr16"
clusters = ClustersDescription(f"../data/clusters/HMEC/{chromo}_spec_res.json", chromo)
cluster = clusters["500kb_35_595_56500000_90000000"]
geneset_enrichment.get_gene_set(cluster, chromo)

{'GOBP_XENOBIOTIC_METABOLIC_PROCESS': 59, 'GOBP_REPRODUCTION': 77}