In [1]:
import json
import csv
import os
from neo4j import GraphDatabase
import psycopg2
from elasticsearch import Elasticsearch, helpers
from tqdm.notebook import tqdm
import time

In [4]:
# Locations of the DBLP-AMiner v12 files consumed by the loaders below.
dblp_dataset_dir_path = "./datasets/dblp-aminer_v12/"
author_path = os.path.join(dblp_dataset_dir_path, "author.json")
fos_path = os.path.join(dblp_dataset_dir_path, "fos.json")
venue_path = os.path.join(dblp_dataset_dir_path, "venue.json")
paper_author_path = os.path.join(dblp_dataset_dir_path, "paper_author.json")
paper_fos_path = os.path.join(dblp_dataset_dir_path, "paper_fos.json")
# os.listdir returns entries in arbitrary, filesystem-dependent order; sort so
# the paper chunk files are processed in the same sequence on every run.
paper_paths = sorted(os.path.join(dblp_dataset_dir_path, f)
                     for f in os.listdir(dblp_dataset_dir_path) if 'chunk' in f)
dataset_path = os.path.join(dblp_dataset_dir_path, "dblp.v12.json")
paper_paper_path = os.path.join(dblp_dataset_dir_path, "paper_paper.json")
# Number of paper records in dblp.v12.json; the line-by-line readers below
# iterate exactly this many times.
N_RECORDS = 4894081

# Loading data into Neo4j

In [3]:
# Export the citation graph as headerless CSV files for Neo4j's LOAD CSV:
# edges.csv holds one (paper_id, ref_paper_id) row per citation pair and
# nodes.csv one row per distinct paper id seen on either side of a citation.
# NOTE(review): LOAD CSV resolves 'file:///...' against Neo4j's import
# directory, so these files presumably need to be copied/mounted there — confirm.
with open(paper_paper_path) as file:
    paper_ref_paper = json.load(file)

paper_ids = set()

# newline='' is required by the csv module; without it every row gets an extra
# blank line on platforms that translate "\n" (e.g. Windows).
with open(dblp_dataset_dir_path + 'edges.csv', 'w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    for paper_id, ref_paper_id in tqdm(paper_ref_paper):
        paper_ids.add(paper_id)
        paper_ids.add(ref_paper_id)
        writer.writerow([paper_id, ref_paper_id])

with open(dblp_dataset_dir_path + 'nodes.csv', 'w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    for paper_id in tqdm(paper_ids):
        writer.writerow([paper_id])

HBox(children=(FloatProgress(value=0.0, max=45564149.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=4146772.0), HTML(value='')))




In [4]:
# Size of the exported graph: distinct papers and citation pairs.
for label, count in (("Nb nodes :", len(paper_ids)),
                     ("Nb edges :", len(paper_ref_paper))):
    print(label, count)

Nb nodes : 4146772
Nb edges : 45564149


In [6]:
# Connection settings are read from the environment when set, so credentials do
# not have to be committed with the notebook; the fallbacks are the previously
# hard-coded local values.
driver = GraphDatabase.driver(os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
                              auth=(os.environ.get("NEO4J_USER", "neo4j"),
                                    os.environ.get("NEO4J_PASSWORD", "dblp_v12")),
                              encrypted=False)

In [7]:
# Every Paper node must carry a unique paper_id. A uniqueness constraint is
# backed by an index, which the USING INDEX hints of the edge load rely on.
# NOTE(review): property-existence constraints (ASSERT EXISTS) require Neo4j
# Enterprise Edition — confirm the server edition.
with driver.session() as session:
    for constraint in ("CREATE CONSTRAINT ON (n:Paper) ASSERT n.paper_id IS UNIQUE",
                       "CREATE CONSTRAINT ON (n:Paper) ASSERT EXISTS (n.paper_id)"):
        session.run(constraint)

In [8]:
# loading nodes
# One Paper node per row of nodes.csv, committed every 1000 rows; MERGE only
# creates a node when no Paper with that paper_id exists yet.
load_nodes_cypher = """USING PERIODIC COMMIT 1000 
                LOAD CSV FROM 'file:///nodes.csv' AS row 
                MERGE (n:Paper {paper_id: toInteger(row[0])})"""

with driver.session() as session:
    session.run(load_nodes_cypher)

In [9]:
# loading edges
# One CITES relationship per row of edges.csv, committed every 1000 rows. Both
# endpoints are MATCHed, so rows whose papers are not loaded produce nothing;
# the USING INDEX hints force both lookups through the Paper(paper_id) index.
load_edges_cypher = """USING PERIODIC COMMIT 1000 
                LOAD CSV FROM 'file:///edges.csv' AS row 
                MATCH (paper:Paper {paper_id: toInteger(row[0])}), (ref_paper:Paper {paper_id: toInteger(row[1])}) 
                USING INDEX paper:Paper(paper_id) 
                USING INDEX ref_paper:Paper(paper_id) 
                MERGE (paper)-[:CITES]->(ref_paper)"""

with driver.session() as session:
    session.run(load_edges_cypher)

In [10]:
# Sanity check of the Neo4j load: total node and relationship counts.
with driver.session() as session:
    # Each aggregate query returns exactly one row; .single()[0] extracts the
    # count so it prints as a plain integer, in the same format as the CSV
    # summary printed earlier (instead of a list of Record reprs).
    n_nodes = session.run("MATCH (n) RETURN COUNT(n)").single()[0]
    n_edges = session.run("MATCH ()-[e]->() RETURN COUNT(e)").single()[0]
    print("Nb nodes :", n_nodes)
    print("Nb edges :", n_edges)

Nb nodes : [<Record COUNT(n)=4146772>]
Nb edges : [<Record COUNT(e)=45564149>]


# Loading data into PostgreSQL

In [22]:
# Connection settings are taken from the standard libpq environment variables
# when set, so credentials need not be committed with the notebook; the
# fallbacks are the previously hard-coded local values.
connection = psycopg2.connect(host=os.environ.get("PGHOST", "localhost"),
                              port=int(os.environ.get("PGPORT", "5432")),
                              database=os.environ.get("PGDATABASE", "dblp_v12"),
                              user=os.environ.get("PGUSER", "dblp_v12"),
                              password=os.environ.get("PGPASSWORD", "dblp_v12"))

cursor = connection.cursor()

In [10]:
# Author
# CASCADE also drops a foreign key that paper_author may declare against
# author, which would otherwise make this DROP fail when the notebook is re-run.
cursor.execute("""DROP TABLE IF EXISTS author CASCADE""")

cursor.execute("""CREATE TABLE author(
                id BIGINT PRIMARY KEY,
                name TEXT NOT NULL,
                org TEXT);""")

connection.commit()

with open(author_path) as file:
    for author in tqdm(json.load(file).values()):
        try:
            cursor.execute("""INSERT INTO author (id, name, org)
                          VALUES (%(id)s, %(name)s, %(org)s)""",
                           {'id': author['id'],
                            'name': author['name'],
                            # a missing or empty organisation is stored as NULL
                            'org': author.get('org') or None})
        except (KeyError, ValueError):
            # Only client-side failures are skipped: a record lacking a required
            # key, or a string containing NUL (0x00) characters, which psycopg2
            # rejects with ValueError before anything is sent to the server. The
            # transaction is still usable, so the record is reported for manual
            # repair. Server-side errors (psycopg2.Error) are deliberately not
            # caught: they abort the transaction, after which every later INSERT
            # would fail and be silently swallowed by a catch-all handler.
            print(author)

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=4398138.0), HTML(value='')))

{'name': 'Zhe-Yu Wang', 'org': 'D:Sourav2019\x004_APRIL\x001.04.2019IEEE\x00001547_Rakhi_O_Rakhi_r_Sourav, National Central University, Taiwan', 'id': 2939411041}



In [11]:
# Re-insert the author that the bulk load above printed and skipped (its org
# contained NUL characters), with the organisation cleaned up by hand.
manual_author_fix = """INSERT INTO author (id, name, org)
                  VALUES (2939411041, 'Zhe-Yu Wang', 'National Central University, Taiwan')"""
cursor.execute(manual_author_fix)
connection.commit()

In [8]:
# Fos
# CASCADE also drops a foreign key that paper_fos may declare against fos,
# which would otherwise make this DROP fail when the notebook is re-run.
cursor.execute("""DROP TABLE IF EXISTS fos CASCADE""")

cursor.execute("""CREATE TABLE fos(
                id BIGINT PRIMARY KEY,
                name TEXT NOT NULL);""")

connection.commit()

# fos.json is read as a JSON object of id -> name. JSON object keys are always
# strings, so each id is sent as a quoted literal that PostgreSQL casts to BIGINT.
with open(fos_path) as file:
    for fos_id, fos_name in tqdm(json.load(file).items()):
        cursor.execute("""INSERT INTO fos (id, name)
                          VALUES (%(id)s, %(name)s)""",
                       {'id': fos_id,
                        'name': fos_name})

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=132337.0), HTML(value='')))




In [9]:
# Venue
# CASCADE also drops a foreign key that paper may declare against venue, which
# would otherwise make this DROP fail when the notebook is re-run.
cursor.execute("""DROP TABLE IF EXISTS venue CASCADE""")

cursor.execute("""CREATE TABLE venue(
                id BIGINT PRIMARY KEY,
                name TEXT NOT NULL,
                type TEXT);""")

connection.commit()

with open(venue_path) as file:
    for venue in tqdm(json.load(file).values()):
        cursor.execute("""INSERT INTO venue (id, name, type)
                          VALUES (%(id)s, %(name)s, %(type)s)""",
                       {'id': venue['id'],
                        # the venue name is taken from the record's 'raw' field
                        'name': venue['raw'],
                        # a missing or empty type is stored as NULL
                        'type': venue.get('type') or None})

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=48740.0), HTML(value='')))




In [9]:
# Paper
# CASCADE also drops foreign keys that paper_author, paper_fos and
# paper_ref_paper may declare against paper, which would otherwise make this
# DROP fail when the notebook is re-run.
cursor.execute("""DROP TABLE IF EXISTS paper CASCADE""")

# venue's key column is `id` (see CREATE TABLE venue); the previous
# REFERENCES venue(venue_id) named a non-existent column and made this
# statement fail.
cursor.execute("""CREATE TABLE paper(
                id BIGINT PRIMARY KEY,
                title TEXT NOT NULL,
                venue_id BIGINT REFERENCES venue(id),
                year INTEGER,
                n_citation INTEGER,
                page_start TEXT,
                page_end TEXT,
                doc_type TEXT,
                publisher TEXT,
                volume TEXT,
                issue TEXT,
                doi TEXT,
                abstract TEXT);""")

connection.commit()

# Columns filled from optional keys of each paper record; a missing key is
# stored as NULL. `id` and `title` are required and read directly.
PAPER_OPTIONAL_FIELDS = ('venue_id', 'year', 'n_citation', 'page_start',
                         'page_end', 'doc_type', 'publisher', 'volume',
                         'issue', 'doi', 'abstract')

for path in tqdm(paper_paths):
    with open(path) as file:
        for paper in tqdm(json.load(file)):
            params = {'id': paper['id'], 'title': paper['title']}
            params.update((field, paper.get(field)) for field in PAPER_OPTIONAL_FIELDS)

            cursor.execute("""INSERT INTO paper (id, title, venue_id, year, n_citation, page_start, 
                              page_end, doc_type, publisher, volume, issue, doi, abstract) VALUES 
                              (%(id)s, %(title)s, %(venue_id)s, %(year)s, %(n_citation)s, %(page_start)s,
                               %(page_end)s, %(doc_type)s, %(publisher)s, %(volume)s, %(issue)s, %(doi)s, %(abstract)s)""",
                           params)

        # one transaction per chunk file
        connection.commit()

HBox(children=(FloatProgress(value=0.0, max=5.0), HTML(value='')))




In [13]:
# PaperAuthor
# Link table between papers and their authors; foreign keys are attached in a
# separate step once the bulk insert is done.
for ddl in ("""DROP TABLE IF EXISTS paper_author""",
            """CREATE TABLE paper_author(
                paper_id BIGINT NOT NULL,
                author_id BIGINT NOT NULL,
                PRIMARY KEY (paper_id, author_id));"""):
    cursor.execute(ddl)

connection.commit()

insert_paper_author = """INSERT INTO paper_author (paper_id, author_id)
                          VALUES (%(paper_id)s, %(author_id)s)"""

with open(paper_author_path) as file:
    for linked_paper, linked_author in tqdm(json.load(file)):
        cursor.execute(insert_paper_author,
                       dict(paper_id=linked_paper, author_id=linked_author))

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=14934850.0), HTML(value='')))




In [15]:
# Attach the foreign keys only after the bulk insert. The key column of both
# parent tables is `id` (see CREATE TABLE paper / author); paper(paper_id) and
# author(author_id) do not exist and made these statements fail.
cursor.execute("""ALTER TABLE paper_author
                  ADD FOREIGN KEY (paper_id) REFERENCES paper(id)""")

cursor.execute("""ALTER TABLE paper_author
                  ADD FOREIGN KEY (author_id) REFERENCES author(id)""")

connection.commit()

In [17]:
# PaperFos
# Link table between papers and their fields of study; foreign keys are
# attached in a separate step once the bulk insert is done.
for ddl in ("""DROP TABLE IF EXISTS paper_fos""",
            """CREATE TABLE paper_fos(
                paper_id BIGINT NOT NULL,
                fos_id BIGINT NOT NULL,
                PRIMARY KEY (paper_id, fos_id));"""):
    cursor.execute(ddl)

connection.commit()

insert_paper_fos = """INSERT INTO paper_fos (paper_id, fos_id)
                          VALUES (%(paper_id)s, %(fos_id)s)"""

with open(paper_fos_path) as file:
    for linked_paper, linked_fos in tqdm(json.load(file)):
        cursor.execute(insert_paper_fos,
                       dict(paper_id=linked_paper, fos_id=linked_fos))

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=44987427.0), HTML(value='')))




In [18]:
# Attach the foreign keys only after the bulk insert. The key column of both
# parent tables is `id` (see CREATE TABLE paper / fos); paper(paper_id) and
# fos(fos_id) do not exist and made these statements fail.
cursor.execute("""ALTER TABLE paper_fos
                  ADD FOREIGN KEY (paper_id) REFERENCES paper(id)""")

cursor.execute("""ALTER TABLE paper_fos
                  ADD FOREIGN KEY (fos_id) REFERENCES fos(id)""")

connection.commit()

In [20]:
# PaperPaper
# Citation pairs (citing paper, referenced paper); foreign keys are attached in
# a separate step once the bulk insert is done.
for ddl in ("""DROP TABLE IF EXISTS paper_ref_paper""",
            """CREATE TABLE paper_ref_paper(
                paper_id BIGINT NOT NULL,
                ref_paper_id BIGINT NOT NULL,
                PRIMARY KEY (paper_id, ref_paper_id));"""):
    cursor.execute(ddl)

connection.commit()

insert_paper_ref_paper = """INSERT INTO paper_ref_paper (paper_id, ref_paper_id)
                          VALUES (%(paper_id)s, %(ref_paper_id)s)"""

with open(paper_paper_path) as file:
    for citing_paper, cited_paper in tqdm(json.load(file)):
        cursor.execute(insert_paper_ref_paper,
                       dict(paper_id=citing_paper, ref_paper_id=cited_paper))

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=45564149.0), HTML(value='')))




In [21]:
# Attach the foreign keys only after the bulk insert. Both columns point at
# paper's key column, which is `id` (see CREATE TABLE paper); paper(paper_id)
# does not exist and made these statements fail.
cursor.execute("""ALTER TABLE paper_ref_paper
                  ADD FOREIGN KEY (paper_id) REFERENCES paper(id)""")

cursor.execute("""ALTER TABLE paper_ref_paper
                  ADD FOREIGN KEY (ref_paper_id) REFERENCES paper(id)""")

connection.commit()

In [23]:
# add references and citations list to paper table
# Both are stored as JSON-encoded lists of paper ids. The columns used to be
# created by a commented-out ALTER, so every UPDATE below failed on a fresh
# database; IF NOT EXISTS (PostgreSQL 9.6+) keeps the cell re-runnable.
cursor.execute("""ALTER TABLE paper
                  ADD COLUMN IF NOT EXISTS reference_ids TEXT,
                  ADD COLUMN IF NOT EXISTS citation_ids TEXT""")

connection.commit()

# cited paper id -> ids of the papers whose 'references' list contains it
citation_dict = {}

with open(dataset_path) as file:
    file.readline() # skip first line
    for _ in tqdm(range(N_RECORDS)):
        # Records appear to be one per line, each but the first prefixed with a
        # "," separator; stripping it replaces a retry under a bare except that
        # could also mask genuine decoding errors.
        record = json.loads(file.readline().lstrip(','))

        if 'references' in record:
            cursor.execute("UPDATE paper SET reference_ids = %s WHERE id = %s ",
                           (json.dumps(record['references']), record['id']))

            for ref_id in record['references']:
                citation_dict.setdefault(ref_id, []).append(record['id'])

    connection.commit()

for paper_id, citing_paper_ids in tqdm(citation_dict.items()):
    cursor.execute("UPDATE paper SET citation_ids = %s WHERE id = %s ",
                   (json.dumps(citing_paper_ids), paper_id))

connection.commit()

HBox(children=(FloatProgress(value=0.0, max=4894081.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=3087508.0), HTML(value='')))




In [22]:
# Release the PostgreSQL cursor, then the connection that created it.
for pg_handle in (cursor, connection):
    pg_handle.close()

# Loading data into Elasticsearch

In [3]:
# Client for the local Elasticsearch node on port 9200.
es = Elasticsearch([dict(host='localhost', port=9200)])

In [4]:
# Drop any previous index; 400/404 responses (e.g. the index does not exist
# yet) are ignored so this also works on a fresh cluster.
es.indices.delete(ignore=[400, 404], index='dblp_v12')

{'acknowledged': True}

In [5]:
# One primary shard and no replicas — presumably a single-node local cluster,
# where replica shards could never be allocated (confirm for other setups).
settings = {"settings": {"number_of_shards": 1, "number_of_replicas": 0}}

es.indices.create(body=settings, index='dblp_v12', ignore=[400, 404])

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'dblp_v12'}

In [6]:
def reconstruct_abstract(inverted_index):
    """Rebuild abstract text from an inverted index.

    Args:
        inverted_index: mapping of word -> iterable of integer positions at
            which that word occurs.

    Returns:
        The words joined by single spaces in ascending position order. Gaps in
        the positions are skipped; if two words claim the same position, the
        one seen last in iteration order wins. An empty index yields "".
    """
    word_at = {pos: word
               for word, positions in inverted_index.items()
               for pos in positions}
    return " ".join(word_at[pos] for pos in sorted(word_at))

In [7]:
# Index one Elasticsearch document per paper record of dblp.v12.json.
BULK_SIZE = 5000  # documents sent per bulk request

with open(dataset_path) as file:
    bulk = []
    file.readline() # skip first line

    for _ in tqdm(range(N_RECORDS)):
        # Records appear to be one per line, each but the first prefixed with a
        # "," separator; stripping it replaces a retry under a bare except that
        # could also mask genuine decoding errors.
        paper = json.loads(file.readline().lstrip(','))

        formatted_paper = {
            # helpers.bulk uses '_id' as the document id (it is not stored in the
            # source), so re-running this cell overwrites documents instead of
            # indexing duplicates.
            '_id': paper['id'],
            'paper_id': paper['id'],
            'title': paper['title'],
        }

        if 'indexed_abstract' in paper:
            formatted_paper['abstract'] = reconstruct_abstract(
                paper['indexed_abstract']['InvertedIndex'])
        else:
            formatted_paper['abstract'] = ''

        if 'fos' in paper:
            formatted_paper['fos'] = ", ".join(fos['name'] for fos in paper['fos'])
        else:
            formatted_paper['fos'] = ''

        formatted_paper['doi'] = paper.get('doi', '')

        bulk.append(formatted_paper)

        # >= so each request carries exactly BULK_SIZE documents
        if len(bulk) >= BULK_SIZE:
            helpers.bulk(es, bulk, index='dblp_v12', doc_type='paper')
            bulk = []

    # flush the final partial batch
    if bulk:
        helpers.bulk(es, bulk, index='dblp_v12', doc_type='paper')

HBox(children=(FloatProgress(value=0.0, max=4894081.0), HTML(value='')))




In [8]:
# Make the freshly indexed documents visible to search, then display the
# index's document count.
es.indices.refresh(index='dblp_v12')
es.cat.count(index='dblp_v12', params={"format": "json"})

[{'epoch': '1587563044', 'timestamp': '13:44:04', 'count': '4894081'}]