In [1]:
import os
import time
from getpass import getpass

import numpy as np
import pandas as pd
from neo4j import GraphDatabase

In [2]:
# Open the OGB-LSC WikiKG90M knowledge graph, stored/downloaded under ./data.
from ogb.lsc import WikiKG90MDataset

dataset = WikiKG90MDataset(root='./data')

This will download 93.81GB. Will you proceed? (y/N)
 y


Downloading https://dgl-data.s3-accelerate.amazonaws.com/dataset/OGB-LSC/wikikg90m_kddcup2021.zip


Downloaded 93.81 GB: 100%|██████████| 96062/96062 [32:10<00:00, 49.76it/s]  


Extracting ./data/wikikg90m_kddcup2021.zip


In [3]:
# Graph size: number of entities, number of relation types, entity/relation feature dimensionality.
print(dataset.num_entities, dataset.num_relations, dataset.num_feat_dims, sep='\n')

87143637
1315
768


In [4]:
# Training triplets: numpy array of shape (num_triplets, 3), one [head, relation, tail] row each.
train_hrt = dataset.train_hrt
print(train_hrt[:10])  # first ten triplets

[[      0    1182  650146]
 [      0    1182  650332]
 [      0     876  989978]
 [      0     168 1054521]
 [      0    1182 1632931]
 [      0     876 2437486]
 [      0     324 2705654]
 [      0     324 5885915]
 [      0     427 6972024]
 [      0     427 7156732]]


In [5]:
# Tabular view of the triplets: head -> 'source', relation -> 'edge', tail -> 'target'.
train_df = pd.DataFrame(data=train_hrt, columns=['source', 'edge', 'target'])
train_df.head()

Unnamed: 0,source,edge,target
0,0,1182,650146
1,0,1182,650332
2,0,876,989978
3,0,168,1054521
4,0,1182,1632931


In [6]:
class Neo4jConnection:
    """Small convenience wrapper around a Neo4j driver.

    Failures are reported with ``print`` rather than raised: if the driver could
    not be created, ``query`` trips its assertion, and a query that raises yields
    ``None`` instead of a list of records.
    """

    def __init__(self, uri, user, pwd):
        """Store the connection settings and try to build a driver from them."""
        self.__uri, self.__user, self.__pwd = uri, user, pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as err:
            print("Failed to create the driver:", err)

    def close(self):
        """Close the driver if one was created; otherwise do nothing."""
        if self.__driver is None:
            return
        self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run ``query`` in a fresh session and return all of its records.

        Args:
            query: Cypher statement to execute.
            parameters: optional mapping passed to ``session.run``.
            db: optional database name; when None the driver's default session is used.

        Returns:
            A list of records, or None if opening the session or running the query raised.
        """
        assert self.__driver is not None, "Driver not initialized!"
        # Only pass `database` when a name was supplied.
        session_kwargs = {} if db is None else {'database': db}
        session = None
        try:
            session = self.__driver.session(**session_kwargs)
            # Materialise the records while the session is still open.
            return list(session.run(query, parameters))
        except Exception as err:
            print("Query failed:", err)
            return None
        finally:
            if session is not None:
                session.close()

In [7]:
# Connection settings come from the environment so no credentials are stored in the
# notebook; the password is prompted for when NEO4J_PASSWORD is not set.
conn = Neo4jConnection(
    uri=os.environ.get('NEO4J_URI', 'bolt://neo4j:7687'),
    user=os.environ.get('NEO4J_USER', 'neo4j'),
    pwd=os.environ.get('NEO4J_PASSWORD') or getpass('Neo4j password: '),
)
# Uniqueness constraint on Node.id, which also gives Neo4j an index for the
# MERGE/MATCH lookups on `id` during the bulk load.
# NOTE: `ON ... ASSERT` is Neo4j 4.x syntax; on Neo4j 5+ use
# `CREATE CONSTRAINT nodes IF NOT EXISTS FOR (n:Node) REQUIRE n.id IS UNIQUE`.
conn.query('CREATE CONSTRAINT nodes IF NOT EXISTS ON (n:Node) ASSERT n.id IS UNIQUE')

[]

In [8]:
# Distinct node ids seen as a head ('source') and as a tail ('target'), both renamed to 'node_id'.
src, tar = (
    train_df[[col]].drop_duplicates().rename(columns={col: 'node_id'})
    for col in ('source', 'target')
)
node_df = pd.concat([src, tar])
node_df.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [9]:
# An id can occur as both a source and a target, so drop the duplicates across the two halves.
node_df_dedup = node_df.drop_duplicates(subset='node_id')
node_df_dedup.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [10]:
# Sanity check: number of distinct node ids and the column dtype.
(node_df_dedup.shape, node_df_dedup.dtypes)

((87143637, 1),
 node_id    int64
 dtype: object)

In [11]:
def add_nodes(rows, batch_size=100000):
    """Upsert one ``:Node`` per row of ``rows``, keyed on its ``node_id`` column.

    MERGE matches ids that already exist instead of duplicating them, so the load
    can be re-run. Batching and progress reporting are delegated to ``insert_data``,
    whose return value is passed through.
    """
    cypher = '''UNWIND $rows AS row
               MERGE (:Node {id: row.node_id})
               RETURN COUNT(*) AS total
    '''
    return insert_data(query=cypher, rows=rows, batch_size=batch_size)


def add_edges(rows, batch_size=100000, edge=None):
    """Create one ``REL_<edge>`` relationship per row of ``rows``.

    Args:
        rows: DataFrame with ``source`` and ``target`` node-id columns. When ``edge``
            is omitted it must also carry an ``edge`` column holding exactly one
            relation id, which becomes the relationship-type suffix.
        batch_size: number of rows sent to Neo4j per UNWIND batch.
        edge: relation id applied to every row; takes precedence over the column.

    Returns:
        Whatever ``insert_data`` returns for the batched run.

    Raises:
        ValueError: if ``edge`` is omitted and ``rows`` has no ``edge`` column or
            does not hold exactly one distinct relation id.
    """
    if edge is None:
        columns = getattr(rows, 'columns', ())
        edge_ids = rows['edge'].unique() if 'edge' in columns else ()
        if len(edge_ids) != 1:
            raise ValueError(
                "add_edges needs `edge=` or an 'edge' column holding exactly one relation id"
            )
        edge = edge_ids[0]

    # Relationship types cannot be passed as Cypher parameters, so the id is formatted
    # into the statement; int() guarantees only digits reach the identifier.
    rel_type = 'REL_{}'.format(int(edge))
    # Literal Cypher map braces are doubled so str.format leaves them alone.
    # RETURN COUNT(*) AS total is required: insert_data reads res[0]['total'].
    query = """UNWIND $rows AS row
               MATCH (src:Node {{id: row.source}})
               MATCH (tar:Node {{id: row.target}})
               CREATE (src)-[:{}]->(tar)
               RETURN COUNT(*) AS total
    """.format(rel_type)

    # Only the endpoint columns are referenced by the query.
    return insert_data(query, rows[['source', 'target']], batch_size)


def insert_data(query, rows, batch_size=100000, connection=None):
    """Run ``query`` against Neo4j over ``rows`` in fixed-size batches.

    Each batch is sent as the ``$rows`` parameter (a list of row dicts). The query
    must end with ``RETURN COUNT(*) AS total``; that count is summed across batches.
    A progress dict is printed after every batch.

    Args:
        query: Cypher statement taking a ``$rows`` list parameter.
        rows: DataFrame whose records form the batch parameter.
        batch_size: maximum number of rows per batch; must be >= 1.
        connection: object exposing ``query(query, parameters=...)``; defaults to
            the notebook-level ``conn``.

    Returns:
        ``{"total", "batches", "time"}`` after the last batch, or None if ``rows`` is empty.

    Raises:
        ValueError: if ``batch_size`` < 1 (the batch loop would never advance).
        RuntimeError: if a batch fails (``connection.query`` returns None) or
            returns no records to read ``total`` from.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got {}".format(batch_size))
    if connection is None:
        connection = conn  # notebook-level connection created earlier

    total = 0
    result = None
    start = time.time()

    for batch, offset in enumerate(range(0, len(rows), batch_size), start=1):
        # .iloc slices by position regardless of the frame's index labels.
        chunk = rows.iloc[offset:offset + batch_size]
        res = connection.query(query, parameters={'rows': chunk.to_dict('records')})
        if res is None:
            raise RuntimeError("batch {} failed: connection.query returned None".format(batch))
        if not res:
            raise RuntimeError(
                "batch {} returned no records; the query must end with "
                "RETURN COUNT(*) AS total".format(batch)
            )
        total += res[0]['total']
        result = {"total": total, "batches": batch, "time": time.time() - start}
        print(result)

    return result

In [12]:
%%time
add_nodes(node_df_dedup)

{'total': 100000, 'batches': 1, 'time': 6.1925506591796875}
{'total': 200000, 'batches': 2, 'time': 10.378504037857056}
{'total': 300000, 'batches': 3, 'time': 14.053556442260742}
{'total': 400000, 'batches': 4, 'time': 18.04265284538269}
{'total': 500000, 'batches': 5, 'time': 22.34262180328369}
{'total': 600000, 'batches': 6, 'time': 26.126931190490723}
{'total': 700000, 'batches': 7, 'time': 29.86059880256653}
{'total': 800000, 'batches': 8, 'time': 34.584617376327515}
{'total': 900000, 'batches': 9, 'time': 38.3093957901001}
{'total': 1000000, 'batches': 10, 'time': 41.988617181777954}
{'total': 1100000, 'batches': 11, 'time': 45.79384732246399}
{'total': 1200000, 'batches': 12, 'time': 49.445215463638306}
{'total': 1300000, 'batches': 13, 'time': 53.07652926445007}
{'total': 1400000, 'batches': 14, 'time': 56.753628969192505}
{'total': 1500000, 'batches': 15, 'time': 60.392677545547485}
{'total': 1600000, 'batches': 16, 'time': 64.07553052902222}
{'total': 1700000, 'batches': 17, 

{'total': 87143637, 'batches': 872, 'time': 3143.2012157440186}

In [None]:
# Distinct relation ids, in order of first appearance, as plain Python ints.
edge_ls = pd.unique(train_df['edge']).tolist()

In [None]:
# Insert relationships one relation type at a time. Grouping once is O(rows) overall,
# whereas re-masking the whole frame for every relation type is O(rows * types).
edge_groups = train_df.groupby('edge', sort=False)
for edge in edge_ls:
    # Each group keeps its 'source', 'edge' and 'target' columns, so add_edges gets the
    # endpoints its MATCH clauses need (a bare 'edge' Series carries neither).
    add_edges(edge_groups.get_group(edge))