-
Notifications
You must be signed in to change notification settings - Fork 773
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Solution: parallelize the validation of transactions. This patch adds a new flag for the `start` command, namely `--experimental-parallel-validation`, that can be used to enable this experimental feature.
- Loading branch information
Showing
4 changed files
with
273 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# Copyright BigchainDB GmbH and BigchainDB contributors | ||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) | ||
# Code is Apache-2.0 and docs are CC-BY-4.0 | ||
|
||
import multiprocessing as mp | ||
from collections import defaultdict | ||
|
||
from abci.types_pb2 import ResponseCheckTx, ResponseDeliverTx | ||
|
||
from bigchaindb import BigchainDB, App | ||
from bigchaindb.tendermint_utils import decode_transaction | ||
|
||
|
||
# Response code attached to every CheckTx/DeliverTx response built below.
CodeTypeOk = 0


class ParallelValidationApp(App):
    """ABCI application that defers transaction validation to worker processes.

    Transactions are only queued when they are delivered; the validation
    results are collected once per block, in `end_block`.
    """

    def __init__(self, bigchaindb=None, events_queue=None):
        """Initialise the parent application and start the validator pool."""
        super().__init__(bigchaindb, events_queue)
        validator = ParallelValidator()
        self.parallel_validator = validator
        validator.start()

    def check_tx(self, raw_transaction):
        """Accept the transaction without inspecting it."""
        return ResponseCheckTx(code=CodeTypeOk)

    def deliver_tx(self, raw_transaction):
        """Queue the raw transaction for validation and answer OK right away."""
        self.parallel_validator.validate(raw_transaction)
        return ResponseDeliverTx(code=CodeTypeOk)

    def end_block(self, request_end_block):
        """Collect the validation results of the block, then end it.

        Waits up to 30 seconds for each pending result. Entries that are
        falsy (transactions that did not pass validation) are left out of
        the block.
        """
        validated = self.parallel_validator.result(timeout=30)
        for transaction in validated:
            if not transaction:
                continue
            # NOTE(review): `block_txn_ids` and `block_transactions` are
            # presumably initialised by `App` -- confirm against the parent.
            self.block_txn_ids.append(transaction.id)
            self.block_transactions.append(transaction)

        return super().end_block(request_end_block)
|
||
|
||
# Control messages sent to the workers through their routing queues.
RESET = 'reset'
EXIT = 'exit'


class ParallelValidator:
    """Distribute transactions over worker processes and gather the results.

    Each worker owns one routing queue; all workers share a single results
    queue. Transactions are numbered in arrival order so that `result` can
    return the outcomes in that same order.
    """

    def __init__(self, number_of_workers=mp.cpu_count()):
        """Create the queues; no process is spawned until `start` is called."""
        self.number_of_workers = number_of_workers
        self.workers = []
        self.transaction_index = 0
        self.results_queue = mp.Queue()
        self.routing_queues = [mp.Queue() for _ in range(self.number_of_workers)]

    def start(self):
        """Spawn one worker process per routing queue."""
        for inbox in self.routing_queues:
            process = mp.Process(
                target=ValidationWorker(inbox, self.results_queue).run)
            process.start()
            self.workers.append(process)

    def stop(self):
        """Send the `EXIT` message to every routing queue."""
        for inbox in self.routing_queues:
            inbox.put(EXIT)

    def validate(self, raw_transaction):
        """Decode a raw transaction and route it to a worker.

        The worker is chosen from the transaction `id`, read as a
        hexadecimal number, modulo the number of workers: the same id is
        always routed to the same worker.
        """
        dict_transaction = decode_transaction(raw_transaction)
        worker_number = int(dict_transaction['id'], 16) % self.number_of_workers
        position = self.transaction_index
        self.routing_queues[worker_number].put((position, dict_transaction))
        self.transaction_index = position + 1

    def result(self, timeout=None):
        """Return the outcomes of the current round, in submission order.

        Blocks until one result per submitted transaction has been read
        from the results queue; `timeout` is applied to each individual
        read. Afterwards the transaction counter is reset and a `RESET`
        message is sent to every routing queue.
        """
        pending = self.transaction_index
        ordered = [None] * pending
        for _ in range(pending):
            # Results arrive in completion order; the index carried by each
            # message restores the submission order.
            position, transaction = self.results_queue.get(timeout=timeout)
            ordered[position] = transaction

        self.transaction_index = 0
        for inbox in self.routing_queues:
            inbox.put(RESET)
        return ordered
|
||
|
||
class ValidationWorker:
    """Run validation logic in a loop.

    This worker is suitable for a Process life: no frills, just a queue to
    get some values, and a queue to return results.

    Note that a worker is expected to validate multiple transactions in
    multiple rounds, and it needs to keep in memory all transactions already
    validated, until a new round starts. To trigger a new round of validation,
    a ValidationWorker expects a `RESET` message. To exit the infinite loop the
    worker is in, it expects an `EXIT` message.
    """

    def __init__(self, in_queue, results_queue):
        """Store the two queues and create the validation backend.

        Args:
            in_queue: queue delivering `(index, dict_transaction)` tuples
                and the `RESET` / `EXIT` control messages.
            results_queue: queue receiving `(index, validation_result)`
                tuples.
        """
        self.in_queue = in_queue
        self.results_queue = results_queue
        self.bigchaindb = BigchainDB()
        self.reset()

    def reset(self):
        """Forget every transaction validated so far."""
        # We need a place to store already validated transactions,
        # in case of dependent transactions in the same block.
        # `validated_transactions` maps an `asset_id` with the list
        # of all other transactions sharing the same asset.
        self.validated_transactions = defaultdict(list)

    def validate(self, dict_transaction):
        """Validate one transaction against those already seen this round.

        Args:
            dict_transaction: the transaction, as a dictionary.

        Returns:
            Whatever `is_valid_transaction` returns: a truthy transaction
            when validation succeeds, a falsy value otherwise.
        """
        try:
            asset_id = dict_transaction['asset']['id']
        except (KeyError, TypeError):
            # No asset id: either the asset carries no `id` key, or `asset`
            # is not a mapping at all (e.g. `None` in a malformed payload).
            # Falling back to the transaction id keeps the worker alive and
            # leaves the verdict to `is_valid_transaction`.
            asset_id = dict_transaction['id']

        transaction = self.bigchaindb.is_valid_transaction(
            dict_transaction,
            self.validated_transactions[asset_id])

        if transaction:
            self.validated_transactions[asset_id].append(transaction)
        return transaction

    def run(self):
        """Process messages from the input queue until `EXIT` is received."""
        while True:
            message = self.in_queue.get()
            if message == RESET:
                self.reset()
            elif message == EXIT:
                return
            else:
                index, transaction = message
                self.results_queue.put((index, self.validate(transaction)))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
# Copyright BigchainDB GmbH and BigchainDB contributors | ||
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0) | ||
# Code is Apache-2.0 and docs are CC-BY-4.0 | ||
|
||
import pytest | ||
|
||
from bigchaindb.common.crypto import generate_key_pair | ||
from bigchaindb.models import Transaction | ||
|
||
|
||
# Apply the `tendermint` marker to every test in this module.
pytestmark = pytest.mark.tendermint


def generate_create_and_transfer(keypair=None):
    """Return a signed CREATE transaction and a signed TRANSFER spending it.

    Args:
        keypair: a `(private_key, public_key)` pair; a fresh one is
            generated when omitted.

    Returns:
        The tuple `(create_tx, transfer_tx)`. Both transactions assign an
        amount of 10 to the same public key.
    """
    priv_key, pub_key = keypair or generate_key_pair()

    unsigned_create = Transaction.create([pub_key], [([pub_key], 10)])
    create_tx = unsigned_create.sign([priv_key])

    unsigned_transfer = Transaction.transfer(
        create_tx.to_inputs(),
        [([pub_key], 10)],
        asset_id=create_tx.id)
    transfer_tx = unsigned_transfer.sign([priv_key])

    return create_tx, transfer_tx
|
||
|
||
def test_validation_worker_process_multiple_transactions(b):
    """Run a worker over three rounds and check the result of every message."""
    import multiprocessing as mp
    from bigchaindb.parallel_validation import ValidationWorker, RESET, EXIT

    keypair = generate_key_pair()
    create_tx, transfer_tx = generate_create_and_transfer(keypair)
    double_spend = Transaction.transfer(
        create_tx.to_inputs(),
        [([keypair.public_key], 10)],
        asset_id=create_tx.id).sign([keypair.private_key])

    in_queue, results_queue = mp.Queue(), mp.Queue()
    vw = ValidationWorker(in_queue, results_queue)

    # Note: the worker will encounter two `RESET` messages and one `EXIT`
    # message. When a worker processes a `RESET` message, it forgets all
    # transactions it has validated. This allows us to re-validate the same
    # transactions. This won't happen in real life, but it's quite handy to
    # check if the worker actually forgot about the past transactions (if
    # not, it will return `False` because the transactions look like a
    # double spend).
    # `EXIT` makes the worker stop the infinite loop.
    script = [
        (0, create_tx), (10, transfer_tx), (20, double_spend),
        RESET,
        (0, create_tx), (5, transfer_tx),
        RESET,
        (20, create_tx), (25, double_spend), (30, transfer_tx),
        EXIT,
    ]
    for step in script:
        if isinstance(step, tuple):
            index, transaction = step
            in_queue.put((index, transaction.to_dict()))
        else:
            # Control messages are sent as they are.
            in_queue.put(step)

    vw.run()

    # One result per transaction message, in the order they were sent. In
    # each round, whichever transaction spends the already spent output
    # second is the one that gets rejected.
    expected = [
        (0, create_tx), (10, transfer_tx), (20, False),
        (0, create_tx), (5, transfer_tx),
        (20, create_tx), (25, double_spend), (30, False),
    ]
    for outcome in expected:
        assert results_queue.get() == outcome
|
||
|
||
def test_parallel_validator_routes_transactions_correctly(b, monkeypatch):
    """Check that transactions are spread over the workers by their `id`."""
    import os
    from collections import defaultdict
    import multiprocessing as mp
    from json import dumps
    from bigchaindb.parallel_validation import ParallelValidator

    # We want to make sure that the load is distributed across all workers.
    # Since introspection on an object running on a different process is
    # difficult, we create an additional queue where every worker can emit its
    # PID every time validation is called.
    validation_called_by = mp.Queue()

    # Validate is now a passthrough, and every time it is called it will emit
    # the PID of its worker to the designated queue.
    def validate(self, dict_transaction):
        validation_called_by.put((os.getpid(), dict_transaction['id']))
        return dict_transaction

    monkeypatch.setattr(
        'bigchaindb.parallel_validation.ValidationWorker.validate',
        validate)

    # Transaction routing uses the `id` of the transaction. This test strips
    # down a transaction to just its `id`. We have two workers, so even ids
    # will be processed by one worker, odd ids by the other.
    transactions = [{'id': '0'}, {'id': '1'}, {'id': '2'}, {'id': '3'}]

    pv = ParallelValidator(number_of_workers=2)
    pv.start()

    # Always stop the workers, even when an assertion fails, so that no
    # worker process outlives the test.
    try:
        # ParallelValidator is instantiated once, and then used several times.
        # Here we simulate this scenario by running it an arbitrary number of
        # times.
        # Note that the `ParallelValidator.result` call resets the object, and
        # makes it ready to validate a new set of transactions.
        for _ in range(2):
            # First, we push the transactions to the parallel validator
            # instance.
            for transaction in transactions:
                pv.validate(dumps(transaction).encode('utf8'))

            assert pv.result(timeout=1) == transactions

            # Now we analyze the transactions processed by the workers.
            worker_to_transactions = defaultdict(list)
            for _ in transactions:
                worker_pid, transaction_id = validation_called_by.get()
                worker_to_transactions[worker_pid].append(transaction_id)

            # The transactions are stored in two buckets.
            for transaction_ids in worker_to_transactions.values():
                assert len(transaction_ids) == 2

                # We have two workers, hence we have two different routes for
                # transactions. We have the route for even transactions, and
                # the route for odd transactions. Since we don't know which
                # worker processed what, we test that the transactions
                # processed by a worker are all even or all odd, i.e. that
                # they all share one parity.
                parities = {int(transaction_id) % 2
                            for transaction_id in transaction_ids}
                assert len(parities) == 1
    finally:
        pv.stop()