Skip to content

Commit

Permalink
Refactor performance load test with block streaming
Browse files Browse the repository at this point in the history
- InfluxDB hash field

Signed-off-by: Dumitru <dimasavva17@gmail.com>
  • Loading branch information
x3medima17 authored and lebdron committed Aug 28, 2019
1 parent 355d420 commit 97ec9cf
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 29 deletions.
2 changes: 1 addition & 1 deletion test/load/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ EXPOSE 8089 5557 5558

ENTRYPOINT ["/usr/local/bin/locust"]

RUN pip install grpcio-tools iroha influxdb
RUN pip install grpcio==1.19.0 grpcio-tools==1.19.0 iroha influxdb
12 changes: 8 additions & 4 deletions test/load/common/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self):
def hatch_complete(self, user_count, **kw):
self._user_count = user_count

def request_success(self, request_type, name, response_time, response_length, **kw):
def request_success(self, request_type, name, response_time, response_length, tx_hash=None, sent=None, committed=None, **kw):
now = datetime.now().isoformat()
points = [{
"measurement": "request_success_duration",
Expand All @@ -25,7 +25,10 @@ def request_success(self, request_type, name, response_time, response_length, **
},
"time": now,
"fields": {
"value": response_time
"value": response_time,
"tx_hash": tx_hash,
"sent": sent,
"committed": committed
}
},
{
Expand All @@ -37,7 +40,7 @@ def request_success(self, request_type, name, response_time, response_length, **
}]
self._client.write_points(points)

def request_failure(self, request_type, name, response_time, exception, **kw):
def request_failure(self, request_type, name, response_time, exception, tx_hash=None, **kw):
now = datetime.now().isoformat()
points = [{
"measurement": "request_failure_duration",
Expand All @@ -47,7 +50,8 @@ def request_failure(self, request_type, name, response_time, exception, **kw):
},
"time": now,
"fields": {
"value": response_time
"value": response_time,
"tx_hash": tx_hash
}
},
{
Expand Down
2 changes: 2 additions & 0 deletions test/load/docker-compose-graphing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ services:
INFLUXDB_UDP_ENABLED: "true"
INFLUXDB_UDP_BIND_ADDRESS: ":4444"
INFLUXDB_UDP_DATABASE: "influxdb"
ports:
- 8086:8086
volumes:
- influxdb_data:/var/lib/influxdb
grafana:
Expand Down
8 changes: 5 additions & 3 deletions test/load/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ x-common: &common
image: iroha-locust
env_file: config.env
volumes:
- ./:/tests
- './:/tests'
entrypoint:
- /tests/docker_start.sh


services:
locust-master:
<<: *common
ports:
- 8089:8089
- '8089:8089'
environment:
LOCUST_MODE: master

locust-slave:
<<: *common
environment:
LOCUST_MODE: slave
LOCUST_MASTER_HOST: locust-master


88 changes: 67 additions & 21 deletions test/load/locustfile-performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,76 @@
import grpc
from iroha import Iroha, IrohaGrpc
from iroha import IrohaCrypto as ic

import iroha
import common.writer

import random
import gevent


HOSTNAME = os.environ['HOSTNAME']
ADMIN_PRIVATE_KEY = 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70'

TXS = dict() # hash -> sent time
COMMITTED = set()
SENT = set()
BLOCKS = set()


def ascii_hash(tx):
return binascii.hexlify(ic.hash(tx)).decode('ascii')

class IrohaClient(IrohaGrpc):
"""
Simple, sample Iroha gRPC client implementation that wraps IrohaGrpc and
fires locust events on request_success and request_failure, so that all requests
Simple, sample Iroha gRPC client implementation that wraps IrohaGrpc and
fires locust events on request_success and request_failure, so that all requests
gets tracked in locust's statistics.
"""
def send_tx_await(self, transaction):
def send_tx_wait(self, transaction):
"""
Send a transaction to Iroha and wait for the final status to be reported in status stream
Send a transaction to Iroha if there are few transactions in the queue to be committed
:param transaction: protobuf Transaction
:return: None
"""
while len(SENT) - len(COMMITTED) > 100:
time.sleep(0.01)

hex_hash = ascii_hash(transaction)
start_time = time.time()

try:
tx_future = self._command_service_stub.Torii.future(transaction)
tx_status = 'NOT_RECEIVED'
while tx_status not in ['COMMITTED', 'REJECTED']:
for status in self.tx_status_stream(transaction):
tx_status = status[0]
self.send_tx(transaction)
SENT.add(hex_hash)
TXS[hex_hash] = start_time
except grpc.RpcError as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="grpc", name='send_tx_await', response_time=total_time, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="grpc", name='send_tx_await', response_time=total_time, response_length=0)
# In this example, I've hardcoded response_length=0. If we would want the response length to be
# reported correctly in the statistics, we would probably need to hook in at a lower level
events.request_failure.fire(request_type="grpc", name='send_tx_wait', response_time=total_time, exception=e, tx_hash=hex_hash)


def block_listener(host):
iroha_api = iroha.Iroha("admin@test")
net = IrohaGrpc(host)
query = iroha_api.blocks_query()
ic.sign_query(query, ADMIN_PRIVATE_KEY)
print("Listeting blocks")
for block in net.send_blocks_stream_query(query):
BLOCKS.add(block.block_response.block.block_v1.payload.height)
hashes = block.block_response.block.block_v1.payload.rejected_transactions_hashes
txs = block.block_response.block.block_v1.payload.transactions
for tx in txs:
hashes.append(ascii_hash(tx))

for hash in hashes:
if hash not in TXS.keys():
continue
start_time = TXS[hash]
COMMITTED.add(hash)
del TXS[hash]
total_time = int((time.time() - start_time) * 1000)
try:
events.request_success.fire(request_type="grpc", name='send_tx_wait', response_time=total_time, response_length=0, tx_hash=hash, sent=start_time, committed=time.time())
except Exception as e:
print(e)

class IrohaLocust(Locust):
"""
Expand All @@ -54,22 +89,33 @@ class IrohaLocust(Locust):
def __init__(self, *args, **kwargs):
super(IrohaLocust, self).__init__(*args, **kwargs)
self.client = IrohaClient(self.host)
gevent.spawn(block_listener, self.host)


class ApiUser(IrohaLocust):

host = "127.0.0.1:50051"
min_wait = 100
min_wait = 1000
max_wait = 1000

class task_set(TaskSet):
@task
def send_tx(self):
print("Locust instance (%r) executing my_task" % (self.locust))
print("""
\n
Sent: {}
Committed: {}
Diff: {}
Blocks: {}\n
""".format(len(SENT), len(COMMITTED), len(SENT) - len(COMMITTED), len(BLOCKS)))
iroha = Iroha('admin@test')

desc = str(random.random())
tx = iroha.transaction([iroha.command(
'TransferAsset', src_account_id='admin@test', dest_account_id='test@test', asset_id='coin#test',
amount='0.01', description=HOSTNAME
amount='0.01', description=desc
)])

ic.sign_transaction(tx, ADMIN_PRIVATE_KEY)
self.client.send_tx_await(tx)
self.client.send_tx_wait(tx)

0 comments on commit 97ec9cf

Please sign in to comment.