In [4]:
import re
import ijson

In [5]:
# Define a function to fix a line
def fix_line(line):
    return re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Process the file line by line
with open('dblpv13.json', 'r') as infile, open('dblpv13_fixed.json', 'w') as outfile:
    for line in infile:
        outfile.write(fix_line(line))

In [None]:
def validate_large_json(filename):
    with open(filename, 'r') as f:
        parser = ijson.parse(f)
        try:
            for prefix, event, value in parser:
                pass
            print("The JSON file is valid.")
        except ijson.JSONError as e:
            print(f"Invalid JSON file: {e}")

validate_large_json('dblpv13_fixed.json')

KeyboardInterrupt: 

docker run --name advdaba_labo2 `
-p7474:7474 `
-p7687:7687 `
-v $HOME/neo4j/advDB-lab2/logs:/logs `
-v $HOME/neo4j/advDB-lab2/data:/data `
-v $HOME/neo4j/advDB-lab2/import:/var/lib/neo4j/import `
--memory="3g" `
--env NEO4J_AUTH=neo4j/testtest `
neo4j:latest

In [None]:
import json
from neo4j import GraphDatabase
import re

max_line_to_load = 10
filename = "dblpv13.json"
uri = "bolt://0.0.0.0:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "testtest"))

def stream_json_objects(file):
    depth = 0
    obj = []
    for line in file:
        stripped = line.strip()
        if not stripped:
            continue

        if stripped[0] == '{':
            depth += 1
        if stripped[-1] == '}':
            depth -= 1
        
        obj.append(stripped)

        if depth == 0:
            yield json.loads(''.join(obj))
            obj = []

def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

def add_article_and_related_data(tx, article):
    # Create ARTICLE node
    article_node = tx.run("MERGE (a:Article {_id: $id, title: $title}) RETURN a",
                          id=article["_id"], title=article["title"]).single()["a"]

    # Create AUTHORED relationships
    for author_name in article.get("authors", []):
        author_node = tx.run("MERGE (a:Author {_id: $name, name: $name}) RETURN a", 
                             name=author_name).single()["a"]
        tx.run("MERGE (a)-[:AUTHORED]->(b)", a=author_node, b=article_node)

    # Create CITES relationships
    for cited_article_id in article.get("references", []):
        tx.run("""
        MATCH (a:Article), (b:Article)
        WHERE a._id = $id AND b._id = $ref_id
        MERGE (a)-[:CITES]->(b)
        """, id=article["_id"], ref_id=cited_article_id)

print("Start processing...")
with open(filename, 'r') as file:
    articles = stream_json_objects(file)
    with driver.session() as session:
        for idx, item in enumerate(articles):
            if idx >= max_line_to_load:
                break
            if "title" not in item:
                print(f"Article {item['_id']} does not have a title. Skipping.")
                continue
            try:
                session.write_transaction(add_article_and_related_data, item)
            except Exception as e:
                print(f"Error processing article {item['_id']}: {e}")

driver.close()


Start processing...


JSONDecodeError: Expecting value: line 1 column 2 (char 1)

In [None]:
import json
import re

def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

with open('dblpv13.json', 'r') as json_file:
    corrected_lines = corrected_json_lines(json_file)
    
    for line in corrected_lines:
        try:
            data = json.loads(line)
            # Process the JSON data here
            print(data)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")


In [None]:
import pandas as pd
import json

filename = 'biggertest.json'

# Load the data from the file
with open(filename, 'r') as file:
    data = json.load(file)

# Create a list to hold consolidated data
consolidated_data = []

for article in data:
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])
    
    for author in authors:
        # Handle the case when author name or _id is missing
        author_name = author.get('name', None)  # Use None if 'name' key is missing
        author_id = author.get('_id', None)  # Use None if '_id' key is missing
        
        # If no references, append article and author details only
        if not references:
            consolidated_data.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                consolidated_data.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })
    
    # Handle case where there are no authors but there are references
    if not authors and references:
        for reference in references:
            consolidated_data.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

df = pd.DataFrame(consolidated_data)
print(df)


                   article_id  \
0    53e99784b7602d9701f3e133   
1    53e99784b7602d9701f3e133   
2    53e99784b7602d9701f3e133   
3    53e99784b7602d9701f3e133   
4    53e99784b7602d9701f3e133   
..                        ...   
139  53e99784b7602d9701f3f95a   
140  53e99784b7602d9701f3f95a   
141  53e99784b7602d9701f3f95a   
142  53e99784b7602d9701f3f95a   
143  53e99784b7602d9701f3f95b   

                                         article_title  \
0    The relationship between canopy parameters and...   
1    The relationship between canopy parameters and...   
2    The relationship between canopy parameters and...   
3    The relationship between canopy parameters and...   
4    The relationship between canopy parameters and...   
..                                                 ...   
139                                             FACETS   
140                                             FACETS   
141                                             FACETS   
142                    

In [None]:
import pandas as pd
import ijson

filename = 'dblpv13.json'

# Function to process an individual article from the JSON file
def process_article(article):
    data_list = []
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    for author in authors:
        author_name = author.get('name', None)
        author_id = author.get('_id', None)

        if not references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                data_list.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })

    if not authors and references:
        for reference in references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

    return data_list

# Iteratively read and process the JSON file
consolidated_data = []
with open(filename, 'r') as file:
    articles = ijson.items(file, 'item')
    for article in articles:
        consolidated_data.extend(process_article(article))

df = pd.DataFrame(consolidated_data)
display(df)



UnexpectedSymbol: Unexpected symbol 'N' at 103

In [None]:
import pandas as pd
import ijson
import re

filename = 'dblpv13.json'

# Function to preprocess JSON lines
def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
def process_article(article):
    data_list = []
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    for author in authors:
        author_name = author.get('name', None)
        author_id = author.get('_id', None)

        if not references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                data_list.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })

    if not authors and references:
        for reference in references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

    return data_list

# Iteratively read and process the JSON file
consolidated_data = []
with open(filename, 'r') as file:
    # Apply preprocessing on each line for MongoDB's NumberInt
    corrected_file = corrected_json_lines(file)
    
    articles = ijson.items(corrected_file, 'item')
    for article in articles:
        print(article)
        consolidated_data.extend(process_article(article))

# df = pd.DataFrame(consolidated_data)
# display(df)


ValueError: not enough values to unpack (expected 3, got 2)

In [None]:
import pandas as pd
import json

filename = 'biggertest.json'

# Load the data from the file
with open(filename, 'r') as file:
    data = json.load(file)

# Create a list to hold consolidated data
consolidated_data = []

for article in data:
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])
    
    for author in authors:
        # Handle the case when author name or _id is missing
        author_name = author.get('name', None)  # Use None if 'name' key is missing
        author_id = author.get('_id', None)  # Use None if '_id' key is missing
        
        # If no references, append article and author details only
        if not references:
            consolidated_data.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                consolidated_data.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })
    
    # Handle case where there are no authors but there are references
    if not authors and references:
        for reference in references:
            consolidated_data.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

df = pd.DataFrame(consolidated_data)
display(df)


Unnamed: 0,article_id,article_title,author_id,author_name,cited_article_id
0,53e99784b7602d9701f3e133,The relationship between canopy parameters and...,53f45728dabfaec09f209538,Peijuan Wang,
1,53e99784b7602d9701f3e133,The relationship between canopy parameters and...,5601754345cedb3395e59457,Jiahua Zhang,
2,53e99784b7602d9701f3e133,The relationship between canopy parameters and...,53f38438dabfae4b34a08928,Donghui Xie,
3,53e99784b7602d9701f3e133,The relationship between canopy parameters and...,5601754345cedb3395e5945a,Yanyan Xu,
4,53e99784b7602d9701f3e133,The relationship between canopy parameters and...,53f43d25dabfaeecd6995149,Yun Xu,
...,...,...,...,...,...
139,53e99784b7602d9701f3f95a,FACETS,53f45ab1dabfaeb22f511541,Richard Groebner,
140,53e99784b7602d9701f3f95a,FACETS,53f4359fdabfaeee229a4700,Satish Balay,
141,53e99784b7602d9701f3f95a,FACETS,53f44990dabfaee4dc7ddf84,Lois C. McInnes,
142,53e99784b7602d9701f3f95a,FACETS,562cb37445cedb3398c9befe,Hong Zhang,


In [None]:
import pandas as pd
import ijson
import re

filename = 'dblpv13.json'

# Function to preprocess JSON lines
def corrected_json_content(file):
    corrected_lines = list(corrected_json_lines(file))
    return '\n'.join(corrected_lines)

def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
def process_article(article):
    data_list = []
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    for author in authors:
        author_name = author.get('name', None)  # Use None if 'name' key is missing
        author_id = author.get('_id', None)  # Use None if '_id' key is missing

        # If no references, append article and author details only
        if not references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                data_list.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })

    # Handle case where there are no authors but there are references
    if not authors and references:
        for reference in references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

    return data_list

# Iteratively read, preprocess, and process the JSON file
consolidated_data = []
with open(filename, 'r') as file:
    corrected_content = corrected_json_content(file)
    articles = ijson.items(corrected_content, 'item')
    for article in articles:
        # consolidated_data.extend(process_article(article))
        print(article)

# df = pd.DataFrame(consolidated_data)
# display(df)


KeyboardInterrupt: 

In [None]:
import pandas as pd
import ijson
import re
from neo4j import GraphDatabase
import gc

filename = 'dblpv13.json'
uri = "bolt://localhost:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "testtest"))

# Function to preprocess JSON lines
def corrected_json_content(file):
    corrected_lines = list(corrected_json_lines(file))
    return '\n'.join(corrected_lines)

def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
def process_article(article):
    data_list = []
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    for author in authors:
        author_name = author.get('name', None)
        author_id = author.get('_id', None)

        # If no references, append article and author details only
        if not references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                data_list.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })

    # Handle case where there are no authors but there are references
    if not authors and references:
        for reference in references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

    return data_list

def add_article(tx, article_id, title, authors, cited_articles):
    # Merge the current ARTICLE node with its title and _id properties
    tx.run("MERGE (a:Article {_id: $id}) SET a.title = $title", id=article_id, title=title)
    
    for author in authors:
        if author:  
            # Merge the AUTHOR node with its name and _id properties
            tx.run("MERGE (a:Author {_id: $author_id, name: $name})", author_id=author["_id"], name=author["name"])
            # Merge the AUTHORED relationship between AUTHOR and ARTICLE
            tx.run("""
                MATCH (a:Author {_id: $author_id}), (b:Article {_id: $article_id})
                MERGE (a)-[:AUTHORED]->(b)
            """, author_id=author["_id"], article_id=article_id)
            
    for cited_article_id in cited_articles:
        tx.run("""
            MERGE (a:Article {_id: $article_id})
            MERGE (b:Article {_id: $cited_article_id})
            MERGE (a)-[:CITES]->(b)
        """, article_id=article_id, cited_article_id=cited_article_id)



def send_chunk_to_neo4j(chunk):
    with driver.session() as session:
        for _, row in chunk.iterrows():
            if row['author_name'] and row['author_id']:
                authors = [{"name": row['author_name'], "_id": row['author_id']}]
                if row['cited_article_id']:  # check if there's a valid cited_article_id
                    session.execute_write(add_article, row['article_id'], row['article_title'], authors, [row['cited_article_id']])
                else:
                    session.execute_write(add_article, row['article_id'], row['article_title'], authors, [])

chunk_size = 10  # adjust this based on your memory and performance needs
buffered_data = []

with open(filename, 'r') as file:
    articles = ijson.items(corrected_json_content(file), 'item')
    for article in articles:
        buffered_data.extend(process_article(article))
        if len(buffered_data) >= chunk_size:
            chunk_df = pd.DataFrame(buffered_data)
            send_chunk_to_neo4j(chunk_df)
            buffered_data = []  # reset the buffer
            del chunk_df  # delete the DataFrame
            gc.collect()  # collect garbage

# Send remaining buffered data (if any)
if buffered_data:
    chunk_df = pd.DataFrame(buffered_data)
    send_chunk_to_neo4j(chunk_df)
    del chunk_df  # delete the DataFrame
    gc.collect()  # collect garbage



KeyboardInterrupt: 

In [None]:
import ijson
import re
from neo4j import GraphDatabase

filename = 'dblpv13.json'
uri = "bolt://localhost:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "testtest"))

# Function to preprocess JSON lines
def corrected_json_content(file):
    corrected_lines = list(corrected_json_lines(file))
    return '\n'.join(corrected_lines)

def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
def process_article(article):
    data_list = []
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    for author in authors:
        author_name = author.get('name', None)
        author_id = author.get('_id', None)

        # If no references, append article and author details only
        if not references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': author_id,
                'author_name': author_name,
                'cited_article_id': None
            })
        else:
            for reference in references:
                data_list.append({
                    'article_id': article_id,
                    'article_title': article_title,
                    'author_id': author_id,
                    'author_name': author_name,
                    'cited_article_id': reference
                })

    # Handle case where there are no authors but there are references
    if not authors and references:
        for reference in references:
            data_list.append({
                'article_id': article_id,
                'article_title': article_title,
                'author_id': None,
                'author_name': None,
                'cited_article_id': reference
            })

    return data_list

def add_article(tx, article_id, title, authors, cited_articles):
    # Merge the current ARTICLE node with its title and _id properties
    tx.run("MERGE (a:Article {_id: $id}) SET a.title = $title", id=article_id, title=title)
    
    for author in authors:
        if author:  
            # Merge the AUTHOR node with its name and _id properties
            tx.run("MERGE (a:Author {_id: $author_id, name: $name})", author_id=author["_id"], name=author["name"])
            # Merge the AUTHORED relationship between AUTHOR and ARTICLE
            tx.run("""
                MATCH (a:Author {_id: $author_id}), (b:Article {_id: $article_id})
                MERGE (a)-[:AUTHORED]->(b)
            """, author_id=author["_id"], article_id=article_id)
            
    for cited_article_id in cited_articles:
        tx.run("""
            MERGE (a:Article {_id: $article_id})
            MERGE (b:Article {_id: $cited_article_id})
            MERGE (a)-[:CITES]->(b)
        """, article_id=article_id, cited_article_id=cited_article_id)

def articles_generator(filename):
    with open(filename, 'r') as file:
        articles = ijson.items(corrected_json_content(file), 'item')
        for article in articles:
            yield article

def process_and_send_to_neo4j(article):
    with driver.session() as session:
        data_list = process_article(article)
        for data in data_list:
            if data['author_name'] and data['author_id']:
                authors = [{"name": data['author_name'], "_id": data['author_id']}]
                if data['cited_article_id']:
                    session.execute_write(add_article, data['article_id'], data['article_title'], authors, [data['cited_article_id']])
                else:
                    session.execute_write(add_article, data['article_id'], data['article_title'], authors, [])

for article in articles_generator(filename):
    process_and_send_to_neo4j(article)


In [None]:
%load_ext memory_profiler


In [None]:
import ijson
import re
from neo4j import AsyncGraphDatabase
import datetime
import asyncio
import gc

filename = 'biggertest.json'
uri = "bolt://localhost:7687"
BATCH_SIZE = 5  # Adjust based on memory limits and performance
semaphore = asyncio.Semaphore(2)  # Limit concurrent tasks

async def async_session():
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "testtest"))
    return driver.async_session(database="neo4j")

# Function to correct JSON lines on-the-fly
def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
async def process_article(article):
    article_id = article['_id']
    article_title = article['title']
    authors = article.get('authors', [])
    references = article.get('references', [])

    async with semaphore, await async_session() as session:
        for author in authors:
            if references:
                for reference in references:
                    await session.write_transaction(add_article, article_id, article_title, author, reference)
            else:
                await session.write_transaction(add_article, article_id, article_title, author)

        if not authors and references:
            for reference in references:
                await session.write_transaction(add_article, article_id, article_title, None, reference)
        elif not references:
            await session.write_transaction(add_article, article_id, article_title)

async def add_article(tx, article_id, title, author=None, cited_article_id=None):
    if title:
        tx.run("MERGE (a:Article {_id: $id}) SET a.title = $title", id=article_id, title=title)
    else:
        tx.run("MERGE (a:Article {_id: $id})", id=article_id)

    if author:
        tx.run("MERGE (a:Author {_id: $author_id, name: $name})", author_id=author['_id'], name=author['name'])
        tx.run("""
            MATCH (a:Author {_id: $author_id}), (b:Article {_id: $article_id})
            MERGE (a)-[:AUTHORED]->(b)
        """, author_id=author['_id'], article_id=article_id)

    if cited_article_id:
        tx.run("""
            MERGE (a:Article {_id: $article_id})
            MERGE (b:Article {_id: $cited_article_id})
            MERGE (a)-[:CITES]->(b)
        """, article_id=article_id, cited_article_id=cited_article_id)

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

async def process_and_send_to_neo4j(articles_batch):
    tasks = [process_article(article) for article in articles_batch]
    await asyncio.gather(*tasks)

async def main():
    # start
    start_time = datetime.datetime.now()
    print(f"Processing started at {start_time}")

    # process articles in batches
    articles_batch = []
    tasks = []

    for article in articles_generator(filename):
        articles_batch.append(article)
        if len(articles_batch) == BATCH_SIZE:
            tasks.append(asyncio.create_task(process_and_send_to_neo4j(articles_batch)))
            articles_batch = []

    # process any remaining articles
    if articles_batch:
        tasks.append(asyncio.create_task(process_and_send_to_neo4j(articles_batch)))

    await asyncio.gather(*tasks)

    # end
    end_time = datetime.datetime.now()
    elapsed_time = end_time - start_time
    print(f"Processing finished at {end_time}. Total time taken: {elapsed_time}")

def run_main():
    asyncio.run(main())

%memit run_main()

# Write a file to indicate that the data has been imported
with open('/tmp/data_imported', 'w') as f:
    f.write('Data imported on {}\n'.format(datetime.datetime.now()))


In [None]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
from neo4j import AsyncGraphDatabase

uri = "bolt://localhost:7687"
auth = ("neo4j", "testtest")

async def test_connection():
    driver = AsyncGraphDatabase.driver(uri, auth=auth)
    async with driver.session() as session: 
        result = await session.run("MATCH (n) RETURN count(n) AS node_count")
        record = await result.single()  # Await here
        count = record["node_count"]
        print(f"Number of nodes in the database: {count}")

    await driver.close()

# Run the test
await test_connection()


Number of nodes in the database: 158


In [None]:
import ijson
import re
from neo4j import GraphDatabase
import datetime
import asyncio
import gc

filename = 'biggertest.json'
uri = "bolt://localhost:7687"
BATCH_SIZE = 5  # Adjust based on memory limits and performance
semaphore = asyncio.Semaphore(2)  # Limit concurrent tasks

# Function to correct JSON lines on-the-fly
def corrected_json_lines(file):
    for line in file:
        yield re.sub(r'NumberInt\((\d+)\)', r'\1', line)

# Function to process an individual article from the JSON file
async def process_article(article, driver):
    async with semaphore:
        try:
            article_id = article['_id']
            article_title = article['title']
            authors = article.get('authors', [])
            references = article.get('references', [])

            with driver.session() as session:
                with semaphore:
                    for author in authors:
                        if references:
                            for reference in references:
                                session.write_transaction(add_article, article_id, article_title, author, reference)
                        else:
                            session.write_transaction(add_article, article_id, article_title, author)

                    if not authors and references:
                        for reference in references:
                            session.write_transaction(add_article, article_id, article_title, None, reference)
                    elif not references:
                        session.write_transaction(add_article, article_id, article_title)
        except Exception as e:
            print(f"Error processing article {article['_id']}: {e}")

async def add_article(tx, article_id, title, author=None, cited_article_id=None):
    if title:
        tx.run("MERGE (a:Article {_id: $id}) SET a.title = $title", id=article_id, title=title)
    else:
        tx.run("MERGE (a:Article {_id: $id})", id=article_id)

    if author:
        tx.run("MERGE (a:Author {_id: $author_id, name: $name})", author_id=author['_id'], name=author['name'])
        tx.run("""
            MATCH (a:Author {_id: $author_id}), (b:Article {_id: $article_id})
            MERGE (a)-[:AUTHORED]->(b)
        """, author_id=author['_id'], article_id=article_id)

    if cited_article_id:
        tx.run("""
            MERGE (a:Article {_id: $article_id})
            MERGE (b:Article {_id: $cited_article_id})
            MERGE (a)-[:CITES]->(b)
        """, article_id=article_id, cited_article_id=cited_article_id)

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

async def process_and_send_to_neo4j(articles_batch, driver):
    tasks = [process_article(article, driver) for article in articles_batch]
    await asyncio.gather(*tasks)

async def main():
    driver = GraphDatabase.driver(uri, auth=("neo4j", "testtest"))
    start_time = datetime.datetime.now()
    print(f"Processing started at {start_time}")

    articles_batch = []
    tasks = []

    for article in articles_generator(filename):
        articles_batch.append(article)
        if len(articles_batch) == BATCH_SIZE:
            tasks.append(asyncio.create_task(process_and_send_to_neo4j(articles_batch, driver)))
            articles_batch = []
            gc.collect()

    if articles_batch:
        tasks.append(asyncio.create_task(process_and_send_to_neo4j(articles_batch, driver)))

    await asyncio.gather(*tasks)
    
    end_time = datetime.datetime.now()
    elapsed_time = end_time - start_time
    print(f"Processing finished at {end_time}. Total time taken: {elapsed_time}")

    driver.close()  # Ensure the driver is closed

def run_main():
    asyncio.run(main())

run_main()


Processing started at 2023-10-24 18:23:34.688560
Error processing article 53e99784b7602d9701f3e3f5: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e133: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e151: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e15d: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e161: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e162: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e165: 'Semaphore' object does not support the context manager protocol
Error processing article 53e99784b7602d9701f3e922: 'Semaphore' object does not support the context manager protocol
Error processing articl

In [11]:
import ijson

filename = 'dblpv13.json'

def process_large_json(file_path):
    with open(file_path, mode='r', encoding='utf-8') as file:
        parser = ijson.parse(file)
        for prefix, event, value in parser:
            if (prefix.endswith('.title') or 
                prefix.endswith('.authors.item._id') or 
                prefix.endswith('.authors.item.name') or 
                prefix.endswith('.references.item') or 
                prefix.endswith('._id')):
                print(f'{prefix}: {value}')

# Call the function with the path to your JSON file
process_large_json(filename)



UnexpectedSymbol: Unexpected symbol 'N' at 103

In [14]:
import json
import re

# Assuming data_str is a string containing your JSON data
data_str = '''
{
    "age": "NumberInt(25)",
    "other": "some text (with parentheses)"
}
'''

def clean_number_int(data_str):
    return re.sub(r'NumberInt\((\d+)\)', r'\1', data_str)

cleaned_data_str = clean_number_int(data_str)
data = json.loads(cleaned_data_str)

# Now data is a clean dictionary
print(data)  # Output: {'age': '25', 'other': 'some text (with parentheses)'}


{'age': '25', 'other': 'some text (with parentheses)'}


In [23]:
import ijson
import re
import io

# Assuming data_str is a string containing your JSON data
data_str = '''
{
    "age": "NumberInt(25)",
    "other": "some text (with parentheses)"
}
'''

def clean_number_int(data_str):
    return re.sub(r'NumberInt\((\d+)\)', r'\1', data_str)

# Clean the JSON data string
cleaned_data_str = clean_number_int(data_str)

# Convert the cleaned JSON string to a StringIO object
cleaned_data_io = io.StringIO(cleaned_data_str)

# Parse the cleaned JSON data with ijson
parser = ijson.parse(cleaned_data_io)
for prefix, event, value in parser:
    if prefix:
        print(f'{prefix} : {value}')


age : 25
other : some text (with parentheses)


In [31]:
import ijson
import bson
import json

def stream_mongo_json(file_path):
    parser = ijson.parse(open(file_path, 'r', encoding='utf-8'))
    buffer = ""
    for prefix, event, value in parser:
        if event in ('start_map', 'start_array'):
            buffer += '{' if event == 'start_map' else '['
        elif event in ('end_map', 'end_array'):
            buffer += '}' if event == 'end_map' else ']'
            if prefix == '':
                # When we close the main object/array, parse and yield
                yield bson.json_util.loads(buffer)
                buffer = ""
        elif event == 'map_key':
            buffer += json.dumps(value) + ':'
        else:
            buffer += json.dumps(value) + ','
        buffer = buffer.rstrip(',')

file_path = 'dblpv13.json'
for item in stream_mongo_json(file_path):
    print(item)  # Each item here is a Python dictionary


UnexpectedSymbol: Unexpected symbol 'N' at 103

In [29]:
import ijson
import re

filename = 'dblpv13.json'

class PreprocessedFile:
    def __init__(self, file):
        self.file = file
        self.buffer = b""

    def read(self, size=-1):
        while True:
            chunk = self.file.read(size)
            if not chunk:
                if self.buffer:
                    result, self.buffer = self.buffer, b""
                    return result
                else:
                    return b""
            self.buffer += chunk
            try:
                modified_buffer = re.sub(b'NumberInt\((\d+)\)', b'\\1', self.buffer)
                modified_buffer = re.sub(b'NumberLong\((\d+)\)', b'\\1', modified_buffer)
                result, self.buffer = modified_buffer, b""
                return result
            except re.error:
                pass


# Open the JSON filename to read it
with open(filename, 'rb') as file:
    # Create a PreprocessedFile object
    preprocessed_file = PreprocessedFile(file)
    
    # Use ijson to parse the preprocessed file
    data = ijson.items(preprocessed_file, '')
    for item in data:
        title = item.get('title')
        authors = [author['name'] for author in item.get('authors', [])]
        venue_name = item.get('venue', {}).get('name_d')
        year = item.get('year')
        
        print("Title:", title)
        print("Authors:", authors)
        print("Venue Name:", venue_name)
        print("Year:", year)
        print()



UnexpectedSymbol: Unexpected symbol 'N' at 7581939

In [None]:
import ijson
import re

filename = 'dblpv13.json'

def preprocess_file(file):
    for line in file:
        # Use a raw byte string literal for the pattern to avoid SyntaxWarning
        modified_line = re.sub(br'NumberInt\((\d+)\)', br'\1', line)
        yield modified_line

try:
    with open(filename, 'rb') as f:
        parser = ijson.parse(preprocess_file(f))
        for prefix, event, value in parser:  # Unpack three values here
            print(prefix, event, value)
            if event == 'string':
                print('Got string:', value)
except Exception as e:
    import traceback
    print("An unexpected error occurred:", e)
    traceback.print_exc()  # This will print the full traceback
except ValueError as e:
    print("Value Error:", e)




 91 10
An unexpected error occurred: too many values to unpack (expected 2)


Traceback (most recent call last):
  File "C:\Users\denis.iglesias\AppData\Local\Temp\ipykernel_40140\1481585169.py", line 15, in <module>
    for prefix, event, value in parser:  # Unpack three values here
  File "c:\Users\denis.iglesias\Miniconda3\envs\neo4j\Lib\site-packages\ijson\utils.py", line 55, in coros2gen
    f.send(value)
  File "c:\Users\denis.iglesias\Miniconda3\envs\neo4j\Lib\site-packages\ijson\common.py", line 72, in parse_basecoro
    event, value = yield
    ^^^^^^^^^^^^
ValueError: too many values to unpack (expected 2)


In [None]:
import ijson
import re
from neo4j import GraphDatabase
import datetime

filename1 = 'biggertest.json'
filename2 = 'dblpv13.json'
uri = "bolt://localhost:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "testtest"))

def corrected_json_string(file):
    return ''.join(re.sub(r'NumberInt\((\d+)\)', r'\1', line) for line in file)


def articles_generator(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        stream = corrected_json_string(file)
        print(stream[:1000])  # printing the first 1000 characters
        for article in ijson.items(stream, 'item', use_float=True):
            print(article)




articles_generator(filename2)

KeyboardInterrupt: 

In [None]:
import ijson
import re
import io

filename1 = 'biggertest.json'

def clean_number_int(data_str):
    return re.sub(r'NumberInt\((\d+)\)', r'\1', data_str)

# Read the JSON data from the file
with open(filename1, 'r', encoding='utf-8') as file:
    data_str = file.read()

# Clean the JSON data string
cleaned_data_str = clean_number_int(data_str)

# Convert the cleaned JSON string to a StringIO object
cleaned_data_io = io.StringIO(cleaned_data_str)

# Parse the cleaned JSON data with ijson
objects = ijson.items(cleaned_data_io, 'item')

# Collect the data into a list of dictionaries
data_list = [obj for obj in objects]

# Print the data in a structured format
for i, item in enumerate(data_list, 1):
    print(f'Item {i}:')
    for key, value in item.items():
        print(f'  {key}: {value}')
    print()  # Print a blank line between items


In [32]:
import ijson
import re
import io

filename1 = 'biggertest.json'

def clean_number_int(data_str):
    return re.sub(r'NumberInt\((\d+)\)', r'\1', data_str)

# Read the JSON data from the file
with open(filename1, 'r', encoding='utf-8') as file:
    data_str = file.read()

# Clean the JSON data string
cleaned_data_str = clean_number_int(data_str)

# Convert the cleaned JSON string to a StringIO object
cleaned_data_io = io.StringIO(cleaned_data_str)

# Parse the cleaned JSON data with ijson
objects = ijson.items(cleaned_data_io, 'item')

# Collect the data into a list of dictionaries, filtering for desired fields
filtered_data_list = []
for obj in objects:
    filtered_obj = {
        '_id': obj.get('_id', None),
        'title': obj.get('title', None),
        'authors': [
            {'_id': author.get('_id', None), 'name': author.get('name', None)}
            for author in obj.get('authors', [])
        ],
        'references': obj.get('references', [])
    }
    filtered_data_list.append(filtered_obj)

# Print the data in a structured format
for i, item in enumerate(filtered_data_list, 1):
    print(f'Item {i}:')
    for key, value in item.items():
        print(f'  {key}: {value}')
    print()  # Print a blank line between items


Item 1:
  _id: 53e99784b7602d9701f3e3f5
  title: 3GIO.
  authors: []
  references: []

Item 2:
  _id: 53e99784b7602d9701f3e133
  title: The relationship between canopy parameters and spectrum of winter wheat under different irrigations in Hebei Province.
  authors: [{'_id': '53f45728dabfaec09f209538', 'name': 'Peijuan Wang'}, {'_id': '5601754345cedb3395e59457', 'name': 'Jiahua Zhang'}, {'_id': '53f38438dabfae4b34a08928', 'name': 'Donghui Xie'}, {'_id': '5601754345cedb3395e5945a', 'name': 'Yanyan Xu'}, {'_id': '53f43d25dabfaeecd6995149', 'name': 'Yun Xu'}]
  references: []

Item 3:
  _id: 53e99784b7602d9701f3e151
  title: A solution to the problem of touching and broken characters.
  authors: [{'_id': '53f46797dabfaeb22f542630', 'name': 'Jairo Rocha'}, {'_id': '54328883dabfaeb4c6a8a699', 'name': 'Theo Pavlidis'}]
  references: ['53e99cf5b7602d97025ace63', '557e8a7a6fee0fe990caa63d', '53e9a96cb7602d97032c459a', '53e9b929b7602d9704515791', '557e59ebf6678c77ea222447']

Item 4:
  _id: 53e99

In [40]:
import ijson
import re
from neo4j import GraphDatabase

# Your add_article function here
def add_article(tx, article_id, title, author=None, cited_articles=None):
    tx.run(
        "MERGE (a:Article {_id: $id}) "
        "ON CREATE SET a.title = $title "
        "ON MATCH SET a.title = $title",
        id=article_id,
        title=title
    )
    if author:
        author_id = author.get('_id')
        author_name = author.get('name')
        if author_id and author_name:
            tx.run(
                "MERGE (a:Author {_id: $author_id, name: $name})", 
                author_id=author_id, 
                name=author_name
            )
            tx.run("""
                MATCH (a:Author {_id: $author_id}), (b:Article {_id: $article_id})
                MERGE (a)-[:AUTHORED]->(b)
            """, author_id=author_id, article_id=article_id)
    if cited_articles:
        for cited_article in cited_articles:
            cited_article_id = cited_article.get('_id')
            cited_article_title = cited_article.get('title')
            if cited_article_id:
                tx.run(
                    "MERGE (c:Article {_id: $cited_article_id}) "
                    "ON CREATE SET c.title = $cited_article_title "
                    "ON MATCH SET c.title = $cited_article_title",
                    cited_article_id=cited_article_id,
                    cited_article_title=cited_article_title
                )
                tx.run("""
                    MATCH (a:Article {_id: $article_id}), (c:Article {_id: $cited_article_id})
                    MERGE (a)-[:CITES]->(c)
                """, article_id=article_id, cited_article_id=cited_article_id)

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

def process_article(session, article):
    article_id = article['_id']
    title = article['title']
    for author in article.get('authors', []):
        session.execute_write(add_article, article_id, title, author=author)
    # Create a list of dictionaries for cited articles, assuming titles are not available
    cited_articles = [{'_id': cited_article_id, 'title': None} for cited_article_id in article.get('references', [])]
    # Now pass the cited_articles list to add_article
    session.execute_write(add_article, article_id, title, cited_articles=cited_articles)

def process_and_send_to_neo4j(articles_batch):
    with GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "testtest")) as driver:
        with driver.session() as session:
            for article in articles_batch:
                process_article(session, article)

def main(filename):
    batch_size = 100  # Process articles in batches of 100 (adjust as needed)
    articles_batch = []
    for article in articles_generator(filename):
        articles_batch.append(article)
        if len(articles_batch) >= batch_size:
            process_and_send_to_neo4j(articles_batch)
            articles_batch = []
    if articles_batch:  # Process any remaining articles
        process_and_send_to_neo4j(articles_batch)

# Run the main function with the filename as argument
filename1 = 'biggertest.json'
main(filename1)


In [None]:
import ijson
import re
import io
from neo4j import GraphDatabase

# Your add_article function here
def add_article(tx, article_id, title, authors=None, cited_articles=None):
    tx.run(
        "MERGE (a:Article {_id: $id}) "
        "ON CREATE SET a.title = $title "
        "ON MATCH SET a.title = $title",
        id=article_id,
        title=title
    )
    
    if authors:
        for author in authors:
            author_id = author.get('_id')
            author_name = author.get('name')
            if author_id and author_name:
                tx.run(
                    "MERGE (authorNode:Author {_id: $author_id, name: $name})", 
                    author_id=author_id, 
                    name=author_name
                )
                tx.run("""
                    MATCH (authorNode:Author {_id: $author_id}), (a:Article {_id: $article_id})
                    MERGE (authorNode)-[:AUTHORED]->(a)
                """, author_id=author_id, article_id=article_id)

    if cited_articles:
        for cited_article in cited_articles:
            cited_article_id = cited_article.get('_id')
            cited_article_title = cited_article.get('title')
            if cited_article_id:
                tx.run(
                    "MERGE (c:Article {_id: $cited_article_id}) "
                    "ON CREATE SET c.title = $cited_article_title "
                    "ON MATCH SET c.title = $cited_article_title",
                    cited_article_id=cited_article_id,
                    cited_article_title=cited_article_title
                )
                tx.run("""
                    MATCH (a:Article {_id: $article_id}), (c:Article {_id: $cited_article_id})
                    MERGE (a)-[:CITES]->(c)
                """, article_id=article_id, cited_article_id=cited_article_id)



def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

def process_article(session, article):
    print(article)  # Debug line to print entire article data
    article_id = article['_id']
    title = article['title']
    authors = article.get('authors')  # Get the authors from the 'authors' field
    print(article_id, title, authors)  # Debug line to print the article data

    # Check if 'references' is a key in the article dictionary
    references = article.get('references')
    if references is None:
        references = []
    # Create a list of dictionaries for cited articles, assuming titles are not available
    cited_articles = [{'_id': cited_article_id, 'title': None} for cited_article_id in references]
    #print(cited_articles)  # Debug line to print the cited_articles data

    # Now pass the authors and cited_articles lists to add_article
    session.execute_write(add_article, article_id, title, authors=authors, cited_articles=cited_articles)


def process_and_send_to_neo4j(articles_batch):
    with GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "testtest")) as driver:
        with driver.session() as session:
            for article in articles_batch:
                process_article(session, article)

class PreprocessedFile:
    def __init__(self, filename):
        self.generator = preprocess_json(filename)
        self.buffer = ''

    def read(self, size=-1):
        while size < 0 or len(self.buffer) < size:
            try:
                self.buffer += next(self.generator)
            except StopIteration:
                # End of generator, return what's left in the buffer
                break
        if size < 0:
            result, self.buffer = self.buffer, ''
        else:
            result, self.buffer = self.buffer[:size], self.buffer[size:]
        return result

def preprocess_json(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            line = re.sub(r'NumberInt\((\d+)\)', r'\1', line)
            line = line.replace('NaN', 'null')
            yield line

def get_cleaned_data(filename):
    preprocessed_file = PreprocessedFile(filename)
    return ijson.parse(preprocessed_file)  # pass the custom file-like object to ijson.parse

def main(filename, neo4j_uri, neo4j_user, neo4j_password):
    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
    
    with driver.session() as session:
        articles_batch = []  # Initialize an empty batch
        batch_size = 100  # Adjust batch size as needed
        key = None  # Initialize key to None before the loop
        article = {}  # Initialize article to an empty dictionary before the loop
        current_list = None  # Initialize current_list to None before the loop
        current_dict = None  # Initialize current_dict to None before the loop
        
        for prefix, event, value in get_cleaned_data(filename):
            if event == 'map_key':
                key = value
            elif key and prefix.endswith(f'.{key}'):
                if event == 'start_array':
                    current_list = []
                elif event == 'end_array':
                    article[key] = current_list
                    current_list = None
                elif current_list is not None:
                    if event == 'start_map':
                        current_dict = {}
                    elif event == 'end_map':
                        current_list.append(current_dict)
                        current_dict = None
                    elif current_dict is not None:
                        current_dict[key] = value
                    else:
                        current_list.append(value)
                else:
                    article[key] = value
            if prefix == 'item' and event == 'end_map':
                articles_batch.append(article)  # Append the article to the batch
                article = {}  # Reset the article for the next item
                if len(articles_batch) >= batch_size:
                    process_and_send_to_neo4j(articles_batch)  # Process the batch
                    articles_batch = []  # Reset the batch
        if articles_batch:  # Process any remaining articles
            process_and_send_to_neo4j(articles_batch)  # Process the remaining batch
    
    driver.close()

# Usage
filename1 = 'biggertest.json'
filename2 = 'dblpv13.json'
neo4j_uri = 'bolt://localhost:7687'
neo4j_user = 'neo4j'
neo4j_password = 'testtest'
main(filename1, neo4j_uri, neo4j_user, neo4j_password)

In [2]:
import ijson
import re
import io
from neo4j import GraphDatabase

def send_articles_to_neo4j(session, article_lists):
    for article in article_lists:
        article_id = article[0]
        article_title = article[1]
        
        # Create the article node
        session.run(
            "MERGE (a:Article {_id: $id}) "
            "ON CREATE SET a.title = $title "
            "ON MATCH SET a.title = $title",
            id=article_id,
            title=article_title
            )

def send_authors_to_neo4j(session, author_lists):
    for author in author_lists:
        article_id = author[0]
        article_title = author[1]
        author_id = author[2]
        author_name = author[3]
        
        if author_id and author_name:
            session.run("""
                MERGE (authorNode:Author {_id: $author_id, name: $name})
                MERGE (a:Article {_id: $article_id, title: $title})
                MERGE (authorNode)-[:AUTHORED]->(a)
            """, author_id=author_id, name=author_name, title=article_title,article_id=article_id)

def send_references_to_neo4j(session, reference_lists):
    for reference in reference_lists:
        article_id = reference[0]
        reference_article_id = reference[1]
        
        if reference_article_id:
            session.run("""
                MERGE (a:Article {_id: $article_id})
                MERGE (c:Article {_id: $reference_article_id})
                MERGE (a)-[:CITES]->(c)
            """, article_id=article_id, reference_article_id=reference_article_id)

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

def prepare_author_lists(article):
    """
    Prepare a list of lists for authors
    """
    # Check if 'authors' key exists in the article dictionary
    if 'authors' not in article or not article['authors']:
        return []  # Return an empty list if there are no authors
    try:
        # Master list to collect the flattened data
        author_lists = []

        # Iterate through each author in the authors list
        for author in article['authors']:
            # Create a new list with _id, title, authors._id, and authors.name
            author_data = [
                article['_id'],
                article['title'],
                author['_id'],
                author['name']
            ]
            # Append this list to the master list
            author_lists.append(author_data)
        return author_lists
    except:
        pass

def prepare_article_lists(article):
    """
    Create a new list with _id and title
    """
    article_data = [
        article['_id'],
        article['title'],
    ]
    return [article_data]

def prepare_references_lists(article):
    """
    Prepare a list of lists of references
    """
    # Check if 'authors' key exists in the article dictionary
    if 'references' not in article or not article['references']:
        return []  # Return an empty list if there are no authors
    try:
        # Master list to collect the flattened data
        reference_lists = []

        # Iterate through each reference in the references list
        for references in article['references']:
            # Create a new list with _id and references._id
            reference_data = [
                article['_id'],
                references['references']  # Assume each reference has an _id key
            ]
            # Append this list to the master list
            reference_lists.append(reference_data)
        return reference_lists
    except:
        pass

def preprocess_json(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            line = re.sub(r'NumberInt\((\d+)\)', r'\1', line)
            line = line.replace('NaN', 'null')
            yield line

class PreprocessedFile:
    def __init__(self, filename):
        self.generator = preprocess_json(filename)
        self.buffer = ''

    def read(self, size=-1):
        while size < 0 or len(self.buffer) < size:
            try:
                self.buffer += next(self.generator)
            except StopIteration:
                # End of generator, return what's left in the buffer
                break
        if size < 0:
            result, self.buffer = self.buffer, ''
        else:
            result, self.buffer = self.buffer[:size], self.buffer[size:]
        return result

def get_cleaned_data(filename):
    """
    Remove NumberInt and NaN from the JSON file and return a generator of objects
    """
    preprocessed_file = PreprocessedFile(filename)
    return ijson.parse(preprocessed_file)  # pass the custom file-like object to ijson.parse

def process_object(objects):
    """
    Process a single object from the JSON file
    """
    current_item = {}
    authors_list = []  # Initialize an empty list to collect authors
    references_list = []  # Initialize an empty list to collect references
    inside_authors = False  # A flag to check if we are processing the authors field
    inside_references = False  # A flag to check if we are processing the references field
    for _, event, value in objects:
        if event == 'map_key':
            current_key = value
            if current_key == 'authors':
                inside_authors = True  # Set flag to True when entering authors field
            elif current_key == 'references':
                inside_references = True  # Set flag to True when entering references field
            else:
                # Reset flags and assign lists to current_item when exiting authors or references fields
                if inside_authors:
                    inside_authors = False
                    current_item['authors'] = authors_list
                    authors_list = []  # Reset authors_list for next use
                if inside_references:
                    inside_references = False
                    current_item['references'] = references_list
                    references_list = []  # Reset references_list for next use
        elif event in ('string', 'number'):
            if inside_authors:
                # If inside authors, append a new author dictionary to authors_list
                authors_list.append({current_key: value})
            elif inside_references:
                # If inside references, append a new reference dictionary to references_list
                references_list.append({current_key: value})
            else:
                current_item[current_key] = value
        elif event == 'start_map':
            # If inside authors or references, process the next dictionary
            if inside_authors:
                authors_list.append(process_object(objects))
            elif inside_references:
                references_list.append(process_object(objects))
            else:
                current_item[current_key] = process_object(objects)
        elif event == 'end_map':
            # Ensure authors and references lists are assigned to current_item if they are the last fields
            if inside_authors:
                current_item['authors'] = authors_list
            if inside_references:
                current_item['references'] = references_list
            return current_item

def main(filename, neo4j_uri, neo4j_user, neo4j_password):
    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
    
    with driver.session() as session:
        with open(filename, 'r', encoding='utf-8') as file:
            one_article = get_cleaned_data(filename)
            for prefix, event, value in one_article:
                if event == 'start_map':
                    one_article_dict = process_object(one_article)
                    # add articles to neo4j
                    article_lists = prepare_article_lists(one_article_dict)
                    if article_lists:
                        send_articles_to_neo4j(session, article_lists)
                    # add authors to neo4j
                    author_lists = prepare_author_lists(one_article_dict)
                    if author_lists:
                        send_authors_to_neo4j(session, author_lists)
                    # add references to neo4j
                    references_lists = prepare_references_lists(one_article_dict)
                    if references_lists:
                        send_references_to_neo4j(session, references_lists)

    driver.close()

# Usage
filename1 = 'biggertest.json'
filename2 = 'dblpv13.json'
neo4j_uri = 'bolt://localhost:7687'
neo4j_user = 'neo4j'
neo4j_password = 'testtest'
main(filename2, neo4j_uri, neo4j_user, neo4j_password)

BufferError: Existing exports of data: object cannot be re-sized

In [12]:
# Input dictionary
article_dict = {'_id': '53e99784b7602d9701f3e15d',
                'title': 'Timing yield estimation using statistical static timing analysis',
                'authors': [{'_id': '53f43b03dabfaedce555bf2a', 'name': 'Min Pan'}, {'_id': '53f45ee9dabfaee43ecda842', 'name': 'Chris C. N. Chu'}, {'_id': '53f42e8cdabfaee1c0a4274e', 'name': 'Hai Zhou'}], 'venue': {'_id': '53a72e2020f7420be8c80142', 'name_d': 'International Symposium on Circuits and Systems', 'type': 0, 'raw': 'ISCAS (3)'}, 'year': 2005, 'keywords': 'circuit analysis', 'fos': 'Statistics', 'n_citation': 28, 'page_start': '2461', 'page_end': '2464Vol.3', 'lang': 'en', 'volume': '', 'issue': '', 'issn': '', 'isbn': '0-7803-8834-8', 'doi': '10.1109/ISCAS.2005.1465124', 'pdf': '//static.aminer.org/pdf/PDF/000/423/329/timing_yield_estimation_using_statistical_static_timing_analysis.pdf', 'url': 'http://ieeexplore.ieee.org/xpl/abstractAuthors.jsp?tp=&arnumber=1465124', 'abstract': 'As process variations become a significant problem in deep sub-micron technology, a shift from deterministic static timing analysis to statistical static timing analysis for high-performance circuit designs could reduce the excessive conservatism that is built into current timing design methods. We address the timing yield problem for sequential circuits and propose a statistical approach to handle it. We consider the spatial and path reconvergence correlations between path delays, set-up time and hold time constraints, and clock skew due to process variations. We propose a method to get the timing yield based on the delay distributions of register-to-register paths in the circuit On average, the timing yield results obtained by our approach have average errors of less than 1.0% in comparison with Monte Carlo simulation. Experimental results show that shortest path variations and clock skew due to process variations have considerable impact on circuit timing, which could bias the timing yield results. In addition, the correlation between longest and shortest path delays is not significant.', 
                'references': [{'references': '53e9a8a9b7602d97031f6bb9'},
                               {'references': '599c7b6b601a182cd27360da'},
                               {'references': '53e9b443b7602d9703f3e52b'},
                               {'references': '53e9a6a6b7602d9702fdc57e'}, 
                               {'references': '599c7b6a601a182cd2735703'}, 
                               {'references': '53e9aad9b7602d970345afea'}, 
                               {'references': '5582821f0cf2bf7bae57ac18'}, 
                               {'references': '5e8911859fced0a24bb9a2ba'}, 
                               {'references': '53e9b002b7602d9703a5c932'}]}


# Master list to collect the flattened data
flattened_author = []
flattened_references = []

# Iterate through each author in the authors list
for author in article_dict['authors']:
    # Create a new list with _id, title, authors._id, and authors.name
    author_data = [
        article_dict['_id'],
        article_dict['title'],
        author['_id'],
        author['name']
    ]
    # Append this list to the master list
    flattened_author.append(author_data)  # Corrected here

# Output the flattened data for authors
for data in flattened_author:
    print(data)

# Iterate through each reference in the references list
for references in article_dict['references']:
    # Create a new list with _id, title, and references
    reference_data = [
        article_dict['_id'],
        article_dict['title'],
        references['references']
    ]
    # Append this list to the master list
    flattened_references.append(reference_data)  # Corrected here

# Output the flattened data for references
for data in flattened_references:
    print(data)



['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53f43b03dabfaedce555bf2a', 'Min Pan']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53f45ee9dabfaee43ecda842', 'Chris C. N. Chu']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53f42e8cdabfaee1c0a4274e', 'Hai Zhou']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53e9a8a9b7602d97031f6bb9']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '599c7b6b601a182cd27360da']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53e9b443b7602d9703f3e52b']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timing analysis', '53e9a6a6b7602d9702fdc57e']
['53e99784b7602d9701f3e15d', 'Timing yield estimation using statistical static timi

In [None]:
import ijson
import re
from neo4j import GraphDatabase
import datetime
from tqdm import tqdm

def drop_all_constraints_and_indexes(tx):
    """ 
    Debugging function to drop all indexes and constraints in the database 
    """
    # Get the list of all constraints
    result = tx.run("SHOW CONSTRAINTS")
    for record in result:
        # Get the constraint name
        constraint_name = record['name']
        # Drop the constraint
        tx.run(f"DROP CONSTRAINT {constraint_name} IF EXISTS")

    # Now that constraints are dropped, get the list of all indexes
    result = tx.run("SHOW INDEXES")
    for record in result:
        # Get the index name
        index_name = record['name']
        # Drop the index
        tx.run(f"DROP INDEX {index_name} IF EXISTS")



def neo4j_index_constraints(session):
    with session.begin_transaction() as tx:
        # index
        tx.run("CREATE INDEX article_title_index FOR (n:Article) ON (n.title)")
        tx.run("CREATE INDEX author_name_index FOR (a:Author) ON (a.name)")
        # unique
        tx.run("CREATE CONSTRAINT article_id_uniqueness FOR (a:Article) REQUIRE (a._id) IS UNIQUE")
        tx.run("CREATE CONSTRAINT author_id_uniqueness FOR (a:Author) REQUIRE (a._id) IS UNIQUE")
        # Commit the transaction at the end of the batch
        tx.commit()

def send_articles_to_neo4j(session, article_lists):
    with session.begin_transaction() as tx:
        for article in article_lists:
            article_id = article[0]
            article_title = article[1]
            tx.run(
                "MERGE (a:Article {_id: $article_id}) "
                "ON CREATE SET a.title = $article_title "
                "ON MATCH SET a.title = $article_title",
                article_id=article_id,
                article_title=article_title
            )
        tx.commit()  # Commit the transaction at the end of the batch

def send_authors_to_neo4j(session, author_lists):
    with session.begin_transaction() as tx:
        for author in author_lists:
            article_id = author[0]
            article_title = author[1]
            author_id = author[2]
            author_name = author[3]
            
            if author_id and author_name:
                tx.run("""
                    MERGE (a:Article {_id: $article_id})
                    ON CREATE SET a.title = $article_title
                    ON MATCH SET a.title = $article_title
                    MERGE (authorNode:Author {_id: $author_id})
                    SET authorNode.name = $author_name
                    MERGE (authorNode)-[:AUTHORED]->(a)
                """,
                author_id=author_id,
                author_name=author_name,
                article_title=article_title,
                article_id=article_id)
        tx.commit()  # Commit the transaction at the end of the batch

def send_references_to_neo4j(session, reference_lists):
    with session.begin_transaction() as tx:
        for reference in reference_lists:
            article_id = reference[0]
            reference_article_id = reference[1]
            
            if reference_article_id:
                tx.run("""
                    MERGE (a:Article {_id: $article_id})
                    MERGE (c:Article {_id: $reference_article_id})
                    MERGE (a)-[:CITES]->(c)
                """,
                article_id=article_id,
                reference_article_id=reference_article_id)
        tx.commit()  # Commit the transaction at the end of the batch    

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

def prepare_author_lists(article):
    """
    Prepare a list of lists for authors
    """
    # Check if 'authors' key exists in the article dictionary
    if 'authors' not in article or not article['authors']:
        return []  # Return an empty list if there are no authors
    try:
        # Master list to collect the flattened data
        author_lists = []

        # Iterate through each author in the authors list
        for author in article['authors']:
            # Create a new list with _id, title, authors._id, and authors.name
            author_data = [
                article['_id'],
                article['title'],
                author['_id'],
                author['name']
            ]
            # Append this list to the master list
            author_lists.append(author_data)
        return author_lists
    except:
        pass

def prepare_article_lists(article):
    """
    Create a new list with _id and title
    """
    article_data = [
        article['_id'],
        article['title'],
    ]
    return [article_data]

def prepare_references_lists(article):
    """
    Prepare a list of lists of references
    """
    # Check if 'authors' key exists in the article dictionary
    if 'references' not in article or not article['references']:
        return []  # Return an empty list if there are no authors
    try:
        # Master list to collect the flattened data
        reference_lists = []

        # Iterate through each reference in the references list
        for references in article['references']:
            # Create a new list with _id and references._id
            reference_data = [
                article['_id'],
                references['references']  # Assume each reference has an _id key
            ]
            # Append this list to the master list
            reference_lists.append(reference_data)
        return reference_lists
    except:
        pass

def preprocess_json(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            line = re.sub(r'NumberInt\((\d+)\)', r'\1', line)
            line = line.replace('NaN', 'null')
            yield line

class PreprocessedFile:
    def __init__(self, filename):
        self.generator = preprocess_json(filename)
        self.buffer = ''

    def read(self, size=-1):
        while size < 0 or len(self.buffer) < size:
            try:
                self.buffer += next(self.generator)
            except StopIteration:
                # End of generator, return what's left in the buffer
                break
        if size < 0:
            result, self.buffer = self.buffer, ''
        else:
            result, self.buffer = self.buffer[:size], self.buffer[size:]
        return result

def get_cleaned_data(filename):
    """
    Remove NumberInt and NaN from the JSON file and return a generator of objects
    """
    preprocessed_file = PreprocessedFile(filename)
    return ijson.parse(preprocessed_file)  # pass the custom file-like object to ijson.parse

def process_object(objects):
    """
    Process a single object from the JSON file
    """
    current_item = {}
    authors_list = []  # Initialize an empty list to collect authors
    references_list = []  # Initialize an empty list to collect references
    inside_authors = False  # A flag to check if we are processing the authors field
    inside_references = False  # A flag to check if we are processing the references field
    for _, event, value in objects:
        if event == 'map_key':
            current_key = value
            if current_key == 'authors':
                inside_authors = True  # Set flag to True when entering authors field
            elif current_key == 'references':
                inside_references = True  # Set flag to True when entering references field
            else:
                # Reset flags and assign lists to current_item when exiting authors or references fields
                if inside_authors:
                    inside_authors = False
                    current_item['authors'] = authors_list
                    authors_list = []  # Reset authors_list for next use
                if inside_references:
                    inside_references = False
                    current_item['references'] = references_list
                    references_list = []  # Reset references_list for next use
        elif event in ('string', 'number'):
            if inside_authors:
                # If inside authors, append a new author dictionary to authors_list
                authors_list.append({current_key: value})
            elif inside_references:
                # If inside references, append a new reference dictionary to references_list
                references_list.append({current_key: value})
            else:
                current_item[current_key] = value
        elif event == 'start_map':
            # If inside authors or references, process the next dictionary
            if inside_authors:
                authors_list.append(process_object(objects))
            elif inside_references:
                references_list.append(process_object(objects))
            else:
                current_item[current_key] = process_object(objects)
        elif event == 'end_map':
            # Ensure authors and references lists are assigned to current_item if they are the last fields
            if inside_authors:
                current_item['authors'] = authors_list
            if inside_references:
                current_item['references'] = references_list
            return current_item

def main(filename, neo4j_uri, neo4j_user, neo4j_password):
    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
    
    with driver.session() as session:
        # debug drop all indexes
        session.execute_write(drop_all_constraints_and_indexes)
        # optimize neo4j
        neo4j_index_constraints(session)
        # process articles
        with open(filename, 'r', encoding='utf-8') as file:
            one_article = get_cleaned_data(filename)
            # Wrap your loop with tqdm, and specify the total count of articles
            for _, event, _ in tqdm(one_article, total=5354309, unit='article'):
                if event == 'start_map':
                    one_article_dict = process_object(one_article)
                    # add authors/articles to neo4j
                    author_lists = prepare_author_lists(one_article_dict)
                    if author_lists:
                        send_authors_to_neo4j(session, author_lists)
                    else:
                        article_lists = prepare_article_lists(one_article_dict)
                        if article_lists:
                            send_articles_to_neo4j(session, article_lists)
                    # add references to neo4j
                    references_lists = prepare_references_lists(one_article_dict)
                    if references_lists:
                        send_references_to_neo4j(session, references_lists)

    driver.close()

# Usage
filename = 'dblpv13.json'
neo4j_uri = "bolt://localhost:7687"
neo4j_user = 'neo4j'
neo4j_password = 'testtest'

# start
start_time = datetime.datetime.now()
print(f"Processing started at {start_time}")

# process articles
main(filename, neo4j_uri, neo4j_user, neo4j_password)

# end
end_time = datetime.datetime.now()  # <-- 3. Get the current time again
elapsed_time = end_time - start_time
print(f"Processing finished at {end_time}. Total time taken: {elapsed_time}")

In [None]:
import ijson
import re
from neo4j import GraphDatabase
import datetime
from tqdm import tqdm
import logging
import hashlib

logging.basicConfig(level=logging.INFO)

def drop_all_constraints_and_indexes(tx):
    """ 
    Debugging function to drop all indexes and constraints in the database 
    """
    # Get the list of all constraints
    result = tx.run("SHOW CONSTRAINTS")
    for record in result:
        # Get the constraint name
        constraint_name = record['name']
        # Drop the constraint
        tx.run(f"DROP CONSTRAINT {constraint_name} IF EXISTS")

    # Now that constraints are dropped, get the list of all indexes
    result = tx.run("SHOW INDEXES")
    for record in result:
        # Get the index name
        index_name = record['name']
        # Drop the index
        tx.run(f"DROP INDEX {index_name} IF EXISTS")

def neo4j_index_constraints(session):
    with session.begin_transaction() as tx:
        # index
        tx.run("CREATE INDEX article_title_index FOR (n:Article) ON (n.title)")
        tx.run("CREATE INDEX author_name_index FOR (a:Author) ON (a.name)")
        # unique
        tx.run("CREATE CONSTRAINT article_id_uniqueness FOR (a:Article) REQUIRE (a._id) IS UNIQUE")
        tx.run("CREATE CONSTRAINT author_id_uniqueness FOR (a:Author) REQUIRE (a._id) IS UNIQUE")
        # Commit the transaction at the end of the batch
        tx.commit()

def send_articles_to_neo4j(session, article_lists):
    # Remove duplicates from article_lists
    print(article_lists)
    article_lists = remove_duplicates(article_lists)
    print(article_lists)
    tx = session.begin_transaction()
    query = """
    MERGE (a:Article {article_id: $article_id})
    SET a.title = $article_title
    """
    for article_info in article_lists:
        try:
            article_id, article_title = article_info  # Unpack article_id and article_title from article_info
        except ValueError as e:
            print(f'Error unpacking article_info: {e}')
            continue  # Skip to next iteration if error occurs

        # Execute the query to create or update the Article node
        tx.run(
            query,
            article_id=article_id,
            article_title=article_title
        )
    tx.commit()

def send_authors_to_neo4j(session, author_lists):
    # Remove duplicates from author_lists
    author_lists = remove_duplicates(author_lists)
    with session.begin_transaction() as tx:
        query = """
                MERGE (a:Article {_id: $article_id})
                ON CREATE SET a.title = $article_title
                ON MATCH SET a.title = $article_title
                MERGE (authorNode:Author {_id: $author_id})
                SET authorNode.name = $author_name
                MERGE (authorNode)-[:AUTHORED]->(a)
                """
        for author in author_lists:
            try:
                article_id = author[0]
                article_title = author[1]
                author_id = author[2]
                author_name = author[3]
                
                if author_id and author_name:
                    tx.run(
                        query,
                        author_id=author_id,
                        author_name=author_name,
                        article_title=article_title,
                        article_id=article_id
                        )
            except IndexError:
                print(f"Unexpected data structure: {author}")
        tx.commit()  # Commit the transaction at the end of the batch

def send_references_to_neo4j(session, reference_lists):
    # Remove duplicates from reference_lists
    reference_lists = remove_duplicates(reference_lists)
    with session.begin_transaction() as tx:
        query = """
                MERGE (a:Article {_id: $article_id})
                MERGE (c:Article {_id: $reference_article_id})
                MERGE (a)-[:CITES]->(c)
                """
        for reference in reference_lists:
            article_id = reference[0]
            reference_article_id = reference[1]
            
            if reference_article_id:
                tx.run(
                    query,
                    article_id=article_id,
                    reference_article_id=reference_article_id)
        tx.commit()  # Commit the transaction at the end of the batch    

def articles_generator(filename):
    with open(filename, 'r') as file:
        for article in ijson.items(file, 'item', use_float=True):
            yield article

def prepare_author_lists(article):
    #print(article)
    """
    Prepare a list of lists for authors
    [article_id, article_title, author_id, author_name]
    """
    # Extract the inner dictionary from the 'article' dictionary
    inner_dict = article.get(None, {})
    if not inner_dict:
        return []  # Return an empty list if there are no authors or the inner dictionary is missing
    
    # Check if 'authors' key exists in the inner dictionary
    if 'authors' not in inner_dict or not inner_dict['authors']:
        return []  # Return an empty list if there are no authors
    
    # Master list to collect the flattened data
    author_lists = []

    # Iterate through each author in the authors list
    for author in inner_dict['authors']:
        # Create a new list with _id, title, authors._id, and authors.name
        author_data = [
            inner_dict['_id'],
            inner_dict['title'],
            author.get('_id', None),  # Use None if '_id' is missing
            author.get('name', 'Unknown Author')  # Use a default value if 'name' is missing
        ]
        # Append this list to the master list
        author_lists.append(author_data)
    
    #print(author_lists)
    return author_lists

def prepare_article_lists(article):
    """
    Create a new list with _id and title
    [article_id, article_title]
    """
    #print(article)
    if article is None:
        return []  # Return an empty list if the article is None or not properly constructed
    try:
        # Access the inner dictionary
        inner_dict = article[None]
        # Create a new list with _id and title
        article_data = [
            inner_dict['_id'],
            inner_dict.get('title', 'Unknown title')  # Use a default value if 'title' is missing
        ]
        #print([article_data])
        return [article_data]
    except KeyError:
        return []  # Handle the case where _id or title is missing in the article

# def prepare_article_no_title_lists(article):
#     if article is None:
#         return []  # Return an empty list if the article is None or not properly constructed
#     try:
#         # Create a new list with _id and a default title
#         article_no_title_data = [
#             article['_id'],
#             "Unknown title"
#         ]
#         return [article_no_title_data]
#     except KeyError:
#         return []  # Handle the case where _id is missing in the article

def prepare_references_lists(article):
    """
    Prepare a list of lists of references
    """
    #print(article)
    if article is None or None not in article:
        return []  # Return an empty list if the article is None or not properly constructed

    try:
        references_data = []
        data = article[None]  # Access the nested dictionary containing the article data
        if 'references' in data:
            references = data['references']
            for reference in references:
                if 'references' in reference:
                    reference_id = reference['references']
                    # Use the '_id' field from the article data as the article_id
                    references_data.append([data['_id'], reference_id])

        #print("Reference list: ", references_data)
        return references_data
    except KeyError:
        return []  # Handle the case where the references structure is invalid

def preprocess_json(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            line = re.sub(r'NumberInt\((\d+)\)', r'\1', line)
            line = line.replace('NaN', 'null')
            yield line

class PreprocessedFile:
    def __init__(self, filename):
        self.generator = preprocess_json(filename)
        self.buffer = ''

    def read(self, size=-1):
        while size < 0 or len(self.buffer) < size:
            try:
                self.buffer += next(self.generator)
            except StopIteration:
                # End of generator, return what's left in the buffer
                break
        if size < 0:
            result, self.buffer = self.buffer, ''
        else:
            result, self.buffer = self.buffer[:size], self.buffer[size:]
        return result

def get_cleaned_data(filename):
    """
    Remove NumberInt and NaN from the JSON file and return a generator of objects
    """
    preprocessed_file = PreprocessedFile(filename)
    return ijson.parse(preprocessed_file)  # pass the custom file-like object to ijson.parse

def process_object(objects):
    current_item = {}
    authors_list = []
    references_list = []
    inside_authors = False
    inside_references = False
    current_key = None  # Initialize current_key

    # You can also turn objects into an iterator if it isn't already one
    objects_iterator = iter(objects)

    for obj in objects_iterator:
        if isinstance(obj, tuple) and len(obj) == 3:
            _, event, value = obj
        else:
            print(f"Unexpected tuple structure: {obj}")
            continue

        #print(f"Processing event: {event}, current_key: {current_key}, value: {value}")

        if event == 'map_key':
            current_key = value
            if current_key == 'authors':
                inside_authors = True
            elif current_key == 'references':
                inside_references = True
            else:
                # Reset flags and assign lists to current_item when exiting authors or references fields
                if inside_authors:
                    inside_authors = False
                    current_item['authors'] = authors_list
                    authors_list = []  # Reset authors_list for the next use
                if inside_references:
                    inside_references = False
                    current_item['references'] = references_list
                    references_list = []  # Reset references_list for the next use

        elif event in ('string', 'number'):
            if inside_authors:
                # If inside authors, append a new author dictionary to authors_list
                authors_list.append({current_key: value})
            elif inside_references:
                # If inside references, append a new reference dictionary to references_list
                references_list.append({current_key: value})
            else:
                current_item[current_key] = value

        elif event == 'start_map':
            nested_object = process_object(objects_iterator)  # Process the nested object
            if inside_authors:
                authors_list.append(nested_object)
            elif inside_references:
                references_list.append(nested_object)
            else:
                current_item[current_key] = nested_object  # Assign the fully constructed nested object

        elif event == 'end_map':
            if inside_authors:
                current_item['authors'] = authors_list
            if inside_references:
                current_item['references'] = references_list
            return current_item

    return current_item  # Return the processed object

def hash_list_elements(lst):
    # Concatenate elements of the list into a single string
    concatenated_str = ''.join(map(str, lst))
    # Hash the concatenated string
    return hashlib.md5(concatenated_str.encode()).hexdigest()

def process_batch(filename, BATCH_SIZE, prev_author_hash, prev_article_hash, prev_references_hash):
    processed_articles = 0
    author_dict_lists = []
    article_dict_lists = []
    references_dict_lists = []
    
    one_article_batch = iter(get_cleaned_data(filename))  # Convert to an iterator

    while processed_articles < BATCH_SIZE:
        try:
            # Gather events for one article
            events_for_one_article = []
            for event in one_article_batch:
                events_for_one_article.append(event)
                if event[0] == 'item' and event[1] == 'end_map':
                    # End of one article's map
                    break

            # Process if it's the start of an article
            if events_for_one_article and events_for_one_article[0][1] == 'start_map':
                one_article_dict = process_object(iter(events_for_one_article))

                # Add authors/articles to your data structure here
                author_lists = prepare_author_lists(one_article_dict)
                if author_lists:
                    author_dict_lists.extend(author_lists)

                # Add references to your data structure here
                references_lists = prepare_references_lists(one_article_dict)
                if references_lists:
                    references_dict_lists.extend(references_lists)
            
        except StopIteration:
            break

        processed_articles += 1

    # Hash the concatenated elements of the new lists
    new_author_hash = hash_list_elements(author_dict_lists)
    new_article_hash = hash_list_elements(article_dict_lists)
    new_references_hash = hash_list_elements(references_dict_lists)

    # Compare the hashes to check for differences
    if new_author_hash != prev_author_hash:
        # Author lists have different element orders
        prev_author_hash = new_author_hash

    if new_article_hash != prev_article_hash:
        # Article lists have different element orders
        prev_article_hash = new_article_hash

    if new_references_hash != prev_references_hash:
        # References lists have different element orders
        prev_references_hash = new_references_hash

    return author_dict_lists, article_dict_lists, references_dict_lists


def remove_duplicates(lists):
    """
    Remove duplicates from a list of lists
    """
    return [list(item) for item in set(tuple(item) for item in lists)]

# Define variables to store the previously processed lists
def main(filename, neo4j_uri, neo4j_user, neo4j_password, BATCH_SIZE, TOTAL_ARTICLES):
    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
    
    with driver.session() as session:
        # debug drop all indexes
        session.execute_write(drop_all_constraints_and_indexes)
        # debug drop all tables
        session.run("MATCH (n) DETACH DELETE n")
        # optimize Neo4j
        neo4j_index_constraints(session)

        # Initialize tqdm with the total number of articles
        with tqdm(total=TOTAL_ARTICLES, unit=' article') as pbar:
            prev_author_hash = None
            prev_article_hash = None
            prev_references_hash = None

            while pbar.n < TOTAL_ARTICLES:  # pbar.n keeps track of the number of processed articles
                author_dict_lists, article_dict_lists, references_dict_lists = process_batch(filename, BATCH_SIZE, prev_author_hash, prev_article_hash, prev_references_hash)

                # Update the tqdm progress bar
                pbar.update(BATCH_SIZE)

                if author_dict_lists:
                    print(author_dict_lists)
                    send_authors_to_neo4j(session, author_dict_lists)
                    prev_author_hash = hash_list_elements(author_dict_lists)

                if article_dict_lists:
                    send_articles_to_neo4j(session, article_dict_lists)
                    prev_article_hash = hash_list_elements(article_dict_lists)

                if references_dict_lists:
                    send_references_to_neo4j(session, references_dict_lists)
                    prev_references_hash = hash_list_elements(references_dict_lists)

    driver.close()

# Usage
filename = 'dblpv13.json'
neo4j_uri = "bolt://localhost:7687"
neo4j_user = 'neo4j'
neo4j_password = 'testtest'
BATCH_SIZE = 10
TOTAL_ARTICLES = 5354309 

# start
start_time = datetime.datetime.now()
print(f"Processing started at {start_time}")

# process articles
main(filename, neo4j_uri, neo4j_user, neo4j_password, BATCH_SIZE, TOTAL_ARTICLES)

# end
end_time = datetime.datetime.now()
elapsed_time = end_time - start_time
print(f"Processing finished at {end_time}. Total time taken: {elapsed_time}")

In [31]:
import re
from collections import deque
import ijson
import pprint
from neo4j import GraphDatabase
from collections import defaultdict
from tqdm import tqdm
import datetime
import cProfile

def main():
    # JSON 
    def preprocess_line(line):
        #print(f"Processing line: {line}")  # Debugging print
        line = re.sub(r'NumberInt\((\d+)\)', r'\1', line)
        return line.replace('NaN', 'null')

    def preprocess_json(filename):
        with open(filename, 'r', encoding='utf-8') as file:
            for line in file:
                yield preprocess_line(line)

    class PreprocessedFile:
        def __init__(self, filename):
            self.generator = preprocess_json(filename)
            self.buffer = deque()

        def read(self, size=-1):
            while size < 0 or sum(len(line) for line in self.buffer) < size:
                try:
                    self.buffer.append(next(self.generator))
                except StopIteration:
                    # End of generator, return what's left in the buffer
                    break
            if size < 0:
                result = ''.join(self.buffer)
                self.buffer.clear()
            else:
                result_list = []
                remaining_size = size
                while self.buffer and remaining_size > 0:
                    line = self.buffer.popleft()
                    result_list.append(line[:remaining_size])
                    remaining_size -= len(line)
                    if remaining_size < 0:
                        self.buffer.appendleft(line[remaining_size:])
                result = ''.join(result_list)
            return result

    def get_cleaned_data(filename):
        """
        Remove NumberInt and NaN from the JSON file and return a generator of objects
        """
        preprocessed_file = PreprocessedFile(filename)
        return ijson.parse(preprocessed_file, use_float=True)

    def parse_ijson_object(cleaned_data, batch_size):
        batch = []  # This will store the current batch of articles
        article = {}  # Initialize an empty dictionary for the current article
        author = {}  # Temporary storage for author details
        is_in_authors = False  # Flag to track whether we're parsing authors
        current_key = None  # Variable to keep track of the current key
        
        for prefix, event, value in cleaned_data:
            if event == 'map_key':
                current_key = value  # Set the current key
                continue  # Skip to the next iteration
            
            if (prefix, event) == ('item', 'start_map'):
                # Start a new article
                article = {'authors': [], 'references': []}
            elif prefix.startswith('item.authors.item') and event == 'start_map':
                # Start a new author
                is_in_authors = True
                author = {}
            elif prefix.startswith('item.authors.item') and event == 'end_map':
                # Finish the current author and add to the article's authors list
                article['authors'].append(author)
                is_in_authors = False
            elif event in ('string', 'number'):
                if is_in_authors:
                    # We are within an author object
                    author[current_key] = value
                elif prefix == 'item.references.item':
                    # Directly append reference IDs to the references list
                    article['references'].append(value)
                else:
                    # For top-level fields, check if they are not part of the excluded fields
                    if current_key not in ('keywords', 'fos', 'url'):
                        article[current_key] = value
            elif (prefix, event) == ('item', 'end_map'):
                # End of the current article, add to batch
                batch.append(article)
                if len(batch) == batch_size:
                    yield batch
                    batch = []

        # Yield any remaining articles in the last batch
        if batch:
            yield batch

    # prepare lists
    def prepare_data_for_unwind(ijson_articles):
        return [
            {
                "articleId": article['_id'],
                "articleTitle": article.get('title', "Unknown Title"),
                "authorId": author.get('_id', 'Unknown Author ID'),
                "authorName": author.get('name', 'Unknown Author')
            }
            for article in ijson_articles if (article_id := article.get('_id')) and article_id != 'null'
            for authors in ([article.get('authors', [])] if isinstance(article.get('authors', {}), dict) else article.get('authors', []))
            for author in (authors if isinstance(authors, list) and authors else [{'_id': None, 'name': None}])
            if isinstance(author, dict) and author.get('_id') is not None
        ]

    # prepare neo4j
    def neo4j_startup(uri, username, password):
        def drop_all_constraints_and_indexes(tx):
            """ 
            Debugging function to drop all indexes and constraints in the database 
            """
            # Get the list of all constraints
            result = tx.run("SHOW CONSTRAINTS")
            for record in result:
                # Get the constraint name
                constraint_name = record['name']
                # Drop the constraint
                tx.run(f"DROP CONSTRAINT {constraint_name} IF EXISTS")

            # Now that constraints are dropped, get the list of all indexes
            result = tx.run("SHOW INDEXES")
            for record in result:
                # Get the index name
                index_name = record['name']
                # Drop the index
                tx.run(f"DROP INDEX {index_name} IF EXISTS")

        def neo4j_index_constraints(session):
            with session.begin_transaction() as tx:
                # index
                tx.run("CREATE INDEX article_title_index FOR (n:Article) ON (n.title)")
                tx.run("CREATE INDEX author_name_index FOR (a:Author) ON (a.name)")
                # unique
                tx.run("CREATE CONSTRAINT article_id_uniqueness FOR (a:Article) REQUIRE (a._id) IS UNIQUE")
                tx.run("CREATE CONSTRAINT author_id_uniqueness FOR (a:Author) REQUIRE (a._id) IS UNIQUE")
                # Commit the transaction at the end of the batch
                tx.commit()
        
        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            # debug drop all indexes
            session.execute_write(drop_all_constraints_and_indexes)
            # debug drop all tables
            session.run("MATCH (n) DETACH DELETE n")
            # optimize Neo4j
            neo4j_index_constraints(session)
        
        # Close the driver
        driver.close()

    # Send to neo4j
    def send_authors_to_neo4j(uri, username, password, unwind_list):
        # Function to send a single batch to the database
        def send_batch(tx, batch):
            query = """
            UNWIND $batch AS row
            WITH row WHERE row.articleId IS NOT NULL AND row.authorId IS NOT NULL
            MERGE (a:Article {_id: row.articleId})
                ON CREATE SET a.title = row.articleTitle

            // Handle authors with known IDs and names
            FOREACH(ignoreMe IN CASE WHEN row.authorId IS NOT NULL AND row.authorName <> 'Unknown Author' THEN [1] ELSE [] END |
                MERGE (author:Author {_id: row.authorId})
                    ON CREATE SET author.name = row.authorName
                MERGE (author)-[:AUTHORED]->(a)
            )

            // Handle authors with a name but no ID
            FOREACH(ignoreMe IN CASE WHEN row.authorId IS NULL AND row.authorName <> 'Unknown Author' THEN [1] ELSE [] END |
                CREATE (author:Author {name: row.authorName})
                MERGE (author)-[:AUTHORED]->(a)
            )

            // Handle articles with "Unknown Author"
            FOREACH(ignoreMe IN CASE WHEN row.authorName = 'Unknown Author' THEN [1] ELSE [] END |
                MERGE (unknownAuthor:Author {name: 'Unknown Author'})
                MERGE (unknownAuthor)-[:AUTHORED]->(a)
            )
            """
            tx.run(query, batch=batch)



        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            # send the data at once
            session.execute_write(send_batch, unwind_list)

        # Close the driver
        driver.close()


    # Usage
    filename = 'petitjson.json'
    neo4j_uri = "bolt://localhost:7687"
    neo4j_user = 'neo4j'
    neo4j_password = 'testtest'
    BATCH_SIZE = 1000
    TOTAL_ARTICLES = 5354309 



    # Neo4j cleanup and optimization
    neo4j_startup(neo4j_uri, neo4j_user, neo4j_password)

    # Parse JSON file and get a generator of cleaned data
    cleaned_data_generator = get_cleaned_data(filename)

    # Wrap your generator with tqdm to create a progress bar
    # The total parameter is optional; it provides an estimate of the total number of iterations (useful for generators)
    progress_bar = tqdm(parse_ijson_object(cleaned_data_generator, BATCH_SIZE), total=TOTAL_ARTICLES//BATCH_SIZE)

    # Process the data in batches
    for articles_batch in progress_bar:
        # Prepare the data for the UNWIND operation
        unwind_list = prepare_data_for_unwind(articles_batch)
        #print(f"Prepared data for batch: {unwind_list}")  # Debugging line

        # Connect to Neo4j and send the author data
        if unwind_list:
            try:
                send_authors_to_neo4j(neo4j_uri, neo4j_user, neo4j_password, unwind_list)
            except Exception as e:
                print(f"An error occurred: {e}")  # Debugging line


if __name__ == "__main__":
    import cProfile
    cProfile.run('main()', 'profile_stats')



  0%|          | 1/5354 [00:00<1:09:42,  1.28it/s]


In [49]:
import simdjson

def get_cleaned_data(filename):
    parser = simdjson.Parser()
    with open(filename, 'rb') as f:
        parsed_json = parser.parse(f.read(), recursive=True)
    return parsed_json

def extract_articles(doc):
    # Initialize an empty list to store the cleaned articles
    cleaned_articles = []

    # Loop through each article in the document
    for article in doc:
        # Skip the article if it has no '_id' or if it has no 'authors' or if authors have no '_id'
        if "_id" not in article or "authors" not in article or not all("_id" in author for author in article["authors"]):
            continue

        # Create a dictionary for the cleaned article
        cleaned_article = {
            "_id": article["_id"],
            "title": article.get("title", "Unknown title"),  # Default title if not present
            # Create the list of authors with checks for '_id' and 'name'
            "authors": [{"_id": author["_id"], "name": author.get("name", "Unknown author")} for author in article["authors"]],
            # Get the references if they exist, otherwise default to None
            "references": article.get("references", None)
        }
        # Add the cleaned article to the list
        cleaned_articles.append(cleaned_article)
    
    return cleaned_articles

def main():
    filename = 'petitjson.json'
    doc = get_cleaned_data(filename)
    articles = extract_articles(doc)
    for article in articles:
        print(article)  # Or process the articles as needed

if __name__ == "__main__":
    main()


{'_id': '53e99784b7602d9701f3e133', 'title': 'The relationship between canopy parameters and spectrum of winter wheat under different irrigations in Hebei Province.', 'authors': [{'_id': '53f45728dabfaec09f209538', 'name': 'Peijuan Wang'}, {'_id': '5601754345cedb3395e59457', 'name': 'Jiahua Zhang'}, {'_id': '53f38438dabfae4b34a08928', 'name': 'Donghui Xie'}, {'_id': '5601754345cedb3395e5945a', 'name': 'Yanyan Xu'}, {'_id': '53f43d25dabfaeecd6995149', 'name': 'Yun Xu'}], 'references': None}
{'_id': '53e99784b7602d9701f3e151', 'title': 'A solution to the problem of touching and broken characters.', 'authors': [{'_id': '53f46797dabfaeb22f542630', 'name': 'Jairo Rocha'}, {'_id': '54328883dabfaeb4c6a8a699', 'name': 'Theo Pavlidis'}], 'references': ['53e99cf5b7602d97025ace63', '557e8a7a6fee0fe990caa63d', '53e9a96cb7602d97032c459a', '53e9b929b7602d9704515791', '557e59ebf6678c77ea222447']}
{'_id': '53e99784b7602d9701f3e15d', 'title': 'Timing yield estimation using statistical static timing ana

In [67]:
import ijson

def process_file(filename):
    with open(filename, 'r', encoding='utf-8') as f:
        # Assuming the JSON data is a single array of objects
        objects = ijson.items(f, 'item')
        for obj in objects:
            print(f'_id: {obj["_id"]}, title: {obj["title"]}')

# Specify the file name
filename = 'dblpv13.json'

# Process the file
process_file(filename)



UnexpectedSymbol: Unexpected symbol 'N' at 103

In [2]:
from collections import deque
import ijson
from neo4j import GraphDatabase
from tqdm.notebook import tqdm
from itertools import islice
import cProfile
from multiprocessing import Pool


def main():
    # # JSON 
    # def preprocess_line(line):
    #     return line.replace('NaN', 'null').replace('NumberInt(', '').replace(')', '')

    # def preprocess_json(filename):
    #     with open(filename, 'r', encoding='utf-8') as file:
    #         for line in file:
    #             yield preprocess_line(line)

    # class PreprocessedFile:
    #     def __init__(self, filename):
    #         self.generator = preprocess_json(filename)
    #         self.buffer = deque()

    #     def read(self, size=-1):
    #         while size < 0 or sum(len(line) for line in self.buffer) < size:
    #             try:
    #                 self.buffer.append(next(self.generator))
    #             except StopIteration:
    #                 # End of generator, return what's left in the buffer
    #                 break
    #         if size < 0:
    #             result = ''.join(self.buffer)
    #             self.buffer.clear()
    #         else:
    #             result_list = []
    #             remaining_size = size
    #             while self.buffer and remaining_size > 0:
    #                 line = self.buffer.popleft()
    #                 result_list.append(line[:remaining_size])
    #                 remaining_size -= len(line)
    #                 if remaining_size < 0:
    #                     self.buffer.appendleft(line[remaining_size:])
    #             result = ''.join(result_list)
    #         return result

    # def get_cleaned_data(filename):
    #     """
    #     Remove NumberInt and NaN from the JSON file and return a generator of objects
    #     """
    #     preprocessed_file = PreprocessedFile(filename)
    #     return ijson.items(preprocessed_file, 'item')

    def preprocess_line(line):
        return line.replace('NaN', 'null').replace('NumberInt(', '').replace(')', '')

    def preprocess_json(filename, buffer_size):
        with open(filename, 'r', encoding='utf-8') as file:
            buffer = ''
            while True:
                chunk = file.read(buffer_size)
                if not chunk:
                    if buffer:
                        yield preprocess_line(buffer)
                    break  # End of file
                buffer += chunk
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    yield preprocess_line(line)

    class PreprocessedFile:
        def __init__(self, filename, buffer_size=65536):  # buffer_size is in bytes
            self.generator = preprocess_json(filename, buffer_size)
            self.buffer = deque()

        def read(self, size=-1):
            while size < 0 or sum(len(line) for line in self.buffer) < size:
                try:
                    self.buffer.append(next(self.generator))
                except StopIteration:
                    break  # End of generator, return what's left in the buffer
            if size < 0:
                result = ''.join(self.buffer)
                self.buffer.clear()
            else:
                result_list = []
                remaining_size = size
                while self.buffer and remaining_size > 0:
                    line = self.buffer.popleft()
                    result_list.append(line[:remaining_size])
                    remaining_size -= len(line)
                    if remaining_size < 0:
                        self.buffer.appendleft(line[remaining_size:])
                result = ''.join(result_list)
            return result

    def get_cleaned_data(filename):
        preprocessed_file = PreprocessedFile(filename)
        return ijson.items(preprocessed_file, 'item')

    # def parse_ijson_object(cleaned_data, batch_size):
    #     def chunked_iterable(iterable, size):
    #         it = iter(iterable)
    #         while chunk := list(islice(it, size)):
    #             yield chunk

    #     def process_articles_chunk(articles_chunk):
    #         authors_batch_chunk = []
    #         references_batch_chunk = []
    #         for article in articles_chunk:
    #             article_id = article.get('_id')
    #             article_title = article.get('title')
    #             if not (article_id and article_title):
    #                 continue  # Skip this article if it doesn't have an id or title

    #             authors = article.get('authors', [])
    #             if authors:
    #                 for author in authors:
    #                     author_id = author.get('_id')
    #                     author_name = author.get('name')
    #                     if author_id and author_name:
    #                         entry = {
    #                             'article': {
    #                                 'article_id': article_id,
    #                                 'article_title': article_title
    #                             },
    #                             'authors': [{
    #                                 '_id': author_id,
    #                                 'name': author_name
    #                             }]
    #                         }
    #                         authors_batch_chunk.append(entry)

    #             # Only process the article for references if there are references
    #             references = article.get('references', [])
    #             if references:
    #                 references_data = {
    #                     'article_id': article['_id'],
    #                     'article_title': article['title'],
    #                     'references': references
    #                 }
    #                 references_batch_chunk.append(references_data)

    #         return authors_batch_chunk, references_batch_chunk

    #     for articles_chunk in chunked_iterable(cleaned_data, batch_size):
    #         articles_authors_batch, articles_references_batch = process_articles_chunk(articles_chunk)

    #         if len(articles_authors_batch) >= batch_size:
    #             yield articles_authors_batch, articles_references_batch
    #             articles_authors_batch = []
    #             articles_references_batch = []

    #     if articles_authors_batch or articles_references_batch:
    #         yield articles_authors_batch, articles_references_batch

    def parse_ijson_object(cleaned_data, batch_size):
        def chunked_iterable(iterable, size):
            it = iter(iterable)
            while chunk := list(islice(it, size)):
                yield chunk

        def trim_article(article):
            trimmed_article = {key: article[key] for key in ['_id', 'title', 'references'] if key in article}
            authors = article.get('authors')
            
            if authors and isinstance(authors, list):
                trimmed_authors = [{'_id': author['_id'], 'name': author['name']} for author in authors if '_id' in author and 'name' in author]
                if trimmed_authors:  # Check if the list is not empty
                    trimmed_article['authors'] = trimmed_authors
                        
            return trimmed_article

        def process_articles_chunk(articles_chunk):
            articles_chunk = list(map(trim_article, articles_chunk))
            authors_batch_chunk = []
            references_batch_chunk = []
            for article in articles_chunk:
                article_id = article.get('_id')
                article_title = article.get('title')
                if not (article_id and article_title):
                    continue  # Skip this article if it doesn't have an id or title

                authors = article.get('authors', [])
                if authors:
                    for author in authors:
                        author_id = author.get('_id')
                        author_name = author.get('name')
                        if author_id and author_name:
                            entry = {
                                'article': {
                                    'article_id': article_id,
                                    'article_title': article_title
                                },
                                'authors': [{
                                    '_id': author_id,
                                    'name': author_name
                                }]
                            }
                            authors_batch_chunk.append(entry)

                # Only process the article for references if there are references
                references = article.get('references', [])
                if references:
                    references_data = {
                        'article_id': article['_id'],
                        'article_title': article['title'],
                        'references': references
                    }
                    references_batch_chunk.append(references_data)

            return authors_batch_chunk, references_batch_chunk

        for articles_chunk in chunked_iterable(cleaned_data, batch_size):
            articles_authors_batch, articles_references_batch = process_articles_chunk(articles_chunk)

            if len(articles_authors_batch) >= batch_size:
                yield articles_authors_batch, articles_references_batch
                articles_authors_batch = []
                articles_references_batch = []

        if articles_authors_batch or articles_references_batch:
            yield articles_authors_batch, articles_references_batch

    # Neo4j
    def neo4j_startup(uri, username, password):
        def drop_all_constraints_and_indexes(tx):
            """ 
            Debugging function to drop all indexes and constraints in the database 
            """
            # Get the list of all constraints
            result = tx.run("SHOW CONSTRAINTS")
            for record in result:
                # Get the constraint name
                constraint_name = record['name']
                # Drop the constraint
                tx.run(f"DROP CONSTRAINT {constraint_name} IF EXISTS")

            # Now that constraints are dropped, get the list of all indexes
            result = tx.run("SHOW INDEXES")
            for record in result:
                # Get the index name
                index_name = record['name']
                # Drop the index
                tx.run(f"DROP INDEX {index_name} IF EXISTS")

        def neo4j_index_constraints(session):
            with session.begin_transaction() as tx:
                # index
                tx.run("CREATE INDEX article_title_index FOR (n:Article) ON (n.title)")
                tx.run("CREATE INDEX author_name_index FOR (a:Author) ON (a.name)")
                # unique
                tx.run("CREATE CONSTRAINT article_id_uniqueness FOR (a:Article) REQUIRE (a._id) IS UNIQUE")
                tx.run("CREATE CONSTRAINT author_id_uniqueness FOR (a:Author) REQUIRE (a._id) IS UNIQUE")
                # Commit the transaction at the end of the batch
                tx.commit()
        
        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            # debug drop all indexes
            session.run("MATCH (n) DETACH DELETE n")
            session.execute_write(drop_all_constraints_and_indexes)
            # optimize Neo4j
            neo4j_index_constraints(session)
        
        # Close the driver
        driver.close()

    def send_data_to_neo4j(uri, username, password, author_lists, references_lists):
        # Function to send a single batch to the database
        def send_batch_author(tx, authors_batch):
            query = """
            UNWIND $authors_batch AS row
            MERGE (a:Article {_id: row.article.article_id})
                ON CREATE SET a.title = row.article.article_title
            WITH a, row
            UNWIND row.authors AS authorData
            MERGE (author:Author {_id: authorData._id})
                ON CREATE SET author.name = authorData.name
            MERGE (author)-[:AUTHORED]->(a)
            """

            tx.run(query, authors_batch=authors_batch)

        def send_batch_ref(tx, references_batch):
            query = """
            UNWIND $references_batch AS refRow
            MERGE (refArticle:Article {_id: refRow.article_id})
                ON CREATE SET refArticle.title = refRow.article_title
            WITH refArticle, refRow
            UNWIND refRow.references AS reference
            MERGE (referredArticle:Article {_id: reference})
            MERGE (refArticle)-[:CITES]->(referredArticle)
            """

            tx.run(query, references_batch=references_batch)

        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            if author_lists:
                session.execute_write(send_batch_author, author_lists)
            if references_lists:
                session.execute_write(send_batch_ref, references_lists)

        # Close the driver
        driver.close()

    # Usage
    filename = 'dblpv13.json'
    neo4j_uri = "bolt://localhost:7687"
    neo4j_user = 'neo4j'
    neo4j_password = 'testtest'
    BATCH_SIZE = 10000
    TOTAL_ARTICLES = 5354309

    # Neo4j cleanup and optimization
    neo4j_startup(neo4j_uri, neo4j_user, neo4j_password)

    # Parse JSON file and get a generator of cleaned data
    cleaned_data_generator = get_cleaned_data(filename)
    
    # Estimate the total number of batches for the progress bar
    total_batches = TOTAL_ARTICLES // BATCH_SIZE
    if TOTAL_ARTICLES % BATCH_SIZE != 0:
        total_batches += 1  # Account for a partially-filled final batch

    # Create the generator
    article_batches_generator = parse_ijson_object(cleaned_data_generator, BATCH_SIZE)

    # Wrap the generator with tqdm, specifying the total number of batches
    article_batches_generator = tqdm(article_batches_generator, total=total_batches)

    # Loop through all the batches from the generator with a progress bar
    for articles_authors_batch, articles_references_batch in article_batches_generator:
        #print(articles_references_batch)
        send_data_to_neo4j(neo4j_uri, neo4j_user, neo4j_password, articles_authors_batch, articles_references_batch)

if __name__ == "__main__":
    # run main
    #cProfile.run('main()', filename='profile_output.pstats')
    main()



Transaction failed and will be retried in 0.9632741611738925s (The allocation of an extra 2.0 MiB would use more than the limit 537.6 MiB. Currently using 537.0 MiB. dbms.memory.transaction.total.max threshold reached)


  0%|          | 0/536 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [2]:
from collections import deque
import ijson
from neo4j import GraphDatabase
from tqdm.notebook import tqdm
from itertools import islice
import cProfile
from multiprocessing import Pool


def main():
    # JSON 
    def preprocess_line(line):
        return line.replace('NaN', 'null').replace('NumberInt(', '').replace(')', '')

    def preprocess_json(filename, buffer_size):
        with open(filename, 'r', encoding='utf-8') as file:
            buffer = ''
            while True:
                chunk = file.read(buffer_size)
                if not chunk:
                    if buffer:
                        yield preprocess_line(buffer)
                    break  # End of file
                buffer += chunk
                while '\n' in buffer:
                    line, buffer = buffer.split('\n', 1)
                    yield preprocess_line(line)

    class PreprocessedFile:
        def __init__(self, filename, buffer_size=1048576):  # buffer_size is in bytes
            self.generator = preprocess_json(filename, buffer_size)
            self.buffer = deque()

        def read(self, size=-1):
            while size < 0 or sum(len(line) for line in self.buffer) < size:
                try:
                    self.buffer.append(next(self.generator))
                except StopIteration:
                    break  # End of generator, return what's left in the buffer
            if size < 0:
                result = ''.join(self.buffer)
                self.buffer.clear()
            else:
                result_list = []
                remaining_size = size
                while self.buffer and remaining_size > 0:
                    line = self.buffer.popleft()
                    result_list.append(line[:remaining_size])
                    remaining_size -= len(line)
                    if remaining_size < 0:
                        self.buffer.appendleft(line[remaining_size:])
                result = ''.join(result_list)
            return result

    def get_cleaned_data(filename):
        preprocessed_file = PreprocessedFile(filename)
        return ijson.items(preprocessed_file, 'item')

    def parse_ijson_object(cleaned_data, batch_size):
        def chunked_iterable(iterable, size):
            it = iter(iterable)
            while chunk := list(islice(it, size)):
                yield chunk

        def trim_article(article):
            trimmed_article = {key: article[key] for key in ['_id', 'title', 'references'] if key in article}
            authors = article.get('authors')
            
            if authors and isinstance(authors, list):
                trimmed_authors = [{'_id': author['_id'], 'name': author['name']} for author in authors if '_id' in author and 'name' in author]
                if trimmed_authors:  # Check if the list is not empty
                    trimmed_article['authors'] = trimmed_authors
                        
            return trimmed_article

        def process_articles_chunk(articles_chunk):
            articles_chunk = list(map(trim_article, articles_chunk))
            authors_batch_chunk = []
            references_batch_chunk = []
            for article in articles_chunk:
                article_id = article.get('_id')
                article_title = article.get('title')
                if not (article_id and article_title):
                    continue  # Skip this article if it doesn't have an id or title

                authors = article.get('authors', [])
                if authors:
                    for author in authors:
                        author_id = author.get('_id')
                        author_name = author.get('name')
                        if author_id and author_name:
                            entry = {
                                'article': {
                                    'article_id': article_id,
                                    'article_title': article_title
                                },
                                'authors': [{
                                    '_id': author_id,
                                    'name': author_name
                                }]
                            }
                            authors_batch_chunk.append(entry)

                # Only process the article for references if there are references
                references = article.get('references', [])
                if references:
                    references_data = {
                        'article_id': article['_id'],
                        'article_title': article['title'],
                        'references': references
                    }
                    references_batch_chunk.append(references_data)

            return authors_batch_chunk, references_batch_chunk

        for articles_chunk in chunked_iterable(cleaned_data, batch_size):
            articles_authors_batch, articles_references_batch = process_articles_chunk(articles_chunk)

            if len(articles_authors_batch) >= batch_size:
                yield articles_authors_batch, articles_references_batch
                articles_authors_batch = []
                articles_references_batch = []

        if articles_authors_batch or articles_references_batch:
            yield articles_authors_batch, articles_references_batch

    # Neo4j
    def neo4j_startup(uri, username, password):
        def drop_all_constraints_and_indexes(tx):
            """ 
            Debugging function to drop all indexes and constraints in the database 
            """
            # Get the list of all constraints
            result = tx.run("SHOW CONSTRAINTS")
            for record in result:
                # Get the constraint name
                constraint_name = record['name']
                # Drop the constraint
                tx.run(f"DROP CONSTRAINT {constraint_name} IF EXISTS")

            # Now that constraints are dropped, get the list of all indexes
            result = tx.run("SHOW INDEXES")
            for record in result:
                # Get the index name
                index_name = record['name']
                # Drop the index
                tx.run(f"DROP INDEX {index_name} IF EXISTS")

        def neo4j_index_constraints(session):
            with session.begin_transaction() as tx:
                # index
                tx.run("CREATE INDEX article_title_index FOR (n:Article) ON (n.title)")
                tx.run("CREATE INDEX author_name_index FOR (a:Author) ON (a.name)")
                # unique
                tx.run("CREATE CONSTRAINT article_id_uniqueness FOR (a:Article) REQUIRE (a._id) IS UNIQUE")
                tx.run("CREATE CONSTRAINT author_id_uniqueness FOR (a:Author) REQUIRE (a._id) IS UNIQUE")
                # Commit the transaction at the end of the batch
                tx.commit()
        
        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            # debug drop all indexes
            session.run("MATCH (n) DETACH DELETE n")
            session.execute_write(drop_all_constraints_and_indexes)
            # optimize Neo4j
            neo4j_index_constraints(session)
        
        # Close the driver
        driver.close()

    def send_data_to_neo4j(uri, username, password, author_lists, references_lists):
        # Function to send a single batch to the database
        def send_batch_author(tx, authors_batch):
            query = """
            UNWIND $authors_batch AS row
            MERGE (a:Article {_id: row.article.article_id})
                ON CREATE SET a.title = row.article.article_title
            WITH a, row
            UNWIND row.authors AS authorData
            MERGE (author:Author {_id: authorData._id})
                ON CREATE SET author.name = authorData.name
            MERGE (author)-[:AUTHORED]->(a)
            """

            tx.run(query, authors_batch=authors_batch)

        def send_batch_ref(tx, references_batch):
            query = """
            UNWIND $references_batch AS refRow
            MERGE (refArticle:Article {_id: refRow.article_id})
                ON CREATE SET refArticle.title = refRow.article_title
            WITH refArticle, refRow
            UNWIND refRow.references AS reference
            MERGE (referredArticle:Article {_id: reference})
            MERGE (refArticle)-[:CITES]->(referredArticle)
            """

            tx.run(query, references_batch=references_batch)

        # Connect to Neo4j
        driver = GraphDatabase.driver(uri, auth=(username, password))
        
        # Start a session and process the data in batches
        with driver.session() as session:
            if author_lists:
                session.execute_write(send_batch_author, author_lists)
            if references_lists:
                session.execute_write(send_batch_ref, references_lists)

        # Close the driver
        driver.close()

    # Usage
    filename = 'dblpv13.json'
    neo4j_uri = "bolt://localhost:7687"
    neo4j_user = 'neo4j'
    neo4j_password = 'testtest'
    BATCH_SIZE = 10000
    TOTAL_ARTICLES = 5354309

    # Neo4j cleanup and optimization
    neo4j_startup(neo4j_uri, neo4j_user, neo4j_password)

    # Parse JSON file and get a generator of cleaned data
    cleaned_data_generator = get_cleaned_data(filename)
    
    # Estimate the total number of batches for the progress bar
    total_batches = TOTAL_ARTICLES // BATCH_SIZE
    if TOTAL_ARTICLES % BATCH_SIZE != 0:
        total_batches += 1  # Account for a partially-filled final batch

    # Create the generator
    article_batches_generator = parse_ijson_object(cleaned_data_generator, BATCH_SIZE)

    # Wrap the generator with tqdm, specifying the total number of batches
    article_batches_generator = tqdm(article_batches_generator, total=total_batches)

    # Loop through all the batches from the generator with a progress bar
    for articles_authors_batch, articles_references_batch in article_batches_generator:
        #print(articles_references_batch)
        send_data_to_neo4j(neo4j_uri, neo4j_user, neo4j_password, articles_authors_batch, articles_references_batch)

if __name__ == "__main__":
    # run main
    #cProfile.run('main()', filename='profile_output.pstats')
    main()



  0%|          | 0/536 [00:00<?, ?it/s]

Transaction failed and will be retried in 1.0412633066474275s (Couldn't connect to localhost:7687 (resolved to ()):
Failed to establish connection to ResolvedIPv6Address(('::1', 7687, 0, 0)) (reason [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée)
Failed to establish connection to ResolvedIPv4Address(('127.0.0.1', 7687)) (reason [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée))
Transaction failed and will be retried in 2.372332866952515s (Couldn't connect to localhost:7687 (resolved to ()):
Failed to establish connection to ResolvedIPv6Address(('::1', 7687, 0, 0)) (reason [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée)
Failed to establish connection to ResolvedIPv4Address(('127.0.0.1', 7687)) (reason [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée))
Transaction failed and 