In [1]:
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc, col

from pyspark.sql.functions import explode
import sys
from gentropy.common.session import Session
from pyspark.sql import functions as f

from pyspark import SparkConf
from pyspark.sql import SparkSession
app_name = "example_app"
CREDENTIALS = "/Users/xg1/.config/gcloud/service_account_credentials.json" 


GCS_CONNECTOR_CONF = {
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
    "spark.hadoop.google.cloud.auth.service.account.enable": "true",
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile": CREDENTIALS,
}

extended_spark_conf = {
    "spark.driver.memory": "12g",
    "spark.kryoserializer.buffer.max": "500m",
    "spark.driver.maxResultSize": "3g",
}

# Combine both configurations
combined_conf = {**GCS_CONNECTOR_CONF, **extended_spark_conf}

# Initialize SparkConf with the combined configuration
spark_config = SparkConf().setAll(combined_conf.items())

session = Session(
    spark_uri="local[*]",  # Use appropriate URI if needed
    app_name=app_name,
    extended_spark_conf=combined_conf
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/11/27 12:29:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from gentropy.common.spark_helpers import calculate_harmonic_sum
from gentropy.dataset.l2g_prediction import L2GPrediction
from gentropy.dataset.study_index import StudyIndex
from gentropy.dataset.study_locus import StudyLocus

l2g_ver_path="gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1"
#credible_set_path="gs://ot_orchestration/releases/24.10_freeze6/credible_set"
# study_index_path="gs://ot_orchestration/releases/24.10_freeze6/study_index"

credible_set_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/credible_set"
study_index_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/study_index"
l2g_predictions_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions"

disease_index_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/disease/disease.parquet"
output_dir="gs://genetics-portal-dev-analysis/xg1/L2G_evidence_associations/24.12-uo_test"

def Generate_evidence_associations(
            session,
            l2g_predictions_path,
            credible_set_path,
            study_index_path,
            disease_index_path,
            output_dir
        ):
    # Reading the predictions
    locus_to_gene_prediction = L2GPrediction.from_parquet(
            session, l2g_predictions_path
        )
        # Reading the credible set
    credible_sets = StudyLocus.from_parquet(session, credible_set_path)

        # Reading the study index
    study_index = StudyIndex.from_parquet(session, study_index_path)
    evidence_output_path = f"{output_dir}/evidence"
        # Generate evidence and save file:
    (
        locus_to_gene_prediction.to_disease_target_evidence(
            credible_sets, study_index, 0.05
        )
        .write.mode(session.write_mode)
        .option("compression", "gzip")
        .json(evidence_output_path)
    )

    disease_index = session.spark.read.parquet(disease_index_path).select(
        f.col("id").alias("diseaseId"),
        f.explode("ancestors").alias("ancestorDiseaseId"),
    )

    # Read in the L2G evidence
    disease_target_evidence = session.spark.read.json(evidence_output_path).select(
        f.col("targetFromSourceId").alias("targetId"),
        f.col("diseaseFromSourceMappedId").alias("diseaseId"),
        f.col("resourceScore"),
    )
    
    direct_associations_output_path = f"{output_dir}/direct_associations"
        # Generate direct assocations and save file
    (
        disease_target_evidence.groupBy("targetId", "diseaseId")
        .agg(f.collect_set("resourceScore").alias("scores"))
        .select(
            "targetId",
            "diseaseId",
            calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
        )
        .write.mode(session.write_mode)
        .parquet(direct_associations_output_path)
    )

    # Generate indirect assocations and save file
    indirect_associations_output_path = f"{output_dir}/indirect_associations"
    (
        disease_target_evidence.join(disease_index, on="diseaseId", how="inner")
        .groupBy("targetId", "ancestorDiseaseId")
        .agg(f.collect_set("resourceScore").alias("scores"))
        .select(
            "targetId",
            "ancestorDiseaseId",
            calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
        )
        .write.mode(session.write_mode)
        .parquet(indirect_associations_output_path)
    )



IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html



In [2]:
session.spark.read.parquet("gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions").show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+---------------+--------------------+--------------------+
|        studyLocusId|         geneId|               score| locusToGeneFeatures|
+--------------------+---------------+--------------------+--------------------+
|0000570b33d89bb53...|ENSG00000134243|  0.8985205726375871|{credibleSetConfi...|
|000a701e88f9375ee...|ENSG00000084674|   0.931442044355483|{credibleSetConfi...|
|000e360ebd61a35e2...|ENSG00000116785| 0.14805195222596013|{credibleSetConfi...|
|000feb7a19759342e...|ENSG00000103653|0.055619774395107555|{credibleSetConfi...|
|00103a2c9375268ff...|ENSG00000124920| 0.06433315643528001|{credibleSetConfi...|
|0010ef0df4401e61e...|ENSG00000175164|  0.2308400042960181|{credibleSetConfi...|
|0013403cb7ae471dc...|ENSG00000178665| 0.07753057278471888|{credibleSetConfi...|
|00144236fa6b2f12e...|ENSG00000136816|  0.7977569007179145|{credibleSetConfi...|
|001bde9f8ca20c628...|ENSG00000021826|  0.9640945592599031|{credibleSetConfi...|
|0020b6d756777ce05...|ENSG00

                                                                                

In [5]:
Generate_evidence_associations(
    session=session,  # Pass your Spark session object
    l2g_predictions_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions",
    credible_set_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/credible_set",
    study_index_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/study_index",
    disease_index_path="gs://open-targets-pre-data-releases/24.12-uo_test/output/disease/disease.parquet",
    output_dir="gs://genetics-portal-dev-analysis/xg1/L2G_evidence_associations/24.12-uo_test"
)

24/11/26 12:02:52 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions.
java.io.IOException: Error accessing gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions. reason=403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/open-targets-pre-data-releases/o/24.12-uo_test%2Foutput%2Fgenetics%2Fparquet%2Fl2g_predictions?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gs

Py4JJavaError: An error occurred while calling o74.load.
: java.io.IOException: Error accessing gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions. reason=403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/open-targets-pre-data-releases/o/24.12-uo_test%2Foutput%2Fgenetics%2Fparquet%2Fl2g_predictions?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)."
}
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2336)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:2225)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1243)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1217)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.lambda$getFileStatus$10(GoogleHadoopFileSystemBase.java:1081)
	at com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics.trackDuration(GhfsStorageStatistics.java:104)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1070)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1434)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/open-targets-pre-data-releases/o/24.12-uo_test%2Foutput%2Fgenetics%2Fparquet%2Fl2g_predictions?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)."
}
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2328)
	... 23 more


In [None]:
session.spark.read.parquet("gs://open-targets-pre-data-releases/24.12-uo_test/output/genetics/parquet/l2g_predictions").count()

                                                                                

933703

24/11/27 18:07:02 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2875522 ms exceeds timeout 120000 ms
24/11/27 18:07:02 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/27 18:07:02 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:643)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1057)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
	at s

In [None]:
def generate_evidence_associations_for_folders(session, folders, credible_set_path, study_index_path):
    for folder in folders:
        print(f"Processing folder: {folder}")
        Generate_evidence_associations(
            session,
            l2g_ver_path=folder,
            credible_set_path=credible_set_path,
            study_index_path=study_index_path
        )
        print(f"Completed processing for folder: {folder}")

# List of folders to iterate through
l2g_fm0_folders = [
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v4",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v8",
]

# Example usage
generate_evidence_associations_for_folders(
    session=session,
    folders=l2g_fm0_folders,
    credible_set_path="gs://ot_orchestration/releases/24.10_freeze6/credible_set",
    study_index_path="gs://ot_orchestration/releases/24.10_freeze6/study_index"
)

In [7]:
direct_associations_path="gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/indirect_associations"
session.spark.read.parquet(direct_associations_path).printSchema()

root
 |-- targetId: string (nullable = true)
 |-- ancestorDiseaseId: string (nullable = true)
 |-- harmonicSum: double (nullable = true)



In [4]:
session.spark.read.parquet("gs://ot-team/irene/l2g/validation/chembl_w_flags").show()

24/11/27 13:23:16 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://ot-team/irene/l2g/validation/chembl_w_flags.
java.io.IOException: Error accessing gs://ot-team/irene/l2g/validation/chembl_w_flags. reason=403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/ot-team/o/irene%2Fl2g%2Fvalidation%2Fchembl_w_flags?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.ge

Py4JJavaError: An error occurred while calling o60.parquet.
: java.io.IOException: Error accessing gs://ot-team/irene/l2g/validation/chembl_w_flags. reason=403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/ot-team/o/irene%2Fl2g%2Fvalidation%2Fchembl_w_flags?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)."
}
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2336)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:2225)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1243)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1217)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.lambda$getFileStatus$10(GoogleHadoopFileSystemBase.java:1081)
	at com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics.trackDuration(GhfsStorageStatistics.java:104)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1070)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1434)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/ot-team/o/irene%2Fl2g%2Fvalidation%2Fchembl_w_flags?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
{
  "code" : 403,
  "errors" : [ {
    "domain" : "global",
    "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).",
    "reason" : "forbidden"
  } ],
  "message" : "open-targets-genetics-dev@appspot.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)."
}
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2328)
	... 23 more


In [3]:
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType

def TA_OncoOrNot (diseases):
    ### create a dataframe asigning TA code, names and Oncology/Other 
    taDf = session.spark.createDataFrame(
        data=[
            ("MONDO_0045024", "cell proliferation disorder", "Oncology"),
            ("EFO_0005741", "infectious disease", "Other"),
            ("OTAR_0000014", "pregnancy or perinatal disease", "Other"),
            ("EFO_0005932", "animal disease", "Other"),
            ("MONDO_0024458", "disease of visual system", "Other"),
            ("EFO_0000319", "cardiovascular disease", "Other"),
            ("EFO_0009605", "pancreas disease", "Other"),
            ("EFO_0010282", "gastrointestinal disease", "Other"),
            ("OTAR_0000017", "reproductive system or breast disease", "Other"),
            ("EFO_0010285", "integumentary system disease", "Other"),
            ("EFO_0001379", "endocrine system disease", "Other"),
            ("OTAR_0000010", "respiratory or thoracic disease", "Other"),
            ("EFO_0009690", "urinary system disease", "Other"),
            ("OTAR_0000006", "musculoskeletal or connective tissue disease", "Other"),
            ("MONDO_0021205", "disease of ear", "Other"),
            ("EFO_0000540", "immune system disease", "Other"),
            ("EFO_0005803", "hematologic disease", "Other"),
            ("EFO_0000618", "nervous system disease", "Other"),
            ("MONDO_0002025", "psychiatric disorder", "Other"),
            ("MONDO_0024297", "nutritional or metabolic disease", "Other"),
            ("OTAR_0000018", "genetic, familial or congenital disease", "Other"),
            ("OTAR_0000009", "injury, poisoning or other complication", "Other"),
            ("EFO_0000651", "phenotype", "Other"),
            ("EFO_0001444", "measurement", "Other"),
            ("GO_0008150", "biological process", "Other"),
        ],
        schema=StructType(
            [
                StructField("taId", StringType(), True),
                StructField("taLabel", StringType(), True),
                StructField("taLabelSimple", StringType(), True),
            ]
        ),
    ).withColumn("taRank", f.monotonically_increasing_id())

    ### window over disease to take Oncology VS non oncology
    wByDisease = Window.partitionBy("diseaseId")
  ### explode therapy areas of diseases and joining the dataframe, categorise them between Onco or Others
    return (
        diseases.withColumn("taId", f.explode("therapeuticAreas"))
        .select(f.col("id").alias("diseaseId"), "taId", "parents")
        .join(taDf, on="taId", how="left")
        .withColumn("minRank", f.min("taRank").over(wByDisease))
        .filter(f.col("taRank") == f.col("minRank"))
        .drop("taRank", "minRank")
    )

### propagation of diseases  
def diseasePropagation(diseases):
    return diseases.select("id", "parents").withColumn(
        "diseaseIdPropagated",
        f.explode_outer(f.concat(f.array(f.col("id")), f.col("parents")))
    )

diseases=session.spark.read.parquet("/Users/xg1/Downloads/otg_releases/diseases.parquet")
TA_diseases=TA_OncoOrNot(diseases).select("diseaseId", "taLabelSimple").persist()
Propagated_diseases=diseasePropagation(diseases)

                                                                                

In [5]:
Propagated_diseases.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+--------------------+-------------------+
|         id|             parents|diseaseIdPropagated|
+-----------+--------------------+-------------------+
|EFO_0000255|     [MONDO_0000430]|        EFO_0000255|
|EFO_0000255|     [MONDO_0000430]|      MONDO_0000430|
|EFO_0000508|      [OTAR_0000018]|        EFO_0000508|
|EFO_0000508|      [OTAR_0000018]|       OTAR_0000018|
|EFO_0001054|[EFO_0004248, EFO...|        EFO_0001054|
|EFO_0001054|[EFO_0004248, EFO...|        EFO_0004248|
|EFO_0001054|[EFO_0004248, EFO...|        EFO_0009429|
|EFO_0001054|[EFO_0004248, EFO...|      MONDO_0000314|
|EFO_0001054|[EFO_0004248, EFO...|        EFO_0003100|
|EFO_0001054|[EFO_0004248, EFO...|      MONDO_0020590|
|EFO_0004287|[MONDO_0007263, E...|        EFO_0004287|
|EFO_0004287|[MONDO_0007263, E...|      MONDO_0007263|
|EFO_0004287|[MONDO_0007263, E...|        EFO_0004269|
|EFO_0004302|       [EFO_0001444]|        EFO_0004302|
|EFO_0004302|       [EFO_0001444]|        EFO_0001444|
|EFO_00050

                                                                                

In [4]:
Propagated_diseases.printSchema()

root
 |-- id: string (nullable = true)
 |-- parents: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- diseaseIdPropagated: string (nullable = true)



In [11]:
# Load chembl:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("diseaseFromSourceMappedId", "targetId")

# Process chembl evidence, find Max clinical phase reached for each disease-target pair:

chembl_evidence=session.spark.read.parquet(
    "/Users/xg1/Downloads/platform_2409_evidence/evidence/sourceId\=chembl").select(
        "targetId", "drugId", "clinicalPhase", 
        "diseaseFromSourceMappedId", 
        "diseaseFromSource", 
        "clinicalStatus").withColumn(
            "maxClinicalPhase", f.max("clinicalPhase").over(windowSpec)).filter(
                f.col("clinicalPhase") == f.col("maxClinicalPhase")).drop("clinicalPhase", "drugId").distinct()

# Filter out Oncology diseases:

chembl_evidence_noOncology=chembl_evidence.join(TA_diseases.withColumnRenamed(
    "diseaseId", "diseaseFromSourceMappedId"), on="diseaseFromSourceMappedId", how="inner").filter(
    (f.col("taLabelSimple") != "Oncology") | f.col("taLabelSimple").isNull()).drop(
        "diseaseFromSource", "clinicalStatus").distinct().persist()



In [6]:
chembl_evidence_noOncology.printSchema()

root
 |-- diseaseFromSourceMappedId: string (nullable = true)
 |-- targetId: string (nullable = true)
 |-- maxClinicalPhase: double (nullable = true)
 |-- taLabelSimple: string (nullable = true)



In [6]:
session.spark.read.parquet("gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/direct_associations").printSchema()

root
 |-- targetId: string (nullable = true)
 |-- diseaseId: string (nullable = true)
 |-- harmonicSum: double (nullable = true)



In [None]:

# propagate l2g genetic evidence through their parent efo terms::
prioritised_genes=session.spark.read.parquet("gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/direct_associations")
prioritised_genes_propagated=prioritised_genes.withColumnRenamed(
    "diseaseFromSourceMappedId", "l2gdiseaseFromSourceMappedId").join(
    Propagated_diseases.withColumnRenamed("id", "l2gdiseaseFromSourceMappedId"), 
    on="l2gdiseaseFromSourceMappedId", 
    how="inner").distinct()

# join propagated l2g evidence to filtered chembl evidence:

propagated_efo_gene_chembl = prioritised_genes_propagated.join(
    chembl_evidence_noOncology.withColumnRenamed("diseaseFromSourceMappedId", "diseaseIdPropagated"), 
    on=["targetId", "diseaseIdPropagated"], how="inner").persist()

# select relevant columns:

gene_efo_genetics_chembl=propagated_efo_gene_chembl.select(f.col("targetId"),
                 f.col("diseaseIdPropagated"),
                 f.col("l2gdiseaseFromSourceMappedId"),
                 f.col("studyId"),
                 f.col("studyLocusId"),
                 f.col("score"),
                 f.col("traitFromSource"),
                 f.col("maxClinicalPhase"),
                 #f.col("clinicalStatus"),
                 #f.col("taLabelSimple")
                 ).distinct().persist()

# deduplication of the propagated l2g evidence:
l2g_dedup=gene_efo_genetics_chembl.join(prioritised_genes.withColumnRenamed(
    "diseaseFromSourceMappedId", "l2gdiseaseFromSourceMappedId").select("targetId", "l2gdiseaseFromSourceMappedId", "studyLocusId"), 
    on=["targetId", "l2gdiseaseFromSourceMappedId", "studyLocusId"], 
    how="inner").persist()

# Right side join on chembl evidence, so this keeps target disease pairs which didnot have genetic support:

Efo_gene_genetics_chembl_bg=l2g_dedup.join(chembl_evidence_noOncology.withColumnRenamed(
    "diseaseFromSourceMappedId", "diseaseIdPropagated").select(
    "targetId", "diseaseIdPropagated", "maxClinicalPhase").distinct(),
                on=["targetId", "diseaseIdPropagated", "maxClinicalPhase"], 
                how="right").drop("drugId").distinct().persist()

In [8]:
prioritised_genes=session.spark.read.parquet("gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6/direct_associations")

Efo_gene_genetics_chembl_bg=prioritised_genes.join(chembl_evidence_noOncology.withColumnRenamed(
    "diseaseFromSourceMappedId", "diseaseId").select(
    "targetId", "diseaseId", "maxClinicalPhase").distinct(),
                on=["targetId", "diseaseId"], 
                how="right").drop("drugId").distinct().persist()

Efo_gene_genetics_chembl_bg.show()



+---------------+-------------+-----------+----------------+
|       targetId|    diseaseId|harmonicSum|maxClinicalPhase|
+---------------+-------------+-----------+----------------+
|ENSG00000007314|  EFO_0000555|       null|             2.0|
|ENSG00000007314|  EFO_0003102|       null|             0.5|
|ENSG00000007314|  EFO_0003894|       null|             4.0|
|ENSG00000007314|  EFO_0004699|       null|             3.0|
|ENSG00000007314|  EFO_0801084|       null|             2.0|
|ENSG00000010310|  EFO_0003884|       null|             2.0|
|ENSG00000012504|  EFO_0004210|       null|             4.0|
|ENSG00000012504|MONDO_0019052|       null|             4.0|
|ENSG00000062822|  EFO_0004991|       null|             2.0|
|ENSG00000065989|  EFO_0003956|       null|             2.0|
|ENSG00000067191|  EFO_0000319|       null|             4.0|
|ENSG00000069535|  EFO_0004701|       null|             1.0|
|ENSG00000077092|  EFO_0003834|       null|             2.0|
|ENSG00000082175|  EFO_0

                                                                                

In [7]:
def analyze_genetic_support(direct_associations_path, chembl_background, cutoff, phases):
    """
    Analyze genetic support for different clinical phases and evidence sources.
    
    Args:
        df (pyspark.sql.DataFrame): The input DataFrame.
        cutoff (float): The cutoff for determining genetic support.
        phases (list[int]): List of clinical trial phases to analyze.
    
    Returns:
        pd.DataFrame: Results table with RS, odds ratio, and confidence intervals for each phase.
    """
    # Add a column to indicate genetic support
    direct_associations=session.spark.read.parquet(direct_associations_path)
    df=direct_associations.join(chembl_background.withColumnRenamed(
    "diseaseFromSourceMappedId", "diseaseId").select(
    "targetId", "diseaseId", "maxClinicalPhase").distinct(),
                on=["targetId", "diseaseId"], 
                how="right").drop("drugId").distinct().persist()


    df = df.withColumn(
        "geneticSupport", f.when(f.col("harmonicSum") > cutoff, True).otherwise(False)
    ).persist()

    results = []

    for phase in phases:
        print(f"Analyzing clinical phase: {phase}+")

        # Calculate N_G and N_negG
        N_G = df.filter(f.col("geneticSupport") == True).count()
        N_negG = df.filter(f.col("geneticSupport") == False).count()

        # Calculate X_G and X_negG
        X_G = df.filter((f.col("geneticSupport") == True) & (f.col("maxClinicalPhase") >= phase)).count()
        X_negG = df.filter((f.col("geneticSupport") == False) & (f.col("maxClinicalPhase") >= phase)).count()

        # Calculate P(G)
        P_G = N_G / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate P(S)
        P_S = (X_G + X_negG) / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate RS
        P_S_G = X_G / N_G if N_G > 0 else 0
        P_S_negG = X_negG / N_negG if N_negG > 0 else 0
        RS = (P_S_G / P_S_negG) if P_S_negG > 0 else float('inf')

        # Calculate confidence interval for RS
        ln_RS = np.log(RS)
        SE_ln_RS = np.sqrt(1/X_G + 1/(N_G - X_G) + 1/X_negG + 1/(N_negG - X_negG))
        z = norm.ppf(0.975)  # For 95% CI
        ci_ln_low = ln_RS - z * SE_ln_RS
        ci_ln_high = ln_RS + z * SE_ln_RS
        ci_low_RS = np.exp(ci_ln_low)
        ci_high_RS = np.exp(ci_ln_high)

        # Create the contingency table
        contingency_table = [[X_G, N_G - X_G], [X_negG, N_negG - X_negG]]
        print(contingency_table)
        
        # Perform Fisher's Exact Test
        odds_ratio, p_value = fisher_exact(contingency_table)

        # Calculate confidence interval for odds ratio
        ln_or = np.log(odds_ratio)
        se_ln_or = np.sqrt(1/contingency_table[0][0] + 1/contingency_table[0][1] + 1/contingency_table[1][0] + 1/contingency_table[1][1])
        ci_ln_low = ln_or - z * se_ln_or
        ci_ln_high = ln_or + z * se_ln_or
        ci_low = np.exp(ci_ln_low)
        ci_high = np.exp(ci_ln_high)

        # Store results
        results.append({
            "clinicalPhase": str(phase) + "+",
            "RS": RS,
            "RS_ci_low": ci_low_RS,
            "RS_ci_high": ci_high_RS,
            "odds_ratio": odds_ratio,
            "p_value": p_value,
            "ci_low": ci_low,
            "ci_high": ci_high
        })

    # Convert results to DataFrame for display
    results_table = pd.DataFrame(results)
    return results_table


In [19]:
import pandas as pd
from scipy.stats import fisher_exact
from scipy.stats import norm
import numpy as np


def analyze_propagated_genetic_support(direct_associations_path, chembl_background, cutoff, phases, propagation_df=Propagated_diseases, propagation=False):
    direct_associations=session.spark.read.parquet(direct_associations_path)

    if (propagation == True):
        propagated_direct_associations = (
            direct_associations
            .join(propagation_df, direct_associations["diseaseId"] == propagation_df["id"], "left")
            .select(
        "targetId",
        f.col("id").alias("associationRootDiseaseId"),  # Root disease ID
        "diseaseIdPropagated",  # Propagated term
        "harmonicSum"
    )
)

# Step 2: Propagate disease IDs for chembl_evidence
        propagated_chembl_evidence = (
            chembl_background
                .join(propagation_df, chembl_evidence["diseaseFromSourceMappedId"] == propagation_df["id"], "left")
                .select(
                    "targetId",
                    "diseaseIdPropagated",
                    f.col("id").alias("chemblRootDiseaseId"),  # Propagated term
                    "maxClinicalPhase",
                    "taLabelSimple"
    )
)

# Step 3: Join the propagated datasets
        joined_data = (
            propagated_direct_associations
            .join(
                propagated_chembl_evidence,
                ["targetId", "diseaseIdPropagated"],  # Match on targetId and propagated diseaseId
                "right"
            )
            .select(
                "targetId",
                "chemblRootDiseaseId",  # Retain the root disease ID from one dataset (same for both)
                "diseaseIdPropagated",
                "harmonicSum",
                "maxClinicalPhase",
                "taLabelSimple"
            )
        )

        # Step 4: Deduplicate after the join
        df = (
            joined_data
            .withColumn("termLength", f.length("diseaseIdPropagated"))  # Add term length for specificity
            .withColumn(
                "rank",
                f.row_number().over(
                    Window.partitionBy("targetId", "chemblRootDiseaseId").orderBy(f.col("termLength"))  # Rank by shortest term
                )
            )
            .filter(f.col("rank") == 1)  # Keep only the most specific term for each rootDiseaseId
            .drop("termLength", "rank")  # Clean up temporary columns
        )

    df = df.withColumn(
        "geneticSupport", f.when(f.col("harmonicSum") > cutoff, True).otherwise(False)
    ).persist()

    results = []

    for phase in phases:
        print(f"Analyzing clinical phase: {phase}+")

        # Calculate N_G and N_negG
        N_G = df.filter(f.col("geneticSupport") == True).count()
        N_negG = df.filter(f.col("geneticSupport") == False).count()

        # Calculate X_G and X_negG
        X_G = df.filter((f.col("geneticSupport") == True) & (f.col("maxClinicalPhase") >= phase)).count()
        X_negG = df.filter((f.col("geneticSupport") == False) & (f.col("maxClinicalPhase") >= phase)).count()

        # Calculate P(G)
        P_G = N_G / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate P(S)
        P_S = (X_G + X_negG) / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate RS
        P_S_G = X_G / N_G if N_G > 0 else 0
        P_S_negG = X_negG / N_negG if N_negG > 0 else 0
        RS = (P_S_G / P_S_negG) if P_S_negG > 0 else float('inf')

        # Calculate confidence interval for RS
        ln_RS = np.log(RS)
        SE_ln_RS = np.sqrt(1/X_G + 1/(N_G - X_G) + 1/X_negG + 1/(N_negG - X_negG))
        z = norm.ppf(0.975)  # For 95% CI
        ci_ln_low = ln_RS - z * SE_ln_RS
        ci_ln_high = ln_RS + z * SE_ln_RS
        ci_low_RS = np.exp(ci_ln_low)
        ci_high_RS = np.exp(ci_ln_high)

        # Create the contingency table
        contingency_table = [[X_G, N_G - X_G], [X_negG, N_negG - X_negG]]
        print(contingency_table)
        
        # Perform Fisher's Exact Test
        odds_ratio, p_value = fisher_exact(contingency_table)

        # Calculate confidence interval for odds ratio
        ln_or = np.log(odds_ratio)
        se_ln_or = np.sqrt(1/contingency_table[0][0] + 1/contingency_table[0][1] + 1/contingency_table[1][0] + 1/contingency_table[1][1])
        ci_ln_low = ln_or - z * se_ln_or
        ci_ln_high = ln_or + z * se_ln_or
        ci_low = np.exp(ci_ln_low)
        ci_high = np.exp(ci_ln_high)

        # Store results
        results.append({
            "clinicalPhase": str(phase) + "+",
            "RS": RS,
            "RS_ci_low": ci_low_RS,
            "RS_ci_high": ci_high_RS,
            "odds_ratio": odds_ratio,
            "p_value": p_value,
            "ci_low": ci_low,
            "ci_high": ci_high
        })

    # Convert results to DataFrame for display
    results_table = pd.DataFrame(results)
    return results_table

cutoff = 0.05
phases = [2, 3, 4]
evidence_sources = []

# Analyze each evidence source

l2g_fm0_folders = [

    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
]


all_direct_associations=[folder + "/direct_associations" for folder in l2g_fm0_folders]
for source in all_direct_associations:
    print(f"Analyzing evidence source: {source}")
    results_table = analyze_propagated_genetic_support(source, chembl_evidence_noOncology, cutoff, phases, propagation=True)
    print(results_table)

In [37]:
cutoff = 0.3
phases = [2, 3, 4]
evidence_sources = []

# Analyze each evidence source

l2g_fm0_folders = [
    "/Users/xg1/Downloads/platform_2409_evidence",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
]


all_direct_associations=[folder + "/direct_associations" for folder in l2g_fm0_folders]
for source in all_direct_associations:
    print(f"Analyzing evidence source: {source}")
    results_table = analyze_propagated_genetic_support(source, chembl_evidence_noOncology, cutoff, phases, propagation=True)
    print(results_table)

Analyzing evidence source: /Users/xg1/Downloads/platform_2409_evidence/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[678, 78], [36513, 6235]]
Analyzing clinical phase: 3+


                                                                                

[[492, 264], [24660, 18088]]
Analyzing clinical phase: 4+


                                                                                

[[355, 401], [17289, 25459]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.049968   0.829349    1.329276    1.484308  0.000694   
1            3+  1.128148   0.970273    1.311710    1.366969  0.000043   
2            4+  1.161054   1.005211    1.341059    1.303633  0.000331   

     ci_low   ci_high  
0  1.172425  1.879156  
1  1.175673  1.589390  
2  1.128652  1.505743  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[982, 92], [36209, 6221]]
Analyzing clinical phase: 3+


                                                                                

[[751, 323], [24401, 18029]]
Analyzing clinical phase: 4+


                                                                                

[[538, 536], [17106, 25324]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.071430   0.863820    1.328936    1.833865  2.763693e-09   
1            3+  1.215909   1.065734    1.387246    1.717914  1.362620e-16   
2            4+  1.242518   1.100720    1.402582    1.485940  1.836132e-10   

     ci_low   ci_high  
0  1.478520  2.274613  
1  1.505737  1.959989  
2  1.316363  1.677363  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[1284, 146], [35907, 6167]]
Analyzing clinical phase: 3+


                                                                                

[[935, 495], [24217, 17857]]
Analyzing clinical phase: 4+


                                                                                

[[667, 763], [16977, 25097]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.052116   0.884709    1.251201    1.510452  1.177341e-06   
1            3+  1.135976   1.016987    1.268886    1.392819  2.774406e-09   
2            4+  1.155960   1.040006    1.284841    1.292296  2.411353e-06   

     ci_low   ci_high  
0  1.270117  1.796265  
1  1.246927  1.555779  
2  1.162667  1.436378  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5/direct_associations


                                                                                

Analyzing clinical phase: 2+


                                                                                

[[1311, 160], [35880, 6153]]
Analyzing clinical phase: 3+




[[936, 535], [24216, 17817]]
Analyzing clinical phase: 4+
[[661, 810], [16983, 25050]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.044066   0.884067    1.233022    1.405132  0.000034   
1            3+  1.104463   0.991421    1.230394    1.287224  0.000004   
2            4+  1.112154   1.001736    1.234743    1.203676  0.000542   

     ci_low   ci_high  
0  1.189801  1.659434  
1  1.155477  1.433994  
2  1.084172  1.336354  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6/direct_associations
Analyzing clinical phase: 2+
[[3546, 548], [33645, 5765]]
Analyzing clinical phase: 3+
[[2428, 1666], [22724, 16686]]
Analyzing clinical phase: 4+
[[1710, 2384], [15934, 23476]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.014558   0.923349    1.114776    1.108758  0.031967   
1            3+  1.028543   0.963355    1.098143    1.070141  0.042547   
2            4+  1.033070 



[[3313, 532], [33878, 5781]]
Analyzing clinical phase: 3+
[[2269, 1576], [22883, 16776]]
Analyzing clinical phase: 4+
[[1617, 2228], [16027, 23632]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.008670   0.916617    1.109968    1.062662  0.221356   
1            3+  1.022744   0.956192    1.093928    1.055489  0.119656   
2            4+  1.040646   0.973113    1.112867    1.070146  0.047941   

     ci_low   ci_high  
0  0.965681  1.169382  
1  0.986806  1.128952  
2  1.000698  1.144414  




In [39]:
import pandas as pd
from scipy.stats import fisher_exact
from scipy.stats import norm
import numpy as np


def analyze_oneway_propagated_genetic_support(direct_associations_path, chembl_background, cutoff, phases, propagation_df=Propagated_diseases, propagation=False):
    direct_associations=session.spark.read.parquet(direct_associations_path)

    if (propagation == True):
        propagated_direct_associations = (
            direct_associations
            .join(propagation_df, direct_associations["diseaseId"] == propagation_df["id"], "left")
            .select(
        "targetId",
        f.col("id").alias("associationRootDiseaseId"),  # Root disease ID
        "diseaseIdPropagated",  # Propagated term
        "harmonicSum"
    )
)

# Step 2: Propagate disease IDs for chembl_evidence
        propagated_chembl_evidence = (chembl_background).withColumnRenamed("diseaseFromSourceMappedId", "diseaseIdPropagated")
#                 .join(propagation_df, chembl_evidence["diseaseFromSourceMappedId"] == propagation_df["id"], "left")
#                 .select(
#                     "targetId",
#                     "diseaseIdPropagated",
#                     f.col("id").alias("chemblRootDiseaseId"),  # Propagated term
#                     "maxClinicalPhase",
#                     "taLabelSimple"
#     )
# )

# Step 3: Join the propagated datasets
        joined_data = (
            propagated_direct_associations
            .join(
                propagated_chembl_evidence,
                ["targetId", "diseaseIdPropagated"],  # Match on targetId and propagated diseaseId
                "right"
            )
            .select(
                "targetId",
                "associationRootDiseaseId",  # Retain the root disease ID from one dataset (same for both)
                "diseaseIdPropagated",
                "harmonicSum",
                "maxClinicalPhase",
                "taLabelSimple"
            )
        )

        # Step 4: Deduplicate after the join
        df = (
            joined_data
            .withColumn("termLength", f.length("diseaseIdPropagated"))  # Add term length for specificity
            .withColumn(
                "rank",
                f.row_number().over(
                    Window.partitionBy("targetId", "associationRootDiseaseId").orderBy(f.col("termLength"))  # Rank by shortest term
                )
            )
            .filter(f.col("rank") == 1)  # Keep only the most specific term for each rootDiseaseId
            .drop("termLength", "rank")  # Clean up temporary columns
        )

    df = df.withColumn(
        "geneticSupport", f.when(f.col("harmonicSum") > cutoff, True).otherwise(False)
    ).persist()

    results = []

    for phase in phases:
        print(f"Analyzing clinical phase: {phase}+")

        # Calculate N_G and N_negG
        N_G = df.filter(f.col("geneticSupport") == True).count()
        N_negG = df.filter(f.col("geneticSupport") == False).count()

        # Calculate X_G and X_negG
        X_G = df.filter((f.col("geneticSupport") == True) & (f.col("maxClinicalPhase") >= phase)).count()
        X_negG = df.filter((f.col("geneticSupport") == False) & (f.col("maxClinicalPhase") >= phase)).count()

        # Calculate P(G)
        P_G = N_G / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate P(S)
        P_S = (X_G + X_negG) / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate RS
        P_S_G = X_G / N_G if N_G > 0 else 0
        P_S_negG = X_negG / N_negG if N_negG > 0 else 0
        RS = (P_S_G / P_S_negG) if P_S_negG > 0 else float('inf')

        # Calculate confidence interval for RS
        ln_RS = np.log(RS)
        SE_ln_RS = np.sqrt(1/X_G + 1/(N_G - X_G) + 1/X_negG + 1/(N_negG - X_negG))
        z = norm.ppf(0.975)  # For 95% CI
        ci_ln_low = ln_RS - z * SE_ln_RS
        ci_ln_high = ln_RS + z * SE_ln_RS
        ci_low_RS = np.exp(ci_ln_low)
        ci_high_RS = np.exp(ci_ln_high)

        # Create the contingency table
        contingency_table = [[X_G, N_G - X_G], [X_negG, N_negG - X_negG]]
        print(contingency_table)
        
        # Perform Fisher's Exact Test
        odds_ratio, p_value = fisher_exact(contingency_table)

        # Calculate confidence interval for odds ratio
        ln_or = np.log(odds_ratio)
        se_ln_or = np.sqrt(1/contingency_table[0][0] + 1/contingency_table[0][1] + 1/contingency_table[1][0] + 1/contingency_table[1][1])
        ci_ln_low = ln_or - z * se_ln_or
        ci_ln_high = ln_or + z * se_ln_or
        ci_low = np.exp(ci_ln_low)
        ci_high = np.exp(ci_ln_high)

        # Store results
        results.append({
            "clinicalPhase": str(phase) + "+",
            "RS": RS,
            "RS_ci_low": ci_low_RS,
            "RS_ci_high": ci_high_RS,
            "odds_ratio": odds_ratio,
            "p_value": p_value,
            "ci_low": ci_low,
            "ci_high": ci_high
        })

    # Convert results to DataFrame for display
    results_table = pd.DataFrame(results)
    return results_table

cutoff = 0.3
phases = [2, 3, 4]
evidence_sources = []

# Analyze each evidence source

l2g_fm0_folders = [
    "/Users/xg1/Downloads/platform_2409_evidence",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
]


all_direct_associations=[folder + "/direct_associations" for folder in l2g_fm0_folders]
for source in all_direct_associations:
    print(f"Analyzing evidence source: {source}")
    results_table = analyze_oneway_propagated_genetic_support(source, chembl_evidence_noOncology, cutoff, phases, propagation=True)
    print(results_table)

Analyzing evidence source: /Users/xg1/Downloads/platform_2409_evidence/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[414, 55], [1243, 357]]
Analyzing clinical phase: 3+


                                                                                

[[285, 184], [733, 867]]
Analyzing clinical phase: 4+


                                                                                

[[214, 255], [471, 1129]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.136256   0.837631    1.541346    2.161896  1.756591e-07   
1            3+  1.326441   1.075377    1.636120    1.832070  1.282259e-08   
2            4+  1.550030   1.255017    1.914390    2.011623  1.174583e-10   

     ci_low   ci_high  
0  1.593716  2.932639  
1  1.485303  2.259796  
2  1.628757  2.484489  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[810, 109], [1458, 398]]
Analyzing clinical phase: 3+


                                                                                

[[581, 338], [865, 991]]
Analyzing clinical phase: 4+


                                                                                

[[418, 501], [565, 1291]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.121993   0.892682    1.410207    2.028542  3.102742e-10   
1            3+  1.356508   1.153451    1.595312    1.969323  1.456282e-16   
2            4+  1.494137   1.269159    1.758995    1.906410  1.021926e-14   

     ci_low   ci_high  
0  1.613954  2.549629  
1  1.674533  2.316009  
2  1.619355  2.244350  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[856, 124], [1500, 398]]
Analyzing clinical phase: 3+


                                                                                

[[601, 379], [897, 1001]]
Analyzing clinical phase: 4+


                                                                                

[[430, 550], [587, 1311]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.105230   0.888426    1.374941    1.831656  2.364059e-08   
1            3+  1.297634   1.109097    1.518220    1.769607  7.214186e-13   
2            4+  1.418732   1.209750    1.663816    1.746105  7.845150e-12   

     ci_low   ci_high  
0  1.472354  2.278638  
1  1.512497  2.070424  
2  1.488900  2.047742  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6/direct_associations
Analyzing clinical phase: 2+




[[2292, 495], [1281, 320]]
Analyzing clinical phase: 3+
[[1532, 1255], [757, 844]]
Analyzing clinical phase: 4+
[[1100, 1687], [486, 1115]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.027827   0.879074    1.201750    1.156672  6.969114e-02   
1            3+  1.162565   1.027741    1.315076    1.361011  9.675842e-07   
2            4+  1.300202   1.140744    1.481949    1.495947  1.236766e-09   

     ci_low   ci_high  
0  0.989272  1.352399  
1  1.203173  1.539555  
2  1.312483  1.705057  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7/direct_associations
Analyzing clinical phase: 2+


                                                                                       (0 + 0) / 9]

[[2168, 470], [1373, 343]]
Analyzing clinical phase: 3+


                                                                                

[[1447, 1191], [822, 894]]
Analyzing clinical phase: 4+




[[1051, 1587], [526, 1190]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.027144   0.879895    1.199034    1.152352  7.346350e-02   
1            3+  1.145089   1.013713    1.293491    1.321364  7.776620e-06   
2            4+  1.299749   1.142597    1.478516    1.498259  6.885459e-10   

     ci_low   ci_high  
0  0.987154  1.345195  
1  1.169764  1.492611  
2  1.317105  1.704330  


                                                                                

In [11]:
# Fisher's exact:

import pandas as pd
from scipy.stats import fisher_exact
from scipy.stats import norm
import numpy as np

# Define cutoff
cutoff = 0.05

# Add a column to indicate genetic support
df = Efo_gene_genetics_chembl_bg.withColumn(
    "geneticSupport", f.when(f.col("harmonicSum") > cutoff, True).otherwise(False)).persist()

# Get unique clinical trial phases
#phases = df.select("clinicalPhase").distinct().collect()
#phases = [row["clinicalPhase"] for row in phases]
phases = [2, 3, 4]
results = []

for phase in phases:
    print(f"Analyzing clinical phase: {phase}+")

    phase_df=df

    # Calculate N_G and N_negG
    N_G = phase_df.filter(f.col("geneticSupport") == True).count()
    N_negG = phase_df.filter(f.col("geneticSupport") == False).count()

    # Calculate X_G and X_negG
    X_G = phase_df.filter((f.col("geneticSupport") == True) & (f.col("maxClinicalPhase") >= phase)).count()
    X_negG = phase_df.filter((f.col("geneticSupport") == False) & (f.col("maxClinicalPhase") >= phase)).count()

    # Calculate P(G)
    P_G = N_G / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

    # Calculate P(S)
    P_S = (X_G + X_negG) / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

    # Calculate RS
    P_S_G = X_G / N_G if N_G > 0 else 0
    P_S_negG = X_negG / N_negG if N_negG > 0 else 0
    RS = (P_S_G / P_S_negG) if P_S_negG > 0 else float('inf')

    # Calculate confidence interval for RS
    ln_RS = np.log(RS)
    SE_ln_RS = np.sqrt(1/X_G + 1/(N_G - X_G) + 1/X_negG + 1/(N_negG - X_negG))
    z = norm.ppf(0.975)  # For 95% CI
    ci_ln_low = ln_RS - z * SE_ln_RS
    ci_ln_high = ln_RS + z * SE_ln_RS
    ci_low_RS = np.exp(ci_ln_low)
    ci_high_RS = np.exp(ci_ln_high)

    # Create the contingency table
    contingency_table = [[X_G, N_G - X_G], [X_negG, N_negG - X_negG]]
    print(contingency_table)
    
    # Perform Fisher's Exact Test
    odds_ratio, p_value = fisher_exact(contingency_table)

    # Calculate confidence interval for odds ratio
    ln_or = np.log(odds_ratio)
    se_ln_or = np.sqrt(1/contingency_table[0][0] + 1/contingency_table[0][1] + 1/contingency_table[1][0] + 1/contingency_table[1][1])
    z = norm.ppf(0.975)  # For 95% CI
    ci_ln_low = ln_or - z * se_ln_or
    ci_ln_high = ln_or + z * se_ln_or
    ci_low = np.exp(ci_ln_low)
    ci_high = np.exp(ci_ln_high)

    # Store results
    results.append({
        "clinicalPhase": str(phase) + "+",
        "RS": RS,
        "RS_ci_low": ci_low_RS,
        "RS_ci_high": ci_high_RS,
        "odds_ratio": odds_ratio,
        "p_value": p_value,
        "ci_low": ci_low,
        "ci_high": ci_high
    })

# Convert results to DataFrame for display
results_table = pd.DataFrame(results)
print(results_table)

Analyzing clinical phase: 2+


                                                                                

[[866, 95], [36325, 6218]]
Analyzing clinical phase: 3+


                                                                                

[[624, 337], [24528, 18015]]
Analyzing clinical phase: 4+




[[446, 515], [17198, 25345]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.055400   0.852474    1.306630    1.560412  0.000019   
1            3+  1.126230   0.985104    1.287575    1.359962  0.000006   
2            4+  1.148052   1.009868    1.305144    1.276268  0.000223   

     ci_low   ci_high  
0  1.260386  1.931857  
1  1.189547  1.554791  
2  1.122652  1.450904  


                                                                                

In [25]:
import pandas as pd
from scipy.stats import fisher_exact
from scipy.stats import norm
import numpy as np


def analyze_propagated_genetic_support2(direct_associations_path, chembl_background, cutoff, phases, propagation_df=Propagated_diseases, propagation=False):
    direct_associations=session.spark.read.parquet(direct_associations_path).withColumnRenamed("ancestorDiseaseId", "diseaseId")

    if (propagation == True):
        propagated_direct_associations = (
            direct_associations
            .join(propagation_df, direct_associations["diseaseId"] == propagation_df["id"], "left")
            .select(
        "targetId",
        f.col("id").alias("associationRootDiseaseId"),  # Root disease ID
        "diseaseIdPropagated",  # Propagated term
        "harmonicSum"
    )
)

# Step 2: Propagate disease IDs for chembl_evidence
        propagated_chembl_evidence = (
            chembl_background
                .join(propagation_df, chembl_evidence["diseaseFromSourceMappedId"] == propagation_df["id"], "left")
                .select(
                    "targetId",
                    "diseaseIdPropagated",
                    f.col("id").alias("chemblRootDiseaseId"),  # Propagated term
                    "maxClinicalPhase",
                    "taLabelSimple"
    )
)

# Step 3: Join the propagated datasets
        joined_data = (
            propagated_direct_associations
            .join(
                propagated_chembl_evidence,
                ["targetId", "diseaseIdPropagated"],  # Match on targetId and propagated diseaseId
                "right"
            )
            .select(
                "targetId",
                "chemblRootDiseaseId",  # Retain the root disease ID from one dataset (same for both)
                "diseaseIdPropagated",
                "harmonicSum",
                "maxClinicalPhase",
                "taLabelSimple"
            )
        )

        # Step 4: Deduplicate after the join
        df = (
            joined_data
            .withColumn("termLength", f.length("diseaseIdPropagated"))  # Add term length for specificity
            .withColumn(
                "rank",
                f.row_number().over(
                    Window.partitionBy("targetId", "chemblRootDiseaseId").orderBy(f.col("termLength"))  # Rank by shortest term
                )
            )
            .filter(f.col("rank") == 1)  # Keep only the most specific term for each rootDiseaseId
            .drop("termLength", "rank")  # Clean up temporary columns
        )

    df = df.withColumn(
        "geneticSupport", f.when(f.col("harmonicSum") > cutoff, True).otherwise(False)
    ).persist()

    results = []

    for phase in phases:
        print(f"Analyzing clinical phase: {phase}+")

        # Calculate N_G and N_negG
        N_G = df.filter(f.col("geneticSupport") == True).count()
        N_negG = df.filter(f.col("geneticSupport") == False).count()

        # Calculate X_G and X_negG
        X_G = df.filter((f.col("geneticSupport") == True) & (f.col("maxClinicalPhase") >= phase)).count()
        X_negG = df.filter((f.col("geneticSupport") == False) & (f.col("maxClinicalPhase") >= phase)).count()

        # Calculate P(G)
        P_G = N_G / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate P(S)
        P_S = (X_G + X_negG) / (N_G + N_negG) if (N_G + N_negG) > 0 else 0

        # Calculate RS
        P_S_G = X_G / N_G if N_G > 0 else 0
        P_S_negG = X_negG / N_negG if N_negG > 0 else 0
        RS = (P_S_G / P_S_negG) if P_S_negG > 0 else float('inf')

        # Calculate confidence interval for RS
        ln_RS = np.log(RS)
        SE_ln_RS = np.sqrt(1/X_G + 1/(N_G - X_G) + 1/X_negG + 1/(N_negG - X_negG))
        z = norm.ppf(0.975)  # For 95% CI
        ci_ln_low = ln_RS - z * SE_ln_RS
        ci_ln_high = ln_RS + z * SE_ln_RS
        ci_low_RS = np.exp(ci_ln_low)
        ci_high_RS = np.exp(ci_ln_high)

        # Create the contingency table
        contingency_table = [[X_G, N_G - X_G], [X_negG, N_negG - X_negG]]
        print(contingency_table)
        
        # Perform Fisher's Exact Test
        odds_ratio, p_value = fisher_exact(contingency_table)

        # Calculate confidence interval for odds ratio
        ln_or = np.log(odds_ratio)
        se_ln_or = np.sqrt(1/contingency_table[0][0] + 1/contingency_table[0][1] + 1/contingency_table[1][0] + 1/contingency_table[1][1])
        ci_ln_low = ln_or - z * se_ln_or
        ci_ln_high = ln_or + z * se_ln_or
        ci_low = np.exp(ci_ln_low)
        ci_high = np.exp(ci_ln_high)

        # Store results
        results.append({
            "clinicalPhase": str(phase) + "+",
            "RS": RS,
            "RS_ci_low": ci_low_RS,
            "RS_ci_high": ci_high_RS,
            "odds_ratio": odds_ratio,
            "p_value": p_value,
            "ci_low": ci_low,
            "ci_high": ci_high
        })

    # Convert results to DataFrame for display
    results_table = pd.DataFrame(results)
    return results_table

cutoff = 0.2
phases = [2, 3, 4]
evidence_sources = []

# Analyze each evidence source

l2g_fm0_folders = [
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
]


all_direct_associations=[folder + "/indirect_associations" for folder in l2g_fm0_folders]
for source in all_direct_associations:
    print(f"Analyzing evidence source: {source}")
    results_table = analyze_propagated_genetic_support2(source, chembl_evidence_noOncology, cutoff, phases, propagation=True)
    print(results_table)


Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1/indirect_associations


                                                                                

Analyzing clinical phase: 2+


                                                                                

[[1856, 284], [35335, 6029]]
Analyzing clinical phase: 3+


                                                                                

[[1383, 757], [23769, 17595]]
Analyzing clinical phase: 4+


                                                                                

[[994, 1146], [16650, 24714]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.015270   0.893436    1.153718    1.115064  9.538547e-02   
1            3+  1.124657   1.027108    1.231471    1.352398  4.522909e-11   
2            4+  1.153934   1.057579    1.259067    1.287451  1.605829e-08   

     ci_low   ci_high  
0  0.981254  1.267121  
1  1.235096  1.480842  
2  1.179947  1.404749  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/indirect_associations


                                                                                

Analyzing clinical phase: 2+


                                                                                

[[2110, 348], [35081, 5965]]
Analyzing clinical phase: 3+


                                                                                

[[1496, 962], [23656, 17390]]
Analyzing clinical phase: 4+


                                                                                

[[1058, 1400], [16586, 24460]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.004383   0.893776    1.128678     1.03096  0.637152   
1            3+  1.056037   0.971602    1.147810     1.14318  0.001611   
2            4+  1.065204   0.981107    1.156510     1.11448  0.009889   

     ci_low   ci_high  
0  0.917426  1.158544  
1  1.051778  1.242526  
2  1.026493  1.210010  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5/indirect_associations
Analyzing clinical phase: 2+


                                                                                

[[2065, 353], [35126, 5960]]
Analyzing clinical phase: 3+


                                                                                

[[1471, 947], [23681, 17405]]
Analyzing clinical phase: 4+


                                                                                

[[1037, 1381], [16607, 24479]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  0.998916   0.889355    1.121974    0.992574  0.905406   
1            3+  1.055480   0.970472    1.147936    1.141660  0.001976   
2            4+  1.061024   0.976601    1.152745    1.106847  0.016991   

     ci_low   ci_high  
0  0.883708  1.114851  
1  1.049710  1.241664  
2  1.018778  1.202529  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6/indirect_associations
Analyzing clinical phase: 2+


                                                                                

[[5287, 1011], [31904, 5302]]
Analyzing clinical phase: 3+


                                                                                

[[3590, 2708], [21562, 15644]]
Analyzing clinical phase: 4+


                                                                                

[[2538, 3760], [15106, 22100]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  0.978982   0.909800    1.053424    0.869066  0.000203   
1            3+  0.983594   0.931920    1.038133    0.961844  0.159402   
2            4+  0.992550   0.939963    1.048079    0.987522  0.657046   

     ci_low   ci_high  
0  0.807652  0.935150  
1  0.911313  1.015177  
2  0.935201  1.042769  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7/indirect_associations
Analyzing clinical phase: 2+


                                                                                

[[5037, 981], [32154, 5332]]
Analyzing clinical phase: 3+


                                                                                

[[3423, 2595], [21729, 15757]]
Analyzing clinical phase: 4+




[[2428, 3590], [15216, 22270]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  0.975784   0.905924    1.051032    0.851448  0.000029   
1            3+  0.981260   0.928763    1.036724    0.956540  0.115327   
2            4+  0.993951   0.940316    1.050646    0.989860  0.723703   

     ci_low   ci_high  
0  0.790489  0.917107  
1  0.905366  1.010607  
2  0.936446  1.046322  


                                                                                

In [36]:
 session.spark.read.parquet("/Users/xg1/Downloads/platform_2409_evidence/associationByDatasourceDirect").filter(f.col("datasourceId")=="ot_genetics_portal").distinct().select("diseaseId", "targetId", f.col("score").alias("harmonicSum")).write.mode("overwrite").parquet("/Users/xg1/Downloads/platform_2409_evidence/direct_associations")

                                                                                

In [35]:
 session.spark.read.parquet("/Users/xg1/Downloads/platform_2409_evidence/associationByDatasourceDirect").filter(f.col("datasourceId")=="ot_genetics_portal").show()

+-------------------+------------------+-----------+---------------+--------------------+-------------+
|         datatypeId|      datasourceId|  diseaseId|       targetId|               score|evidenceCount|
+-------------------+------------------+-----------+---------------+--------------------+-------------+
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000003249| 0.10909600891996758|            2|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000072756| 0.14098477246873808|            1|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000091181|  0.1402083272055787|            1|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000104044|  0.6937267487226765|            2|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000113851| 0.34085161514916557|            1|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000137486| 0.41934053284337475|            1|
|genetic_association|ot_genetics_portal|EFO_0000616|ENSG00000140

In [18]:
# Define parameters
cutoff = 0.05
phases = [2, 3, 4]
evidence_sources = []

# Analyze each evidence source

l2g_fm0_folders = [
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v4",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7",
    "gs://ot_orchestration/benchmarks/l2g/fm0/gs_v8",
]


all_direct_associations=[folder + "/direct_associations" for folder in l2g_fm0_folders]
for source in all_direct_associations:
    print(f"Analyzing evidence source: {source}")
    results_table = analyze_genetic_support(source, chembl_evidence_noOncology, cutoff, phases)
    print(results_table)

Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v1/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[716, 67], [36475, 6246]]
Analyzing clinical phase: 3+


                                                                                

[[556, 227], [24596, 18125]]
Analyzing clinical phase: 4+


                                                                                

[[408, 375], [17236, 25485]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio       p_value  \
0            2+  1.071019   0.832582    1.377742    1.829974  4.670486e-07   
1            3+  1.233360   1.055674    1.440954    1.804939  1.636410e-14   
2            4+  1.291527   1.121074    1.487895    1.608707  5.208944e-11   

     ci_low   ci_high  
0  1.422573  2.354048  
1  1.544906  2.108739  
2  1.396395  1.853301  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v4/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[778, 87], [36413, 6226]]
Analyzing clinical phase: 3+


                                                                                

[[557, 308], [24595, 18044]]
Analyzing clinical phase: 4+


                                                                                

[[405, 460], [17239, 25400]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.053208   0.842525    1.316574    1.529019  0.000113   
1            3+  1.116347   0.970028    1.284737    1.326754  0.000072   
2            4+  1.158067   1.011878    1.325378    1.297236  0.000179   

     ci_low   ci_high  
0  1.223156  1.911367  
1  1.152857  1.526882  
2  1.133478  1.484652  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5.1/direct_associations
24/11/19 14:50:22 WARN CacheManager: Asked to cache already cached data.
24/11/19 14:50:22 WARN CacheManager: Asked to cache already cached data.
Analyzing clinical phase: 2+


                                                                                

[[866, 95], [36325, 6218]]
Analyzing clinical phase: 3+


                                                                                

[[624, 337], [24528, 18015]]
Analyzing clinical phase: 4+


                                                                                

[[446, 515], [17198, 25345]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.055400   0.852474    1.306630    1.560412  0.000019   
1            3+  1.126230   0.985104    1.287575    1.359962  0.000006   
2            4+  1.148052   1.009868    1.305144    1.276268  0.000223   

     ci_low   ci_high  
0  1.260386  1.931857  
1  1.189547  1.554791  
2  1.122652  1.450904  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v5/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[841, 98], [36350, 6215]]
Analyzing clinical phase: 3+


                                                                                

[[605, 334], [24547, 18018]]
Analyzing clinical phase: 4+


                                                                                

[[439, 500], [17205, 25360]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.048766   0.849326    1.295039    1.467259  0.000251   
1            3+  1.117234   0.976161    1.278694    1.329588  0.000034   
2            4+  1.156636   1.015997    1.316745    1.294163  0.000110   

     ci_low   ci_high  
0  1.188235  1.811803  
1  1.161701  1.521737  
2  1.136801  1.473309  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v6/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[1763, 228], [35428, 6085]]
Analyzing clinical phase: 3+


                                                                                

[[1223, 768], [23929, 17584]]
Analyzing clinical phase: 4+


                                                                                

[[874, 1117], [16770, 24743]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.037573   0.901485    1.194204    1.328102  0.000053   
1            3+  1.065650   0.971680    1.168709    1.170195  0.000821   
2            4+  1.086654   0.992475    1.189770    1.154456  0.002037   

     ci_low   ci_high  
0  1.153909  1.528591  
1  1.067006  1.283364  
2  1.054401  1.264007  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v7/direct_associations
Analyzing clinical phase: 2+


                                                                                

[[1727, 221], [35464, 6092]]
Analyzing clinical phase: 3+


                                                                                

[[1198, 750], [23954, 17602]]
Analyzing clinical phase: 4+


                                                                                

[[860, 1088], [16784, 24772]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.038842   0.900747    1.198108    1.342370  0.000033   
1            3+  1.066900   0.971849    1.171247    1.173761  0.000786   
2            4+  1.093069   0.997442    1.197864    1.166635  0.001029   

     ci_low   ci_high  
0  1.163927  1.548171  
1  1.069189  1.288559  
2  1.064572  1.278483  
Analyzing evidence source: gs://ot_orchestration/benchmarks/l2g/fm0/gs_v8/direct_associations


                                                                                

Analyzing clinical phase: 2+


                                                                                

[[980, 113], [36211, 6200]]
Analyzing clinical phase: 3+


                                                                                

[[687, 406], [24465, 17946]]
Analyzing clinical phase: 4+


                                                                                

[[490, 603], [17154, 25257]]
  clinicalPhase        RS  RS_ci_low  RS_ci_high  odds_ratio   p_value  \
0            2+  1.050132   0.862727    1.278245    1.484905  0.000042   
1            3+  1.089607   0.962349    1.233693    1.241233  0.000640   
2            4+  1.108381   0.982285    1.250663    1.196452  0.003711   

     ci_low   ci_high  
0  1.219912  1.807462  
1  1.096266  1.405369  
2  1.060336  1.350040  


24/11/20 07:56:56 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 948519 ms exceeds timeout 120000 ms
24/11/20 07:56:56 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/20 07:57:04 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B

In [3]:
session.spark.read.parquet(direct_associations_path).show()

[Stage 2:>                                                          (0 + 1) / 1]

+---------------+-----------+--------------------+
|       targetId|  diseaseId|         harmonicSum|
+---------------+-----------+--------------------+
|ENSG00000000003|EFO_0004339|  0.5835221578081758|
|ENSG00000000003|EFO_0004747| 0.06507035758290003|
|ENSG00000000419|EFO_0004704| 0.03722785074236974|
|ENSG00000000457|EFO_0004704|  0.1448520264865551|
|ENSG00000000457|EFO_0008202|0.046780276112467156|
|ENSG00000000460|EFO_0004286| 0.18657911050403436|
|ENSG00000000460|EFO_0004308| 0.04232009095927354|
|ENSG00000000971|EFO_0004615|0.056876469336240285|
|ENSG00000000971|EFO_0004725|  0.4724564103881596|
|ENSG00000000971|EFO_0008034|  0.5068244343438777|
|ENSG00000000971|EFO_0008271|  0.3811047091185817|
|ENSG00000000971|EFO_0008543|  0.3487026416352643|
|ENSG00000000971|EFO_0009947|  0.3870307325929446|
|ENSG00000000971|EFO_0010554|  0.5760499149024054|
|ENSG00000000971|EFO_0010802|  0.5610099218854913|
|ENSG00000000971|EFO_0010834|  0.5068244343438777|
|ENSG00000000971|EFO_0010913|  

                                                                                

In [None]:
class LocusToGeneEvidenceStep:
    """Locus to gene evidence step."""

    def __init__(
        self,
        session: Session,
        locus_to_gene_predictions_path: str,
        credible_set_path: str,
        study_index_path: str,
        evidence_output_path: str,
        locus_to_gene_threshold: float,
    ) -> None:
        """Initialise the step and generate disease/target evidence.

        Args:
            session (Session): Session object that contains the Spark session
            locus_to_gene_predictions_path (str): Path to the L2G predictions dataset
            credible_set_path (str): Path to the credible set dataset
            study_index_path (str): Path to the study index dataset
            evidence_output_path (str): Path to the L2G evidence output dataset. The output format is ndjson gzipped.
            locus_to_gene_threshold (float, optional): Threshold to consider a gene as a target. Defaults to 0.05.
        """
        # Reading the predictions
        locus_to_gene_prediction = L2GPrediction.from_parquet(
            session, locus_to_gene_predictions_path
        )
        # Reading the credible set
        credible_sets = StudyLocus.from_parquet(session, credible_set_path)

        # Reading the study index
        study_index = StudyIndex.from_parquet(session, study_index_path)

        # Generate evidence and save file:
        (
            locus_to_gene_prediction.to_disease_target_evidence(
                credible_sets, study_index, locus_to_gene_threshold
            )
            .write.mode(session.write_mode)
            .option("compression", "gzip")
            .json(evidence_output_path)
        )


class LocusToGeneAssociationsStep:
    """Locus to gene associations step."""

    def __init__(
        self,
        session: Session,
        evidence_input_path: str,
        disease_index_path: str,
        direct_associations_output_path: str,
        indirect_associations_output_path: str,
    ) -> None:
        """Create direct and indirect association datasets.

        Args:
            session (Session): Session object that contains the Spark session
            evidence_input_path (str): Path to the L2G evidence input dataset
            disease_index_path (str): Path to disease index file
            direct_associations_output_path (str): Path to the direct associations output dataset
            indirect_associations_output_path (str): Path to the indirect associations output dataset
        """
        # Read in the disease index
        disease_index = session.spark.read.parquet(disease_index_path).select(
            f.col("id").alias("diseaseId"),
            f.explode("ancestors").alias("ancestorDiseaseId"),
        )

        # Read in the L2G evidence
        disease_target_evidence = session.spark.read.json(evidence_input_path).select(
            f.col("targetFromSourceId").alias("targetId"),
            f.col("diseaseFromSourceMappedId").alias("diseaseId"),
            f.col("resourceScore"),
        )

        # Generate direct assocations and save file
        (
            disease_target_evidence.groupBy("targetId", "diseaseId")
            .agg(f.collect_set("resourceScore").alias("scores"))
            .select(
                "targetId",
                "diseaseId",
                calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
            )
            .write.mode(session.write_mode)
            .parquet(direct_associations_output_path)
        )

        # Generate indirect assocations and save file
        (
            disease_target_evidence.join(disease_index, on="diseaseId", how="inner")
            .groupBy("targetId", "ancestorDiseaseId")
            .agg(f.collect_set("resourceScore").alias("scores"))
            .select(
                "targetId",
                "ancestorDiseaseId",
                calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
            )
            .write.mode(session.write_mode)
            .parquet(indirect_associations_output_path)
        )


1. Define priors for causality:
   - priorc1: Prior for a variant being causal for trait 1.
   - priorc2: Prior for a variant being causal for trait 2.
   - priorc12: Prior for a variant being causal for both traits.

2. Validate priors:
   - Ensure priors are provided as floats.
   - Assign default values if any priors are missing (e.g., priorc1 = 1e-4).

3. Preprocess input data:
   - Fill null values in Bayes Factor (BF) columns with 0.
   - Sum log Bayes Factors (\(\text{logBF}\)) for overlapping signals (trait 1 + trait 2).

4. Group data by genomic regions:
   - Aggregate BF vectors for each pair of signals by study locus.

5. Compute log-sums for BFs:
   - Compute \(\text{logsum1}\): Log-sum of BFs for trait 1.
   - Compute \(\text{logsum2}\): Log-sum of BFs for trait 2.
   - Compute \(\text{logsum12}\): Log-sum of combined BFs for traits 1 and 2.

6. Add priors and calculate Bayes Factors (BF) for hypotheses:
   - **H0 (no association)**: \(\text{logBF} = 0\).
   - **H1 (trait 1 only)**: \(\text{logBF} = \log(\text{priorc1}) + \text{logsum1}\).
   - **H2 (trait 2 only)**: \(\text{logBF} = \log(\text{priorc2}) + \text{logsum2}\).
   - **H3 (both traits, different causal variants)**:
       - Compute \(\text{sumlogsum} = \text{logsum1} + \text{logsum2}\).
       - Compute \(\text{logdiff}\) to handle cases where \(\text{logsum12} \neq \text{sumlogsum}\):
         - \[
         \text{logdiff} = \text{max} + \log\left(e^{\text{logsum1} - \text{max}} - e^{\text{logsum12} - \text{max}}\right),
         \]
           where \(\text{max} = \max(\text{sumlogsum}, \text{logsum12})\).
       - \(\text{logBF} = \log(\text{priorc1}) + \log(\text{priorc2}) + \text{logdiff}\).
   - **H4 (both traits, shared causal variant)**: \(\text{logBF} = \log(\text{priorc12}) + \text{logsum12}\).

7. Compute posterior probabilities for \(H0\)–\(H4\):
   - Combine logBFs for all hypotheses into a vector:
     \[
     \text{logBFs} = [\text{lH0BF}, \text{lH1BF}, \text{lH2BF}, \text{lH3BF}, \text{lH4BF}].
     \]
   - Normalize to posterior probabilities:
     - Subtract the log-sum of the \(\text{logBFs}\) from each \(\text{logBF}\):
       \[
       \text{normalized logBF} = \text{logBF} - \log\left(\sum e^{\text{logBF}}\right).
       \]
     - Convert normalized logBFs to probabilities:
       \[
       P(H_i) = e^{\text{normalized logBF}}.
       \]

8. Output results:
   - Add posterior probabilities (\(H0\)–\(H4\)) to the dataset.
   - Annotate data with the colocalisation method used (e.g., "COLOC").
   - Include harmonized effect sizes for reported variants.

9. Return the final dataset with colocalisation annotations and probabilities for each hypothesis.
