In [1]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import (
    StructType,
    StructField,
    DoubleType,
    DecimalType,
    StringType,
    FloatType,
)


spark = SparkSession.builder.getOrCreate()

### checking 20.05.2024 unique list of category

In [2]:
target_path = "gs://open-targets-data-releases/24.06/output/etl/parquet/targets/"
target = spark.read.parquet(target_path)

                                                                                

In [3]:
target_path = "gs://open-targets-data-releases/24.06/output/etl/parquet/targets/"
target = spark.read.parquet(target_path)
go = spark.read.parquet("gs://open-targets-data-releases/24.06/output/etl/parquet/go")
diseases_all = spark.read.parquet(
    "gs://open-targets-data-releases/24.06/output/etl/parquet/diseases/"
)

                                                                                

### make dataframes 

In [None]:
#### third revision:
## include ensemblId on categoryId
## include SL terms

In [3]:
column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

##approvedSymbol
approvedSymbol = (
    target.select(F.col("id").alias("targetId"), "approvedSymbol")
    .withColumn("categoryType", F.lit("approvedSymbol"))
    .withColumn("categoryId", F.col("targetId"))
    .withColumnRenamed("approvedSymbol", "categoryLabel")
    .distinct()
    .persist()
    .select(*column_order)
)
## go
go_terms = (
    target.select(F.col("id").alias("targetId"), F.explode_outer(F.col("go")))
    .select("targetId", "col.*")
    .join(go, on="id", how="left")
    .withColumn(
        "categoryType",
        F.when(F.col("aspect") == "F", F.concat(F.lit("GO:"), F.lit("MF")))
        .when(F.col("aspect") == "P", F.concat(F.lit("GO:"), F.lit("BP")))
        .when(F.col("aspect") == "C", F.concat(F.lit("GO:"), F.lit("CC"))),
    )
    .withColumn("categoryLabel", F.col("name"))
    .drop("name")
    .withColumn("categoryId", F.col("id"))
    .select("targetId", "categoryType", "categoryLabel", "categoryId")
    .distinct()
    .select(*column_order)
).persist()

## location
location = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "subcellularLocations",
            lambda x: F.struct(
                x.location.alias("categoryLabel"),
                x.termSL.alias("categoryId"),
                x.source.alias("categoryType"),
            ),
        ).alias("subcellularLocation"),
    )
    .select("targetId", F.explode_outer(F.col("subcellularLocation")))
    .select("targetId", "col.*")
    .distinct()
    .select(*column_order)
).persist()

## targetClass
targetClass = (
    target.select(F.col("id").alias("targetId"), "targetClass")
    .filter(F.col("targetClass").isNotNull())
    .select("targetId", F.explode_outer(F.col("targetClass")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("ChEMBL target"))
    .withColumn("categoryId", F.lit(None))
    .drop("id", "level")
    .withColumnRenamed("label", "categoryLabel")
    .distinct()
    .select(*column_order)
    .persist()
)
## pathways
pathways = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "pathways",
            lambda x: F.struct(
                x.pathway.alias("categoryLabel"),
                x.pathwayId.alias("categoryId"),
                # x.topLevelTerm.alias("categoryType"),
            ),
        ).alias("more"),
    )
    .select("targetId", F.explode_outer(F.col("more")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("Reactome"))
    .distinct()
    .select(*column_order)
    .persist()
)
## union of everything
union = (
    (approvedSymbol.union(go_terms).union(location).union(targetClass).union(pathways))
    .filter(F.col("targetId").isNotNull())
    .persist()
)

In [14]:
### write to json format:
union.coalesce(1).write.format("json").save("gs://ot-team/jroldan/facetsTarget.json")

                                                                                

## James suggestions: 
    - groupBy categoryType and categoryLabel and collect on list targetId (I included categoryId)
    - Remove orphaned targetId with any other field.

In [None]:
unionV2 = (
    union.filter(
        (F.col("categoryType").isNotNull())
        | (F.col("categoryLabel").isNotNull())
        | (F.col("categoryId").isNotNull())
    )
    .groupBy("categoryType", "categoryLabel")
    .agg(
        F.collect_set("targetId").alias("targetId"),
        F.collect_set("categoryId").alias("categoryId"),
    )
)

In [29]:
unionV2.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/facetsTargetV2.json"
)

                                                                                

In [33]:
realNames.filter(F.col("category") == "approvedSymbol").show()

+--------------+---------+-----------------+-----------------+----+
|      category|    label|        entityIds|        facetsIds|size|
+--------------+---------+-----------------+-----------------+----+
|approvedSymbol|   ACOT11|[ENSG00000162390]|[ENSG00000162390]|   1|
|approvedSymbol|    ACSM1|[ENSG00000166743]|[ENSG00000166743]|   1|
|approvedSymbol| AGPAT5P1|[ENSG00000234652]|[ENSG00000234652]|   1|
|approvedSymbol|   AIDAP2|[ENSG00000251429]|[ENSG00000251429]|   1|
|approvedSymbol|  ANKRD55|[ENSG00000164512]|[ENSG00000164512]|   1|
|approvedSymbol|  APCDD1L|[ENSG00000198768]|[ENSG00000198768]|   1|
|approvedSymbol|ATP5F1AP3|[ENSG00000263232]|[ENSG00000263232]|   1|
|approvedSymbol|  AURKAP1|[ENSG00000213033]|[ENSG00000213033]|   1|
|approvedSymbol|   BDKRB2|[ENSG00000168398]|[ENSG00000168398]|   1|
|approvedSymbol|    BEND4|[ENSG00000188848]|[ENSG00000188848]|   1|
|approvedSymbol|  BTF3P14|[ENSG00000262609]|[ENSG00000262609]|   1|
|approvedSymbol| C15orf39|[ENSG00000167173]|[ENS

In [34]:
realNames.filter(F.col("label") == "SUGT1P3").show()

+--------------+-------+--------------------+--------------------+----+
|      category|  label|           entityIds|           facetsIds|size|
+--------------+-------+--------------------+--------------------+----+
|approvedSymbol|SUGT1P3|[ENSG00000290464,...|[ENSG00000290464,...|   2|
+--------------+-------+--------------------+--------------------+----+



In [37]:
realNames.filter(F.col("size") > 1).groupBy("size", "category").count().show()

+----+--------------+-----+
|size|      category|count|
+----+--------------+-----+
|   2|approvedSymbol|  452|
|   2|      Reactome|   13|
|   7|approvedSymbol|    6|
|   3|approvedSymbol|    7|
|  50|approvedSymbol|    1|
|   8|approvedSymbol|    1|
| 170|approvedSymbol|    1|
|   4|approvedSymbol|    5|
|  22|approvedSymbol|    1|
| 756|approvedSymbol|    1|
|   6|approvedSymbol|    1|
|   9|approvedSymbol|    1|
|  33|approvedSymbol|    1|
|  19|approvedSymbol|    1|
|  27|approvedSymbol|    1|
+----+--------------+-----+



### 12.04.2024
    David wants to change: 
        * when catetory == approvedSymbol all datasourceId will be null . If the problem is limited to approvedSymbol this should flatten the field
        * so facetIds (Array) becomes datasourceId (nullable String)
        * the logic is that ENSGXX is not really the datasourceId for approvedSymbol if the same category can have multiple ENSGXXX


#### "targetIds" -> "entityIds"
#### "categoryType" -> "category"
#### "categoryLabel" -> "label"
#### "categoryIds" -> "facetIds"

In [4]:
### 2nd version:
### changes:
# when datasource == approvedsymbol, facetsId == null

column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

##approvedSymbol
approvedSymbol = (
    target.select(F.col("id").alias("targetId"), "approvedSymbol")
    .withColumn("categoryType", F.lit("approvedSymbol"))
    .withColumn("categoryId", F.lit(None))  ### changed it 12.04.2024
    .withColumnRenamed("approvedSymbol", "categoryLabel")
    .distinct()
    .persist()
    .select(*column_order)
)
## go
go_terms = (
    target.select(F.col("id").alias("targetId"), F.explode_outer(F.col("go")))
    .select("targetId", "col.*")
    .join(go, on="id", how="left")
    .withColumn(
        "categoryType",
        F.when(F.col("aspect") == "F", F.concat(F.lit("GO:"), F.lit("MF")))
        .when(F.col("aspect") == "P", F.concat(F.lit("GO:"), F.lit("BP")))
        .when(F.col("aspect") == "C", F.concat(F.lit("GO:"), F.lit("CC"))),
    )
    .withColumn("categoryLabel", F.col("name"))
    .drop("name")
    .withColumn("categoryId", F.col("id"))
    .select("targetId", "categoryType", "categoryLabel", "categoryId")
    .distinct()
    .select(*column_order)
).persist()

## location
location = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "subcellularLocations",
            lambda x: F.struct(
                x.location.alias("categoryLabel"),
                x.termSL.alias("categoryId"),
                x.source.alias("categoryType"),
            ),
        ).alias("subcellularLocation"),
    )
    .select("targetId", F.explode_outer(F.col("subcellularLocation")))
    .select("targetId", "col.*")
    .distinct()
    .select(*column_order)
).persist()

## targetClass
targetClass = (
    target.select(F.col("id").alias("targetId"), "targetClass")
    .filter(F.col("targetClass").isNotNull())
    .select("targetId", F.explode_outer(F.col("targetClass")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("ChEMBL target"))
    .withColumn("categoryId", F.lit(None))
    .drop("id", "level")
    .withColumnRenamed("label", "categoryLabel")
    .distinct()
    .select(*column_order)
    .persist()
)
## pathways
pathways = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "pathways",
            lambda x: F.struct(
                x.pathway.alias("categoryLabel"),
                x.pathwayId.alias("categoryId"),
                # x.topLevelTerm.alias("categoryType"),
            ),
        ).alias("more"),
    )
    .select("targetId", F.explode_outer(F.col("more")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("Reactome"))
    .distinct()
    .select(*column_order)
    .persist()
)
## union of everything
union = (
    (approvedSymbol.union(go_terms).union(location).union(targetClass).union(pathways))
    .filter(F.col("targetId").isNotNull())
    .persist()
)

### with used names:

unionNew = union.selectExpr(
    "categoryType as category",
    "categoryLabel as label",
    "targetId as entityIds",
    "categoryId as datasourceId",
)

24/05/21 13:09:06 WARN CacheManager: Asked to cache already cached data.
24/05/21 13:09:06 WARN CacheManager: Asked to cache already cached data.
24/05/21 13:09:06 WARN CacheManager: Asked to cache already cached data.
24/05/21 13:09:06 WARN CacheManager: Asked to cache already cached data.


In [6]:
spark.read.parquet(
    "gs://open-targets-pre-data-releases/jhpis/output/etl/parquet/facetSearchTarget"
).groupBy("category").count().show(200, truncate=False)



+-----------------------------+-----+
|category                     |count|
+-----------------------------+-----+
|GO:CC                        |1823 |
|GO:MF                        |4673 |
|Subcellular Location         |817  |
|Tractability Antibody        |9    |
|GO:BP                        |12232|
|Approved Name                |47927|
|Approved Symbol              |61619|
|ChEMBL Target Class          |720  |
|Reactome                     |2140 |
|Target ID                    |63226|
|Tractability Small Molecule  |8    |
|Tractability PROTAC          |7    |
|Tractability Other Modalities|3    |
+-----------------------------+-----+



                                                                                

In [7]:
spark.read.parquet(
    "gs://open-targets-pre-data-releases/jhpis/output/etl/parquet/facetSearchDisease"
).select("category").distinct().show(200, truncate=False)



+----------------+
|category        |
+----------------+
|Disease         |
|Therapeutic Area|
+----------------+



                                                                                

In [6]:
unionNew.filter(
    (F.col("datasourceId").isNotNull()) & (F.col("category") == "Reactome")
).sort(F.col("entityIds").desc()).show()



+--------+--------------------+---------------+-------------+
|category|               label|      entityIds| datasourceId|
+--------+--------------------+---------------+-------------+
|Reactome|Detoxification of...|ENSG00000291237|R-HSA-3299685|
|Reactome|Transcriptional a...|ENSG00000291237|R-HSA-2151201|
|Reactome|FOXO-mediated tra...|ENSG00000291237|R-HSA-9615017|
|Reactome|Gene and protein ...|ENSG00000291237|R-HSA-8950505|
|Reactome|Deregulated CDK5 ...|ENSG00000291237|R-HSA-8862803|
|Reactome|Loss of proteins ...|ENSG00000290263| R-HSA-380284|
|Reactome|Regulation of PLK...|ENSG00000290263|R-HSA-2565942|
|Reactome|Recruitment of mi...|ENSG00000290263| R-HSA-380270|
|Reactome|Recruitment of Nu...|ENSG00000290263| R-HSA-380320|
|Reactome|Loss of Nlp from ...|ENSG00000290263| R-HSA-380259|
|Reactome|Anchoring of the ...|ENSG00000290263|R-HSA-5620912|
|Reactome|AURKA Activation ...|ENSG00000290263|R-HSA-8854518|
|Reactome|Detoxification of...|ENSG00000290203|R-HSA-3299685|
|Reactom

                                                                                

In [51]:
unionNew.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/20240412_facetsTarget.json"
)

                                                                                

#### 15.04.2024
#### I need to groupby the file of facet targets by 

''' 
Good Morning Juan! I hope you had a nice weekend. When you get a moment, please could you update the target facet dataset in the same way that you did for diseases so that it also adheres to the same schema:
Schema
 |-- category: string (nullable = true)
 |-- label: string (nullable = true)
 |-- entityIds: Array
 |-- datasourceId: string (nullable = true)
I will make these changes in the api and then rebuild the data backend. Thank you very much!

thanks Juan, no rush. I'm happy to have a call if you want to clarify anything. As the grouping is done on label + category there will be many target (entity) ids belonging to that group so we have the option to either express this as an array of ids or explode across many rows (duplicating label + category). We need to do the former i.e. label + category are uniquely represented and each label + category  record has an array of target ids (entityIds). This means that we can assign an identifier to the label + category group and use that identifier as a lookup to resolve down to target ids.
'''

In [5]:
unionNew2 = unionNew.groupBy("category", "label", "datasourceId").agg(
    F.collect_set("entityIds").alias("entityIds")
)

#### Checking 20.05.2024

In [6]:
unionNew2.show()



+-------------+--------------------+------------+--------------------+
|     category|               label|datasourceId|           entityIds|
+-------------+--------------------+------------+--------------------+
|ChEMBL target|AGC protein kinas...|        null|   [ENSG00000122966]|
|ChEMBL target|AGC protein kinas...|        null|[ENSG00000138669,...|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000119487,...|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000124784,...|
|ChEMBL target|Atypical protein ...|        null|   [ENSG00000103319]|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000119487,...|
|ChEMBL target|CAMK protein kina...|        null|   [ENSG00000130413]|
|ChEMBL target|CK1 protein kinas...|        null|[ENSG00000133275,...|
|ChEMBL target|CMGC protein kina...|        null|[ENSG00000112144,...|
|ChEMBL target|CMGC protein kina...|        null|[ENSG00000102225,...|
|ChEMBL target|CXC chemokine rec...|        null|[ENSG00000180871,...|
|ChEMB

                                                                                

In [19]:
unionNew2.show()



+-------------+--------------------+------------+--------------------+
|     category|               label|datasourceId|           entityIds|
+-------------+--------------------+------------+--------------------+
|ChEMBL target|AGC protein kinas...|        null|   [ENSG00000122966]|
|ChEMBL target|AGC protein kinas...|        null|[ENSG00000138669,...|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000119487,...|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000124784,...|
|ChEMBL target|Atypical protein ...|        null|   [ENSG00000103319]|
|ChEMBL target|Atypical protein ...|        null|[ENSG00000119487,...|
|ChEMBL target|CAMK protein kina...|        null|   [ENSG00000130413]|
|ChEMBL target|CK1 protein kinas...|        null|[ENSG00000133275,...|
|ChEMBL target|CMGC protein kina...|        null|[ENSG00000112144,...|
|ChEMBL target|CMGC protein kina...|        null|[ENSG00000102225,...|
|ChEMBL target|CXC chemokine rec...|        null|[ENSG00000180871,...|
|ChEMB

                                                                                

In [11]:
#### checking on 20.05.2024:
unionNew2.filter(F.col("category").isNotNull()).groupBy("category").count().show(
    20, truncate=False
)



+--------------------------+-----+
|category                  |count|
+--------------------------+-----+
|GO:CC                     |1823 |
|ChEMBL target             |720  |
|GO:MF                     |4673 |
|HPA_main                  |34   |
|GO:BP                     |12232|
|Reactome                  |2141 |
|uniprot                   |785  |
|approvedSymbol            |61619|
|HPA_additional            |35   |
|HPA_extracellular_location|1    |
+--------------------------+-----+



                                                                                

In [20]:
unionNew2.filter(F.col("datasourceId").isNotNull()).groupBy("category").count().show(
    20, truncate=False
)



+--------------+-----+
|category      |count|
+--------------+-----+
|GO:CC         |1823 |
|GO:MF         |4673 |
|HPA_main      |31   |
|GO:BP         |12232|
|Reactome      |2140 |
|uniprot       |785  |
|HPA_additional|32   |
+--------------+-----+



                                                                                

#### Checking 20.05.2024

In [18]:
unionNew2.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/20240415_facetsTarget.json"
)

                                                                                

### Diseases

In [12]:
diseases = (
    diseases_all.selectExpr("id", "name as categoryLabel")
    .withColumn("diseaseId", F.array(F.col("id")))
    .withColumn("categoryType", F.lit("disease"))
    .select("diseaseId", "categoryType", "categoryLabel")
    .withColumn("categoryId", F.col("diseaseId"))
    .drop("id")
)

### Therapeutic Areas

In [13]:
therapeuticAreas = (
    diseases_all.withColumn("therapyAreas", F.explode_outer("therapeuticAreas"))
    .groupBy("therapyAreas")
    .agg(F.collect_set("id").alias("diseaseId"))
    .join(
        diseases_all.select("id", "name"),
        F.col("therapyAreas") == diseases_all.id,
        "left",
    )
    .withColumn("categoryId", F.array(F.col("id")))
    .drop("id")
    .withColumnRenamed("name", "categoryLabel")
    .withColumn("categoryType", F.lit("therapeutic area"))
    .drop("therapyAreas")
    .select(*diseases.columns)
)

### write disease/therapeutic areas facets

In [14]:
diseasesFacets = therapeuticAreas.union(diseases).persist()

In [None]:
diseasesFacets.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/facetsDiseases.json"
)

In [None]:
diseasesFacets.selectExpr(
    "diseaseId as entityIds",
    "categoryType as category",
    "categoryLabel as label",
    "categoryId as datasourceId",
)

#### Changes on 12.04.2024
## everything in the same schema as the target facets 
##### "diseaseId" -> "entityIds"
##### "categoryType" -> "category"
##### "categoryLabel" -> "label"
##### "categoryId" -> "datasourceId"

In [67]:
diseasesFacets.withColumn("diseaseSize", F.size("diseaseId")).withColumn(
    "categoryIdSize", F.size("categoryId")
).groupBy("categoryIdSize").count().sort(F.col("categoryIdSize")).show()

+--------------+-----+
|categoryIdSize|count|
+--------------+-----+
|             1|25843|
+--------------+-----+



In [15]:
diseases = (
    diseases_all.selectExpr("id", "name as categoryLabel")
    .withColumn("diseaseId", F.col("id"))  ## change as null instead of array of same Id
    .withColumn("categoryType", F.lit("disease"))
    .select("diseaseId", "categoryType", "categoryLabel")
    .withColumn("categoryId", F.col("diseaseId"))
    .drop("id")
)

therapeuticAreas = (
    diseases_all.withColumn("therapyAreas", F.explode_outer("therapeuticAreas"))
    .selectExpr("therapyAreas as categoryId", "id as diseaseId")
    .join(
        diseases_all.select("id", "name"),
        F.col("categoryId") == diseases_all.id,
        "left",
    )
    .drop("id")
    .withColumnRenamed("name", "categoryLabel")
    .withColumn("categoryType", F.lit("therapeutic area"))
    .select(*diseases.columns)
)

diseasesFacets = (
    therapeuticAreas.union(diseases)
    .selectExpr(
        "diseaseId as entityIds",
        "categoryType as category",
        "categoryLabel as label",
        "categoryId as datasourceId",
    )
    .persist()
)

24/05/20 15:53:55 WARN CacheManager: Asked to cache already cached data.


In [103]:
diseasesFacets.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/20240412_facetsDiseases.json"
)

                                                                                

#### New iteration with James: 
    #### entityIds should not be exploded across rows, they should be arrays, where the grouping is on category + label. 


In [16]:
diseasesFacetsV2 = diseasesFacets.groupBy("category", "label", "datasourceId").agg(
    F.collect_set(F.col("entityIds")).alias("entityIds")
)

In [119]:
diseasesFacetsV2.printSchema()

root
 |-- category: string (nullable = false)
 |-- label: string (nullable = true)
 |-- datasourceId: string (nullable = true)
 |-- entityIds: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [118]:
diseasesFacetsV2.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/20240412_facetsDiseasesV2.json"
)

                                                                                

#### checking on 20.05.2024

In [18]:
diseasesFacetsV2.groupBy("category").count().show()



+----------------+-----+
|        category|count|
+----------------+-----+
|therapeutic area|   26|
|         disease|25817|
+----------------+-----+



                                                                                

#### 15.07.2024
#### create the dataset for tissue specificity and distribution

In [None]:
### 2nd version:
### changes:
# when datasource == approvedsymbol, facetsId == null

column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

##approvedSymbol
approvedSymbol = (
    target.select(F.col("id").alias("targetId"), "approvedSymbol")
    .withColumn("categoryType", F.lit("approvedSymbol"))
    .withColumn("categoryId", F.lit(None))  ### changed it 12.04.2024
    .withColumnRenamed("approvedSymbol", "categoryLabel")
    .distinct()
    .persist()
    .select(*column_order)
)
## go
go_terms = (
    target.select(F.col("id").alias("targetId"), F.explode_outer(F.col("go")))
    .select("targetId", "col.*")
    .join(go, on="id", how="left")
    .withColumn(
        "categoryType",
        F.when(F.col("aspect") == "F", F.concat(F.lit("GO:"), F.lit("MF")))
        .when(F.col("aspect") == "P", F.concat(F.lit("GO:"), F.lit("BP")))
        .when(F.col("aspect") == "C", F.concat(F.lit("GO:"), F.lit("CC"))),
    )
    .withColumn("categoryLabel", F.col("name"))
    .drop("name")
    .withColumn("categoryId", F.col("id"))
    .select("targetId", "categoryType", "categoryLabel", "categoryId")
    .distinct()
    .select(*column_order)
).persist()

## location
location = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "subcellularLocations",
            lambda x: F.struct(
                x.location.alias("categoryLabel"),
                x.termSL.alias("categoryId"),
                x.source.alias("categoryType"),
            ),
        ).alias("subcellularLocation"),
    )
    .select("targetId", F.explode_outer(F.col("subcellularLocation")))
    .select("targetId", "col.*")
    .distinct()
    .select(*column_order)
).persist()

## targetClass
targetClass = (
    target.select(F.col("id").alias("targetId"), "targetClass")
    .filter(F.col("targetClass").isNotNull())
    .select("targetId", F.explode_outer(F.col("targetClass")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("ChEMBL target"))
    .withColumn("categoryId", F.lit(None))
    .drop("id", "level")
    .withColumnRenamed("label", "categoryLabel")
    .distinct()
    .select(*column_order)
    .persist()
)
## pathways
pathways = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "pathways",
            lambda x: F.struct(
                x.pathway.alias("categoryLabel"),
                x.pathwayId.alias("categoryId"),
                # x.topLevelTerm.alias("categoryType"),
            ),
        ).alias("more"),
    )
    .select("targetId", F.explode_outer(F.col("more")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("Reactome"))
    .distinct()
    .select(*column_order)
    .persist()
)
## union of everything
union = (
    (approvedSymbol.union(go_terms).union(location).union(targetClass).union(pathways))
    .filter(F.col("targetId").isNotNull())
    .persist()
)

### with used names:

unionNew = union.selectExpr(
    "categoryType as category",
    "categoryLabel as label",
    "targetId as entityIds",
    "categoryId as datasourceId",
)

In [6]:
queryset = target.select("id").withColumnRenamed("id", "targetid")
hpa_data = "gs://ot-team/jroldan/hpa_data2/proteinatlas_file.json"

In [12]:
# Install gcsfs if not already installed
import sys
!{sys.executable} -m pip install gcsfs

import pandas as pd
import json
import gcsfs


def tissue_specific(hpa_data, queryset):
    cols_of_interest = [
        "Ensembl",
        "RNA tissue distribution",
        "RNA tissue specificity",
        "Antibody",
    ]

    fs = gcsfs.GCSFileSystem()
    with fs.open(hpa_data, "r") as f:
        data = json.load(f)  # Assuming the JSON file contains a single JSON object

    df = pd.DataFrame(data).filter(items=cols_of_interest)
    hpa_df = spark.createDataFrame(df)

    return hpa_df


testing = tissue_specific(hpa_data, queryset)

Defaulting to user installation because normal site-packages is not writeable


In [16]:
column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

##approvedSymbol
approvedSymbol = (
    target.select(F.col("id").alias("targetId"), "approvedSymbol")
    .withColumn("categoryType", F.lit("approvedSymbol"))
    .withColumn("categoryId", F.lit(None))  ### changed it 12.04.2024
    .withColumnRenamed("approvedSymbol", "categoryLabel")
    .distinct()
    .persist()
    .select(*column_order)
)

24/07/15 20:54:02 WARN CacheManager: Asked to cache already cached data.


In [32]:
tissueSpecificity = (
    testing.withColumnRenamed("Ensembl", "targetId")
    .withColumn("categoryType", F.lit("tissueSpecificity"))
    .withColumn("categoryLabel", F.lit(F.col("RNA tissue specificity")))
    .withColumn("categoryId", F.lit(None))
    .select(*column_order)
)
tissueDistribution = (
    testing.withColumnRenamed("Ensembl", "targetId")
    .withColumn("categoryType", F.lit("tissueDistribution"))
    .withColumn("categoryLabel", F.lit(F.col("RNA tissue distribution")))
    .withColumn("categoryId", F.lit(None))
    .select(*column_order)
)

In [None]:
### testing gist to James:

In [2]:
### load datasets

target_path = "gs://open-targets-data-releases/24.03/output/etl/parquet/targets/"
target = spark.read.parquet(target_path)
go = spark.read.parquet("gs://open-targets-data-releases/24.03/output/etl/parquet/go")
diseases_all = spark.read.parquet(
    "gs://open-targets-data-releases/24.03/output/etl/parquet/diseases/"
)

#### Target Facets

column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

##approvedSymbol
approvedSymbol = (
    target.select(F.col("id").alias("targetId"), "approvedSymbol")
    .withColumn("categoryType", F.lit("approvedSymbol"))
    .withColumn("categoryId", F.lit(None))  ### changed it 12.04.2024
    .withColumnRenamed("approvedSymbol", "categoryLabel")
    .distinct()
    .persist()
    .select(*column_order)
)
## go
go_terms = (
    target.select(F.col("id").alias("targetId"), F.explode_outer(F.col("go")))
    .select("targetId", "col.*")
    .join(go, on="id", how="left")
    .withColumn(
        "categoryType",
        F.when(F.col("aspect") == "F", F.concat(F.lit("GO:"), F.lit("MF")))
        .when(F.col("aspect") == "P", F.concat(F.lit("GO:"), F.lit("BP")))
        .when(F.col("aspect") == "C", F.concat(F.lit("GO:"), F.lit("CC"))),
    )
    .withColumn("categoryLabel", F.col("name"))
    .drop("name")
    .withColumn("categoryId", F.col("id"))
    .select("targetId", "categoryType", "categoryLabel", "categoryId")
    .distinct()
    .select(*column_order)
).persist()

## location
location = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "subcellularLocations",
            lambda x: F.struct(
                x.location.alias("categoryLabel"),
                x.termSL.alias("categoryId"),
                x.source.alias("categoryType"),
            ),
        ).alias("subcellularLocation"),
    )
    .select("targetId", F.explode_outer(F.col("subcellularLocation")))
    .select("targetId", "col.*")
    .distinct()
    .select(*column_order)
).persist()

## targetClass
targetClass = (
    target.select(F.col("id").alias("targetId"), "targetClass")
    .filter(F.col("targetClass").isNotNull())
    .select("targetId", F.explode_outer(F.col("targetClass")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("ChEMBL target"))
    .withColumn("categoryId", F.lit(None))
    .drop("id", "level")
    .withColumnRenamed("label", "categoryLabel")
    .distinct()
    .select(*column_order)
    .persist()
)
## pathways
pathways = (
    target.select(
        F.col("id").alias("targetId"),
        F.transform(
            "pathways",
            lambda x: F.struct(
                x.pathway.alias("categoryLabel"),
                x.pathwayId.alias("categoryId"),
                # x.topLevelTerm.alias("categoryType"),
            ),
        ).alias("more"),
    )
    .select("targetId", F.explode_outer(F.col("more")))
    .select("targetId", "col.*")
    .withColumn("categoryType", F.lit("Reactome"))
    .distinct()
    .select(*column_order)
    .persist()
)
## union of everything
union = (
    (approvedSymbol.union(go_terms).union(location).union(targetClass).union(pathways))
    .filter(F.col("targetId").isNotNull())
    .persist()
)

### with good names and format:

targetFacets = (
    union.selectExpr(
        "categoryType as category",
        "categoryLabel as label",
        "targetId as entityIds",
        "categoryId as datasourceId",
    )
    .groupBy("category", "label", "datasourceId")
    .agg(F.collect_set("entityIds").alias("entityIds"))
)

## Disease & Ther Area facets

diseases = (
    diseases_all.selectExpr("id", "name as categoryLabel")
    .withColumn("diseaseId", F.col("id"))  ## change as null instead of array of same Id
    .withColumn("categoryType", F.lit("disease"))
    .select("diseaseId", "categoryType", "categoryLabel")
    .withColumn("categoryId", F.col("diseaseId"))
    .drop("id")
)

therapeuticAreas = (
    diseases_all.withColumn("therapyAreas", F.explode_outer("therapeuticAreas"))
    .selectExpr("therapyAreas as categoryId", "id as diseaseId")
    .join(
        diseases_all.select("id", "name"),
        F.col("categoryId") == diseases_all.id,
        "left",
    )
    .drop("id")
    .withColumnRenamed("name", "categoryLabel")
    .withColumn("categoryType", F.lit("therapeutic area"))
    .select(*diseases.columns)
)

diseasesFacets = (
    therapeuticAreas.union(diseases)
    .selectExpr(
        "diseaseId as entityIds",
        "categoryType as category",
        "categoryLabel as label",
        "categoryId as datasourceId",
    )
    .groupBy("category", "label", "datasourceId")
    .agg(F.collect_set(F.col("entityIds")).alias("entityIds"))
    .persist()
)

24/04/19 07:28:57 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/04/19 07:29:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/04/19 07:29:27 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/04/19 07:29:42 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/04/19 07:29:57 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/04/19 07:30:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registere

#### preparing second gist for James 
#### 15.07.2024

In [38]:
column_order = ["targetId", "categoryType", "categoryLabel", "categoryId"]

tissueSpecificity = (
    testing.withColumnRenamed("Ensembl", "targetId")
    .withColumn("categoryType", F.lit("tissueSpecificity"))
    .withColumn("categoryLabel", F.lit(F.col("RNA tissue specificity")))
    .withColumn("categoryId", F.lit(None))
    .select(*column_order)
)
tissueDistribution = (
    testing.withColumnRenamed("Ensembl", "targetId")
    .withColumn("categoryType", F.lit("tissueDistribution"))
    .withColumn("categoryLabel", F.lit(F.col("RNA tissue distribution")))
    .withColumn("categoryId", F.lit(None))
    .select(*column_order)
)

## union of everything
unionDistrSpecif = (
    (tissueSpecificity.union(tissueDistribution))
    .filter(F.col("targetId").isNotNull())
    .persist()
)

### with good names and format:
targetFacetsDistrSpecif = (
    unionDistrSpecif.selectExpr(
        "categoryType as category",
        "categoryLabel as label",
        "targetId as entityIds",
        "categoryId as datasourceId",
    )
    .groupBy("category", "label", "datasourceId")
    .agg(F.collect_set("entityIds").alias("entityIds"))
    .withColumn("datasourceId", F.lit("HPA"))
)

24/07/15 21:14:57 WARN CacheManager: Asked to cache already cached data.


In [41]:
### write to json format:
targetFacetsDistrSpecif.coalesce(1).write.format("json").save(
    "gs://ot-team/jroldan/hpa_data2/facetsDistrSpecif.json"
)

                                                                                