In [1]:
%reload_kedro

In [2]:
import itertools
from collections import defaultdict
from rich import print
from IPython.display import display, HTML

import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client, progress, performance_report
from dask_jobqueue import SLURMCluster

In [3]:
cluster = SLURMCluster(
    cores=1,  # Number of cores per job
    processes=1,
    memory='32GB',  # Memory allocated to each worker
    walltime='00:60:00',  # Walltime limit for each job
    # Specify any additional SLURM or Dask configurations as needed
)

In [4]:
client = Client(cluster)  # start distributed scheduler locally.

In [5]:
# Scale the cluster to the desired number of workers
cluster.scale(jobs=4)  # Request 10 jobs, adjust based on your needs

In [6]:
!pwd

/home/rahit/scratch/modspy-data


In [33]:
nodes_df = dd.read_csv('./data/01_raw/monarch/monarch-kg_nodes.tsv', sep='\t', usecols=['id', 'category', 'name', 'in_taxon', 'in_taxon_label', 'symbol'], dtype={'id':'object', 
                        'category':'object', 'name':'object', 'in_taxon':'object', 'in_taxon_label':'object', 'symbol':'object'})
edges_df = dd.read_csv('./data/01_raw/monarch/monarch-kg_edges.tsv', sep='\t', usecols=['id', 'original_subject', 'predicate', 'original_object', 'category', 'subject', 'object'],
                      dtype={'id':'object', 'original_subject':'object', 'predicate':'object', 'original_object':'object', 'category':'object', 'subject':'object', 'object':'object'})

### Add category based ID to Node datagframe

In [34]:
# Group by 'node_type' and assign type-wise indices
nodes_df['type_index'] = nodes_df.groupby('category').cumcount()

In [35]:
nodes_df.head()

Unnamed: 0,id,category,name,in_taxon,in_taxon_label,symbol,type_index
0,PomBase:SPAC1002.01,biolink:Gene,mrx11,NCBITaxon:4896,Schizosaccharomyces pombe,mrx11,0
1,PomBase:SPAC1002.02,biolink:Gene,pom34,NCBITaxon:4896,Schizosaccharomyces pombe,pom34,1
2,PomBase:SPAC1002.03c,biolink:Gene,gls2,NCBITaxon:4896,Schizosaccharomyces pombe,gls2,2
3,PomBase:SPAC1002.04c,biolink:Gene,taf11,NCBITaxon:4896,Schizosaccharomyces pombe,taf11,3
4,PomBase:SPAC1002.05c,biolink:Gene,jmj2,NCBITaxon:4896,Schizosaccharomyces pombe,jmj2,4


In [36]:
# nodes_df = nodes_df.assign(enum_idx=1)
# nodes_df['enum_idx'] = nodes_df['enum_idx'].cumsum() - 1
nodes_df = nodes_df.set_index('id')
nodes_df.columns

Index(['category', 'name', 'in_taxon', 'in_taxon_label', 'symbol',
       'type_index'],
      dtype='object')

In [10]:
nodes_df.to_parquet('./data/02_intermediate/monarch/nodes_with_type_idx')  

In [8]:
edges_df.head()

Unnamed: 0,id,original_subject,predicate,original_object,category,subject,object
0,uuid:68d6e706-9bb0-11ee-b780-6b2918cfaf31,NCBIGene:64170,biolink:causes,OMIM:212050,biolink:CausalGeneToDiseaseAssociation,HGNC:16391,MONDO:0008905
1,uuid:68d6e707-9bb0-11ee-b780-6b2918cfaf31,NCBIGene:51256,biolink:causes,OMIM:248000,biolink:CausalGeneToDiseaseAssociation,HGNC:21066,MONDO:0009544
2,uuid:68d6e708-9bb0-11ee-b780-6b2918cfaf31,NCBIGene:28981,biolink:causes,OMIM:617895,biolink:CausalGeneToDiseaseAssociation,HGNC:14313,MONDO:0033485
3,uuid:68d6e709-9bb0-11ee-b780-6b2918cfaf31,NCBIGene:8216,biolink:causes,OMIM:616564,biolink:CausalGeneToDiseaseAssociation,HGNC:6742,MONDO:0014693
4,uuid:68d6e70a-9bb0-11ee-b780-6b2918cfaf31,NCBIGene:6505,biolink:contributes_to,OMIM:615232,biolink:CorrelatedGeneToDiseaseAssociation,HGNC:10939,MONDO:0014092


### Adding nodes' category information by merging node dataframe on the subject and object column.

In [10]:
_edf = edges_df.merge(nodes_df, left_on='subject', right_index=True, suffixes=('_ndf', '_edf'))    # @TODO swap _ndf and _edf 
_edf = _edf.rename(columns={'category_ndf': 'subject_category', 'category_edf': 'edge_category'})
print(f"Columns after merging on subject category: {_edf.columns}")

_edf = _edf.merge(nodes_df, left_on='object', right_index=True, suffixes=('_ndf', '_edf'))
_edf = _edf.rename(columns={'category': 'object_category'})
print(f"Columns after merging on object category: {_edf.columns}")

In [11]:
edges = _edf[['id','subject', 'subject_category', 'predicate', 'edge_category', 'object_category', 'object']]
print(f"Columns after renaming and triming: {edges.columns}")

In [12]:
# edges['edge_key']  = edges.apply(lambda row:(row['subject_category'], row['predicate'], row['object_category']), axis=1, meta=pd.Series(dtype=(set<str>)))
edges = edges.reset_index(drop=True)
edges.columns

Index(['id', 'subject', 'subject_category', 'predicate', 'edge_category',
       'object_category', 'object'],
      dtype='object')

In [13]:
edges.to_parquet('../../data/02_intermediate/monarch/edges_with_node_cat', write_index=False)  

In [19]:
!ls -ahl '../../data/02_intermediate/monarch/edges_with_node_cat'

total 168M
drwxr-x--- 2 rahit rahit  33K Feb  7 19:15 .
drwxr-x--- 3 rahit rahit  33K Feb  7 18:17 ..
-rw-r----- 1 rahit rahit 100M Feb  7 19:15 part.0.parquet
-rw-r----- 1 rahit rahit  16M Feb  7 19:15 part.1.parquet
-rw-r----- 1 rahit rahit  53M Feb  7 19:15 part.2.parquet


In [15]:
edges.shape[0].compute()

11412471

In [34]:
cluster.close()

In [35]:
client.close()

### Standerdizing the category names and adding a node ids from Node dataframe.

In [7]:
nodes_df = dd.read_parquet('./data/02_intermediate/monarch/nodes_with_type_idx')  
edges_df = dd.read_parquet('./data/02_intermediate/monarch/edges_with_node_cat')

In [8]:
# Repartition for efficient parallelization
edges_df = edges_df.repartition(npartitions=4)
edges_df = edges_df.persist()  # if on a distributed system

In [9]:

############# CHANGE ME #######################
# Should be removed. It is added because the edge dataframe has erroneous column names.
edges_df = edges_df.rename(columns={'subject_category': 'e_category', 'edge_category': 'subject_category'}).rename(columns={'e_category': 'edge_category'})
###############################################
# display(edges_df.head())
_edf = edges_df.merge(nodes_df, left_on='subject', right_on='id', suffixes=('_ndf', '_edf'))
# print(f"Columns after merging on subject category: {_edf.columns}")
# display(_edf.head())
_edf = _edf.rename(columns={'type_index': 'subject_id'})
# print(f"Columns after merging on subject category: {_edf.columns}")
# display(_edf.head())
_edf = _edf.merge(nodes_df, left_on='object', right_on='id', suffixes=('_ndf', '_edf'))
# print(f"Columns after merging on object category: {_edf.columns}")
_edf = _edf.rename(columns={'type_index': 'object_id'})
# print(f"Columns after merging on object category: {_edf.columns}")
# display(_edf.head())

# Keep only the columns we need
edges = _edf[['id','subject', 'subject_id', 'subject_category', 'predicate', 'edge_category', 'object_category', 'object_id', 'object']].copy()
# print(f"Columns after renaming and trimming: {edges.columns}")
    
# logger.info(f"➡️ Creating edges")

################## INITIALIZE EDGES ##################
# Convert the edge categories to categoricals for efficiency
edges['subject_category'] = edges['subject_category'].astype('category')
edges['predicate'] = edges['predicate'].astype('category')
edges['object_category'] = edges['object_category'].astype('category')
# edges['edge_category'] = edges['edge_category'].astype('category')



In [10]:
# Map ('subject_category','predicate','object_category') to integer index for optimization purpose
edges['edge_key'] = edges['subject_category'].astype(str) + '-' + edges['predicate'].astype(str) + '-' + edges['object_category'].astype(str)
edges['edge_key'] = edges['edge_key'].astype('category').cat.as_known()

# From unknown type categorical to known type categorical
edges = edges.categorize(columns=['subject_category','predicate','object_category', 'edge_key'])

mapping_dict = {k: i for i, k in enumerate(edges['edge_key'].unique().compute())}
edges['edge_key_id'] = edges['edge_key'].map(mapping_dict)
edges['edge_key_id'] = edges['edge_key_id'].astype('category')
edges = edges.categorize(columns=['edge_key_id'])   # From unknown type categorical to known type categorical

# len(edges['edge_key_id'].unique().compute()) == len(edges['edge_key'].unique().compute()) # n=100? (unconfirmed)

# Save edges dataframe at current state before reducing.
edges.to_parquet(f"./data/02_intermediate/monarch/edges_pre_df_reduction_v2")

### Reduce edge dataframe to only include the columns that are needed for the graph

In [35]:
edges = dd.read_parquet('./data/02_intermediate/monarch/edges_pre_df_reduction', 
                           columns=['edge_key_id','subject_id','object_id'])

In [None]:

# Create the key of the edge_index
def key_function(edge):
    return edge['edge_key']

# accumulate the edges for a specific key
def binop(accumulator, edge):
    # accumulator is a list of (subject_id, object_id) tuples
    # edge is the current item being processed
    accumulator.append([edge['subject_id'], edge['object_id']])
    return accumulator

# combine multiple list that are accumulated
def combine(accumulator1, accumulator2):
    # Combine the lists from two accumulators
    return accumulator1 + accumulator2

_temp_bag = edges_df.to_bag(format='dict')
# Using foldby to group by 'edge_key' and consolidate (subject_id, object_id) pairs
# _r = _temp_bag.foldby(key=key_function, binop=binop, combine=combine, initial=[])
edge_type_mappings = _temp_bag.foldby(key=lambda x: (x[0],x[1],x[2]), binop=binop, combine=combine, initial=[], split_every=8)


In [None]:
################################ CONSTRUCTION #################################

In [None]:
# Apply the function to each partition
_temp_e = __e.map_partitions(create_edge_tuple)[['edge_key_id', 'subject_id','object_id']]
_temp_bag = _temp_e.to_bag(format='dict')
_temp_bag.compute()

In [None]:
# Keep only necessary cols of edge dataframe to reduce memory overhead
edges = _edf[].copy()


In [18]:
print(f"Columns after renaming and triming: {edges.columns}")

In [20]:
display(_edf.head())

In [19]:
_edf.index.shape[0].compute()

In [15]:
cluster.close()
client.close()