GENE ONTOLOGY

In [27]:
!pip install pronto



In [28]:
!wget http://purl.obolibrary.org/obo/go/go-basic.obo
import pronto

# Load the OBO file using the downloaded path
ontology = pronto.Ontology("go-basic.obo")

# Access terms
for term in ontology.terms():
    print(term.id, term.name)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
GO:1904333 positive regulation of error-prone translesion synthesis
GO:1904334 heme import across plasma membrane
GO:1904335 regulation of ductus arteriosus closure
GO:1904336 negative regulation of ductus arteriosus closure
GO:1904337 positive regulation of ductus arteriosus closure
GO:1904338 regulation of dopaminergic neuron differentiation
GO:1904339 negative regulation of dopaminergic neuron differentiation
GO:1904340 positive regulation of dopaminergic neuron differentiation
GO:1904341 regulation of colon smooth muscle contraction
GO:1904342 negative regulation of colon smooth muscle contraction
GO:1904343 positive regulation of colon smooth muscle contraction
GO:1904344 regulation of gastric mucosal blood circulation
GO:1904345 negative regulation of gastric mucosal blood circulation
GO:1904346 positive regulation of gastric mucosal blood circulation
GO:1904347 regulation of small intestine smooth muscle contractio

## .obo Parsing

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json
spark = SparkSession.builder.appName("App").master("local").getOrCreate()

In [30]:
from google.colab import drive
drive.mount('/content/drive')

import pandas as pd

output_path = "/content/drive/MyDrive/obo_cleaned.csv"
df_go = pd.read_csv(output_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
oi = [len(x) > 0 for x in df_go[['name', 'is_a']]['is_a']]
df_go['empty'] = oi
new_obo = (df_go[df_go['empty'] == True])
new_obo = new_obo.dropna(subset=['namespace'])
new_obo.to_csv("obo_cleaned")

In [32]:
obo_df=df_go
obo_df[['name', 'namespace']]

Unnamed: 0,name,namespace
0,mitochondrion inheritance,biological_process
1,mitochondrial genome maintenance,biological_process
2,high-affinity zinc transmembrane transporter a...,molecular_function
3,low-affinity zinc ion transmembrane transporte...,molecular_function
4,"alpha-1,6-mannosyltransferase activity",molecular_function
...,...,...
40206,UDP-4-deoxy-4-formamido-beta-L-arabinopyranose...,biological_process
40207,UDP-4-deoxy-4-formamido-beta-L-arabinopyranose...,biological_process
40208,UDP-4-deoxy-4-formamido-beta-L-arabinopyranose...,biological_process
40209,kojic acid metabolic process,biological_process


## NER / Keyword Matching

In [33]:
file_path = "/content/drive/MyDrive/pubmed_final_dataset.parquet"
df = spark.read.parquet(file_path)
df.show(5)
df.printSchema()

+--------+--------------------+--------------------+----+--------------------+-----------------+--------------------+--------------------+--------------------+---------+--------------------+---------------+--------------------+-------------+
|    PMID|               Title|            Abstract|Year|             Journal|  PublicationType|        Author_Names|            Keywords|        MeshHeadings|Chemicals|          References|Reference_Count|             CitedBy|CitedBy_Count|
+--------+--------------------+--------------------+----+--------------------+-----------------+--------------------+--------------------+--------------------+---------+--------------------+---------------+--------------------+-------------+
|27803796|Identifying ELIXI...|The core mission ...|2016|       F1000Research|[Journal Article]|[Christine Durinx...|[Bioinformatics, ...|                NULL|       []|                  []|              0|[39433782, 379946...|           44|
|28232862|{"i":"Microplitis...|H

In [34]:
obo_file = obo_df[['name', 'namespace']]

In [35]:
from pyspark.sql.functions import col

df_selected = df.select(
    col("PMID").alias("PMID"), # Changed from col("MedlineCitation.PMID.#text")
    "Title", # Changed from "MedlineCitation.Article.ArticleTitle"
    "Abstract" # Changed from "MedlineCitation.Article.Abstract.AbstractText"
)
df_selected.show(5, truncate=True)

+--------+--------------------+--------------------+
|    PMID|               Title|            Abstract|
+--------+--------------------+--------------------+
|27803796|Identifying ELIXI...|The core mission ...|
|28232862|{"i":"Microplitis...|Herbivores emit v...|
|28299173|Systematic assess...|Selected gene mut...|
|28299189|Molecular signatu...|Anastasis (Greek ...|
|28343685|Evaluation of sou...|Retention index u...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [36]:
df_selected.count()

24919

In [39]:
obo_file.to_csv("last_obo.csv")

In [40]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GO_Matching").getOrCreate()
go_terms_df = spark.read.csv("last_obo.csv", header=True, inferSchema=True)
go_terms_df.show(5)

+---+--------------------+------------------+
|_c0|                name|         namespace|
+---+--------------------+------------------+
|  0|mitochondrion inh...|biological_process|
|  1|mitochondrial gen...|biological_process|
|  2|high-affinity zin...|molecular_function|
|  3|low-affinity zinc...|molecular_function|
|  4|alpha-1,6-mannosy...|molecular_function|
+---+--------------------+------------------+
only showing top 5 rows



In [41]:
def extract_plain_text(abstract):
    if isinstance(abstract, str):
        try:
            parsed = json.loads(abstract)
            def extract(obj):
                if isinstance(obj, dict):
                    return [v for val in obj.values() for v in extract(val)]
                elif isinstance(obj, list):
                    return [v for elem in obj for v in extract(elem)]
                elif isinstance(obj, str):
                    return [obj]
                else:
                    return []
            return ' '.join(extract(parsed)).lower()
        except:
            return abstract.lower()
    else:
        return str(abstract).lower()

extract_text_udf = udf(extract_plain_text, StringType())
pubmed_df = df_selected.withColumn("clean_abstract", extract_text_udf(df_selected["Abstract"]))
pubmed_df.show(2, truncate=False)

+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
from pyspark.sql.functions import broadcast, col, array_contains, lit, explode, split, lower

go_terms_list = [(row["name"].lower(), row["namespace"]) for row in go_terms_df.collect()]
broadcast_go_terms = spark.sparkContext.broadcast(go_terms_list)

In [43]:
from pyspark.sql.functions import expr, array, collect_list, struct

def find_matching_go_terms(abstract):
    matches = []
    for (term, namespace) in broadcast_go_terms.value:
        if term in abstract:
            matches.append((term, namespace))
    return matches

find_go_udf = udf(find_matching_go_terms, "array<struct<term:string, namespace:double>>")

result_df = pubmed_df.withColumn(
    "matched_go_terms",
    find_go_udf(col("clean_abstract"))
)
result_df.show(10, truncate=True)

+--------+--------------------+--------------------+--------------------+--------------------+
|    PMID|               Title|            Abstract|      clean_abstract|    matched_go_terms|
+--------+--------------------+--------------------+--------------------+--------------------+
|27803796|Identifying ELIXI...|The core mission ...|the core mission ...|                  []|
|28232862|{"i":"Microplitis...|Herbivores emit v...|herbivores emit v...|[{response to hos...|
|28299173|Systematic assess...|Selected gene mut...|selected gene mut...|[{gene expression...|
|28299189|Molecular signatu...|Anastasis (Greek ...|anastasis (greek ...|[{cell death, NUL...|
|28343685|Evaluation of sou...|Retention index u...|retention index u...|  [{behavior, NULL}]|
|28343686|Analysis of essen...|In the fields of ...|in the fields of ...|[{rent complex, N...|
|28343687|Multiple, simulta...|This paper detail...|this paper detail...|[{membrane, NULL}...|
|28343691|Improving the Qua...|PROBLEM: Rapid di..

In [46]:
go_terms_df.show()

+---+--------------------+------------------+
|_c0|                name|         namespace|
+---+--------------------+------------------+
|  0|mitochondrion inh...|biological_process|
|  1|mitochondrial gen...|biological_process|
|  2|high-affinity zin...|molecular_function|
|  3|low-affinity zinc...|molecular_function|
|  4|alpha-1,6-mannosy...|molecular_function|
|  5|heptaprenyl dipho...|molecular_function|
|  6| vacuole inheritance|biological_process|
|  7|single strand bre...|biological_process|
|  8|single-stranded D...|molecular_function|
|  9|phosphopyruvate h...|cellular_component|
| 10|    lactase activity|molecular_function|
| 11|alpha-glucoside t...|biological_process|
| 12|regulation of DNA...|biological_process|
| 13|regulation of mit...|biological_process|
| 14|mitotic spindle e...|biological_process|
| 15|maltose metabolic...|biological_process|
| 16|maltose biosynthe...|biological_process|
| 17|maltose catabolic...|biological_process|
| 18|alpha-1,2-mannosy...|molecula

In [55]:
from pyspark.sql.functions import when

go_terms_df = go_terms_df.withColumn(
    "namespace_code",
    when(col("namespace") == "biological_process", 0)
    .when(col("namespace") == "molecular_function", 1)
    .when(col("namespace") == "cellular_component", 2)
    .otherwise(None)
)


In [56]:
go_terms_df.show()

+---+--------------------+------------------+--------------+
|_c0|                name|         namespace|namespace_code|
+---+--------------------+------------------+--------------+
|  0|mitochondrion inh...|biological_process|             0|
|  1|mitochondrial gen...|biological_process|             0|
|  2|high-affinity zin...|molecular_function|             1|
|  3|low-affinity zinc...|molecular_function|             1|
|  4|alpha-1,6-mannosy...|molecular_function|             1|
|  5|heptaprenyl dipho...|molecular_function|             1|
|  6| vacuole inheritance|biological_process|             0|
|  7|single strand bre...|biological_process|             0|
|  8|single-stranded D...|molecular_function|             1|
|  9|phosphopyruvate h...|cellular_component|             2|
| 10|    lactase activity|molecular_function|             1|
| 11|alpha-glucoside t...|biological_process|             0|
| 12|regulation of DNA...|biological_process|             0|
| 13|regulation of mit..

In [47]:
go_terms_df.groupBy("namespace").count().show()

+------------------+-----+
|         namespace|count|
+------------------+-----+
|cellular_component| 4022|
|biological_process|26036|
|molecular_function|10153|
+------------------+-----+



In [53]:
# from pyspark.sql.functions import expr

# namespace_counts = result_df.withColumn(
#     "BP_count", expr("size(filter(matched_go_terms, x -> x.namespace = 'biological_process'))")
# ).withColumn(
#     "MF_count", expr("size(filter(matched_go_terms, x -> x.namespace = 'molecular_function'))")
# ).withColumn(
#     "CC_count", expr("size(filter(matched_go_terms, x -> x.namespace = 'cellular_component'))")
# )


In [59]:
go_term_tuples = go_terms_df.select("name", "namespace_code").rdd.map(tuple).collect()
broadcast_go_terms = spark.sparkContext.broadcast(go_term_tuples)

In [60]:
def find_matching_go_terms(abstract):
    matches = []
    for (term, code) in broadcast_go_terms.value:
        if term in abstract:
            matches.append((term, code))
    return matches


In [67]:
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import udf

go_struct_type = ArrayType(StructType([
    StructField("term", StringType()),
    StructField("namespace", IntegerType())
]))

find_go_udf = udf(find_matching_go_terms, go_struct_type)


In [68]:
result_df = pubmed_df.withColumn(
    "matched_go_terms",
    find_go_udf(col("clean_abstract"))
)


In [69]:
from pyspark.sql.functions import expr

namespace_counts = result_df.withColumn(
    "BP_count", expr("size(filter(matched_go_terms, x -> x.namespace = 0))")
).withColumn(
    "MF_count", expr("size(filter(matched_go_terms, x -> x.namespace = 1))")
).withColumn(
    "CC_count", expr("size(filter(matched_go_terms, x -> x.namespace = 2))")
)


In [70]:
namespace_counts.select(
    col("PMID"),
    "Title",
    "BP_count",
    "MF_count",
    "CC_count",
    "matched_go_terms"
).show()

+--------+--------------------+--------+--------+--------+--------------------+
|    PMID|               Title|BP_count|MF_count|CC_count|    matched_go_terms|
+--------+--------------------+--------+--------+--------+--------------------+
|27803796|Identifying ELIXI...|       0|       0|       0|                  []|
|28232862|{"i":"Microplitis...|       1|       0|       0|[{response to hos...|
|28299173|Systematic assess...|       2|       0|       0|[{gene expression...|
|28299189|Molecular signatu...|       5|       0|       0|[{cell death, 0},...|
|28343685|Evaluation of sou...|       1|       0|       0|     [{behavior, 0}]|
|28343686|Analysis of essen...|       0|       0|       0|                  []|
|28343687|Multiple, simulta...|       0|       2|       1|[{membrane, 2}, {...|
|28343691|Improving the Qua...|       0|       0|       0|                  []|
|28343692|Quality survey of...|       1|       0|       0|    [{transport, 0}]|
|28343693|Utility of radiat...|       0|

In [71]:
namespace_counts.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------+--------+--------+
|    PMID|               Title|            Abstract|      clean_abstract|    matched_go_terms|BP_count|MF_count|CC_count|
+--------+--------------------+--------------------+--------------------+--------------------+--------+--------+--------+
|27803796|Identifying ELIXI...|The core mission ...|the core mission ...|                  []|       0|       0|       0|
|28232862|{"i":"Microplitis...|Herbivores emit v...|herbivores emit v...|[{response to hos...|       1|       0|       0|
|28299173|Systematic assess...|Selected gene mut...|selected gene mut...|[{gene expression...|       2|       0|       0|
|28299189|Molecular signatu...|Anastasis (Greek ...|anastasis (greek ...|[{cell death, 0},...|       5|       0|       0|
|28343685|Evaluation of sou...|Retention index u...|retention index u...|     [{behavior, 0}]|       1|       0|       0|
|28343686|Analysis of es

In [72]:
from pyspark.sql.functions import col


namespace_counts = namespace_counts.select(
    col("PMID"),
    col("BP_count"),
    col("MF_count"),
    col("CC_count"),
    col("matched_go_terms")
)

In [73]:
namespace_counts.printSchema()
namespace_counts.show(5, truncate=False)
namespace_counts.count()


root
 |-- PMID: string (nullable = true)
 |-- BP_count: integer (nullable = false)
 |-- MF_count: integer (nullable = false)
 |-- CC_count: integer (nullable = false)
 |-- matched_go_terms: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- term: string (nullable = true)
 |    |    |-- namespace: integer (nullable = true)

+--------+--------+--------+--------+------------------------------------------------------------------------------------------------+
|PMID    |BP_count|MF_count|CC_count|matched_go_terms                                                                                |
+--------+--------+--------+--------+------------------------------------------------------------------------------------------------+
|27803796|0       |0       |0       |[]                                                                                              |
|28232862|1       |0       |0       |[{response to host, 0}]                                       

24919

In [74]:
namespace_counts.select("matched_go_terms").show(truncate=False)


+------------------------------------------------------------------------------------------------+
|matched_go_terms                                                                                |
+------------------------------------------------------------------------------------------------+
|[]                                                                                              |
|[{response to host, 0}]                                                                         |
|[{gene expression, 0}, {learning, 0}]                                                           |
|[{cell death, 0}, {angiogenesis, 0}, {gene expression, 0}, {cell migration, 0}, {cell cycle, 0}]|
|[{behavior, 0}]                                                                                 |
|[]                                                                                              |
|[{membrane, 2}, {binding, 1}, {ion binding, 1}]                                                 |
|[]       

In [76]:
from pyspark.sql.functions import size

result_df = result_df.withColumn("num_matched", size("matched_go_terms"))

matched_count = result_df.filter("num_matched > 0").count()

total_count = result_df.count()

percentage = (matched_count / total_count) * 100

print(f"{matched_count}/{total_count} articles matched at least one GO term ({percentage:.2f}%).")


10969/24919 articles matched at least one GO term (44.02%).


In [None]:
# from pyspark.sql.functions import size

# result_df = result_df.withColumn("num_matched", size("matched_go_terms")).cache()

# counts = result_df.selectExpr(
#     "count(*) as total",
#     "sum(CASE WHEN num_matched > 0 THEN 1 ELSE 0 END) as matched"
# ).collect()[0]

# matched_count = counts['matched']
# total_count = counts['total']
# percentage = (matched_count / total_count) * 100

# print(f"{matched_count}/{total_count} articles matched at least one GO term ({percentage:.2f}%).")


In [77]:
from pyspark.sql.functions import explode, col

flattened = result_df.withColumn("go_term", explode("matched_go_terms"))

term_freq = flattened.groupBy("go_term.term").count().orderBy(col("count").desc())

term_freq.show(10)


+---------------+-----+
|           term|count|
+---------------+-----+
|         growth| 1650|
|       behavior| 1360|
|        binding|  981|
|      signaling|  927|
|       membrane|  844|
|      transport|  637|
|gene expression|  559|
|          aster|  546|
|      cognition|  494|
|       learning|  440|
+---------------+-----+
only showing top 10 rows



In [78]:
term_freq.toPandas().to_csv("go_term_frequencies.csv", index=False)
