In [26]:
import re
import asyncio
from typing import Callable, Optional, Tuple
import datetime



async def enqueuer(s: asyncio.streams.StreamReader, *queues: asyncio.Queue[str | None]):
    async for line in s:
        decoded_line = line.decode('utf-8')
        for q in queues:
            q.put_nowait(decoded_line)

    for q in queues:
        await q.join()
        q.put_nowait(None)
        res = await q.get()
        assert(res is None)
        q.task_done()


async def printer(q: asyncio.Queue[str | None]):
    while (line := await q.get()) is not None:
        print(line)
        q.task_done()
    q.put_nowait(None)


print_q = asyncio.Queue()
asyncio.create_task(printer(print_q))


event_sniffer_q = asyncio.Queue()

begin_t: Optional[datetime.datetime] = None
end_t: Optional[datetime.datetime] = None


proc = await asyncio.create_subprocess_shell(
    "../dist/k6 run ../perf-test/k6/scripts/insert.js",
    env={
        "K6_DURATION": '5s'
    },
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE,
)

cmd_stdout = proc.stdout
cmd_stderr = proc.stderr
assert(cmd_stdout)
assert(cmd_stderr)

async def events_sniffer(
    q: asyncio.Queue[str | None],
    *callbacks: Tuple[ re.Pattern, Callable[[re.Match], None] ]
):
    while (line := await q.get()) is not None:
        for (p, fn) in callbacks:
            if (m := p.match(line)):
                fn(m)

        q.task_done()
    q.put_nowait(None)

def begin_setter(_):
    global begin_t
    begin_t = datetime.datetime.now()

def end_setter(_):
    global end_t
    end_t = datetime.datetime.now()

asyncio.create_task(events_sniffer(event_sniffer_q,
    (re.compile(r".*STARTING NOW.*"), begin_setter),
    (re.compile(r".*ENDING NOW.*"), end_setter),
))

queues = [
    print_q,
    event_sniffer_q
]

asyncio.create_task(enqueuer(cmd_stdout, *queues))
asyncio.create_task(enqueuer(cmd_stderr, *queues))

await proc.wait()
await asyncio.gather(*[q.join() for q in queues])

print('cmd output digesting done')
print(f'cmd was doing something from {begin_t} to {end_t}')




          /\      |‾‾| /‾‾/   /‾‾/   

     /\  /  \     |  |/  /   /  /    

    /  \/    \    |     (   /   ‾‾\  

   /          \   |  |\  \ |  (‾)  | 

  / __________ \  |__| \__\ \_____/ .io



  execution: local

     script: ../perf-test/k6/scripts/insert.js

     output: -



  scenarios: (100.00%) 1 scenario, 1 max VUs, 35s max duration (incl. graceful stop):

           * default: 1 looping VUs for 5s (gracefulStop: 30s)



time="2024-09-10T23:35:45+03:00" level=info msg="This run's id: f8d482a4-b670-4d06-bdce-6bb8bf912058" source=console

time="2024-09-10T23:35:46+03:00" level=info msg="STARTING NOW" source=console



running (01.0s), 1/1 VUs, 62 complete and 0 interrupted iterations

default   [   9% ] 1 VUs  0.4s/5s



running (02.0s), 1/1 VUs, 168 complete and 0 interrupted iterations

default   [  29% ] 1 VUs  1.4s/5s



running (03.0s), 1/1 VUs, 270 complete and 0 interrupted iterations

default   [  49% ] 1 VUs  2.4s/5s



running (04.0s), 1/1 VUs, 415 complete and 0