In [1]:
import os
import sys
import numpy as np
import networkx as nx
import itertools as it
import random as rd
import scipy.sparse as sp
import scipy.stats as st
import scipy.cluster.hierarchy as sch
import scipy.spatial.distance as dist
from sklearn.preprocessing import normalize
from scipy.cluster.hierarchy import fcluster
import math
import urllib
import gzip
import colorsys
import pickle as pk
import os.path
from collections import defaultdict, Counter, ChainMap
import umap
import time
# import statsmodels.sandbox.stats.multicomp as mc
from sklearn.preprocessing import normalize
import plotly.graph_objects as go
from plotly.offline import init_notebook_mode, iplot
import json

import matplotlib.pyplot as plt
# %matplotlib inline

  @numba.jit()
  @numba.jit()
  @numba.jit()
  @numba.jit()
2025-01-31 09:02:24.588920: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Functions

In [2]:

def rnd_walk_matrix(G, r, a):

    n = G.number_of_nodes()
    A = nx.adjacency_matrix(G,sorted(G.nodes()))

    factor = float((1-a)/n)

    E = np.multiply(factor,np.ones([n,n]))              # prepare 2nd scaling term
    A_tele = np.multiply(a,A) + E  

#     print(A_tele)
    M = normalize(A_tele, norm='l1', axis=0)                                 # column wise normalized MArkov matrix

    # mixture of Markov chains
    del A_tele
    del E

    U = np.identity(n,dtype=int) 
    H = (1-r)*M
    H1 = np.subtract(U,H)
    del U
    del M
    del H    

    W = r*np.linalg.inv(H1)   

    return W


def fisher_test(sig_level,sampleset,d_sample_attributes,d_attributes_sample,background):
    #####
    # IDENTIFY ALL TERMS TO BE TESTED IN SAMPLE
    #####
    # take only these neighboring genes that come with a disease annotation
    sample_overlap = set(sampleset) & set(d_sample_attributes.keys())
    # sample_overlap = sampleset
    
    l_terms = []
    for gene in sample_overlap:
        for x in d_sample_attributes[gene]:
            l_terms.append(x)
        
    # print('tests :%s ' %len(l_terms))
    # print('set of tests :%s ' %len(set(l_terms)))
    set_terms = set(l_terms)
    number_of_tests = len(set_terms)

    # make a dictionary of all GO terms with their p-values 
    d_term_p = {}

    for term in set_terms:
        attributeset = set(d_attributes_sample[term])

        ab = len(sample_overlap.intersection(attributeset))
        amb = len(sample_overlap.difference(attributeset))
        bma = len(attributeset.difference(sample_overlap))
#         backg = background - len(set(sample_overlap) | set(attributeset))
        backg = background - ab - amb - bma

        # pval = pvalue(ab , amb, bma,backg).right_tail
        oddsratio, pval = st.fisher_exact([[ab, amb], [bma, backg]],alternative='greater')
        d_term_p[term] = pval * number_of_tests
#         print(pval)
        # f.write('%s,%s,%s,%s,%.6f\n' %(ab,amb,bma,backg,pval))
    # f.close()
    return d_term_p 
	

##################################################################
##################################################################

def color_creator(n):

    colors = [colorsys.hsv_to_rgb(1.0/n*x,1,1) for x in range(n)]
    color_list = []
    for c in colors:
        cc = [int(y*255) for y in c]
        color_list.append('#%02x%02x%02x' % (cc[0],cc[1],cc[2]))
        
    return color_list



def make_different_hexcolors(N):
    N_colors = N

    # packing criterium
    d_min_c = 9./5*33554432/np.pi/N

    d_mindistance = d_min_c**(2./3.)
    # print(d_mindistance)

    # init_color = f"#{''.join(np.random.choice(list('0123456789abcdef'), size=6))}" 
    init_col = [rd.randint(0, 255),rd.randint(0, 255),rd.randint(0, 255)]
    # print(init_col)
    l_colors = []
    l_colors.append(init_col)
    # for n in range(1):
    while len(l_colors) < N_colors:
        rd_col = [rd.randint(0, 255),rd.randint(0, 255),rd.randint(0, 255)]
        # print(rd_col)
        l_dists = []
        for c in l_colors:
            d = (rd_col[0] - c[0])**2 + (rd_col[1] - c[1])**2 + (rd_col[2] - c[2])**2

            l_dists.append(d)
        if min(l_dists)>d_mindistance:
            l_colors.append(rd_col)

    l_hexcols = [rgb2hex(rgb[0],rgb[1],rgb[2]) for rgb in l_colors]
    return l_hexcols


def Ginfo(G):
    info = []
    info.append(f"Graph Name: {G.name}")
    info.append(f"Graph Type: {type(G).__name__}")
    info.append(f"Number of Nodes: {G.number_of_nodes()}")
    info.append(f"Number of Edges: {G.number_of_edges()}")
    # info.append(f"Density: {round(100. * nx.density(G), 3)} %")

    # if nx.is_weighted(G):
    #     info.append(f"Weighted: Yes")
        
    #     # Calculate the average degree considering edge weights for weighted graphs
    #     avg_weighted_degree = sum(weight for u, v, weight in G.edges(data=True)) / G.number_of_nodes()
    #     info.append(f"Average Weighted Degree: {round(avg_weighted_degree, 2)}")
    # else:
    #     info.append(f"Weighted: No")
        
    #     # Calculate the average degree as usual for unweighted graphs
    #     avg_degree = sum(dict(G.degree()).values()) / G.number_of_nodes()
    #     info.append(f"Average Degree: {round(avg_degree, 2)}")

    info.append(f"Connected: {'Yes' if nx.is_connected(G) else 'No'}")
    num_components = nx.number_connected_components(G)
    info.append(f"Number of Connected Components: {num_components}")

    info.append(f"Directed: {'Yes' if G.is_directed() else 'No'}")
    
    return "\n".join(info)

### ID mappings

In [3]:


def download_mapping_file(url, filename):
    urllib.request.urlretrieve(url, filename)

def parse_mapping_file(filename):
    mapping = {}
    with gzip.open(filename, 'rt') as file:
        for line in file:
            if line.startswith('#'):  # Skip header and comments
                continue
            fields = line.strip().split('\t')
            # print(fields)
            if fields[0] == '9606':
                gene_symbol = fields[2]
                entrez_id = fields[1]
                # print(entrez_id)
                # ensembl_id = fields[5]
                # Additional mapping field
                gene_name = fields[10]

                # try: 
                #     ent_int = int(entrez_id)
                #     gene_name = fields[10]
                # except:
                #     gene_name = entrez_id

                if gene_symbol not in mapping:
                    mapping[gene_symbol] = {
                        'entrez_id': entrez_id,
                        # 'ensembl_id': ensembl_id,
                        'gene_name': gene_name
                    }
    return mapping


def convert_symbols_to_entrez(gene_symbols, mapping):
    entrez_ids = []
    for symbol in gene_symbols:
        if symbol in mapping:
            entrez_ids.append(mapping[symbol]['entrez_id'])
    return entrez_ids

# def convert_symbols_to_ensembl(gene_symbols, mapping):
#     ensembl_ids = []
#     for symbol in gene_symbols:
#         if symbol in mapping:
#             ensembl_ids.append(mapping[symbol]['ensembl_id'])
#     return ensembl_ids

# Download Gene Info file (example using Homo sapiens)
filename = 'Homo_sapiens.gene_info.gz'
url = 'ftp://ftp.ncbi.nlm.nih.gov/gene/DATA/GENE_INFO/Mammalia/' + filename

# download_mapping_file(url, filename)

# Parse mapping file
mapping = parse_mapping_file(filename)

# Generate a dictionary of all existing gene symbols and their corresponding Entrez IDs and Ensemble IDs
all_gene_symbols = list(mapping.keys())
all_entrez_ids = [mapping[symbol]['entrez_id'] for symbol in all_gene_symbols]
# all_ensembl_ids = [mapping[symbol]['ensembl_id'] for symbol in all_gene_symbols]
symbol_to_entrez = dict(zip(all_gene_symbols, all_entrez_ids))
# symbol_to_ensembl = dict(zip(all_gene_symbols, all_ensembl_ids))
entrez_to_symbols = {v: k for k, v in symbol_to_entrez.items()}
# ensembl_to_symbols = {v: k for k, v in symbol_to_ensembl.items()}


## load data

In [4]:
path = '/Users/fmueller/work/NICE/GitNICE/Data/'


print('full PPI')
# PPI direct
G_ppi = nx.read_edgelist(path + 'PPIs/PPI_physical_elist.csv',data=False, delimiter=',')
print(Ginfo(G_ppi))
print('\nlcc PPI')

lcc = max(nx.connected_components(G_ppi), key=len)
G = nx.subgraph(G_ppi,lcc)

print(Ginfo(G))

full PPI
Graph Name: 
Graph Type: Graph
Number of Nodes: 18068
Number of Edges: 306914
Connected: No
Number of Connected Components: 2
Directed: No

lcc PPI
Graph Name: 
Graph Type: Graph
Number of Nodes: 18064
Number of Edges: 306911
Connected: Yes
Number of Connected Components: 1
Directed: No


### load annotation data 

In [8]:

## GO - Functions
# d_attributes_sample = pk.load( open(path + "GO/GOupF2genes.pkl", "rb" ) )
# d_sample_attributes = pk.load( open(path + "GO/gene2GOF_up.pkl", "rb" ) )

# GO - compionents
d_attributes_sample = pk.load( open(path + "GO/GOupC2genes.pkl", "rb" ) )
d_sample_attributes = pk.load( open(path + "GO/gene2GOC_up.pkl", "rb" ) )

d_go_names = pk.load( open(path + "GO/d_gonames.pkl", "rb" ) )



In [10]:
import requests

# Define the GO term for "protein complex" (GO:0043234)
GO_TERM = "GO:0043234"

# Use the Gene Ontology API to fetch children of the "protein complex" category
url = f"https://www.ebi.ac.uk/QuickGO/services/ontology/go/terms/{GO_TERM}/children"

# Send the request
response = requests.get(url, headers={"Accept": "application/json"})

list_GO_protein_complexes = []

# Parse and display results
if response.status_code == 200:
    data = response.json()
    children = data["results"][0]["children"]
    print("Protein Complex GO Terms:")
    for term in children:
        # print(f"{term['id']} - {term['name']}")
        list_GO_protein_complexes.append(term)
else:
    print("Error retrieving data")


Protein Complex GO Terms:


In [11]:
len(list_GO_protein_complexes)

281

In [25]:
n = 5
print(list_GO_protein_complexes[n]['id'])
print(list_GO_protein_complexes[n]['name'])
print(d_attributes_sample[list_GO_protein_complexes[n]['id']])

for gene in d_attributes_sample[list_GO_protein_complexes[n]['id']]:
    print(entrez_to_symbols[str(gene)])

GO:1990393
3M complex
{23363, 83987, 9820}
OBSL1
CCDC8
CUL7


In [27]:
import csv

def write_go_protein_data(list_GO_protein_complexes, d_attributes_sample, entrez_to_symbols, output_file="go_protein_complexes.csv"):
    """Writes GO protein complexes and their associated proteins to a CSV file."""
    
    with open(output_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        
        # Write header
        writer.writerow(["GO ID", "GO Name", "Protein Symbols"])
        
        for n in range(len(list_GO_protein_complexes)):
            try:
                go_id = list_GO_protein_complexes[n]['id']
                go_name = list_GO_protein_complexes[n]['name']
                proteins = d_attributes_sample[go_id]
                
                if proteins:  # Ensure there are proteins
                    # Convert Entrez IDs to gene symbols
                    protein_symbols = [entrez_to_symbols.get(str(p), str(p)) for p in proteins]  # Keep original if not found
                    
                    # Write row
                    writer.writerow([go_id, go_name] + protein_symbols)
            except KeyError:
                pass  # Skip if GO ID is not found in d_attributes_sample

# Example usage
write_go_protein_data(list_GO_protein_complexes, d_attributes_sample, entrez_to_symbols)
