# Stress Test Neo4j

This notebook runs a rudimentary stress test of Neo4j running in a Docker container. The steps are:

1. Run container
2. Create fake dataset
3. Start logging container usage
4. Load dataset
5. Run queries
6. Clean up
7. Analyze logs

In [None]:
import docker
import os
import asyncio
from multiprocessing import Process
import json
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import time

from graph_database_stress_testing.dataset.generate_dataset import generate_dataset
from graph_database_stress_testing.neo4j.load import load_dataset
from graph_database_stress_testing.utilities.configuration import get_config
from graph_database_stress_testing.utilities.statistics import log_stats, cpu_percentage_system, cpu_percentage_allocated
from graph_database_stress_testing.neo4j.query import query, concurrent_query

# Directory that holds the logs of the run being recorded/analysed; it is
# assigned later in the notebook, once the log directory has been created.
latest_log_path = None
# Configuration file handed to get_config, generate_dataset and load_dataset.
conf_path = './conf/conf.yaml'


def log_path(base_path):
    """Return a fresh, timestamped log directory path under ``base_path``.

    The result is ``base_path`` followed by ``neo4j-stats-<local time>/``,
    where the time is the string form of ``datetime.now()``. ``base_path`` is
    used as a plain prefix, so it must already end with a path separator.
    The directory itself is not created here.
    """
    created_at = datetime.now()
    return '{}neo4j-stats-{}/'.format(base_path, created_at)


def stamp_time(label, latest_log_path):
    """Append a ``label,timestamp`` row to ``timestamps.csv`` in the log directory.

    Args:
        label: Free-text name of the event being marked. Labels containing
            commas or quotes are CSV-quoted so the file stays parseable.
        latest_log_path: Directory that holds the current run's logs, with or
            without a trailing path separator.

    The timestamp is the current UTC time written as a naive
    ``YYYY-MM-DD HH:MM:SS[.ffffff]`` string (no UTC offset suffix).
    """
    # Local imports keep this function self-contained within the notebook.
    import csv
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Taking an aware UTC
    # time and dropping tzinfo yields exactly the same text as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    timestamps_file = os.path.join(latest_log_path, 'timestamps.csv')
    # newline='' plus an explicit '\n' terminator lets the csv module control
    # line endings, so rows for plain labels are unchanged.
    with open(timestamps_file, 'a', newline='') as f:
        csv.writer(f, lineterminator='\n').writerow([label, str(now_utc)])


# Load the configuration once and reuse it, instead of calling get_config
# again for every single constant below.
_config = get_config(conf_path)

# Neo4j connection settings passed to concurrent_query.
URI = _config['neo4j']['uri']
USER = _config['neo4j']['user']
PASSWORD = _config['neo4j']['password']
# Resource limits applied to the Neo4j container when it is started.
CPUS = _config['docker']['cpus']
MEMORY = _config['docker']['memory']
# Base directory under which each run's log directory is created.
LOG_DIR = _config['logging']['log_dir']
# Host directory mounted into the container as /var/lib/neo4j/import.
DATA_PATH = _config['data']['path']

## Set up container and logging

In [None]:
client = docker.from_env()

# Remove any container left over from a previous run so the name 'neo4j' is
# free. Only "no such container" is expected here; any other failure (Docker
# daemon unreachable, stop/remove error, Ctrl-C) must surface instead of being
# silently swallowed.
try:
    container = client.containers.get('neo4j')
except docker.errors.NotFound:
    pass
else:
    container.stop()
    container.remove()

container = client.containers.run(
    'neo4j',
    # Expose the dataset directory to Neo4j's import folder.
    volumes=[os.path.abspath(DATA_PATH) + ':/var/lib/neo4j/import'],
    ports={'7474/tcp': 7474, '7687/tcp': 7687},
    detach=True,
    # nano_cpus is an integer count of 1e-9 CPUs; int() keeps it valid when
    # CPUS is configured as a fractional value such as 0.5.
    nano_cpus=int(CPUS * 10 ** 9),
    mem_limit=MEMORY,
    name='neo4j',
    environment=['NEO4J_AUTH=none'],
)
# NOTE(review): fixed pause, presumably to give the container time to start
# before later cells talk to it — confirm it is long enough.
time.sleep(2)

In [None]:
# Create the fake dataset from the settings in the configuration file.
# NOTE(review): seed=0 presumably makes the generated data reproducible —
# confirm against generate_dataset.
generate_dataset(conf_path, seed=0)

In [None]:
# Each run gets its own timestamped directory under LOG_DIR.
latest_log_path = log_path(LOG_DIR)
# makedirs also creates LOG_DIR itself when it does not exist yet (e.g. on a
# fresh checkout), where os.mkdir would raise FileNotFoundError.
os.makedirs(latest_log_path)

In [None]:
# Run log_stats in a separate process so that it keeps running while the
# following cells load data and execute queries. It receives the container id
# and the log directory of this run.
stats_logging = Process(target=log_stats, args=(container.id, latest_log_path))
stats_logging.start()
stamp_time('Started logging stats', latest_log_path)
# NOTE(review): pause presumably leaves a short idle baseline in the stats
# before the load phase begins — confirm intent.
time.sleep(2)

In [None]:
# Bracket the dataset load with timestamps so the load phase can be located
# in the recorded statistics.
stamp_time('load dataset start', latest_log_path)
load_dataset(conf_path)
stamp_time('load dataset end', latest_log_path)
# NOTE(review): pause presumably separates this phase from the next one in
# the recorded stats — confirm intent.
time.sleep(2)

## Run queries

In [None]:
simple_queries = [
    'MATCH (n) RETURN n as _10results LIMIT 10',
    'MATCH (n) RETURN n as _100results LIMIT 100',
    'MATCH (n) RETURN n as _1000results LIMIT 1000 ',
    'match(n:Person)-[r:LIVES_AT]->(m:Address) return count(r) as countLivesAt',
    'match(n:Person)-[r:WORKS_FOR]->(m:Company) return count(r) as countWorksFor',
]


stamp_time('Start simple queries', latest_log_path)
tasks = []
for q in simple_queries:
    tasks.append(concurrent_query(URI, USER, PASSWORD, q))
await asyncio.gather(*tasks)
stamp_time('End simple queries', latest_log_path)
time.sleep(2)


stamp_time('Start simple queries * 50', latest_log_path)
tasks = []
for q in simple_queries * 50:
    tasks.append(concurrent_query(URI, USER, PASSWORD, q))
await asyncio.gather(*tasks)
stamp_time('End simple queries * 50', latest_log_path)
time.sleep(2)

In [None]:
deeper_queries = [
    '''
    MATCH p=(n)-[* ..10]-(m)
    WHERE n.id = 'person3029' AND m.id = 'person4939'
    RETURN p, length(p)
    ORDER BY length(p) DESC
    LIMIT 1
    ''',
    '''
    MATCH p=(n)-[* ..20]-(m)
    WHERE n.id = 'person3029' AND m.id = 'person4939'
    RETURN p, length(p)
    ORDER BY length(p) DESC
    LIMIT 1
    ''',
    '''
    MATCH (a:Address)--(n)
    WITH a, count(n) AS degree
    ORDER BY degree desc
    LIMIT 2
    WITH collect(a)[0] AS a0, collect(a)[1] AS a1
    MATCH p = (a0)-[*..5]-(a1)
    RETURN count(p)
    ''',
    '''
    MATCH (a:Address)--(n)
    WITH a, count(n) AS degree
    ORDER BY degree desc
    LIMIT 2
    WITH collect(a)[0] AS a0, collect(a)[1] AS a1
    MATCH p = (a0)-[*..10]-(a1)
    RETURN count(p)
    ''',
    '''
    MATCH (a:Address)--(n)
    WITH a, count(n) AS degree
    ORDER BY degree desc
    LIMIT 2
    WITH collect(a)[0] AS a0, collect(a)[1] AS a1
    MATCH p = (a0)-[*..20]-(a1)
    RETURN count(p)
    ''',
]


stamp_time('Start deep queries', latest_log_path)
tasks = []
for q in deeper_queries:
    tasks.append(concurrent_query(URI, USER, PASSWORD, q))
await asyncio.gather(*tasks)
stamp_time('End deep queries', latest_log_path)
time.sleep(2)


stamp_time('Start deep queries * 50', latest_log_path)
tasks = []
for q in deeper_queries * 50:
    tasks.append(concurrent_query(URI, USER, PASSWORD, q))
await asyncio.gather(*tasks)
stamp_time('End deep queries * 50', latest_log_path)
time.sleep(2)

In [None]:
# Mark the end of the recording, then stop the background stats process and
# wait for it to exit before the logs are read.
stamp_time('Stopped Logging Stats', latest_log_path)
stats_logging.terminate()
stats_logging.join()

## Visualise results

In [None]:
from datetime import timedelta
from graph_database_stress_testing.utilities.statistics import visualise_results

# By default the run recorded above is analysed. To re-plot an earlier run
# instead, set this to that run's log directory (including the trailing '/'),
# e.g. '<log_dir>/neo4j-stats-2023-08-21 16:19:36.287002/'.
# A machine-specific absolute path used to be hard-coded here, which silently
# replaced the current run's directory and broke on any other machine.
previous_run_log_path = None
if previous_run_log_path is not None:
    latest_log_path = previous_run_log_path
if latest_log_path is None:
    raise RuntimeError(
        'No log directory to analyse: run the logging cells first or set '
        'previous_run_log_path to the log directory of an earlier run.'
    )

# Convert the raw JSON-lines stats into one row per sample.
stats = []
with open(latest_log_path + 'stats.jsonl') as f:
    # The first sample is discarded. The default argument keeps an empty file
    # from raising StopIteration.
    # NOTE(review): presumably the first sample is skipped because it lacks
    # the previous CPU reading needed for a percentage — confirm.
    next(f, None)
    for line in f:
        if not line.strip():
            # Ignore blank lines rather than failing in json.loads.
            continue
        raw = json.loads(line)
        stats.append(
            {
                'Datetime': raw['read'],
                # NOTE(review): 10**9 looks like the allocation for one CPU,
                # while the container is started with CPUS * 10**9 — confirm
                # this matches cpu_percentage_allocated's second parameter.
                'cpu %': cpu_percentage_allocated(raw, 10**9),
                'mem (gb)': raw['memory_stats']['usage'] / 10**9
            }
        )
# Explicit columns keep the frame usable even when no samples were recorded.
stats_df = pd.DataFrame(stats, columns=['Datetime', 'cpu %', 'mem (gb)'])

# timestamps.csv has no header row: column 0 is the label, column 1 the time.
timestamps = pd.read_csv(latest_log_path + 'timestamps.csv', header=None)
timestamps['label'] = timestamps[0]
timestamps['Datetime'] = pd.to_datetime(timestamps[1])

fig = visualise_results(stats_df['cpu %'], stats_df['mem (gb)'],
                        stats_df['Datetime'], timestamps['Datetime'], timestamps['label'])

In [None]:
# Display the figure in the notebook.
fig.show()
# Also write it as HTML into the log directory of the analysed run.
fig.write_html(latest_log_path + 'summary.html')