Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Remove old multiprocessing manager in favor of custom API
Browse files Browse the repository at this point in the history
  • Loading branch information
ChihChengLiang authored and pipermerriam committed Aug 1, 2019
1 parent 2437583 commit aebe299
Show file tree
Hide file tree
Showing 47 changed files with 1,125 additions and 711 deletions.
76 changes: 76 additions & 0 deletions scripts/benchmark/db_managers/bench_db_client.py
@@ -0,0 +1,76 @@
import pathlib
from trinity.db.manager.manager import (
DBManager,
)
from trinity.db.manager.client import (
SyncDBClient,
)
from eth.db.backends.level import LevelDB

import multiprocessing

import os
import signal
import time
import random

IPC_PATH = pathlib.Path("./tmp.ipc")
DB_PATH = pathlib.Path("./tmp-db/")


def random_bytes(num):
return random.getrandbits(8 * num).to_bytes(num, 'little')


def run_server(ipc_path):
db = LevelDB(db_path=DB_PATH)
manager = DBManager(db)

with manager.run(ipc_path):
try:
manager.wait_stopped()
except KeyboardInterrupt:
pass

ipc_path.unlink()


def run_client(ipc_path, client_id):

key_values = {
random_bytes(32): random_bytes(256)
for i in range(10000)
}

db_client = SyncDBClient.connect(ipc_path)

for _ in range(3):
start = time.perf_counter()
for key, value in key_values.items():
db_client.set(key, value)
db_client.get(key)
end = time.perf_counter()
duration = end - start

num_keys = len(key_values)
print(f"Client {client_id}: {num_keys/duration} get-set per second")


if __name__ == '__main__':
if IPC_PATH.exists():
IPC_PATH.unlink()

server = multiprocessing.Process(target=run_server, args=[IPC_PATH])

clients = [
multiprocessing.Process(target=run_client, args=[IPC_PATH, i])
for i in range(3)
]
server.start()
for client in clients:
client.start()
for client in clients:
client.join(600)

os.kill(server.pid, signal.SIGINT)
server.join(1)
81 changes: 81 additions & 0 deletions scripts/benchmark/db_managers/bench_old_manager.py
@@ -0,0 +1,81 @@
from eth.db.backends.level import LevelDB
import multiprocessing
from trinity.db.base import AsyncDBProxy
from multiprocessing.managers import (
BaseManager,
)
from trinity.db.beacon.manager import (
create_db_consumer_manager,
)

import os
import signal
import pathlib
import time
import random

IPC_PATH = pathlib.Path("./tmp.ipc")
DB_PATH = pathlib.Path("./tmp-db")

DB = LevelDB(db_path=DB_PATH)


def random_bytes(num):
return random.getrandbits(8 * num).to_bytes(num, 'little')


def run_server(ipc_path, db):

class DBManager(BaseManager):
pass

if ipc_path.exists():
ipc_path.unlink()
DBManager.register(
'get_db', callable=lambda: db, proxytype=AsyncDBProxy)
manager = DBManager(address=str(ipc_path))
server = manager.get_server()

try:
server.serve_forever()
print("Exit run server")
except KeyboardInterrupt:
pathlib.Path(ipc_path).unlink()


def run_client(ipc_path, client_id):
key_values = {
random_bytes(32): random_bytes(256)
for i in range(10000)
}

db_manager = create_db_consumer_manager(ipc_path)
db_client = db_manager.get_db()

for _ in range(3):
start = time.perf_counter()

for key, value in key_values.items():
db_client.set(key, value)
db_client.get(key)
end = time.perf_counter()
duration = end - start

num_keys = len(key_values)
print(f"Client {client_id}: {num_keys/duration} get-set per second")


if __name__ == '__main__':
server = multiprocessing.Process(target=run_server, args=[IPC_PATH, DB])
clients = [
multiprocessing.Process(target=run_client, args=[IPC_PATH, i])
for i in range(3)
]
server.start()
for client in clients:
client.start()
for client in clients:
client.join(600)

os.kill(server.pid, signal.SIGINT)
server.join(1)
79 changes: 79 additions & 0 deletions tests-trio/core-trio/database/test_trio_db_client.py
@@ -0,0 +1,79 @@
import pytest
import pytest_trio

import pathlib
import tempfile

from eth.db.atomic import AtomicDB

from trinity.db.manager.manager import (
DBManager,
)
from trinity.db.manager.client import (
TrioDBClient,
)


@pytest.fixture
def ipc_path():
with tempfile.TemporaryDirectory() as dir:
ipc_path = pathlib.Path(dir) / "db_manager.ipc"
yield ipc_path


@pytest.fixture
def db():
return AtomicDB()


@pytest.fixture
def db_manager(db, ipc_path):
manager = DBManager(db)
with manager.run(ipc_path) as running_manager:
yield running_manager


@pytest_trio.fixture
async def db_client(ipc_path, db_manager):
client = await TrioDBClient.connect(ipc_path)
try:
yield client
finally:
await client.aclose()


@pytest.mark.trio
async def test_async_client_read(ipc_path, db_client):
db[b'key'] = b'value'
assert await db_client.get(b'key') == b'value'


@pytest.mark.trio
async def test_atomic_db_set_and_get(db_client):
await db_client.set(b'key-1', b'value-1')
await db_client.set(b'key-2', b'value-2')
assert await db_client.get(b'key-1') == b'value-1'
assert await db_client.get(b'key-2') == b'value-2'


@pytest.mark.trio
async def test_atomic_db_set_and_delete(db, db_client):
db[b'key-1'] = b'origin'

await db_client.delete(b'key-1')
with pytest.raises(KeyError):
db[b'key-1']

with pytest.raises(KeyError):
await db_client.get(b'key-1')

assert not await db_client.exists(b'key-1')


@pytest.mark.trio
async def test_sync_db_client_exists_checking(db_client):
assert await db_client.exists(b'test-exists') is False

await db_client.set(b'test-exists', b'now-it-exists')

assert await db_client.exists(b'test-exists')
@@ -1,6 +1,7 @@
import trio

import pytest
import pytest_trio

from p2p.trio_service import (
background_service,
Expand All @@ -13,7 +14,7 @@
)


@pytest.fixture
@pytest_trio.trio_fixture
async def socket_pair():
sending_socket = trio.socket.socket(
family=trio.socket.AF_INET,
Expand All @@ -29,6 +30,7 @@ async def socket_pair():
return sending_socket, receiving_socket


@pytest.mark.trio
async def test_datagram_receiver(socket_pair):
sending_socket, receiving_socket = socket_pair
receiver_address = receiving_socket.getsockname()
Expand All @@ -47,6 +49,7 @@ async def test_datagram_receiver(socket_pair):
assert received_datagram.sender.port == sender_address[1]


@pytest.mark.trio
async def test_datagram_sender(socket_pair):
sending_socket, receiving_socket = socket_pair
receiver_address = receiving_socket.getsockname()
Expand Down
File renamed without changes.
73 changes: 73 additions & 0 deletions tests/core/database/test_asyncio_db_client.py
@@ -0,0 +1,73 @@
from eth.db.atomic import AtomicDB

import pathlib
import pytest
import tempfile
from trinity.db.manager import (
DBManager,
AsyncioDBCLient
)


@pytest.fixture
def ipc_path():
with tempfile.TemporaryDirectory() as dir:
ipc_path = pathlib.Path(dir) / "db_manager.ipc"
yield ipc_path


@pytest.fixture
def db():
return AtomicDB()


@pytest.fixture
def db_manager(db, ipc_path):
with DBManager(db).run(ipc_path) as manager:
yield manager


@pytest.fixture
async def db_client(ipc_path, db_manager):
client = await AsyncioDBCLient.connect(ipc_path)
try:
yield client
finally:
await client.aclose()


@pytest.mark.asyncio
async def test_async_client_read(db, db_client):
db[b'key'] = b'value'
assert await db_client.get(b'key') == b'value'


@pytest.mark.asyncio
async def test_atomic_db_set_and_get(db_client):
await db_client.set(b'key-1', b'value-1')
await db_client.set(b'key-2', b'value-2')
assert await db_client.get(b'key-1') == b'value-1'
assert await db_client.get(b'key-2') == b'value-2'


@pytest.mark.asyncio
async def test_atomic_db_set_and_delete(db, db_client):
db[b'key-1'] = b'origin'

await db_client.delete(b'key-1')
with pytest.raises(KeyError):
db[b'key-1']

with pytest.raises(KeyError):
await db_client.get(b'key-1')

assert not await db_client.exists(b'key-1')


@pytest.mark.asyncio
async def test_sync_db_client_exists_checking(db_client):
assert await db_client.exists(b'test-exists') is False

await db_client.set(b'test-exists', b'now-it-exists')

assert await db_client.exists(b'test-exists')

0 comments on commit aebe299

Please sign in to comment.