# Load tests

> Performance testing for apswutils using different configurations

Plan: 

1. Create a single db load test that uses threading to perform a high volume of writes and some reads, replicating behavior under load from many users. Lock errors will be appended to a simple text file called 'lock-errors.txt'.
2. Extract the load test from its cell and feed it into timeit and/or cProfile in another cell.

In [None]:
import threading
import random
from pathlib import Path
from string import ascii_letters
from apswutils.db import *
import inspect

In [None]:
# Database object that every cell below operates on.
# NOTE(review): "load.db" is presumably a file path relative to the working directory — confirm.
db = Database("load.db")

In [None]:
# Reset state between runs: drop every table currently listed by db.tables.
for table in db.tables:
    table.drop()

In [None]:
# Handle for the 'Users' table. Building the handle does not create the table;
# the repr displayed by the bare `users` expression reports "does not exist yet".
users = Table(db, 'Users')
users

<Table Users (does not exist yet)>

In [None]:
# Create the Users table with an integer `id` primary key and a text `name`.
# NOTE(review): transform=True presumably lets create() reshape an already
# existing table to this schema — confirm against the library docs.
users.create(
    columns={'id': int, 'name': str},
    transform=True,
    pk='id',
)
users

<Table Users (id, name)>

In [None]:
def get_random_name(length=10):
    """Return a string of `length` characters drawn at random from `ascii_letters`.

    Each character is picked independently with `random.choice`, so letters may
    repeat. A `length` of 0 yields the empty string.
    """
    letters = [random.choice(ascii_letters) for _ in range(length)]
    return ''.join(letters)


# Display one sample name.
get_random_name()

'lrGZVyXEEU'

In [None]:
def get_user(id):
    """Look up a row of the module-level `users` table by passing `id` to `users.get`."""
    row = users.get(id)
    return row
def set_user(id, name):
    """Insert a row carrying only `name` into `users` and return the insert's result.

    NOTE(review): `id` is accepted but never used — the inserted record has no
    'id' key, so the primary-key value is not chosen by the caller. Confirm
    this is intended before relying on the argument.
    """
    record = {'name': name}
    return users.insert(record, pk='id')

In [None]:
def db_worker():
    """Perform one write against the Users table and return the tagged record.

    Calls `set_user` with a random integer in [1, 1000] and a random name, takes
    the first entry of the returned object's `result`, and marks it with
    `type='write'` before returning it.
    """
    user_id = random.randint(1, 1000)
    user_name = get_random_name()
    inserted = set_user(user_id, user_name)
    row = inserted.result[0]
    row['type'] = 'write'
    return row

In [None]:
# Smoke test: perform a single write and display the tagged record it returns.
db_worker()

{'id': 11612, 'name': 'nqfVLiFKwl', 'type': 'write'}

In [None]:
def db_worker_batch(size=20):
    """Call `db_worker` `size` times in sequence, discarding each returned record."""
    for _ in range(size):
        db_worker()

In [None]:
def run_concurrent_workers(n_threads=10):
    """Run `db_worker_batch` on `n_threads` threads and wait for all of them.

    Prints `users.count` once before any thread is started and once after every
    thread has been joined, so the two numbers bracket the run.
    """
    print(users.count)

    workers = [threading.Thread(target=db_worker_batch) for _ in range(n_threads)]
    for worker in workers:
        worker.start()

    # Block until every worker has finished its batch.
    for worker in workers:
        worker.join()

    print(users.count)

In [None]:
# Full load run: 5000 threads, each running db_worker_batch with its default size of 20.
run_concurrent_workers(5000)

101612
151612
