In [22]:
import asyncio
import json
import logging
import os
import random
import shutil
import sys
import time
import uuid
import zipfile

import codetiming
import dateutil
import dateutil.parser
import pandas as pd
import plotly.express as px
import urllib3

# Notebook-wide logger; records at INFO and above are written to stdout so
# they show up in the cell output.
logger = logging.getLogger('soak-test')
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

In [2]:
# Select the "notebook" plotly renderer as the default for every fig.show().
from plotly import io as pio

pio.renderers.default = "notebook"

In [3]:
# Connection settings for the REST endpoint under test. Each one can be
# overridden through an environment variable so that credentials for a shared
# cluster never have to be written into the notebook; the defaults are the
# local development values.
server_host = os.environ.get('SOAK_SERVER_HOST', 'localhost')
server_port = int(os.environ.get('SOAK_SERVER_PORT', '8888'))
username = os.environ.get('SOAK_USERNAME', 'admin')
password = os.environ.get('SOAK_PASSWORD', 'admin')

# CPB package that gets wrapped into a CPI, relative to the working directory.
cpb = '../cpbs/calculator/build/libs/calculator-5.0.0.0-SNAPSHOT-package.cpb'

# Number of virtual nodes to create and the X.500 name template for them
# ({} is replaced by the node index).
nr_vnodes = 1000
vnode_x500_tmpl = "CN=Alice-{}, OU=Application, O=R3, L=London, C=GB"

# abspath() already resolves a relative path against the current directory.
cpb_file = os.path.abspath(cpb)

In [4]:
# Build the CPI: a copy of the CPB with a GroupPolicy.json entry appended.
cpi = 'test.cpi'
group_policy = 'GroupPolicy.json'

with open(group_policy, 'w') as policy_file:
    policy_file.write('{ "groupId": "test-group" }')

# Start from a fresh copy of the CPB each time, then append to that copy.
shutil.copy(cpb_file, cpi)

with zipfile.ZipFile(cpi, mode='a') as archive:
    archive.write(group_policy, arcname=group_policy)

In [5]:
# Connection pool shared by every REST call in this notebook.
# Certificate and hostname verification are switched off, and the warnings
# urllib3 would otherwise emit are silenced.
urllib3.disable_warnings()

auth_headers = urllib3.make_headers(basic_auth=":".join((username, password)))
http = urllib3.HTTPSConnectionPool(
    host=server_host,
    port=server_port,
    headers=auth_headers,
    cert_reqs='CERT_NONE',
    assert_hostname=False,
)

In [6]:
# CPI upload
# POST the CPI as multipart form data, then poll the status endpoint until the
# response carries a non-empty checksum. The timer covers upload and polling.
logger.info("Uploading CPI: {}".format(cpi))
with codetiming.Timer(name="CPI Upload") as t:
    with open(cpi, 'rb') as f:
        resp = http.request(
            'POST',
            '/api/v1/cpi/',
            fields={
                "file": (os.path.basename(cpi), f.read()),
            }
        )

    # Report a rejected upload with the server's own message instead of
    # failing while parsing the response body below (same check as used when
    # creating vnodes).
    if resp.status >= 400:
        sys.exit("CPI Upload failed: {}".format(resp.data.decode("utf-8")))

    request_id = json.loads(resp.data.decode("utf-8"))['id']
    logger.debug("CPI upload REQUEST ID: {}".format(request_id))

    checksum = ""
    while checksum == "":
        # Wait between polls so the server is not hammered
        time.sleep(2)
        resp = http.request(
            'GET',
            "/api/v1/cpi/status/{}".format(request_id),
        )

        if resp.status >= 500:
            sys.exit("CPI Upload failed: {}".format(resp.data.decode("utf-8")))

        resp_json = json.loads(resp.data.decode("utf-8"))
        checksum = resp_json['checksum']
        status = resp_json['status']
        print("CPI Upload status: {}".format(status))

# t.last holds the elapsed seconds now that the timer context has exited
logger.info("CPI ID: {} uploaded in {}s".format(checksum, t.last))

INFO:soak-test:Uploading CPI: test.cpi
CPI Upload status: OK
Elapsed time: 2.9412 seconds
INFO:soak-test:CPI ID: 2A5238BD6260 uploaded in 2.9412405019999994s


In [7]:
# List all CPIs and pick the one to create vnodes against
resp = http.request(
        'GET',
        "/api/v1/cpi/",
    )
if resp.status >= 400:
    sys.exit("Listing CPIs failed: {}".format(resp.data.decode("utf-8")))

resp_json = json.loads(resp.data.decode("utf-8"))
cpis = resp_json['cpis']
if not cpis:
    sys.exit("Listing CPIs failed: server returned no CPIs")

# In the recorded run the short checksum from the upload status is a prefix of
# the CPI's full 'fileChecksum' (NOTE(review): confirm this holds in general).
# Prefer the entry that matches the CPI uploaded by this notebook, so a server
# that already holds other CPIs does not cause vnodes to be created against
# the wrong one. Falls back to the first entry when nothing matches.
uploaded_cpis = [c for c in cpis if checksum and c['fileChecksum'].startswith(checksum)]
first_cpi = uploaded_cpis[0] if uploaded_cpis else cpis[0]
logger.info("CPI meta: {}".format(first_cpi))

INFO:soak-test:CPI meta: {'id': {'cpiName': 'calculator', 'cpiVersion': '5.0.0.0-SNAPSHOT', 'signerSummaryHash': 'null'}, 'fileChecksum': '2A5238BD6260480B4DDEE273F60DC28F7AABC3648247C2687D12FC4C2D0C7B4C', 'cpks': [{'id': {'name': 'net.corda.calculator', 'version': '5.0.0.0-SNAPSHOT', 'signerSummaryHash': 'SHA-256:DE109156623F82935E7C70F1A5944A53B4941682D624D6E7C8E762BCB72C319E'}, 'mainBundle': 'calculator-5.0.0.0-SNAPSHOT.jar', 'libraries': [], 'dependencies': [], 'type': 'UNKNOWN', 'hash': 'SHA-256:0F8AB1B613CBEACD3509DB5463F7DF79495E04DAC9D7FD67FB4B6736AE9A45A8'}], 'groupPolicy': '{ "groupId": "test-group" }'}


In [8]:
# Create VNodes, recording how long each creation request takes.
# `nodes` maps the vnode X.500 name to the request duration in seconds.
nodes = {}
for n in range(nr_vnodes):
    vnode_x500 = vnode_x500_tmpl.format(n)
    with codetiming.Timer(name="Create {}".format(vnode_x500), logger=None) as t:
        create_request = {
            "request": {
                "cpiFileChecksum": first_cpi['fileChecksum'],
                "x500Name": vnode_x500,
            }
        }
        resp = http.request('POST', '/api/v1/virtualnode', body=json.dumps(create_request))

        # Abort the whole run on the first rejected request
        if not resp.status < 400:
            sys.exit("Creating {} failed: {}".format(vnode_x500, resp.data.decode("utf-8")))

    # Read the duration only after the timer context has exited
    nodes[vnode_x500] = t.last
    logger.debug("Created: {} in {}".format(vnode_x500, t.last))

total_creation_seconds = sum(nodes.values())
logger.info("Created {} vnodes in {}s".format(len(nodes), total_creation_seconds))

INFO:soak-test:Created 1000 vnodes in 373.04565278400025s


In [13]:
# Plot the creation time of every vnode and keep the raw numbers as CSV.
df = pd.DataFrame({
    'VNode': list(nodes.keys()),
    'Duration': list(nodes.values()),
})
fig = px.scatter(df, x='VNode', y='Duration')
fig.show()
df.to_csv('vnode_creation.csv', index=False)

In [10]:
# Fetch the virtual nodes known to the server and log the first one as a sample.
resp = http.request('GET', "/api/v1/virtualnode/")
nodes_meta = json.loads(resp.data.decode("utf-8"))
sample_vnode = nodes_meta["virtualNodes"][0]
logger.info(sample_vnode)

INFO:soak-test:{'holdingIdentity': {'x500Name': 'CN=Alice-0, OU=Application, O=R3, L=London, C=GB', 'groupId': 'test-group', 'hash': '3EAC013DC7BB8527BD7A324515722B6B932A5EC9ACBB9ACA5439391840D3DC44', 'id': '3EAC013DC7BB'}, 'cpiIdentifier': {'name': 'calculator', 'version': '5.0.0.0-SNAPSHOT', 'signerSummaryHash': None}, 'vaultDdlConnectionId': '6d1a255b-7408-4d25-891e-da2d6707251d', 'vaultDmlConnectionId': 'b9321d6e-bed5-408a-9695-d7167d8a91e6', 'cryptoDdlConnectionId': '378b4fa8-59f2-4192-8cb2-604fe4f88509', 'cryptoDmlConnectionId': '317fe6e1-2517-40c8-b86c-8ec3238b8617', 'hsmConnectionId': None}


In [24]:
async def execute_flow(vnode_x500, i, vnode_holding_id):
    """Start one CalculatorFlow on a virtual node and poll it until it finishes.

    Relies on the notebook globals ``http`` (connection pool) and
    ``flow_execution_range`` (only used to label the timer).

    Args:
        vnode_x500: X.500 name of the virtual node; used for labelling only.
        i: index of this execution for the node; used for labelling only.
        vnode_holding_id: holding identity id that goes into the flow URLs.

    Returns:
        Tuple ``(vnode_x500, i, success, execution_seconds, test_seconds)``:
        ``success`` is True when the final status carries no ``flowError``;
        ``execution_seconds`` is the difference between the timestamp in the
        final status response and the one in the start response;
        ``test_seconds`` is the wall-clock time measured by the timer around
        the start request and all polling.

    Raises:
        SystemExit: if starting the flow returns a status >= 400 or a status
            poll returns a status >= 500.
    """
    request_id = uuid.uuid4().hex
    with codetiming.Timer(name="Flow {} {}/{}".format(vnode_x500, i, flow_execution_range), logger=None) as t:
        # The flow expects its arguments as a JSON document embedded in a string
        flow_payload = '{{ "a":{}, "b":{} }}'.format(random.randint(0, 9999), random.randint(0, 9999))
        url = '/api/v1/flow/{}/{}/net.corda.testing.calculator.CalculatorFlow'.format(vnode_holding_id, request_id)
        resp = http.request(
            'PUT',
            url,
            body=json.dumps({
                "requestBody": flow_payload
            })
        )

        if resp.status >= 400:
            sys.exit("Starting flow {} failed: {}".format(url, resp.data.decode("utf-8")))

        flow_start_resp = json.loads(resp.data.decode("utf-8"))
        flow_started = dateutil.parser.parse(flow_start_resp['flowStatus']['timestamp'])

        flow_status = ""
        flow_error = None
        while flow_status != "COMPLETED" and not flow_error:
            # Yield to the event loop while waiting instead of blocking it.
            # (The HTTP calls themselves are still synchronous.)
            await asyncio.sleep(0.1)
            resp = http.request(
                'GET',
                "/api/v1/flow/{}/{}".format(vnode_holding_id, request_id),
            )

            if resp.status >= 500:
                sys.exit("Flow failed: {}".format(resp.data.decode("utf-8")))

            flow_status_resp = json.loads(resp.data.decode("utf-8"))
            flow_status = flow_status_resp['flowStatus']
            flow_error = flow_status_resp['flowError']

        flow_ended = dateutil.parser.parse(flow_status_resp['timestamp'])
        flow_execution_duration = flow_ended - flow_started

    # Timer.last is only set when the timer stops, i.e. when the `with` block
    # exits. Reading it inside the block would not give this call's duration.
    return (vnode_x500, i, flow_error is None, flow_execution_duration.total_seconds(), t.last)

In [27]:
# execute flows
# for each node, execute calculation flow 3 times
# for now, everything is sequential
# Create VNodes
flow_execution = {
    'vnode': [],
    'n': [],
    'success': [],
    'execution_time': [],
    'test_time': []
}
flow_execution_range = 5
lock = asyncio.Lock()

# for n in range(len(nodes_meta["virtualNodes"])):
for n in range(3):
    vnode = nodes_meta['virtualNodes'][n]
    vnode_x500 = vnode['holdingIdentity']['x500Name']
    vnode_holding_id = vnode['holdingIdentity']['id']
    for i in range(flow_execution_range):
        
        result = await execute_flow(vnode_x500, i, vnode_holding_id)
        # async with lock:
        flow_execution['vnode'].append(result[0])
        flow_execution['n'].append(result[1])
        flow_execution['success'].append(result[2])
        flow_execution['execution_time'].append(result[3])
        flow_execution['test_time'].append(result[4])

        logger.debug("Executed flow {}/{} for: {} in {}".format(i, flow_execution_range, vnode_x500, t.last))
logger.info("Executed {} flows in {}s with {} errors".format(
    len(flow_execution['vnode']), sum(flow_execution['execution_time']), len([r for r in flow_execution['success'] if r == False])))

INFO:soak-test:Executed 15 flows in 3.044366s with 0 errors


In [None]:
# execute flows
# for each node, execute calculation flow 3 times
# for now, everything is sequential
# Create VNodes
flow_execution = {
    'vnode': [],
    'n': [],
    'success': [],
    'execution_time': [],
    'test_time': []
}
flow_execution_range = 5
lock = asyncio.Lock()

# for n in range(len(nodes_meta["virtualNodes"])):
for n in range(3):
    vnode = nodes_meta['virtualNodes'][n]
    vnode_x500 = vnode['holdingIdentity']['x500Name']
    vnode_holding_id = vnode['holdingIdentity']['id']
    for i in range(flow_execution_range):
        
        result = execute_flow(vnode_x500, i, vnode_holding_id)
        # async with lock:
        flow_execution['vnode'].append(result[0])
        flow_execution['n'].append(result[1])
        flow_execution['success'].append(result[2])
        flow_execution['execution_time'].append(result[3])
        flow_execution['test_time'].append(result[4])

        logger.debug("Executed flow {}/{} for: {} in {}".format(i, flow_execution_range, vnode_x500, t.last))
logger.info("Executed {} flows in {}s with {} errors".format(
    len(flow_execution['vnode']), sum(flow_execution['execution_time']), len([r for r in flow_execution['success'] if r == False])))

INFO:soak-test:Executed 15 flows in 4.9760230000000005s with 0 errors


In [17]:
# Scatter of flow execution time per vnode, coloured by success; the raw
# measurements are also written to CSV.
df = pd.DataFrame(flow_execution)
scatter_options = dict(
    x='vnode',
    y='execution_time',
    color='success',
    hover_data=['vnode', 'n'],
)
fig = px.scatter(df, **scatter_options)
fig.show()
df.to_csv('flow_execution.csv', index=False)