## 1. LIBRERIE E SETTAGGI

In [1]:
import networkx as nx
import matplotlib.pyplot as plt
import os
import pandas
import numpy as np

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.sql.functions import explode
from pyspark.sql.functions import arrays_overlap, array
from pyspark.sql.functions import col,array_contains
from pyspark.sql.functions import expr
from pyvis.network import Network

# Build the SparkSession used by the rest of the notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()

## 2. CREAZIONE E PULIZIA DEI DATAFRAME

In [2]:
# Load the DrugBank -> UniProt link table (first row is the header, column types inferred).
drugBankDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("DATA/uniprot links.csv")
)
drugBankDF.show()

+-----------+-------------------+-----------+----------+--------------------+
|DrugBank ID|               Name|       Type|UniProt ID|        UniProt Name|
+-----------+-------------------+-----------+----------+--------------------+
|    DB00001|          Lepirudin|BiotechDrug|    P00734|         Prothrombin|
|    DB00002|          Cetuximab|BiotechDrug|    P00533|Epidermal growth ...|
|    DB00002|          Cetuximab|BiotechDrug|    O75015|Low affinity immu...|
|    DB00002|          Cetuximab|BiotechDrug|    P00736|Complement C1r su...|
|    DB00002|          Cetuximab|BiotechDrug|    P02745|Complement C1q su...|
|    DB00002|          Cetuximab|BiotechDrug|    P02746|Complement C1q su...|
|    DB00002|          Cetuximab|BiotechDrug|    P02747|Complement C1q su...|
|    DB00002|          Cetuximab|BiotechDrug|    P08637|Low affinity immu...|
|    DB00002|          Cetuximab|BiotechDrug|    P09871|Complement C1s su...|
|    DB00002|          Cetuximab|BiotechDrug|    P12314|High aff

In [3]:
# Number of rows read from the DrugBank link file.
drugBankDF.count()

20941

In [4]:
# Load the tab-separated protein-protein interaction table (header row, inferred types).
ppiDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", '\t')
    .csv("DATA/species_13.csv")
)
ppiDF.show()

+-------------------+------------------+-----------------------+-----------------------+----------------------+----------------------+-------------------------------+-------------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+-------------------+
|#ID(s) interactor A|ID(s) interactor B|Alt. ID(s) interactor A|Alt. ID(s) interactor B|Alias(es) interactor A|Alias(es) interactor B|Interaction detection method(s)|Publication 1st author(s)|Publication Identifier(s)|  Taxid interactor A|  Taxid interactor B| Interaction type(s)|  Source database(s)|Interaction identifier(s)|Confidence value(s)|
+-------------------+------------------+-----------------------+-----------------------+----------------------+----------------------+-------------------------------+-------------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+

In [5]:
# Number of rows read from the PPI file.
ppiDF.count()

1054920

Selezione delle colonne utili del DataFrame drugBankDF

In [6]:
# Keep only the drug identifier and the UniProt accession of its target.
drugBankDF = drugBankDF.select("DrugBank ID", "UniProt ID")
drugBankDF.show()

+-----------+----------+
|DrugBank ID|UniProt ID|
+-----------+----------+
|    DB00001|    P00734|
|    DB00002|    P00533|
|    DB00002|    O75015|
|    DB00002|    P00736|
|    DB00002|    P02745|
|    DB00002|    P02746|
|    DB00002|    P02747|
|    DB00002|    P08637|
|    DB00002|    P09871|
|    DB00002|    P12314|
|    DB00002|    P12318|
|    DB00002|    P31994|
|    DB00002|    P31995|
|    DB00004|    P01589|
|    DB00004|    P14784|
|    DB00004|    P31785|
|    DB00005|    P01375|
|    DB00005|    P20333|
|    DB00005|    P12314|
|    DB00005|    P08637|
+-----------+----------+
only showing top 20 rows



Selezione delle colonne utili del DataFrame ppiDF

In [7]:
# Keep only the two interactor identifiers and the confidence score.
ppiDF = ppiDF.select(
    '#ID(s) interactor A',
    'ID(s) interactor B',
    'Confidence value(s)',
)
ppiDF.show()

+-------------------+------------------+-------------------+
|#ID(s) interactor A|ID(s) interactor B|Confidence value(s)|
+-------------------+------------------+-------------------+
|   uniprotkb:P38764|  uniprotkb:P40016|intact-miscore:0.76|
|   uniprotkb:Q01939|  uniprotkb:P40016|intact-miscore:0.40|
|   uniprotkb:P33299|  uniprotkb:P40016|intact-miscore:0.69|
|   uniprotkb:Q06103|  uniprotkb:P40016|intact-miscore:0.81|
|   uniprotkb:P38764|  uniprotkb:P40016|intact-miscore:0.76|
|   uniprotkb:P40016|  uniprotkb:P38764|intact-miscore:0.76|
|   uniprotkb:P53549|  uniprotkb:P40016|intact-miscore:0.55|
|   uniprotkb:P40016|  uniprotkb:Q08723|intact-miscore:0.70|
|   uniprotkb:Q08723|  uniprotkb:P40016|intact-miscore:0.70|
|   uniprotkb:P40016|  uniprotkb:P38886|intact-miscore:0.76|
|   uniprotkb:P40016|  uniprotkb:P43588|intact-miscore:0.76|
|   uniprotkb:P40016|  uniprotkb:P38764|intact-miscore:0.76|
|   uniprotkb:P40016|  uniprotkb:Q06103|intact-miscore:0.81|
|   uniprotkb:P53008|  u

Pulizia dei record del DataFrame ppiDF

In [8]:
# Database prefixes removed from the interactor identifiers, in the order they are applied.
ID_PREFIXES = ['uniprotkb:', 'chebi:', 'ensembl:', 'ensemblgenomes:', 'intact:', 'refseq:']

# Strip every prefix from interactor A first, then from interactor B.
for interactor_column in ('#ID(s) interactor A', 'ID(s) interactor B'):
    for id_prefix in ID_PREFIXES:
        ppiDF = ppiDF.withColumn(
            interactor_column,
            regexp_replace(interactor_column, id_prefix, ''),
        )

# Leave only the numeric part of the confidence score.
ppiDF = ppiDF.withColumn(
    'Confidence value(s)',
    regexp_replace('Confidence value(s)', 'intact-miscore:', ''),
)

ppiDF.show()

+-------------------+------------------+-------------------+
|#ID(s) interactor A|ID(s) interactor B|Confidence value(s)|
+-------------------+------------------+-------------------+
|             P38764|            P40016|               0.76|
|             Q01939|            P40016|               0.40|
|             P33299|            P40016|               0.69|
|             Q06103|            P40016|               0.81|
|             P38764|            P40016|               0.76|
|             P40016|            P38764|               0.76|
|             P53549|            P40016|               0.55|
|             P40016|            Q08723|               0.70|
|             Q08723|            P40016|               0.70|
|             P40016|            P38886|               0.76|
|             P40016|            P43588|               0.76|
|             P40016|            P38764|               0.76|
|             P40016|            Q06103|               0.81|
|             P53008|   

Ridenominazione dei parametri del DataFrame ppiDF

In [9]:
# Original column name -> short name used in the rest of the notebook.
ppi_column_names = {
    "#ID(s) interactor A": "Interactor_A",
    "ID(s) interactor B": "Interactor_B",
    "Confidence value(s)": "Confidence",
}
for original_name, short_name in ppi_column_names.items():
    ppiDF = ppiDF.withColumnRenamed(original_name, short_name)

ppiDF.show()

+------------+------------+----------+
|Interactor_A|Interactor_B|Confidence|
+------------+------------+----------+
|      P38764|      P40016|      0.76|
|      Q01939|      P40016|      0.40|
|      P33299|      P40016|      0.69|
|      Q06103|      P40016|      0.81|
|      P38764|      P40016|      0.76|
|      P40016|      P38764|      0.76|
|      P53549|      P40016|      0.55|
|      P40016|      Q08723|      0.70|
|      Q08723|      P40016|      0.70|
|      P40016|      P38886|      0.76|
|      P40016|      P43588|      0.76|
|      P40016|      P38764|      0.76|
|      P40016|      Q06103|      0.81|
|      P53008|      P40016|      0.40|
|      P32565|      P40016|      0.40|
|      P40016|      P32565|      0.40|
|      P38764|      P40016|      0.76|
|      P53549|      P40016|      0.55|
|      P40016|      P38764|      0.76|
|      Q08723|      P40016|      0.70|
+------------+------------+----------+
only showing top 20 rows



Ridenominazione dei parametri del DataFrame drugBankDF

In [10]:
# Replace the space-separated column names with identifier-friendly ones.
drugBankDF = (
    drugBankDF
    .withColumnRenamed("DrugBank ID", "ID_DrugBank")
    .withColumnRenamed("UniProt ID", "ID_UniProt")
)

drugBankDF.show()

+-----------+----------+
|ID_DrugBank|ID_UniProt|
+-----------+----------+
|    DB00001|    P00734|
|    DB00002|    P00533|
|    DB00002|    O75015|
|    DB00002|    P00736|
|    DB00002|    P02745|
|    DB00002|    P02746|
|    DB00002|    P02747|
|    DB00002|    P08637|
|    DB00002|    P09871|
|    DB00002|    P12314|
|    DB00002|    P12318|
|    DB00002|    P31994|
|    DB00002|    P31995|
|    DB00004|    P01589|
|    DB00004|    P14784|
|    DB00004|    P31785|
|    DB00005|    P01375|
|    DB00005|    P20333|
|    DB00005|    P12314|
|    DB00005|    P08637|
+-----------+----------+
only showing top 20 rows



Cast dei valori 'Confidence' (stringhe) del DataFrame ppiDF in valori di tipo float

In [11]:
# Convert the textual confidence score into a float column (same column name).
confidence_as_float = col("Confidence").cast(FloatType())
ppiDF = ppiDF.withColumn("Confidence", confidence_as_float)
ppiDF.printSchema()

root
 |-- Interactor_A: string (nullable = true)
 |-- Interactor_B: string (nullable = true)
 |-- Confidence: float (nullable = true)



Eliminazione dei record del DataFrame ppiDF che presentano valori nulli nella colonna 'Confidence'

In [12]:
# Discard rows without a confidence score, then display the remaining null rows
# (expected to be an empty table) as a sanity check.
ppiDF = ppiDF.dropna(subset=["Confidence"])
ppiDF.where(col("Confidence").isNull()).show()

+------------+------------+----------+
|Interactor_A|Interactor_B|Confidence|
+------------+------------+----------+
+------------+------------+----------+



DataFrame ppiDF pulito

In [13]:
# Preview of the cleaned PPI table (first 20 rows).
ppiDF.show()

+------------+------------+----------+
|Interactor_A|Interactor_B|Confidence|
+------------+------------+----------+
|      P38764|      P40016|      0.76|
|      Q01939|      P40016|       0.4|
|      P33299|      P40016|      0.69|
|      Q06103|      P40016|      0.81|
|      P38764|      P40016|      0.76|
|      P40016|      P38764|      0.76|
|      P53549|      P40016|      0.55|
|      P40016|      Q08723|       0.7|
|      Q08723|      P40016|       0.7|
|      P40016|      P38886|      0.76|
|      P40016|      P43588|      0.76|
|      P40016|      P38764|      0.76|
|      P40016|      Q06103|      0.81|
|      P53008|      P40016|       0.4|
|      P32565|      P40016|       0.4|
|      P40016|      P32565|       0.4|
|      P38764|      P40016|      0.76|
|      P53549|      P40016|      0.55|
|      P40016|      P38764|      0.76|
|      Q08723|      P40016|       0.7|
+------------+------------+----------+
only showing top 20 rows



DataFrame drugBankDF pulito

In [14]:
# Preview of the cleaned drug-target table (first 20 rows).
drugBankDF.show()

+-----------+----------+
|ID_DrugBank|ID_UniProt|
+-----------+----------+
|    DB00001|    P00734|
|    DB00002|    P00533|
|    DB00002|    O75015|
|    DB00002|    P00736|
|    DB00002|    P02745|
|    DB00002|    P02746|
|    DB00002|    P02747|
|    DB00002|    P08637|
|    DB00002|    P09871|
|    DB00002|    P12314|
|    DB00002|    P12318|
|    DB00002|    P31994|
|    DB00002|    P31995|
|    DB00004|    P01589|
|    DB00004|    P14784|
|    DB00004|    P31785|
|    DB00005|    P01375|
|    DB00005|    P20333|
|    DB00005|    P12314|
|    DB00005|    P08637|
+-----------+----------+
only showing top 20 rows



Creo un file csv del DataFrame ppiDF pulito

In [15]:
# Collect the cleaned PPI table to the driver as a pandas DataFrame and write it to
# 'ppi.csv' with neither index nor header row; the file is re-read line by line when
# the PPI graph is built later in the notebook.
ppiDF.toPandas().to_csv('ppi.csv', index=False, header=False)

Creo un nuovo DataFrame in cui, per ciascuna drug, è presente la lista delle proteine coinvolte (dal database DrugBank). Questo sarà utile per far sì che il sistema raccomandi, per ciascuna drug, proteine non presenti nella lista delle proteine già coinvolte per quella determinata drug

In [16]:
# One row per drug, with the list of its known target proteins, sorted by drug id.
drugTargetsDF = (
    drugBankDF
    .groupBy("ID_DrugBank")
    .agg(collect_list("ID_UniProt").alias("Proteins"))
    .orderBy('ID_DrugBank')
)
drugTargetsDF.show()

+-----------+--------------------+
|ID_DrugBank|            Proteins|
+-----------+--------------------+
|    DB00001|            [P00734]|
|    DB00002|[P00533, O75015, ...|
|    DB00004|[P01589, P14784, ...|
|    DB00005|[P01375, P20333, ...|
|    DB00006|            [P00734]|
|    DB00007|            [P30968]|
|    DB00008|    [P48551, P17181]|
|    DB00009|[P00747, P02671, ...|
|    DB00010|            [Q02643]|
|    DB00011|    [P48551, P17181]|
|    DB00012|            [P19235]|
|    DB00013|[P00747, Q03405, ...|
|    DB00014|    [P22888, P30968]|
|    DB00015|[P00747, P02671, ...|
|    DB00016|            [P19235]|
|    DB00017|            [P30988]|
|    DB00018|    [P48551, P17181]|
|    DB00019|            [Q99062]|
|    DB00020|[P15509, P26951, ...|
|    DB00022|    [P48551, P17181]|
+-----------+--------------------+
only showing top 20 rows



## 3. CREAZIONE DEL DATAFRAME DA USARE PER IL SISTEMA DI RACCOMANDAZIONE

Il seguente DataFrame contiene, per ciascuna drug del DataFrame drugBankDF che agisce su una particolare proteina ID_UniProt, tanti record quante sono le proteine che hanno un'interazione con quest'ultima (Interactor_B nel DataFrame ppiDF) con valore di confidence ≥ 0.5. A ciascun record viene poi assegnato il valore 'Interaction' pari a 1, usato come feedback implicito

In [17]:
# Pair every drug target with its PPI partners, keeping only interactions whose
# confidence is at least 0.5.
target_has_confident_partner = (
    (drugBankDF.ID_UniProt == ppiDF.Interactor_A)
    & (ppiDF.Confidence >= 0.5)
)
joinedDF = drugBankDF.join(ppiDF, target_has_confident_partner)
joinedDF.orderBy('ID_DrugBank').show()

+-----------+----------+------------+------------+----------+
|ID_DrugBank|ID_UniProt|Interactor_A|Interactor_B|Confidence|
+-----------+----------+------------+------------+----------+
|    DB00001|    P00734|      P00734|  EBI-941456|      0.56|
|    DB00001|    P00734|      P00734|      Q846V4|      0.73|
|    DB00001|    P00734|      P00734|      Q846V4|      0.73|
|    DB00001|    P00734|      P00734|      Q846V4|      0.73|
|    DB00001|    P00734|      P00734|  EBI-941456|      0.56|
|    DB00002|    P00533|      P00533|      P00533|      0.98|
|    DB00002|    P12314|      P12314|    Q9BXN2-6|      0.56|
|    DB00002|    P00533|      P00533|      P62994|      0.56|
|    DB00002|    P31994|      P31994|      P01857|      0.56|
|    DB00002|    P31994|      P31994|      P01857|      0.56|
|    DB00002|    P12314|      P12314|    Q8N6F1-2|      0.56|
|    DB00002|    P31994|      P31994|      P01857|      0.56|
|    DB00002|    P00533|      P00533|      P07948|      0.82|
|    DB0

Join tra joinedDF e drugTargetsDF e selezione delle colonne utili 

In [18]:
# Rename the drug key so it does not clash with drugTargetsDF.ID_DrugBank in the join.
renamed_joinedDF = joinedDF.withColumnRenamed("ID_DrugBank", "ID_Drug")

# Attach to every row the full list of proteins targeted by that drug.
joined_with_targets = renamed_joinedDF.join(
    drugTargetsDF,
    drugTargetsDF.ID_DrugBank == renamed_joinedDF.ID_Drug,
)
joinedDF = joined_with_targets.select(
    'ID_Drug', 'ID_UniProt', 'Interactor_A', 'Interactor_B', 'Proteins'
)

joinedDF.orderBy('ID_Drug').show()

+-------+----------+------------+------------+--------------------+
|ID_Drug|ID_UniProt|Interactor_A|Interactor_B|            Proteins|
+-------+----------+------------+------------+--------------------+
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|
|DB00002|    P00533|      P00533|      P62993|[P00533, O75015, ...|
|DB00002|    P00533|      P00533|      P06493|[P00533, O75015, ...|
|DB00002|    P00533|      P00533|      P38646|[P00533, O75015, ...|
|DB00002|    P00533|      P00533|      P11142|[P00533, O75015, ...|
|DB00002|    P00533|      P00533|      P22681|[P00533, O75015, ...|
|DB00002|    P09871|      P09871|      P00736|[P00533, O75015, ...|
|DB00002|    P00533|      P00533|      P31943|[P

Creazione della colonna 'Interaction'

In [19]:
# Add a constant column 'Interaction' = 1 to every (drug, interactor) row.
joinedDF = joinedDF.withColumn("Interaction", lit(1))
joinedDF.orderBy('ID_Drug').show()

+-------+----------+------------+------------+--------------------+-----------+
|ID_Drug|ID_UniProt|Interactor_A|Interactor_B|            Proteins|Interaction|
+-------+----------+------------+------------+--------------------+-----------+
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|          1|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|          1|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|          1|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|          1|
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|          1|
|DB00002|    P00533|      P00533|      P62993|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      P06493|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      P38646|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      P11142|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      P

Eliminazione dei duplicati e di eventuali record che presentano in 'Interactor_B' il valore '-'

In [20]:
# Drop rows whose interactor is the "-" placeholder, then remove exact duplicate rows.
joinedDF = joinedDF.filter(joinedDF.Interactor_B != "-")
joinedDF = joinedDF.dropDuplicates()

# Sort by 'ID_Drug': the drug key was renamed from 'ID_DrugBank' and the old name is
# no longer part of this DataFrame's schema.
joinedDF.orderBy('ID_Drug', 'Interactor_B').show()

+-------+----------+------------+------------+--------------------+-----------+
|ID_Drug|ID_UniProt|Interactor_A|Interactor_B|            Proteins|Interaction|
+-------+----------+------------+------------+--------------------+-----------+
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|          1|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|          1|
|DB00002|    P00533|      P00533|      A4FU49|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533| EBI-4399559|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|   NP_059022|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      O00170|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      O00401|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      O00459|[P00533, O75015, ...|          1|
|DB00002|    P12314|      P12314|      O00526|[P00533, O75015, ...|          1|
|DB00002|    P00533|      P00533|      O

Creazione di una nuova colonna booleana 'Interactor_drug_target' usata per filtrare, per ciascuna drug, solo quelle proteine 'Interactor_B' che non hanno alcuna interazione diretta con quella specifica drug (viene, infatti, effettuato, per ciascun record, un check tra 'Interactor_B' e la lista in 'Proteins'. Se Interactor_B è presente in Proteins, il valore è True, False altrimenti)

In [21]:
# True when Interactor_B is itself one of the drug's known targets (listed in 'Proteins').
joinedDF = joinedDF.withColumn("Interactor_drug_target", expr("array_contains(Proteins, Interactor_B)"))

# Sort by 'ID_Drug': the drug key was renamed from 'ID_DrugBank' and the old name is
# no longer part of this DataFrame's schema.
joinedDF.orderBy('ID_Drug', 'Interactor_B').show(50)

+-------+----------+------------+------------+--------------------+-----------+----------------------+
|ID_Drug|ID_UniProt|Interactor_A|Interactor_B|            Proteins|Interaction|Interactor_drug_target|
+-------+----------+------------+------------+--------------------+-----------+----------------------+
|DB00001|    P00734|      P00734|  EBI-941456|            [P00734]|          1|                 false|
|DB00001|    P00734|      P00734|      Q846V4|            [P00734]|          1|                 false|
|DB00002|    P00533|      P00533|      A4FU49|[P00533, O75015, ...|          1|                 false|
|DB00002|    P00533|      P00533| EBI-4399559|[P00533, O75015, ...|          1|                 false|
|DB00002|    P00533|      P00533|   NP_059022|[P00533, O75015, ...|          1|                 false|
|DB00002|    P00533|      P00533|      O00170|[P00533, O75015, ...|          1|                 false|
|DB00002|    P00533|      P00533|      O00401|[P00533, O75015, ...|      

Per ciascuna drug, effettuo il conteggio dei record aventi lo stesso Interactor_B e Interactor_drug_target settato a 'false'. Tale quantità viene usata come rating

In [22]:
# Keep only interactors that are NOT already direct targets of the drug. The flag is a
# boolean column, so negate it directly instead of comparing it with the string 'false'.
non_target_rows = joinedDF.filter(~joinedDF.Interactor_drug_target)

# Rating = number of remaining rows per (drug, interactor) pair.
joinedDF = non_target_rows.groupBy("ID_Drug", "Interactor_B").count()
joinedDF = joinedDF.withColumnRenamed("count", "Interactions")

joinedDF.orderBy('Interactions', ascending=False).show()

+-------+------------+------------+
|ID_Drug|Interactor_B|Interactions|
+-------+------------+------------+
|DB12010|      P08238|          47|
|DB12010|      Q16543|          19|
|DB12010|      P63104|          18|
|DB12010|      P61981|          15|
|DB12010|      P62993|          14|
|DB09130|      P00533|          13|
|DB12010|      Q12933|          11|
|DB12010|      Q04917|          11|
|DB12010|      P27986|          10|
|DB12010|      P31946|          10|
|DB15035|      P08238|          10|
|DB11638|      P00533|          10|
|DB12010|      P31947|           9|
|DB12010|      P19174|           9|
|DB12010|      P46108|           9|
|DB12010|      P07900|           9|
|DB12010|      P22681|           8|
|DB12010|      P04637|           8|
|DB12010|      Q06124|           8|
|DB12695|      P00533|           8|
+-------+------------+------------+
only showing top 20 rows



## 4. CONVERSIONE DEI VALORI DI TIPO 'STRING' IN INDICI

I valori da usare per addestrare il modello di raccomandazione possono essere soltanto di tipo 'intero' o 'double' (in questo caso i valori sono di tipo 'stringa'). Per ovviare a questo problema, utilizzo la classe StringIndexer di Spark per codificare le stringhe. Ciascun StringIndexer può essere usato per una sola colonna, quindi la classe Pipeline viene utilizzata per combinare le due colonne indicizzate

In [23]:
# Fit one indexer per string column; the fitted models keep the label lists that are
# needed later to map indices back to the original identifiers.
drug_index_stage = StringIndexer(inputCol='ID_Drug', outputCol='ID_Drug_index')
protein_index_stage = StringIndexer(inputCol='Interactor_B', outputCol='Interactor_B_index')
drugIndexer = drug_index_stage.fit(joinedDF)
proteinIndexer = protein_index_stage.fit(joinedDF)

# Apply both fitted indexers in sequence.
pipeline = Pipeline(stages=[drugIndexer, proteinIndexer])
fitted_pipeline = pipeline.fit(joinedDF)
indexedDF = fitted_pipeline.transform(joinedDF)

indexedDF.show()

+-------+------------+------------+-------------+------------------+
|ID_Drug|Interactor_B|Interactions|ID_Drug_index|Interactor_B_index|
+-------+------------+------------+-------------+------------------+
|DB11712|      P12956|           1|        329.0|             551.0|
|DB00114|      Q9NVD7|           1|        200.0|            4935.0|
|DB04988|      P04406|           2|         41.0|             256.0|
|DB07529|      O75496|           1|        525.0|             102.0|
|DB00823|      P78362|           1|        150.0|             147.0|
|DB12010|      Q9GZT8|           2|          0.0|            3232.0|
|DB00624|      Q15084|           1|         81.0|             122.0|
|DB00074|      O43491|           1|       1220.0|            1123.0|
|DB14487|    O75182-2|           1|          1.0|            1540.0|
|DB00907|      P30825|           1|        214.0|             736.0|
|DB06870|    Q9BY66-3|           1|        619.0|             317.0|
|DB04573|      Q9BQ39|           1

## 5. CREAZIONE DEL MODELLO ALS E FITTING DEI DATI

Viene effettuato il training e la valutazione del nostro modello. Vengono provati diversi settaggi per trovare il più piccolo valore di RMSE, usato per testare l'accuracy delle predizioni del nostro modello.

La classe RegressionEvaluator calcola l'RMSE delle predizioni; iterando con dei cicli su diversi valori di regParam, rank e alpha si seleziona la combinazione che produce il valore più basso di RMSE.

A seguire vengono elencati i parametri usati quando si crea un oggetto ALS:

- maxIter: Numero massimo di iterazioni da eseguire (default: 10)

- regParam: Parametro di regolarizzazione. Riduce l'overfitting del modello, il che porta ad una riduzione della varianza nelle stime. (default: 0.1)

- rank: Dimensioni dei feature vectors da usare. Grandi rank possono portare a modelli migliori ma sono molto più costosi da calcolare (default: 10)

- alpha: Una costante usata per calcolare la confidence con dataset impliciti; ha effetto solo quando implicitPrefs è impostato a True (default: 1.0)

- implicitPrefs: impostato a True quando si lavora con feedback implicito (default: False)

- coldStartStrategy: Strategia per gestire nuovi o sconosciuti items/users. Impostandolo su 'drop' si escludono questi item dai risultati (default: nan)

In [24]:
# Hold out 20% of the indexed interactions for evaluation. The split is seeded so that
# re-running the notebook on the same input compares models on a repeatable partition.
SPLIT_SEED = 42
(training, test) = indexedDF.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

regParams = [0.01, 0.1]
ranks = [25]
# NOTE(review): Spark documents alpha as applying to the implicit-feedback variant of
# ALS only. implicitPrefs is not set below (default False), which is consistent with
# the RMSE being the same for every alpha; either enable implicitPrefs or shrink this
# grid to a single value to avoid redundant trainings.
alphas = [10.0, 20.0, 40.0, 60.0, 80.0]

# Best configuration seen so far. The RMSE starts at +inf so that the first finite
# score always wins and a NaN score can never be selected as the best one.
aus_regParam = 0.0
aus_rank = 0
aus_alpha = 0.0
aus_rmse = float("inf")

# The evaluator does not depend on the hyper-parameters, so it is built once.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Interactions", predictionCol="prediction")

print('Creating ALS model ...')
for regParam in regParams:
    for rank in ranks:
        for alpha in alphas:
            # coldStartStrategy="drop" removes test rows whose drug/protein was not
            # seen in training, so they do not turn the RMSE into NaN.
            aus_als = ALS(maxIter=10, regParam=regParam, rank=rank, alpha=alpha, userCol='ID_Drug_index', itemCol="Interactor_B_index", ratingCol="Interactions", coldStartStrategy="drop")
            aus_model = aus_als.fit(training)
            predictions = aus_model.transform(test)
            rmse = evaluator.evaluate(predictions)

            # Strict '<' keeps the first configuration when several tie.
            if rmse < aus_rmse:
                aus_regParam = regParam
                aus_rank = rank
                aus_alpha = alpha
                aus_rmse = rmse
                model = aus_model

            print("For regParam: {0}, rank: {1}, alpha: {2}, RMSE: {3}".format(regParam, rank, alpha, rmse))

print('Chosen parameters: regParam = {0}, rank = {1}, alpha = {2}'.format(aus_regParam, aus_rank, aus_alpha))

Creating ALS model ...
For regParam: 0.01, rank: 25, alpha: 10.0, RMSE: 0.2846906755205193
For regParam: 0.01, rank: 25, alpha: 20.0, RMSE: 0.2846906755205193
For regParam: 0.01, rank: 25, alpha: 40.0, RMSE: 0.2846906755205193
For regParam: 0.01, rank: 25, alpha: 60.0, RMSE: 0.2846906755205193
For regParam: 0.01, rank: 25, alpha: 80.0, RMSE: 0.2846906755205193
For regParam: 0.1, rank: 25, alpha: 10.0, RMSE: 0.2689031685224229
For regParam: 0.1, rank: 25, alpha: 20.0, RMSE: 0.2689031685224229
For regParam: 0.1, rank: 25, alpha: 40.0, RMSE: 0.26890316852242296
For regParam: 0.1, rank: 25, alpha: 60.0, RMSE: 0.2689031685224229
For regParam: 0.1, rank: 25, alpha: 80.0, RMSE: 0.26890316852242296
Chosen parameters: regParam = 0.1, rank = 25, alpha = 10.0


## 6. SISTEMA DI RACCOMANDAZIONE

Selezione di n (scelto dall'utente) proteine raccomandate per drug

In [25]:
# Ask interactively how many proteins to recommend for each drug.
print("Insert a number of recommendations per drug:")
n = int(input())  # raises ValueError if the typed text is not an integer

# Top-n recommended protein indices (with predicted rating) for every drug index.
protein_recs = model.recommendForAllUsers(n)

protein_recs.show()

Insert a number of recommendations per drug:
5
+-------------+--------------------+
|ID_Drug_index|     recommendations|
+-------------+--------------------+
|         1580|[[5263, 3.0869117...|
|          471|[[5263, 3.0935702...|
|         1591|[[3543, 3.6073778...|
|         4101|[[5263, 3.1222749...|
|         1342|[[5263, 3.0735283...|
|         2122|[[5263, 3.1680398...|
|         2142|[[5263, 3.194394]...|
|          463|[[5263, 2.9468124...|
|          833|[[5263, 3.0409713...|
|         3794|[[5263, 2.6746826...|
|         1645|[[5263, 3.204765]...|
|         3175|[[5263, 2.935325]...|
|          496|[[3543, 2.74789],...|
|         2366|[[5263, 3.0821824...|
|         2866|[[5263, 3.0592232...|
|         3997|[[3543, 2.924361]...|
|          148|[[5263, 2.983145]...|
|         1088|[[5263, 2.9785137...|
|         1238|[[5263, 3.1817076...|
|         3918|[[5263, 3.1363788...|
+-------------+--------------------+
only showing top 20 rows



Per ciascuna ID_Drug_index, splitto i valori [Interactor_B_index, rating] di 'recommendations' in due colonne

In [26]:
# One row per recommendation: explode the array, then expand the struct fields of each
# recommendation into top-level columns.
exploded_recs = protein_recs.withColumn('proteinAndRating', explode(protein_recs.recommendations))
flatDrugRecs = exploded_recs.select('ID_Drug_index', 'proteinAndRating.*')

flatDrugRecs.show()

+-------------+------------------+---------+
|ID_Drug_index|Interactor_B_index|   rating|
+-------------+------------------+---------+
|         1580|              5263|3.0869117|
|         1580|              6119|3.0869117|
|         1580|              3543|2.8562443|
|         1580|              6280|2.3151839|
|         1580|              1137|1.9725999|
|          471|              5263|3.0935702|
|          471|              6119|3.0935702|
|          471|              3543| 2.849777|
|          471|              6280|2.3201783|
|          471|              1137|2.0504289|
|         1591|              3543|3.6073778|
|         1591|              5263|3.2085395|
|         1591|              6119|3.2085395|
|         1591|              6280|2.4064047|
|         1591|              3937|2.0296333|
|         4101|              5263|3.1222749|
|         4101|              6119|3.1222749|
|         4101|              3543|2.8859322|
|         4101|              6280|2.3417063|
|         

## 7. CONVERSIONE DEGLI INDICI NELLE STRINGHE ORIGINALI

In [27]:
# Inverse mappings built from the label lists of the fitted string indexers.
drugString = IndexToString(inputCol='ID_Drug_index', outputCol='ID_Drug', labels=drugIndexer.labels)
proteinString = IndexToString(inputCol='Interactor_B_index', outputCol='ID_Protein', labels=proteinIndexer.labels)

# Decode both index columns and keep only the readable identifiers plus the rating.
index_decoder = Pipeline(stages=[drugString, proteinString]).fit(indexedDF)
convertedDrugRecs = index_decoder.transform(flatDrugRecs).select('ID_Drug', 'ID_Protein', 'rating')

convertedDrugRecs.orderBy('rating', ascending=False).show()

+-------+----------+---------+
|ID_Drug|ID_Protein|   rating|
+-------+----------+---------+
|DB12010|    P08238|12.920674|
|DB12010|    P63104| 8.531653|
|DB12010|    P61981| 7.272068|
|DB12010|    Q16543|6.8025374|
|DB12010|    Q04917|6.7631745|
|DB08236|    Q9BPX5| 4.915586|
|DB08235|    Q9BPX5|4.8028083|
|DB08515|    Q9Y244|4.7891808|
|DB07728|    Q9Y375| 4.620285|
|DB07728|    Q9P032| 4.620285|
|DB04160|    Q9P032| 4.534795|
|DB04160|    Q9Y375| 4.534795|
|DB08236|    Q9P032| 4.426154|
|DB08236|    Q9Y375| 4.426154|
|DB08358|    Q9P032|4.2863016|
|DB08358|    Q9Y375|4.2863016|
|DB12695|    Q9UQL6|  4.26186|
|DB12695|    P30305|  4.26186|
|DB07080|    Q9Y244|4.1399984|
|DB00162|    Q9P032|4.0918837|
+-------+----------+---------+
only showing top 20 rows



## 8. VERIFICA DEL SISTEMA DI RACCOMANDAZIONE CREATO

In [28]:
print("Insert an id_drug: ")
id_drug = input()

print('Recommended Proteins for {0}'.format(id_drug))
# Recommendations for the requested drug: displayed, then saved to 'rec.csv'.
csvDF = convertedDrugRecs.filter(convertedDrugRecs.ID_Drug.isin(id_drug)).select('ID_Protein', 'rating')
csvDF.show(n)
csvDF.toPandas().to_csv('rec.csv', index=False)

print('joinedDF')
# Interaction counts for the same drug, highest count first, for visual comparison.
drug_interactions = joinedDF.filter(joinedDF.ID_Drug.isin(id_drug))
drug_interactions.orderBy('Interactions', 'Interactor_B', ascending=False).show()



Insert an id_drug: 
DB12010
Recommended Proteins for DB12010
+----------+---------+
|ID_Protein|   rating|
+----------+---------+
|    P08238|12.920674|
|    P63104| 8.531653|
|    P61981| 7.272068|
|    Q16543|6.8025374|
|    Q04917|6.7631745|
+----------+---------+

joinedDF
+-------+------------+------------+
|ID_Drug|Interactor_B|Interactions|
+-------+------------+------------+
|DB12010|      P08238|          47|
|DB12010|      Q16543|          19|
|DB12010|      P63104|          18|
|DB12010|      P61981|          15|
|DB12010|      P62993|          14|
|DB12010|      Q12933|          11|
|DB12010|      Q04917|          11|
|DB12010|      P31946|          10|
|DB12010|      P27986|          10|
|DB12010|      P46108|           9|
|DB12010|      P31947|           9|
|DB12010|      P19174|           9|
|DB12010|      P07900|           9|
|DB12010|      Q06124|           8|
|DB12010|      P22681|           8|
|DB12010|      P04637|           8|
|DB12010|      P61962|           7|
|D

## 9. GENERAZIONE GRAFI

Creazione della lista contenente le proteine raccomandate dal sistema

In [29]:
# Read back the recommendations saved in 'rec.csv' and keep the protein ids as a list.
rec_prot_DF = spark.read.csv("rec.csv", header=True, inferSchema=True)
rec_prot_array = np.array(rec_prot_DF.select("ID_Protein").collect())

# Every collected row has a single field: the protein id.
rec_list = [rec_row[0] for rec_row in rec_prot_array]

Creazione lista contenente le proteine che interagiscono in modo diretto con la drug inserita per il sistema di raccomandazione

In [30]:
# Proteins that DrugBank lists as direct targets of the selected drug.
dir_prot_DF = drugBankDF.filter(drugBankDF.ID_DrugBank.isin(id_drug))
dir_prot_array = np.array(dir_prot_DF.select("ID_UniProt").collect())

# Every collected row has a single field: the UniProt id. The loop variable is not
# called 'dir' so the builtin dir() stays usable in the notebook namespace.
dir_list = [target_row[0] for target_row in dir_prot_array]

Creazione del grafo contenente tutte le PPI

In [31]:
# Directed graph with one edge per PPI row, weighted by the confidence score.
G = nx.DiGraph()

# 'ppi.csv' has no header; every line is "<interactor A>,<interactor B>,<confidence>".
# The context manager closes the file even if a malformed line raises.
with open("ppi.csv", "r") as f:
    for line in f:
        node1, node2, weight = line.split(",")
        # float() ignores the trailing newline left on the last field.
        G.add_edge(node1, node2, weight=float(weight))

Filtraggio del grafo in modo tale che contenga soltanto i nodi collegati con una specifica proteina (raccomandata dal sistema) ad una certa distanza

In [32]:
print("Insert the center node ")
node = input()
print("Insert the radius ")
radius = input()
# Restrict G to the neighbourhood of the chosen node within the given radius.
G = nx.generators.ego_graph(G, node, radius=int(radius))

# Flatten the remaining edges into [source, target, weight] triples.
nodes_list = [
    [source, target, edge_weight]
    for source, target, edge_weight in G.edges(data="weight")
]

Insert the center node 
P61981
Insert the radius 
1


Rappresentazione del grafo parziale contenente le interazioni tra la proteina inserita e le proteine collegate in modo diretto alla drug, tra eventuali altre proteine raccomandate

In [33]:
def partial_graph(nodes_list):
    """Draw only the interactions among recommended and drug-linked proteins.

    An edge is drawn when both of its endpoints belong to ``rec_list``
    (recommended proteins) or ``dir_list`` (proteins directly linked to the
    drug); every other edge is skipped. A protein present in both lists is
    treated as recommended, because that list is checked first.

    Reads the notebook-level names ``rec_list``, ``dir_list``, ``node`` and
    ``id_drug``.

    Args:
        nodes_list: sequence of ``[node1, node2, weight]`` entries, where
            ``weight`` is anything ``float()`` accepts.

    Returns:
        None. The network is handed to ``net.show`` under the file name
        ``"<node>_partial_network_for_<id_drug>_IDdrug.html"``.
    """
    net = Network(height="100%", width="100%", bgcolor="#222222", font_color="white")

    # Sets make the per-edge membership tests constant time.
    rec_proteins = set(rec_list)
    dir_proteins = set(dir_list)

    node_colors = {"rec": "#ff4d4d", "dir": "#80ff80"}
    # Edge colour keyed by (category of node1, category of node2).
    edge_colors = {
        ("rec", "rec"): "#ff3300",
        ("rec", "dir"): "#ffcc66",
        ("dir", "rec"): "#ffcc66",
        ("dir", "dir"): "#66ff66",
    }

    def category(protein):
        # Recommended wins over drug-linked, as in the original branch order.
        if protein in rec_proteins:
            return "rec"
        if protein in dir_proteins:
            return "dir"
        return None

    for entry in nodes_list:
        node1, node2 = entry[0], entry[1]
        w = float(entry[2])

        cat1, cat2 = category(node1), category(node2)
        if cat1 is None or cat2 is None:
            # At least one endpoint is neither recommended nor drug-linked.
            continue

        net.add_node(node1, color=node_colors[cat1])
        net.add_node(node2, color=node_colors[cat2])
        # `value` is now passed for every edge; the recommended -> drug-linked
        # case used to omit it, unlike its mirror case.
        net.add_edge(node1, node2, value=w, title=w,
                     color=edge_colors[(cat1, cat2)], width=w)

    net.set_options(
    """
    var options = {
      "physics": {
        "forceAtlas2Based": {
          "gravitationalConstant": -268,
          "centralGravity": 0.025,
          "springLength": 265,
          "springConstant": 0.14,
          "damping": 0.17
        },
        "maxVelocity": 0,
        "minVelocity": 0.01,
        "solver": "forceAtlas2Based",
        "timestep": 0.01
      },
      "nodes": {
        "borderWidthSelected": 4
      }
    }
    """
    )

    net.show("{0}_partial_network_for_{1}_IDdrug.html".format(node ,id_drug))


Rappresentazione del grafo completo contenente tutte le interazioni della proteina inserita

In [34]:
def complete_graph(nodes_list):
    """Draw every interaction in ``nodes_list``, highlighting known proteins.

    Proteins in ``rec_list`` (recommended) and ``dir_list`` (directly linked
    to the drug) get their own node colours; all other proteins are added
    without an explicit colour. Edges between two highlighted proteins are
    coloured by the pair of categories; every other edge is black. A protein
    present in both lists is treated as recommended, because that list is
    checked first.

    Reads the notebook-level names ``rec_list``, ``dir_list``, ``node`` and
    ``id_drug``.

    Args:
        nodes_list: sequence of ``[node1, node2, weight]`` entries, where
            ``weight`` is anything ``float()`` accepts.

    Returns:
        None. The network is handed to ``net.show`` under the file name
        ``"<node>_complete_network_for_<id_drug>_IDdrug.html"``.
    """
    net = Network(height="100%", width="100%", bgcolor="#222222", font_color="white")

    # Sets make the per-edge membership tests constant time.
    rec_proteins = set(rec_list)
    dir_proteins = set(dir_list)

    node_colors = {"rec": "#ff4d4d", "dir": "#80ff80"}
    # Edge colour keyed by (category of node1, category of node2); any pair
    # not listed here falls back to black.
    edge_colors = {
        ("rec", "rec"): "#ff3300",
        ("rec", "dir"): "#ffcc66",
        ("dir", "rec"): "#ffcc66",
        ("dir", "dir"): "#66ff66",
    }

    def category(protein):
        # Recommended wins over drug-linked, as in the original branch order.
        if protein in rec_proteins:
            return "rec"
        if protein in dir_proteins:
            return "dir"
        return None

    def add_protein(protein, cat):
        # Uncategorised proteins are added without a colour argument.
        if cat is None:
            net.add_node(protein)
        else:
            net.add_node(protein, color=node_colors[cat])

    for entry in nodes_list:
        node1, node2 = entry[0], entry[1]
        w = float(entry[2])

        cat1, cat2 = category(node1), category(node2)
        add_protein(node1, cat1)
        add_protein(node2, cat2)
        # `value` is now passed for every edge; the recommended -> drug-linked
        # case used to omit it, unlike its mirror case.
        net.add_edge(node1, node2, value=w, title=w,
                     color=edge_colors.get((cat1, cat2), 'black'), width=w)

    net.set_options(
    """
    var options = {
      "physics": {
        "forceAtlas2Based": {
          "gravitationalConstant": -268,
          "centralGravity": 0.025,
          "springLength": 265,
          "springConstant": 0.14,
          "damping": 0.17
        },
        "maxVelocity": 0,
        "minVelocity": 0.01,
        "solver": "forceAtlas2Based",
        "timestep": 0.01
      },
      "nodes": {
        "borderWidthSelected": 4
      }
    }
    """
    )

    net.show("{0}_complete_network_for_{1}_IDdrug.html".format(node ,id_drug))


In [35]:
# Render the partial network: only edges whose two endpoints are both in
# rec_list or dir_list are drawn, and the result is passed to net.show.
partial_graph(nodes_list)

In [36]:
# Render the complete network: every edge in nodes_list is drawn, with
# proteins from rec_list and dir_list highlighted by colour.
complete_graph(nodes_list)