**Open Targets
Software Developer Technical Test**

This is the solution for the technical test. It consists of two parts. The first part downloads the evidence and target files to the local file system via FTP.

***First part***

In [2]:
import fnmatch
from ftplib import FTP


Declare the remote FTP directory that holds the EVA evidence files.

In [106]:
dir='/pub/databases/opentargets/platform/21.11/output/etl/json/evidence/sourceId=eva/'

Connect to the `ftp.ebi.ac.uk` server, log in anonymously and change to the evidence directory.

In [3]:
ftp = FTP('ftp.ebi.ac.uk')
ftp.login()
#change directory on FTP server.
ftp.cwd(dir)

'250 Directory successfully changed.'

Create an empty local directory to store the evidence files from the FTP server.

In [4]:
mkdir evidence_files

Next, we define a function that lists the files in the current FTP directory and downloads every `.json` file into the given local directory, using the logged-in `ftp` connection.

In [5]:
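import os


def get_and_download_files(directory):
    # List every entry in the current remote directory
    files = ftp.nlst()

    for file in files:
        # Only download the JSON part files
        if fnmatch.fnmatch(file, '*.json'):
            try:
                # The context manager closes the local file once the transfer ends
                with open(os.path.join(directory, file), 'wb') as fh:
                    ftp.retrbinary("RETR " + file, fh.write)
            except EOFError:
                # Skip the file if the server closes the connection unexpectedly
                pass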
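A variant of the download step that receives the connection explicitly instead of relying on the global `ftp`, and that creates the local folder itself so the separate `mkdir` cells are not needed. This is a minimal sketch; `download_matching` is a hypothetical helper name, and it assumes `nlst()` returns plain file names after `cwd()`, as it does in the cells above.

```python
import fnmatch
import os
from ftplib import FTP
from typing import List


def download_matching(ftp: FTP, remote_dir: str, local_dir: str,
                      pattern: str = "*.json") -> List[str]:
    """Download the files of `remote_dir` whose names match `pattern` into `local_dir`.

    `ftp` must already be logged in. Returns the local paths that were written.
    """
    # Create the local folder if it does not exist yet
    os.makedirs(local_dir, exist_ok=True)
    ftp.cwd(remote_dir)
    written = []
    for name in ftp.nlst():
        if not fnmatch.fnmatch(name, pattern):
            continue  # ignore entries that do not match the pattern
        local_path = os.path.join(local_dir, os.path.basename(name))
        # The context manager guarantees the local file is closed after the transfer
        with open(local_path, "wb") as fh:
            ftp.retrbinary("RETR " + name, fh.write)
        written.append(local_path)
    return written
```

For example, `with FTP('ftp.ebi.ac.uk') as ftp: ftp.login(); download_matching(ftp, dir, 'evidence_files')` would fetch the evidence files, and a second call with `dir_targets` and `'target_files'` would reuse the same connection. The `with` block closes the connection on exit.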


Declare the local path of the evidence files folder.

In [6]:
local_evidence_folder = "/content/evidence_files/"

We then call the function defined above to download the evidence files.

In [7]:
get_and_download_files(local_evidence_folder)

Next, we create the local folder for the target files.

In [8]:
mkdir target_files

In [9]:
dir_targets='/pub/databases/opentargets/platform/21.11/output/etl/json/targets/'

In [10]:
# change directory to target folder
ftp.cwd(dir_targets)

'250 Directory successfully changed.'

In [11]:
local_target_folder = "/content/target_files/"

Now we download the target files into the new directory.

In [12]:
get_and_download_files(local_target_folder)

In [13]:
# close ftp connection
ftp.close()

***Second part***

In this part, we use PySpark to process the downloaded files. First, we import the libraries needed to collect the paths of the downloaded JSON files.

In [30]:
import glob
import os
evidence_json_folder = 'evidence_files/'
target_json_folder = 'target_files/'
json_pattern = os.path.join(evidence_json_folder, '*.json')
file_list = glob.glob(json_pattern)
json_pattern_target = os.path.join(target_json_folder, '*.json')
file_list_target = glob.glob(json_pattern_target)


PySpark is not installed in the environment, so we install it with pip. See https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html for the PySpark documentation.

In [15]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 18.2 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=52220763630deb7637fb8d189e282855997b68eb9394deb8c98237d907637aad
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

Import the PySpark modules used in the rest of the notebook.

In [27]:
# Import SparkSession
from pyspark.sql import SparkSession
# Import the schema types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType
# Aggregate functions such as percentile_approx
import pyspark.sql.functions as func
from pyspark.sql.window import Window
# Column helper and window ranking functions
from pyspark.sql.functions import col, row_number, rank



To use PySpark, we first create a SparkSession and then load all evidence files into a DataFrame to inspect their columns.

In [19]:
%%time
# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("OpenTargetsApplication") \
      .getOrCreate() 
      
df = spark.read.json(file_list)
df.show() 

+-------------------+--------------------+---------------------+--------------------+--------------------+------------+-------------------+--------------------+-------------------+-------------------------+---------------+--------------------+--------------------+-----+------------+------------------+---------------+------------------------------+----------------+------------+
|      alleleOrigins| allelicRequirements|clinicalSignificances|    cohortPhenotypes|          confidence|datasourceId|         datatypeId|   diseaseFromSource|diseaseFromSourceId|diseaseFromSourceMappedId|      diseaseId|                  id|          literature|score|     studyId|targetFromSourceId|       targetId|variantFunctionalConsequenceId|       variantId| variantRsId|
+-------------------+--------------------+---------------------+--------------------+--------------------+------------+-------------------+--------------------+-------------------+-------------------------+---------------+------------------

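Create a schema object specifying only the fields we need from the evidence files. When reading JSON, Spark still treats these columns as nullable, as the printed schema below shows, so the `False` flags are not enforced.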

In [64]:
%%time
schema = StructType([
      StructField("targetId",StringType(),False),
      StructField("diseaseId",StringType(),False),
      StructField("score",DoubleType(),False),
])

df_evidence_original = spark.read.schema(schema) \
        .json(file_list)
df_evidence_original.printSchema()
df_evidence_original.show()



root
 |-- targetId: string (nullable = true)
 |-- diseaseId: string (nullable = true)
 |-- score: double (nullable = true)

+---------------+---------------+-----+
|       targetId|      diseaseId|score|
+---------------+---------------+-----+
|ENSG00000215203|    EFO_0001063| 0.32|
|ENSG00000168288| Orphanet_79283| 0.32|
|ENSG00000147862|Orphanet_397612| 0.95|
|ENSG00000106571|   Orphanet_380| 0.02|
|ENSG00000116745|    Orphanet_65| 0.02|
|ENSG00000117118|    EFO_0000239| 0.02|
|ENSG00000269547|Orphanet_363694| 0.32|
|ENSG00000081479|  Orphanet_2143| 0.32|
|ENSG00000225830|    EFO_0001365| 0.02|
|ENSG00000127415|   Orphanet_579| 0.32|
|ENSG00000100697|Orphanet_140162| 0.02|
|ENSG00000162337|  Orphanet_2783|  0.9|
|ENSG00000109132| Orphanet_99803| 0.02|
|ENSG00000151067|     HP_0001657| 0.32|
|ENSG00000081237| Orphanet_35078| 0.02|
|ENSG00000081248|   Orphanet_681| 0.02|
|ENSG00000111799| Orphanet_75840| 0.32|
|ENSG00000159082|  Orphanet_1934| 0.92|
|ENSG00000012048|   Orphanet_145| 0.
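Spark's JSON reader expects JSON Lines by default (one object per line), and a user-supplied schema keeps only the listed fields, filling a missing field with null. To make that projection concrete, here is a minimal plain-Python sketch, assuming the part files are JSON Lines; `project_json_lines` is a hypothetical helper, not part of PySpark, and unlike Spark it does no type casting.

```python
import json
from typing import Iterable, Iterator, Sequence, Tuple


def project_json_lines(
    lines: Iterable[str],
    fields: Sequence[str] = ("targetId", "diseaseId", "score"),
) -> Iterator[Tuple]:
    """Yield one tuple per JSON Lines record, keeping only `fields`.

    A key missing from a record becomes None, similar to the null Spark
    produces for a schema field that is absent from a record.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        yield tuple(record.get(name) for name in fields)
```

Applied to the files found by `glob`, for instance `for path in file_list: with open(path) as fh: rows.extend(project_json_lines(fh))`, it yields the same three columns as `df_evidence_original`.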

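Next, we compute the median score of each (targetId, diseaseId) pair with `percentile_approx("score", 0.5)`. This function returns an actual score from the group instead of interpolating, so for a group with an even number of scores it returns the lower of the two middle values. Because Spark transformations are lazy, the `%%time` figures below only measure building the query plan; the computation runs when `show()` is called.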

In [71]:
%%time
df_evidence_median = df_evidence_original.groupBy("targetId","diseaseId") \
.agg(func.percentile_approx("score", 0.5) \
.alias("median"))

CPU times: user 7.45 ms, sys: 0 ns, total: 7.45 ms
Wall time: 45.8 ms


In [72]:
df_evidence_median.show()


+---------------+---------------+------+
|       targetId|      diseaseId|median|
+---------------+---------------+------+
|ENSG00000000419|    EFO_0003847|   0.0|
|ENSG00000000419|     HP_0001249|   0.0|
|ENSG00000000419|   Orphanet_137|  0.32|
|ENSG00000000971|    EFO_0001365|  0.32|
|ENSG00000000971|    EFO_0003884|  0.72|
|ENSG00000000971|    EFO_1001155|  0.32|
|ENSG00000000971|Orphanet_200421|  0.32|
|ENSG00000000971|  Orphanet_2134|   0.3|
|ENSG00000000971|Orphanet_329918|  0.32|
|ENSG00000000971| Orphanet_54370|  0.32|
|ENSG00000000971| Orphanet_93571|  0.32|
|ENSG00000000971| Orphanet_93579|  0.32|
|ENSG00000001036|    EFO_0004207|   0.3|
|ENSG00000001084| Orphanet_33574|  0.32|
|ENSG00000001461|    EFO_0010642|  0.32|
|ENSG00000001497|    EFO_0000508|  0.32|
|ENSG00000001497|    EFO_0003847|   0.0|
|ENSG00000001497|    EFO_0010642|  0.32|
|ENSG00000001497|     HP_0001249|   0.0|
|ENSG00000001497|     HP_0001263|   0.7|
+---------------+---------------+------+
only showing top
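`percentile_approx` returns the smallest value in the group such that at least the requested fraction of values is less than or equal to it, with a relative error bounded by 1/accuracy (default accuracy 10000). The sketch below computes that quantity exactly for the 0.5 case in plain Python, with a hypothetical helper `lower_median`, and contrasts it with `statistics.median`, which averages the two middle values of an even-sized group.

```python
import statistics


def lower_median(values):
    """Median as percentile_approx(col, 0.5) defines it: the smallest value v
    such that at least half of the values are <= v (no interpolation)."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    # The ceil(n / 2)-th smallest value, i.e. 0-based index (n - 1) // 2
    return ordered[(len(ordered) - 1) // 2]


# Hypothetical scores of one (targetId, diseaseId) group with an even count
scores = [0.32, 0.02, 0.95, 0.5]
print(lower_median(scores))       # 0.32, the lower of the two middle values
print(statistics.median(scores))  # mean of 0.32 and 0.5, roughly 0.41
```

For groups with an odd number of scores both definitions agree; they only differ when the group size is even.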

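We add a `rank` column giving the position of each score, from highest to lowest, within its (targetId, diseaseId) group, and keep the top 3. Note that `rank()` gives tied scores the same rank and skips the following ones (1, 1, 3, ...), which is why some rows of the final table have a null `2_top` or `3_top`.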

In [66]:
%%time
window = Window.partitionBy("targetId", "diseaseId")\
.orderBy(df_evidence_original['score'].desc())
## select top 3
df_evidence_rank = df_evidence_original.select('*', rank().over(window).alias('rank'))\
.filter(col('rank') <= 3)

CPU times: user 7.47 ms, sys: 1.6 ms, total: 9.06 ms
Wall time: 54.4 ms
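The effect of ties is easiest to see side by side. This plain-Python sketch mimics SQL `RANK()` and `ROW_NUMBER()` over scores sorted in descending order; the helper names are hypothetical. If exactly three positions per group are wanted, `row_number()` (already imported above) could replace `rank()`, at the cost of ordering tied rows arbitrarily, while `dense_rank()` would keep the three highest distinct score values.

```python
def rank_positions(scores):
    """RANK(): tied scores share a rank and the next rank skips ahead."""
    ordered = sorted(scores, reverse=True)
    ranks = []
    for i, score in enumerate(ordered):
        if i > 0 and score == ordered[i - 1]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(i + 1)  # 1 + number of strictly higher scores
    return list(zip(ordered, ranks))


def row_number_positions(scores):
    """ROW_NUMBER(): consecutive positions 1, 2, 3, ... even for ties."""
    ordered = sorted(scores, reverse=True)
    return [(score, i + 1) for i, score in enumerate(ordered)]


scores = [0.92, 0.5, 0.92, 0.32]
print(rank_positions(scores))        # [(0.92, 1), (0.92, 1), (0.5, 3), (0.32, 4)]
print(row_number_positions(scores))  # [(0.92, 1), (0.92, 2), (0.5, 3), (0.32, 4)]
```

With `rank()`, no row gets rank 2 here, so the pivoted `2_top` column stays null for such a group.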


With the rank column in place, we pivot each rank into its own column (`1_top`, `2_top`, `3_top`), holding the corresponding score.

In [69]:
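import pyspark.sql.functions as f

# Keep the three best-ranked scores and turn each rank into its own column:
# rank 1 -> "1_top", rank 2 -> "2_top", rank 3 -> "3_top".
# Listing the pivot values explicitly saves Spark a pass to discover them.
df_evidence_rank = df_evidence_rank.filter(f.col('rank') <= 3)\
   .withColumn('col', f.concat(f.col('rank').cast('string'), f.lit('_top')))\
   .groupby('targetId', 'diseaseId')\
   .pivot('col', ['1_top', '2_top', '3_top'])\
   .agg(f.first('score'))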
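To see what the pivot produces, here is a plain-Python sketch of the same reshape on (targetId, diseaseId, score, rank) tuples; `pivot_top3` is a hypothetical helper. It shows how a rank skipped after a tie leaves its column empty, which Spark reports as null.

```python
from collections import defaultdict


def pivot_top3(ranked_rows):
    """Reshape (targetId, diseaseId, score, rank) rows into one dict per pair.

    Each rank r <= 3 becomes the key f"{r}_top"; the first score seen for a
    key is kept, and a rank with no row stays None (Spark's null).
    """
    columns = ["1_top", "2_top", "3_top"]
    table = defaultdict(lambda: dict.fromkeys(columns))
    for target_id, disease_id, score, rank in ranked_rows:
        if rank > 3:
            continue  # same as the rank <= 3 filter
        row = table[(target_id, disease_id)]
        key = f"{rank}_top"
        if row[key] is None:  # keep the first score seen for this column
            row[key] = score
    return dict(table)


rows = [("T1", "D1", 0.92, 1), ("T1", "D1", 0.92, 1), ("T1", "D1", 0.5, 3)]
print(pivot_top3(rows))
```

The printed dict maps `('T1', 'D1')` to `{'1_top': 0.92, '2_top': None, '3_top': 0.5}`, the same shape as rows like `0.92 | null | 0.5` in the joined table.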

Next, we join the top-score columns with the median on `targetId` and `diseaseId`.

In [73]:
%%time
df_evidence_rank = df_evidence_rank.join(df_evidence_median, ['targetId', 'diseaseId'])

CPU times: user 2.33 ms, sys: 716 µs, total: 3.04 ms
Wall time: 34.1 ms


In [74]:
df_evidence_rank.show()

+---------------+---------------+-----+-----+-----+------+
|       targetId|      diseaseId|1_top|2_top|3_top|median|
+---------------+---------------+-----+-----+-----+------+
|ENSG00000000419|    EFO_0003847|  0.0| null| null|   0.0|
|ENSG00000000419|     HP_0001249|  0.0| null| null|   0.0|
|ENSG00000000419|   Orphanet_137| 0.95| 0.92| null|  0.32|
|ENSG00000000971|    EFO_0001365| 0.92| null|  0.5|  0.32|
|ENSG00000000971|    EFO_0003884| 0.72| null| null|  0.72|
|ENSG00000000971|    EFO_1001155| 0.92|  0.9| null|  0.32|
|ENSG00000000971|Orphanet_200421| 0.92| null|  0.9|  0.32|
|ENSG00000000971|  Orphanet_2134| 0.92| null| null|   0.3|
|ENSG00000000971|Orphanet_329918| 0.92| null|  0.9|  0.32|
|ENSG00000000971| Orphanet_54370| 0.92| null|  0.9|  0.32|
|ENSG00000000971| Orphanet_93571| 0.92| null|  0.9|  0.32|
|ENSG00000000971| Orphanet_93579| 0.92| null|  0.9|  0.32|
|ENSG00000001036|    EFO_0004207|  0.3| null| null|   0.3|
|ENSG00000001084| Orphanet_33574|  0.9| 0.32| null|  0.3

A sample JSON representation of the first row. `toJSON` omits null columns, which is why `2_top` and `3_top` are missing here:

In [75]:
df_evidence_rank.toJSON().first()


'{"targetId":"ENSG00000000419","diseaseId":"EFO_0003847","1_top":0.0,"median":0.0}'

Let's create a target dataframe based on the target files

In [76]:
%%time
df_target = spark.read.json(file_list_target)
df_target.show() 

+-----------------+--------------------+--------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----+--------------------+--------------------+
| alternativeGenes|        approvedName|approvedSymbol|       biotype|      chemicalProbes|          constraint|             dbXrefs|functionDescriptions|     genomicLocation|                  go|hallmarks|          homologues|             id|        nameSynonyms|       obsoleteNames|     obsoleteSymbols|            pathways|          proteinIds|safetyLiabilities|subcellularLocations|      symbolSynonyms|            synonyms|         targetClass| tep|        tractability|       transcriptIds|
+---

Again, we can declare a schema only for the fields that we need 

In [77]:
%%time
schema_target = StructType([
      StructField("id",StringType(),False),
      StructField("approvedSymbol",StringType(),False),
      StructField("approvedName",StringType(),False),
])

df_target = spark.read.schema(schema_target) \
        .json(file_list_target)
df_target.printSchema()
df_target.show()

root
 |-- id: string (nullable = true)
 |-- approvedSymbol: string (nullable = true)
 |-- approvedName: string (nullable = true)

+---------------+--------------+--------------------+
|             id|approvedSymbol|        approvedName|
+---------------+--------------+--------------------+
|ENSG00000004809|      SLC22A16|solute carrier fa...|
|ENSG00000007264|          MATK|megakaryocyte-ass...|
|ENSG00000012223|           LTF|    lactotransferrin|
|ENSG00000012779|         ALOX5|arachidonate 5-li...|
|ENSG00000050130|         JKAMP|JNK1/MAPK8 associ...|
|ENSG00000081818|        PCDHB4|protocadherin beta 4|
|ENSG00000086717|         PPEF1|protein phosphata...|
|ENSG00000087128|     TMPRSS11E|transmembrane ser...|
|ENSG00000099256|       PRTFDC1|phosphoribosyl tr...|
|ENSG00000100122|        CRYBB1|  crystallin beta B1|
|ENSG00000101363|        MANBAL|mannosidase beta ...|
|ENSG00000102243|         VGLL1|vestigial like fa...|
|ENSG00000103067|         ESRP2|epithelial splici...|
|ENSG0

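We then join the evidence and target tables on the target identifier (`targetId` in the evidence table, `id` in the target table) and drop the redundant `id` column. The inner join keeps only evidence rows whose target appears in the target files.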

In [82]:
%%time
full_df = df_evidence_rank.join(df_target,df_evidence_rank["targetId"] == df_target["id"], how="inner").drop('id')

CPU times: user 3.29 ms, sys: 0 ns, total: 3.29 ms
Wall time: 26.3 ms


Finally, we sort the table by the median score in ascending order.

In [86]:
full_df = full_df.orderBy("median")


This is what our final table, combining the evidence and target fields, looks like:

In [87]:
full_df.show()

+---------------+--------------+-----+-----+-----+------+--------------+--------------------+
|       targetId|     diseaseId|1_top|2_top|3_top|median|approvedSymbol|        approvedName|
+---------------+--------------+-----+-----+-----+------+--------------+--------------------+
|ENSG00000168702|   EFO_0000668|  0.0| null| null|   0.0|         LRP1B|LDL receptor rela...|
|ENSG00000111596|   EFO_0002950|  0.0| null| null|   0.0|         CNOT2|CCR4-NOT transcri...|
|ENSG00000183696|    HP_0001657|  0.0| null| null|   0.0|          UPP1|uridine phosphory...|
|ENSG00000198835|   EFO_0003847| 0.92|  0.0| null|   0.0|          GJC2|gap junction prot...|
|ENSG00000111700|   EFO_0005556|  0.0| null| null|   0.0|       SLCO1B3|solute carrier or...|
|ENSG00000118407|    HP_0002269|  0.0| null| null|   0.0|        FILIP1|filamin A interac...|
|ENSG00000134755|   EFO_0004278|  0.0| null| null|   0.0|          DSC2|       desmocollin 2|
|ENSG00000061938|    HP_0002269|  0.0| null| null|   0.0|   

We print the schema of the final table as JSON:

In [88]:
print(full_df.schema.json())


{"fields":[{"metadata":{},"name":"targetId","nullable":true,"type":"string"},{"metadata":{},"name":"diseaseId","nullable":true,"type":"string"},{"metadata":{},"name":"1_top","nullable":true,"type":"double"},{"metadata":{},"name":"2_top","nullable":true,"type":"double"},{"metadata":{},"name":"3_top","nullable":true,"type":"double"},{"metadata":{},"name":"median","nullable":true,"type":"double"},{"metadata":{},"name":"approvedSymbol","nullable":true,"type":"string"},{"metadata":{},"name":"approvedName","nullable":true,"type":"string"}],"type":"struct"}


The final part counts how many target-target pairs share a connection to at least two diseases. To find them, we join the table with itself on `diseaseId`, keeping only rows where the two targets differ.

In [98]:
inner_full_df = full_df.alias("df1").join(full_df.alias("df2"), 
    (col("df1.diseaseId") == col("df2.diseaseId")) & (col("df1.targetId") != col("df2.targetId")), "inner")\
.select(col("df1.targetId").alias("t1"),col("df2.targetId").alias("t2"))


For each (t1, t2) pair we count the shared diseases and keep the pairs with more than one, i.e. at least two.

In [103]:
inner_full_df = inner_full_df.groupBy("t1", "t2").count().filter(col("count")>1)

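Finally, we count the target-target pairs that share at least two diseases. Because the join condition uses `!=`, every pair appears twice, as (t1, t2) and as (t2, t1); using `<` instead would count each unordered pair once and give exactly half of this number.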

In [104]:
inner_full_df.count()

700828
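The same count can be sketched in plain Python to make the pairing logic explicit. This minimal sketch, with a hypothetical helper `count_pairs_sharing`, groups targets by disease and counts each unordered pair once (t1 < t2), which corresponds to joining with `col("df1.targetId") < col("df2.targetId")` rather than `!=`. Like the self-join, its cost grows with the square of the number of targets per disease.

```python
from collections import defaultdict
from itertools import combinations


def count_pairs_sharing(rows, min_shared=2):
    """Count unordered target pairs linked to at least `min_shared` common diseases.

    `rows` is an iterable of (targetId, diseaseId) tuples; duplicates are ignored.
    """
    targets_by_disease = defaultdict(set)
    for target_id, disease_id in rows:
        targets_by_disease[disease_id].add(target_id)

    shared = defaultdict(int)  # (t1, t2) with t1 < t2 -> number of shared diseases
    for targets in targets_by_disease.values():
        for t1, t2 in combinations(sorted(targets), 2):
            shared[(t1, t2)] += 1
    return sum(1 for n in shared.values() if n >= min_shared)
```

Feeding it the (targetId, diseaseId) columns of `full_df`, e.g. collected with `full_df.select("targetId", "diseaseId").collect()`, would only be practical for small data; on the full table the Spark self-join is the scalable route.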