In [33]:
import time
import pandas as pd
import numpy as np
from neo4j import GraphDatabase

pd.options.mode.chained_assignment = None

In [2]:
from ogb.lsc import WikiKG90MDataset
dataset = WikiKG90MDataset(root = './data')

In [3]:
print(dataset.num_entities) # number of entities
print(dataset.num_relations) # number of relation types
print(dataset.num_feat_dims) # dimensionality of entity/relation features.

87143637
1315
768


In [4]:
train_hrt = dataset.train_hrt # numpy ndarray of shape (num_triplets, 3)
print(train_hrt[0:10]) # get i-th training triplet [h, r, t]

[[      0    1182  650146]
 [      0    1182  650332]
 [      0     876  989978]
 [      0     168 1054521]
 [      0    1182 1632931]
 [      0     876 2437486]
 [      0     324 2705654]
 [      0     324 5885915]
 [      0     427 6972024]
 [      0     427 7156732]]


In [5]:
train_df = pd.DataFrame(train_hrt, columns = ['source', 'edge', 'target'])
train_df.head()

Unnamed: 0,source,edge,target
0,0,1182,650146
1,0,1182,650332
2,0,876,989978
3,0,168,1054521
4,0,1182,1632931


In [6]:
class Neo4jConnection:
    
    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)
        
    def close(self):
        if self.__driver is not None:
            self.__driver.close()
        
    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        return response

In [46]:
conn = Neo4jConnection(uri='bolt://neo4j:7687', user='neo4j', pwd='1234')
conn.query('CREATE CONSTRAINT nodes IF NOT EXISTS ON (n:Node) ASSERT n.id IS UNIQUE')

[]

In [8]:
src = pd.DataFrame(train_df[['source']]).drop_duplicates(subset=['source']).rename(columns={'source': 'node_id'})
tar = pd.DataFrame(train_df[['target']]).drop_duplicates(subset=['target']).rename(columns={'target': 'node_id'})
node_df = pd.concat([src, tar])
node_df.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [9]:
node_df_dedup = node_df.drop_duplicates(subset=['node_id'])
node_df_dedup.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [10]:
node_df_dedup.shape, node_df_dedup.dtypes

((87143637, 1),
 node_id    int64
 dtype: object)

In [44]:
def add_nodes(rows, batch_size=100000):
    
    query = '''UNWIND $rows AS row
               MERGE (:Node {id: row.node_id})
               RETURN COUNT(*) AS total
    '''
    
    return insert_data(query, rows, batch_size)


def add_edges(rows, batch_size=50000):
    
    
    query = """UNWIND $rows AS row
               MATCH (src:Node {id: row.source}), (tar:Node {id: row.target})
               CREATE (src)-[:REL_%s]->(tar)
    """ % edge
    
    return insert_data(query, rows, batch_size)


def get_embs(val):

    return dataset.entity_feat[val].tolist()


def add_embeddings(rows, batch_size=10000):
    
    #query = """CALL apoc.periodic.iterate(
    #               'UNWIND $rows AS row RETURN row',
    #               'MATCH(n:Node {id: row.node_id}) SET n.embedding = row.embedding', 
    #                {batchSize:1000, parallel:true, params: {rows, $rows}})
    #"""
    
    # ~52 seconds / 10k records
    #query = '''UNWIND $rows AS row
    #           MATCH (n:Node {id: row.node_id})
    #           SET n.embedding = row.embedding
    #'''
    
    #query = '''UNWIND $rows AS row
    #       MERGE (n:Node {id: row.node_id})
    #       SET n.embedding = row.embedding
    #'''
    
    #query = """CALL apoc.periodic.iterate('UNWIND $rows AS row RETURN row',
    #                                      'MATCH (n:Node {id: row.id}) SET n.embedding = row.embedding', 
    #                                      {params: {rows:$rows}, batchSize:1000, parallel:true})'
    #"""
    
    query = '''CALL apoc.periodic.iterate('UNWIND $rows AS row RETURN row',
           'MATCH (n:Node {id: row.node_id}) SET n.embedding = row.embedding', 
           {params:{rows:$rows}, batchSize:1000, parallel:true})
         '''
    
    return insert_data(query, rows, batch_size)


def insert_data(query, rows, batch_size = 100000):
    # Function to handle the updating the Neo4j database in batch mode.

    total = 0
    batch = 0
    start = time.time()
    result = None

    while batch * batch_size < len(rows):

        res = conn.query(query, parameters={'rows': rows[batch*batch_size:(batch+1)*batch_size].to_dict('records')})
        try:
            total += res[0]['total']
        except:
            print(res)
        batch += 1
        result = {"total":total, "batches":batch, "time":time.time()-start}
        print(result)

    return result

In [14]:
%%time
add_nodes(node_df_dedup)

StopIteration: 

In [16]:
edge_ls = train_df['edge'].unique().tolist()

In [42]:
for edge in edge_ls:
    #y = train_df['edge'].loc[train_df['edge'] == edge]
    y = train_df[train_df['edge'] == edge]
    #print(y.shape)
    add_edges(y)

[]
{'total': 0, 'batches': 1, 'time': 3.6605336666107178}
[]
{'total': 0, 'batches': 2, 'time': 7.425798177719116}
[]
{'total': 0, 'batches': 3, 'time': 11.456313848495483}
[]
{'total': 0, 'batches': 4, 'time': 12.49168348312378}
[]
{'total': 0, 'batches': 1, 'time': 3.7426490783691406}
[]
{'total': 0, 'batches': 2, 'time': 7.435235500335693}
[]
{'total': 0, 'batches': 3, 'time': 11.047674417495728}
[]
{'total': 0, 'batches': 4, 'time': 14.634310722351074}
[]
{'total': 0, 'batches': 5, 'time': 18.22140121459961}
[]
{'total': 0, 'batches': 6, 'time': 21.741209030151367}
[]
{'total': 0, 'batches': 7, 'time': 25.36509609222412}
[]
{'total': 0, 'batches': 8, 'time': 28.782644033432007}
[]
{'total': 0, 'batches': 9, 'time': 32.36529898643494}
[]
{'total': 0, 'batches': 10, 'time': 36.06322407722473}
[]
{'total': 0, 'batches': 11, 'time': 39.536763429641724}
[]
{'total': 0, 'batches': 12, 'time': 43.06456708908081}
[]
{'total': 0, 'batches': 13, 'time': 46.43952989578247}
[]
{'total': 0, 'ba

In [17]:
src.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [18]:
src.shape

(87110946, 1)

In [19]:
tar.head()

Unnamed: 0,node_id
0,650146
1,650332
2,989978
3,1054521
4,1632931


In [20]:
tar.shape

(25633917, 1)

In [21]:
src[~src.isin(tar)].dropna().shape

(87110614, 1)

In [22]:
node_df_dedup.head()

Unnamed: 0,node_id
0,0
36,1
135,2
269,3
278,4


In [47]:
%%time

test_df = node_df_dedup.head(100000)

batch = 0
batch_size = 10000
rows = test_df.shape[0]

while batch * batch_size < rows:

    start_time = time.time()
    start = batch * batch_size
    stop = (batch * batch_size) + batch_size

    temp_df = test_df.iloc[start:stop]
    temp_df['embedding'] = temp_df['node_id'].map(get_embs)
    add_embeddings(temp_df, batch_size=batch_size)
    
    #temp_ls = temp_df['embedding'].values.tolist()
    
    end_time = time.time()
    time_diff = int(end_time - start_time)

    print(start, stop, batch, batch_size, time_diff)
    
    batch += 1
    temp_df = pd.DataFrame()
    

{'total': 10000, 'batches': 1, 'time': 63.32327890396118}
0 10000 0 10000 63
{'total': 10000, 'batches': 1, 'time': 66.31492829322815}
10000 20000 1 10000 66
{'total': 10000, 'batches': 1, 'time': 65.08109831809998}
20000 30000 2 10000 65
{'total': 10000, 'batches': 1, 'time': 65.91580438613892}
30000 40000 3 10000 66
{'total': 10000, 'batches': 1, 'time': 65.8884003162384}
40000 50000 4 10000 66
{'total': 10000, 'batches': 1, 'time': 66.38389277458191}
50000 60000 5 10000 66
{'total': 10000, 'batches': 1, 'time': 65.23676013946533}
60000 70000 6 10000 65
{'total': 10000, 'batches': 1, 'time': 64.25139164924622}
70000 80000 7 10000 64
{'total': 10000, 'batches': 1, 'time': 66.12420439720154}
80000 90000 8 10000 66


KeyboardInterrupt: 