## Libraries

In [5]:
# Import necessary libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, collect_list, size, count
from pyspark.sql import functions as F
import findspark
findspark.init()

## Read Nodes from .tsv

In [6]:
# Read the nodes .tsv file line by line and build the pandas DataFrame df_nodes

nodes_path = "C:/Users/bwc07/Downloads/nodes_test.tsv"
data = []   # One list of tab-separated fields per line of the file
temp = []   # Fields of the line currently being processed

# Read the file by each line
with open(nodes_path, "r") as file:
    for line in file:
        # Strip only the line terminator. Slicing with [:-1] would also cut off
        # the last data character when the final line has no trailing '\n'.
        line = line.rstrip('\n')
        temp = line.split('\t')
        data.append(temp)

# Make dataframe
df_nodes = pd.DataFrame(data)
# Drop the last column (presumably an extra trailing field in the file -- TODO confirm)
df_nodes = df_nodes.drop(df_nodes.columns[-1], axis=1)
df_nodes.columns = df_nodes.iloc[0]   # Use the first row (the file's header line) as the column titles
df_nodes = df_nodes[1:]   # Remove the header row from the data (this drops a row, not a column)
df_nodes.head()

Unnamed: 0,id,name,kind
1,Anatomy::UBERON:0000002,uterine cervix,Anatomy
2,Anatomy::UBERON:0000004,nose,Anatomy
3,Anatomy::UBERON:0000006,islet of Langerhans,Anatomy
4,Anatomy::UBERON:0000007,pituitary gland,Anatomy
5,Anatomy::UBERON:0000010,peripheral nervous system,Anatomy


## Read Edges from .tsv

In [7]:
# Read the edges .tsv file line by line and build the pandas DataFrame df_edges

edges_path = "C:/Users/bwc07/Downloads/edges_test.tsv"
data_e = []   # One list of tab-separated fields per line of the file
temp_e = []   # Fields of the line currently being processed

# Read the file by each line
with open(edges_path, "r") as file:
    for line in file:
        # Strip only the line terminator. Slicing with [:-1] would also cut off
        # the last data character when the final line has no trailing '\n'.
        line = line.rstrip('\n')
        temp_e = line.split('\t')
        data_e.append(temp_e)

# Make dataframe
df_edges = pd.DataFrame(data_e)
# Drop the last column (presumably an extra trailing field in the file -- TODO confirm)
df_edges = df_edges.drop(df_edges.columns[-1], axis=1)
df_edges.columns = df_edges.iloc[0]   # Use the first row (the file's header line) as the column titles
df_edges = df_edges[1:]   # Remove the header row from the data (this drops a row, not a column)

# I have to do this because there was a format error when reading the data:
# the row at position 409606 holds two space-separated values in its first field.
# Read that cell with a purely positional lookup; indexing the row Series with [0]
# relies on a deprecated positional fallback because the labels are strings.
bad_source = df_edges.iloc[409606, 0]
temp_split = bad_source.split(" ")
# Correct data
# NOTE: each replace() below is applied to the whole column, not only to that row.
# NOTE(review): the first column is labelled 'ource' at this point -- confirm why
# the header arrives without its leading character.
df_edges['ource'] = df_edges['ource'].replace([bad_source], temp_split[0])
df_edges['metaedge'] = df_edges['metaedge'].replace(["Gene::10000000"], temp_split[1])
df_edges['target'] = df_edges['target'].replace([None], "Gene::10000000")
df_edges.rename(columns = {'ource':'Source'}, inplace = True)

df_edges.head()

Unnamed: 0,Source,metaedge,target
1,Gene::801,GiG,Gene::7428
2,Gene::5987,GiG,Gene::9412
3,Gene::5747,GiG,Gene::79738
4,Gene::3725,GiG,Gene::10514
5,Gene::10014,GiG,Gene::55844


## Create Spark for Nodes

In [53]:
# Get (or start) the Spark session named "NODES"
spark1 = (
    SparkSession.builder
    .appName("NODES")
    .getOrCreate()
)

# Turn the pandas node table into a PySpark DataFrame
nodes = spark1.createDataFrame(data=df_nodes)

In [9]:
# Show the first 5 rows of the PySpark nodes DataFrame without truncating long values
nodes.show(5, truncate=False)

+-----------------------+-------------------------+-------+
|id                     |name                     |kind   |
+-----------------------+-------------------------+-------+
|Anatomy::UBERON:0000002|uterine cervix           |Anatomy|
|Anatomy::UBERON:0000004|nose                     |Anatomy|
|Anatomy::UBERON:0000006|islet of Langerhans      |Anatomy|
|Anatomy::UBERON:0000007|pituitary gland          |Anatomy|
|Anatomy::UBERON:0000010|peripheral nervous system|Anatomy|
+-----------------------+-------------------------+-------+
only showing top 5 rows



In [10]:
# Number of rows in the nodes DataFrame
nodes.count()

23042

## Create Spark for Edges

In [56]:
# Get (or start) a Spark session for the edge data.
# getOrCreate() reuses an already-active session when one exists, so this may
# be the very same session object that was obtained for the nodes.
spark2 = (
    SparkSession.builder
    .appName("EDGES")
    .getOrCreate()
)

# Turn the pandas edge table into a PySpark DataFrame
edges = spark2.createDataFrame(data=df_edges)

In [12]:
# Show the first 5 rows of the PySpark edges DataFrame without truncating long values
edges.show(5, truncate=False)

+-----------+--------+-----------+
|Source     |metaedge|target     |
+-----------+--------+-----------+
|Gene::801  |GiG     |Gene::7428 |
|Gene::5987 |GiG     |Gene::9412 |
|Gene::5747 |GiG     |Gene::79738|
|Gene::3725 |GiG     |Gene::10514|
|Gene::10014|GiG     |Gene::55844|
+-----------+--------+-----------+
only showing top 5 rows



In [13]:
# Number of rows in the edges DataFrame
edges.count()

1292210

### Filter Nodes for Disease, Compound, and Gene

In [57]:
# cdg = the nodes whose kind is Compound, Disease, or Gene
cdg = nodes.filter(nodes['kind'].isin('Disease', 'Compound', 'Gene'))

In [15]:
# Show the first five rows of the filtered nodes (cdg)
cdg.show(5)

+-----------------+------------+--------+
|               id|        name|    kind|
+-----------------+------------+--------+
|Compound::XL00001|  MagicpillA|Compound|
|Compound::XL00002|  MagicpillB|Compound|
|Compound::XL00003|  MagicpillC|Compound|
|Compound::DB00014|   Goserelin|Compound|
|Compound::DB00035|Desmopressin|Compound|
+-----------------+------------+--------+
only showing top 5 rows



In [16]:
# Number of rows in cdg (nodes of kind Compound, Disease, or Gene)
cdg.count()

22640

### Filter Edges for Compound Associations with Gene and Disease

In [58]:
# treatments keeps only the edges whose metaedge links a compound to a gene
# or a disease:
#   CtD = Compound treats Disease
#   CbG = Compound binds to Gene
#   CuG = Compound upregulates Gene
#   CdG = Compound downregulates Gene
#   CpD = Compound palliates Disease

treatments = edges.filter(
    edges['metaedge'].isin('CtD', 'CbG', 'CuG', 'CdG', 'CpD')
)

In [18]:
# Show the first five entries and turn off truncation
treatments.show(5, truncate=False)

+-----------------+--------+-------------------+
|Source           |metaedge|target             |
+-----------------+--------+-------------------+
|Compound::DB00997|CtD     |Disease::DOID:363  |
|Compound::DB00206|CtD     |Disease::DOID:10763|
|Compound::DB00960|CtD     |Disease::DOID:10763|
|Compound::DB00665|CtD     |Disease::DOID:10283|
|Compound::DB00290|CtD     |Disease::DOID:2998 |
+-----------------+--------+-------------------+
only showing top 5 rows



In [19]:
# Number of rows in treatments (edges with metaedge CtD, CbG, CuG, CdG, or CpD)
treatments.count()

52577

## Join Edges Row to Nodes Row if the id in Nodes equals Source_id in Edges

In [59]:
# Inner-join cdg with treatments: a node row is paired with every treatment
# row whose Source equals the node's id
joined_df = cdg.join(
    treatments,
    on=(cdg['id'] == treatments['Source']),
    how='inner',
)

In [60]:
# Show the first five rows of the joined nodes/treatments table
joined_df.show(5)

+-----------------+--------------------+--------+-----------------+--------+--------------------+
|               id|                name|    kind|           Source|metaedge|              target|
+-----------------+--------------------+--------+-----------------+--------+--------------------+
|Compound::DB01073|         Fludarabine|Compound|Compound::DB01073|     CtD|Disease::DOID:006...|
|Compound::DB01073|         Fludarabine|Compound|Compound::DB01073|     CtD|  Disease::DOID:2531|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CtD|  Disease::DOID:3393|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CtD| Disease::DOID:13378|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CbG|          Gene::5743|
+-----------------+--------------------+--------+-----------------+--------+--------------------+
only showing top 5 rows



In [22]:
# Count the number of rows in the joined table
joined_df.count()

52577

# Problem 1

## Create new Table of Compound and Number of Associations with Genes and Diseases

In [61]:
# One row per compound id with two counters:
#   num_diseases = number of joined rows whose metaedge is CtD or CpD
#   num_genes    = number of joined rows whose metaedge is CbG, CuG, or CdG
# A 0 in a column means the id has no edges of that type.
grouped_df_ID = (
    joined_df
    .groupBy('id')
    .agg(
        F.sum(
            F.when(F.col('metaedge').isin('CtD', 'CpD'), 1).otherwise(0)
        ).alias('num_diseases'),
        F.sum(
            F.when(F.col('metaedge').isin('CbG', 'CuG', 'CdG'), 1).otherwise(0)
        ).alias('num_genes'),
    )
)

## Number of Genes and Diseases Associated with Drugs

In [24]:
# Show the first five rows of the per-id association counts
grouped_df_ID.show(5)

+-----------------+------------+---------+
|               id|num_diseases|num_genes|
+-----------------+------------+---------+
|Compound::DB01073|           2|        5|
|Compound::DB00945|           7|       18|
|Compound::DB01407|           0|       19|
|Compound::DB01466|           0|        7|
|Compound::DB01235|           2|       22|
+-----------------+------------+---------+
only showing top 5 rows



In [25]:
# Number of rows in grouped_df_ID (one row per id)
grouped_df_ID.count()

1445

## Top 5 Compounds by Number of Associated Genes

In [26]:
# Sort the per-id counts by num_genes, largest first.
# Despite its name, this table still holds every row; the top five are taken
# only when the result is displayed.
top_5_genes_withID = grouped_df_ID.sort(F.desc('num_genes'))

In [27]:
# Keep only the id and num_genes columns of the sorted table,
# then display its first five rows (the five largest num_genes values)
result1 = top_5_genes_withID.select("id", "num_genes")
result1.limit(5).show()

+-----------------+---------+
|               id|num_genes|
+-----------------+---------+
|Compound::DB08865|      585|
|Compound::DB01254|      564|
|Compound::DB00997|      532|
|Compound::DB00570|      523|
|Compound::DB00390|      522|
+-----------------+---------+



# Problem 2

## Create a Similar Table to the One Used in Problem 1, but Grouped by Name

In [62]:
# Same counters as grouped_df_ID, but with one row per name instead of per id:
#   num_diseases = number of joined rows whose metaedge is CtD or CpD
#   num_genes    = number of joined rows whose metaedge is CbG, CuG, or CdG
grouped_df_name = (
    joined_df
    .groupBy('name')
    .agg(
        F.sum(
            F.when(F.col('metaedge').isin('CtD', 'CpD'), 1).otherwise(0)
        ).alias('num_diseases'),
        F.sum(
            F.when(F.col('metaedge').isin('CbG', 'CuG', 'CdG'), 1).otherwise(0)
        ).alias('num_genes'),
    )
)

In [29]:
# Show the first five rows of the per-name association counts
grouped_df_name.show(5)

+----------+------------+---------+
|      name|num_diseases|num_genes|
+----------+------------+---------+
| Auranofin|           2|       63|
|Tazarotene|           1|        5|
| Almitrine|           0|        1|
| Primidone|           1|       21|
| Nebivolol|           1|        4|
+----------+------------+---------+
only showing top 5 rows



In [30]:
# Number of rows in grouped_df_name (one row per name)
grouped_df_name.count()

1445

### Create New Table that Sorts the Number of Drugs Based on How Many Disease Associations

In [63]:
# One row per distinct num_diseases value; list_of_names collects the names
# of all drugs that have that many disease associations
grouped_num_disease = (
    grouped_df_name
    .groupBy("num_diseases")
    .agg(F.collect_list("name").alias("list_of_names"))
)

In [32]:
# Show the first five rows; values longer than 100 characters are truncated
grouped_num_disease.show(5, truncate=100)

+------------+----------------------------------------------------------------------------------------------------+
|num_diseases|                                                                                       list_of_names|
+------------+----------------------------------------------------------------------------------------------------+
|          19|                                                                                      [Methotrexate]|
|           0|[Almitrine, Penciclovir, Probucol, Mepenzolate, Phenprocoumon, N-Acetyl-D-glucosamine, Naphazolin...|
|           7|                         [Fluorouracil, Acetylsalicylic acid, Dacarbazine, Vinblastine, Vincristine]|
|           6| [Irinotecan, Azathioprine, Capecitabine, Mitoxantrone, Naproxen, Clonidine, Gemcitabine, Bleomycin]|
|           9|                                                                 [Docetaxel, Celecoxib, Carboplatin]|
+------------+----------------------------------------------------------

In [64]:
# Add num_names: the length of list_of_names, i.e. how many drugs are
# associated with that number of diseases
grouped_num_disease_assoc = grouped_num_disease.withColumn(
    "num_names",
    F.size("list_of_names"),
)

In [34]:
# Show the first five rows, now including the num_names column
grouped_num_disease_assoc.show(5)

+------------+--------------------+---------+
|num_diseases|       list_of_names|num_names|
+------------+--------------------+---------+
|          19|      [Methotrexate]|        1|
|           0|[Almitrine, Penci...|      893|
|           7|[Fluorouracil, Ac...|        5|
|           6|[Irinotecan, Azat...|        8|
|           9|[Docetaxel, Celec...|        3|
+------------+--------------------+---------+
only showing top 5 rows



In [91]:
# Keep only the number of drugs (num_names) and the number of diseases they
# associate with (num_diseases), sorted by num_names in ascending order
result2 = (
    grouped_num_disease_assoc
    .select("num_names", "num_diseases")
    .orderBy("num_names", ascending=True)
)

In [92]:
# Show the first ten rows of result2
result2.show(10)

+---------+------------+
|num_names|num_diseases|
+---------+------------+
|        1|          19|
|        1|          11|
|        2|          17|
|        2|          10|
|        2|          13|
|        2|          14|
|        2|          16|
|        3|           9|
|        3|           8|
|        5|           7|
+---------+------------+
only showing top 10 rows



In [93]:
# For each num_names value, count how many num_diseases values share it,
# then sort by num_names in ascending order.
# Built from grouped_num_disease_assoc rather than from result2 itself so the
# cell can be re-run: regrouping result2 in place removes the num_diseases
# column, and a second run would then fail because that column is missing.
result2 = (
    grouped_num_disease_assoc
    .groupBy("num_names")
    .agg(F.count("num_diseases").alias("grouped_diseases"))
    .orderBy(col('num_names').asc())
)

In [88]:
# Show the first five rows of the regrouped result2
result2.show(5)

+---------+----------------+
|num_names|grouped_diseases|
+---------+----------------+
|      893|               1|
|      332|               1|
|      107|               1|
|       42|               1|
|       30|               1|
+---------+----------------+
only showing top 5 rows



In [38]:
###############################################################################

In [39]:
# Extra associations that can be made from the data set

In [40]:
# Top 5 drugs by number of disease associations
top_5_diseases = (
    grouped_df_name
    .orderBy(F.desc('num_diseases'))
    .limit(5)
)
top_5_diseases.show()

+-------------+------------+---------+
|         name|num_diseases|num_genes|
+-------------+------------+---------+
| Methotrexate|          19|      175|
|   Prednisone|          17|       14|
|  Doxorubicin|          17|      532|
|Dexamethasone|          16|      391|
|Betamethasone|          16|      368|
+-------------+------------+---------+



In [41]:
# Keep only the joined rows whose metaedge is CtD or CpD
filtered_df = joined_df.filter(col("metaedge").isin("CtD", "CpD"))

In [42]:
# Show the first five rows of the CtD/CpD-only table
filtered_df.show(5)

+-----------------+--------------------+--------+-----------------+--------+--------------------+
|               id|                name|    kind|           Source|metaedge|              target|
+-----------------+--------------------+--------+-----------------+--------+--------------------+
|Compound::DB01073|         Fludarabine|Compound|Compound::DB01073|     CtD|Disease::DOID:006...|
|Compound::DB01073|         Fludarabine|Compound|Compound::DB01073|     CtD|  Disease::DOID:2531|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CtD|  Disease::DOID:3393|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CtD| Disease::DOID:13378|
|Compound::DB00945|Acetylsalicylic acid|Compound|Compound::DB00945|     CpD|  Disease::DOID:9074|
+-----------------+--------------------+--------+-----------------+--------+--------------------+
only showing top 5 rows



In [43]:
# Count the rows per 'target' and list the most frequent targets first
result_df = (
    filtered_df
    .groupBy("target")
    .agg(F.count("target").alias("count"))
    .orderBy(F.desc('count'))
)

In [44]:
# Show the five targets with the highest counts
result_df.show(5)

+-------------------+-----+
|             target|count|
+-------------------+-----+
|Disease::DOID:10763|   70|
| Disease::DOID:2531|   53|
| Disease::DOID:1612|   43|
| Disease::DOID:2841|   40|
| Disease::DOID:3393|   38|
+-------------------+-----+
only showing top 5 rows



In [45]:
###############################################################################

# Problem 3

In [46]:
# Sort the per-name counts by num_genes in descending order and keep the first five
top_5_genes_withName = (
    grouped_df_name
    .orderBy(F.desc('num_genes'))
    .limit(5)
)

In [47]:
# Show the five names with the largest num_genes, with both counters
top_5_genes_withName.show()

+-----------+------------+---------+
|       name|num_diseases|num_genes|
+-----------+------------+---------+
| Crizotinib|           1|      585|
|  Dasatinib|           1|      564|
|Doxorubicin|          17|      532|
|Vinblastine|           7|      523|
|    Digoxin|           2|      522|
+-----------+------------+---------+



In [48]:
# Keep only the names of the five drugs associated with the most genes
result3 = (
    top_5_genes_withName
    .select(col("name"))
    .limit(5)
)

In [49]:
# Show the names selected in result3
result3.show()

+-----------+
|       name|
+-----------+
| Crizotinib|
|  Dasatinib|
|Doxorubicin|
|Vinblastine|
|    Digoxin|
+-----------+



In [50]:
# Stop the Spark session when you're done.
# NOTE(review): spark2 was obtained with getOrCreate() after spark1 already
# existed, so both names presumably refer to the same session and the second
# stop() is then redundant -- confirm.
spark1.stop()
spark2.stop()